Skip to content

Commit 9f937f5

Browse files
garyrussellartembilan
authored andcommitted
INT-4491: (S)FTP inbound rotate dirs/servers
JIRA: https://jira.spring.io/browse/INT-4491 Add Rotating Server/Directory Polling Advice. **cherry-pick to 5.0.x** * Polishing - PR Comments. * Polishing * Polishing; revert `KeyDirectory`; WARN about `TaskExecutor` and `MessageSoureMutator`(s). * More polishing - PR comments * Apply stashed changes. * Fix WARN log - the `SyncTaskExecutor` is wrapped. # Conflicts: # spring-integration-core/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java # src/reference/asciidoc/whats-new.adoc
1 parent 170cc37 commit 9f937f5

File tree

20 files changed

+930
-68
lines changed

20 files changed

+930
-68
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aop/AbstractMessageSourceAdvice.java

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015 the original author or authors.
2+
* Copyright 2015-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,9 +27,10 @@
2727
* should be ignored and/or take action after the receive.
2828
*
2929
* @author Gary Russell
30+
*
3031
* @since 4.2
3132
*/
32-
public abstract class AbstractMessageSourceAdvice implements MethodInterceptor {
33+
public abstract class AbstractMessageSourceAdvice implements MethodInterceptor, MessageSourceMutator {
3334

3435
@Override
3536
public final Object invoke(MethodInvocation invocation) throws Throwable {
@@ -45,20 +46,4 @@ public final Object invoke(MethodInvocation invocation) throws Throwable {
4546
return afterReceive(result, (MessageSource<?>) target);
4647
}
4748

48-
/**
49-
* Subclasses can decide whether to proceed with this poll.
50-
* @param source the message source.
51-
* @return true to proceed.
52-
*/
53-
public abstract boolean beforeReceive(MessageSource<?> source);
54-
55-
/**
56-
* Subclasses can take actions based on the result of the poll; e.g.
57-
* adjust the {@code trigger}. The message can also be replaced with a new one.
58-
* @param result the received message.
59-
* @param source the message source.
60-
* @return a message to continue to process the result, null to discard whatever the poll returned.
61-
*/
62-
public abstract Message<?> afterReceive(Message<?> result, MessageSource<?> source);
63-
6449
}

spring-integration-core/src/main/java/org/springframework/integration/aop/CompoundTriggerAdvice.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2016 the original author or authors.
2+
* Copyright 2015-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -47,11 +47,6 @@ public CompoundTriggerAdvice(CompoundTrigger compoundTrigger, Trigger overrideTr
4747
this.override = overrideTrigger;
4848
}
4949

50-
@Override
51-
public boolean beforeReceive(MessageSource<?> source) {
52-
return true;
53-
}
54-
5550
@Override
5651
public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
5752
if (result == null) {
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.aop;
18+
19+
import org.springframework.integration.core.MessageSource;
20+
import org.springframework.messaging.Message;
21+
22+
/**
23+
* An object that can mutate a {@link MessageSource} before and/or after
24+
* {@link MessageSource#receive()} is called.
25+
*
26+
* @author Gary Russell
27+
*
28+
* @since 5.0.7.
29+
*
30+
*/
31+
@FunctionalInterface
32+
public interface MessageSourceMutator {
33+
34+
/**
35+
* Subclasses can decide whether to proceed with this poll.
36+
* @param source the message source.
37+
* @return true to proceed (default).
38+
*/
39+
default boolean beforeReceive(MessageSource<?> source) {
40+
return true;
41+
}
42+
43+
/**
44+
* Subclasses can take actions based on the result of the poll; e.g.
45+
* adjust the {@code trigger}. The message can also be replaced with a new one.
46+
* @param result the received message.
47+
* @param source the message source.
48+
* @return a message to continue to process the result, null to discard whatever the poll returned.
49+
*/
50+
Message<?> afterReceive(Message<?> result, MessageSource<?> source);
51+
52+
}

spring-integration-core/src/main/java/org/springframework/integration/aop/SimpleActiveIdleMessageSourceAdvice.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015 the original author or authors.
2+
* Copyright 2015-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -61,11 +61,6 @@ public void setActivePollPeriod(long activePollPeriod) {
6161
this.activePollPeriod = activePollPeriod;
6262
}
6363

64-
@Override
65-
public boolean beforeReceive(MessageSource<?> source) {
66-
return true;
67-
}
68-
6964
@Override
7065
public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
7166
if (result == null) {

spring-integration-core/src/main/java/org/springframework/integration/dsl/PollerSpec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public PollerSpec transactionSynchronizationFactory(
7575
* to the {@link org.springframework.integration.util.ErrorHandlingTaskExecutor}.
7676
* @param errorHandler the {@link ErrorHandler} to use.
7777
* @return the spec.
78-
* @see #taskExecutor
78+
* @see #taskExecutor(Executor)
7979
*/
8080
public PollerSpec errorHandler(ErrorHandler errorHandler) {
8181
this.target.setErrorHandler(errorHandler);

spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -60,36 +60,49 @@
6060
*/
6161
public abstract class AbstractPollingEndpoint extends AbstractEndpoint implements BeanClassLoaderAware {
6262

63-
private volatile Executor taskExecutor = new SyncTaskExecutor();
63+
private final Object initializationMonitor = new Object();
6464

65-
private volatile ErrorHandler errorHandler;
65+
private Executor taskExecutor = new SyncTaskExecutor();
6666

67-
private volatile boolean errorHandlerIsDefault;
67+
private boolean syncExecutor = true;
6868

69-
private volatile Trigger trigger = new PeriodicTrigger(10);
69+
private ErrorHandler errorHandler;
7070

71-
private volatile List<Advice> adviceChain;
71+
private boolean errorHandlerIsDefault;
7272

73-
private volatile ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader();
73+
private Trigger trigger = new PeriodicTrigger(10);
7474

75-
private volatile ScheduledFuture<?> runningTask;
75+
private List<Advice> adviceChain;
7676

77-
private volatile Runnable poller;
77+
private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader();
7878

79-
private volatile boolean initialized;
79+
private long maxMessagesPerPoll = -1;
8080

81-
private volatile long maxMessagesPerPoll = -1;
81+
private TransactionSynchronizationFactory transactionSynchronizationFactory;
8282

83-
private final Object initializationMonitor = new Object();
83+
private volatile ScheduledFuture<?> runningTask;
8484

85-
private volatile TransactionSynchronizationFactory transactionSynchronizationFactory;
85+
private volatile Runnable poller;
86+
87+
private volatile boolean initialized;
8688

8789
public AbstractPollingEndpoint() {
8890
this.setPhase(Integer.MAX_VALUE / 2);
8991
}
9092

9193
public void setTaskExecutor(Executor taskExecutor) {
9294
this.taskExecutor = (taskExecutor != null ? taskExecutor : new SyncTaskExecutor());
95+
this.syncExecutor = this.taskExecutor instanceof SyncTaskExecutor
96+
|| (this.taskExecutor instanceof ErrorHandlingTaskExecutor
97+
&& ((ErrorHandlingTaskExecutor) this.taskExecutor).isSyncExecutor());
98+
}
99+
100+
protected Executor getTaskExecutor() {
101+
return this.taskExecutor;
102+
}
103+
104+
protected boolean isSyncExecutor() {
105+
return this.syncExecutor;
93106
}
94107

95108
public void setTrigger(Trigger trigger) {

spring-integration-core/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.springframework.aop.support.NameMatchMethodPointcutAdvisor;
2828
import org.springframework.beans.factory.BeanCreationException;
2929
import org.springframework.context.Lifecycle;
30-
import org.springframework.integration.aop.AbstractMessageSourceAdvice;
30+
import org.springframework.integration.aop.MessageSourceMutator;
3131
import org.springframework.integration.context.ExpressionCapable;
3232
import org.springframework.integration.core.MessageSource;
3333
import org.springframework.integration.core.MessagingTemplate;
@@ -137,7 +137,7 @@ public String getComponentType() {
137137

138138
@Override
139139
protected boolean isReceiveOnlyAdvice(Advice advice) {
140-
return advice instanceof AbstractMessageSourceAdvice;
140+
return advice instanceof MessageSourceMutator;
141141
}
142142

143143
@Override
@@ -159,6 +159,13 @@ protected void applyReceiveOnlyAdviceChain(Collection<Advice> chain) {
159159
}
160160
this.appliedAdvices.clear();
161161
this.appliedAdvices.addAll(chain);
162+
if (!(isSyncExecutor()) && logger.isWarnEnabled()) {
163+
logger.warn(getComponentName() + ": A task executor is supplied and " + chain.size()
164+
+ "MessageSourceMutator(s) is/are provided. If an advice mutates the source, such "
165+
+ "mutations are not thread safe and could cause unexpected results, especially with "
166+
+ "high frequency pollers. Consider using a downstream ExecutorChannel instead of "
167+
+ "adding an executor to the poller");
168+
}
162169
}
163170
}
164171

spring-integration-core/src/main/java/org/springframework/integration/util/ErrorHandlingTaskExecutor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import java.util.concurrent.Executor;
2020

21+
import org.springframework.core.task.SyncTaskExecutor;
2122
import org.springframework.core.task.TaskExecutor;
2223
import org.springframework.util.Assert;
2324
import org.springframework.util.ErrorHandler;
@@ -45,6 +46,9 @@ public ErrorHandlingTaskExecutor(Executor executor, ErrorHandler errorHandler) {
4546
this.errorHandler = errorHandler;
4647
}
4748

49+
public boolean isSyncExecutor() {
50+
return this.executor instanceof SyncTaskExecutor;
51+
}
4852

4953
@Override
5054
public void execute(final Runnable task) {

0 commit comments

Comments
 (0)