From bc71fc346fa393db30493a68ec83853e36324ee2 Mon Sep 17 00:00:00 2001 From: Alex Pyattaev Date: Tue, 22 Aug 2023 21:50:12 +0300 Subject: [PATCH] now two actually different impls for rust --- wrk/rust/bythebook/hello.html | 1 - wrk/rust/bythebook/src/main.rs | 1 - wrk/rust/clean/Cargo.toml | 3 +- wrk/rust/clean/hello.html | 1 - wrk/rust/clean/src/lib.rs | 101 --------------------------------- wrk/rust/clean/src/main.rs | 35 +++++------- 6 files changed, 14 insertions(+), 128 deletions(-) delete mode 100644 wrk/rust/bythebook/hello.html delete mode 100644 wrk/rust/clean/hello.html delete mode 100644 wrk/rust/clean/src/lib.rs diff --git a/wrk/rust/bythebook/hello.html b/wrk/rust/bythebook/hello.html deleted file mode 100644 index a2b3773..0000000 --- a/wrk/rust/bythebook/hello.html +++ /dev/null @@ -1 +0,0 @@ -Hello from RUST! diff --git a/wrk/rust/bythebook/src/main.rs b/wrk/rust/bythebook/src/main.rs index ee75336..8b22471 100644 --- a/wrk/rust/bythebook/src/main.rs +++ b/wrk/rust/bythebook/src/main.rs @@ -8,7 +8,6 @@ fn main() { //Creating a massive amount of threads so we can always have one ready to go. let mut pool = ThreadPool::new(128); - // for stream in listener.incoming().take(2) { for stream in listener.incoming() { let stream = stream.unwrap(); //handle_connection(stream); diff --git a/wrk/rust/clean/Cargo.toml b/wrk/rust/clean/Cargo.toml index c9ba87b..cf97cf6 100644 --- a/wrk/rust/clean/Cargo.toml +++ b/wrk/rust/clean/Cargo.toml @@ -3,7 +3,6 @@ name = "hello" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -# crossbeam = { version = "0.8.2", features = ["crossbeam-channel"] } + diff --git a/wrk/rust/clean/hello.html b/wrk/rust/clean/hello.html deleted file mode 100644 index a2b3773..0000000 --- a/wrk/rust/clean/hello.html +++ /dev/null @@ -1 +0,0 @@ -Hello from RUST! diff --git a/wrk/rust/clean/src/lib.rs b/wrk/rust/clean/src/lib.rs deleted file mode 100644 index 6e5d88d..0000000 --- a/wrk/rust/clean/src/lib.rs +++ /dev/null @@ -1,101 +0,0 @@ -//Crossbeam should, but does not make this faster. -//use crossbeam::channel::bounded; -use std::{net::TcpStream, sync::mpsc, thread}; -type Job = (fn(TcpStream), TcpStream); - -type Sender = mpsc::Sender; -//type Sender = crossbeam::channel::Sender; - -type Receiver = mpsc::Receiver; -//type Receiver = crossbeam::channel::Receiver; - -pub struct ThreadPool { - workers: Vec, - senders: Vec, - - next_sender: usize, -} - -impl ThreadPool { - /// Create a new ThreadPool. - /// - /// The size is the number of threads in the pool. - /// - /// # Panics - /// - /// The `new` function will panic if the size is zero. - pub fn new(size: usize) -> ThreadPool { - assert!(size > 0); - - let mut workers = Vec::with_capacity(size); - let mut senders = Vec::with_capacity(size); - - for id in 0..size { - //let (sender, receiver) = bounded(2); - let (sender, receiver) = mpsc::channel(); - senders.push(sender); - workers.push(Worker::new(id, receiver)); - } - - ThreadPool { - workers, - senders, - next_sender: 0, - } - } - /// round robin over available workers to ensure we never have to buffer requests - pub fn execute(&mut self, handler: fn(TcpStream), stream: TcpStream) { - let job = (handler, stream); - self.senders[self.next_sender].send(job).unwrap(); - //self.senders[self.next_sender].try_send(job).unwrap(); - self.next_sender += 1; - if self.next_sender == self.senders.len() { - self.next_sender = 0; - } - } -} - -impl Drop for ThreadPool { - fn drop(&mut self) { - self.senders.clear(); - - for worker in &mut self.workers { - println!("Shutting down worker {}", worker.id); - if let Some(thread) = worker.thread.take() { - thread.join().unwrap(); - } - } - } -} - -struct Worker { - id: usize, - thread: Option>, -} - -impl Worker { - fn new(id: usize, receiver: Receiver) -> Worker { - let thread = thread::spawn(move || Self::work(receiver)); - - Worker { - id, - thread: Some(thread), - } - } - - fn work(receiver: Receiver) { - loop { - let message = receiver.recv(); - match message { - Ok((handler, stream)) => { - // println!("Worker got a job; executing."); - handler(stream); - } - Err(_) => { - // println!("Worker disconnected; shutting down."); - break; - } - } - } - } -} diff --git a/wrk/rust/clean/src/main.rs b/wrk/rust/clean/src/main.rs index d6138d9..cda767c 100644 --- a/wrk/rust/clean/src/main.rs +++ b/wrk/rust/clean/src/main.rs @@ -1,18 +1,14 @@ -use hello::ThreadPool; use std::io::prelude::*; use std::net::TcpListener; use std::net::TcpStream; fn main() { - let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); - //Creating a massive amount of threads so we can always have one ready to go. - let mut pool = ThreadPool::new(128); + let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); - // for stream in listener.incoming().take(2) { for stream in listener.incoming() { let stream = stream.unwrap(); //handle_connection(stream); - pool.execute(handle_connection, stream); + std::thread::spawn(||{handle_connection(stream)}); } println!("Shutting down."); @@ -20,22 +16,17 @@ fn main() { fn handle_connection(mut stream: TcpStream) { stream.set_nodelay(true).expect("set_nodelay call failed"); - let mut buffer = [0; 1024]; - let nbytes = stream.read(&mut buffer).unwrap(); - if nbytes == 0 { - return; + loop{ + let mut buffer = [0; 1024]; + match stream.read(&mut buffer){ + Err(_)=>return, + Ok(0)=>return, + Ok(_v)=>{}, + } + + let response_bytes = b"HTTP/1.1 200 OK\r\nContent-Length: 16\r\nConnection: keep-alive\r\n\r\nHELLO from RUST!"; + + stream.write_all(response_bytes).unwrap(); } - let status_line = "HTTP/1.1 200 OK"; - - let contents = "HELLO from RUST!"; - - let response = format!( - "{}\r\nContent-Length: {}\r\n\r\n{}", - status_line, - contents.len(), - contents - ); - - stream.write_all(response.as_bytes()).unwrap(); }