Skip to content
This repository was archived by the owner on Jul 23, 2019. It is now read-only.

Commit 698496a

Browse files
Nathan SoboMax Brunsfeld
Nathan Sobo
and
Max Brunsfeld
committed
Combine incoming and outgoing app messages into one future
Co-authored-by: Max Brunsfeld <[email protected]>
1 parent 88180c4 commit 698496a

File tree

1 file changed

+20
-23
lines changed

1 file changed

+20
-23
lines changed

xray_server/src/app.rs

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -70,28 +70,29 @@ impl App {
7070
O: 'static + Sink<SinkItem = OutgoingMessage>,
7171
I: 'static + Stream<Item = IncomingMessage, Error = io::Error>
7272
{
73-
{
74-
let mut inner = inner.borrow_mut();
75-
let (tx, rx) = mpsc::unbounded();
76-
if inner.app_channel.is_some() {
77-
eprintln!("Redundant app client");
78-
return;
79-
}
80-
81-
inner.app_channel = Some(tx);
82-
inner.reactor.spawn(
83-
outgoing.send_all(rx.map_err(|_| unreachable!())).then(|_| Ok(()))
84-
);
73+
let mut inner_borrow = inner.borrow_mut();
74+
let (tx, rx) = mpsc::unbounded();
75+
if inner_borrow.app_channel.is_some() {
76+
eprintln!("Redundant app client");
77+
return;
8578
}
8679

87-
Self::handle_app_messages(inner, incoming);
80+
inner_borrow.app_channel = Some(tx);
81+
82+
let receive_incoming = Self::handle_app_messages(inner.clone(), incoming);
83+
let send_outgoing = outgoing.send_all(rx.map_err(|_| unreachable!())).then(|_| Ok(()));
84+
inner_borrow.reactor.spawn(
85+
receive_incoming
86+
.select(send_outgoing)
87+
.then(|_: Result<((), _), ((), _)>| Ok(()))
88+
);
8889
}
8990

9091
fn start_cli<I>(inner: Rc<RefCell<Inner>>, incoming: I)
9192
where
9293
I: 'static + Stream<Item = IncomingMessage, Error = io::Error>
9394
{
94-
Self::handle_app_messages(inner, incoming);
95+
inner.borrow_mut().reactor.spawn(Self::handle_app_messages(inner.clone(), incoming));
9596
}
9697

9798
fn start_window<O, I>(inner: Rc<RefCell<Inner>>, outgoing: O, incoming: I, window_id: WindowId)
@@ -119,18 +120,14 @@ impl App {
119120
);
120121
}
121122

122-
fn handle_app_messages<I>(inner: Rc<RefCell<Inner>>, incoming: I)
123+
fn handle_app_messages<I>(inner: Rc<RefCell<Inner>>, incoming: I) -> Box<Future<Item = (), Error = ()>>
123124
where
124125
I: 'static + Stream<Item = IncomingMessage, Error = io::Error>
125126
{
126-
let inner_clone = inner.clone();
127-
let inner = inner.borrow_mut();
128-
inner.reactor.spawn(
129-
incoming.for_each(move |message| {
130-
inner_clone.borrow_mut().handle_app_message(message);
131-
Ok(())
132-
}).then(|_| Ok(()))
133-
);
127+
Box::new(incoming.for_each(move |message| {
128+
inner.borrow_mut().handle_app_message(message);
129+
Ok(())
130+
}).then(|_| Ok(())))
134131
}
135132
}
136133

0 commit comments

Comments
 (0)