Skip to content

Commit 4f29fc8

Browse files
authored
Merge pull request #417 from ikyn-inc/interruptible
Reimplement Interruptible
2 parents 76b7eaf + 17c4cd5 commit 4f29fc8

13 files changed

+94
-44
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,8 @@
1414
# Folder for Visual Studio Code
1515
/.vscode/
1616

17+
# Files for RVM holdouts
18+
.ruby-gemset
19+
1720
# misc
1821
.DS_Store

.rubocop.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
inherit_gem: { rubocop-rails-omakase: rubocop.yml }
55

66
AllCops:
7-
TargetRubyVersion: 3.0
7+
TargetRubyVersion: 3.3
88
Exclude:
99
- "test/dummy/db/schema.rb"
1010
- "test/dummy/db/queue_schema.rb"

Gemfile.lock

+5-1
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ GEM
7575
reline (>= 0.4.2)
7676
json (2.8.2)
7777
language_server-protocol (3.17.0.3)
78+
logger (1.6.2)
7879
loofah (2.23.1)
7980
crass (~> 1.0.2)
8081
nokogiri (>= 1.12.0)
@@ -177,16 +178,19 @@ GEM
177178
PLATFORMS
178179
arm64-darwin-22
179180
arm64-darwin-23
181+
arm64-darwin-24
180182
x86_64-darwin-21
181183
x86_64-darwin-23
182184
x86_64-linux
183185

184186
DEPENDENCIES
185-
debug
187+
debug (~> 1.9)
188+
logger
186189
mocha
187190
mysql2
188191
pg
189192
puma
193+
rdoc
190194
rubocop-rails-omakase
191195
solid_queue!
192196
sqlite3

lib/solid_queue/processes/interruptible.rb

+10-18
Original file line numberDiff line numberDiff line change
@@ -7,31 +7,23 @@ def wake_up
77
end
88

99
private
10-
SELF_PIPE_BLOCK_SIZE = 11
1110

1211
def interrupt
13-
self_pipe[:writer].write_nonblock(".")
14-
rescue Errno::EAGAIN, Errno::EINTR
15-
# Ignore writes that would block and retry
16-
# if another signal arrived while writing
17-
retry
12+
queue << true
1813
end
1914

2015
def interruptible_sleep(time)
21-
if time > 0 && self_pipe[:reader].wait_readable(time)
22-
loop { self_pipe[:reader].read_nonblock(SELF_PIPE_BLOCK_SIZE) }
23-
end
24-
rescue Errno::EAGAIN, Errno::EINTR
16+
# Invoking from the main thread can result in a 35% slowdown (at least when running the test suite).
17+
# Using some form of Async (Futures) addresses this performance issue.
18+
Concurrent::Promises.future(time) do |timeout|
19+
if timeout > 0 && queue.pop(timeout:)
20+
queue.clear
21+
end
22+
end.value
2523
end
2624

27-
# Self-pipe for signal-handling (http://cr.yp.to/docs/selfpipe.html)
28-
def self_pipe
29-
@self_pipe ||= create_self_pipe
30-
end
31-
32-
def create_self_pipe
33-
reader, writer = IO.pipe
34-
{ reader: reader, writer: writer }
25+
def queue
26+
@queue ||= Queue.new
3527
end
3628
end
3729
end

solid_queue.gemspec

+3-1
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,13 @@ Gem::Specification.new do |spec|
3737
spec.add_dependency "fugit", "~> 1.11.0"
3838
spec.add_dependency "thor", "~> 1.3.1"
3939

40-
spec.add_development_dependency "debug"
40+
spec.add_development_dependency "debug", "~> 1.9"
4141
spec.add_development_dependency "mocha"
4242
spec.add_development_dependency "puma"
4343
spec.add_development_dependency "mysql2"
4444
spec.add_development_dependency "pg"
4545
spec.add_development_dependency "sqlite3"
4646
spec.add_development_dependency "rubocop-rails-omakase"
47+
spec.add_development_dependency "rdoc"
48+
spec.add_development_dependency "logger"
4749
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# frozen_string_literal: true
2+
3+
# Ideally, tests should be configured as close to production settings as
4+
# possible and YJIT is likely to be enabled. While it's highly unlikely
5+
# YJIT would cause issues, enabling it confirms this assertion.
6+
#
7+
# Configured via initializer to align with Rails 7.1 default in gemspec
8+
if defined?(RubyVM::YJIT.enable)
9+
Rails.application.config.after_initialize do
10+
RubyVM::YJIT.enable
11+
end
12+
end

test/integration/concurrency_controls_test.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
8585
test "run several jobs over the same record sequentially, with some of them failing" do
8686
("A".."F").each_with_index do |name, i|
8787
# A, C, E will fail, for i= 0, 2, 4
88-
SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds, exception: (RuntimeError if i.even?))
88+
SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds, exception: (ExpectedTestError if i.even?))
8989
end
9090

9191
("G".."K").each do |name|

test/integration/instrumentation_test.rb

+4-4
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ class InstrumentationTest < ActiveSupport::TestCase
162162

163163
test "errors when deregistering processes are included in deregister_process events" do
164164
previous_thread_report_on_exception, Thread.report_on_exception = Thread.report_on_exception, false
165-
error = RuntimeError.new("everything is broken")
165+
error = ExpectedTestError.new("everything is broken")
166166
SolidQueue::Process.any_instance.expects(:destroy!).raises(error).at_least_once
167167

168168
events = subscribed("deregister_process.solid_queue") do
@@ -182,7 +182,7 @@ class InstrumentationTest < ActiveSupport::TestCase
182182
end
183183

184184
test "retrying failed job emits retry event" do
185-
RaisingJob.perform_later(RuntimeError, "A")
185+
RaisingJob.perform_later(ExpectedTestError, "A")
186186
job = SolidQueue::Job.last
187187

188188
worker = SolidQueue::Worker.new.tap(&:start)
@@ -198,7 +198,7 @@ class InstrumentationTest < ActiveSupport::TestCase
198198
end
199199

200200
test "retrying failed jobs in bulk emits retry_all" do
201-
3.times { RaisingJob.perform_later(RuntimeError, "A") }
201+
3.times { RaisingJob.perform_later(ExpectedTestError, "A") }
202202
AddToBufferJob.perform_later("A")
203203

204204
jobs = SolidQueue::Job.last(4)
@@ -392,7 +392,7 @@ class InstrumentationTest < ActiveSupport::TestCase
392392
test "thread errors emit thread_error events" do
393393
previous_thread_report_on_exception, Thread.report_on_exception = Thread.report_on_exception, false
394394

395-
error = RuntimeError.new("everything is broken")
395+
error = ExpectedTestError.new("everything is broken")
396396
SolidQueue::ClaimedExecution::Result.expects(:new).raises(error).at_least_once
397397

398398
AddToBufferJob.perform_later "hey!"

test/integration/jobs_lifecycle_test.rb

+5-3
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44

55
class JobsLifecycleTest < ActiveSupport::TestCase
66
setup do
7+
SolidQueue.on_thread_error = silent_on_thread_error_for([ ExpectedTestError, RaisingJob::DefaultError ])
78
@worker = SolidQueue::Worker.new(queues: "background", threads: 3)
89
@dispatcher = SolidQueue::Dispatcher.new(batch_size: 10, polling_interval: 0.2)
910
end
1011

1112
teardown do
13+
SolidQueue.on_thread_error = @on_thread_error
1214
@worker.stop
1315
@dispatcher.stop
1416

@@ -29,16 +31,16 @@ class JobsLifecycleTest < ActiveSupport::TestCase
2931
end
3032

3133
test "enqueue and run jobs that fail without retries" do
32-
RaisingJob.perform_later(RuntimeError, "A")
33-
RaisingJob.perform_later(RuntimeError, "B")
34+
RaisingJob.perform_later(ExpectedTestError, "A")
35+
RaisingJob.perform_later(ExpectedTestError, "B")
3436
jobs = SolidQueue::Job.last(2)
3537

3638
@dispatcher.start
3739
@worker.start
3840

3941
wait_for_jobs_to_finish_for(3.seconds)
4042

41-
message = "raised RuntimeError for the 1st time"
43+
message = "raised ExpectedTestError for the 1st time"
4244
assert_equal [ "A: #{message}", "B: #{message}" ], JobBuffer.values.sort
4345

4446
assert_empty SolidQueue::Job.finished

test/integration/processes_lifecycle_test.rb

+3-3
Original file line numberDiff line numberDiff line change
@@ -144,11 +144,11 @@ class ProcessesLifecycleTest < ActiveSupport::TestCase
144144
test "process some jobs that raise errors" do
145145
2.times { enqueue_store_result_job("no error", :background) }
146146
2.times { enqueue_store_result_job("no error", :default) }
147-
error1 = enqueue_store_result_job("error", :background, exception: RuntimeError)
147+
error1 = enqueue_store_result_job("error", :background, exception: ExpectedTestError)
148148
enqueue_store_result_job("no error", :background, pause: 0.03)
149-
error2 = enqueue_store_result_job("error", :background, exception: RuntimeError, pause: 0.05)
149+
error2 = enqueue_store_result_job("error", :background, exception: ExpectedTestError, pause: 0.05)
150150
2.times { enqueue_store_result_job("no error", :default, pause: 0.01) }
151-
error3 = enqueue_store_result_job("error", :default, exception: RuntimeError)
151+
error3 = enqueue_store_result_job("error", :default, exception: ExpectedTestError)
152152

153153
wait_for_jobs_to_finish_for(2.second, except: [ error1, error2, error3 ])
154154

test/models/solid_queue/failed_execution_test.rb

+9-7
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,25 @@ class SolidQueue::FailedExecutionTest < ActiveSupport::TestCase
77
end
88

99
test "run job that fails" do
10-
RaisingJob.perform_later(RuntimeError, "A")
10+
RaisingJob.perform_later(ExpectedTestError, "A")
1111
@worker.start
1212

1313
assert_equal 1, SolidQueue::FailedExecution.count
1414
assert SolidQueue::Job.last.failed?
1515
end
1616

1717
test "run job that fails with a SystemStackError (stack level too deep)" do
18-
InfiniteRecursionJob.perform_later
19-
@worker.start
18+
silence_on_thread_error_for(SystemStackError) do
19+
InfiniteRecursionJob.perform_later
20+
@worker.start
2021

21-
assert_equal 1, SolidQueue::FailedExecution.count
22-
assert SolidQueue::Job.last.failed?
22+
assert_equal 1, SolidQueue::FailedExecution.count
23+
assert SolidQueue::Job.last.failed?
24+
end
2325
end
2426

2527
test "retry failed job" do
26-
RaisingJob.perform_later(RuntimeError, "A")
28+
RaisingJob.perform_later(ExpectedTestError, "A")
2729
@worker.start
2830

2931
assert_difference -> { SolidQueue::FailedExecution.count }, -1 do
@@ -34,7 +36,7 @@ class SolidQueue::FailedExecutionTest < ActiveSupport::TestCase
3436
end
3537

3638
test "retry failed jobs in bulk" do
37-
1.upto(5) { |i| RaisingJob.perform_later(RuntimeError, i) }
39+
1.upto(5) { |i| RaisingJob.perform_later(ExpectedTestError, i) }
3840
1.upto(3) { |i| AddToBufferJob.perform_later(i) }
3941

4042
@worker.start

test/test_helper.rb

+33
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,20 @@ def write(...)
2424
end
2525

2626
Logger::LogDevice.prepend(BlockLogDeviceTimeoutExceptions)
27+
class ExpectedTestError < RuntimeError; end
28+
2729

2830
class ActiveSupport::TestCase
2931
include ProcessesTestHelper, JobsTestHelper
3032

33+
setup do
34+
# Could be cleaner with one several minitest gems, but didn't want to add new dependency
35+
@_on_thread_error = SolidQueue.on_thread_error
36+
SolidQueue.on_thread_error = silent_on_thread_error_for(ExpectedTestError)
37+
end
38+
3139
teardown do
40+
SolidQueue.on_thread_error = @_on_thread_error
3241
JobBuffer.clear
3342

3443
if SolidQueue.supervisor_pidfile && File.exist?(SolidQueue.supervisor_pidfile)
@@ -69,4 +78,28 @@ def wait_while_with_timeout!(timeout, &block)
6978
def skip_active_record_query_cache(&block)
7079
SolidQueue::Record.uncached(&block)
7180
end
81+
82+
# Silences specified exceptions during the execution of a block
83+
#
84+
# @param [Exception, Array<Exception>] expected an Exception or an array of Exceptions to ignore
85+
# @yield Executes the provided block with specified exception(s) silenced
86+
def silence_on_thread_error_for(expected, &block)
87+
SolidQueue.with(on_thread_error: silent_on_thread_error_for(expected)) do
88+
block.call
89+
end
90+
end
91+
92+
# Does not call on_thread_error for expected exceptions
93+
# @param [Exception, Array<Exception>] expected an Exception or an array of Exceptions to ignore
94+
def silent_on_thread_error_for(expected)
95+
current_proc = SolidQueue.on_thread_error
96+
97+
->(exception) do
98+
expected_exceptions = Array(expected)
99+
100+
unless expected_exceptions.any? { exception.instance_of?(_1) }
101+
current_proc.call(exception)
102+
end
103+
end
104+
end
72105
end

test/unit/worker_test.rb

+5-5
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,14 @@ class WorkerTest < ActiveSupport::TestCase
2828
original_on_thread_error, SolidQueue.on_thread_error = SolidQueue.on_thread_error, ->(error) { errors << error.message }
2929
previous_thread_report_on_exception, Thread.report_on_exception = Thread.report_on_exception, false
3030

31-
SolidQueue::ReadyExecution.expects(:claim).raises(RuntimeError.new("everything is broken")).at_least_once
31+
SolidQueue::ReadyExecution.expects(:claim).raises(ExpectedTestError.new("everything is broken")).at_least_once
3232

3333
AddToBufferJob.perform_later "hey!"
3434

3535
worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 0.2).tap(&:start)
3636
sleep(1)
3737

38-
assert_raises RuntimeError do
38+
assert_raises ExpectedTestError do
3939
worker.stop
4040
end
4141

@@ -51,7 +51,7 @@ class WorkerTest < ActiveSupport::TestCase
5151
subscriber = ErrorBuffer.new
5252
Rails.error.subscribe(subscriber)
5353

54-
SolidQueue::ClaimedExecution::Result.expects(:new).raises(RuntimeError.new("everything is broken")).at_least_once
54+
SolidQueue::ClaimedExecution::Result.expects(:new).raises(ExpectedTestError.new("everything is broken")).at_least_once
5555

5656
AddToBufferJob.perform_later "hey!"
5757

@@ -71,15 +71,15 @@ class WorkerTest < ActiveSupport::TestCase
7171
subscriber = ErrorBuffer.new
7272
Rails.error.subscribe(subscriber)
7373

74-
RaisingJob.perform_later(RuntimeError, "B")
74+
RaisingJob.perform_later(ExpectedTestError, "B")
7575

7676
@worker.start
7777

7878
wait_for_jobs_to_finish_for(1.second)
7979
@worker.wake_up
8080

8181
assert_equal 1, subscriber.errors.count
82-
assert_equal "This is a RuntimeError exception", subscriber.messages.first
82+
assert_equal "This is a ExpectedTestError exception", subscriber.messages.first
8383
ensure
8484
Rails.error.unsubscribe(subscriber) if Rails.error.respond_to?(:unsubscribe)
8585
end

0 commit comments

Comments
 (0)