Skip to content

Commit 11d39b7

Browse files
garyrussellartembilan
authored andcommitted
INT-4496: (S)FTP: Sort file list earlier
JIRA: https://jira.spring.io/browse/INT-4496 Sort the file list before applying `maxFetchSize` and filters. * Polishing - PR Comments
1 parent 6ecd948 commit 11d39b7

File tree

15 files changed

+64
-32
lines changed

15 files changed

+64
-32
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.ArrayList;
2222
import java.util.Arrays;
2323
import java.util.Collection;
24-
import java.util.Collections;
2524
import java.util.Comparator;
2625
import java.util.List;
2726
import java.util.concurrent.BlockingQueue;
@@ -59,7 +58,7 @@ public abstract class AbstractRemoteFileStreamingMessageSource<F>
5958

6059
private final BlockingQueue<AbstractFileInfo<F>> toBeReceived = new LinkedBlockingQueue<AbstractFileInfo<F>>();
6160

62-
private final Comparator<AbstractFileInfo<F>> comparator;
61+
private final Comparator<F> comparator;
6362

6463
private boolean fileInfoJson = true;
6564

@@ -76,7 +75,7 @@ public abstract class AbstractRemoteFileStreamingMessageSource<F>
7675
private volatile FileListFilter<F> filter;
7776

7877
protected AbstractRemoteFileStreamingMessageSource(RemoteFileTemplate<F> template,
79-
Comparator<AbstractFileInfo<F>> comparator) {
78+
Comparator<F> comparator) {
8079
this.remoteFileTemplate = template;
8180
this.comparator = comparator;
8281
}
@@ -194,7 +193,7 @@ private void listFiles() {
194193
String remoteDirectory = this.remoteDirectoryExpression.getValue(getEvaluationContext(), String.class);
195194
F[] files = this.remoteFileTemplate.list(remoteDirectory);
196195
if (!ObjectUtils.isEmpty(files)) {
197-
files = FileUtils.purgeUnwantedElements(files, f -> f == null || isDirectory(f));
196+
files = FileUtils.purgeUnwantedElements(files, f -> f == null || isDirectory(f), this.comparator);
198197
}
199198
if (!ObjectUtils.isEmpty(files)) {
200199
int maxFetchSize = getMaxFetchSize();
@@ -209,9 +208,6 @@ private void listFiles() {
209208
}
210209
List<AbstractFileInfo<F>> fileInfoList = asFileInfoList(filteredFiles);
211210
fileInfoList.forEach(fi -> fi.setRemoteDirectory(remoteDirectory));
212-
if (this.comparator != null) {
213-
Collections.sort(fileInfoList, this.comparator);
214-
}
215211
this.toBeReceived.addAll(fileInfoList);
216212
}
217213
}

spring-integration-file/src/main/java/org/springframework/integration/file/remote/synchronizer/AbstractInboundFileSynchronizer.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.io.OutputStream;
2525
import java.util.ArrayList;
2626
import java.util.Arrays;
27+
import java.util.Comparator;
2728
import java.util.List;
2829
import java.util.regex.Matcher;
2930

@@ -59,6 +60,8 @@
5960
* {@link org.springframework.integration.file.filters.FileListFilter}s to
6061
* ensure the file entry is acceptable.
6162
*
63+
* @param <F> the Type that represents a remote file.
64+
*
6265
* @author Josh Long
6366
* @author Mark Fisher
6467
* @author Oleg Zhurakousky
@@ -116,6 +119,8 @@ public abstract class AbstractInboundFileSynchronizer<F>
116119

117120
private BeanFactory beanFactory;
118121

122+
private Comparator<F> comparator;
123+
119124
/**
120125
* Create a synchronizer with the {@link SessionFactory} used to acquire {@link Session} instances.
121126
*
@@ -127,6 +132,20 @@ public AbstractInboundFileSynchronizer(SessionFactory<F> sessionFactory) {
127132
}
128133

129134

135+
protected Comparator<F> getComparator() {
136+
return this.comparator;
137+
}
138+
139+
/**
140+
* Set a comparator to sort the retrieved list of {@code F} (the Type that represents
141+
* the remote file) prior to applying filters and max fetch size.
142+
* @param comparator the comparator.
143+
* @since 5.1
144+
*/
145+
public void setComparator(Comparator<F> comparator) {
146+
this.comparator = comparator;
147+
}
148+
130149
/**
131150
* @param remoteFileSeparator the remote file separator.
132151
* @see RemoteFileTemplate#setRemoteFileSeparator(String)
@@ -286,7 +305,7 @@ public void synchronizeToLocalDirectory(final File localDirectory, final int max
286305
int transferred = this.remoteFileTemplate.execute(session -> {
287306
F[] files = session.list(this.evaluatedRemoteDirectory);
288307
if (!ObjectUtils.isEmpty(files)) {
289-
files = FileUtils.purgeUnwantedElements(files, e -> !isFile(e));
308+
files = FileUtils.purgeUnwantedElements(files, e -> !isFile(e), this.comparator);
290309
}
291310
if (!ObjectUtils.isEmpty(files)) {
292311
List<F> filteredFiles = filterFiles(files);

spring-integration-file/src/main/java/org/springframework/integration/file/support/FileUtils.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
import java.lang.reflect.Array;
2020
import java.nio.file.FileSystems;
2121
import java.util.Arrays;
22+
import java.util.Comparator;
2223
import java.util.function.Predicate;
2324

25+
import org.springframework.lang.Nullable;
2426
import org.springframework.util.ObjectUtils;
2527

2628
/**
@@ -38,19 +40,29 @@ public final class FileUtils {
3840
* Remove entries from the array if the predicate returns true for an element.
3941
* @param fileArray the array.
4042
* @param predicate the predicate.
43+
* @param comparator an optional comparator to sort the results.
4144
* @param <F> the file type.
4245
* @return the array of remaining elements.
4346
* @since 5.0.7
4447
*/
4548
@SuppressWarnings("unchecked")
46-
public static <F> F[] purgeUnwantedElements(F[] fileArray, Predicate<F> predicate) {
49+
public static <F> F[] purgeUnwantedElements(F[] fileArray, Predicate<F> predicate,
50+
@Nullable Comparator<F> comparator) {
4751
if (ObjectUtils.isEmpty(fileArray)) {
4852
return fileArray;
4953
}
5054
else {
51-
return Arrays.stream(fileArray)
52-
.filter(predicate.negate())
53-
.toArray(size -> (F[]) Array.newInstance(fileArray[0].getClass(), size));
55+
if (comparator == null) {
56+
return Arrays.stream(fileArray)
57+
.filter(predicate.negate())
58+
.toArray(size -> (F[]) Array.newInstance(fileArray[0].getClass(), size));
59+
}
60+
else {
61+
return Arrays.stream(fileArray)
62+
.filter(predicate.negate())
63+
.sorted(comparator)
64+
.toArray(size -> (F[]) Array.newInstance(fileArray[0].getClass(), size));
65+
}
5466
}
5567
}
5668

spring-integration-file/src/test/java/org/springframework/integration/file/remote/StreamingInboundTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ public void testLineByLine() throws Exception {
192192

193193
public static class Streamer extends AbstractRemoteFileStreamingMessageSource<String> {
194194

195-
protected Streamer(RemoteFileTemplate<String> template, Comparator<AbstractFileInfo<String>> comparator) {
195+
protected Streamer(RemoteFileTemplate<String> template, Comparator<String> comparator) {
196196
super(template, comparator);
197197
}
198198

spring-integration-ftp/src/main/java/org/springframework/integration/ftp/dsl/Ftp.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.apache.commons.net.ftp.FTPFile;
2323

24-
import org.springframework.integration.file.remote.AbstractFileInfo;
2524
import org.springframework.integration.file.remote.MessageSessionCallback;
2625
import org.springframework.integration.file.remote.RemoteFileTemplate;
2726
import org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway;
@@ -79,7 +78,7 @@ public static FtpStreamingInboundChannelAdapterSpec inboundStreamingAdapter(
7978
*/
8079
public static FtpStreamingInboundChannelAdapterSpec inboundStreamingAdapter(
8180
RemoteFileTemplate<FTPFile> remoteFileTemplate,
82-
Comparator<AbstractFileInfo<FTPFile>> receptionOrderComparator) {
81+
Comparator<FTPFile> receptionOrderComparator) {
8382
return new FtpStreamingInboundChannelAdapterSpec(remoteFileTemplate, receptionOrderComparator);
8483
}
8584

spring-integration-ftp/src/main/java/org/springframework/integration/ftp/dsl/FtpStreamingInboundChannelAdapterSpec.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2017 the original author or authors.
2+
* Copyright 2014-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,7 +23,6 @@
2323
import org.springframework.integration.file.dsl.RemoteFileStreamingInboundChannelAdapterSpec;
2424
import org.springframework.integration.file.filters.CompositeFileListFilter;
2525
import org.springframework.integration.file.filters.FileListFilter;
26-
import org.springframework.integration.file.remote.AbstractFileInfo;
2726
import org.springframework.integration.file.remote.RemoteFileTemplate;
2827
import org.springframework.integration.ftp.filters.FtpPersistentAcceptOnceFileListFilter;
2928
import org.springframework.integration.ftp.filters.FtpRegexPatternFileListFilter;
@@ -43,7 +42,7 @@ public class FtpStreamingInboundChannelAdapterSpec
4342
FtpStreamingMessageSource> {
4443

4544
FtpStreamingInboundChannelAdapterSpec(RemoteFileTemplate<FTPFile> remoteFileTemplate,
46-
Comparator<AbstractFileInfo<FTPFile>> comparator) {
45+
Comparator<FTPFile> comparator) {
4746
this.target = new FtpStreamingMessageSource(remoteFileTemplate, comparator);
4847
}
4948

spring-integration-ftp/src/main/java/org/springframework/integration/ftp/inbound/FtpStreamingMessageSource.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,7 @@ public FtpStreamingMessageSource(RemoteFileTemplate<FTPFile> template) {
5656
* @param template the template.
5757
* @param comparator the comparator.
5858
*/
59-
public FtpStreamingMessageSource(RemoteFileTemplate<FTPFile> template,
60-
Comparator<AbstractFileInfo<FTPFile>> comparator) {
59+
public FtpStreamingMessageSource(RemoteFileTemplate<FTPFile> template, Comparator<FTPFile> comparator) {
6160
super(template, comparator);
6261
doSetFilter(new FtpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "ftpStreamingMessageSource"));
6362
}

spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/FtpStreamingMessageSourceTests.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
4747
import org.springframework.integration.file.FileHeaders;
4848
import org.springframework.integration.file.filters.AcceptAllFileListFilter;
49-
import org.springframework.integration.file.remote.FileInfo;
5049
import org.springframework.integration.file.remote.session.SessionFactory;
5150
import org.springframework.integration.ftp.FtpTestSupport;
5251
import org.springframework.integration.ftp.filters.FtpPersistentAcceptOnceFileListFilter;
@@ -160,7 +159,7 @@ public void testMaxFetchNoFilter() throws IOException {
160159

161160
private FtpStreamingMessageSource buildSource() {
162161
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(this.config.template(),
163-
Comparator.comparing(FileInfo::getFilename));
162+
Comparator.comparing(FTPFile::getName));
164163
messageSource.setRemoteDirectory("ftpSource/");
165164
messageSource.setMaxFetchSize(1);
166165
messageSource.setBeanFactory(this.context);
@@ -193,11 +192,10 @@ public ConcurrentMap<String, String> metadataMap() {
193192
@InboundChannelAdapter(channel = "stream", autoStartup = "false")
194193
public MessageSource<InputStream> ftpMessageSource() {
195194
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template(),
196-
Comparator.comparing(FileInfo::getFilename));
195+
Comparator.comparing(FTPFile::getName));
197196
messageSource.setFilter(
198197
new FtpPersistentAcceptOnceFileListFilter(
199198
new SimpleMetadataStore(metadataMap()), "testStreaming"));
200-
201199
messageSource.setRemoteDirectory("ftpSource/");
202200
return messageSource;
203201
}

spring-integration-sftp/src/main/java/org/springframework/integration/sftp/dsl/Sftp.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.io.File;
2020
import java.util.Comparator;
2121

22-
import org.springframework.integration.file.remote.AbstractFileInfo;
2322
import org.springframework.integration.file.remote.MessageSessionCallback;
2423
import org.springframework.integration.file.remote.RemoteFileTemplate;
2524
import org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway;
@@ -80,7 +79,7 @@ public static SftpStreamingInboundChannelAdapterSpec inboundStreamingAdapter(
8079
*/
8180
public static SftpStreamingInboundChannelAdapterSpec inboundStreamingAdapter(
8281
RemoteFileTemplate<LsEntry> remoteFileTemplate,
83-
Comparator<AbstractFileInfo<LsEntry>> receptionOrderComparator) {
82+
Comparator<LsEntry> receptionOrderComparator) {
8483
return new SftpStreamingInboundChannelAdapterSpec(remoteFileTemplate, receptionOrderComparator);
8584
}
8685

spring-integration-sftp/src/main/java/org/springframework/integration/sftp/dsl/SftpStreamingInboundChannelAdapterSpec.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.springframework.integration.file.dsl.RemoteFileStreamingInboundChannelAdapterSpec;
2222
import org.springframework.integration.file.filters.CompositeFileListFilter;
2323
import org.springframework.integration.file.filters.FileListFilter;
24-
import org.springframework.integration.file.remote.AbstractFileInfo;
2524
import org.springframework.integration.file.remote.RemoteFileTemplate;
2625
import org.springframework.integration.metadata.SimpleMetadataStore;
2726
import org.springframework.integration.sftp.filters.SftpPersistentAcceptOnceFileListFilter;
@@ -41,7 +40,7 @@ public class SftpStreamingInboundChannelAdapterSpec
4140
SftpStreamingMessageSource> {
4241

4342
SftpStreamingInboundChannelAdapterSpec(RemoteFileTemplate<LsEntry> remoteFileTemplate,
44-
Comparator<AbstractFileInfo<LsEntry>> comparator) {
43+
Comparator<LsEntry> comparator) {
4544
this.target = new SftpStreamingMessageSource(remoteFileTemplate, comparator);
4645
}
4746

spring-integration-sftp/src/main/java/org/springframework/integration/sftp/inbound/SftpStreamingMessageSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public SftpStreamingMessageSource(RemoteFileTemplate<LsEntry> template) {
5757
* @param comparator the comparator.
5858
*/
5959
public SftpStreamingMessageSource(RemoteFileTemplate<LsEntry> template,
60-
Comparator<AbstractFileInfo<LsEntry>> comparator) {
60+
Comparator<LsEntry> comparator) {
6161
super(template, comparator);
6262
doSetFilter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "sftpStreamingMessageSource"));
6363
}

spring-integration-sftp/src/test/java/org/springframework/integration/sftp/inbound/SftpStreamingMessageSourceTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
4444
import org.springframework.integration.file.FileHeaders;
4545
import org.springframework.integration.file.filters.AcceptAllFileListFilter;
46-
import org.springframework.integration.file.remote.FileInfo;
4746
import org.springframework.integration.file.remote.session.SessionFactory;
4847
import org.springframework.integration.scheduling.PollerMetadata;
4948
import org.springframework.integration.sftp.SftpTestSupport;
@@ -161,7 +160,7 @@ public void testMaxFetchLambdaFilter() throws IOException {
161160

162161
private SftpStreamingMessageSource buildSource() {
163162
SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(this.config.template(),
164-
Comparator.comparing(FileInfo::getFilename));
163+
Comparator.comparing(LsEntry::getFilename));
165164
messageSource.setRemoteDirectory("sftpSource/");
166165
messageSource.setMaxFetchSize(1);
167166
messageSource.setBeanFactory(this.context);
@@ -189,7 +188,7 @@ public PollerMetadata defaultPoller() {
189188
@InboundChannelAdapter(channel = "stream", autoStartup = "false")
190189
public MessageSource<InputStream> sftpMessageSource() {
191190
SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template(),
192-
Comparator.comparing(FileInfo::getFilename));
191+
Comparator.comparing(LsEntry::getFilename));
193192
messageSource.setFilter(new AcceptAllFileListFilter<>());
194193
messageSource.setRemoteDirectory("sftpSource/");
195194
return messageSource;

src/reference/asciidoc/ftp.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,8 @@ If you set the `fileInfoJson` property on the `FtpStreamingMessageSource` to `fa
567567
The `FTPFile` object provided by the underlying Apache Net library can be accessed using the `FtpFileInfo.getFileInfo()` method.
568568
The `fileInfoJson` property is not available when using XML configuration but you can set it by injecting the `FtpStreamingMessageSource` into one of your configuration classes.
569569

570+
Starting with version 5.1, the `comparator` 's generic type is now `FTPFile`; previously it was `AbstractFileInfo<FTPFile>`; this is because the sort is now performed earlier in the processing, before filtering and applying `maxFetch`.
571+
570572
==== Configuring with Java Configuration
571573

572574
The following Spring Boot application provides an example of configuring the inbound adapter using Java configuration:
@@ -727,6 +729,8 @@ Another use for `max-fetch-size` is if you want to stop fetching remote files, b
727729
Setting the `maxFetchSize` property on the `MessageSource` (programmatically, via JMX, or via a <<control-bus, control bus>>) effectively stops the adapter from fetching more files, but allows the poller to continue to emit messages for files that have previously been fetched.
728730
If the poller is active when the property is changed, the change will take effect on the next poll.
729731

732+
Starting with version 5.1, the synchronizer can be provided with a `Comparator<FTPFile>`.
733+
This is useful when restricting the number of files fetched with `maxFetchSize`.
730734

731735
[[ftp-outbound]]
732736
=== FTP Outbound Channel Adapter

src/reference/asciidoc/sftp.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,8 @@ If you set the `fileInfoJson` property on the `SftpStreamingMessageSource` to `f
612612
The `LsEntry` object provided by the underlying Jsch library can be accessed using the `SftpFileInfo.getFileInfo()` method.
613613
The `fileInfoJson` property is not available when using XML configuration but you can set it by injecting the `SftpStreamingMessageSource` into one of your configuration classes.
614614

615+
Starting with version 5.1, the `comparator` 's generic type is now `LsEntry`; previously it was `AbstractFileInfo<LsEntry>`; this is because the sort is now performed earlier in the processing, before filtering and applying `maxFetch`.
616+
615617
==== Configuring with Java Configuration
616618

617619
The following Spring Boot application provides an example of configuring the inbound adapter using Java configuration:
@@ -772,6 +774,8 @@ Another use for `max-fetch-size` is if you want to stop fetching remote files, b
772774
Setting the `maxFetchSize` property on the `MessageSource` (programmatically, via JMX, or via a <<control-bus, control bus>>) effectively stops the adapter from fetching more files, but allows the poller to continue to emit messages for files that have previously been fetched.
773775
If the poller is active when the property is changed, the change will take effect on the next poll.
774776

777+
Starting with version 5.1, the synchronizer can be provided with a `Comparator<LsEntry>`.
778+
This is useful when restricting the number of files fetched with `maxFetchSize`.
775779

776780
[[sftp-outbound]]
777781
=== SFTP Outbound Channel Adapter

src/reference/asciidoc/whats-new.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,3 +85,8 @@ A `RotatingServerAdvice` is now available to poll multiple servers and/or direct
8585
See <<ftp-rotating-server-advice>> and <<sftp-rotating-server-advice>> for more information.
8686

8787
Also inbound adapter `localFilenameExpression` s can contain the variable `#remoteDirectory` which contains the remote directory being polled.
88+
89+
The generic type of the comparators, used to sort the fetched file list for the streaming adapters, has changed from `Comparator<AbstractFileInfo<F>>` to simply `Comparator<F>`.
90+
See <<ftp-streaming>> and <<sftp-streaming>> for more information.
91+
92+
In addition, the synchronizers for inbound channel adapters can now be provided with a `Comparator`; this is useful when using `maxFetchSize` to limit the files retrieved.

0 commit comments

Comments
 (0)