Skip to content

Commit d72fe46

Browse files
committed
Retry once after getting a deadlock when attempting to decrement a semaphore
This tries to address a tricky deadlock we've seen about once every couple of days, where three jobs that compete for the semaphore are enqueued at the same time. One of them wins at creating the semaphore, and the other two transactions acquire a shared lock over the just created semaphore row, by key. Then, they try to upgrade that lock to an exclusive lock to perform an UPDATE (attempting to decrement the semaphore), leading to a deadlock because each one of them is waiting for the other one to release the shared lock. From `SHOW ENGINE INNODB STATUS`: ``` ------------------------ LATEST DETECTED DEADLOCK ------------------------ 2023-12-27 07:57:28 140410341029440 *** (1) TRANSACTION: TRANSACTION 1972990032, ACTIVE 1 sec starting index read mysql tables in use 1, locked 1 LOCK WAIT 4 lock struct(s), heap size 1128, 2 row lock(s), undo log entries 1 MySQL thread id 3012240, OS thread handle 140409154041408, query id 7398762432 bigip-vip-new.rw-ash-int.37signals.com 10.20.0.24 haystack_app updating UPDATE `solid_queue_semaphores` SET value = value - 1, expires_at = '2023-12-27 08:12:28.002702' WHERE (value > 0) AND `solid_queue_semaphores`.`key` = 'RR::ProcessJob/C/64961261' *** (1) HOLDS THE LOCK(S): RECORD LOCKS space id 14 page no 426 n bits 304 index index_solid_queue_semaphores_on_key of table `haystack_solidqueue_production`.`solid_queue_semaphores` trx id 1972990032 lock mode S Record lock, heap no 199 PHYSICAL RECORD: n_fields 2; compact format; info bits 0 0: len 30; hex 526563656970743a3a526563697069656e743a3a50726f63657373696e67; asc RR::Process; (total 50 bytes); 1: len 8; hex 80000000004224c4; asc B$ ;; *** (1) WAITING FOR THIS LOCK TO BE GRANTED: RECORD LOCKS space id 14 page no 426 n bits 304 index index_solid_queue_semaphores_on_key of table `haystack_solidqueue_production`.`solid_queue_semaphores` trx id 1972990032 lock_mode X locks rec but not gap waiting Record lock, heap no 199 PHYSICAL RECORD: n_fields 2; compact format; info bits 0 0: len 30; hex 526563656970743a3a526563697069656e743a3a50726f63657373696e67; asc RR::Process; (total 50 bytes); 1: len 8; hex 80000000004224c4; asc B$ ;; *** (2) TRANSACTION: TRANSACTION 1972990013, ACTIVE 1 sec starting index read mysql tables in use 1, locked 1 LOCK WAIT 4 lock struct(s), heap size 1128, 2 row lock(s), undo log entries 1 MySQL thread id 3012575, OS thread handle 140275687212608, query id 7398762530 bigip-vip.sc-chi-int.37signals.com 10.10.0.37 haystack_app updating UPDATE `solid_queue_semaphores` SET value = value - 1, expires_at = '2023-12-27 08:12:28.007153' WHERE (value > 0) AND `solid_queue_semaphores`.`key` = 'RR::ProcessJob/C/64961261' *** (2) HOLDS THE LOCK(S): RECORD LOCKS space id 14 page no 426 n bits 304 index index_solid_queue_semaphores_on_key of table `haystack_solidqueue_production`.`solid_queue_semaphores` trx id 1972990013 lock mode S Record lock, heap no 199 PHYSICAL RECORD: n_fields 2; compact format; info bits 0 0: len 30; hex 526563656970743a3a526563697069656e743a3a50726f63657373696e67; asc RR::Process; (total 50 bytes); 1: len 8; hex 80000000004224c4; asc B$ ;; *** (2) WAITING FOR THIS LOCK TO BE GRANTED: RECORD LOCKS space id 14 page no 426 n bits 304 index index_solid_queue_semaphores_on_key of table `haystack_solidqueue_production`.`solid_queue_semaphores` trx id 1972990013 lock_mode X locks rec but not gap waiting Record lock, heap no 199 PHYSICAL RECORD: n_fields 2; compact format; info bits 0 0: len 30; hex 526563656970743a3a526563697069656e743a3a50726f63657373696e67; asc RR::Process; (total 50 bytes); 1: len 8; hex 80000000004224c4; asc B$ ;; *** WE ROLL BACK TRANSACTION (2) ``` With this change, on the transaction that gets killed because of the deadlock, we'll try to wait again, but this time without having a shared lock because we won't try to create the semaphore, we know the semaphore is already created. A problem that could happen here would be something deleting the semaphore while we're retrying. However, that should be ok as we only delete semaphores as part of periodic maintenance, and that happens only for expired semaphores. This retry is necessary when the semaphore just got created, so we can assume it won't expire and will be deleted under us right on the very same moment.
1 parent 4a18d52 commit d72fe46

File tree

2 files changed

+71
-50
lines changed

2 files changed

+71
-50
lines changed

app/models/solid_queue/semaphore.rb

+68-47
Original file line numberDiff line numberDiff line change
@@ -1,65 +1,86 @@
11
# frozen_string_literal: true
22

3-
class SolidQueue::Semaphore < SolidQueue::Record
4-
scope :available, -> { where("value > 0") }
5-
scope :expired, -> { where(expires_at: ...Time.current) }
3+
module SolidQueue
4+
class Semaphore < Record
5+
scope :available, -> { where("value > 0") }
6+
scope :expired, -> { where(expires_at: ...Time.current) }
67

7-
class << self
8-
def wait(job)
9-
Proxy.new(job, self).wait
10-
end
8+
class << self
9+
def wait(job)
10+
Proxy.new(job).wait
11+
end
1112

12-
def signal(job)
13-
Proxy.new(job, self).signal
13+
def signal(job)
14+
Proxy.new(job).signal
15+
end
1416
end
15-
end
1617

17-
class Proxy
18-
def initialize(job, proxied_class)
19-
@job = job
20-
@proxied_class = proxied_class
21-
end
18+
class Proxy
19+
def initialize(job)
20+
@job = job
21+
@retries = 0
22+
end
2223

23-
def wait
24-
if semaphore = proxied_class.find_by(key: key)
25-
semaphore.value > 0 && attempt_decrement
26-
else
27-
attempt_creation
24+
def wait
25+
if semaphore = Semaphore.find_by(key: key)
26+
semaphore.value > 0 && attempt_decrement
27+
else
28+
attempt_creation
29+
end
2830
end
29-
end
3031

31-
def signal
32-
attempt_increment
33-
end
32+
def signal
33+
attempt_increment
34+
end
3435

35-
private
36-
attr_reader :job, :proxied_class
36+
private
37+
attr_reader :job, :retries
3738

38-
def attempt_creation
39-
proxied_class.create!(key: key, value: limit - 1, expires_at: expires_at)
40-
true
41-
rescue ActiveRecord::RecordNotUnique
42-
attempt_decrement
43-
end
39+
def attempt_creation
40+
Semaphore.create!(key: key, value: limit - 1, expires_at: expires_at)
41+
true
42+
rescue ActiveRecord::RecordNotUnique
43+
attempt_decrement
44+
end
4445

45-
def attempt_decrement
46-
proxied_class.available.where(key: key).update_all([ "value = value - 1, expires_at = ?", expires_at ]) > 0
47-
end
46+
def attempt_decrement
47+
Semaphore.available.where(key: key).update_all([ "value = value - 1, expires_at = ?", expires_at ]) > 0
48+
rescue ActiveRecord::Deadlocked
49+
if retriable? then attempt_retry
50+
else
51+
raise
52+
end
53+
end
4854

49-
def attempt_increment
50-
proxied_class.where(key: key, value: ...limit).update_all([ "value = value + 1, expires_at = ?", expires_at ]) > 0
51-
end
55+
def attempt_increment
56+
Semaphore.where(key: key, value: ...limit).update_all([ "value = value + 1, expires_at = ?", expires_at ]) > 0
57+
end
5258

53-
def key
54-
job.concurrency_key
55-
end
59+
def attempt_retry
60+
self.retries += 1
5661

57-
def expires_at
58-
job.concurrency_duration.from_now
59-
end
62+
if semaphore = Semaphore.find_by(key: key)
63+
semaphore.value > 0 && attempt_decrement
64+
end
65+
end
6066

61-
def limit
62-
job.concurrency_limit
63-
end
67+
MAX_RETRIES = 1
68+
69+
def retriable?
70+
retries < MAX_RETRIES
71+
end
72+
73+
def key
74+
job.concurrency_key
75+
end
76+
77+
def expires_at
78+
job.concurrency_duration.from_now
79+
end
80+
81+
def limit
82+
job.concurrency_limit
83+
end
84+
end
6485
end
6586
end

test/integration/concurrency_controls_test.rb

+3-3
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,14 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
4242
UpdateResultJob.set(wait: 0.2.seconds).perform_later(@result, name: "000", pause: 0.1.seconds)
4343

4444
("A".."F").each_with_index do |name, i|
45-
SequentialUpdateResultJob.set(wait: (0.2 + i * 0.01).seconds).perform_later(@result, name: name, pause: 0.2.seconds)
45+
SequentialUpdateResultJob.set(wait: (0.2 + i * 0.01).seconds).perform_later(@result, name: name, pause: 0.3.seconds)
4646
end
4747

4848
("G".."K").each_with_index do |name, i|
49-
SequentialUpdateResultJob.set(wait: (0.4 + i * 0.01).seconds).perform_later(@result, name: name)
49+
SequentialUpdateResultJob.set(wait: (0.3 + i * 0.01).seconds).perform_later(@result, name: name)
5050
end
5151

52-
wait_for_jobs_to_finish_for(4.seconds)
52+
wait_for_jobs_to_finish_for(5.seconds)
5353
assert_no_pending_jobs
5454

5555
assert_stored_sequence @result, ("A".."K").to_a

0 commit comments

Comments
 (0)