Skip to content

std: Remove PortSet. Not supported by new scheduler. Replace uses with S... #8164

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions src/libextra/arc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,16 +576,12 @@ mod tests {
let (p, c) = comm::stream();

do task::spawn() || {
let p = comm::PortSet::new();
c.send(p.chan());

let arc_v : Arc<~[int]> = p.recv();

let v = (*arc_v.get()).clone();
assert_eq!(v[3], 4);
};

let c = p.recv();
c.send(arc_v.clone());

assert_eq!(arc_v.get()[2], 3);
Expand Down
83 changes: 2 additions & 81 deletions src/libstd/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,10 @@ Message passing

#[allow(missing_doc)];

use cast::{transmute, transmute_mut};
use container::Container;
use cast::transmute;
use either::{Either, Left, Right};
use kinds::Send;
use option::{Option, Some, None};
use uint;
use vec::OwnedVector;
use util::replace;
use option::{Option, Some};
use unstable::sync::Exclusive;
use rtcomm = rt::comm;
use rt;
Expand Down Expand Up @@ -143,81 +139,6 @@ impl<T: Send> Selectable for Port<T> {
}
}

/// Treat many ports as one.
#[unsafe_mut_field(ports)]
pub struct PortSet<T> {
ports: ~[pipesy::Port<T>],
}

impl<T: Send> PortSet<T> {
pub fn new() -> PortSet<T> {
PortSet {
ports: ~[]
}
}

pub fn add(&self, port: Port<T>) {
let Port { inner } = port;
let port = match inner {
Left(p) => p,
Right(_) => fail!("PortSet not implemented")
};
unsafe {
let self_ports = transmute_mut(&self.ports);
self_ports.push(port)
}
}

pub fn chan(&self) -> Chan<T> {
let (po, ch) = stream();
self.add(po);
ch
}
}

impl<T:Send> GenericPort<T> for PortSet<T> {
fn try_recv(&self) -> Option<T> {
unsafe {
let self_ports = transmute_mut(&self.ports);
let mut result = None;
// we have to swap the ports array so we aren't borrowing
// aliasable mutable memory.
let mut ports = replace(self_ports, ~[]);
while result.is_none() && ports.len() > 0 {
let i = wait_many(ports);
match ports[i].try_recv() {
Some(m) => {
result = Some(m);
}
None => {
// Remove this port.
let _ = ports.swap_remove(i);
}
}
}
*self_ports = ports;
result
}
}
fn recv(&self) -> T {
self.try_recv().expect("port_set: endpoints closed")
}
}

impl<T: Send> Peekable<T> for PortSet<T> {
fn peek(&self) -> bool {
// It'd be nice to use self.port.each, but that version isn't
// pure.
for uint::range(0, self.ports.len()) |i| {
let port: &pipesy::Port<T> = &self.ports[i];
if port.peek() {
return true;
}
}
false
}
}

/// A channel that can be shared between many senders.
pub struct SharedChan<T> {
inner: Either<Exclusive<pipesy::Chan<T>>, rtcomm::SharedChan<T>>
Expand Down
12 changes: 5 additions & 7 deletions src/test/bench/msgsend-pipes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

extern mod extra;

use std::comm::{PortSet, Chan, stream};
use std::comm::{SharedChan, Chan, stream};
use std::io;
use std::os;
use std::task;
Expand All @@ -30,7 +30,7 @@ enum request {
stop
}

fn server(requests: &PortSet<request>, responses: &Chan<uint>) {
fn server(requests: &Port<request>, responses: &Chan<uint>) {
let mut count: uint = 0;
let mut done = false;
while !done {
Expand All @@ -50,18 +50,16 @@ fn server(requests: &PortSet<request>, responses: &Chan<uint>) {

fn run(args: &[~str]) {
let (from_child, to_parent) = stream();
let (from_parent_, to_child) = stream();
let from_parent = PortSet::new();
from_parent.add(from_parent_);
let (from_parent, to_child) = stream();
let to_child = SharedChan::new(to_child);

let size = uint::from_str(args[1]).get();
let workers = uint::from_str(args[2]).get();
let num_bytes = 100;
let start = extra::time::precise_time_s();
let mut worker_results = ~[];
for uint::range(0, workers) |_i| {
let (from_parent_, to_child) = stream();
from_parent.add(from_parent_);
let to_child = to_child.clone();
let mut builder = task::task();
builder.future_result(|r| worker_results.push(r));
do builder.spawn {
Expand Down
12 changes: 7 additions & 5 deletions src/test/bench/shootout-pfib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,24 @@ use std::u64;
use std::uint;

fn fib(n: int) -> int {
fn pfib(c: &Chan<int>, n: int) {
fn pfib(c: &SharedChan<int>, n: int) {
if n == 0 {
c.send(0);
} else if n <= 2 {
c.send(1);
} else {
let p = PortSet::new();
let ch = p.chan();
let (pp, cc) = stream();
let cc = SharedChan::new(cc);
let ch = cc.clone();
task::spawn(|| pfib(&ch, n - 1) );
let ch = p.chan();
let ch = cc.clone();
task::spawn(|| pfib(&ch, n - 2) );
c.send(p.recv() + p.recv());
c.send(pp.recv() + pp.recv());
}
}

let (p, ch) = stream();
let ch = SharedChan::new(ch);
let _t = task::spawn(|| pfib(&ch, n) );
p.recv()
}
Expand Down
8 changes: 4 additions & 4 deletions src/test/run-pass/task-comm-14.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ use std::comm;
use std::task;

pub fn main() {
let po = comm::PortSet::new();
let (po, ch) = comm::stream();
let ch = comm::SharedChan::new(ch);

// Spawn 10 tasks each sending us back one int.
let mut i = 10;
while (i > 0) {
info!(i);
let (p, ch) = comm::stream();
po.add(p);
let ch = ch.clone();
task::spawn({let i = i; || child(i, &ch)});
i = i - 1;
}
Expand All @@ -39,7 +39,7 @@ pub fn main() {
info!("main thread exiting");
}

fn child(x: int, ch: &comm::Chan<int>) {
fn child(x: int, ch: &comm::SharedChan<int>) {
info!(x);
ch.send(x);
}
9 changes: 5 additions & 4 deletions src/test/run-pass/task-comm-3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@

extern mod extra;

use std::comm::Chan;
use std::comm::SharedChan;
use std::comm;
use std::task;

pub fn main() { info!("===== WITHOUT THREADS ====="); test00(); }

fn test00_start(ch: &Chan<int>, message: int, count: int) {
fn test00_start(ch: &SharedChan<int>, message: int, count: int) {
info!("Starting test00_start");
let mut i: int = 0;
while i < count {
Expand All @@ -35,14 +35,15 @@ fn test00() {

info!("Creating tasks");

let po = comm::PortSet::new();
let (po, ch) = comm::stream();
let ch = comm::SharedChan::new(ch);

let mut i: int = 0;

// Create and spawn tasks...
let mut results = ~[];
while i < number_of_tasks {
let ch = po.chan();
let ch = ch.clone();
let mut builder = task::task();
builder.future_result(|r| results.push(r));
builder.spawn({
Expand Down
13 changes: 7 additions & 6 deletions src/test/run-pass/task-comm-6.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,20 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use std::comm::Chan;
use std::comm::SharedChan;
use std::comm;

pub fn main() { test00(); }

fn test00() {
let mut r: int = 0;
let mut sum: int = 0;
let p = comm::PortSet::new();
let c0 = p.chan();
let c1 = p.chan();
let c2 = p.chan();
let c3 = p.chan();
let (p, ch) = comm::stream();
let ch = SharedChan::new(ch);
let c0 = ch.clone();
let c1 = ch.clone();
let c2 = ch.clone();
let c3 = ch.clone();
let number_of_messages: int = 1000;
let mut i: int = 0;
while i < number_of_messages {
Expand Down
13 changes: 7 additions & 6 deletions src/test/run-pass/task-comm-7.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,31 @@ use std::task;

pub fn main() { test00(); }

fn test00_start(c: &comm::Chan<int>, start: int, number_of_messages: int) {
fn test00_start(c: &comm::SharedChan<int>, start: int, number_of_messages: int) {
let mut i: int = 0;
while i < number_of_messages { c.send(start + i); i += 1; }
}

fn test00() {
let mut r: int = 0;
let mut sum: int = 0;
let p = comm::PortSet::new();
let (p, ch) = comm::stream();
let ch = comm::SharedChan::new(ch);
let number_of_messages: int = 10;

let c = p.chan();
let c = ch.clone();
do task::spawn || {
test00_start(&c, number_of_messages * 0, number_of_messages);
}
let c = p.chan();
let c = ch.clone();
do task::spawn || {
test00_start(&c, number_of_messages * 1, number_of_messages);
}
let c = p.chan();
let c = ch.clone();
do task::spawn || {
test00_start(&c, number_of_messages * 2, number_of_messages);
}
let c = p.chan();
let c = ch.clone();
do task::spawn || {
test00_start(&c, number_of_messages * 3, number_of_messages);
}
Expand Down
3 changes: 1 addition & 2 deletions src/test/run-pass/task-comm-9.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ fn test00_start(c: &comm::Chan<int>, number_of_messages: int) {
fn test00() {
let r: int = 0;
let mut sum: int = 0;
let p = comm::PortSet::new();
let (p, ch) = comm::stream();
let number_of_messages: int = 10;
let ch = p.chan();

let mut result = None;
let mut builder = task::task();
Expand Down