Skip to content

net_tcp read timeout #3599

Closed
Closed
@jesse99

Description

@jesse99

Not completely sure if this is a bug because I'm finding it difficult to figure out how to use net_tcp using the unit tests. But the below code times out when both the client and the server try to read.

// rustc --test tcp.rs && export RUST_LOG=tcp=3,std::net_tcp=3 && ./tcp
extern mod std;

use ip = std::net_ip;
use std::net_ip::{IpAddr};
use tcp = std::net_tcp;
use uv = std::uv;
use comm::{Chan, Port};
use Option = option::Option;

#[forbid(implicit_copies)]
#[allow(non_implicitly_copyable_typarams)]    // uv uses non-implicitly copyable Result types

// Note that net_tcp currently does not use SO_REUSEADDR so you may need
// to change this port if you get an AddressInUse error.
const server_port: uint = 8089;

// Spins up a task used to wait for new incoming connections from clients. 
priv fn run_server(addr: IpAddr, port: uint, exit_chan: Chan<()>)
{
    let hl_loop = uv::global_loop::get();

    // When we connect to a client the tasks that execute to service requests
    // from it will be distributed across two threads (unless some subtask
    // creates a new scheduler).
    do task::spawn_sched(task::ManualThreads(1))        // TODO: try using 2
    |move addr|
    {
        let backlog = 2;    // TODO: add a type alias or something for the horrible below
        let on_establish: fn~ (&&kill_ch: comm::Chan<Option<tcp::TcpErrData>>) = |_kill_ch, copy addr| {error!("listening for connections on %s", ip::format_addr(&addr))};

        // listen will block until it gets an error (from the system or from kill_ch). 
        error!("server is listening");
        let result = tcp::listen(copy addr, port, backlog, hl_loop, on_establish, on_connect);
        if result.is_err()
        {
            fail fmt!("failed listen on %s: %?", ip::format_addr(&addr), result.get_err());
        }

        // Let our caller know when we're done listening.
        error!("server is exiting");
        exit_chan.send(());
    }
}

priv fn on_connect(&&connection: tcp::TcpNewConnection, &&kill_chan: Chan<Option<tcp::TcpErrData>>)
{
    do task::spawn
    {
        error!("server is accepting a connection");
        let result = tcp::accept(connection);
        if result.is_ok()
        {
            error!("server accepted the connection");
            let sock = result::get_ref(&result);
            handle_connection(sock, kill_chan);
        }
        else
        {
            fail fmt!("accept to failed: %?", result.get_err());
        }
    }
}

priv fn handle_connection(sock: &tcp::TcpSocket, kill_chan: Chan<Option<tcp::TcpErrData>>)
{
    loop
    {
        let request = read_str("server request", sock);
        error!("server got request: %?", request);
        if request.starts_with("dupe: ")
        {
            let reply = request.slice(0, "dupe: ".len());
            write_str(sock, reply + reply);
        }
        else if request.starts_with("shutdown:")
        {
            // We need to stop the server tasks here. Sockets are closed when the last referrence
            // to them goes away. This will happen when we break out of this loop, but we
            // still need to terminate the listen loop which is why we send to kill_chan.
            let err = {err_name: ~"shutdown", err_msg: ~"client requested server  shutdown"};
            kill_chan.send(option::Some(err)); 
            break;
        }
        else
        {
            fail fmt!("bad request: '%s'", request);
        }
    }
}

priv fn run_client(addr: IpAddr, port: uint)
{
    let hl_loop = uv::global_loop::get();

    error!("client is connecting");
    let result = tcp::connect(addr, port, hl_loop);
    if result.is_ok()
    {
        error!("client connected");
        let sock = result::unwrap(move result);

        write_str(&sock, ~"dupe: hey");
        let reply = read_str("server reply", &sock);
        error!("client received: %?", reply);
        if reply != ~"heyhey"
        {
            fail fmt!("client expected 'heyhey', but found '%s'", reply);
        }

        write_str(&sock, ~"shutdown:");
    }
    else
    {
        fail fmt!("connect failed: %?", result.get_err());
    }
}

priv fn write_str(sock: &tcp::TcpSocket, message: ~str)
{
    do str::as_bytes(message)
    |buffer|
    {
error!("   writing %?", buffer);
        let result = sock.write(buffer);
        if result.is_err()
        {
            fail fmt!("write('%s') failed: %?", message, result.get_err());
        }
    }
}

// TCP is a stream oriented protocol so there are no messages as such:
// there are just streams of bytes. However framing does occur when
// the network stack packages up the bytes into a packet. Here we assume
// that our packets correspond to individual messages. In general this is
// a terrible idea: messages may at some point become too large for a
// single packet or the network stack may decide to gang up multiple
// messages in one packet.
priv fn read_str(expected: &str, sock: &tcp::TcpSocket) -> ~str
{
    // The right way to do this is to read each chunk, buffer the results,
    // and return the message part (e.g. using a delimeter like a null
    // character). Bonus points for not allowing rogue clients to grow
    // the buffer arbitrarily large.
    let timeout = 2000;                    // msecs
    match sock.read(timeout)
    {
        result::Ok(buffer) =>
        {
error!("   read %?", buffer);
            str::from_bytes(buffer)
        }
        result::Err(err) =>
        {
            fail fmt!("read %s failed: %?", expected, err);
        }
    }
}

fn resolve_addr(addr: &str) -> IpAddr
{
    let hl_loop = uv::global_loop::get();
    match ip::get_addr(addr, hl_loop)
    {
        result::Ok(addrs) if addrs.is_not_empty() =>
        {
            addrs[0]
        }
        result::Ok(*) =>
        {
            fail fmt!("ip::get_addr('%s') failed: empty result", addr);
        }
        result::Err(err) =>
        {
            fail fmt!("ip::get_addr('%s') failed: %?", addr, err);
        }
    }
}

#[test]
fn simple_client_server()
{
    let exit_port = Port();
    let exit_chan = Chan(exit_port);

    let addr = resolve_addr("127.0.0.1");
    run_server(addr, server_port, exit_chan);

    do task::spawn_sched(task::SingleThreaded)    // TODO: don't think we need to spawn a task here
    {
        run_client(addr, server_port);
        exit_chan.send(());
    }

    // Wait for the server to finish.
    exit_port.recv();
    exit_port.recv();
}

Logging shows this:

rustc --test tcp.rs && export RUST_LOG=tcp=3,std::net_tcp=3 && ./tcp
warning: no debug symbols in executable (-arch x86_64)

running 1 test
rust: ~"server is listening"
rust: ~"addr: { mut sin_family: 528, mut sin_port: 0, mut sin_addr: 16777343, mut sin_zero: (0, 0, 0, 0, 0, 0, 0, 0) }"
rust: ~"listening for connections on 127.0.0.1"
rust: ~"client is connecting"
rust: ~"tcp_connect result_ch Chan_(18)"
rust: ~"stream_handle_ptr outside interact 140379594689168"
rust: ~"in interact cb for tcp client connect.."
rust: ~"stream_handle_ptr in interact 140379594689168"
rust: ~"tcp_init successful"
rust: ~"dealing w/ ipv4 connection.."
rust: ~"addr: { mut sin_family: 528, mut sin_port: 0, mut sin_addr: 16777343, mut sin_zero: (0, 0, 0, 0, 0, 0, 0, 0) }"
rust: ~"tcp_connect successful"
rust: ~"leaving tcp_connect interact cb..."
rust: ~"tcp_connect result_ch Chan_(18)"
rust: ~"successful tcp connection!"
rust: ~"leaving tcp_connect_on_connect_cb"
rust: ~"tcp::connect - received success on result_po"
rust: ~"client connected"
rust: ~"   writing ~[ 100, 117, 112, 101, 58, 32, 104, 101, 121, 0 ]"
rust: ~"server is accepting a connection"
rust: ~"in interact cb for tcp::accept"
rust: ~"uv_tcp_init successful for client stream"
rust: ~"successfully accepted client connection"
rust: ~"in interact cb for tcp::write 140379593647056"
rust: ~"uv_write() invoked successfully"
rust: ~"successful write complete"
rust: ~"server accepted the connection"
rust: ~"starting tcp::read"
rust: ~"in tcp::read_start before interact loop"
rust: ~"starting tcp::read"
rust: ~"in tcp::read_start before interact loop"
rust: ~"in tcp::read_start interact cb 140379593647056"
rust: ~"success doing uv_read_start"
rust: ~"tcp::read before recv_timeout"
rust: ~"in tcp::read_start interact cb 140379593647056"
rust: ~"success doing uv_read_start"
rust: ~"tcp read on_alloc_cb!"
rust: ~"tcp::read before recv_timeout"
rust: ~"tcp read on_alloc_cb h: 140379593655232 char_ptr: 140379605528064 sugsize: 65536"
rust: ~"entering on_tcp_read_cb stream: 140379593655232 nread: 10"
rust: ~"tcp on_read_cb nread: 10"
rust: ~"exiting on_tcp_read_cb"
rust: ~"tcp::read after recv_timeout"
rust: ~"tcp::read: timed out.."
rust: ~"tcp::read after recv_timeout"
rust: ~"tcp::read: timed out.."
rust: ~"in interact cb for tcp::read_stop"
rust: ~"successfully called uv_read_stop"
rust: ~"in interact cb for tcp::read_stop"
rust: ~"successfully called uv_read_stop"
rust: task failed at 'read server request failed: { err_name: ~"TIMEOUT", err_msg: ~"req timed out" }', tcp.rs:156
rust: task failed at 'read server reply failed: { err_name: ~"TIMEOUT", err_msg: ~"req timed out" }', tcp.rs:156
rust: ~"interact dtor for tcp_socket stream 140379593655232 loop 140379593647056"
rust: ~"interact dtor for tcp_socket stream 140379594689168 loop 140379593647056"
rust: ~"tcp_socket_dtor_close_cb exiting.."
rust: ~"tcp_socket_dtor_close_cb exiting.."
/bin/sh: line 1: 56867 Segmentation fault: 11  ./tcp
make: *** [tcp] Error 139
exited with code 2

So it appears the the message is being read but the read function isn't returning for some reason (the first message is "dupe: hey\0" which is 10 bytes). This is on Mac with rust from Sep 22, 2012.

The seg faults are a bit disturbing too...

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions