Skip to content

Commit 9fa95c5

Browse files
committed
Validate DB connection pool before starting supervisor
Also, refactor a bit configuration validation to use ActiveModel's validations.
1 parent 019d7c7 commit 9fa95c5

File tree

5 files changed

+67
-35
lines changed

5 files changed

+67
-35
lines changed

lib/solid_queue/configuration.rb

+35-9
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22

33
module SolidQueue
44
class Configuration
5+
include ActiveModel::Model
6+
7+
validate :ensure_configured_processes
8+
validate :ensure_valid_recurring_tasks
9+
validate :ensure_correctly_sized_thread_pool
10+
511
class Process < Struct.new(:kind, :attributes)
612
def instantiate
713
"SolidQueue::#{kind.to_s.titleize}".safe_constantize.new(**attributes)
@@ -36,10 +42,6 @@ def configured_processes
3642
end
3743
end
3844

39-
def valid?
40-
configured_processes.any? && (skip_recurring_tasks? || invalid_tasks.none?)
41-
end
42-
4345
def error_messages
4446
if configured_processes.none?
4547
"No workers or processed configured. Exiting..."
@@ -54,14 +56,32 @@ def error_messages
5456
end
5557
end
5658

57-
def max_number_of_threads
58-
# At most "threads" in each worker + 1 thread for the worker + 1 thread for the heartbeat task
59-
workers_options.map { |options| options[:threads] }.max + 2
60-
end
61-
6259
private
6360
attr_reader :options
6461

62+
def ensure_configured_processes
63+
unless configured_processes.any?
64+
errors.add(:base, "No processes configured")
65+
end
66+
end
67+
68+
def ensure_valid_recurring_tasks
69+
unless skip_recurring_tasks? || invalid_tasks.none?
70+
error_messages = invalid_tasks.map do |task|
71+
"- #{task.key}: #{task.errors.full_messages.join(", ")}"
72+
end
73+
74+
errors.add(:base, "Invalid recurring tasks:\n#{error_messages.join("\n")}")
75+
end
76+
end
77+
78+
def ensure_correctly_sized_thread_pool
79+
if (db_pool_size = SolidQueue::Record.connection_pool&.size) && db_pool_size < estimated_number_of_threads
80+
errors.add(:base, "Solid Queue is configured to use #{estimated_number_of_threads} threads but the " +
81+
"database connection pool is #{db_pool_size}. Increase it in `config/database.yml`")
82+
end
83+
end
84+
6585
def default_options
6686
{
6787
config_file: Rails.root.join(ENV["SOLID_QUEUE_CONFIG"] || DEFAULT_CONFIG_FILE_PATH),
@@ -169,5 +189,11 @@ def load_config_from_file(file)
169189
{}
170190
end
171191
end
192+
193+
def estimated_number_of_threads
194+
# At most "threads" in each worker + 1 thread for the worker + 1 thread for the heartbeat task
195+
thread_count = workers_options.map { |options| options.fetch(:threads, WORKER_DEFAULTS[:threads]) }.max
196+
(thread_count || 1) + 2
197+
end
172198
end
173199
end

lib/solid_queue/supervisor.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ def start(**options)
1313
if configuration.valid?
1414
new(configuration).tap(&:start)
1515
else
16-
abort configuration.error_messages
16+
abort configuration.errors.full_messages.join("\n") + "\nExiting..."
1717
end
1818
end
1919
end
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1-
periodic_store_result:
2-
class: StoreResultJorrrrrrb
3-
queue: default
4-
args: [42, { status: "custom_status" }]
5-
schedule: every second
1+
periodic_invalid_class:
2+
class: StoreResultJorrrrrrb
3+
queue: default
4+
args: [42, { status: "custom_status" }]
5+
schedule: every second
6+
periodic_incorrect_schedule:
7+
class: StoreResultJob
8+
schedule: every 1.minute

test/unit/configuration_test.rb

+18-15
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,6 @@ class ConfigurationTest < ActiveSupport::TestCase
5454
assert_processes configuration, :worker, 2
5555
end
5656

57-
test "max number of threads" do
58-
configuration = SolidQueue::Configuration.new
59-
assert 7, configuration.max_number_of_threads
60-
end
61-
6257
test "mulitple workers with the same configuration" do
6358
background_worker = { queues: "background", polling_interval: 10, processes: 3 }
6459
configuration = SolidQueue::Configuration.new(workers: [ background_worker ])
@@ -90,22 +85,30 @@ class ConfigurationTest < ActiveSupport::TestCase
9085
assert_processes configuration, :dispatcher, 1, polling_interval: 0.1, recurring_tasks: nil
9186
end
9287

93-
test "detects when there are invalid recurring tasks" do
88+
test "validate configuration" do
89+
# Valid and invalid recurring tasks
9490
configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:recurring_with_invalid))
95-
9691
assert_not configuration.valid?
97-
end
92+
assert configuration.errors.full_messages.one?
93+
error = configuration.errors.full_messages.first
9894

99-
test "is valid when there are no recurring tasks" do
100-
configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:empty_recurring))
95+
assert error.include?("Invalid recurring tasks")
96+
assert error.include?("periodic_invalid_class: Class name doesn't correspond to an existing class")
97+
assert error.include?("periodic_incorrect_schedule: Schedule is not a supported recurring schedule")
10198

102-
assert configuration.valid?
103-
end
99+
assert SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:empty_recurring)).valid?
100+
assert SolidQueue::Configuration.new(skip_recurring: true).valid?
104101

105-
test "is valid when recurring tasks are skipped" do
106-
configuration = SolidQueue::Configuration.new(skip_recurring: true)
102+
# No processes
103+
configuration = SolidQueue::Configuration.new(skip_recurring: true, dispatchers: [], workers: [])
104+
assert_not configuration.valid?
105+
assert_equal [ "No processes configured" ], configuration.errors.full_messages
107106

108-
assert configuration.valid?
107+
# Not enough DB connections
108+
configuration = SolidQueue::Configuration.new(workers: [ { queues: "background", threads: 50, polling_interval: 10 } ])
109+
assert_not configuration.valid?
110+
assert_match /Solid Queue is configured to use \d+ threads but the database connection pool is \d+. Increase it in `config\/database.yml`/,
111+
configuration.errors.full_messages.first
109112
end
110113

111114
private

test/unit/supervisor_test.rb

+5-5
Original file line numberDiff line numberDiff line change
@@ -41,22 +41,22 @@ class SupervisorTest < ActiveSupport::TestCase
4141
end
4242

4343
test "start with empty configuration" do
44-
pid, _out, err = run_supervisor_as_fork_with_captured_io(workers: [], dispatchers: [])
44+
pid, _out, error = run_supervisor_as_fork_with_captured_io(workers: [], dispatchers: [])
4545
sleep(0.5)
4646
assert_no_registered_processes
4747

4848
assert_not process_exists?(pid)
49-
assert_match %r{No workers or processed configured. Exiting...}, err
49+
assert_match %r{No processes configured}, error
5050
end
5151

52-
test "start with invalid configuration" do
53-
pid, _out, err = run_supervisor_as_fork_with_captured_io(recurring_schedule_file: config_file_path(:recurring_with_invalid), skip_recurring: false)
52+
test "start with invalid recurring tasks" do
53+
pid, _out, error = run_supervisor_as_fork_with_captured_io(recurring_schedule_file: config_file_path(:recurring_with_invalid), skip_recurring: false)
5454

5555
sleep(0.5)
5656
assert_no_registered_processes
5757

5858
assert_not process_exists?(pid)
59-
assert_match %r{Invalid processes configured}, err
59+
assert_match %r{Invalid recurring tasks}, error
6060
end
6161

6262
test "create and delete pidfile" do

0 commit comments

Comments
 (0)