Django 中的 select_for_update(skip_locked)

2021 年 12 月,Django 在新发布的 4.0 版本中提供了对 MariaDB 数据库后端的 QuerySet.select_for_update()skip_locked 参数的支持。这个参数在 MariaDB 10.6 中被引入,而八个月后,Django 的下一个主要版本便宣布了这个参数提供的支持。

这个参数并不是 MariaDB 首创的。早在 2017 年的 MySQL 8.0.1 版本中,MySQL 便将两个 SELECT FOR UPDATE 的主要特性 SKIP LOCKED 以及 NOWAIT 引入数据库的实现中。为此,当时还任职于 Oracle 的工程师 Martin Hansson 还撰写了一篇博客文章来解释这两个新加入的特性。这篇博客文章很详细,这里不再赘述。值得注意的是,在文章的最后一段,Martin 特意强调了 SKIP LOCKED 描述符的一个用途:

SKIP LOCKED is very handy in the case of multi-threaded workers trying to find the next N rows in a table that need processing.

它特别适合于多进程(线程)Worker 的实现。传统的实现方案中,为了避免运作于同一个数据表的多个 Worker 的数据竞争,经常会通过表中加入名为 picked 的字段或者使用(某种意义上的)全局分布式锁来解决该问题。有了 SKIP LOCKED 描述符之后,我们能更方便地完成此类需求。

我们以 Python Django 框架代码为例,展示结合 QuerySet.select_for_update()skip_locked 参数是如何使用的。

不支持 SQLite

(应该很多人都已经知道了,)SQLite 不支持 QuerySet.select_for_update() 语句。在一个以 SQLite 为后端的数据库实现中执行 QuerySet.select_for_update() 将不会有任何效果:

Student.objects.create(
    id=1,
    student_name="Alice"
)
Student.objects.create(
    id=2,
    student_name="Bob"
)

然后,通过两个 Shell 执行下列语句:

from django.db import transaction

with transaction.atomic():
    students = Student.objects.select_for_update().all()
    print([(student.id, student.student_name) for student in students])
    input() # 将事务挂起

可以发现,两个 Shell 的输出都是 [(1, 'Alice'), (2, 'Bob')]。在 SQLite 中的 QuerySet.select_for_update() 没有任何效果。

MariaDB 的情况

我们将数据库后端更换为 MariaDB 进行测试。在本例中我们使用了 MariaDB 10.6 版本,这是 Ubuntu 22.04 LTS 系统的默认 MariaDB 版本。同样执行上述代码后,第一个 Shell 的输出结果为:

[(1, 'Alice'), (2, 'Bob')]

而第二个 Shell 一开始不会有任何结果,而等待 50 秒后(因为 MariaDB 的 table_lock_wait_timeout默认值是 50),则会输出:

OperationalError: (1205, 'Lock wait timeout exceeded; try restarting transaction')

当我们在第一个 Shell 中将选取逻辑改为只选取第一个学生(Student.objects.select_for_update.get(id=1)),而第二个 Shell 中希望 Students.objects.select_for_update().all() 时,情况也是如此。此时,数据库锁的情况是这样的(通过 show engine innodb status):

------------
TRANSACTIONS
------------
Trx id counter 352
Purge done for trx's n:o < 350 undo n:o < 0 state: running but idle
History list length 0
LIST OF TRANSACTIONS FOR EACH SESSION:
---TRANSACTION 351, ACTIVE 134 sec
2 lock struct(s), heap size 1128, 1 row lock(s)
MariaDB thread id 37, OS thread handle 139993933403712, query id 393 localhost root

只有部分的行被选中,应用于 QuerySet.all() 上的 QuerySet.select_for_update() 描述的调用仍然会阻塞,这是可预期的行为。

现在,我们将 skip_locked 参数应用于 QuerySet.select_for_update() 上再来试试看。我们更改第二个 Shell 中执行的语句,将其改为:

# ...
students = Student.objects.select_for_update(skip_locked=True).all()
# ...

执行后,没有任何阻塞,我们得到输出:

[(2, 'Bob')]

此时数据库锁的情况是:

------------
TRANSACTIONS
------------
Trx id counter 353
Purge done for trx's n:o < 350 undo n:o < 0 state: running but idle
History list length 0
LIST OF TRANSACTIONS FOR EACH SESSION:
---TRANSACTION 352, ACTIVE 28 sec
2 lock struct(s), heap size 1128, 2 row lock(s)
MariaDB thread id 39, OS thread handle 139993933096512, query id 403 localhost root 
---TRANSACTION 351, ACTIVE 365 sec
2 lock struct(s), heap size 1128, 1 row lock(s)
MariaDB thread id 37, OS thread handle 139993933403712, query id 393 localhost root

在日志中,TRANSACTION 352 是本次请求新增的数据库锁。

一个更加复杂的例子

我们目前已经向学生数据表中添加另外几名学生的数据信息。现在,Students 表里已经共有 10 名学生的数据。我们的两个 Shell 会话中分别在事务里进行了如下的选取:

Student.objects.select_for_update().filter(id__in=[1,3,4,7])

得到输出 [(1, 'Alice'), (3, 'Charlie'), (4, 'Delta'), (7, 'Golf')]

另一个 Shell 中:

Student.objects.select_for_update(skip_locked=True).all()[:5]

得到输出 [(5, 'Echo'), (6, 'Foxtrot'), (8, 'hotel'), (9, 'India'), (10, 'Jack')]

可以看到,skip_locked 甚至可以结合 QuerySet 的数组分片操作使用。这样一来,在多个 Worker 同时基于某种条件对数据库记录进行操作的过程中,就不再需要依赖额外字段或者外部锁来对这些 Worker 之间的潜在数据竞争加以限制了。