Skip to content

Commit a956572

Browse files
committed
Remove only stream subscriptions affected by down stream member
The clean-up of a stream connection state when a stream member goes down can remove subscriptions not affected by the member. The subscription state is removed from the connection, but the subscription is not removed from the SAC state (if the subscription is a SAC), because the subscription member PID does not match the down member PID. When the actual member of the subscription goes down, the subscription is no longer part of the state, so the clean-up does not find the subscription and does not remove it from the SAC state. This lets a ghost consumer in the corresponding SAC group. This commit makes sure only the affected subscriptions are removed from the state when a stream member goes down. Fixes #13961
1 parent 3ad06e5 commit a956572

File tree

2 files changed

+140
-79
lines changed

2 files changed

+140
-79
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 102 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,8 @@
106106
close_sent/3]).
107107
-ifdef(TEST).
108108
-export([ensure_token_expiry_timer/2,
109-
evaluate_state_after_secret_update/4]).
109+
evaluate_state_after_secret_update/4,
110+
clean_subscriptions/4]).
110111
-endif.
111112

112113
callback_mode() ->
@@ -3280,91 +3281,19 @@ clean_state_after_super_stream_deletion(Partitions, Connection, State, Transport
32803281

32813282
clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
32823283
#stream_connection{
3283-
user = #user{username = Username},
3284-
virtual_host = VirtualHost,
3285-
stream_subscriptions = StreamSubscriptions,
3286-
publishers = Publishers,
3287-
publisher_to_ids = PublisherToIds,
3288-
stream_leaders = Leaders,
3289-
outstanding_requests = Requests0} = C0,
3290-
#stream_connection_state{consumers = Consumers} = S0) ->
3284+
stream_leaders = Leaders} = C0,
3285+
S0) ->
32913286
{SubscriptionsCleaned, C1, S1} =
32923287
case stream_has_subscriptions(Stream, C0) of
32933288
true ->
3294-
#{Stream := SubscriptionIds} = StreamSubscriptions,
3295-
Requests1 = lists:foldl(
3296-
fun(SubId, Rqsts0) ->
3297-
#{SubId := Consumer} = Consumers,
3298-
case {MemberPid, Consumer} of
3299-
{undefined, _C} ->
3300-
rabbit_stream_metrics:consumer_cancelled(self(),
3301-
stream_r(Stream,
3302-
C0),
3303-
SubId,
3304-
Username,
3305-
false),
3306-
maybe_unregister_consumer(
3307-
VirtualHost, Consumer,
3308-
single_active_consumer(Consumer),
3309-
Rqsts0);
3310-
{MemberPid, #consumer{configuration =
3311-
#consumer_configuration{member_pid = MemberPid}}} ->
3312-
rabbit_stream_metrics:consumer_cancelled(self(),
3313-
stream_r(Stream,
3314-
C0),
3315-
SubId,
3316-
Username,
3317-
false),
3318-
maybe_unregister_consumer(
3319-
VirtualHost, Consumer,
3320-
single_active_consumer(Consumer),
3321-
Rqsts0);
3322-
_ ->
3323-
Rqsts0
3324-
end
3325-
end, Requests0, SubscriptionIds),
3326-
{true,
3327-
C0#stream_connection{stream_subscriptions =
3328-
maps:remove(Stream,
3329-
StreamSubscriptions),
3330-
outstanding_requests = Requests1},
3331-
S0#stream_connection_state{consumers =
3332-
maps:without(SubscriptionIds,
3333-
Consumers)}};
3289+
clean_subscriptions(MemberPid, Stream, C0, S0);
33343290
false ->
33353291
{false, C0, S0}
33363292
end,
33373293
{PublishersCleaned, C2, S2} =
33383294
case stream_has_publishers(Stream, C1) of
33393295
true ->
3340-
{PurgedPubs, PurgedPubToIds} =
3341-
maps:fold(fun(PubId,
3342-
#publisher{stream = S, reference = Ref},
3343-
{Pubs, PubToIds}) when S =:= Stream andalso MemberPid =:= undefined ->
3344-
rabbit_stream_metrics:publisher_deleted(self(),
3345-
stream_r(Stream,
3346-
C1),
3347-
PubId),
3348-
{maps:remove(PubId, Pubs),
3349-
maps:remove({Stream, Ref}, PubToIds)};
3350-
(PubId,
3351-
#publisher{stream = S, reference = Ref, leader = MPid},
3352-
{Pubs, PubToIds}) when S =:= Stream andalso MPid =:= MemberPid ->
3353-
rabbit_stream_metrics:publisher_deleted(self(),
3354-
stream_r(Stream,
3355-
C1),
3356-
PubId),
3357-
{maps:remove(PubId, Pubs),
3358-
maps:remove({Stream, Ref}, PubToIds)};
3359-
3360-
(_PubId, _Publisher, {Pubs, PubToIds}) ->
3361-
{Pubs, PubToIds}
3362-
end,
3363-
{Publishers, PublisherToIds}, Publishers),
3364-
{true,
3365-
C1#stream_connection{publishers = PurgedPubs,
3366-
publisher_to_ids = PurgedPubToIds},
3367-
S1};
3296+
clean_publishers(MemberPid, Stream, C1, S1);
33683297
false ->
33693298
{false, C1, S1}
33703299
end,
@@ -3386,6 +3315,100 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
33863315
{not_cleaned, C2#stream_connection{stream_leaders = Leaders1}, S2}
33873316
end.
33883317

3318+
clean_subscriptions(MemberPid, Stream,
3319+
#stream_connection{user = #user{username = Username},
3320+
virtual_host = VirtualHost,
3321+
stream_subscriptions = StreamSubs,
3322+
outstanding_requests = Requests0} = C0,
3323+
#stream_connection_state{consumers = Consumers} = S0) ->
3324+
#{Stream := SubIds} = StreamSubs,
3325+
{DelSubs1, Requests1} =
3326+
lists:foldl(
3327+
fun(SubId, {DelSubIds, Rqsts0}) ->
3328+
#{SubId := Consumer} = Consumers,
3329+
case {MemberPid, Consumer} of
3330+
{undefined, _C} ->
3331+
rabbit_stream_metrics:consumer_cancelled(self(),
3332+
stream_r(Stream,
3333+
C0),
3334+
SubId,
3335+
Username,
3336+
false),
3337+
Rqsts1 = maybe_unregister_consumer(
3338+
VirtualHost, Consumer,
3339+
single_active_consumer(Consumer),
3340+
Rqsts0),
3341+
{[SubId | DelSubIds], Rqsts1};
3342+
{MemberPid,
3343+
#consumer{configuration =
3344+
#consumer_configuration{member_pid = MemberPid}}} ->
3345+
rabbit_stream_metrics:consumer_cancelled(self(),
3346+
stream_r(Stream,
3347+
C0),
3348+
SubId,
3349+
Username,
3350+
false),
3351+
Rqsts1 = maybe_unregister_consumer(
3352+
VirtualHost, Consumer,
3353+
single_active_consumer(Consumer),
3354+
Rqsts0),
3355+
{[SubId | DelSubIds], Rqsts1};
3356+
_ ->
3357+
{DelSubIds, Rqsts0}
3358+
end
3359+
end, {[], Requests0}, SubIds),
3360+
case DelSubs1 of
3361+
[] ->
3362+
{false, C0, S0};
3363+
_ ->
3364+
StreamSubs1 = case SubIds -- DelSubs1 of
3365+
[] ->
3366+
maps:remove(Stream, StreamSubs);
3367+
RemSubIds ->
3368+
StreamSubs#{Stream => RemSubIds}
3369+
end,
3370+
Consumers1 = maps:without(DelSubs1, Consumers),
3371+
{true,
3372+
C0#stream_connection{stream_subscriptions = StreamSubs1,
3373+
outstanding_requests = Requests1},
3374+
S0#stream_connection_state{consumers = Consumers1}}
3375+
end.
3376+
3377+
clean_publishers(MemberPid, Stream,
3378+
#stream_connection{
3379+
publishers = Publishers,
3380+
publisher_to_ids = PublisherToIds} = C0, S0) ->
3381+
{Updated, PurgedPubs, PurgedPubToIds} =
3382+
maps:fold(fun(PubId, #publisher{stream = S, reference = Ref},
3383+
{_, Pubs, PubToIds})
3384+
when S =:= Stream andalso MemberPid =:= undefined ->
3385+
rabbit_stream_metrics:publisher_deleted(self(),
3386+
stream_r(Stream,
3387+
C0),
3388+
PubId),
3389+
{true,
3390+
maps:remove(PubId, Pubs),
3391+
maps:remove({Stream, Ref}, PubToIds)};
3392+
(PubId, #publisher{stream = S, reference = Ref, leader = MPid},
3393+
{_, Pubs, PubToIds})
3394+
when S =:= Stream andalso MPid =:= MemberPid ->
3395+
rabbit_stream_metrics:publisher_deleted(self(),
3396+
stream_r(Stream,
3397+
C0),
3398+
PubId),
3399+
{true,
3400+
maps:remove(PubId, Pubs),
3401+
maps:remove({Stream, Ref}, PubToIds)};
3402+
3403+
(_PubId, _Publisher, {Updated, Pubs, PubToIds}) ->
3404+
{Updated, Pubs, PubToIds}
3405+
end,
3406+
{false, Publishers, PublisherToIds}, Publishers),
3407+
{Updated,
3408+
C0#stream_connection{publishers = PurgedPubs,
3409+
publisher_to_ids = PurgedPubToIds},
3410+
S0}.
3411+
33893412
store_offset(Reference, _, _, C) when ?IS_INVALID_REF(Reference) ->
33903413
rabbit_log:warning("Reference is too long to store offset: ~p", [byte_size(Reference)]),
33913414
C;
@@ -3403,8 +3426,7 @@ store_offset(Reference, Stream, Offset, Connection0) ->
34033426

34043427
lookup_leader(Stream,
34053428
#stream_connection{stream_leaders = StreamLeaders,
3406-
virtual_host = VirtualHost} =
3407-
Connection) ->
3429+
virtual_host = VirtualHost} = Connection) ->
34083430
case maps:get(Stream, StreamLeaders, undefined) of
34093431
undefined ->
34103432
case lookup_leader_from_manager(VirtualHost, Stream) of
@@ -3413,6 +3435,7 @@ lookup_leader(Stream,
34133435
{ok, LeaderPid} ->
34143436
Connection1 =
34153437
maybe_monitor_stream(LeaderPid, Stream, Connection),
3438+
34163439
{LeaderPid,
34173440
Connection1#stream_connection{stream_leaders =
34183441
StreamLeaders#{Stream =>

deps/rabbitmq_stream/test/rabbit_stream_reader_SUITE.erl

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,44 @@ evaluate_state_after_secret_update_test(_) ->
184184
?assert(is_integer(Cancel2)),
185185
ok.
186186

187+
clean_subscriptions_should_remove_only_affected_subscriptions_test(_) ->
188+
Mod = rabbit_stream_reader,
189+
meck:new(Mod, [passthrough]),
190+
meck:new(rabbit_stream_metrics, [stub_all]),
191+
meck:new(rabbit_stream_sac_coordinator, [stub_all]),
192+
193+
S = <<"s1">>,
194+
Pid1 = new_process(),
195+
Pid2 = new_process(),
196+
StreamSubs = #{S => [0, 1]},
197+
Consumers = #{0 => consumer(S, Pid1),
198+
1 => consumer(S, Pid2)},
199+
200+
C0 = #stream_connection{stream_subscriptions = StreamSubs,
201+
user = #user{}},
202+
S0 = #stream_connection_state{consumers = Consumers},
203+
{Cleaned1, C1, S1} = Mod:clean_subscriptions(Pid1, S, C0, S0),
204+
?assert(Cleaned1),
205+
?assertEqual(#{S => [1]},
206+
C1#stream_connection.stream_subscriptions),
207+
?assertEqual(#{1 => consumer(S, Pid2)},
208+
S1#stream_connection_state.consumers),
209+
210+
{Cleaned2, C2, S2} = Mod:clean_subscriptions(Pid2, S, C1, S1),
211+
?assert(Cleaned2),
212+
?assertEqual(#{}, C2#stream_connection.stream_subscriptions),
213+
?assertEqual(#{}, S2#stream_connection_state.consumers),
214+
215+
ok.
216+
217+
consumer(S, Pid) ->
218+
#consumer{configuration = #consumer_configuration{stream = S,
219+
member_pid = Pid}}.
220+
187221
consumer(S) ->
188222
#consumer{configuration = #consumer_configuration{stream = S},
189223
log = osiris_log:init(#{})}.
224+
225+
new_process() ->
226+
spawn(node(), fun() -> ok end).
227+

0 commit comments

Comments
 (0)