Skip to content

Go back to using the original, self-pipe based implementation of Interruptible #552

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions lib/solid_queue/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,5 @@ class Engine < ::Rails::Engine
include ActiveJob::ConcurrencyControls
end
end

initializer "solid_queue.include_interruptible_concern" do
if Gem::Version.new(RUBY_VERSION) >= Gem::Version.new("3.2")
SolidQueue::Processes::Base.include SolidQueue::Processes::Interruptible
else
SolidQueue::Processes::Base.include SolidQueue::Processes::OgInterruptible
end
end
end
end
2 changes: 1 addition & 1 deletion lib/solid_queue/processes/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module SolidQueue
module Processes
class Base
include Callbacks # Defines callbacks needed by other concerns
include AppExecutor, Registrable, Procline
include AppExecutor, Registrable, Interruptible, Procline

attr_reader :name

Expand Down
34 changes: 17 additions & 17 deletions lib/solid_queue/processes/interruptible.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,36 @@

module SolidQueue::Processes
module Interruptible
include SolidQueue::AppExecutor

def wake_up
interrupt
end

private
SELF_PIPE_BLOCK_SIZE = 11

def interrupt
queue << true
self_pipe[:writer].write_nonblock(".")
rescue Errno::EAGAIN, Errno::EINTR
# Ignore writes that would block and retry
# if another signal arrived while writing
retry
end

# Sleeps for 'time'. Can be interrupted asynchronously and return early via wake_up.
# @param time [Numeric, Duration] the time to sleep. 0 returns immediately.
def interruptible_sleep(time)
# Invoking this from the main thread may result in significant slowdown.
# Utilizing asynchronous execution (Futures) addresses this performance issue.
Concurrent::Promises.future(time) do |timeout|
queue.clear unless queue.pop(timeout:).nil?
end.on_rejection! do |e|
wrapped_exception = RuntimeError.new("Interruptible#interruptible_sleep - #{e.class}: #{e.message}")
wrapped_exception.set_backtrace(e.backtrace)
handle_thread_error(wrapped_exception)
end.value
if time > 0 && self_pipe[:reader].wait_readable(time)
loop { self_pipe[:reader].read_nonblock(SELF_PIPE_BLOCK_SIZE) }
end
rescue Errno::EAGAIN, Errno::EINTR
end

nil
# Self-pipe for signal-handling (http://cr.yp.to/docs/selfpipe.html)
def self_pipe
@self_pipe ||= create_self_pipe
end

def queue
@queue ||= Queue.new
def create_self_pipe
reader, writer = IO.pipe
{ reader: reader, writer: writer }
end
end
end
39 changes: 0 additions & 39 deletions lib/solid_queue/processes/og_interruptible.rb

This file was deleted.