Skip to content

Commit 26cd645

Browse files
committed
INT-4495: (S)FTP: fix max-fetch with directories
JIRA: https://jira.spring.io/browse/INT-4495 Previously, `maxFetch` was applied before directories were removed from the fetch list. Remove the directories before filtering and applying `maxFetch`. * Polishing - PR Comments * More polishing * More polishing. * Check for empty array. * Remove test main method. INT-4495: Fix SFTP tests
1 parent c1dd618 commit 26cd645

File tree

10 files changed

+315
-36
lines changed

10 files changed

+315
-36
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,7 +23,6 @@
2323
import java.util.Collection;
2424
import java.util.Collections;
2525
import java.util.Comparator;
26-
import java.util.Iterator;
2726
import java.util.List;
2827
import java.util.concurrent.BlockingQueue;
2928
import java.util.concurrent.LinkedBlockingQueue;
@@ -38,8 +37,10 @@
3837
import org.springframework.integration.file.filters.FileListFilter;
3938
import org.springframework.integration.file.filters.ReversibleFileListFilter;
4039
import org.springframework.integration.file.remote.session.Session;
40+
import org.springframework.integration.file.support.FileUtils;
4141
import org.springframework.messaging.MessagingException;
4242
import org.springframework.util.Assert;
43+
import org.springframework.util.ObjectUtils;
4344

4445
/**
4546
* A message source that produces a message with an {@link InputStream} payload
@@ -192,31 +193,27 @@ protected String remotePath(AbstractFileInfo<F> file) {
192193
private void listFiles() {
193194
String remoteDirectory = this.remoteDirectoryExpression.getValue(getEvaluationContext(), String.class);
194195
F[] files = this.remoteFileTemplate.list(remoteDirectory);
195-
int maxFetchSize = getMaxFetchSize();
196-
List<F> filteredFiles = this.filter == null ? Arrays.asList(files) : this.filter.filterFiles(files);
197-
if (maxFetchSize > 0 && filteredFiles.size() > maxFetchSize) {
198-
rollbackFromFileToListEnd(filteredFiles, filteredFiles.get(maxFetchSize));
199-
List<F> newList = new ArrayList<>(maxFetchSize);
200-
for (int i = 0; i < maxFetchSize; i++) {
201-
newList.add(filteredFiles.get(i));
202-
}
203-
filteredFiles = newList;
196+
if (!ObjectUtils.isEmpty(files)) {
197+
files = FileUtils.purgeUnwantedElements(files, f -> f == null || isDirectory(f));
204198
}
205-
List<AbstractFileInfo<F>> fileInfoList = asFileInfoList(filteredFiles);
206-
Iterator<AbstractFileInfo<F>> iterator = fileInfoList.iterator();
207-
while (iterator.hasNext()) {
208-
AbstractFileInfo<F> next = iterator.next();
209-
if (next.isDirectory()) {
210-
iterator.remove();
199+
if (!ObjectUtils.isEmpty(files)) {
200+
int maxFetchSize = getMaxFetchSize();
201+
List<F> filteredFiles = this.filter == null ? Arrays.asList(files) : this.filter.filterFiles(files);
202+
if (maxFetchSize > 0 && filteredFiles.size() > maxFetchSize) {
203+
rollbackFromFileToListEnd(filteredFiles, filteredFiles.get(maxFetchSize));
204+
List<F> newList = new ArrayList<>(maxFetchSize);
205+
for (int i = 0; i < maxFetchSize; i++) {
206+
newList.add(filteredFiles.get(i));
207+
}
208+
filteredFiles = newList;
211209
}
212-
else {
213-
next.setRemoteDirectory(remoteDirectory);
210+
List<AbstractFileInfo<F>> fileInfoList = asFileInfoList(filteredFiles);
211+
fileInfoList.forEach(fi -> fi.setRemoteDirectory(remoteDirectory));
212+
if (this.comparator != null) {
213+
Collections.sort(fileInfoList, this.comparator);
214214
}
215+
this.toBeReceived.addAll(fileInfoList);
215216
}
216-
if (this.comparator != null) {
217-
Collections.sort(fileInfoList, this.comparator);
218-
}
219-
this.toBeReceived.addAll(fileInfoList);
220217
}
221218

222219
protected void rollbackFromFileToListEnd(List<F> filteredFiles, F file) {
@@ -228,4 +225,6 @@ protected void rollbackFromFileToListEnd(List<F> filteredFiles, F file) {
228225

229226
abstract protected List<AbstractFileInfo<F>> asFileInfoList(Collection<F> files);
230227

228+
abstract protected boolean isDirectory(F file);
229+
231230
}

spring-integration-file/src/main/java/org/springframework/integration/file/remote/synchronizer/AbstractInboundFileSynchronizer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.springframework.integration.file.remote.RemoteFileTemplate;
4747
import org.springframework.integration.file.remote.session.Session;
4848
import org.springframework.integration.file.remote.session.SessionFactory;
49+
import org.springframework.integration.file.support.FileUtils;
4950
import org.springframework.messaging.MessagingException;
5051
import org.springframework.util.Assert;
5152
import org.springframework.util.ObjectUtils;
@@ -284,6 +285,9 @@ public void synchronizeToLocalDirectory(final File localDirectory, final int max
284285
try {
285286
int transferred = this.remoteFileTemplate.execute(session -> {
286287
F[] files = session.list(this.evaluatedRemoteDirectory);
288+
if (!ObjectUtils.isEmpty(files)) {
289+
files = FileUtils.purgeUnwantedElements(files, e -> !isFile(e));
290+
}
287291
if (!ObjectUtils.isEmpty(files)) {
288292
List<F> filteredFiles = filterFiles(files);
289293
if (maxFetchSize >= 0 && filteredFiles.size() > maxFetchSize) {
@@ -313,7 +317,6 @@ public void synchronizeToLocalDirectory(final File localDirectory, final int max
313317
throw e1;
314318
}
315319
}
316-
317320
return copied;
318321
}
319322
else {
@@ -344,7 +347,7 @@ protected boolean copyFileToLocalDirectory(String remoteDirectoryPath, F remoteF
344347
? (remoteDirectoryPath + this.remoteFileSeparator + remoteFileName)
345348
: remoteFileName;
346349

347-
if (!this.isFile(remoteFile)) {
350+
if (!isFile(remoteFile)) {
348351
if (this.logger.isDebugEnabled()) {
349352
this.logger.debug("cannot copy, not a file: " + remoteFilePath);
350353
}

spring-integration-file/src/main/java/org/springframework/integration/file/support/FileUtils.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,7 +16,12 @@
1616

1717
package org.springframework.integration.file.support;
1818

19+
import java.lang.reflect.Array;
1920
import java.nio.file.FileSystems;
21+
import java.util.Arrays;
22+
import java.util.function.Predicate;
23+
24+
import org.springframework.util.ObjectUtils;
2025

2126
/**
2227
* Utilities for operations on Files.
@@ -29,6 +34,26 @@ public final class FileUtils {
2934

3035
public static final boolean IS_POSIX = FileSystems.getDefault().supportedFileAttributeViews().contains("posix");
3136

37+
/**
38+
* Remove entries from the array if the predicate returns true for an element.
39+
* @param fileArray the array.
40+
* @param predicate the predicate.
41+
* @param <F> the file type.
42+
* @return the array of remaining elements.
43+
* @since 5.0.7
44+
*/
45+
@SuppressWarnings("unchecked")
46+
public static <F> F[] purgeUnwantedElements(F[] fileArray, Predicate<F> predicate) {
47+
if (ObjectUtils.isEmpty(fileArray)) {
48+
return fileArray;
49+
}
50+
else {
51+
return Arrays.stream(fileArray)
52+
.filter(predicate.negate())
53+
.toArray(size -> (F[]) Array.newInstance(fileArray[0].getClass(), size));
54+
}
55+
}
56+
3257
private FileUtils() {
3358
super();
3459
}

spring-integration-file/src/test/java/org/springframework/integration/file/remote/StreamingInboundTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -210,6 +210,11 @@ protected List<AbstractFileInfo<String>> asFileInfoList(Collection<String> files
210210
return infos;
211211
}
212212

213+
@Override
214+
protected boolean isDirectory(String file) {
215+
return false;
216+
}
217+
213218
}
214219

215220
public static class StringFileInfo extends AbstractFileInfo<String> {

spring-integration-ftp/src/main/java/org/springframework/integration/ftp/inbound/FtpStreamingMessageSource.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -76,4 +76,9 @@ protected List<AbstractFileInfo<FTPFile>> asFileInfoList(Collection<FTPFile> fil
7676
return canonicalFiles;
7777
}
7878

79+
@Override
80+
protected boolean isDirectory(FTPFile file) {
81+
return file != null && file.isDirectory();
82+
}
83+
7984
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.ftp.inbound;
18+
19+
import static org.hamcrest.Matchers.equalTo;
20+
import static org.junit.Assert.assertNotNull;
21+
import static org.junit.Assert.assertThat;
22+
23+
import org.junit.Test;
24+
import org.junit.runner.RunWith;
25+
26+
import org.springframework.beans.factory.annotation.Autowired;
27+
import org.springframework.context.ApplicationContext;
28+
import org.springframework.context.annotation.Configuration;
29+
import org.springframework.integration.file.FileHeaders;
30+
import org.springframework.integration.ftp.FtpTestSupport;
31+
import org.springframework.messaging.Message;
32+
import org.springframework.test.annotation.DirtiesContext;
33+
import org.springframework.test.context.junit4.SpringRunner;
34+
35+
/**
36+
* @author Gary Russell
37+
* @since 5.0.7
38+
*
39+
*/
40+
@RunWith(SpringRunner.class)
41+
@DirtiesContext
42+
public class FtpMessageSourceTests extends FtpTestSupport {
43+
44+
@Autowired
45+
private ApplicationContext context;
46+
47+
@Test
48+
public void testMaxFetch() throws Exception {
49+
FtpInboundFileSynchronizingMessageSource messageSource = buildSource();
50+
Message<?> received = messageSource.receive();
51+
assertNotNull(received);
52+
assertThat(received.getHeaders().get(FileHeaders.FILENAME), equalTo(" ftpSource1.txt"));
53+
}
54+
55+
private FtpInboundFileSynchronizingMessageSource buildSource() throws Exception {
56+
FtpInboundFileSynchronizer sync = new FtpInboundFileSynchronizer(sessionFactory());
57+
sync.setRemoteDirectory("ftpSource/");
58+
sync.setBeanFactory(this.context);
59+
FtpInboundFileSynchronizingMessageSource messageSource = new FtpInboundFileSynchronizingMessageSource(sync);
60+
messageSource.setLocalDirectory(getTargetLocalDirectory());
61+
messageSource.setMaxFetchSize(1);
62+
messageSource.setBeanFactory(this.context);
63+
messageSource.setBeanName("source");
64+
messageSource.afterPropertiesSet();
65+
return messageSource;
66+
}
67+
68+
@Configuration
69+
public static class Config {
70+
71+
}
72+
73+
}

spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/FtpStreamingMessageSourceTests.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.junit.runner.RunWith;
3232

3333
import org.springframework.beans.factory.annotation.Autowired;
34+
import org.springframework.context.ApplicationContext;
3435
import org.springframework.context.annotation.Bean;
3536
import org.springframework.context.annotation.Configuration;
3637
import org.springframework.integration.annotation.InboundChannelAdapter;
@@ -52,7 +53,7 @@
5253
import org.springframework.messaging.Message;
5354
import org.springframework.scheduling.support.PeriodicTrigger;
5455
import org.springframework.test.annotation.DirtiesContext;
55-
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
56+
import org.springframework.test.context.junit4.SpringRunner;
5657

5758
/**
5859
* @author Gary Russell
@@ -61,7 +62,7 @@
6162
* @since 4.3
6263
*
6364
*/
64-
@RunWith(SpringJUnit4ClassRunner.class)
65+
@RunWith(SpringRunner.class)
6566
@DirtiesContext
6667
public class FtpStreamingMessageSourceTests extends FtpTestSupport {
6768

@@ -74,6 +75,12 @@ public class FtpStreamingMessageSourceTests extends FtpTestSupport {
7475
@Autowired
7576
private SourcePollingChannelAdapter adapter;
7677

78+
@Autowired
79+
private Config config;
80+
81+
@Autowired
82+
private ApplicationContext context;
83+
7784
@Rule
7885
public Log4j2LevelAdjuster adjuster =
7986
Log4j2LevelAdjuster.debug()
@@ -82,6 +89,7 @@ public class FtpStreamingMessageSourceTests extends FtpTestSupport {
8289
@SuppressWarnings("unchecked")
8390
@Test
8491
public void testAllContents() {
92+
this.adapter.start();
8593
Message<byte[]> received = (Message<byte[]>) this.data.receive(10000);
8694
assertNotNull(received);
8795
assertThat(new String(received.getPayload()), equalTo("source1"));
@@ -115,6 +123,35 @@ public void testAllContents() {
115123
this.adapter.stop();
116124
}
117125

126+
@Test
127+
public void testMaxFetch() {
128+
FtpStreamingMessageSource messageSource = buildsource();
129+
messageSource.setFilter(new AcceptAllFileListFilter<>());
130+
messageSource.afterPropertiesSet();
131+
Message<InputStream> received = messageSource.receive();
132+
assertNotNull(received);
133+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE), equalTo(" ftpSource1.txt"));
134+
}
135+
136+
@Test
137+
public void testMaxFetchNoFilter() {
138+
FtpStreamingMessageSource messageSource = buildsource();
139+
messageSource.setFilter(null);
140+
messageSource.afterPropertiesSet();
141+
Message<InputStream> received = messageSource.receive();
142+
assertNotNull(received);
143+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE), equalTo(" ftpSource1.txt"));
144+
}
145+
146+
private FtpStreamingMessageSource buildsource() {
147+
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(this.config.template(),
148+
Comparator.comparing(FileInfo::getFilename));
149+
messageSource.setRemoteDirectory("ftpSource/");
150+
messageSource.setMaxFetchSize(1);
151+
messageSource.setBeanFactory(this.context);
152+
return messageSource;
153+
}
154+
118155
@Configuration
119156
@EnableIntegration
120157
public static class Config {
@@ -133,7 +170,7 @@ public PollerMetadata defaultPoller() {
133170
}
134171

135172
@Bean
136-
@InboundChannelAdapter(channel = "stream")
173+
@InboundChannelAdapter(channel = "stream", autoStartup = "false")
137174
public MessageSource<InputStream> ftpMessageSource() {
138175
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template(),
139176
Comparator.comparing(FileInfo::getFilename));

spring-integration-sftp/src/main/java/org/springframework/integration/sftp/inbound/SftpStreamingMessageSource.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -76,4 +76,9 @@ protected List<AbstractFileInfo<LsEntry>> asFileInfoList(Collection<LsEntry> fil
7676
return canonicalFiles;
7777
}
7878

79+
@Override
80+
protected boolean isDirectory(LsEntry file) {
81+
return file != null && file.getAttrs().isDir();
82+
}
83+
7984
}

0 commit comments

Comments
 (0)