Skip to content

Commit 544de6b

Browse files
garyrussellartembilan
authored andcommitted
Add BoundRabbitChannelAdvice
Polishing and docs Polishing - DEBUG log for confirms; add integration test Polishing - PR Comments Renamed Advice Verify acks logged. Polishing - more PR comments Renamed to BoundRabbitChannelAdvice. * Extract `ConfirmCallback`s instances for optimization * Remove unused constant
1 parent 8afdcb4 commit 544de6b

File tree

7 files changed

+438
-1
lines changed

7 files changed

+438
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.amqp.support;
18+
19+
import java.lang.reflect.UndeclaredThrowableException;
20+
import java.time.Duration;
21+
22+
import org.aopalliance.intercept.MethodInvocation;
23+
import org.apache.commons.logging.Log;
24+
import org.apache.commons.logging.LogFactory;
25+
26+
import org.springframework.amqp.rabbit.core.RabbitOperations;
27+
import org.springframework.integration.handler.advice.HandleMessageAdvice;
28+
import org.springframework.lang.Nullable;
29+
import org.springframework.util.Assert;
30+
import org.springframework.util.ReflectionUtils;
31+
32+
import com.rabbitmq.client.ConfirmCallback;
33+
34+
/**
35+
* An advice that causes all downstream {@link RabbitOperations} operations to be executed
36+
* on the same channel, as long as there are no thread handoffs, since the channel is
37+
* bound to the thread. The same RabbitOperations must be used in this and all downstream
38+
* components. Typically used with a splitter or some other mechanism that would cause
39+
* multiple messages to be sent. Optionally waits for publisher confirms if the channel is
40+
* so configured.
41+
*
42+
* @author Gary Russell
43+
* @author Artem Bilan
44+
*
45+
* @since 5.1
46+
*
47+
*/
48+
public class BoundRabbitChannelAdvice implements HandleMessageAdvice {
49+
50+
private final Log logger = LogFactory.getLog(getClass());
51+
52+
private final RabbitOperations operations;
53+
54+
private final Duration waitForConfirmsTimeout;
55+
56+
private final ConfirmCallback ackCallback = this::handleAcks;
57+
58+
private final ConfirmCallback nackCallback = this::handleNacks;
59+
60+
/**
61+
* Construct an instance that doesn't wait for confirms.
62+
* @param operations the operations.
63+
*/
64+
public BoundRabbitChannelAdvice(RabbitOperations operations) {
65+
this(operations, null);
66+
}
67+
68+
/**
69+
* Construct an instance that waits for publisher confirms (if
70+
* configured and waitForConfirmsTimeout is not null).
71+
* @param operations the operations.
72+
* @param waitForConfirmsTimeout the timeout.
73+
*/
74+
public BoundRabbitChannelAdvice(RabbitOperations operations, @Nullable Duration waitForConfirmsTimeout) {
75+
Assert.notNull(operations, "'operations' cannot be null");
76+
this.operations = operations;
77+
this.waitForConfirmsTimeout = waitForConfirmsTimeout;
78+
}
79+
80+
@Override
81+
public Object invoke(MethodInvocation invocation) throws Throwable {
82+
try {
83+
return this.operations.invoke(operations -> {
84+
try {
85+
Object result = invocation.proceed();
86+
if (this.waitForConfirmsTimeout != null) {
87+
this.operations.waitForConfirmsOrDie(this.waitForConfirmsTimeout.toMillis());
88+
}
89+
return result;
90+
}
91+
catch (Throwable t) { // NOSONAR - rethrown below
92+
ReflectionUtils.rethrowRuntimeException(t);
93+
return null; // not reachable - satisfy compiler
94+
}
95+
}, this.ackCallback, this.nackCallback);
96+
}
97+
catch (UndeclaredThrowableException ute) {
98+
throw ute.getCause();
99+
}
100+
}
101+
102+
private void handleAcks(long deliveryTag, boolean multiple) {
103+
doHandleAcks(deliveryTag, multiple, true);
104+
}
105+
106+
private void handleNacks(long deliveryTag, boolean multiple) {
107+
doHandleAcks(deliveryTag, multiple, false);
108+
}
109+
110+
private void doHandleAcks(long deliveryTag, boolean multiple, boolean ack) {
111+
if (this.logger.isDebugEnabled()) {
112+
this.logger.debug("Publisher confirm " + (!ack ? "n" : "") + "ack: " + deliveryTag + ", " +
113+
"multiple: " + multiple);
114+
}
115+
}
116+
117+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.amqp.support;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.junit.Assert.assertTrue;
21+
import static org.mockito.ArgumentMatchers.anyString;
22+
import static org.mockito.BDDMockito.willAnswer;
23+
import static org.mockito.BDDMockito.willReturn;
24+
import static org.mockito.Mockito.spy;
25+
26+
import java.time.Duration;
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
import java.util.concurrent.CountDownLatch;
30+
import java.util.concurrent.TimeUnit;
31+
32+
import org.apache.commons.logging.Log;
33+
import org.junit.jupiter.api.Test;
34+
35+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
36+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
37+
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
38+
import org.springframework.beans.DirectFieldAccessor;
39+
import org.springframework.beans.factory.annotation.Autowired;
40+
import org.springframework.context.annotation.Bean;
41+
import org.springframework.context.annotation.Configuration;
42+
import org.springframework.integration.amqp.dsl.Amqp;
43+
import org.springframework.integration.config.EnableIntegration;
44+
import org.springframework.integration.dsl.IntegrationFlow;
45+
import org.springframework.integration.dsl.IntegrationFlows;
46+
import org.springframework.integration.test.util.TestUtils;
47+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
48+
49+
/**
50+
* @author Gary Russell
51+
* @since 5.1
52+
*
53+
*/
54+
@SpringJUnitConfig
55+
@RabbitAvailable(queues = BoundRabbitChannelAdviceIntegrationTests.QUEUE)
56+
public class BoundRabbitChannelAdviceIntegrationTests {
57+
58+
public static final String QUEUE = "dedicated.advice";
59+
60+
@Autowired
61+
private Config.Gate gate;
62+
63+
@Autowired
64+
private Config config;
65+
66+
@Test
67+
public void testAdvice() throws Exception {
68+
BoundRabbitChannelAdvice advice = this.config.advice(this.config.template());
69+
Log logger = spy(TestUtils.getPropertyValue(advice, "logger", Log.class));
70+
new DirectFieldAccessor(advice).setPropertyValue("logger", logger);
71+
willReturn(true).given(logger).isDebugEnabled();
72+
final CountDownLatch latch = new CountDownLatch(1);
73+
willAnswer(i -> {
74+
latch.countDown();
75+
return i.callRealMethod();
76+
}).given(logger).debug(anyString());
77+
this.gate.send("a,b,c");
78+
assertTrue(this.config.latch.await(10, TimeUnit.SECONDS));
79+
assertTrue(latch.await(10, TimeUnit.SECONDS));
80+
assertThat(this.config.received).containsExactly("A", "B", "C");
81+
}
82+
83+
@Configuration
84+
@EnableIntegration
85+
public static class Config {
86+
87+
private final CountDownLatch latch = new CountDownLatch(3);
88+
89+
private final List<String> received = new ArrayList<>();
90+
91+
@Bean
92+
public CachingConnectionFactory cf() throws Exception {
93+
CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
94+
ccf.setSimplePublisherConfirms(true);
95+
return ccf;
96+
}
97+
98+
@Bean
99+
public RabbitTemplate template() throws Exception {
100+
return new RabbitTemplate(cf());
101+
}
102+
103+
@Bean
104+
public BoundRabbitChannelAdvice advice(RabbitTemplate template) {
105+
return new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10));
106+
}
107+
108+
@Bean
109+
public IntegrationFlow flow(RabbitTemplate template, BoundRabbitChannelAdvice advice) {
110+
return IntegrationFlows.from(Gate.class)
111+
.split(s -> s.delimiters(",")
112+
.advice(advice))
113+
.<String, String>transform(String::toUpperCase)
114+
.handle(Amqp.outboundAdapter(template).routingKey(QUEUE))
115+
.get();
116+
}
117+
118+
@Bean
119+
public IntegrationFlow listener(CachingConnectionFactory ccf) {
120+
return IntegrationFlows.from(Amqp.inboundAdapter(ccf, QUEUE))
121+
.handle(m -> {
122+
received.add((String) m.getPayload());
123+
this.latch.countDown();
124+
})
125+
.get();
126+
}
127+
128+
public interface Gate {
129+
130+
void send(String out);
131+
132+
}
133+
134+
}
135+
136+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.amqp.support;
18+
19+
import static org.mockito.ArgumentMatchers.any;
20+
import static org.mockito.ArgumentMatchers.anyBoolean;
21+
import static org.mockito.ArgumentMatchers.anyString;
22+
import static org.mockito.ArgumentMatchers.eq;
23+
import static org.mockito.ArgumentMatchers.isNull;
24+
import static org.mockito.BDDMockito.given;
25+
import static org.mockito.BDDMockito.willAnswer;
26+
import static org.mockito.Mockito.mock;
27+
import static org.mockito.Mockito.spy;
28+
import static org.mockito.Mockito.times;
29+
import static org.mockito.Mockito.verify;
30+
31+
import java.time.Duration;
32+
import java.util.concurrent.ExecutorService;
33+
34+
import org.junit.jupiter.api.Test;
35+
36+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
37+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
38+
import org.springframework.beans.factory.annotation.Autowired;
39+
import org.springframework.context.annotation.Bean;
40+
import org.springframework.context.annotation.Configuration;
41+
import org.springframework.integration.amqp.dsl.Amqp;
42+
import org.springframework.integration.config.EnableIntegration;
43+
import org.springframework.integration.dsl.IntegrationFlow;
44+
import org.springframework.integration.dsl.IntegrationFlows;
45+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
46+
47+
import com.rabbitmq.client.Channel;
48+
import com.rabbitmq.client.Connection;
49+
import com.rabbitmq.client.ConnectionFactory;
50+
51+
/**
52+
* @author Gary Russell
53+
* @since 5.1
54+
*
55+
*/
56+
@SpringJUnitConfig
57+
public class BoundRabbitChannelAdviceTests {
58+
59+
@Autowired
60+
private Config.Gate gate;
61+
62+
@Autowired
63+
private Config config;
64+
65+
@Test
66+
public void testAdvice() throws Exception {
67+
this.gate.send("a,b,c");
68+
verify(this.config.connection, times(1)).createChannel();
69+
verify(this.config.channel).confirmSelect();
70+
verify(this.config.channel).basicPublish(eq(""), eq("rk"), anyBoolean(), any(), eq("A".getBytes()));
71+
verify(this.config.channel).basicPublish(eq(""), eq("rk"), anyBoolean(), any(), eq("B".getBytes()));
72+
verify(this.config.channel).basicPublish(eq(""), eq("rk"), anyBoolean(), any(), eq("C".getBytes()));
73+
verify(this.config.channel).waitForConfirmsOrDie(10_000L);
74+
}
75+
76+
@Configuration
77+
@EnableIntegration
78+
public static class Config {
79+
80+
private Connection connection;
81+
82+
private Channel channel;
83+
84+
@Bean
85+
public CachingConnectionFactory cf() throws Exception {
86+
ConnectionFactory cf = mock(ConnectionFactory.class);
87+
cf.setHost("localhost");
88+
cf = spy(cf);
89+
willAnswer(i -> {
90+
this.connection = mock(Connection.class);
91+
willAnswer(ii -> {
92+
this.channel = mock(Channel.class);
93+
given(this.channel.isOpen()).willReturn(true);
94+
return this.channel;
95+
}).given(this.connection).createChannel();
96+
return this.connection;
97+
}).given(cf).newConnection((ExecutorService) isNull(), anyString());
98+
cf.setAutomaticRecoveryEnabled(false);
99+
CachingConnectionFactory ccf = new CachingConnectionFactory(cf);
100+
ccf.setSimplePublisherConfirms(true);
101+
return ccf;
102+
}
103+
104+
@Bean
105+
public RabbitTemplate template() throws Exception {
106+
return new RabbitTemplate(cf());
107+
}
108+
109+
@Bean
110+
public IntegrationFlow flow(RabbitTemplate template) {
111+
return IntegrationFlows.from(Gate.class)
112+
.split(s -> s.delimiters(",")
113+
.advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
114+
.<String, String>transform(String::toUpperCase)
115+
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
116+
.get();
117+
}
118+
119+
public interface Gate {
120+
121+
void send(String out);
122+
123+
}
124+
125+
}
126+
127+
}

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1298,7 +1298,7 @@ public B split() {
12981298
* Typically used with a Java 8 Lambda expression:
12991299
* <pre class="code">
13001300
* {@code
1301-
* .split(s -> s.applySequence(false).get().getT2().setDelimiters(","))
1301+
* .split(s -> s.applySequence(false).delimiters(","))
13021302
* }
13031303
* </pre>
13041304
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options

0 commit comments

Comments
 (0)