Skip to content

Commit c9f8d17

Browse files
MessageListenerContainer must not be initialized twice.
Error when afterPropertiesSet is called more than once. Update documentation and error messages. Original Pull Request: #2256
1 parent a87540e commit c9f8d17

File tree

2 files changed

+87
-76
lines changed

2 files changed

+87
-76
lines changed

src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java

Lines changed: 82 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -163,64 +163,11 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
163163

164164
private volatile CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
165165

166-
/**
167-
* Container listening state.
168-
*/
169-
static class State {
170-
171-
private final boolean prepareListening;
172-
private final boolean listening;
173-
174-
private State(boolean prepareListening, boolean listening) {
175-
this.prepareListening = prepareListening;
176-
this.listening = listening;
177-
}
178-
179-
/**
180-
* Initial state. Next state is {@link #prepareListening()}.
181-
*/
182-
public static State notListening() {
183-
return new State(false, false);
184-
}
185-
186-
/**
187-
* Prepare listening after {@link #notListening()}. Next states are either {@link #notListening()} upon failure or
188-
* {@link #listening()}.
189-
*/
190-
public static State prepareListening() {
191-
return new State(true, false);
192-
}
193-
194-
/**
195-
* Active listening state after {@link #prepareListening()}. Next is {@link #prepareUnsubscribe()}.
196-
*/
197-
public static State listening() {
198-
return new State(true, true);
199-
}
200-
201-
/**
202-
* Prepare unsubscribe after {@link #listening()}. Next state is {@link #notListening()}.
203-
*/
204-
public static State prepareUnsubscribe() {
205-
return new State(false, true);
206-
}
207-
208-
private boolean isListenerActivated() {
209-
return isListening() || isPrepareListening();
210-
}
211-
212-
public boolean isListening() {
213-
return listening;
214-
}
215-
216-
public boolean isPrepareListening() {
217-
return prepareListening;
218-
}
219-
}
220-
221166
@Override
222167
public void afterPropertiesSet() {
223168

169+
Assert.state(!afterPropertiesSet, "Container already initialized.");
170+
224171
if (this.connectionFactory == null) {
225172
throw new IllegalArgumentException("RedisConnectionFactory is not set");
226173
}
@@ -234,6 +181,7 @@ public void afterPropertiesSet() {
234181
subscriptionExecutor = taskExecutor;
235182
}
236183

184+
237185
this.subscriber = createSubscriber(connectionFactory, this.subscriptionExecutor);
238186

239187
afterPropertiesSet = true;
@@ -268,7 +216,7 @@ public void destroy() throws Exception {
268216
((DisposableBean) taskExecutor).destroy();
269217

270218
if (logger.isDebugEnabled()) {
271-
logger.debug("Stopped internally-managed task executor");
219+
logger.debug("Stopped internally-managed task executor.");
272220
}
273221
}
274222
}
@@ -323,7 +271,7 @@ private void lazyListen() {
323271
} catch (ExecutionException e) {
324272
throw new CompletionException(e.getCause());
325273
} catch (TimeoutException e) {
326-
throw new IllegalStateException("Subscription registration timeout exceeded", e);
274+
throw new IllegalStateException("Subscription registration timeout exceeded.", e);
327275
}
328276
}
329277

@@ -333,7 +281,7 @@ private void lazyListen() {
333281
private CompletableFuture<Void> lazyListen(BackOffExecution backOffExecution) {
334282

335283
if (!hasTopics()) {
336-
logger.debug("Postpone listening for Redis messages until actual listeners are added");
284+
logger.debug("Postpone listening for Redis messages until actual listeners are added.");
337285
return CompletableFuture.completedFuture(null);
338286
}
339287

@@ -367,10 +315,10 @@ private boolean doSubscribe(BackOffExecution backOffExecution) {
367315
listenFuture.whenComplete((unused, throwable) -> {
368316

369317
if (throwable == null) {
370-
logger.debug("RedisMessageListenerContainer listeners registered successfully");
318+
logger.debug("RedisMessageListenerContainer listeners registered successfully.");
371319
this.state.set(State.listening());
372320
} else {
373-
logger.debug("Failed to start RedisMessageListenerContainer listeners", throwable);
321+
logger.debug("Failed to start RedisMessageListenerContainer listeners.", throwable);
374322
this.state.set(State.notListening());
375323
}
376324

@@ -382,7 +330,7 @@ private boolean doSubscribe(BackOffExecution backOffExecution) {
382330
}
383331
});
384332

385-
logger.debug("Subscribing to topics for RedisMessageListenerContainer");
333+
logger.debug("Subscribing to topics for RedisMessageListenerContainer.");
386334

387335
return true;
388336
}
@@ -422,7 +370,7 @@ public void stop(Runnable callback) {
422370
stopListening();
423371

424372
if (logger.isDebugEnabled()) {
425-
logger.debug("Stopped RedisMessageListenerContainer");
373+
logger.debug("Stopped RedisMessageListenerContainer.");
426374
}
427375

428376
callback.run();
@@ -459,7 +407,7 @@ private boolean doUnsubscribe() {
459407
this.unsubscribeFuture = new CompletableFuture<>();
460408

461409
if (logger.isDebugEnabled()) {
462-
logger.debug("Stopped RedisMessageListenerContainer");
410+
logger.debug("Stopped listening.");
463411
}
464412

465413
return true;
@@ -831,7 +779,7 @@ public void setRecoveryInterval(long recoveryInterval) {
831779
*
832780
* @see #handleSubscriptionException
833781
* @see #setRecoveryInterval(long)
834-
* @since 3.0
782+
* @since 2.7
835783
*/
836784
public void setRecoveryBackoff(BackOff recoveryInterval) {
837785

@@ -928,8 +876,7 @@ protected void handleSubscriptionException(CompletableFuture<Void> future, BackO
928876
long recoveryInterval = backOffExecution.nextBackOff();
929877

930878
if (recoveryInterval != BackOffExecution.STOP) {
931-
logger.error(
932-
"Connection failure occurred:" + ex + ". Restarting subscription task after " + recoveryInterval + " ms");
879+
logger.error(String.format("Connection failure occurred: %s. Restarting subscription task after %s ms.", ex, recoveryInterval));
933880
}
934881

935882
return recoveryInterval;
@@ -1011,14 +958,6 @@ private void dispatchSubscriptionNotification(Collection<MessageListener> listen
1011958
}
1012959
}
1013960

1014-
/**
1015-
* Represents an operation that accepts three input arguments {@link SubscriptionListener},
1016-
* {@code channel or pattern}, and {@code count} and returns no result.
1017-
*/
1018-
interface SubscriptionConsumer {
1019-
void accept(SubscriptionListener listener, byte[] channelOrPattern, long count);
1020-
}
1021-
1022961
private void dispatchMessage(Collection<MessageListener> listeners, Message message, @Nullable byte[] pattern) {
1023962

1024963
byte[] source = (pattern != null ? pattern.clone() : message.getChannel());
@@ -1057,6 +996,73 @@ private byte[] serialize(Topic topic) {
1057996
return serializer.serialize(topic.getTopic());
1058997
}
1059998

999+
/**
1000+
* Represents an operation that accepts three input arguments {@link SubscriptionListener},
1001+
* {@code channel or pattern}, and {@code count} and returns no result.
1002+
*/
1003+
interface SubscriptionConsumer {
1004+
void accept(SubscriptionListener listener, byte[] channelOrPattern, long count);
1005+
}
1006+
1007+
/**
1008+
* Container listening state.
1009+
*
1010+
* @author Mark Paluch
1011+
* @since 2.7
1012+
*/
1013+
static class State {
1014+
1015+
private final boolean prepareListening;
1016+
private final boolean listening;
1017+
1018+
private State(boolean prepareListening, boolean listening) {
1019+
1020+
this.prepareListening = prepareListening;
1021+
this.listening = listening;
1022+
}
1023+
1024+
/**
1025+
* Initial state. Next state is {@link #prepareListening()}.
1026+
*/
1027+
static State notListening() {
1028+
return new State(false, false);
1029+
}
1030+
1031+
/**
1032+
* Prepare listening after {@link #notListening()}. Next states are either {@link #notListening()} upon failure or
1033+
* {@link #listening()}.
1034+
*/
1035+
static State prepareListening() {
1036+
return new State(true, false);
1037+
}
1038+
1039+
/**
1040+
* Active listening state after {@link #prepareListening()}. Next is {@link #prepareUnsubscribe()}.
1041+
*/
1042+
static State listening() {
1043+
return new State(true, true);
1044+
}
1045+
1046+
/**
1047+
* Prepare unsubscribe after {@link #listening()}. Next state is {@link #notListening()}.
1048+
*/
1049+
static State prepareUnsubscribe() {
1050+
return new State(false, true);
1051+
}
1052+
1053+
private boolean isListenerActivated() {
1054+
return isListening() || isPrepareListening();
1055+
}
1056+
1057+
public boolean isListening() {
1058+
return listening;
1059+
}
1060+
1061+
public boolean isPrepareListening() {
1062+
return prepareListening;
1063+
}
1064+
}
1065+
10601066
/**
10611067
* Actual message dispatcher/multiplexer.
10621068
*
@@ -1119,7 +1125,7 @@ public void onPatternUnsubscribed(byte[] pattern, long count) {
11191125
* actual listeners without blocking the event loop.
11201126
*
11211127
* @author Mark Paluch
1122-
* @since 3.0
1128+
* @since 2.7
11231129
*/
11241130
class Subscriber {
11251131

@@ -1324,7 +1330,7 @@ private void doWithSubscription(byte[][] data, BiConsumer<Subscription, byte[][]
13241330
* Blocking variant of a subscriber for connectors that block within the (p)subscribe method.
13251331
*
13261332
* @author Mark Paluch
1327-
* @since 3.0
1333+
* @since 2.7
13281334
*/
13291335
class BlockingSubscriber extends Subscriber {
13301336

src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,4 +105,9 @@ void containerShouldStopGracefullyOnUnsubscribeErrors() {
105105
assertThat(container.isRunning()).isFalse();
106106
verify(subscriptionMock).close();
107107
}
108+
109+
@Test // GH-964
110+
void failsOnDuplicateInit() {
111+
assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> container.afterPropertiesSet());
112+
}
108113
}

0 commit comments

Comments
 (0)