2021 年 12 月,Django 在新发布的 4.0 版本中提供了对 MariaDB 数据库后端的 QuerySet.select_for_update()
中 skip_locked
参数的支持。这个参数在 MariaDB 10.6 中被引入,而八个月后,Django 的下一个主要版本便宣布了这个参数提供的支持。
这个参数并不是 MariaDB 首创的。早在 2017 年的 MySQL 8.0.1 版本中,MySQL 便将两个 SELECT FOR UPDATE
的主要特性 SKIP LOCKED
以及 NOWAIT
引入数据库的实现中。为此,当时还任职于 Oracle 的工程师 Martin Hansson 还撰写了一篇博客文章来解释这两个新加入的特性。这篇博客文章很详细,这里不再赘述。值得注意的是,在文章的最后一段,Martin 特意强调了 SKIP LOCKED
描述符的一个用途:
SKIP LOCKED
is very handy in the case of multi-threaded workers trying to find the next N rows in a table that need processing.
它特别适合于多进程(线程)Worker 的实现。传统的实现方案中,为了避免运作于同一个数据表的多个 Worker 的数据竞争,经常会通过表中加入名为 picked
的字段或者使用(某种意义上的)全局分布式锁来解决该问题。有了 SKIP LOCKED
描述符之后,我们能更方便地完成此类需求。
我们以 Python Django 框架代码为例,展示结合 QuerySet.select_for_update()
中 skip_locked
参数是如何使用的。
不支持 SQLite
(应该很多人都已经知道了,)SQLite 不支持 QuerySet.select_for_update()
语句。在一个以 SQLite 为后端的数据库实现中执行 QuerySet.select_for_update()
将不会有任何效果:
Student.objects.create(
id=1,
student_name="Alice"
)
Student.objects.create(
id=2,
student_name="Bob"
)
然后,通过两个 Shell 执行下列语句:
from django.db import transaction
with transaction.atomic():
students = Student.objects.select_for_update().all()
print([(student.id, student.student_name) for student in students])
input() # 将事务挂起
可以发现,两个 Shell 的输出都是 [(1, 'Alice'), (2, 'Bob')]
。在 SQLite 中的
没有任何效果。QuerySet.select_for_update()
MariaDB 的情况
我们将数据库后端更换为 MariaDB 进行测试。在本例中我们使用了 MariaDB 10.6 版本,这是 Ubuntu 22.04 LTS 系统的默认 MariaDB 版本。同样执行上述代码后,第一个 Shell 的输出结果为:
[(1, 'Alice'), (2, 'Bob')]
而第二个 Shell 一开始不会有任何结果,而等待 50 秒后(因为 MariaDB 的 table_lock_wait_timeout
的默认值是 50),则会输出:
OperationalError: (1205, 'Lock wait timeout exceeded; try restarting transaction')
当我们在第一个 Shell 中将选取逻辑改为只选取第一个学生(Student.objects.select_for_update.get(id=1)
),而第二个 Shell 中希望
时,情况也是如此。此时,数据库锁的情况是这样的(通过 Students.objects
.select_for_update().all()show engine innodb status
):
------------
TRANSACTIONS
------------
Trx id counter 352
Purge done for trx's n:o < 350 undo n:o < 0 state: running but idle
History list length 0
LIST OF TRANSACTIONS FOR EACH SESSION:
---TRANSACTION 351, ACTIVE 134 sec
2 lock struct(s), heap size 1128, 1 row lock(s)
MariaDB thread id 37, OS thread handle 139993933403712, query id 393 localhost root
只有部分的行被选中,应用于 QuerySet.all()
上的 QuerySet.select_for_update()
描述的调用仍然会阻塞,这是可预期的行为。
现在,我们将 skip_locked
参数应用于 QuerySet.select_for_update()
上再来试试看。我们更改第二个 Shell 中执行的语句,将其改为:
# ...
students = Student.objects.select_for_update(skip_locked=True).all()
# ...
执行后,没有任何阻塞,我们得到输出:
[(2, 'Bob')]
此时数据库锁的情况是:
------------
TRANSACTIONS
------------
Trx id counter 353
Purge done for trx's n:o < 350 undo n:o < 0 state: running but idle
History list length 0
LIST OF TRANSACTIONS FOR EACH SESSION:
---TRANSACTION 352, ACTIVE 28 sec
2 lock struct(s), heap size 1128, 2 row lock(s)
MariaDB thread id 39, OS thread handle 139993933096512, query id 403 localhost root
---TRANSACTION 351, ACTIVE 365 sec
2 lock struct(s), heap size 1128, 1 row lock(s)
MariaDB thread id 37, OS thread handle 139993933403712, query id 393 localhost root
在日志中,TRANSACTION 352
是本次请求新增的数据库锁。
一个更加复杂的例子
我们目前已经向学生数据表中添加另外几名学生的数据信息。现在,Students 表里已经共有 10 名学生的数据。我们的两个 Shell 会话中分别在事务里进行了如下的选取:
Student.objects.select_for_update().filter(id__in=[1,3,4,7])
得到输出 [(1, 'Alice'), (3, 'Charlie'), (4, 'Delta'), (7, 'Golf')]
另一个 Shell 中:
Student.objects.select_for_update(skip_locked=True).all()[:5]
得到输出 [(5, 'Echo'), (6, 'Foxtrot'), (8, 'hotel'), (9, 'India'), (10, 'Jack')]
可以看到,skip_locked
甚至可以结合 QuerySet
的数组分片操作使用。这样一来,在多个 Worker 同时基于某种条件对数据库记录进行操作的过程中,就不再需要依赖额外字段或者外部锁来对这些 Worker 之间的潜在数据竞争加以限制了。