@@ -132,122 +132,88 @@ pub mod streamp {
132
132
}
133
133
}
134
134
135
- struct Chan_<T> {
136
- mut endp: Option<streamp::client::Open<T>>
137
- }
138
-
139
135
/// An endpoint that can send many messages.
140
- pub enum Chan<T> {
141
- Chan_(Chan_<T>)
142
- }
143
-
144
- struct Port_<T> {
145
- mut endp: Option<streamp::server::Open<T>>,
136
+ pub struct Chan<T> {
137
+ mut endp: Option<streamp::client::Open<T>>
146
138
}
147
139
148
140
/// An endpoint that can receive many messages.
149
- pub enum Port<T> {
150
- Port_(Port_<T>)
141
+ pub struct Port<T> {
142
+ mut endp: Option<streamp::server::Open<T>>,
151
143
}
152
144
153
- /** Creates a `(chan, port )` pair.
145
+ /** Creates a `(Port, Chan )` pair.
154
146
155
147
These allow sending or receiving an unlimited number of messages.
156
148
157
149
*/
158
150
pub fn stream<T:Owned>() -> (Port<T>, Chan<T>) {
159
151
let (c, s) = streamp::init();
160
152
161
- (Port_(Port_ { endp: Some(s) }), Chan_(Chan_{ endp: Some(c) }))
162
- }
163
-
164
- // Add an inherent method so that imports of GenericChan are not
165
- // required.
166
- pub impl<T: Owned> Chan<T> {
167
- fn send(&self, x: T) { chan_send(self, x) }
168
- fn try_send(&self, x: T) -> bool { chan_try_send(self, x) }
153
+ (Port { endp: Some(s) }, Chan { endp: Some(c) })
169
154
}
170
155
171
156
impl<T: Owned> GenericChan<T> for Chan<T> {
172
- fn send(&self, x: T) { chan_send(self, x) }
173
- }
174
-
175
- #[inline(always)]
176
- fn chan_send<T:Owned>(self: &Chan<T>, x: T) {
177
- let mut endp = None;
178
- endp <-> self.endp;
179
- self.endp = Some(
180
- streamp::client::data(endp.unwrap(), x))
157
+ #[inline(always)]
158
+ fn send(&self, x: T) {
159
+ let mut endp = None;
160
+ endp <-> self.endp;
161
+ self.endp = Some(
162
+ streamp::client::data(endp.unwrap(), x))
163
+ }
181
164
}
182
165
183
166
impl<T: Owned> GenericSmartChan<T> for Chan<T> {
167
+ #[inline(always)]
184
168
fn try_send(&self, x: T) -> bool {
185
- chan_try_send(self, x)
186
- }
187
- }
188
-
189
- #[inline(always)]
190
- fn chan_try_send<T:Owned>(self: &Chan<T>, x: T) -> bool {
191
- let mut endp = None;
192
- endp <-> self.endp;
193
- match streamp::client::try_data(endp.unwrap(), x) {
194
- Some(next) => {
195
- self.endp = Some(next);
196
- true
169
+ let mut endp = None;
170
+ endp <-> self.endp;
171
+ match streamp::client::try_data(endp.unwrap(), x) {
172
+ Some(next) => {
173
+ self.endp = Some(next);
174
+ true
175
+ }
176
+ None => false
197
177
}
198
- None => false
199
178
}
200
179
}
201
180
202
- // Use an inherent impl so that imports are not required:
203
- pub impl<T: Owned> Port<T> {
204
- fn recv(&self) -> T { port_recv(self) }
205
- fn try_recv(&self) -> Option<T> { port_try_recv(self) }
206
- fn peek(&self) -> bool { port_peek(self) }
207
- }
208
-
209
181
impl<T: Owned> GenericPort<T> for Port<T> {
210
- // These two calls will prefer the inherent versions above:
211
- fn recv(&self) -> T { port_recv(self) }
212
- fn try_recv(&self) -> Option<T> { port_try_recv(self) }
213
- }
214
-
215
- #[inline(always)]
216
- fn port_recv<T:Owned>(self: &Port<T>) -> T {
217
- let mut endp = None;
218
- endp <-> self.endp;
219
- let streamp::data(x, endp) = recv(endp.unwrap());
220
- self.endp = Some(endp);
221
- x
222
- }
223
-
224
- #[inline(always)]
225
- fn port_try_recv<T:Owned>(self: &Port<T>) -> Option<T> {
226
- let mut endp = None;
227
- endp <-> self.endp;
228
- match try_recv(endp.unwrap()) {
229
- Some(streamp::data(x, endp)) => {
230
- self.endp = Some(endp);
231
- Some(x)
182
+ #[inline(always)]
183
+ fn recv(&self) -> T {
184
+ let mut endp = None;
185
+ endp <-> self.endp;
186
+ let streamp::data(x, endp) = recv(endp.unwrap());
187
+ self.endp = Some(endp);
188
+ x
189
+ }
190
+
191
+ #[inline(always)]
192
+ fn try_recv(&self) -> Option<T> {
193
+ let mut endp = None;
194
+ endp <-> self.endp;
195
+ match try_recv(endp.unwrap()) {
196
+ Some(streamp::data(x, endp)) => {
197
+ self.endp = Some(endp);
198
+ Some(x)
199
+ }
200
+ None => None
232
201
}
233
- None => None
234
202
}
235
203
}
236
204
237
205
impl<T: Owned> Peekable<T> for Port<T> {
238
- fn peek(&self) -> bool { port_peek(self) }
239
- }
240
-
241
- #[inline(always)]
242
- fn port_peek<T:Owned>(self: &Port<T>) -> bool {
243
- let mut endp = None;
244
- endp <-> self.endp;
245
- let peek = match &endp {
246
- &Some(ref endp) => peek(endp),
247
- &None => fail!(~" peeking empty stream")
248
- };
249
- self.endp <-> endp;
250
- peek
206
+ #[inline(always)]
207
+ fn peek(&self) -> bool {
208
+ let mut endp = None;
209
+ endp <-> self.endp;
210
+ let peek = match &endp {
211
+ &Some(ref endp) => peek(endp),
212
+ &None => fail!(~" peeking empty stream")
213
+ };
214
+ self.endp <-> endp;
215
+ peek
216
+ }
251
217
}
252
218
253
219
impl<T: Owned> Selectable for Port<T> {
@@ -272,13 +238,6 @@ pub fn PortSet<T: Owned>() -> PortSet<T>{
272
238
}
273
239
}
274
240
275
- // Use an inherent impl so that imports are not required:
276
- pub impl<T:Owned> PortSet<T> {
277
- fn recv(&self) -> T { port_set_recv(self) }
278
- fn try_recv(&self) -> Option<T> { port_set_try_recv(self) }
279
- fn peek(&self) -> bool { port_set_peek(self) }
280
- }
281
-
282
241
pub impl<T: Owned> PortSet<T> {
283
242
fn add(&self, port: Port<T>) {
284
243
self.ports.push(port)
@@ -292,90 +251,69 @@ pub impl<T: Owned> PortSet<T> {
292
251
}
293
252
294
253
impl<T:Owned> GenericPort<T> for PortSet<T> {
295
- fn try_recv(&self) -> Option<T> { port_set_try_recv(self) }
296
- fn recv(&self) -> T { port_set_recv(self) }
297
- }
298
-
299
- #[inline(always)]
300
- fn port_set_recv<T:Owned>(self: &PortSet<T>) -> T {
301
- port_set_try_recv(self).expect(" port_set: endpoints closed")
302
- }
303
-
304
- #[inline(always)]
305
- fn port_set_try_recv<T:Owned>(self: &PortSet<T>) -> Option<T> {
306
- let mut result = None;
307
- // we have to swap the ports array so we aren't borrowing
308
- // aliasable mutable memory.
309
- let mut ports = ~[];
310
- ports <-> self.ports;
311
- while result.is_none() && ports.len() > 0 {
312
- let i = wait_many(ports);
313
- match ports[i].try_recv() {
314
- Some(m) => {
315
- result = Some(m);
316
- }
317
- None => {
318
- // Remove this port.
319
- let _ = ports.swap_remove(i);
254
+ fn try_recv(&self) -> Option<T> {
255
+ let mut result = None;
256
+ // we have to swap the ports array so we aren't borrowing
257
+ // aliasable mutable memory.
258
+ let mut ports = ~[];
259
+ ports <-> self.ports;
260
+ while result.is_none() && ports.len() > 0 {
261
+ let i = wait_many(ports);
262
+ match ports[i].try_recv() {
263
+ Some(m) => {
264
+ result = Some(m);
265
+ }
266
+ None => {
267
+ // Remove this port.
268
+ let _ = ports.swap_remove(i);
269
+ }
320
270
}
321
271
}
272
+ ports <-> self.ports;
273
+ result
274
+ }
275
+ fn recv(&self) -> T {
276
+ self.try_recv().expect(" port_set: endpoints closed")
322
277
}
323
- ports <-> self.ports;
324
- result
325
278
}
326
279
327
280
impl<T: Owned> Peekable<T> for PortSet<T> {
328
- fn peek(&self) -> bool { port_set_peek(self) }
329
- }
330
-
331
- #[inline(always)]
332
- fn port_set_peek<T:Owned>(self: &PortSet<T>) -> bool {
333
- // It'd be nice to use self.port.each, but that version isn't
334
- // pure.
335
- for uint::range(0, vec::uniq_len(&const self.ports)) |i| {
336
- // XXX: Botch pending demuting.
337
- unsafe {
338
- let port: &Port<T> = cast::transmute(&mut self.ports[i]);
339
- if port.peek() { return true }
281
+ fn peek(&self) -> bool {
282
+ // It'd be nice to use self.port.each, but that version isn't
283
+ // pure.
284
+ for uint::range(0, vec::uniq_len(&const self.ports)) |i| {
285
+ // XXX: Botch pending demuting.
286
+ unsafe {
287
+ let port: &Port<T> = cast::transmute(&mut self.ports[i]);
288
+ if port.peek() { return true }
289
+ }
340
290
}
291
+ false
341
292
}
342
- false
343
293
}
344
294
345
-
346
295
/// A channel that can be shared between many senders.
347
296
pub type SharedChan<T> = unstable::Exclusive<Chan<T>>;
348
297
349
- pub impl<T: Owned> SharedChan<T> {
350
- fn send(&self, x: T) { shared_chan_send(self, x) }
351
- fn try_send(&self, x: T) -> bool { shared_chan_try_send(self, x) }
352
- }
353
-
354
298
impl<T: Owned> GenericChan<T> for SharedChan<T> {
355
- fn send(&self, x: T) { shared_chan_send(self, x) }
356
- }
357
-
358
- #[inline(always)]
359
- fn shared_chan_send<T:Owned>(self: &SharedChan<T>, x: T) {
360
- let mut xx = Some(x);
361
- do self.with_imm |chan| {
362
- let mut x = None;
363
- x <-> xx;
364
- chan.send(x.unwrap())
299
+ fn send(&self, x: T) {
300
+ let mut xx = Some(x);
301
+ do self.with_imm |chan| {
302
+ let mut x = None;
303
+ x <-> xx;
304
+ chan.send(x.unwrap())
305
+ }
365
306
}
366
307
}
367
308
368
309
impl<T: Owned> GenericSmartChan<T> for SharedChan<T> {
369
- fn try_send(&self, x: T) -> bool { shared_chan_try_send(self, x) }
370
- }
371
-
372
- #[inline(always)]
373
- fn shared_chan_try_send<T:Owned>(self: &SharedChan<T>, x: T) -> bool {
374
- let mut xx = Some(x);
375
- do self.with_imm |chan| {
376
- let mut x = None;
377
- x <-> xx;
378
- chan.try_send(x.unwrap())
310
+ fn try_send(&self, x: T) -> bool {
311
+ let mut xx = Some(x);
312
+ do self.with_imm |chan| {
313
+ let mut x = None;
314
+ x <-> xx;
315
+ chan.try_send(x.unwrap())
316
+ }
379
317
}
380
318
}
381
319
0 commit comments