Skip to content

Commit 33dc0f2

Browse files
committed
rabbitmq_peer_discovery_consul: Implement the new pre/post discovery callbacks
[Why] The Consul peer discovery backend needs to create a session before it can acquire a lock. This session is also required for nodes to discover each other. It must open the session before the `list_nodes/0` callback can return meaningful results. [How] The new `pre_discovery/0` and `post_discovery/1` callbacks are used to create and delete that session before the whole discover/lock/join process. Fixes #10760.
1 parent 5032259 commit 33dc0f2

File tree

2 files changed

+86
-26
lines changed

2 files changed

+86
-26
lines changed

deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl

Lines changed: 60 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,19 @@
1616
-include("rabbit_peer_discovery_consul.hrl").
1717

1818
-export([init/0, list_nodes/0, supports_registration/0, register/0, unregister/0,
19-
post_registration/0, lock/1, unlock/1]).
19+
post_registration/0, lock/2, unlock/2,
20+
pre_discovery/0, post_discovery/1]).
2021
-export([send_health_check_pass/0]).
2122
-export([session_ttl_update_callback/1]).
2223
%% for debugging from the REPL
2324
-export([service_id/0, service_address/0]).
2425
%% for debugging from the REPL
2526
-export([http_options/1, http_options/2]).
2627

28+
-type backend_priv() :: {Config :: #{atom() => peer_discovery_config_value()},
29+
SessionId :: string(),
30+
TRef :: timer:tref()}.
31+
2732
%% for tests
2833
-ifdef(TEST).
2934
-compile(export_all).
@@ -160,10 +165,9 @@ post_registration() ->
160165
send_health_check_pass(),
161166
ok.
162167

163-
-spec lock(Nodes :: [node()]) ->
164-
{ok, Data :: term()} | {error, Reason :: string()}.
168+
-spec pre_discovery() -> {ok, backend_priv()} | {error, string()}.
165169

166-
lock(_Nodes) ->
170+
pre_discovery() ->
167171
M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
168172
?LOG_DEBUG(
169173
"Effective Consul peer discovery configuration: ~tp", [M],
@@ -172,21 +176,33 @@ lock(_Nodes) ->
172176
case create_session(Node, get_config_key(consul_svc_ttl, M)) of
173177
{ok, SessionId} ->
174178
TRef = start_session_ttl_updater(SessionId),
175-
Now = erlang:system_time(seconds),
176-
EndTime = Now + get_config_key(lock_wait_time, M),
177-
lock(TRef, SessionId, Now, EndTime);
179+
{ok, {M, SessionId, TRef}};
178180
{error, Reason} ->
179181
{error, lists:flatten(io_lib:format("Error while creating a session, reason: ~ts",
180182
[Reason]))}
181183
end.
182184

183-
-spec unlock({SessionId :: string(), TRef :: timer:tref()}) -> ok.
185+
-spec post_discovery(backend_priv()) -> ok.
184186

185-
unlock({SessionId, TRef}) ->
187+
post_discovery({_M, SessionId, TRef}) ->
186188
_ = timer:cancel(TRef),
189+
delete_session(SessionId),
187190
?LOG_DEBUG(
188191
"Stopped session renewal",
189192
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
193+
ok.
194+
195+
-spec lock(Nodes :: [node()], BackendPriv :: backend_priv()) ->
196+
{ok, ok} | {error, Reason :: string()}.
197+
198+
lock(_Nodes, {M, SessionId, _TRef}) ->
199+
Now = erlang:system_time(seconds),
200+
EndTime = Now + get_config_key(lock_wait_time, M),
201+
lock(SessionId, Now, EndTime).
202+
203+
-spec unlock(LockData :: ok, backend_priv()) -> ok.
204+
205+
unlock(ok, {_M, SessionId, _TRef}) ->
190206
case release_lock(SessionId) of
191207
{ok, true} ->
192208
ok;
@@ -613,7 +629,11 @@ create_session(Name, TTL) ->
613629
[{'Name', Name},
614630
{'TTL', rabbit_data_coercion:to_atom(service_ttl(TTL))}]) of
615631
{ok, Response} ->
616-
{ok, get_session_id(Response)};
632+
SessionId = get_session_id(Response),
633+
?LOG_DEBUG(
634+
"Consul created session ID: ~s", [SessionId],
635+
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
636+
{ok, SessionId};
617637
{error, _} = Err ->
618638
Err
619639
end.
@@ -645,6 +665,31 @@ consul_session_create(Query, Headers, Body) ->
645665
Err
646666
end.
647667

668+
%%--------------------------------------------------------------------
669+
%% @private
670+
%% @doc
671+
%% Delete a session
672+
%% @end
673+
%%--------------------------------------------------------------------
674+
-spec delete_session(string()) -> ok.
675+
delete_session(SessionId) ->
676+
M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
677+
Headers = maybe_add_acl([]),
678+
HttpOpts = http_options(M),
679+
Ret = rabbit_peer_discovery_httpc:put(
680+
get_config_key(consul_scheme, M),
681+
get_config_key(consul_host, M),
682+
get_integer_config_key(consul_port, M),
683+
"v1/session/destroy/" ++ SessionId,
684+
[],
685+
Headers,
686+
HttpOpts,
687+
<<>>),
688+
?LOG_DEBUG(
689+
"Consul deleted session: ~p", [Ret],
690+
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
691+
ok.
692+
648693
%%--------------------------------------------------------------------
649694
%% @private
650695
%% @doc
@@ -692,31 +737,27 @@ start_session_ttl_updater(SessionId) ->
692737
%% Tries to acquire lock. If the lock is held by someone else, waits until it
693738
%% is released, or too much time has passed
694739
%% @end
695-
-spec lock(timer:tref(), string(), pos_integer(), pos_integer()) -> {ok, string()} | {error, string()}.
696-
lock(TRef, _, Now, EndTime) when EndTime < Now ->
697-
_ = timer:cancel(TRef),
740+
-spec lock(string(), pos_integer(), pos_integer()) -> {ok, ok} | {error, string()}.
741+
lock(_, Now, EndTime) when EndTime < Now ->
698742
{error, "Acquiring lock taking too long, bailing out"};
699-
lock(TRef, SessionId, _, EndTime) ->
743+
lock(SessionId, _, EndTime) ->
700744
case acquire_lock(SessionId) of
701745
{ok, true} ->
702-
{ok, {SessionId, TRef}};
746+
{ok, ok};
703747
{ok, false} ->
704748
case get_lock_status() of
705749
{ok, {SessionHeld, ModifyIndex}} ->
706750
Wait = max(EndTime - erlang:system_time(seconds), 0),
707751
case wait_for_lock_release(SessionHeld, ModifyIndex, Wait) of
708752
ok ->
709-
lock(TRef, SessionId, erlang:system_time(seconds), EndTime);
753+
lock(SessionId, erlang:system_time(seconds), EndTime);
710754
{error, Reason} ->
711-
_ = timer:cancel(TRef),
712755
{error, lists:flatten(io_lib:format("Error waiting for lock release, reason: ~ts",[Reason]))}
713756
end;
714757
{error, Reason} ->
715-
_ = timer:cancel(TRef),
716758
{error, lists:flatten(io_lib:format("Error obtaining lock status, reason: ~ts", [Reason]))}
717759
end;
718760
{error, Reason} ->
719-
_ = timer:cancel(TRef),
720761
{error, lists:flatten(io_lib:format("Error while acquiring lock, reason: ~ts", [Reason]))}
721762
end.
722763

deps/rabbitmq_peer_discovery_consul/src/rabbitmq_peer_discovery_consul.erl

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
-behaviour(rabbit_peer_discovery_backend).
1010

1111
-export([init/0, list_nodes/0, supports_registration/0, register/0, unregister/0,
12-
post_registration/0, lock/1, unlock/1]).
12+
post_registration/0, lock/2, unlock/2,
13+
pre_discovery/0, post_discovery/1]).
1314
-export([send_health_check_pass/0]).
1415
-export([session_ttl_update_callback/1]).
1516

@@ -42,13 +43,31 @@ unregister() ->
4243
post_registration() ->
4344
?DELEGATE:post_registration().
4445

45-
-spec lock(Nodes :: [node()]) -> {ok, Data :: term()} | {error, Reason :: string()}.
46-
lock(Node) ->
47-
?DELEGATE:lock(Node).
46+
-spec pre_discovery() ->
47+
{ok, BackendPriv :: rabbit_peer_discovery_backend:backend_priv()} |
48+
{error, Reason :: string()}.
4849

49-
-spec unlock({SessionId :: string(), TRef :: timer:tref()}) -> ok.
50-
unlock(Data) ->
51-
?DELEGATE:unlock(Data).
50+
pre_discovery() ->
51+
?DELEGATE:pre_discovery().
52+
53+
-spec post_discovery(BackendPriv :: rabbit_peer_discovery_backend:backend_priv()) ->
54+
ok.
55+
56+
post_discovery(BackendPriv) ->
57+
?DELEGATE:post_discovery(BackendPriv).
58+
59+
-spec lock(Nodes :: [node()],
60+
BackendPriv :: rabbit_peer_discovery_backend:backend_priv()) ->
61+
{ok, ok} | {error, Reason :: string()}.
62+
63+
lock(Node, BackendPriv) ->
64+
?DELEGATE:lock(Node, BackendPriv).
65+
66+
-spec unlock(LockData :: rabbit_peer_discovery_backend:lock_data(),
67+
BackendPriv :: rabbit_peer_discovery_backend:backend_priv()) ->
68+
ok.
69+
unlock(LockData, BackendPriv) ->
70+
?DELEGATE:unlock(LockData, BackendPriv).
5271

5372
-spec send_health_check_pass() -> ok.
5473
send_health_check_pass() ->

0 commit comments

Comments
 (0)