跳到主要内容

使用 Go、Ent、Atlas 与 pgvector 构建 RAG 系统

· 阅读需 19 分钟

在这篇博客中,我们将探讨如何使用 EntAtlaspgvector 构建一个 RAG(Retrieval Augmented Generation)系统。

RAG 是一种将检索步骤加入到生成模型的技术。它并不单靠模型内部知识,而是从外部来源检索相关文档或数据,并利用这些信息生成更精准、更具上下文感知的回复。此方法在构建问答系统、聊天机器人以及任何需要实时或领域特定知识的场景中尤为有用。

设置 Ent 模型

首先,让我们通过初始化 Go 模块来开始教程:

go mod init github.com/rotemtam/entrag # 你可以将模块路径替换为自己的

在本项目中,我们将使用 Ent,一个 Go 的实体框架,来定义我们的数据库模式。数据库将存储我们想要检索的文档(按固定大小分块)以及每个块的向量表示。通过运行下面的命令来初始化 Ent 项目:

go run -mod=mod entgo.io/ent/cmd/ent new Embedding Chunk

此命令为我们的数据模型创建占位符。项目结构应该如下所示:

├── ent
│ ├── generate.go
│ └── schema
│ ├── chunk.go
│ └── embedding.go
├── go.mod
└── go.sum

接下来,让我们定义 Chunk 模型的模式。打开 ent/schema/chunk.go 文件并按下面的方式定义:

ent/schema/chunk.go
package schema

import (
"entgo.io/ent"
"entgo.io/ent/schema/edge"
"entgo.io/ent/schema/field"
)

// Chunk 代表 Chunk 实体的模式定义。
type Chunk struct {
ent.Schema
}

// Chunk 的字段。
func (Chunk) Fields() []ent.Field {
return []ent.Field{
field.String("path"),
field.Int("nchunk"),
field.Text("data"),
}
}

// Chunk 的边。
func (Chunk) Edges() []ent.Edge {
return []ent.Edge{
edge.To("embedding", Embedding.Type).StorageKey(edge.Column("chunk_id")).Unique(),
}
}

此模式定义了一个包含 pathnchunkdata 三个字段的 Chunk 实体。path 字段存储文档路径,nchunk 存储块编号,data 存储分块文本数据。我们还为 Embedding 实体定义了一个关联边,用于存储块的向量表示。

在继续之前,请安装 pgvector 包。pgvector 是 PostgreSQL 的一个扩展,提供向量操作和相似度搜索支持。我们需要它来存储和检索块的向量表示。

go get github.com/pgvector/pgvector-go

接下来,定义 Embedding 模型的模式。打开 ent/schema/embedding.go 并按如下方式定义:

ent/schema/embedding.go
package schema

import (
"entgo.io/ent"
"entgo.io/ent/dialect"
"entgo.io/ent/dialect/entsql"
"entgo.io/ent/schema/edge"
"entgo.io/ent/schema/field"
"entgo.io/ent/schema/index"
"github.com/pgvector/pgvector-go"
)

// Embedding 代表 Embedding 实体的模式定义。
type Embedding struct {
ent.Schema
}

// Embedding 的字段。
func (Embedding) Fields() []ent.Field {
return []ent.Field{
field.Other("embedding", pgvector.Vector{}).
SchemaType(map[string]string{
dialect.Postgres: "vector(1536)",
}),
}
}

// Embedding 的边。
func (Embedding) Edges() []ent.Edge {
return []ent.Edge{
edge.From("chunk", Chunk.Type).Ref("embedding").Unique().Required(),
}
}

func (Embedding) Indexes() []ent.Index {
return []ent.Index{
index.Fields("embedding").
Annotations(
entsql.IndexType("hnsw"),
entsql.OpClass("vector_l2_ops"),
),
}
}

该模式定义了一个仅包含单个字段 embedding(类型为 pgvector.Vector)的 Embedding 实体。embedding 字段存储块的向量表示。我们同样为 Chunk 实体定义了边,并在 embedding 字段上创建了使用 hnsw 索引类型和 vector_l2_ops 操作类的索引,从而能够高效执行向量相似度搜索。

最后,通过执行以下命令生成 Ent 代码:

go mod tidy
go generate ./...

Ent 将根据模式定义生成所需的代码。

配置数据库

接下来,设置 PostgreSQL 数据库。我们将使用 Docker 在本地运行 PostgreSQL 实例。由于我们需要 pgvector 扩展,我们将使用带有预装扩展的 pgvector/pgvector:pg17 Docker 镜像。

docker run --rm --name postgres -e POSTGRES_PASSWORD=pass -p 5432:5432 -d pgvector/pgvector:pg17

我们将使用 Atlas,一个与 Ent 集成的数据库 schema‑as‑code 工具,用来管理数据库模式。通过以下命令安装 Atlas:

curl -sSfL https://atlasgo.io/install.sh | sh

其他安装选项请参阅 Atlas 安装文档

由于我们需要管理扩展,请确保拥有 Atlas Pro 账户。你可以运行以下命令免费试用:

atlas login
在无迁移工具情况下工作

如果你想跳过 Atlas,可以使用 此文件 中的语句直接将所需模式应用到数据库

现在,创建基础配置 base.pg.hcl,为公共 schema 提供向量扩展:

base.pg.hcl
schema "public" {
}

extension "vector" {
schema = schema.public
}

接下来,创建 Atlas 配置文件,它将组合 base.pg.hcl 与 Ent 模式:

atlas.hcl
data "composite_schema" "schema" {
schema {
url = "file://base.pg.hcl"
}
schema "public" {
url = "ent://ent/schema"
}
}

env "local" {
url = getenv("DB_URL")
schema {
src = data.composite_schema.schema.url
}
dev = "docker://pgvector/pg17/dev"
}

此配置定义了一个包含 base.pg.hcl 与 Ent 模式的复合 schema,并为本地开发定义了名为 local 的环境,使用 dev 字段指定开发数据库 URL,供 Atlas 归一化 schema 并进行各种计算。

接下来,执行以下命令将 schema 应用到数据库:

export DB_URL='postgresql://postgres:pass@localhost:5432/postgres?sslmode=disable'
atlas schema apply --env local

Atlas 将根据我们的配置加载目标状态,与数据库当前状态进行比较,并生成迁移计划以使数据库达到期望状态:

Planning migration statements (5 in total):

-- create extension "vector":
-> CREATE EXTENSION "vector" WITH SCHEMA "public" VERSION "0.8.0";
-- create "chunks" table:
-> CREATE TABLE "public"."chunks" (
"id" bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY,
"path" character varying NOT NULL,
"nchunk" bigint NOT NULL,
"data" text NOT NULL,
PRIMARY KEY ("id")
);
-- create "embeddings" table:
-> CREATE TABLE "public"."embeddings" (
"id" bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY,
"embedding" public.vector(1536) NOT NULL,
"chunk_id" bigint NOT NULL,
PRIMARY KEY ("id"),
CONSTRAINT "embeddings_chunks_embedding" FOREIGN KEY ("chunk_id") REFERENCES "public"."chunks" ("id") ON UPDATE NO ACTION ON DELETE NO ACTION
);
-- create index "embedding_embedding" to table: "embeddings":
-> CREATE INDEX "embedding_embedding" ON "public"."embeddings" USING hnsw ("embedding" vector_l2_ops);
-- create index "embeddings_chunk_id_key" to table: "embeddings":
-> CREATE UNIQUE INDEX "embeddings_chunk_id_key" ON "public"."embeddings" ("chunk_id");

-------------------------------------------

Analyzing planned statements (5 in total):

-- non-optimal columns alignment:
-- L4: Table "chunks" has 8 redundant bytes of padding per row. To reduce disk space,
the optimal order of the columns is as follows: "id", "nchunk", "path",
"data" https://atlasgo.io/lint/analyzers#PG110
-- ok (370.25µs)

-------------------------
-- 114.306667ms
-- 5 schema changes
-- 1 diagnostic

-------------------------------------------

? Approve or abort the plan:
▸ Approve and apply
Abort

Atlas 在计划变更的同时也会提供诊断信息和优化建议。在这个例子中,它建议重新排序 chunks 表中的列以降低磁盘空间占用。由于本教程并不关注磁盘空间,我们可以直接选择 Approve and apply

最后,我们可以再次执行 atlas schema apply 来验证 schema 是否成功应用。Atlas 将输出:

Schema is synced, no changes to be made

Scaffold CLI

现在数据库 schema 已经就绪,为了让 CLI 应用更方便,使用 alecthomas/kong 库构建一个小型工具,能够加载、索引和查询数据库中的文档。

首先,安装 kong 库:

go get github.com/alecthomas/kong

接着,在 cmd/entrag/main.go 中定义 CLI 应用:

cmd/entrag/main.go
package main

import (
"fmt"
"os"

"github.com/alecthomas/kong"
)

// CLI 表示全局选项和子命令。
type CLI struct {
// DBURL 通过 环境变量 DB_URL 读取。
DBURL string `kong:"env='DB_URL',help='Database URL for the application.'"`
OpenAIKey string `kong:"env='OPENAI_KEY',help='OpenAI API key for the application.'"`

// 子命令
Load *LoadCmd `kong:"cmd,help='Load command that accepts a path.'"`
Index *IndexCmd `kong:"cmd,help='Create embeddings for any chunks that do not have one.'"`
Ask *AskCmd `kong:"cmd,help='Ask a question about the indexed documents'"`
}

func main() {
var cli CLI
app := kong.Parse(&cli,
kong.Name("entrag"),
kong.Description("Ask questions about markdown files."),
kong.UsageOnError(),
)
if err := app.Run(&cli); err != nil {
fmt.Fprintf(os.Stderr, "错误: %s\n", err)
os.Exit(1)
}
}

cmd/entrag/rag.go 中再创建一个文件并添加以下内容:

cmd/entrag/rag.go
package main

type (
// LoadCmd 用于将 Markdown 文件加载到数据库中。
LoadCmd struct {
Path string `help:"path to dir with markdown files" type:"existingdir" required:""`
}
// IndexCmd 用于在数据库中创建嵌入索引。
IndexCmd struct {
}
// AskCmd 是另一个叶子命令。
AskCmd struct {
// Text 是 ask 命令的位置参数。
Text string `kong:"arg,required,help='Text for the ask command.'"`
}
)

验证 scaffold 的 CLI 是否工作,在终端运行:

go run ./cmd/entrag --help

若一切正常,你应该能看到下面的帮助输出:

Usage: entrag <command> [flags]

Ask questions about markdown files.

Flags:
-h, --help Show context-sensitive help.
--dburl=STRING Database URL for the application ($DB_URL).
--open-ai-key=STRING OpenAI API key for the application ($OPENAI_KEY).

Commands:
load --path=STRING [flags]
Load command that accepts a path.

index [flags]
Create embeddings for any chunks that do not have one.

ask <text> [flags]
Ask a question about the indexed documents

Run "entrag <command> --help" for more information on a command.

将文档加载到数据库

接下来,需要一些 Markdown 文件以供加载。创建一个名为 data 的文件夹,并添加若干 Markdown 文件。示例中我们使用 ent/ent 仓库的 docs 目录作为 Markdown 源。

现在实现 LoadCmdRun 方法以将 Markdown 文件加载到数据库。打开 cmd/entrag/rag.go 并添加以下代码:

const (
tokenEncoding = "cl100k_base"
chunkSize = 1000
)

// Run 是当执行“load”命令时调用的方法。
func (cmd *LoadCmd) Run(ctx *CLI) error {
client, err := ctx.entClient()
if err != nil {
return fmt.Errorf("连接 PostgreSQL 失败: %w", err)
}
tokTotal := 0
return filepath.WalkDir(ctx.Load.Path, func(path string, d fs.DirEntry, err error) error {
if filepath.Ext(path) == ".mdx" || filepath.Ext(path) == ".md" {
chunks := breakToChunks(path)
for i, chunk := range chunks {
tokTotal += len(chunk)
client.Chunk.Create().
SetData(chunk).
SetPath(path).
SetNchunk(i).
SaveX(context.Background())
}
}
return nil
})
}

func (c *CLI) entClient() (*ent.Client, error) {
return ent.Open("postgres", c.DBURL)
}

此代码定义了 LoadCmdRun 方法。该方法读取指定路径下的 Markdown 文件,将其拆分为 1000 个 token 的块,并将每块保存到数据库。我们使用 entClient 方法创建一个新的 Ent 客户端,使用 CLI 选项中指定的数据库 URL。

breakToChunks 的实现见 entrag 仓库

随后,运行:

go run ./cmd/entrag load --path=data

命令完成后,数据库中应已加载块。可使用以下命令验证:

docker exec -it postgres psql -U postgres -d postgres -c "SELECT COUNT(*) FROM chunks;"

结果类似:

  count
-------
276
(1 row)

创建 Embedding

现在我们已将文档加载到数据库,需要为每个块创建 Embedding。我们将使用 OpenAI API 生成每个块的 Embedding。首先安装 openai 包:

go get github.com/sashabaranov/go-openai

若你没有 OpenAI API Key,可在 OpenAI 平台 注册并生成一个 API Key。随后将其保存到环境变量 OPENAI_KEY

export OPENAI_KEY=<your OpenAI API key>

然后实现 IndexCmdRun 方法,以为缺失 Embedding 的块生成 Embedding 并保存到数据库。打开 cmd/entrag/rag.go 并添加:

// Run 是当执行“index”命令时调用的方法。
func (cmd *IndexCmd) Run(cli *CLI) error {
client, err := cli.entClient()
if err != nil {
return fmt.Errorf("连接 PostgreSQL 失败: %w", err)
}
ctx := context.Background()
chunks := client.Chunk.Query().
Where(
chunk.Not(
chunk.HasEmbedding(),
),
).
Order(ent.Asc(chunk.FieldID)).
AllX(ctx)
for _, ch := range chunks {
log.Println("为段落创建嵌入", ch.Path, ch.Nchunk)
embedding := getEmbedding(ch.Data)
_, err := client.Embedding.Create().
SetEmbedding(pgvector.NewVector(embedding)).
SetChunk(ch).
Save(ctx)
if err != nil {
return fmt.Errorf("创建嵌入时出错: %v", err)
}
}
return nil
}

// getEmbedding 调用 OpenAI Embedding API 计算给定字符串的嵌入值并返回。
func getEmbedding(data string) []float32 {
client := openai.NewClient(os.Getenv("OPENAI_KEY"))
queryReq := openai.EmbeddingRequest{
Input: []string{data},
Model: openai.AdaEmbeddingV2,
}
queryResponse, err := client.CreateEmbeddings(context.Background(), queryReq)
if err != nil {
log.Fatalf("获取嵌入时出错: %v", err)
}
return queryResponse.Data[0].Embedding
}

此方法查询缺失 Embedding 的块,生成 Embedding 并保存。运行 index 命令即可:

go run ./cmd/entrag index

日志示例:

2025/02/13 13:04:42 为段落创建嵌入 /Users/home/entr/data/md/aggregate.md 0
2025/02/13 13:04:43 为段落创建嵌入 /Users/home/entr/data/md/ci.mdx 0

提问

现在我们已完成文档加载与 Embedding 的创建,下面实现 AskCmd,用于向已索引的文档提问。打开 cmd/entrag/rag.go 并添加:

// Run 是当执行“ask”命令时调用的方法。
func (cmd *AskCmd) Run(ctx *CLI) error {
client, err := ctx.entClient()
if err != nil {
return fmt.Errorf("连接 PostgreSQL 失败: %w", err)
}
question := cmd.Text
emb := getEmbedding(question)
embVec := pgvector.NewVector(emb)
embs := client.Embedding.
Query().
Order(func(s *sql.Selector) {
s.OrderExpr(sql.ExprP("embedding <-> $1", embVec))
}).
WithChunk().
Limit(5).
AllX(context.Background())
b := strings.Builder{}
for _, e := range embs {
chnk := e.Edges.Chunk
b.WriteString(fmt.Sprintf("来自文件: %v\n", chnk.Path))
b.WriteString(chnk.Data)
}
query := fmt.Sprintf(`请使用下列 ent 文档信息回答随后的问题。
信息:
%v

问题:%v`, b.String(), question)
oac := openai.NewClient(ctx.OpenAIKey)
resp, err := oac.CreateChatCompletion(
context.Background(),
openai.ChatCompletionRequest{
Model: openai.GPT4o,
Messages: []openai.ChatCompletionMessage{

{
Role: openai.ChatMessageRoleUser,
Content: query,
},
},
},
)
if err != nil {
return fmt.Errorf("创建聊天完成时出错: %v", err)
}
choice := resp.Choices[0]
out, err := glamour.Render(choice.Message.Content, "dark")
fmt.Print(out)
return nil
}

这段代码先将用户问题转换为向量,然后在数据库中查询相似 Embedding,并取前 5 条结果。随后将其文本与问题一起组成提示,调用 OpenAI GPT 生成答案,并使用 glamour 渲染输出。

在运行 ask 命令前,安装 glamour 包:

go get github.com/charmbracelet/glamour

接着执行:

go run ./cmd/entrag ask "tl;dr What is Ent?"

系统的响应类似:

  Ent 是一个面向 Go 语言的开源实体框架(ORM)。它允许开发者在 Go 代码中定义数据模型或图结构。Ent 强调以下原则:schema-as-code、通过代码生成的静态类型与显式 API、简单查询与图遍历、静态类型谓词以及无特定存储的抽象。它支持多种数据库,包括 MySQL、MariaDB、PostgreSQL、SQLite 以及 Gremlin 基础的图数据库,旨在提升 Go 开发的生产力。

进一步示例

go run ./cmd/entrag ask "how to define order field in entgql"
下面说明如何在 `entgql` 中定义排序字段。先给对应字段加上 `entgql.Annotation` 中的 `OrderField` 注解,字段名需为大写并与 GraphQL 中对应枚举值相匹配。步骤如下:
  1. 选择可比较字段:在你的 schema 中挑选你想要排序的字段,可是文本、时间戳、整数、枚举等。

  2. 为字段加注解:使用 entgql.OrderField 注解在所选字段上。注解中字段名应为大写且与 GraphQL 枚举匹配。

  3. 更新 Schema:例如在 ent/schema 中:

func (Todo) Fields() []ent.Field {
return []ent.Field{
field.Text("text").
NotEmpty().
Annotations(
entgql.OrderField("TEXT"),
),
field.Time("created_at").
Default(time.Now).
Immutable().
Annotations(
entgql.OrderField("CREATED_AT"),
),
field.Enum("status").
NamedValues(
"InProgress", "IN_PROGRESS",
"Completed", "COMPLETED",
).
Default("IN_PROGRESS").
Annotations(
entgql.OrderField("STATUS"),
),
field.Int("priority").
Default(0).
Annotations(
entgql.OrderField("PRIORITY"),
),
}
}
  1. 或者支持多字段排序:可使用 entgql.MultiOrder() 注解:
func (Todo) Annotations() []schema.Annotation {
return []schema.Annotation{
entgql.MultiOrder(),
}
}
  1. 生成 GraphQL 类型:确保 GraphQL 枚举与上述设置一致,例如:
enum OrderDirection {
ASC
DESC
}
enum TodoOrderField {
CREATED_AT
PRIORITY
STATUS
TEXT
}
input TodoOrder {
direction: OrderDirection!
field: TodoOrderField
}
  1. 在查询中加入 orderBy 参数,允许客户端按字段排序:
type Query {
todos(
after: Cursor
first: Int
before: Cursor
last: Int
orderBy: TodoOrder
): TodoConnection!
}

按照这些步骤,你的 Ent‑based 应用即可在 GraphQL API 中支持多字段排序。

go run ./cmd/entrag ask "what's the difference between privacy rules and interceptors"
隐私规则(Privacy Rules)与拦截器(Interceptors)在 Ent 框架中扮演不同角色:
  1. 隐私规则:
  • 目的:主要用于执行访问控制策略,决定查询与更新是否被允许。
  • 实现:在 ent.Policy 接口中实现各项方法(EvalQueryEvalMutation),通过若干条件判断后返回 Allow / Deny / Skip
  • 运作:在查询过程中评估是否满足特定条件,若满足则允许,若不满足则拒绝。
  • 适用场景:适合通过规则管理访问权限,确保用户仅在满足条件时才可执行操作。
  1. 拦截器:
  • 目的:将其用作“端口”或展示形态,向多种体验——匹配所有可查询/所有权限下的免费美???? (原文不完整,略)
    说明:如果你想在回调中使用第一种..??/

总结:隐私规则专注于访问控制,拦截器则环绕“此处”定位等工具

结束语

在本文中,我们演示了如何使用 Ent、Atlas 与 pgvector 构建一个 RAG 系统。特别感谢 Eli Bendersky 的原始博客以及多年优秀的 Go 之作。