1
1
/*
2
- * Copyright 2002-2017 the original author or authors.
2
+ * Copyright 2002-2018 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
19
19
import static org .junit .Assert .assertEquals ;
20
20
import static org .mockito .Mockito .mock ;
21
21
22
- import java .util .Date ;
23
- import java .util .concurrent .CountDownLatch ;
24
- import java .util .concurrent .TimeUnit ;
25
- import java .util .concurrent .atomic .AtomicBoolean ;
26
22
import java .util .concurrent .atomic .AtomicInteger ;
27
23
28
24
import org .junit .After ;
33
29
import org .springframework .beans .factory .BeanFactory ;
34
30
import org .springframework .integration .MessageRejectedException ;
35
31
import org .springframework .integration .support .MessagingExceptionWrapper ;
32
+ import org .springframework .integration .test .util .OnlyOnceTrigger ;
36
33
import org .springframework .messaging .Message ;
37
34
import org .springframework .messaging .MessageHandler ;
38
35
import org .springframework .messaging .PollableChannel ;
39
36
import org .springframework .messaging .support .GenericMessage ;
40
- import org .springframework .scheduling .Trigger ;
41
- import org .springframework .scheduling .TriggerContext ;
42
37
import org .springframework .scheduling .concurrent .ThreadPoolTaskScheduler ;
43
38
import org .springframework .util .ErrorHandler ;
44
39
45
40
/**
46
41
* @author Iwein Fuld
47
42
* @author Mark Fisher
48
43
* @author Kiel Boatman
44
+ * @author Artem Bilan
49
45
*/
50
- @ SuppressWarnings (" unchecked" )
46
+ @ SuppressWarnings ({ "rawtypes" , " unchecked" } )
51
47
public class PollingConsumerEndpointTests {
52
48
53
- private PollingConsumer endpoint ;
54
-
55
- private final TestTrigger trigger = new TestTrigger ();
49
+ private final OnlyOnceTrigger trigger = new OnlyOnceTrigger ();
56
50
57
51
private final TestConsumer consumer = new TestConsumer ();
58
52
59
- @ SuppressWarnings ("rawtypes" )
60
- private final Message message = new GenericMessage <String >("test" );
53
+ private final Message message = new GenericMessage <>("test" );
61
54
62
- @ SuppressWarnings ("rawtypes" )
63
- private final Message badMessage = new GenericMessage <String >("bad" );
55
+ private final Message badMessage = new GenericMessage <>("bad" );
64
56
65
57
private final TestErrorHandler errorHandler = new TestErrorHandler ();
66
58
67
- private PollableChannel channelMock ;
68
-
69
59
private final ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler ();
70
60
61
+ private PollingConsumer endpoint ;
62
+
63
+ private PollableChannel channelMock ;
64
+
71
65
72
66
@ Before
73
- public void init () throws Exception {
74
- channelMock = Mockito .mock (PollableChannel .class );
75
- consumer .counter .set (0 );
76
- trigger .reset ();
77
- endpoint = new PollingConsumer (channelMock , consumer );
78
- taskScheduler .setPoolSize (5 );
79
- endpoint .setErrorHandler (errorHandler );
80
- endpoint .setTaskScheduler (taskScheduler );
81
- endpoint .setTrigger (trigger );
82
- endpoint .setBeanFactory (mock (BeanFactory .class ));
83
- endpoint .setReceiveTimeout (-1 );
84
- endpoint .afterPropertiesSet ();
85
- taskScheduler .afterPropertiesSet ();
67
+ public void init () {
68
+ this .channelMock = mock (PollableChannel .class );
69
+ this .endpoint = new PollingConsumer (this .channelMock , this .consumer );
70
+ this .taskScheduler .setPoolSize (5 );
71
+ this .endpoint .setErrorHandler (this .errorHandler );
72
+ this .endpoint .setTaskScheduler (this .taskScheduler );
73
+ this .endpoint .setTrigger (this .trigger );
74
+ this .endpoint .setBeanFactory (mock (BeanFactory .class ));
75
+ this .endpoint .setReceiveTimeout (-1 );
76
+ this .endpoint .afterPropertiesSet ();
77
+ this .taskScheduler .afterPropertiesSet ();
86
78
}
87
79
88
80
@ After
89
- public void stop () throws Exception {
81
+ public void stop () {
90
82
taskScheduler .destroy ();
91
83
}
92
84
93
85
94
86
@ Test
95
87
public void singleMessage () {
96
- Mockito .when (channelMock .receive ()).thenReturn (message );
97
- endpoint .setMaxMessagesPerPoll (1 );
98
- endpoint .start ();
99
- trigger .await ();
100
- endpoint .stop ();
101
- assertEquals (1 , consumer .counter .get ());
88
+ Mockito .when (this . channelMock .receive ()).thenReturn (this . message );
89
+ this . endpoint .setMaxMessagesPerPoll (1 );
90
+ this . endpoint .start ();
91
+ this . trigger .await ();
92
+ this . endpoint .stop ();
93
+ assertEquals (1 , this . consumer .counter .get ());
102
94
}
103
95
104
96
@ Test
105
97
public void multipleMessages () {
106
- Mockito .when (channelMock .receive ()).thenReturn (message , message , message , message , message );
107
- endpoint .setMaxMessagesPerPoll (5 );
108
- endpoint .start ();
109
- trigger .await ();
110
- endpoint .stop ();
111
- assertEquals (5 , consumer .counter .get ());
98
+ Mockito .when (this .channelMock .receive ())
99
+ .thenReturn (this .message , this .message , this .message , this .message , this .message );
100
+ this .endpoint .setMaxMessagesPerPoll (5 );
101
+ this .endpoint .start ();
102
+ this .trigger .await ();
103
+ this .endpoint .stop ();
104
+ assertEquals (5 , this .consumer .counter .get ());
112
105
}
113
106
114
107
@ Test
115
- public void multipleMessages_underrun () {
116
- Mockito .when (channelMock .receive ()).thenReturn (message , message , message , message , message , null );
117
- endpoint .setMaxMessagesPerPoll (6 );
118
- endpoint .start ();
119
- trigger .await ();
120
- endpoint .stop ();
121
- assertEquals (5 , consumer .counter .get ());
108
+ public void multipleMessages_under_run () {
109
+ Mockito .when (this .channelMock .receive ())
110
+ .thenReturn (this .message , this .message , this .message , this .message , this .message , null );
111
+ this .endpoint .setMaxMessagesPerPoll (6 );
112
+ this .endpoint .start ();
113
+ this .trigger .await ();
114
+ this .endpoint .stop ();
115
+ assertEquals (5 , this .consumer .counter .get ());
122
116
}
123
117
124
118
@ Test
125
- public void heavierLoadTest () throws Exception {
119
+ public void heavierLoadTest () {
126
120
for (int i = 0 ; i < 1000 ; i ++) {
127
- this .init ();
128
- this .multipleMessages ();
129
- this .stop ();
121
+ init ();
122
+ this .trigger .reset ();
123
+ this .consumer .counter .set (0 );
124
+ multipleMessages ();
125
+ stop ();
130
126
}
131
127
}
132
128
133
129
@ Test (expected = MessageRejectedException .class )
134
130
public void rejectedMessage () throws Throwable {
135
- Mockito .when (channelMock .receive ()).thenReturn (badMessage );
136
- endpoint .start ();
137
- trigger .await ();
138
- endpoint .stop ();
139
- assertEquals (1 , consumer .counter .get ());
140
- errorHandler .throwLastErrorIfAvailable ();
131
+ Mockito .when (this . channelMock .receive ()).thenReturn (this . badMessage );
132
+ this . endpoint .start ();
133
+ this . trigger .await ();
134
+ this . endpoint .stop ();
135
+ assertEquals (1 , this . consumer .counter .get ());
136
+ this . errorHandler .throwLastErrorIfAvailable ();
141
137
}
142
138
143
139
@ Test (expected = MessageRejectedException .class )
144
140
public void droppedMessage_onePerPoll () throws Throwable {
145
- Mockito .when (channelMock .receive ()).thenReturn (badMessage );
146
- endpoint .setMaxMessagesPerPoll (10 );
147
- endpoint .start ();
148
- trigger .await ();
149
- endpoint .stop ();
150
- assertEquals (1 , consumer .counter .get ());
151
- errorHandler .throwLastErrorIfAvailable ();
141
+ Mockito .when (this . channelMock .receive ()).thenReturn (this . badMessage );
142
+ this . endpoint .setMaxMessagesPerPoll (10 );
143
+ this . endpoint .start ();
144
+ this . trigger .await ();
145
+ this . endpoint .stop ();
146
+ assertEquals (1 , this . consumer .counter .get ());
147
+ this . errorHandler .throwLastErrorIfAvailable ();
152
148
}
153
149
154
150
@ Test
155
151
public void blockingSourceTimedOut () {
156
152
// we don't need to await the timeout, returning null suffices
157
- Mockito .when (channelMock .receive ()).thenReturn (null );
158
- endpoint .setReceiveTimeout (1 );
159
- endpoint .start ();
160
- trigger .await ();
161
- endpoint .stop ();
162
- assertEquals (0 , consumer .counter .get ());
153
+ Mockito .when (this . channelMock .receive ()).thenReturn (null );
154
+ this . endpoint .setReceiveTimeout (1 );
155
+ this . endpoint .start ();
156
+ this . trigger .await ();
157
+ this . endpoint .stop ();
158
+ assertEquals (0 , this . consumer .counter .get ());
163
159
}
164
160
165
161
@ Test
166
162
public void blockingSourceNotTimedOut () {
167
- Mockito .when (channelMock .receive (Mockito .eq (1L ))).thenReturn (message );
168
- endpoint .setReceiveTimeout (1 );
169
- endpoint .setMaxMessagesPerPoll (1 );
170
- endpoint .start ();
171
- trigger .await ();
172
- endpoint .stop ();
173
- assertEquals (1 , consumer .counter .get ());
163
+ Mockito .when (this . channelMock .receive (Mockito .eq (1L ))).thenReturn (this . message );
164
+ this . endpoint .setReceiveTimeout (1 );
165
+ this . endpoint .setMaxMessagesPerPoll (1 );
166
+ this . endpoint .start ();
167
+ this . trigger .await ();
168
+ this . endpoint .stop ();
169
+ assertEquals (1 , this . consumer .counter .get ());
174
170
}
175
171
176
172
@@ -192,45 +188,6 @@ public void handleMessage(Message<?> message) {
192
188
}
193
189
194
190
195
- private static class TestTrigger implements Trigger {
196
-
197
- private final AtomicBoolean hasRun = new AtomicBoolean ();
198
-
199
- private volatile CountDownLatch latch = new CountDownLatch (1 );
200
-
201
-
202
- TestTrigger () {
203
- super ();
204
- }
205
-
206
- @ Override
207
- public Date nextExecutionTime (TriggerContext triggerContext ) {
208
- if (!this .hasRun .getAndSet (true )) {
209
- return new Date ();
210
- }
211
- this .latch .countDown ();
212
- return null ;
213
- }
214
-
215
- public void reset () {
216
- this .latch = new CountDownLatch (1 );
217
- this .hasRun .set (false );
218
- }
219
-
220
- public void await () {
221
- try {
222
- this .latch .await (5000 , TimeUnit .MILLISECONDS );
223
- if (latch .getCount () != 0 ) {
224
- throw new RuntimeException ("test latch.await() did not count down" );
225
- }
226
- }
227
- catch (InterruptedException e ) {
228
- throw new RuntimeException ("test latch.await() interrupted" );
229
- }
230
- }
231
- }
232
-
233
-
234
191
private static class TestErrorHandler implements ErrorHandler {
235
192
236
193
private volatile Throwable lastError ;
@@ -252,6 +209,7 @@ public void throwLastErrorIfAvailable() throws Throwable {
252
209
this .lastError = null ;
253
210
throw t ;
254
211
}
212
+
255
213
}
256
214
257
215
}
0 commit comments