跳到主要内容

使用 Ent 的数据库锁定技术

· 阅读需 12 分钟

锁定是任何并发计算程序的基本构造块。当许多操作同时发生时,程序员会使用锁来保证对资源的互斥访问。锁(以及其他互斥原语)存在于堆栈的许多不同层级,从低级的 CPU 指令到应用层的 API(例如 Go 里的 sync.Mutex)。

在使用关系型数据库时,应用开发者常常需要能够在记录上获取锁。想象一下一个名为 inventory 的表,列出了在电商网站上可供出售的商品。该表可能有一列 state,可以设置为 availablepurchased。若想避免两个用户同时认为已经成功购买同一件库存商品,应用必须阻止两个操作将记录从 available 变为 purchased

如何让应用保证这一点?仅让服务器检查所需项目是否为 available 后再将其设置为 purchased 并不够。想象这样一个情景:两个用户同时尝试购买同一件商品。来自它们浏览器的两个请求几乎同时到达应用服务器。两者都会查询数据库获取该项目的状态,并看到它是 available。于是两者的请求处理器都会发出 UPDATE 查询,将状态设置为 purchased 并将 buyer_id 设为请求用户的 id。两个查询都会成功,但记录的最终状态将是最后发出的 UPDATE 查询的用户被视为该商品的买家。

多年来,出现了不同的技术,让开发者能够编写对用户提供这些保证的应用程序。其中一些涉及数据库提供的显式锁定机制,而另一些则利用数据库更通用的 ACID 属性来实现互斥。在本文中,我们将使用 Ent 探索这两种技术的实现。

乐观锁定

乐观锁定(有时也称为乐观并发控制)是一种可用于实现锁定行为的技术,而无需显式为任何记录获取锁。

从宏观上看,乐观锁定的工作方式如下:

  • 每条记录被分配一个数值版本号。该值必须单调递增。通常使用最新行更新的 Unix 时间戳。
  • 事务读取记录时,记下其版本号。
  • 发出 UPDATE 语句以修改记录:
    • 语句必须包含一个谓词,要求版本号自上一次读到后未发生变化。例如:WHERE id=<id> AND version=<previous version>
    • 语句必须将版本号递增。部分应用将当前值加 1,部分应用将其设为当前时间戳。
  • 数据库返回 UPDATE 语句修改的行数。如果行数为 0,说明有人在我们读取后、想更新前已经修改了记录。事务被视为失败,回滚并可重试。

乐观锁定通常用于“低争用”环境(两事务相互冲突的可能性相对低)且锁定逻辑可以信任在应用层完成的情况。如果数据库中有写入者无法保证遵守所需逻辑,则此技术就失效。

让我们看看如何使用 Ent 应用此技术。

首先为 User 定义我们的 ent.Schema。用户有一个 online 布尔字段表示其是否在线,以及一个 int64 字段存储当前版本号。

// User holds the schema definition for the User entity.
type User struct {
ent.Schema
}

// Fields of the User.
func (User) Fields() []ent.Field {
return []ent.Field{
field.Bool("online"),
field.Int64("version").
DefaultFunc(func() int64 {
return time.Now().UnixNano()
}).
Comment("Unix time of when the latest update occurred")
}
}

接下来实现一个简单的乐观锁定更新 online 字段:

func optimisticUpdate(tx *ent.Tx, prev *ent.User, online bool) error {
// The next version number for the record must monotonically increase
// using the current timestamp is a common technique to achieve this.
nextVer := time.Now().UnixNano()

// We begin the update operation:
n := tx.User.Update().

// We limit our update to only work on the correct record and version:
Where(user.ID(prev.ID), user.Version(prev.Version)).

// We set the next version:
SetVersion(nextVer).

// We set the value we were passed by the user:
SetOnline(online).
SaveX(context.Background())

// SaveX returns the number of affected records. If this value is
// different from 1 the record must have been changed by another
// process.
if n != 1 {
return fmt.Errorf("update failed: user id=%d updated by another process", prev.ID)
}
return nil
}

然后编写测试,验证如果两个进程尝试编辑同一条记录,只有一个会成功:

func TestOCC(t *testing.T) {
client := enttest.Open(t, "sqlite3", "file:ent?mode=memory&cache=shared&_fk=1")
ctx := context.Background()

// Create the user for the first time.
orig := client.User.Create().SetOnline(true).SaveX(ctx)

// Read another copy of the same user.
userCopy := client.User.GetX(ctx, orig.ID)

// Open a new transaction:
tx, err := client.Tx(ctx)
if err != nil {
log.Fatalf("failed creating transaction: %v", err)
}

// Try to update the record once. This should succeed.
if err := optimisticUpdate(tx, userCopy, false); err != nil {
tx.Rollback()
log.Fatal("unexpected failure:", err)
}

// Try to update the record a second time. This should fail.
err = optimisticUpdate(tx, orig, false)
if err == nil {
log.Fatal("expected second update to fail")
}
fmt.Println(err)
}

运行测试:

=== RUN   TestOCC
update failed: user id=1 updated by another process
--- PASS: Test (0.00s)

太好了!使用乐观锁定我们可以阻止两个进程互相踩踏!

悲观锁定

如前所述,乐观锁定并不总是适用。对于我们更愿意将维护锁完整性职责委托给数据库的用例,一些数据库引擎(如 MySQL、Postgres、MariaDB,但不包括 SQLite)提供了悲观锁定能力。这些数据库支持一个对 SELECT 语句的修饰符,叫做 SELECT ... FOR UPDATE。MySQL 文档 解释

A SELECT ... FOR UPDATE reads the latest available data, setting exclusive locks on each row it reads. Thus, it sets the same locks a searched SQL UPDATE would set on the rows.

或者,文档中解释了 SELECT ... FOR SHARE 语句:

Sets a shared mode lock on any rows that are read. Other sessions can read the rows, but cannot modify them until your transaction commits. If any of these rows were changed by another transaction that has not yet committed, your query waits until that transaction ends and then uses the latest values.

Ent 最近添加了对 FOR SHARE/FOR UPDATE 语句的支持,使用名为 sql/lock 的功能标志。要使用它,请修改 generate.go 文件,加入 --feature sql/lock

//go:generate go run -mod=mod entgo.io/ent/cmd/ent generate --feature sql/lock ./schema 

接下来实现一个使用悲观锁定以确保仅有单个进程可以更新我们的 User 对象的 online 字段的函数:

func pessimisticUpdate(tx *ent.Tx, id int, online bool) (*ent.User, error) {
ctx := context.Background()

// On our active transaction, we begin a query against the user table
u, err := tx.User.Query().

// We add a predicate limiting the lock to the user we want to update.
Where(user.ID(id)).

// We use the ForUpdate method to tell ent to ask our DB to lock
// the returned records for update.
ForUpdate(
// We specify that the query should not wait for the lock to be
// released and instead fail immediately if the record is locked.
sql.WithLockAction(sql.NoWait),
).
Only(ctx)

// If we failed to acquire the lock we do not proceed to update the record.
if err != nil {
return nil, err
}

// Finally, we set the online field to the desired value.
return u.Update().SetOnline(online).Save(ctx)
}

现在编写测试,验证如果两个进程尝试编辑同一条记录,只有一个会成功:

func TestPessimistic(t *testing.T) {
ctx := context.Background()
client := enttest.Open(t, dialect.MySQL, "root:pass@tcp(localhost:3306)/test?parseTime=True")

// Create the user for the first time.
orig := client.User.Create().SetOnline(true).SaveX(ctx)

// Open a new transaction. This transaction will acquire the lock on our user record.
tx, err := client.Tx(ctx)
if err != nil {
log.Fatalf("failed creating transaction: %v", err)
}
defer tx.Commit()

// Open a second transaction. This transaction is expected to fail at
// acquiring the lock on our user record.
tx2, err := client.Tx(ctx)
if err != nil {
log.Fatalf("failed creating transaction: %v", err)
}
defer tx.Commit()

// The first update is expected to succeed.
if _, err := pessimisticUpdate(tx, orig.ID, true); err != nil {
log.Fatalf("unexpected error: %s", err)
}

// Because we did not run tx.Commit yet, the row is still locked when
// we try to update it a second time. This operation is expected to
// fail.
_, err = pessimisticUpdate(tx2, orig.ID, true)
if err == nil {
log.Fatal("expected second update to fail")
}
fmt.Println(err)
}

示例中值得一提的几点:

  • 请注意我们使用真实的 MySQL 实例来运行此测试,因为 SQLite 不支持 SELECT .. FOR UPDATE
  • 为了示例的简化,我们使用了 sql.NoWait 选项,告诉数据库在无法获取锁时返回错误。这意味着调用应用在收到错误后需要重试写操作。若不指定此选项,我们可以创建应用在获取锁失败时阻塞直至锁释放并继续操作的流程。这不一定总是理想的,但会开启一些有趣的设计选项。
  • 我们必须始终提交事务。忘记提交可能导致严重问题。请记住,在锁保持期间,没人能读取或更新该记录。

运行测试:

=== RUN   TestPessimistic
Error 3572: Statement aborted because lock(s) could not be acquired immediately and NOWAIT is set.
--- PASS: TestPessimistic (0.08s)

太好了!我们利用 MySQL 的 “锁定读取” 能力以及 Ent 对其新支持,实现了真正的互斥保证。

结论

我们从介绍导致应用开发者在使用数据库时寻求锁定技术的业务需求开始。随后介绍了两种在更新数据库记录时实现互斥的方法,并演示了如何在 Ent 中使用这些技术。

有问题吗?需要帮助开始?欢迎加入我们的 Discord 服务器Slack 频道

关于更多 Ent 新闻与更新: