在这篇博客中,我们将探讨如何使用 Ent、Atlas 与 pgvector 构建一个 RAG(Retrieval Augmented Generation)系统。
RAG 是一种将检索步骤加入到生成模型的技术。它并不单靠模型内部知识,而是从外部来源检索相关文档或数据,并利用这些信息生成更精准、更具上下文感知的回复。此方法在构建问答系统、聊天机器人以及任何需要实时或领域特定知识的场景中尤为有用。
设置 Ent 模型
首先,让我们通过初始化 Go 模块来开始教程:
go mod init github.com/rotemtam/entrag # 你可以将模块路径替换为自己的
在本项目中,我们将使用 Ent,一个 Go 的实体框架,来定义我们的数据库模式。数据库将存储我们想要检索的文档(按固定大小分块)以及每个块的向量表示。通过运行下面的命令来初始化 Ent 项目:
go run -mod=mod entgo.io/ent/cmd/ent new Embedding Chunk
此命令为我们的数据模型创建占位符。项目结构应该如下所示:
├── ent
│ ├── generate.go
│ └── schema
│ ├── chunk.go
│ └── embedding.go
├── go.mod
└── go.sum
接下来,让我们定义 Chunk 模型的模式。打开 ent/schema/chunk.go 文件并按下面的方式定义:
package schema
import (
"entgo.io/ent"
"entgo.io/ent/schema/edge"
"entgo.io/ent/schema/field"
)
// Chunk 代表 Chunk 实体的模式定义。
type Chunk struct {
ent.Schema
}
// Chunk 的字段。
func (Chunk) Fields() []ent.Field {
return []ent.Field{
field.String("path"),
field.Int("nchunk"),
field.Text("data"),
}
}
// Chunk 的边。
func (Chunk) Edges() []ent.Edge {
return []ent.Edge{
edge.To("embedding", Embedding.Type).StorageKey(edge.Column("chunk_id")).Unique(),
}
}
此模式定义了一个包含 path、nchunk 和 data 三个字段的 Chunk 实体。path 字段存储文档路径,nchunk 存储块编号,data 存储分块文本数据。我们还为 Embedding 实体定义了一个关联边,用于存储块的向量表示。
在继续之前,请安装 pgvector 包。pgvector 是 PostgreSQL 的一个扩展,提供向量操作和相似度搜索支持。我们需要它来存储和检索块的向量表示。
go get github.com/pgvector/pgvector-go
接下来,定义 Embedding 模型的模式。打开 ent/schema/embedding.go 并按如下方式定义:
package schema
import (
"entgo.io/ent"
"entgo.io/ent/dialect"
"entgo.io/ent/dialect/entsql"
"entgo.io/ent/schema/edge"
"entgo.io/ent/schema/field"
"entgo.io/ent/schema/index"
"github.com/pgvector/pgvector-go"
)
// Embedding 代表 Embedding 实体的模式定义。
type Embedding struct {
ent.Schema
}
// Embedding 的字段。
func (Embedding) Fields() []ent.Field {
return []ent.Field{
field.Other("embedding", pgvector.Vector{}).
SchemaType(map[string]string{
dialect.Postgres: "vector(1536)",
}),
}
}
// Embedding 的边。
func (Embedding) Edges() []ent.Edge {
return []ent.Edge{
edge.From("chunk", Chunk.Type).Ref("embedding").Unique().Required(),
}
}
func (Embedding) Indexes() []ent.Index {
return []ent.Index{
index.Fields("embedding").
Annotations(
entsql.IndexType("hnsw"),
entsql.OpClass("vector_l2_ops"),
),
}
}
该模式定义了一个仅包含单个字段 embedding(类型为 pgvector.Vector)的 Embedding 实体。embedding 字段存储块的向量表示。我们同样为 Chunk 实体定义了边,并在 embedding 字段上创建了使用 hnsw 索引类型和 vector_l2_ops 操作类的索引,从而能够高效执行向量相似度搜索。
最后,通过执行以下命令生成 Ent 代码:
go mod tidy
go generate ./...
Ent 将根据模式定义生成所需的代码。
配置数据库
接下来,设置 PostgreSQL 数据库。我们将使用 Docker 在本地运行 PostgreSQL 实例。由于我们需要 pgvector 扩展,我们将使用带有预装扩展的 pgvector/pgvector:pg17 Docker 镜像。
docker run --rm --name postgres -e POSTGRES_PASSWORD=pass -p 5432:5432 -d pgvector/pgvector:pg17
我们将使用 Atlas,一个与 Ent 集成的数据库 schema‑as‑code 工具,用来管理数据库模式。通过以下命令安装 Atlas:
curl -sSfL https://atlasgo.io/install.sh | sh
其他安装选项请参阅 Atlas 安装文档。
由于我们需要管理扩展,请确保拥有 Atlas Pro 账户。你可以运行以下命令免费试用:
atlas login
如果你想跳过 Atlas,可以使用 此文件 中的语句直接将所需模式应用到数据库
现在,创建基础配置 base.pg.hcl,为公共 schema 提供向量扩展:
schema "public" {
}
extension "vector" {
schema = schema.public
}
接下来,创建 Atlas 配置文件,它将组合 base.pg.hcl 与 Ent 模式:
data "composite_schema" "schema" {
schema {
url = "file://base.pg.hcl"
}
schema "public" {
url = "ent://ent/schema"
}
}
env "local" {
url = getenv("DB_URL")
schema {
src = data.composite_schema.schema.url
}
dev = "docker://pgvector/pg17/dev"
}
此配置定义了一个包含 base.pg.hcl 与 Ent 模式的复合 schema,并为本地开发定义了名为 local 的环境,使用 dev 字段指定开发数据库 URL,供 Atlas 归一化 schema 并进行各种计算。
接下来,执行以下命令将 schema 应用到数据库:
export DB_URL='postgresql://postgres:pass@localhost:5432/postgres?sslmode=disable'
atlas schema apply --env local
Atlas 将根据我们的配置加载目标状态,与数据库当前状态进行比较,并生成迁移计划以使数据库达到期望状态:
Planning migration statements (5 in total):
-- create extension "vector":
-> CREATE EXTENSION "vector" WITH SCHEMA "public" VERSION "0.8.0";
-- create "chunks" table:
-> CREATE TABLE "public"."chunks" (
"id" bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY,
"path" character varying NOT NULL,
"nchunk" bigint NOT NULL,
"data" text NOT NULL,
PRIMARY KEY ("id")
);
-- create "embeddings" table:
-> CREATE TABLE "public"."embeddings" (
"id" bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY,
"embedding" public.vector(1536) NOT NULL,
"chunk_id" bigint NOT NULL,
PRIMARY KEY ("id"),
CONSTRAINT "embeddings_chunks_embedding" FOREIGN KEY ("chunk_id") REFERENCES "public"."chunks" ("id") ON UPDATE NO ACTION ON DELETE NO ACTION
);
-- create index "embedding_embedding" to table: "embeddings":
-> CREATE INDEX "embedding_embedding" ON "public"."embeddings" USING hnsw ("embedding" vector_l2_ops);
-- create index "embeddings_chunk_id_key" to table: "embeddings":
-> CREATE UNIQUE INDEX "embeddings_chunk_id_key" ON "public"."embeddings" ("chunk_id");
-------------------------------------------
Analyzing planned statements (5 in total):
-- non-optimal columns alignment:
-- L4: Table "chunks" has 8 redundant bytes of padding per row. To reduce disk space,
the optimal order of the columns is as follows: "id", "nchunk", "path",
"data" https://atlasgo.io/lint/analyzers#PG110
-- ok (370.25µs)
-------------------------
-- 114.306667ms
-- 5 schema changes
-- 1 diagnostic
-------------------------------------------
? Approve or abort the plan:
▸ Approve and apply
Abort
Atlas 在计划变更的同时也会提供诊断信息和优化建议。在这个例子中,它建议重新排序 chunks 表中的列以降低磁盘空间占用。由于本教程并不关注磁盘空间,我们可以直接选择 Approve and apply。
最后,我们可以再次执行 atlas schema apply 来验证 schema 是否成功应用。Atlas 将输出:
Schema is synced, no changes to be made
Scaffold CLI
现在数据库 schema 已经就绪,为了让 CLI 应用更方便,使用 alecthomas/kong 库构建一个小型工具,能够加载、索引和查询数据库中的文档。
首先,安装 kong 库:
go get github.com/alecthomas/kong
接着,在 cmd/entrag/main.go 中定义 CLI 应用:
package main
import (
"fmt"
"os"
"github.com/alecthomas/kong"
)
// CLI 表示全局选项和子命令。
type CLI struct {
// DBURL 通过 环境变量 DB_URL 读取。
DBURL string `kong:"env='DB_URL',help='Database URL for the application.'"`
OpenAIKey string `kong:"env='OPENAI_KEY',help='OpenAI API key for the application.'"`
// 子命令
Load *LoadCmd `kong:"cmd,help='Load command that accepts a path.'"`
Index *IndexCmd `kong:"cmd,help='Create embeddings for any chunks that do not have one.'"`
Ask *AskCmd `kong:"cmd,help='Ask a question about the indexed documents'"`
}
func main() {
var cli CLI
app := kong.Parse(&cli,
kong.Name("entrag"),
kong.Description("Ask questions about markdown files."),
kong.UsageOnError(),
)
if err := app.Run(&cli); err != nil {
fmt.Fprintf(os.Stderr, "错误: %s\n", err)
os.Exit(1)
}
}
在 cmd/entrag/rag.go 中再创建一个文件并添加以下内容:
package main
type (
// LoadCmd 用于将 Markdown 文件加载到数据库中。
LoadCmd struct {
Path string `help:"path to dir with markdown files" type:"existingdir" required:""`
}
// IndexCmd 用于在数据库中创建嵌入索引。
IndexCmd struct {
}
// AskCmd 是另一个叶子命令。
AskCmd struct {
// Text 是 ask 命令的位置参数。
Text string `kong:"arg,required,help='Text for the ask command.'"`
}
)
验证 scaffold 的 CLI 是否工作,在终端运行:
go run ./cmd/entrag --help
若一切正常,你应该能看到下面的帮助输出:
Usage: entrag <command> [flags]
Ask questions about markdown files.
Flags:
-h, --help Show context-sensitive help.
--dburl=STRING Database URL for the application ($DB_URL).
--open-ai-key=STRING OpenAI API key for the application ($OPENAI_KEY).
Commands:
load --path=STRING [flags]
Load command that accepts a path.
index [flags]
Create embeddings for any chunks that do not have one.
ask <text> [flags]
Ask a question about the indexed documents
Run "entrag <command> --help" for more information on a command.
将文档加载到数据库
接下来,需要一些 Markdown 文件以供加载。创建一个名为 data 的文件夹,并添加若干 Markdown 文件。示例中我们使用 ent/ent 仓库的 docs 目录作为 Markdown 源。
现在实现 LoadCmd 的 Run 方法以将 Markdown 文件加载到数据库。打开 cmd/entrag/rag.go 并添加以下代码:
const (
tokenEncoding = "cl100k_base"
chunkSize = 1000
)
// Run 是当执行“load”命令时调用的方法。
func (cmd *LoadCmd) Run(ctx *CLI) error {
client, err := ctx.entClient()
if err != nil {
return fmt.Errorf("连接 PostgreSQL 失败: %w", err)
}
tokTotal := 0
return filepath.WalkDir(ctx.Load.Path, func(path string, d fs.DirEntry, err error) error {
if filepath.Ext(path) == ".mdx" || filepath.Ext(path) == ".md" {
chunks := breakToChunks(path)
for i, chunk := range chunks {
tokTotal += len(chunk)
client.Chunk.Create().
SetData(chunk).
SetPath(path).
SetNchunk(i).
SaveX(context.Background())
}
}
return nil
})
}
func (c *CLI) entClient() (*ent.Client, error) {
return ent.Open("postgres", c.DBURL)
}
此代码定义了 LoadCmd 的 Run 方法。该方法读取指定路径下的 Markdown 文件,将其拆分为 1000 个 token 的块,并将每块保存到数据库。我们使用 entClient 方法创建一个新的 Ent 客户端,使用 CLI 选项中指定的数据库 URL。
breakToChunks 的实现见 entrag 仓库。
随后,运行:
go run ./cmd/entrag load --path=data
命令完成后,数据库中应已加载块。可使用以下命令验证:
docker exec -it postgres psql -U postgres -d postgres -c "SELECT COUNT(*) FROM chunks;"
结果类似:
count
-------
276
(1 row)
创建 Embedding
现在我们已将文档加载到数据库,需要为每个块创建 Embedding。我们将使用 OpenAI API 生成每个块的 Embedding。首先安装 openai 包:
go get github.com/sashabaranov/go-openai
若你没有 OpenAI API Key,可在 OpenAI 平台 注册并生成一个 API Key。随后将其保存到环境变量 OPENAI_KEY:
export OPENAI_KEY=<your OpenAI API key>
然后实现 IndexCmd 的 Run 方法,以为缺失 Embedding 的块生成 Embedding 并保存到数据库。打开 cmd/entrag/rag.go 并添加:
// Run 是当执行“index”命令时调用的方法。
func (cmd *IndexCmd) Run(cli *CLI) error {
client, err := cli.entClient()
if err != nil {
return fmt.Errorf("连接 PostgreSQL 失败: %w", err)
}
ctx := context.Background()
chunks := client.Chunk.Query().
Where(
chunk.Not(
chunk.HasEmbedding(),
),
).
Order(ent.Asc(chunk.FieldID)).
AllX(ctx)
for _, ch := range chunks {
log.Println("为段落创建嵌入", ch.Path, ch.Nchunk)
embedding := getEmbedding(ch.Data)
_, err := client.Embedding.Create().
SetEmbedding(pgvector.NewVector(embedding)).
SetChunk(ch).
Save(ctx)
if err != nil {
return fmt.Errorf("创建嵌入时出错: %v", err)
}
}
return nil
}
// getEmbedding 调用 OpenAI Embedding API 计算给定字符串的嵌入值并返回。
func getEmbedding(data string) []float32 {
client := openai.NewClient(os.Getenv("OPENAI_KEY"))
queryReq := openai.EmbeddingRequest{
Input: []string{data},
Model: openai.AdaEmbeddingV2,
}
queryResponse, err := client.CreateEmbeddings(context.Background(), queryReq)
if err != nil {
log.Fatalf("获取嵌入时出错: %v", err)
}
return queryResponse.Data[0].Embedding
}
此方法查询缺失 Embedding 的块,生成 Embedding 并保存。运行 index 命令即可:
go run ./cmd/entrag index
日志示例:
2025/02/13 13:04:42 为段落创建嵌入 /Users/home/entr/data/md/aggregate.md 0
2025/02/13 13:04:43 为段落创建嵌入 /Users/home/entr/data/md/ci.mdx 0
…
提问
现在我们已完成文档加载与 Embedding 的创建,下面实现 AskCmd,用于向已索引的文档提问。打开 cmd/entrag/rag.go 并添加:
// Run 是当执行“ask”命令时调用的方法。
func (cmd *AskCmd) Run(ctx *CLI) error {
client, err := ctx.entClient()
if err != nil {
return fmt.Errorf("连接 PostgreSQL 失败: %w", err)
}
question := cmd.Text
emb := getEmbedding(question)
embVec := pgvector.NewVector(emb)
embs := client.Embedding.
Query().
Order(func(s *sql.Selector) {
s.OrderExpr(sql.ExprP("embedding <-> $1", embVec))
}).
WithChunk().
Limit(5).
AllX(context.Background())
b := strings.Builder{}
for _, e := range embs {
chnk := e.Edges.Chunk
b.WriteString(fmt.Sprintf("来自文件: %v\n", chnk.Path))
b.WriteString(chnk.Data)
}
query := fmt.Sprintf(`请使用下列 ent 文档信息回答随后的问题。
信息:
%v
问题:%v`, b.String(), question)
oac := openai.NewClient(ctx.OpenAIKey)
resp, err := oac.CreateChatCompletion(
context.Background(),
openai.ChatCompletionRequest{
Model: openai.GPT4o,
Messages: []openai.ChatCompletionMessage{
{
Role: openai.ChatMessageRoleUser,
Content: query,
},
},
},
)
if err != nil {
return fmt.Errorf("创建聊天完成时出错: %v", err)
}
choice := resp.Choices[0]
out, err := glamour.Render(choice.Message.Content, "dark")
fmt.Print(out)
return nil
}
这段代码先将用户问题转换为向量,然后在数据库中查询相似 Embedding,并取前 5 条结果。随后将其文本与问题一起组成提示,调用 OpenAI GPT 生成答案,并使用 glamour 渲染输出。
在运行 ask 命令前,安装 glamour 包:
go get github.com/charmbracelet/glamour
接着执行:
go run ./cmd/entrag ask "tl;dr What is Ent?"
系统的响应类似:
Ent 是一个面向 Go 语言的开源实体框架(ORM)。它允许开发者在 Go 代码中定义数据模型或图结构。Ent 强调以下原则:schema-as-code、通过代码生成的静态类型与显式 API、简单查询与图遍历、静态类型谓词以及无特定存储的抽象。它支持多种数据库,包括 MySQL、MariaDB、PostgreSQL、SQLite 以及 Gremlin 基础的图数据库,旨在提升 Go 开发的生产力。
进一步示例
go run ./cmd/entrag ask "how to define order field in entgql"
选择可比较字段:在你的 schema 中挑选你想要排序的字段,可是文本、时间戳、整数、枚举等。
为字段加注解:使用
entgql.OrderField注解在所选字段上。注解中字段名应为大写且与 GraphQL 枚举匹配。更新 Schema:例如在
ent/schema中:
func (Todo) Fields() []ent.Field {
return []ent.Field{
field.Text("text").
NotEmpty().
Annotations(
entgql.OrderField("TEXT"),
),
field.Time("created_at").
Default(time.Now).
Immutable().
Annotations(
entgql.OrderField("CREATED_AT"),
),
field.Enum("status").
NamedValues(
"InProgress", "IN_PROGRESS",
"Completed", "COMPLETED",
).
Default("IN_PROGRESS").
Annotations(
entgql.OrderField("STATUS"),
),
field.Int("priority").
Default(0).
Annotations(
entgql.OrderField("PRIORITY"),
),
}
}
- 或者支持多字段排序:可使用
entgql.MultiOrder()注解:
func (Todo) Annotations() []schema.Annotation {
return []schema.Annotation{
entgql.MultiOrder(),
}
}
- 生成 GraphQL 类型:确保 GraphQL 枚举与上述设置一致,例如:
enum OrderDirection {
ASC
DESC
}
enum TodoOrderField {
CREATED_AT
PRIORITY
STATUS
TEXT
}
input TodoOrder {
direction: OrderDirection!
field: TodoOrderField
}
- 在查询中加入
orderBy参数,允许客户端按字段排序:
type Query {
todos(
after: Cursor
first: Int
before: Cursor
last: Int
orderBy: TodoOrder
): TodoConnection!
}
按照这些步骤,你的 Ent‑based 应用即可在 GraphQL API 中支持多字段排序。
go run ./cmd/entrag ask "what's the difference between privacy rules and interceptors"
- 隐私规则:
- 目的:主要用于执行访问控制策略,决定查询与更新是否被允许。
- 实现:在
ent.Policy接口中实现各项方法(EvalQuery、EvalMutation),通过若干条件判断后返回 Allow / Deny / Skip。 - 运作:在查询过程中评估是否满足特定条件,若满足则允许,若不满足则拒绝。
- 适用场景:适合通过规则管理访问权限,确保用户仅在满足条件时才可执行操作。
- 拦截器:
- 目的:将其用作“端口”或展示形态,向多种体验——匹配所有可查询/所有权限下的免费美????
(原文不完整,略)
说明:如果你想在回调中使用第一种..??/
总结:隐私规则专注于访问控制,拦截器则环绕“此处”定位等工具
结束语
在本文中,我们演示了如何使用 Ent、Atlas 与 pgvector 构建一个 RAG 系统。特别感谢 Eli Bendersky 的原始博客以及多年优秀的 Go 之作。







Credit: martinfowler.com


