数据迁移 (Data Migrations)
迁移通常用于更改数据库模式,但在某些情况下,需要修改存储在数据库中的数据。例如,添加种子数据,或使用自定义默认值回填空列。
这种类型的迁移称为数据迁移。本文将讨论如何使用 Ent 规划数据迁移并将其集成到常规的模式迁移工作流中。
迁移类型
Ent 目前支持两种类型的迁移:版本化迁移 和 声明式迁移(也称为自动迁移)。两种迁移类型均可执行数据迁移。
版本化迁移
使用版本化迁移时,数据迁移应存储在相同的 migrations 目录中,并以与常规迁移相同的方式执行。但建议将数据迁移和模式迁移分别存储在不同的文件中,以便于测试。
此类迁移使用 SQL 格式,因为即使 Ent 模式被修改且生成的代码不再与数据迁移文件兼容,该文件也可以安全执行(并原样存储)。
有两种创建数据迁移脚本的方法:手动创建和生成创建。通过手动编辑,用户可以编写所有 SQL 语句,并能精确控制执行内容。或者,用户可以使用 Ent 生成数据迁移。建议验证生成的文件是否正确生成,因为在某些情况下可能需要手动修复或编辑。
手动创建
1. 如果尚未安装 Atlas,请查阅其 入门指南。
2. 使用 Atlas 创建新的迁移文件:
atlas migrate new <migration_name> \
--dir "file://my/project/migrations"
3. 编辑迁移文件并添加自定义数据迁移。例如:
-- 使用默认值为 NULL 或 null 的 tags 回填数据。
UPDATE `users` SET `tags` = '["foo","bar"]' WHERE `tags` IS NULL OR JSON_CONTAINS(`tags`, 'null', '$');
4. 更新迁移目录的 完整性文件:
atlas migrate hash \
--dir "file://my/project/migrations"
如果不确定如何测试数据迁移文件,请查看下面的 测试 部分。
生成脚本
目前,Ent 提供了对生成数据迁移文件的初步支持。使用此选项,在大多数情况下可以简化手动编写复杂 SQL 语句的过程。但仍建议验证生成的文件是否正确生成,因为在某些边缘情况下可能需要手动编辑。
1. 创建您的 版本化迁移设置(如果尚未设置)。
2. 创建您的第一个数据迁移函数。下面将提供一些示例来演示如何编写此类函数:
- 单语句 (Single Statement)
- 多语句 (Multi Statement)
- 数据播种 (Data Seeding)
package migratedata
// BackfillUnknown 使用默认值 'Unknown' 回填所有空用户名。
func BackfillUnknown(dir *migrate.LocalDir) error {
w := &schema.DirWriter{Dir: dir}
client := ent.NewClient(ent.Driver(schema.NewWriteDriver(dialect.MySQL, w)))
// 将所有空名称更改为 'unknown'。
err := client.User.
Update().
Where(
user.NameEQ(""),
).
SetName("Unknown").
Exec(context.Background())
if err != nil {
return fmt.Errorf("failed generating statement: %w", err)
}
// 将内容写入迁移目录。
return w.FlushChange(
"unknown_names",
"使用默认值 'unknown' 回填所有空用户名。",
)
}
然后,在 ent/migrate/main.go 中使用此函数将生成以下迁移文件:
-- 使用默认值 'unknown' 回填所有空用户名。
UPDATE `users` SET `name` = 'Unknown' WHERE `users`.`name` = '';
package migratedata
// BackfillUserTags 用于生成迁移文件 '20221126185750_backfill_user_tags.sql'。
func BackfillUserTags(dir *migrate.LocalDir) error {
w := &schema.DirWriter{Dir: dir}
client := ent.NewClient(ent.Driver(schema.NewWriteDriver(dialect.MySQL, w)))
// 为没有任何标签的用户添加默认标签 "foo" 和 "bar"。
err := client.User.
Update().
Where(func(s *sql.Selector) {
s.Where(
sql.Or(
sql.IsNull(user.FieldTags),
sqljson.ValueIsNull(user.FieldTags),
),
)
}).
SetTags([]string{"foo", "bar"}).
Exec(context.Background())
if err != nil {
return fmt.Errorf("failed generating backfill statement: %w", err)
}
// 使用自定义注释记录到目前为止的所有更改。
w.Change("使用默认值为 NULL 或 null 的 tags 回填数据。")
// 为具有特定前缀或后缀的用户追加 "org" 特殊标签。
err = client.User.
Update().
Where(
user.Or(
user.NameHasPrefix("org-"),
user.NameHasSuffix("-org"),
),
// 仅追加给没有此标签的用户。
func(s *sql.Selector) {
s.Where(
sql.Not(sqljson.ValueContains(user.FieldTags, "org")),
)
},
).
AppendTags([]string{"org"}).
Exec(context.Background())
if err != nil {
return fmt.Errorf("failed generating backfill statement: %w", err)
}
// 使用自定义注释记录到目前为止的所有更改。
w.Change("为组织帐户追加 'org' 标签(如果尚未拥有)。")
// 将内容写入迁移目录。
return w.Flush("backfill_user_tags")
}
然后,在 ent/migrate/main.go 中使用此函数将生成以下迁移文件:
-- 使用默认值为 NULL 或 null 的 tags 回填数据。
UPDATE `users` SET `tags` = '["foo","bar"]' WHERE `tags` IS NULL OR JSON_CONTAINS(`tags`, 'null', '$');
-- 为组织帐户追加 'org' 标签(如果尚未拥有)。
UPDATE `users` SET `tags` = CASE WHEN (JSON_TYPE(JSON_EXTRACT(`tags`, '$')) IS NULL OR JSON_TYPE(JSON_EXTRACT(`tags`, '$')) = 'NULL') THEN JSON_ARRAY('org') ELSE JSON_ARRAY_APPEND(`tags`, '$', 'org') END WHERE (`users`.`name` LIKE 'org-%' OR `users`.`name` LIKE '%-org') AND (NOT (JSON_CONTAINS(`tags`, '"org"', '$') = 1));
package migratedata
// SeedUsers 将初始用户添加到数据库。
func SeedUsers(dir *migrate.LocalDir) error {
w := &schema.DirWriter{Dir: dir}
client := ent.NewClient(ent.Driver(schema.NewWriteDriver(dialect.MySQL, w)))
// 生成 INSERT 语句。
err := client.User.CreateBulk(
client.User.Create().SetName("a8m").SetAge(1).SetTags([]string{"foo"}),
client.User.Create().SetName("nati").SetAge(1).SetTags([]string{"bar"}),
).Exec(context.Background())
if err != nil {
return fmt.Errorf("failed generating statement: %w", err)
}
// 将内容写入迁移目录。
return w.FlushChange(
"seed_users",
"将初始用户添加到数据库。",
)
}
然后,在 ent/migrate/main.go 中使用此函数将生成以下迁移文件:
-- 将初始用户添加到数据库。
INSERT INTO `users` (`age`, `name`, `tags`) VALUES (1, 'a8m', '["foo"]'), (1, 'nati', '["bar"]');
3. 如果编辑了生成的文件,需要使用以下命令更新迁移目录的 完整性文件:
atlas migrate hash \
--dir "file://my/project/migrations"
测试
添加迁移文件后,强烈建议在本地数据库上应用它们,以确保其有效且能达到预期结果。此过程可以手动完成,也可以通过程序自动化。
1. 执行所有迁移文件直至最后一个创建的数据迁移文件:
# 文件总数。
number_of_files=$(ls ent/migrate/migrations/*.sql | wc -l)
# 执行除最新文件外的所有文件。
atlas migrate apply $[number_of_files-1] \
--dir "file://my/project/migrations" \
-u "mysql://root:pass@localhost:3306/test"
2. 确保最后一个迁移文件待执行:
atlas migrate status \
--dir "file://my/project/migrations" \
-u "mysql://root:pass@localhost:3306/test"
迁移状态:待处理 (Migration Status: PENDING)
-- 当前版本:<VERSION_N-1> (Current Version: <VERSION_N-1>)
-- 下一个版本:<VERSION_N> (Next Version: <VERSION_N>)
-- 已执行文件:<N-1> (Executed Files: <N-1>)
-- 待处理文件:1 (Pending Files: 1)
3. 在运行数据迁移文件之前,用代表生产数据库的临时数据填充本地数据库。
4. 运行 atlas migrate apply 并确保其成功执行。
atlas migrate apply \
--dir "file://my/project/migrations" \
-u "mysql://root:pass@localhost:3306/test"
注意,使用 atlas schema clean 可以清理用于本地开发的数据库,并重复此过程,直到数据迁移文件达到预期结果。
自动迁移
在声明式工作流中,数据迁移使用 Diff 或 Apply 钩子 实现。这是因为,与版本化选项不同,此类型的迁移在应用时不持有名称或版本。因此,当使用钩子写入数据时,必须在执行 schema.Change 之前检查其类型,以确保数据迁移不会多次应用。
func FillNullValues(dbdialect string) schema.ApplyHook {
return func(next schema.Applier) schema.Applier {
return schema.ApplyFunc(func(ctx context.Context, conn dialect.ExecQuerier, plan *migrate.Plan) error {
// 搜索触发数据迁移的 schema.Change。
hasC := func() bool {
for _, c := range plan.Changes {
m, ok := c.Source.(*schema.ModifyTable)
if ok && m.T.Name == user.Table && schema.Changes(m.Changes).IndexModifyColumn(user.FieldName) != -1 {
return true
}
}
return false
}()
// 找到更改,应用数据迁移。
if hasC {
// 在此阶段,有三种方法可以将 NULL 值 UPDATE 为 "Unknown"。
// 向 migrate.Plan 追加自定义 migrate.Change,在 dialect.ExecQuerier 上直接执行 SQL 语句,
// 或使用生成的 ent.Client。
// 从迁移连接创建临时客户端。
client := ent.NewClient(
ent.Driver(sql.NewDriver(dbdialect, sql.Conn{ExecQuerier: conn.(*sql.Tx)})),
)
if err := client.User.
Update().
SetName("Unknown").
Where(user.NameIsNil()).
Exec(ctx); err != nil {
return err
}
}
return next.Apply(ctx, conn, plan)
})
}
}
更多示例,请查看 Apply Hook 示例部分。