跳到主要内容

使用 Ent Hooks 将更改同步至外部数据系统

· 阅读需 9 分钟

Ent 社区常见的问题之一是如何将 Ent 应用程序(例如 MySQL 或 PostgreSQL)背后的对象或引用与外部服务同步。例如,用户想在 Ent 创建或删除用户时,在 CRM 内部创建或删除记录,实体更新时发布消息到 Pub/Sub 系统,或者验证对象存储(如 AWS S3 或 Google Cloud Storage)中 blobs 的引用。

确保两个独立数据系统之间的一致性并不简单。当我们想要将例如一系统中记录的删除传播到另一系统时,没有明显的方法能保证两侧最终处于同步状态,因为其中一个可能失败,或它们之间的网络链接可能慢或中断。说了这些,尤其在微服务架构盛行之下,这些问题变得更常见,分布式系统研究者提出了多种解决模式,例如 Saga Pattern

这些模式的应用通常很复杂且困难,因此在许多情况下,架构师并不会追求“完美”的设计,而是追求更简单的方案,例如接受系统间的一定不一致或后台调和流程。

本文不讨论如何为 Ent 实现分布式事务或 Saga 模式,而是聚焦于研究如何在 Ent 变异发生前后钩入并执行自定义逻辑。

将变异传播到外部系统

在本例中,我们将创建一个简单的 User 模式,包含两个不变字符串字段,"name""avatar_url"。让我们运行 ent init 命令为我们的 User 创建一个骨架模式:

go run entgo.io/ent/cmd/ent new User

然后,添加 nameavatar_url 字段,并运行 go generate 生成资产。

ent/schema/user.go
type User struct {
ent.Schema
}

func (User) Fields() []ent.Field {
return []ent.Field{
field.String("name").
Immutable(),
field.String("avatar_url").
Immutable(),
}
}
go generate ./ent

问题

avatar_url 字段定义了指向我们对象存储桶中图片的 URL(例如 AWS S3)。为此讨论,我们想确保:

  • 当用户被创建时,URL 所指向的图片在我们的桶中存在。
  • 垃圾图片在用户被删除后被清除。也就是说,当系统中删除用户后,其头像图片也应被删除。

为操作 blobs,我们使用 gocloud.dev/blob 包。该包为读取、写入、删除与列举桶中 blobs 提供抽象。类似于 database/sql 包,它允许通过配置驱动 URL 以相同 API 与多种对象存储交互。例如:

// 打开一个内存桶。 
if bucket, err := blob.OpenBucket(ctx, "mem://photos/"); err != nil {
log.Fatal("打开内存桶失败:", err)
}

// 打开名为 photos 的 S3 桶。
if bucket, err := blob.OpenBucket(ctx, "s3://photos"); err != nil {
log.Fatal("打开 S3 桶失败:", err)
}

// 打开名为 photos 的 Google Cloud Storage 桶。
if bucket, err := blob.OpenBucket(ctx, "gs://my-bucket"); err != nil {
log.Fatal("打开 GCS 桶失败:", err)
}

模式钩子

Hooks 是 Ent 的强大功能,允许在变异操作前后添加自定义逻辑。

钩子可以通过 client.Use 动态定义(称为“运行时钩子”),也可以显式在模式上定义(称为“模式钩子”),如下所示:

// User 的 Hooks
func (User) Hooks() []ent.Hook {
return []ent.Hook{
EnsureImageExists(),
DeleteOrphans(),
}
}

如你所料,EnsureImageExists 钩子将负责确保当用户创建时其头像 URL 在桶中存在,DeleteOrphans 钩子将确保删除垃圾图片。让我们开始编写它们。

ent/schema/hooks.go
func EnsureImageExists() ent.Hook {
hk := func(next ent.Mutator) ent.Mutator {
return hook.UserFunc(func(ctx context.Context, m *ent.UserMutation) (ent.Value, error) {
avatarURL, exists := m.AvatarURL()
if !exists {
return nil, errors.New("avatar 字段缺失")
}
// TODO:
// 1. 验证 "avatarURL" 指向桶中实际存在的对象。
// 2. 否则,失败。
return next.Mutate(ctx, m)
})
}
// 将钩子仅限于 "Create" 操作。
return hook.On(hk, ent.OpCreate)
}

func DeleteOrphans() ent.Hook {
hk := func(next ent.Mutator) ent.Mutator {
return hook.UserFunc(func(ctx context.Context, m *ent.UserMutation) (ent.Value, error) {
id, exists := m.ID()
if !exists {
return nil, errors.New("id 字段缺失")
}
// TODO:
// 1. 获取被删除用户的 AvatarURL 字段。
// 2. 将删除操作级联到对象存储。
return next.Mutate(ctx, m)
})
}
// 将钩子仅限于 "DeleteOne" 操作。
return hook.On(hk, ent.OpDeleteOne)
}

现在,你可能会问:我们如何从变异钩子访问 blob 客户端?下一节你将了解到。

注入依赖

entc.Dependency 选项允许通过结构体字段扩展生成的构造器,并提供在客户端初始化时注入它们的选项。

要将 blob.Bucket 注入并可在钩子中使用,我们可以参照网站外部依赖教程,并将 gocloud.dev/blob.Bucket 作为依赖:

ent/entc.go
func main() {
opts := []entc.Option{
entc.Dependency(
entc.DependencyName("Bucket"),
entc.DependencyType(&blob.Bucket{}),
),
}
if err := entc.Generate("./schema", &gen.Config{}, opts...); err != nil {
log.Fatalf("运行 ent 代码生成时出错: %v", err)
}
}

接着重新运行代码生成:

go generate ./ent

我们现在可以从所有生成的构造器访问 Bucket API。让我们完成上述钩子的实现。

ent/schema/hooks.go
// EnsureImageExists 确保 avatar_url 指向桶中的实际对象。
func EnsureImageExists() ent.Hook {
hk := func(next ent.Mutator) ent.Mutator {
return hook.UserFunc(func(ctx context.Context, m *ent.UserMutation) (ent.Value, error) {
avatarURL, exists := m.AvatarURL()
if !exists {
return nil, errors.New("avatar 字段缺失")
}
switch exists, err := m.Bucket.Exists(ctx, avatarURL); {
case err != nil:
return nil, fmt.Errorf("检查键存在性: %w", err)
case !exists:
return nil, fmt.Errorf("键 %q 在桶中不存在", avatarURL)
default:
return next.Mutate(ctx, m)
}
})
}
return hook.On(hk, ent.OpCreate)
}

// DeleteOrphans 将用户删除级联到桶中。
func DeleteOrphans() ent.Hook {
hk := func(next ent.Mutator) ent.Mutator {
return hook.UserFunc(func(ctx context.Context, m *ent.UserMutation) (ent.Value, error) {
id, exists := m.ID()
if !exists {
return nil, errors.New("id 字段缺失")
}
u, err := m.Client().User.Get(ctx, id)
if err != nil {
return nil, fmt.Errorf("获取已删除用户时出错: %w", err)
}
if err := m.Bucket.Delete(ctx, u.AvatarURL); err != nil {
return nil, fmt.Errorf("从桶中删除用户头像时出错: %w", err)
}
return next.Mutate(ctx, m)
})
}
return hook.On(hk, ent.OpDeleteOne)
}

现在,是时候测试我们的钩子了!让我们编写一个可测试的示例来验证我们的 2 个钩子是否按预期工作。

package main

import (
"context"
"fmt"
"log"

"github.com/a8m/ent-sync-example/ent"
_ "github.com/a8m/ent-sync-example/ent/runtime"

"entgo.io/ent/dialect"
_ "github.com/mattn/go-sqlite3"
"gocloud.dev/blob"
_ "gocloud.dev/blob/memblob"
)

func Example_SyncCreate() {
ctx := context.Background()
// 打开一个内存桶。
bucket, err := blob.OpenBucket(ctx, "mem://photos/")
if err != nil {
log.Fatal("打开桶失败:", err)
}
client, err := ent.Open(
dialect.SQLite,
"file:ent?mode=memory&cache=shared&_fk=1",
// 在客户端初始化时注入 blob.Bucket
ent.Bucket(bucket),
)
if err != nil {
log.Fatal("打开 sqlite 连接失败:", err)
}
defer client.Close()
if err := client.Schema.Create(ctx); err != nil {
log.Fatal("创建 schema 资源失败:", err)
}
if err := client.User.Create().SetName("a8m").SetAvatarURL("a8m.png").Exec(ctx); err == nil {
log.Fatal("期望创建用户失败,因为图像不存在于桶中")
}
if err := bucket.WriteAll(ctx, "a8m.png", []byte{255, 255, 255}, nil); err != nil {
log.Fatalf("上传图像到桶中失败: %v", err)
}
fmt.Printf("%q\n", keys(ctx, bucket))

// 由于图像已上传至桶中,用户创建应成功。
u := client.User.Create().SetName("a8m").SetAvatarURL("a8m.png").SaveX(ctx)

// 删除用户时,应同时删除其图像。
client.User.DeleteOne(u).ExecX(ctx)
fmt.Printf("%q\n", keys(ctx, bucket))

// Output:
// ["a8m.png"]
// []
}

结束语

太棒了!我们已配置 Ent 扩展生成的代码并注入 blob.Bucket 为一个 外部依赖。接下来,我们定义了两个变异钩子,并使用 blob.Bucket API 来满足我们的业务约束。

此示例的完整代码可在 github.com/a8m/ent-sync-example 获取。

了解更多 Ent 新闻与更新: