@@ -114,3 +114,121 @@ where
114
114
}
115
115
}
116
116
117
+ #[ cfg( test) ]
118
+ mod tests {
119
+ use futures:: { future, Async , Future , Poll } ;
120
+ use super :: * ;
121
+
122
+ struct TestMe {
123
+ draining : bool ,
124
+ finished : bool ,
125
+ poll_cnt : usize ,
126
+ }
127
+
128
+ impl Future for TestMe {
129
+ type Item = ( ) ;
130
+ type Error = ( ) ;
131
+
132
+ fn poll ( & mut self ) -> Poll < Self :: Item , Self :: Error > {
133
+ self . poll_cnt += 1 ;
134
+ if self . finished {
135
+ Ok ( Async :: Ready ( ( ) ) )
136
+ } else {
137
+ Ok ( Async :: NotReady )
138
+ }
139
+ }
140
+ }
141
+
142
+ #[ test]
143
+ fn watch ( ) {
144
+ future:: lazy ( || {
145
+ let ( tx, rx) = channel ( ) ;
146
+ let fut = TestMe {
147
+ draining : false ,
148
+ finished : false ,
149
+ poll_cnt : 0 ,
150
+ } ;
151
+
152
+ let mut watch = rx. watch ( fut, |fut| {
153
+ fut. draining = true ;
154
+ } ) ;
155
+
156
+ assert_eq ! ( watch. future. poll_cnt, 0 ) ;
157
+
158
+ // First poll should poll the inner future
159
+ assert ! ( watch. poll( ) . unwrap( ) . is_not_ready( ) ) ;
160
+ assert_eq ! ( watch. future. poll_cnt, 1 ) ;
161
+
162
+ // Second poll should poll the inner future again
163
+ assert ! ( watch. poll( ) . unwrap( ) . is_not_ready( ) ) ;
164
+ assert_eq ! ( watch. future. poll_cnt, 2 ) ;
165
+
166
+ let mut draining = tx. drain ( ) ;
167
+ // Drain signaled, but needs another poll to be noticed.
168
+ assert ! ( !watch. future. draining) ;
169
+ assert_eq ! ( watch. future. poll_cnt, 2 ) ;
170
+
171
+ // Now, poll after drain has been signaled.
172
+ assert ! ( watch. poll( ) . unwrap( ) . is_not_ready( ) ) ;
173
+ assert_eq ! ( watch. future. poll_cnt, 3 ) ;
174
+ assert ! ( watch. future. draining) ;
175
+
176
+ // Draining is not ready until watcher completes
177
+ assert ! ( draining. poll( ) . unwrap( ) . is_not_ready( ) ) ;
178
+
179
+ // Finishing up the watch future
180
+ watch. future . finished = true ;
181
+ assert ! ( watch. poll( ) . unwrap( ) . is_ready( ) ) ;
182
+ assert_eq ! ( watch. future. poll_cnt, 4 ) ;
183
+ drop ( watch) ;
184
+
185
+ assert ! ( draining. poll( ) . unwrap( ) . is_ready( ) ) ;
186
+
187
+ Ok :: < _ , ( ) > ( ( ) )
188
+ } ) . wait ( ) . unwrap ( ) ;
189
+ }
190
+
191
+ #[ test]
192
+ fn watch_clones ( ) {
193
+ future:: lazy ( || {
194
+ let ( tx, rx) = channel ( ) ;
195
+
196
+ let fut1 = TestMe {
197
+ draining : false ,
198
+ finished : false ,
199
+ poll_cnt : 0 ,
200
+ } ;
201
+ let fut2 = TestMe {
202
+ draining : false ,
203
+ finished : false ,
204
+ poll_cnt : 0 ,
205
+ } ;
206
+
207
+ let watch1 = rx. clone ( ) . watch ( fut1, |fut| {
208
+ fut. draining = true ;
209
+ } ) ;
210
+ let watch2 = rx. watch ( fut2, |fut| {
211
+ fut. draining = true ;
212
+ } ) ;
213
+
214
+ let mut draining = tx. drain ( ) ;
215
+
216
+ // Still 2 outstanding watchers
217
+ assert ! ( draining. poll( ) . unwrap( ) . is_not_ready( ) ) ;
218
+
219
+ // drop 1 for whatever reason
220
+ drop ( watch1) ;
221
+
222
+ // Still not ready, 1 other watcher still pending
223
+ assert ! ( draining. poll( ) . unwrap( ) . is_not_ready( ) ) ;
224
+
225
+ drop ( watch2) ;
226
+
227
+ // Now all watchers are gone, draining is complete
228
+ assert ! ( draining. poll( ) . unwrap( ) . is_ready( ) ) ;
229
+
230
+ Ok :: < _ , ( ) > ( ( ) )
231
+ } ) . wait ( ) . unwrap ( ) ;
232
+ }
233
+ }
234
+
0 commit comments