Skip to content

GH-3844: Rework messaging annotation with @Bean #3877

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,9 @@
import java.util.concurrent.TimeUnit;

import org.aopalliance.intercept.MethodInterceptor;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
Expand All @@ -36,7 +33,7 @@
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.junit.BrokerRunning;
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -59,39 +56,40 @@
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

/**
* @author Gary Russell
*
* @since 5.0.1
*
*/
@RunWith(SpringRunner.class)
@SpringJUnitConfig
@DirtiesContext
@RabbitAvailable({
AmqpMessageSourceIntegrationTests.DSL_QUEUE,
AmqpMessageSourceIntegrationTests.INTERCEPT_QUEUE,
AmqpMessageSourceIntegrationTests.DLQ,
AmqpMessageSourceIntegrationTests.NOAUTOACK_QUEUE })
public class AmqpMessageSourceIntegrationTests {

private static final String DSL_QUEUE = "AmqpMessageSourceIntegrationTests";
static final String DSL_QUEUE = "AmqpMessageSourceIntegrationTests";

private static final String QUEUE_WITH_DLQ = "AmqpMessageSourceIntegrationTests.withDLQ";
static final String QUEUE_WITH_DLQ = "AmqpMessageSourceIntegrationTests.withDLQ";

private static final String DLQ = QUEUE_WITH_DLQ + ".dlq";
static final String DLQ = QUEUE_WITH_DLQ + ".dlq";

private static final String INTERCEPT_QUEUE = "AmqpMessageSourceIntegrationTests.channel";
static final String INTERCEPT_QUEUE = "AmqpMessageSourceIntegrationTests.channel";

private static final String NOAUTOACK_QUEUE = "AmqpMessageSourceIntegrationTests.noAutoAck";

@ClassRule
public static BrokerRunning brokerRunning = BrokerRunning.isRunningWithEmptyQueues(DSL_QUEUE, INTERCEPT_QUEUE, DLQ,
NOAUTOACK_QUEUE);
static final String NOAUTOACK_QUEUE = "AmqpMessageSourceIntegrationTests.noAutoAck";

@Autowired
private Config config;

@Autowired
private ConfigurableApplicationContext context;

@Before
@BeforeEach
public void before() {
RabbitAdmin admin = new RabbitAdmin(this.config.connectionFactory());
Queue queue = QueueBuilder.nonDurable(QUEUE_WITH_DLQ)
Expand All @@ -103,16 +101,11 @@ public void before() {
this.context.start();
}

@After
@AfterEach
public void after() {
this.context.stop();
}

@AfterClass
public static void afterClass() {
brokerRunning.removeTestQueues(QUEUE_WITH_DLQ);
}

@Test
public void testImplicitNackThenAck() throws Exception {
RabbitTemplate template = new RabbitTemplate(this.config.connectionFactory());
Expand Down Expand Up @@ -256,7 +249,7 @@ public Object postProcessAfterInitialization(Object bean, String beanName) throw

@Bean
public ConnectionFactory connectionFactory() {
return new CachingConnectionFactory(brokerRunning.getConnectionFactory());
return new CachingConnectionFactory("localhost");
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2021 the original author or authors.
* Copyright 2014-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -95,8 +95,7 @@ private void annotated(ConfigurableListableBeanFactory beanFactory,
List<Map<String, String>> idempotentEndpointsMapping, String beanName, BeanDefinition beanDefinition)
throws LinkageError {

if (beanDefinition.getSource() instanceof MethodMetadata) {
MethodMetadata beanMethod = (MethodMetadata) beanDefinition.getSource();
if (beanDefinition.getSource() instanceof MethodMetadata beanMethod) {
String annotationType = IdempotentReceiver.class.getName();
if (beanMethod.isAnnotated(annotationType)) { // NOSONAR never null
Object value = beanMethod.getAnnotationAttributes(annotationType).get("value"); // NOSONAR
Expand All @@ -120,13 +119,7 @@ private void annotated(ConfigurableListableBeanFactory beanFactory,

String endpoint = beanName;
if (!MessageHandler.class.isAssignableFrom(returnType)) {
/*
MessageHandler beans, populated from @Bean methods, have a complex id,
including @Configuration bean name, method name and the Messaging annotation name.
The following pattern matches the bean name, regardless of the annotation name.
*/
endpoint = beanDefinition.getFactoryBeanName() + "." + beanName +
".*" + IntegrationConfigUtils.HANDLER_ALIAS_SUFFIX;
endpoint = beanName + ".*" + IntegrationConfigUtils.HANDLER_ALIAS_SUFFIX;
}

String[] interceptors = (String[]) value;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import java.util.Map;

import org.springframework.expression.Expression;
import org.springframework.integration.JavaUtils;
import org.springframework.integration.context.IntegrationObjectSupport;
import org.springframework.integration.handler.AbstractMessageProducingHandler;
import org.springframework.integration.router.AbstractMappingMessageRouter;
Expand Down Expand Up @@ -49,6 +50,10 @@ public class RouterFactoryBean extends AbstractStandardMessageHandlerFactoryBean

private String defaultOutputChannelName;

private String prefix;

private String suffix;

private Boolean resolutionRequired;

private Boolean applySequence;
Expand All @@ -63,6 +68,14 @@ public void setDefaultOutputChannelName(String defaultOutputChannelName) {
this.defaultOutputChannelName = defaultOutputChannelName;
}

public void setPrefix(String prefix) {
this.prefix = prefix;
}

public void setSuffix(String suffix) {
this.suffix = suffix;
}

public void setResolutionRequired(Boolean resolutionRequired) {
this.resolutionRequired = resolutionRequired;
}
Expand Down Expand Up @@ -106,7 +119,7 @@ protected MessageHandler createMethodInvokingHandler(Object targetObject, String

@Override
protected MessageHandler createExpressionEvaluatingHandler(Expression expression) {
return this.configureRouter(new ExpressionEvaluatingRouter(expression));
return configureRouter(new ExpressionEvaluatingRouter(expression));
}

protected AbstractMappingMessageRouter createMethodInvokingRouter(Object targetObject, String targetMethodName) {
Expand All @@ -116,34 +129,25 @@ protected AbstractMappingMessageRouter createMethodInvokingRouter(Object targetO
}

protected AbstractMessageRouter configureRouter(AbstractMessageRouter router) {
if (this.defaultOutputChannel != null) {
router.setDefaultOutputChannel(this.defaultOutputChannel);
}
if (this.defaultOutputChannelName != null) {
router.setDefaultOutputChannelName(this.defaultOutputChannelName);
}
if (getSendTimeout() != null) {
router.setSendTimeout(getSendTimeout());
}
if (this.applySequence != null) {
router.setApplySequence(this.applySequence);
}
if (this.ignoreSendFailures != null) {
router.setIgnoreSendFailures(this.ignoreSendFailures);
}
JavaUtils.INSTANCE
.acceptIfNotNull(this.defaultOutputChannel, router::setDefaultOutputChannel)
.acceptIfNotNull(this.defaultOutputChannelName, router::setDefaultOutputChannelName)
.acceptIfNotNull(getSendTimeout(), router::setSendTimeout)
.acceptIfNotNull(this.applySequence, router::setApplySequence)
.acceptIfNotNull(this.ignoreSendFailures, router::setIgnoreSendFailures);

if (router instanceof AbstractMappingMessageRouter) {
this.configureMappingRouter((AbstractMappingMessageRouter) router);
configureMappingRouter((AbstractMappingMessageRouter) router);
}
return router;
}

protected void configureMappingRouter(AbstractMappingMessageRouter router) {
if (this.channelMappings != null) {
router.setChannelMappings(this.channelMappings);
}
if (this.resolutionRequired != null) {
router.setResolutionRequired(this.resolutionRequired);
}
JavaUtils.INSTANCE
.acceptIfNotNull(this.channelMappings, router::setChannelMappings)
.acceptIfNotNull(this.resolutionRequired, router::setResolutionRequired)
.acceptIfHasText(this.prefix, router::setPrefix)
.acceptIfHasText(this.suffix, router::setSuffix);
}

@Override
Expand Down
Loading