Skip to content

Commit 5370d50

Browse files
artembilantzolov
authored andcommitted
GH-8773: Fix MGS for removal from group
Fixes #8773 The #8732 introduced a filtering for messages in group. So, plain `removeMessage()` doesn't work any more if message is connected to some group yet. Therefore, `DelayHandler` is failing. * Introduce `getMessageFromGroup()` and `removeMessageFromGroupById()` into `MessageGroupStore` API and implement it respectively in all the stores * Remove `@LongRunningTest` from delayer integration tests and adjust its config to delay not for a long **Cherry-pick to `6.1.x`**
1 parent adb8970 commit 5370d50

File tree

14 files changed

+219
-81
lines changed

14 files changed

+219
-81
lines changed

spring-integration-core/src/main/java/org/springframework/integration/handler/DelayHandler.java

Lines changed: 15 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.io.Serial;
2020
import java.io.Serializable;
2121
import java.time.Instant;
22-
import java.util.Collection;
2322
import java.util.Collections;
2423
import java.util.Date;
2524
import java.util.List;
@@ -45,7 +44,6 @@
4544
import org.springframework.integration.expression.ExpressionUtils;
4645
import org.springframework.integration.store.MessageGroup;
4746
import org.springframework.integration.store.MessageGroupStore;
48-
import org.springframework.integration.store.MessageStore;
4947
import org.springframework.integration.store.SimpleMessageStore;
5048
import org.springframework.integration.support.management.IntegrationManagedResource;
5149
import org.springframework.jmx.export.annotation.ManagedResource;
@@ -115,8 +113,6 @@ public class DelayHandler extends AbstractReplyProducingMessageHandler implement
115113

116114
private final ConcurrentMap<String, AtomicInteger> deliveries = new ConcurrentHashMap<>();
117115

118-
private final Lock removeReleasedMessageLock = new ReentrantLock();
119-
120116
private String messageGroupId;
121117

122118
private long defaultDelay;
@@ -337,9 +333,7 @@ protected void doInit() {
337333
if (this.messageStore == null) {
338334
this.messageStore = new SimpleMessageStore();
339335
}
340-
else {
341-
Assert.isInstanceOf(MessageStore.class, this.messageStore);
342-
}
336+
343337
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
344338
this.releaseHandler = createReleaseMessageTask();
345339
}
@@ -501,7 +495,7 @@ private Runnable releaseTaskForMessage(Message<?> delayedMessage) {
501495
}
502496

503497
private Message<?> getMessageById(UUID messageId) {
504-
Message<?> theMessage = ((MessageStore) this.messageStore).getMessage(messageId);
498+
Message<?> theMessage = this.messageStore.getMessageFromGroup(this.messageGroupId, messageId);
505499

506500
if (theMessage == null) {
507501
logger.debug(() -> "No message in the Message Store for id: " + messageId +
@@ -574,11 +568,9 @@ protected void rescheduleAt(Message<?> message, Date startTime) {
574568
}
575569

576570
private void doReleaseMessage(Message<?> message) {
577-
if (removeDelayedMessageFromMessageStore(message)
571+
if (this.messageStore.removeMessageFromGroupById(this.messageGroupId, message.getHeaders().getId())
578572
|| this.deliveries.get(ObjectUtils.getIdentityHexString(message)).get() > 0) {
579-
if (!(this.messageStore instanceof SimpleMessageStore)) {
580-
this.messageStore.removeMessagesFromGroup(this.messageGroupId, message);
581-
}
573+
582574
handleMessageInternal(message);
583575
}
584576
else {
@@ -587,28 +579,6 @@ private void doReleaseMessage(Message<?> message) {
587579
}
588580
}
589581

590-
private boolean removeDelayedMessageFromMessageStore(Message<?> message) {
591-
if (this.messageStore instanceof SimpleMessageStore) {
592-
this.removeReleasedMessageLock.lock();
593-
try {
594-
Collection<Message<?>> messages = this.messageStore.getMessageGroup(this.messageGroupId).getMessages();
595-
if (messages.contains(message)) {
596-
this.messageStore.removeMessagesFromGroup(this.messageGroupId, message);
597-
return true;
598-
}
599-
else {
600-
return false;
601-
}
602-
}
603-
finally {
604-
this.removeReleasedMessageLock.unlock();
605-
}
606-
}
607-
else {
608-
return ((MessageStore) this.messageStore).removeMessage(message.getHeaders().getId()) != null;
609-
}
610-
}
611-
612582
@Override
613583
public int getDelayedMessageCount() {
614584
return this.messageStore.messageGroupSize(this.messageGroupId);
@@ -628,17 +598,17 @@ public void reschedulePersistedMessages() {
628598
try (Stream<Message<?>> messageStream = messageGroup.streamMessages()) {
629599
TaskScheduler taskScheduler = getTaskScheduler();
630600
messageStream.forEach((message) -> // NOSONAR
631-
taskScheduler.schedule(() -> {
632-
// This is fine to keep the reference to the message,
633-
// because the scheduled task is performed immediately.
634-
long delay = determineDelayForMessage(message);
635-
if (delay > 0) {
636-
releaseMessageAfterDelay(message, delay);
637-
}
638-
else {
639-
releaseMessage(message);
640-
}
641-
}, Instant.now()));
601+
taskScheduler.schedule(() -> {
602+
// This is fine to keep the reference to the message,
603+
// because the scheduled task is performed immediately.
604+
long delay = determineDelayForMessage(message);
605+
if (delay > 0) {
606+
releaseMessageAfterDelay(message, delay);
607+
}
608+
else {
609+
releaseMessage(message);
610+
}
611+
}, Instant.now()));
642612
}
643613
}
644614
finally {

spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,42 @@ public void removeMessagesFromGroup(Object groupId, Collection<Message<?>> messa
268268
}
269269
}
270270

271+
@Override
272+
@Nullable
273+
public Message<?> getMessageFromGroup(Object groupId, UUID messageId) {
274+
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
275+
Assert.notNull(messageId, "'messageId' must not be null");
276+
Object object = doRetrieve(this.messagePrefix + groupId + '_' + messageId);
277+
if (object != null) {
278+
return extractMessage(object);
279+
}
280+
else {
281+
return null;
282+
}
283+
}
284+
285+
@Override
286+
public boolean removeMessageFromGroupById(Object groupId, UUID messageId) {
287+
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
288+
Assert.notNull(messageId, "'messageId' must not be null");
289+
Object mgm = doRetrieve(this.groupPrefix + groupId);
290+
if (mgm != null) {
291+
Assert.isInstanceOf(MessageGroupMetadata.class, mgm);
292+
MessageGroupMetadata messageGroupMetadata = (MessageGroupMetadata) mgm;
293+
294+
if (messageGroupMetadata.getMessageIds().contains(messageId)) {
295+
messageGroupMetadata.remove(messageId);
296+
String groupToMessageId = this.messagePrefix + groupId + '_' + messageId;
297+
if (doRemove(groupToMessageId) != null) {
298+
messageGroupMetadata.setLastModified(System.currentTimeMillis());
299+
doStore(this.groupPrefix + groupId, messageGroupMetadata);
300+
return true;
301+
}
302+
}
303+
}
304+
return false;
305+
}
306+
271307
@Override
272308
public void completeGroup(Object groupId) {
273309
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);

spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupStore.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,10 +18,12 @@
1818

1919
import java.util.Collection;
2020
import java.util.Iterator;
21+
import java.util.UUID;
2122
import java.util.stream.Stream;
2223

2324
import org.springframework.jmx.export.annotation.ManagedAttribute;
2425
import org.springframework.jmx.export.annotation.ManagedOperation;
26+
import org.springframework.lang.Nullable;
2527
import org.springframework.messaging.Message;
2628

2729
/**
@@ -71,6 +73,30 @@ public interface MessageGroupStore extends BasicMessageGroupStore {
7173
*/
7274
void removeMessagesFromGroup(Object key, Message<?>... messages);
7375

76+
/**
77+
* Retrieve a {@link Message} from a group by id.
78+
* Return {@code null} if message does not belong to the requested group.
79+
* @param groupId The groupId for the group containing the message.
80+
* @param messageId The message id.
81+
* @return message by id if it belongs to requested group.
82+
* @since 6.1.5
83+
*/
84+
@Nullable
85+
default Message<?> getMessageFromGroup(Object groupId, UUID messageId) {
86+
throw new UnsupportedOperationException("Not supported for this store");
87+
}
88+
89+
/**
90+
* Deletion the message from the group.
91+
* @param groupId The groupId for the group containing the message.
92+
* @param messageId The message id to be removed.
93+
* @return true if message has been removed.
94+
* @since 6.1.5
95+
*/
96+
default boolean removeMessageFromGroupById(Object groupId, UUID messageId) {
97+
throw new UnsupportedOperationException("Not supported for this store");
98+
}
99+
74100
/**
75101
* Register a callback for when a message group is expired through {@link #expireMessageGroups(long)}.
76102
* @param callback A callback to execute when a message group is cleaned up.
@@ -114,7 +140,7 @@ public interface MessageGroupStore extends BasicMessageGroupStore {
114140

115141
/**
116142
* Completes this MessageGroup. Completion of the MessageGroup generally means
117-
* that this group should not be allowing any more mutating operation to be performed on it.
143+
* that this group should not be allowing anymore mutating operation to be performed on it.
118144
* For example any attempt to add/remove new Message form the group should not be allowed.
119145
* @param groupId The group identifier.
120146
*/

spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.springframework.integration.support.locks.LockRegistry;
2929
import org.springframework.integration.util.UpperBound;
3030
import org.springframework.jmx.export.annotation.ManagedAttribute;
31+
import org.springframework.lang.Nullable;
3132
import org.springframework.messaging.Message;
3233
import org.springframework.messaging.MessagingException;
3334
import org.springframework.util.Assert;
@@ -382,6 +383,52 @@ public void removeMessagesFromGroup(Object groupId, Collection<Message<?>> messa
382383
}
383384
}
384385

386+
@Override
387+
@Nullable
388+
public Message<?> getMessageFromGroup(Object groupId, UUID messageId) {
389+
MessageGroup group = this.groupIdToMessageGroup.get(groupId);
390+
Assert.notNull(group,
391+
() -> MESSAGE_GROUP_FOR_GROUP_ID + groupId + "' does not exists");
392+
for (Message<?> message : group.getMessages()) {
393+
if (messageId.equals(message.getHeaders().getId())) {
394+
return message;
395+
}
396+
}
397+
return null;
398+
}
399+
400+
@Override
401+
public boolean removeMessageFromGroupById(Object groupId, UUID messageId) {
402+
Lock lock = this.lockRegistry.obtain(groupId);
403+
try {
404+
lock.lockInterruptibly();
405+
try {
406+
MessageGroup group = this.groupIdToMessageGroup.get(groupId);
407+
Assert.notNull(group,
408+
() -> MESSAGE_GROUP_FOR_GROUP_ID + groupId + "' " +
409+
"can not be located while attempting to remove Message from the MessageGroup");
410+
UpperBound upperBound = this.groupToUpperBound.get(groupId);
411+
Assert.state(upperBound != null, UPPER_BOUND_MUST_NOT_BE_NULL);
412+
for (Message<?> message : group.getMessages()) {
413+
if (messageId.equals(message.getHeaders().getId())) {
414+
group.remove(message);
415+
upperBound.release();
416+
group.setLastModified(System.currentTimeMillis());
417+
return true;
418+
}
419+
}
420+
return false;
421+
}
422+
finally {
423+
lock.unlock();
424+
}
425+
}
426+
catch (InterruptedException ex) {
427+
Thread.currentThread().interrupt();
428+
throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, ex);
429+
}
430+
}
431+
385432
@Override
386433
public Iterator<MessageGroup> iterator() {
387434
return new HashSet<>(this.groupIdToMessageGroup.values()).iterator();

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcMessageStore.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.springframework.jdbc.support.lob.DefaultLobHandler;
5555
import org.springframework.jdbc.support.lob.LobHandler;
5656
import org.springframework.jmx.export.annotation.ManagedAttribute;
57+
import org.springframework.lang.Nullable;
5758
import org.springframework.messaging.Message;
5859
import org.springframework.util.Assert;
5960
import org.springframework.util.StringUtils;
@@ -163,6 +164,14 @@ SELECT min(CREATED_DATE)
163164
where MESSAGE_ID=? and REGION=?
164165
"""),
165166

167+
GET_MESSAGE_FROM_GROUP("""
168+
SELECT m.MESSAGE_ID, m.CREATED_DATE, m.MESSAGE_BYTES
169+
from %PREFIX%MESSAGE m
170+
inner join %PREFIX%GROUP_TO_MESSAGE gm
171+
on m.MESSAGE_ID = gm.MESSAGE_ID
172+
where gm.MESSAGE_ID=? and gm.GROUP_KEY = ? and gm.REGION=?
173+
"""),
174+
166175
GET_MESSAGE_COUNT("""
167176
SELECT COUNT(MESSAGE_ID)
168177
from %PREFIX%MESSAGE
@@ -601,6 +610,31 @@ public void removeMessagesFromGroup(Object groupId, Collection<Message<?>> messa
601610
updateMessageGroup(groupKey);
602611
}
603612

613+
@Override
614+
@Nullable
615+
public Message<?> getMessageFromGroup(Object groupId, UUID messageId) {
616+
List<Message<?>> list =
617+
this.jdbcTemplate.query(getQuery(Query.GET_MESSAGE_FROM_GROUP), this.mapper,
618+
getKey(messageId), getKey(groupId), this.region);
619+
if (list.isEmpty()) {
620+
return null;
621+
}
622+
return list.get(0);
623+
}
624+
625+
@Override
626+
public boolean removeMessageFromGroupById(Object groupId, UUID messageId) {
627+
String groupKey = getKey(groupId);
628+
String messageKey = getKey(messageId);
629+
int messageToGroupRemoved =
630+
this.jdbcTemplate.update(getQuery(Query.REMOVE_MESSAGE_FROM_GROUP), groupKey, messageKey, this.region);
631+
if (messageToGroupRemoved > 0) {
632+
return this.jdbcTemplate.update(getQuery(Query.DELETE_MESSAGE),
633+
messageKey, this.region, messageKey, this.region) > 0;
634+
}
635+
return false;
636+
}
637+
604638
@Override
605639
public void removeMessageGroup(Object groupId) {
606640
String groupKey = getKey(groupId);

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/DelayerHandlerRescheduleIntegrationTests-context.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
<delayer id="#{T (org.springframework.integration.jdbc.DelayerHandlerRescheduleIntegrationTests).DELAYER_ID}"
2424
input-channel="input"
2525
output-channel="output"
26-
default-delay="10000"
26+
default-delay="1000"
2727
message-store="messageStore"/>
2828

2929
<channel id="transactionalDelayerOutput"/>

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/DelayerHandlerRescheduleIntegrationTests.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,7 +31,6 @@
3131
import org.springframework.integration.store.MessageGroupStore;
3232
import org.springframework.integration.store.SimpleMessageGroup;
3333
import org.springframework.integration.support.MessageBuilder;
34-
import org.springframework.integration.test.condition.LongRunningTest;
3534
import org.springframework.integration.test.util.TestUtils;
3635
import org.springframework.integration.util.UUIDConverter;
3736
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
@@ -47,13 +46,12 @@
4746
import org.springframework.transaction.support.TransactionSynchronizationManager;
4847

4948
import static org.assertj.core.api.Assertions.assertThat;
50-
import static org.assertj.core.api.Assertions.fail;
49+
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
5150

5251
/**
5352
* @author Artem Bilan
5453
* @author Gary Russell
5554
*/
56-
@LongRunningTest
5755
public class DelayerHandlerRescheduleIntegrationTests {
5856

5957
public static final String DELAYER_ID = "delayerWithJdbcMS";
@@ -98,15 +96,9 @@ public void testDelayerHandlerRescheduleWithJdbcMessageStore() throws Exception
9896
taskScheduler.getScheduledExecutor().awaitTermination(10, TimeUnit.SECONDS);
9997
context.close();
10098

101-
try {
102-
context.getBean("input", MessageChannel.class);
103-
fail("IllegalStateException expected");
104-
}
105-
catch (Exception e) {
106-
assertThat(e instanceof IllegalStateException).isTrue();
107-
assertThat(e.getMessage().contains("BeanFactory not initialized or already closed - call 'refresh'"))
108-
.isTrue();
109-
}
99+
assertThatIllegalStateException()
100+
.isThrownBy(() -> context.getBean("input", MessageChannel.class))
101+
.withMessageContaining("BeanFactory not initialized or already closed - call 'refresh'");
110102

111103
String delayerMessageGroupId = UUIDConverter.getUUID(DELAYER_ID + ".messageGroupId").toString();
112104

0 commit comments

Comments
 (0)