Skip to content

Commit 7ea4192

Browse files
committed
Close file resource after (S)FTP streaming tests
Windows is too scrupulous for non closed file resources. Our `RemoteFileTestSupport` recreates a test directory with files for each test. When we don't close some file handler, we are not able to delete the directory and we fail with subsequent tests. * Close `InputStream` for each polled file in new tests in the `SftpStreamingMessageSourceTests` * Close `CLOSEABLE_RESOURCE` in new tests in the `FtpStreamingMessageSourceTests` * Rework `FtpStreamingMessageSourceTests.testAllContents()` do not poll all the files on each polling cycle - this causes a race condition when we don't close `CLOSEABLE_RESOURCE` yet in the `StreamTransformer`, but try to proceed with recreation a test directory structure **Cherry-pick to 5.0.x** (cherry picked from commit 6ecd948)
1 parent 26cd645 commit 7ea4192

File tree

2 files changed

+48
-17
lines changed

2 files changed

+48
-17
lines changed

spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/FtpStreamingMessageSourceTests.java

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,27 @@
1717
package org.springframework.integration.ftp.inbound;
1818

1919
import static org.hamcrest.CoreMatchers.containsString;
20-
import static org.hamcrest.CoreMatchers.instanceOf;
2120
import static org.hamcrest.Matchers.equalTo;
21+
import static org.hamcrest.Matchers.instanceOf;
2222
import static org.junit.Assert.assertNotNull;
2323
import static org.junit.Assert.assertThat;
2424

25+
import java.io.Closeable;
26+
import java.io.IOException;
2527
import java.io.InputStream;
2628
import java.util.Comparator;
29+
import java.util.concurrent.ConcurrentHashMap;
30+
import java.util.concurrent.ConcurrentMap;
2731

2832
import org.apache.commons.net.ftp.FTPFile;
29-
import org.junit.Rule;
3033
import org.junit.Test;
3134
import org.junit.runner.RunWith;
3235

3336
import org.springframework.beans.factory.annotation.Autowired;
3437
import org.springframework.context.ApplicationContext;
3538
import org.springframework.context.annotation.Bean;
3639
import org.springframework.context.annotation.Configuration;
40+
import org.springframework.integration.StaticMessageHeaderAccessor;
3741
import org.springframework.integration.annotation.InboundChannelAdapter;
3842
import org.springframework.integration.annotation.Transformer;
3943
import org.springframework.integration.channel.QueueChannel;
@@ -45,10 +49,11 @@
4549
import org.springframework.integration.file.remote.FileInfo;
4650
import org.springframework.integration.file.remote.session.SessionFactory;
4751
import org.springframework.integration.ftp.FtpTestSupport;
52+
import org.springframework.integration.ftp.filters.FtpPersistentAcceptOnceFileListFilter;
4853
import org.springframework.integration.ftp.session.FtpFileInfo;
4954
import org.springframework.integration.ftp.session.FtpRemoteFileTemplate;
55+
import org.springframework.integration.metadata.SimpleMetadataStore;
5056
import org.springframework.integration.scheduling.PollerMetadata;
51-
import org.springframework.integration.test.rule.Log4j2LevelAdjuster;
5257
import org.springframework.integration.transformer.StreamTransformer;
5358
import org.springframework.messaging.Message;
5459
import org.springframework.scheduling.support.PeriodicTrigger;
@@ -81,10 +86,8 @@ public class FtpStreamingMessageSourceTests extends FtpTestSupport {
8186
@Autowired
8287
private ApplicationContext context;
8388

84-
@Rule
85-
public Log4j2LevelAdjuster adjuster =
86-
Log4j2LevelAdjuster.debug()
87-
.categories(true, "org.apache.commons");
89+
@Autowired
90+
private ConcurrentMap<String, String> metadataMap;
8891

8992
@SuppressWarnings("unchecked")
9093
@Test
@@ -116,34 +119,46 @@ public void testAllContents() {
116119
this.adapter.stop();
117120
this.source.setFileInfoJson(false);
118121
this.data.purge(null);
122+
this.metadataMap.clear();
119123
this.adapter.start();
120124
received = (Message<byte[]>) this.data.receive(10000);
121125
assertNotNull(received);
122-
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE_INFO), instanceOf(FtpFileInfo.class));
123126
this.adapter.stop();
127+
128+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE_INFO), instanceOf(FtpFileInfo.class));
124129
}
125130

126131
@Test
127-
public void testMaxFetch() {
128-
FtpStreamingMessageSource messageSource = buildsource();
132+
public void testMaxFetch() throws IOException {
133+
FtpStreamingMessageSource messageSource = buildSource();
129134
messageSource.setFilter(new AcceptAllFileListFilter<>());
130135
messageSource.afterPropertiesSet();
131136
Message<InputStream> received = messageSource.receive();
132137
assertNotNull(received);
133138
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE), equalTo(" ftpSource1.txt"));
139+
140+
Closeable closeableResource = StaticMessageHeaderAccessor.getCloseableResource(received);
141+
if (closeableResource != null) {
142+
closeableResource.close();
143+
}
134144
}
135145

136146
@Test
137-
public void testMaxFetchNoFilter() {
138-
FtpStreamingMessageSource messageSource = buildsource();
147+
public void testMaxFetchNoFilter() throws IOException {
148+
FtpStreamingMessageSource messageSource = buildSource();
139149
messageSource.setFilter(null);
140150
messageSource.afterPropertiesSet();
141151
Message<InputStream> received = messageSource.receive();
142152
assertNotNull(received);
143153
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE), equalTo(" ftpSource1.txt"));
154+
155+
Closeable closeableResource = StaticMessageHeaderAccessor.getCloseableResource(received);
156+
if (closeableResource != null) {
157+
closeableResource.close();
158+
}
144159
}
145160

146-
private FtpStreamingMessageSource buildsource() {
161+
private FtpStreamingMessageSource buildSource() {
147162
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(this.config.template(),
148163
Comparator.comparing(FileInfo::getFilename));
149164
messageSource.setRemoteDirectory("ftpSource/");
@@ -169,12 +184,20 @@ public PollerMetadata defaultPoller() {
169184
return pollerMetadata;
170185
}
171186

187+
@Bean
188+
public ConcurrentMap<String, String> metadataMap() {
189+
return new ConcurrentHashMap<>();
190+
}
191+
172192
@Bean
173193
@InboundChannelAdapter(channel = "stream", autoStartup = "false")
174194
public MessageSource<InputStream> ftpMessageSource() {
175195
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template(),
176196
Comparator.comparing(FileInfo::getFilename));
177-
messageSource.setFilter(new AcceptAllFileListFilter<>());
197+
messageSource.setFilter(
198+
new FtpPersistentAcceptOnceFileListFilter(
199+
new SimpleMetadataStore(metadataMap()), "testStreaming"));
200+
178201
messageSource.setRemoteDirectory("ftpSource/");
179202
return messageSource;
180203
}

spring-integration-sftp/src/test/java/org/springframework/integration/sftp/inbound/SftpStreamingMessageSourceTests.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.junit.Assert.assertNotNull;
2424
import static org.junit.Assert.assertThat;
2525

26+
import java.io.IOException;
2627
import java.io.InputStream;
2728
import java.util.Arrays;
2829
import java.util.Comparator;
@@ -59,6 +60,7 @@
5960
/**
6061
* @author Gary Russell
6162
* @author Artem Bilan
63+
*
6264
* @since 4.3
6365
*
6466
*/
@@ -119,36 +121,42 @@ public void testAllContents() {
119121
}
120122

121123
@Test
122-
public void testMaxFetch() {
124+
public void testMaxFetch() throws IOException {
123125
SftpStreamingMessageSource messageSource = buildSource();
124126
messageSource.setFilter(new AcceptAllFileListFilter<>());
125127
messageSource.afterPropertiesSet();
126128
Message<InputStream> received = messageSource.receive();
127129
assertNotNull(received);
128130
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE),
129131
anyOf(equalTo(" sftpSource1.txt"), equalTo("sftpSource2.txt")));
132+
133+
received.getPayload().close();
130134
}
131135

132136
@Test
133-
public void testMaxFetchNoFilter() {
137+
public void testMaxFetchNoFilter() throws IOException {
134138
SftpStreamingMessageSource messageSource = buildSource();
135139
messageSource.setFilter(null);
136140
messageSource.afterPropertiesSet();
137141
Message<InputStream> received = messageSource.receive();
138142
assertNotNull(received);
139143
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE),
140144
anyOf(equalTo(" sftpSource1.txt"), equalTo("sftpSource2.txt")));
145+
146+
received.getPayload().close();
141147
}
142148

143149
@Test
144-
public void testMaxFetchLambdaFilter() {
150+
public void testMaxFetchLambdaFilter() throws IOException {
145151
SftpStreamingMessageSource messageSource = buildSource();
146152
messageSource.setFilter(f -> Arrays.asList(f));
147153
messageSource.afterPropertiesSet();
148154
Message<InputStream> received = messageSource.receive();
149155
assertNotNull(received);
150156
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE),
151157
anyOf(equalTo(" sftpSource1.txt"), equalTo("sftpSource2.txt")));
158+
159+
received.getPayload().close();
152160
}
153161

154162
private SftpStreamingMessageSource buildSource() {

0 commit comments

Comments
 (0)