Skip to content

Commit 0ef659b

Browse files
artembilangaryrussell
authored andcommitted
INT-4351: Add DiscardAwareFileListFilter
JIRA: https://jira.spring.io/browse/INT-4351 The `WatchService` reacts to the events in the file system and keep track ove the events until we poll them, e.g. via `listEligibleFiles()` in the `WatchServiceDirectoryScanner`. On the other hand the `SourcePollingChannelAdapter` calls the mentioned `listEligibleFiles()` according its polling period. At this moment the `FileListFilter` is applied to the polled files. It may happen that `LastModifiedFileListFilter` can't accept too young files yet and they are lost for the future consideration. * To allow, for example, to retain young files by the `LastModifiedFileListFilter` judgment for the future cycles add `DiscardAwareFileListFilter` with the `DiscardCallback` support. The `WatchServiceDirectoryScanner` now registers such a callback into the filter and stores discarded files into the `filesToPoll` queue for the future poll cycle. **Cherry-pick to 5.0** Fix compilation warnings * Replace `DiscardCallback` with the plain `Consumer` * Ensure uniqueness in the `WatchServiceDirectoryScanner` internal queue via a `Set` implementation, since discard callback may be called several times for the same file from the `CompositeFileListFilter` according to its nature * Add JavaDocs and Docs Change `@since` to `5.0.5`
1 parent acd78a7 commit 0ef659b

File tree

6 files changed

+135
-28
lines changed

6 files changed

+135
-28
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/FileReadingMessageSource.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
import java.nio.file.WatchService;
3030
import java.nio.file.attribute.BasicFileAttributes;
3131
import java.util.Arrays;
32-
import java.util.Collection;
3332
import java.util.Comparator;
33+
import java.util.Iterator;
3434
import java.util.LinkedHashSet;
3535
import java.util.List;
3636
import java.util.Queue;
@@ -49,6 +49,7 @@
4949
import org.springframework.integration.context.IntegrationObjectSupport;
5050
import org.springframework.integration.core.MessageSource;
5151
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
52+
import org.springframework.integration.file.filters.DiscardAwareFileListFilter;
5253
import org.springframework.integration.file.filters.FileListFilter;
5354
import org.springframework.integration.file.filters.ResettableFileListFilter;
5455
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
@@ -435,12 +436,20 @@ private class WatchServiceDirectoryScanner extends DefaultDirectoryScanner imple
435436

436437
private final ConcurrentMap<Path, WatchKey> pathKeys = new ConcurrentHashMap<>();
437438

438-
private WatchService watcher;
439+
private final Set<File> filesToPoll = ConcurrentHashMap.newKeySet();
439440

440-
private Collection<File> initialFiles;
441+
private WatchService watcher;
441442

442443
private WatchEvent.Kind<?>[] kinds;
443444

445+
@Override
446+
public void setFilter(FileListFilter<File> filter) {
447+
if (filter instanceof DiscardAwareFileListFilter) {
448+
((DiscardAwareFileListFilter<File>) filter).addDiscardCallback(this.filesToPoll::add);
449+
}
450+
super.setFilter(filter);
451+
}
452+
444453
@Override
445454
public void start() {
446455
try {
@@ -456,9 +465,9 @@ public void start() {
456465
this.kinds[i] = FileReadingMessageSource.this.watchEvents[i].kind;
457466
}
458467

459-
final Set<File> initialFiles = walkDirectory(FileReadingMessageSource.this.directory.toPath(), null);
468+
Set<File> initialFiles = walkDirectory(FileReadingMessageSource.this.directory.toPath(), null);
460469
initialFiles.addAll(filesFromEvents());
461-
this.initialFiles = initialFiles;
470+
this.filesToPoll.addAll(initialFiles);
462471
}
463472

464473
@Override
@@ -481,18 +490,22 @@ public boolean isRunning() {
481490
@Override
482491
protected File[] listEligibleFiles(File directory) {
483492
Assert.state(this.watcher != null, "The WatchService has'nt been started");
484-
if (this.initialFiles != null) {
485-
File[] initial = this.initialFiles.toArray(new File[this.initialFiles.size()]);
486-
this.initialFiles = null;
487-
return initial;
493+
494+
Set<File> files = new LinkedHashSet<>();
495+
496+
for (Iterator<File> iterator = this.filesToPoll.iterator(); iterator.hasNext(); ) {
497+
files.add(iterator.next());
498+
iterator.remove();
488499
}
489-
Collection<File> files = filesFromEvents();
500+
501+
files.addAll(filesFromEvents());
502+
490503
return files.toArray(new File[files.size()]);
491504
}
492505

493506
private Set<File> filesFromEvents() {
494507
WatchKey key = this.watcher.poll();
495-
Set<File> files = new LinkedHashSet<File>();
508+
Set<File> files = new LinkedHashSet<>();
496509
while (key != null) {
497510
File parentDir = ((Path) key.watchable()).toAbsolutePath().toFile();
498511
for (WatchEvent<?> event : key.pollEvents()) {

spring-integration-file/src/main/java/org/springframework/integration/file/filters/CompositeFileListFilter.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,33 +25,41 @@
2525
import java.util.LinkedHashSet;
2626
import java.util.List;
2727
import java.util.Set;
28+
import java.util.function.Consumer;
2829

2930
import org.springframework.beans.factory.InitializingBean;
3031
import org.springframework.util.Assert;
3132

3233
/**
3334
* Simple {@link FileListFilter} that predicates its matches against <b>all</b> of the
3435
* configured {@link FileListFilter}.
36+
* <p>
37+
* Note: when {@link #discardCallback} is provided, it is populated to all the
38+
* {@link DiscardAwareFileListFilter} delegates. In this case, since this filter
39+
* matches the files against all delegates, the {@link #discardCallback} may be
40+
* called several times for the same file.
41+
*
3542
* @param <F> The type that will be filtered.
3643
*
3744
* @author Iwein Fuld
3845
* @author Josh Long
3946
* @author Gary Russell
4047
* @author Artem Bilan
41-
*
42-
*
4348
*/
44-
public class CompositeFileListFilter<F> implements ReversibleFileListFilter<F>, ResettableFileListFilter<F>, Closeable {
49+
public class CompositeFileListFilter<F>
50+
implements ReversibleFileListFilter<F>, ResettableFileListFilter<F>, DiscardAwareFileListFilter<F>, Closeable {
4551

4652
protected final Set<FileListFilter<F>> fileFilters; // NOSONAR
4753

54+
private Consumer<F> discardCallback;
55+
4856

4957
public CompositeFileListFilter() {
50-
this.fileFilters = new LinkedHashSet<FileListFilter<F>>();
58+
this.fileFilters = new LinkedHashSet<>();
5159
}
5260

5361
public CompositeFileListFilter(Collection<? extends FileListFilter<F>> fileFilters) {
54-
this.fileFilters = new LinkedHashSet<FileListFilter<F>>(fileFilters);
62+
this.fileFilters = new LinkedHashSet<>(fileFilters);
5563
}
5664

5765

@@ -65,15 +73,15 @@ public void close() throws IOException {
6573
}
6674

6775
public CompositeFileListFilter<F> addFilter(FileListFilter<F> filter) {
68-
return this.addFilters(Collections.singletonList(filter));
76+
return addFilters(Collections.singletonList(filter));
6977
}
7078

7179
/**
7280
* @param filters one or more new filters to add
7381
* @return this CompositeFileFilter instance with the added filters
7482
* @see #addFilters(Collection)
7583
*/
76-
@SuppressWarnings("unchecked") //For JDK7
84+
@SuppressWarnings("unchecked")
7785
public CompositeFileListFilter<F> addFilters(FileListFilter<F>... filters) {
7886
return addFilters(Arrays.asList(filters));
7987
}
@@ -87,7 +95,10 @@ public CompositeFileListFilter<F> addFilters(FileListFilter<F>... filters) {
8795
* @return this CompositeFileListFilter instance with the added filters
8896
*/
8997
public CompositeFileListFilter<F> addFilters(Collection<? extends FileListFilter<F>> filtersToAdd) {
90-
for (FileListFilter<? extends F> elf : filtersToAdd) {
98+
for (FileListFilter<F> elf : filtersToAdd) {
99+
if (elf instanceof DiscardAwareFileListFilter) {
100+
((DiscardAwareFileListFilter<F>) elf).addDiscardCallback(this.discardCallback);
101+
}
91102
if (elf instanceof InitializingBean) {
92103
try {
93104
((InitializingBean) elf).afterPropertiesSet();
@@ -101,6 +112,17 @@ public CompositeFileListFilter<F> addFilters(Collection<? extends FileListFilter
101112
return this;
102113
}
103114

115+
@Override
116+
public void addDiscardCallback(Consumer<F> discardCallback) {
117+
this.discardCallback = discardCallback;
118+
if (this.discardCallback != null) {
119+
this.fileFilters
120+
.stream()
121+
.filter(DiscardAwareFileListFilter.class::isInstance)
122+
.map(f -> (DiscardAwareFileListFilter<F>) f)
123+
.forEach(f -> f.addDiscardCallback(discardCallback));
124+
}
125+
}
104126

105127
@Override
106128
public List<F> filterFiles(F[] files) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.file.filters;
18+
19+
import java.util.function.Consumer;
20+
21+
/**
22+
* The {@link FileListFilter} modification which can accept a {@link Consumer}
23+
* which can be called when filter discards the file.
24+
*
25+
* @author Artem Bilan
26+
*
27+
* @since 5.0.5
28+
*/
29+
public interface DiscardAwareFileListFilter<F> extends FileListFilter<F> {
30+
31+
void addDiscardCallback(Consumer<F> discardCallback);
32+
33+
}

spring-integration-file/src/main/java/org/springframework/integration/file/filters/LastModifiedFileListFilter.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2016 the original author or authors.
2+
* Copyright 2015-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,28 +20,31 @@
2020
import java.util.ArrayList;
2121
import java.util.List;
2222
import java.util.concurrent.TimeUnit;
23+
import java.util.function.Consumer;
2324

2425
/**
2526
* The {@link FileListFilter} implementation to filter those files which
2627
* {@link File#lastModified()} is less than the {@link #age} in comparison
2728
* with the current time.
2829
* <p>
2930
* The resolution is done in seconds.
31+
* <p>
32+
* When {@link #discardCallback} is provided, it called for all the
33+
* rejected files.
3034
*
3135
* @author Gary Russell
3236
* @author Artem Bilan
37+
*
3338
* @since 4.2
3439
*
3540
*/
36-
public class LastModifiedFileListFilter implements FileListFilter<File> {
41+
public class LastModifiedFileListFilter implements DiscardAwareFileListFilter<File> {
3742

3843
private static final long DEFAULT_AGE = 60;
3944

4045
private volatile long age = DEFAULT_AGE;
4146

42-
public long getAge() {
43-
return this.age;
44-
}
47+
private Consumer<File> discardCallback;
4548

4649
public LastModifiedFileListFilter() {
4750
}
@@ -67,6 +70,10 @@ public void setAge(long age) {
6770
setAge(age, TimeUnit.SECONDS);
6871
}
6972

73+
public long getAge() {
74+
return this.age;
75+
}
76+
7077
/**
7178
* Set the age that files have to be before being passed by this filter.
7279
* If {@link File#lastModified()} plus age is greater than the current time, the file
@@ -79,14 +86,22 @@ public void setAge(long age, TimeUnit unit) {
7986
this.age = unit.toSeconds(age);
8087
}
8188

89+
@Override
90+
public void addDiscardCallback(Consumer<File> discardCallback) {
91+
this.discardCallback = discardCallback;
92+
}
93+
8294
@Override
8395
public List<File> filterFiles(File[] files) {
84-
List<File> list = new ArrayList<File>();
96+
List<File> list = new ArrayList<>();
8597
long now = System.currentTimeMillis() / 1000;
8698
for (File file : files) {
8799
if (file.lastModified() / 1000 + this.age <= now) {
88100
list.add(file);
89101
}
102+
else if (this.discardCallback != null) {
103+
this.discardCallback.accept(file);
104+
}
90105
}
91106
return list;
92107
}

spring-integration-file/src/test/java/org/springframework/integration/file/WatchServiceDirectoryScannerTests.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@
3939
import org.junit.rules.TemporaryFolder;
4040

4141
import org.springframework.beans.factory.BeanFactory;
42+
import org.springframework.integration.file.filters.ChainFileListFilter;
4243
import org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter;
44+
import org.springframework.integration.file.filters.LastModifiedFileListFilter;
4345
import org.springframework.integration.metadata.SimpleMetadataStore;
4446
import org.springframework.integration.test.util.TestUtils;
4547
import org.springframework.messaging.Message;
@@ -75,6 +77,7 @@ public void setUp() throws IOException {
7577
}
7678

7779
@Test
80+
@SuppressWarnings("unchecked")
7881
public void testWatchServiceDirectoryScanner() throws Exception {
7982
FileReadingMessageSource fileReadingMessageSource = new FileReadingMessageSource();
8083
fileReadingMessageSource.setDirectory(folder.getRoot());
@@ -86,7 +89,7 @@ public void testWatchServiceDirectoryScanner() throws Exception {
8689

8790
final CountDownLatch removeFileLatch = new CountDownLatch(1);
8891

89-
FileSystemPersistentAcceptOnceFileListFilter filter =
92+
FileSystemPersistentAcceptOnceFileListFilter fileSystemPersistentAcceptOnceFileListFilter =
9093
new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "test") {
9194

9295
@Override
@@ -97,14 +100,24 @@ public boolean remove(File fileToRemove) {
97100

98101
};
99102

100-
fileReadingMessageSource.setFilter(filter);
103+
LastModifiedFileListFilter fileLastModifiedFileListFilter = new LastModifiedFileListFilter();
104+
105+
ChainFileListFilter<File> fileChainFileListFilter = new ChainFileListFilter<>();
106+
fileChainFileListFilter.addFilters(fileLastModifiedFileListFilter, fileSystemPersistentAcceptOnceFileListFilter);
107+
108+
fileReadingMessageSource.setFilter(fileChainFileListFilter);
101109
fileReadingMessageSource.afterPropertiesSet();
102110
fileReadingMessageSource.start();
103111
DirectoryScanner scanner = fileReadingMessageSource.getScanner();
104112
assertThat(scanner.getClass().getName(),
105113
containsString("FileReadingMessageSource$WatchServiceDirectoryScanner"));
106114

115+
// Files are skipped by the LastModifiedFileListFilter
107116
List<File> files = scanner.listFiles(folder.getRoot());
117+
assertEquals(0, files.size());
118+
// Consider all the files as one day old
119+
fileLastModifiedFileListFilter.setAge(-60 * 60 * 24);
120+
files = scanner.listFiles(folder.getRoot());
108121
assertEquals(3, files.size());
109122
assertTrue(files.contains(top1));
110123
assertTrue(files.contains(foo1));

src/reference/asciidoc/file.adoc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,17 @@ For this purpose all the XML components for file handling (local and remote), al
130130
auto-startup="false"/>
131131
----
132132

133+
Starting with _version 5.0.5_, a `DiscardAwareFileListFilter` is provided for implementations when there is an interest in the event of the rejected files.
134+
For this purpose such a filter implementation should be supplied with a callback via `addDiscardCallback(Consumer<File>)`.
135+
In the Framework this functionality is used from the `FileReadingMessageSource.WatchServiceDirectoryScanner` in combination with `LastModifiedFileListFilter`.
136+
Unlike the regular `DirectoryScanner`, the `WatchService` provides files for processing according the events on the target file system.
137+
At the moment of polling an internal queue with those files, the `LastModifiedFileListFilter` may discard them because they are too young in regards to its configured `age`.
138+
Therefore we lose the file for the future possible considerations.
139+
The discard callback hook allows us to retain the file in the internal queue, so it is available to be checked against the `age` in the subsequent polls.
140+
The `CompositeFileListFilter` also implements a `DiscardAwareFileListFilter` and populates provided discard callback to all its `DiscardAwareFileListFilter` delegates.
141+
142+
NOTE: Since `CompositeFileListFilter` matches the files against all delegates, the `discardCallback` may be called several times for the same file.
143+
133144
*Message Headers*
134145

135146
Starting with _version 5.0_ the `FileReadingMessageSource`, in addition to the `payload` as a polled `File`, populates these headers to the outbound `Message`:

0 commit comments

Comments
 (0)