Skip to content

INT-4389: Default to SimpleSeqSizeReleaseStrategy #2444

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, Messag
? new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID)
: correlationStrategy);
this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
this.releaseStrategySet = releaseStrategy != null;
this.sequenceAware = this.releaseStrategy instanceof SequenceSizeReleaseStrategy;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2016 the original author or authors.
* Copyright 2015-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -81,9 +81,7 @@ public BarrierMessageHandler(long timeout) {
* @param outputProcessor the output {@link MessageGroupProcessor}.
*/
public BarrierMessageHandler(long timeout, MessageGroupProcessor outputProcessor) {
this(timeout, outputProcessor,
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID)
);
this(timeout, outputProcessor, null);
}

/**
Expand All @@ -105,10 +103,12 @@ public BarrierMessageHandler(long timeout, CorrelationStrategy correlationStrate
*/
public BarrierMessageHandler(long timeout, MessageGroupProcessor outputProcessor,
CorrelationStrategy correlationStrategy) {

Assert.notNull(outputProcessor, "'messageGroupProcessor' cannot be null");
Assert.notNull(correlationStrategy, "'correlationStrategy' cannot be null");
this.messageGroupProcessor = outputProcessor;
this.correlationStrategy = correlationStrategy;
this.correlationStrategy = (correlationStrategy == null
? new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID)
: correlationStrategy);
this.timeout = timeout;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,7 +25,7 @@
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.aggregator.MethodInvokingReleaseStrategy;
import org.springframework.integration.aggregator.ReleaseStrategy;
import org.springframework.integration.aggregator.SequenceSizeReleaseStrategy;
import org.springframework.integration.aggregator.SimpleSequenceSizeReleaseStrategy;
import org.springframework.integration.util.MessagingAnnotationUtils;
import org.springframework.util.StringUtils;

Expand All @@ -46,10 +46,7 @@ public class ReleaseStrategyFactoryBean implements FactoryBean<ReleaseStrategy>,

private String methodName;

private ReleaseStrategy strategy = new SequenceSizeReleaseStrategy();

public ReleaseStrategyFactoryBean() {
}
private ReleaseStrategy strategy = new SimpleSequenceSizeReleaseStrategy();

public void setTarget(Object target) {
this.target = target;
Expand Down Expand Up @@ -79,16 +76,14 @@ public void afterPropertiesSet() throws Exception {
if (logger.isWarnEnabled()) {
logger.warn("No ReleaseStrategy annotated method found on "
+ this.target.getClass().getSimpleName()
+ "; falling back to SequenceSizeReleaseStrategy, target:"
+ this.target + ", methodName:" + this.methodName);
+ "; falling back to SimpleSequenceSizeReleaseStrategy, target: "
+ this.target + ", methodName: " + this.methodName);
}
}
}
}
else {
if (logger.isWarnEnabled()) {
logger.warn("No target supplied; falling back to SequenceSizeReleaseStrategy");
}
logger.warn("No target supplied; falling back to SimpleSequenceSizeReleaseStrategy");
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -97,6 +97,7 @@ public abstract class IntegrationNamespaceUtils {
*/
public static void setValueIfAttributeDefined(BeanDefinitionBuilder builder, Element element, String attributeName,
String propertyName) {

setValueIfAttributeDefined(builder, element, attributeName, propertyName, false);
}

Expand All @@ -114,7 +115,9 @@ public static void setValueIfAttributeDefined(BeanDefinitionBuilder builder, Ele
* @param element - the XML element where the attribute should be defined
* @param attributeName - the name of the attribute whose value will be set on the property
*/
public static void setValueIfAttributeDefined(BeanDefinitionBuilder builder, Element element, String attributeName) {
public static void setValueIfAttributeDefined(BeanDefinitionBuilder builder, Element element,
String attributeName) {

setValueIfAttributeDefined(builder, element, attributeName, false);
}

Expand All @@ -131,6 +134,7 @@ public static void setValueIfAttributeDefined(BeanDefinitionBuilder builder, Ele
*/
public static void setValueIfAttributeDefined(BeanDefinitionBuilder builder, Element element, String attributeName,
String propertyName, boolean emptyStringAllowed) {

String attributeValue = element.getAttribute(attributeName);
if (StringUtils.hasText(attributeValue) || (emptyStringAllowed && element.hasAttribute(attributeName))) {
builder.addPropertyValue(propertyName, new TypedStringValue(attributeValue));
Expand All @@ -156,6 +160,7 @@ public static void setValueIfAttributeDefined(BeanDefinitionBuilder builder, Ele
*/
public static void setValueIfAttributeDefined(BeanDefinitionBuilder builder, Element element, String attributeName,
boolean emptyStringAllowed) {

setValueIfAttributeDefined(builder, element, attributeName,
Conventions.attributeNameToPropertyName(attributeName), emptyStringAllowed);
}
Expand All @@ -173,11 +178,13 @@ public static void setValueIfAttributeDefined(BeanDefinitionBuilder builder, Ele
*/
public static void setReferenceIfAttributeDefined(BeanDefinitionBuilder builder, Element element,
String attributeName, String propertyName) {

setReferenceIfAttributeDefined(builder, element, attributeName, propertyName, false);
}

public static void setReferenceIfAttributeDefined(BeanDefinitionBuilder builder, Element element,
String attributeName, String propertyName, boolean emptyStringAllowed) {

if (element.hasAttribute(attributeName)) {
String attributeValue = element.getAttribute(attributeName);
if (StringUtils.hasText(attributeValue)) {
Expand Down Expand Up @@ -209,11 +216,13 @@ else if (emptyStringAllowed) {
*/
public static void setReferenceIfAttributeDefined(BeanDefinitionBuilder builder, Element element,
String attributeName) {

setReferenceIfAttributeDefined(builder, element, attributeName, false);
}

public static void setReferenceIfAttributeDefined(BeanDefinitionBuilder builder, Element element,
String attributeName, boolean emptyStringAllowed) {

setReferenceIfAttributeDefined(builder, element, attributeName,
Conventions.attributeNameToPropertyName(attributeName), emptyStringAllowed);
}
Expand Down Expand Up @@ -245,6 +254,7 @@ public static String createElementDescription(Element element) {
*/
public static void configurePollerMetadata(Element pollerElement, BeanDefinitionBuilder targetBuilder,
ParserContext parserContext) {

if (pollerElement.hasAttribute("ref")) {
int numberOfAttributes = pollerElement.getAttributes().getLength();
if (numberOfAttributes != 1) {
Expand Down Expand Up @@ -286,6 +296,7 @@ public static void configurePollerMetadata(Element pollerElement, BeanDefinition
*/
public static String getTextFromAttributeOrNestedElement(Element element, String name,
ParserContext parserContext) {

String attr = element.getAttribute(name);
Element childElement = DomUtils.getChildElementByTagName(element, name);
if (StringUtils.hasText(attr) && childElement != null) {
Expand Down Expand Up @@ -334,6 +345,7 @@ public static BeanComponentDefinition parseInnerHandlerDefinition(Element elemen
*/
public static void configureHeaderMapper(Element element, BeanDefinitionBuilder rootBuilder,
ParserContext parserContext, Class<?> headerMapperClass, String replyHeaderValue) {

configureHeaderMapper(element, rootBuilder, parserContext,
BeanDefinitionBuilder.genericBeanDefinition(headerMapperClass), replyHeaderValue);
}
Expand All @@ -349,6 +361,7 @@ public static void configureHeaderMapper(Element element, BeanDefinitionBuilder
*/
public static void configureHeaderMapper(Element element, BeanDefinitionBuilder rootBuilder,
ParserContext parserContext, BeanDefinitionBuilder headerMapperBuilder, String replyHeaderValue) {

String defaultMappedReplyHeadersAttributeName = "mapped-reply-headers";
if (!StringUtils.hasText(replyHeaderValue)) {
replyHeaderValue = defaultMappedReplyHeadersAttributeName;
Expand All @@ -368,7 +381,8 @@ public static void configureHeaderMapper(Element element, BeanDefinitionBuilder
if (hasMappedRequestHeaders || hasMappedReplyHeaders) {

if (hasMappedRequestHeaders) {
headerMapperBuilder.addPropertyValue("requestHeaderNames", element.getAttribute("mapped-request-headers"));
headerMapperBuilder.addPropertyValue("requestHeaderNames",
element.getAttribute("mapped-request-headers"));
}
if (hasMappedReplyHeaders) {
headerMapperBuilder.addPropertyValue("replyHeaderNames", element.getAttribute(replyHeaderValue));
Expand Down Expand Up @@ -423,8 +437,10 @@ public static BeanDefinition configureTransactionAttributes(Element txElement, b
* @return The bean definition.
*/
public static BeanDefinition configureTransactionDefinition(Element txElement) {
BeanDefinitionBuilder txDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(DefaultTransactionAttribute.class);
txDefinitionBuilder.addPropertyValue("propagationBehaviorName", "PROPAGATION_" + txElement.getAttribute("propagation"));
BeanDefinitionBuilder txDefinitionBuilder =
BeanDefinitionBuilder.genericBeanDefinition(DefaultTransactionAttribute.class);
txDefinitionBuilder.addPropertyValue("propagationBehaviorName", "PROPAGATION_"
+ txElement.getAttribute("propagation"));
txDefinitionBuilder.addPropertyValue("isolationLevelName", "ISOLATION_" + txElement.getAttribute("isolation"));
txDefinitionBuilder.addPropertyValue("timeout", txElement.getAttribute("timeout"));
txDefinitionBuilder.addPropertyValue("readOnly", txElement.getAttribute("read-only"));
Expand All @@ -442,18 +458,21 @@ public static String[] generateAlias(Element element) {

public static void configureAndSetAdviceChainIfPresent(Element adviceChainElement, Element txElement,
BeanDefinition parentBeanDefinition, ParserContext parserContext) {

configureAndSetAdviceChainIfPresent(adviceChainElement, txElement, false, parentBeanDefinition, parserContext);
}

public static void configureAndSetAdviceChainIfPresent(Element adviceChainElement,
Element txElement, boolean handleMessageAdvice, BeanDefinition parentBeanDefinition,
ParserContext parserContext) {

configureAndSetAdviceChainIfPresent(adviceChainElement, txElement, handleMessageAdvice,
parentBeanDefinition, parserContext, "adviceChain");
}

public static void configureAndSetAdviceChainIfPresent(Element adviceChainElement, Element txElement,
BeanDefinition parentBeanDefinition, ParserContext parserContext, String propertyName) {

configureAndSetAdviceChainIfPresent(adviceChainElement, txElement, false, parentBeanDefinition,
parserContext, propertyName);
}
Expand All @@ -462,6 +481,7 @@ public static void configureAndSetAdviceChainIfPresent(Element adviceChainElemen
public static void configureAndSetAdviceChainIfPresent(Element adviceChainElement, Element txElement,
boolean handleMessageAdvice, BeanDefinition parentBeanDefinition, ParserContext parserContext,
String propertyName) {

ManagedList adviceChain = configureAdviceChain(adviceChainElement, txElement, handleMessageAdvice,
parentBeanDefinition, parserContext);
if (!CollectionUtils.isEmpty(adviceChain)) {
Expand All @@ -472,12 +492,14 @@ public static void configureAndSetAdviceChainIfPresent(Element adviceChainElemen
@SuppressWarnings("rawtypes")
public static ManagedList configureAdviceChain(Element adviceChainElement, Element txElement,
BeanDefinition parentBeanDefinition, ParserContext parserContext) {

return configureAdviceChain(adviceChainElement, txElement, false, parentBeanDefinition, parserContext);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
public static ManagedList configureAdviceChain(Element adviceChainElement, Element txElement,
boolean handleMessageAdvice, BeanDefinition parentBeanDefinition, ParserContext parserContext) {

ManagedList adviceChain = new ManagedList();
if (txElement != null) {
adviceChain.add(configureTransactionAttributes(txElement, handleMessageAdvice));
Expand Down Expand Up @@ -553,7 +575,8 @@ public static BeanDefinition createExpressionDefIfAttributeDefined(String expres
String expressionElementValue = element.getAttribute(expressionElementName);

if (StringUtils.hasText(expressionElementValue)) {
BeanDefinitionBuilder expressionDefBuilder = BeanDefinitionBuilder.genericBeanDefinition(ExpressionFactoryBean.class);
BeanDefinitionBuilder expressionDefBuilder =
BeanDefinitionBuilder.genericBeanDefinition(ExpressionFactoryBean.class);
expressionDefBuilder.addConstructorArgValue(expressionElementValue);
return expressionDefBuilder.getRawBeanDefinition();
}
Expand All @@ -575,6 +598,7 @@ public static String createDirectChannel(Element element, ParserContext parserCo
@SuppressWarnings("unchecked")
public static void checkAndConfigureFixedSubscriberChannel(Element element, ParserContext parserContext,
String channelName, String handlerBeanName) {

BeanDefinitionRegistry registry = parserContext.getRegistry();
if (registry.containsBeanDefinition(channelName)) {
BeanDefinition inputChannelDefinition = registry.getBeanDefinition(channelName);
Expand All @@ -592,17 +616,21 @@ public static void checkAndConfigureFixedSubscriberChannel(Element element, Pars
}
else {
BeanDefinition bfppd;
if (!registry.containsBeanDefinition(IntegrationContextUtils.INTEGRATION_FIXED_SUBSCRIBER_CHANNEL_BPP_BEAN_NAME)) {
if (!registry.containsBeanDefinition(
IntegrationContextUtils.INTEGRATION_FIXED_SUBSCRIBER_CHANNEL_BPP_BEAN_NAME)) {

bfppd = new RootBeanDefinition(FixedSubscriberChannelBeanFactoryPostProcessor.class);
registry.registerBeanDefinition(IntegrationContextUtils.INTEGRATION_FIXED_SUBSCRIBER_CHANNEL_BPP_BEAN_NAME, bfppd);
registry.registerBeanDefinition(
IntegrationContextUtils.INTEGRATION_FIXED_SUBSCRIBER_CHANNEL_BPP_BEAN_NAME, bfppd);
}
else {
bfppd = registry.getBeanDefinition(IntegrationContextUtils.INTEGRATION_FIXED_SUBSCRIBER_CHANNEL_BPP_BEAN_NAME);
bfppd = registry.getBeanDefinition(
IntegrationContextUtils.INTEGRATION_FIXED_SUBSCRIBER_CHANNEL_BPP_BEAN_NAME);
}
ManagedMap<String, String> candidates;
ValueHolder argumentValue = bfppd.getConstructorArgumentValues().getArgumentValue(0, Map.class);
if (argumentValue == null) {
candidates = new ManagedMap<String, String>();
candidates = new ManagedMap<>();
bfppd.getConstructorArgumentValues().addIndexedArgumentValue(0, candidates);
}
else {
Expand All @@ -615,14 +643,19 @@ public static void checkAndConfigureFixedSubscriberChannel(Element element, Pars
public static void injectPropertyWithAdapter(String beanRefAttribute, String methodRefAttribute,
String expressionAttribute, String beanProperty, String adapterClass, Element element,
BeanDefinitionBuilder builder, BeanMetadataElement processor, ParserContext parserContext) {

BeanMetadataElement adapter = constructAdapter(beanRefAttribute, methodRefAttribute, expressionAttribute,
adapterClass, element, processor, parserContext);
builder.addPropertyValue(beanProperty, adapter);

if (adapter != null) {
builder.addPropertyValue(beanProperty, adapter);
}
}

public static void injectConstructorWithAdapter(String beanRefAttribute, String methodRefAttribute,
String expressionAttribute, String adapterClass, Element element,
BeanDefinitionBuilder builder, BeanMetadataElement processor, ParserContext parserContext) {

BeanMetadataElement adapter = constructAdapter(beanRefAttribute, methodRefAttribute, expressionAttribute,
adapterClass, element, processor, parserContext);
builder.addConstructorArgValue(adapter);
Expand All @@ -631,6 +664,7 @@ public static void injectConstructorWithAdapter(String beanRefAttribute, String
private static BeanMetadataElement constructAdapter(String beanRefAttribute, String methodRefAttribute,
String expressionAttribute, String adapterClass, Element element, BeanMetadataElement processor,
ParserContext parserContext) {

final String beanRef = element.getAttribute(beanRefAttribute);
final String beanMethod = element.getAttribute(methodRefAttribute);
final String expression = element.getAttribute(expressionAttribute);
Expand All @@ -657,14 +691,13 @@ else if (hasExpression) {
else if (processor != null) {
adapter = createAdapter(processor, beanMethod, adapterClass);
}
else {
adapter = createAdapter(null, beanMethod, adapterClass);
}

return adapter;
}

private static BeanMetadataElement createAdapter(BeanMetadataElement ref, String method,
String unqualifiedClassName) {

BeanDefinitionBuilder builder = BeanDefinitionBuilder
.genericBeanDefinition(IntegrationConfigUtils.BASE_PACKAGE + ".config." + unqualifiedClassName
+ "FactoryBean");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,6 +30,7 @@
* @author Dave Syer
* @author Iwein Fuld
* @author Oleg Zhurakousky
* @author Artem Bilan
*/
public class ResequencerParser extends AbstractCorrelatingMessageHandlerParser {

Expand All @@ -43,10 +44,11 @@ protected BeanDefinitionBuilder parseHandler(Element element, ParserContext pars

builder.addConstructorArgValue(processorBuilder.getBeanDefinition());

this.doParse(builder, element, processorBuilder.getBeanDefinition(), parserContext);
this.doParse(builder, element, null, parserContext);

IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, RELEASE_PARTIAL_SEQUENCES_ATTRIBUTE);

return builder;
}

}
Loading