|
| 1 | +//! Adapts the HTTP/1.1 implementation into the `HttpMessage` API. |
| 2 | +use std::io::{self, Write, BufWriter, Read}; |
| 3 | +use std::net::Shutdown; |
| 4 | + |
| 5 | +use method::{Method}; |
| 6 | +use header::{ContentLength, TransferEncoding}; |
| 7 | +use header::Encoding::Chunked; |
| 8 | +use http::{HttpWriter, LINE_ENDING}; |
| 9 | +use http::HttpReader::{SizedReader, ChunkedReader, EofReader}; |
| 10 | +use http::HttpWriter::{ChunkedWriter, SizedWriter, EmptyWriter}; |
| 11 | +use buffer::BufReader; |
| 12 | +use http::{self, HttpReader}; |
| 13 | + |
| 14 | +use message::{ |
| 15 | + HttpMessage, |
| 16 | + RequestHead, |
| 17 | + ResponseHead, |
| 18 | +}; |
| 19 | +use net::NetworkStream; |
| 20 | +use header; |
| 21 | +use version; |
| 22 | + |
| 23 | +/// An implementation of the `HttpMessage` trait for HTTP/1.1. |
| 24 | +#[derive(Debug)] |
| 25 | +pub struct Http11Message { |
| 26 | + stream: Option<Box<NetworkStream + Send>>, |
| 27 | + writer: Option<HttpWriter<BufWriter<Box<NetworkStream + Send>>>>, |
| 28 | + reader: Option<HttpReader<BufReader<Box<NetworkStream + Send>>>>, |
| 29 | +} |
| 30 | + |
| 31 | +impl Write for Http11Message { |
| 32 | + #[inline] |
| 33 | + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
| 34 | + match self.writer { |
| 35 | + None => Err(io::Error::new(io::ErrorKind::Other, |
| 36 | + "Not in a writable state")), |
| 37 | + Some(ref mut writer) => writer.write(buf), |
| 38 | + } |
| 39 | + } |
| 40 | + #[inline] |
| 41 | + fn flush(&mut self) -> io::Result<()> { |
| 42 | + match self.writer { |
| 43 | + None => Err(io::Error::new(io::ErrorKind::Other, |
| 44 | + "Not in a writable state")), |
| 45 | + Some(ref mut writer) => writer.flush(), |
| 46 | + } |
| 47 | + } |
| 48 | +} |
| 49 | + |
| 50 | +impl Read for Http11Message { |
| 51 | + #[inline] |
| 52 | + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
| 53 | + match self.reader { |
| 54 | + None => Err(io::Error::new(io::ErrorKind::Other, |
| 55 | + "Not in a readable state")), |
| 56 | + Some(ref mut reader) => reader.read(buf), |
| 57 | + } |
| 58 | + } |
| 59 | +} |
| 60 | + |
| 61 | +impl HttpMessage for Http11Message { |
| 62 | + fn set_outgoing(&mut self, mut head: RequestHead) -> ::Result<RequestHead> { |
| 63 | + if self.stream.is_none() { |
| 64 | + return Err(From::from(io::Error::new( |
| 65 | + io::ErrorKind::Other, |
| 66 | + "Message not idle, cannot start new outgoing"))); |
| 67 | + } |
| 68 | + let mut stream = BufWriter::new(self.stream.take().unwrap()); |
| 69 | + |
| 70 | + let mut uri = head.url.serialize_path().unwrap(); |
| 71 | + if let Some(ref q) = head.url.query { |
| 72 | + uri.push('?'); |
| 73 | + uri.push_str(&q[..]); |
| 74 | + } |
| 75 | + |
| 76 | + let version = version::HttpVersion::Http11; |
| 77 | + debug!("request line: {:?} {:?} {:?}", head.method, uri, version); |
| 78 | + try!(write!(&mut stream, "{} {} {}{}", |
| 79 | + head.method, uri, version, LINE_ENDING)); |
| 80 | + |
| 81 | + let stream = match head.method { |
| 82 | + Method::Get | Method::Head => { |
| 83 | + debug!("headers={:?}", head.headers); |
| 84 | + try!(write!(&mut stream, "{}{}", head.headers, LINE_ENDING)); |
| 85 | + EmptyWriter(stream) |
| 86 | + }, |
| 87 | + _ => { |
| 88 | + let mut chunked = true; |
| 89 | + let mut len = 0; |
| 90 | + |
| 91 | + match head.headers.get::<header::ContentLength>() { |
| 92 | + Some(cl) => { |
| 93 | + chunked = false; |
| 94 | + len = **cl; |
| 95 | + }, |
| 96 | + None => () |
| 97 | + }; |
| 98 | + |
| 99 | + // can't do in match above, thanks borrowck |
| 100 | + if chunked { |
| 101 | + let encodings = match head.headers.get_mut::<header::TransferEncoding>() { |
| 102 | + Some(&mut header::TransferEncoding(ref mut encodings)) => { |
| 103 | + //TODO: check if chunked is already in encodings. use HashSet? |
| 104 | + encodings.push(header::Encoding::Chunked); |
| 105 | + false |
| 106 | + }, |
| 107 | + None => true |
| 108 | + }; |
| 109 | + |
| 110 | + if encodings { |
| 111 | + head.headers.set::<header::TransferEncoding>( |
| 112 | + header::TransferEncoding(vec![header::Encoding::Chunked])) |
| 113 | + } |
| 114 | + } |
| 115 | + |
| 116 | + debug!("headers={:?}", head.headers); |
| 117 | + try!(write!(&mut stream, "{}{}", head.headers, LINE_ENDING)); |
| 118 | + |
| 119 | + if chunked { |
| 120 | + ChunkedWriter(stream) |
| 121 | + } else { |
| 122 | + SizedWriter(stream, len) |
| 123 | + } |
| 124 | + } |
| 125 | + }; |
| 126 | + |
| 127 | + self.writer = Some(stream); |
| 128 | + |
| 129 | + Ok(head) |
| 130 | + } |
| 131 | + |
| 132 | + fn get_incoming(&mut self) -> ::Result<ResponseHead> { |
| 133 | + try!(self.flush_outgoing()); |
| 134 | + if self.stream.is_none() { |
| 135 | + // The message was already in the reading state... |
| 136 | + // TODO Decide what happens in case we try to get a new incoming at that point |
| 137 | + return Err(From::from( |
| 138 | + io::Error::new(io::ErrorKind::Other, |
| 139 | + "Read already in progress"))); |
| 140 | + } |
| 141 | + |
| 142 | + let stream = self.stream.take().unwrap(); |
| 143 | + let mut stream = BufReader::new(stream); |
| 144 | + |
| 145 | + let head = try!(http::parse_response(&mut stream)); |
| 146 | + let raw_status = head.subject; |
| 147 | + let headers = head.headers; |
| 148 | + |
| 149 | + let body = if headers.has::<TransferEncoding>() { |
| 150 | + match headers.get::<TransferEncoding>() { |
| 151 | + Some(&TransferEncoding(ref codings)) => { |
| 152 | + if codings.len() > 1 { |
| 153 | + trace!("TODO: #2 handle other codings: {:?}", codings); |
| 154 | + }; |
| 155 | + |
| 156 | + if codings.contains(&Chunked) { |
| 157 | + ChunkedReader(stream, None) |
| 158 | + } else { |
| 159 | + trace!("not chuncked. read till eof"); |
| 160 | + EofReader(stream) |
| 161 | + } |
| 162 | + } |
| 163 | + None => unreachable!() |
| 164 | + } |
| 165 | + } else if headers.has::<ContentLength>() { |
| 166 | + match headers.get::<ContentLength>() { |
| 167 | + Some(&ContentLength(len)) => SizedReader(stream, len), |
| 168 | + None => unreachable!() |
| 169 | + } |
| 170 | + } else { |
| 171 | + trace!("neither Transfer-Encoding nor Content-Length"); |
| 172 | + EofReader(stream) |
| 173 | + }; |
| 174 | + |
| 175 | + self.reader = Some(body); |
| 176 | + |
| 177 | + Ok(ResponseHead { |
| 178 | + headers: headers, |
| 179 | + raw_status: raw_status, |
| 180 | + version: head.version, |
| 181 | + }) |
| 182 | + } |
| 183 | + |
| 184 | + fn close_connection(&mut self) -> ::Result<()> { |
| 185 | + try!(self.get_mut().close(Shutdown::Both)); |
| 186 | + Ok(()) |
| 187 | + } |
| 188 | +} |
| 189 | + |
| 190 | +impl Http11Message { |
| 191 | + /// Consumes the `Http11Message` and returns the underlying `NetworkStream`. |
| 192 | + pub fn into_inner(mut self) -> Box<NetworkStream + Send> { |
| 193 | + if self.stream.is_some() { |
| 194 | + self.stream.take().unwrap() |
| 195 | + } else if self.writer.is_some() { |
| 196 | + self.writer.take().unwrap().into_inner().into_inner().unwrap() |
| 197 | + } else if self.reader.is_some() { |
| 198 | + self.reader.take().unwrap().into_inner().into_inner() |
| 199 | + } else { |
| 200 | + panic!("Http11Message lost its underlying stream somehow"); |
| 201 | + } |
| 202 | + } |
| 203 | + |
| 204 | + /// Gets a mutable reference to the underlying `NetworkStream`, regardless of the state of the |
| 205 | + /// `Http11Message`. |
| 206 | + pub fn get_mut(&mut self) -> &mut Box<NetworkStream + Send> { |
| 207 | + if self.stream.is_some() { |
| 208 | + self.stream.as_mut().unwrap() |
| 209 | + } else if self.writer.is_some() { |
| 210 | + self.writer.as_mut().unwrap().get_mut().get_mut() |
| 211 | + } else if self.reader.is_some() { |
| 212 | + self.reader.as_mut().unwrap().get_mut().get_mut() |
| 213 | + } else { |
| 214 | + panic!("Http11Message lost its underlying stream somehow"); |
| 215 | + } |
| 216 | + } |
| 217 | + |
| 218 | + /// Creates a new `Http11Message` that will use the given `NetworkStream` for communicating to |
| 219 | + /// the peer. |
| 220 | + pub fn with_stream(stream: Box<NetworkStream + Send>) -> Http11Message { |
| 221 | + Http11Message { |
| 222 | + stream: Some(stream), |
| 223 | + writer: None, |
| 224 | + reader: None, |
| 225 | + } |
| 226 | + } |
| 227 | + |
| 228 | + /// Flushes the current outgoing content and moves the stream into the `stream` property. |
| 229 | + /// |
| 230 | + /// TODO It might be sensible to lift this up to the `HttpMessage` trait itself... |
| 231 | + pub fn flush_outgoing(&mut self) -> ::Result<()> { |
| 232 | + match self.writer { |
| 233 | + None => return Ok(()), |
| 234 | + Some(_) => {}, |
| 235 | + }; |
| 236 | + |
| 237 | + let writer = self.writer.take().unwrap(); |
| 238 | + let raw = try!(writer.end()).into_inner().unwrap(); // end() already flushes |
| 239 | + self.stream = Some(raw); |
| 240 | + |
| 241 | + Ok(()) |
| 242 | + } |
| 243 | +} |
0 commit comments