From 53eed55e4c90481628b10dac644dbcf761b23b92 Mon Sep 17 00:00:00 2001 From: Alex Pyattaev Date: Tue, 22 Aug 2023 00:24:39 +0300 Subject: [PATCH] simplify rust native threads version, remove useless mutexes from there --- wrk/measure.sh | 21 +++++---- wrk/rust/hello/Cargo.toml | 1 + wrk/rust/hello/src/lib.rs | 87 +++++++++++++++++++++----------------- wrk/rust/hello/src/main.rs | 20 ++++----- 4 files changed, 70 insertions(+), 59 deletions(-) diff --git a/wrk/measure.sh b/wrk/measure.sh index 3e25aea..9ed7d65 100755 --- a/wrk/measure.sh +++ b/wrk/measure.sh @@ -6,6 +6,9 @@ DURATION_SECONDS=10 SUBJECT=$1 +TSK_SRV="taskset -c 0,1,2,3" +TSK_LOAD="taskset -c 4,5,6,7" + if [ "$SUBJECT" = "" ] ; then echo "usage: $0 subject # subject: zig or go" exit 1 @@ -13,54 +16,54 @@ fi if [ "$SUBJECT" = "zig" ] ; then zig build -Doptimize=ReleaseFast wrk > /dev/null - ./zig-out/bin/wrk & + $TSK_SRV ./zig-out/bin/wrk & PID=$! URL=http://127.0.0.1:3000 fi if [ "$SUBJECT" = "zigstd" ] ; then zig build -Doptimize=ReleaseFast wrk_zigstd > /dev/null - ./zig-out/bin/wrk_zigstd & + $TSK_SRV ./zig-out/bin/wrk_zigstd & PID=$! URL=http://127.0.0.1:3000 fi if [ "$SUBJECT" = "go" ] ; then cd wrk/go && go build main.go - ./main & + $TSK_SRV ./main & PID=$! URL=http://127.0.0.1:8090/hello fi if [ "$SUBJECT" = "python" ] ; then - python wrk/python/main.py & + $TSK_SRV python wrk/python/main.py & PID=$! URL=http://127.0.0.1:8080 fi if [ "$SUBJECT" = "sanic" ] ; then - python wrk/sanic/sanic-app.py & + $TSK_SRV python wrk/sanic/sanic-app.py & PID=$! URL=http://127.0.0.1:8000 fi if [ "$SUBJECT" = "rust" ] ; then cd wrk/rust/hello && cargo build --release - ./target/release/hello & + $TSK_SRV ./target/release/hello & PID=$! URL=http://127.0.0.1:7878 fi if [ "$SUBJECT" = "axum" ] ; then cd wrk/axum/hello-axum && cargo build --release - ./target/release/hello-axum & + $TSK_SRV ./target/release/hello-axum & PID=$! URL=http://127.0.0.1:3000 fi if [ "$SUBJECT" = "csharp" ] ; then cd wrk/csharp && dotnet publish csharp.csproj -o ./out - ./out/csharp --urls "http://127.0.0.1:5026" & + $TSK_SRV ./out/csharp --urls "http://127.0.0.1:5026" & PID=$! URL=http://127.0.0.1:5026 fi @@ -69,7 +72,7 @@ sleep 1 echo "========================================================================" echo " $SUBJECT" echo "========================================================================" -wrk -c $CONNECTIONS -t $THREADS -d $DURATION_SECONDS --latency $URL +$TSK_LOAD wrk -c $CONNECTIONS -t $THREADS -d $DURATION_SECONDS --latency $URL kill $PID diff --git a/wrk/rust/hello/Cargo.toml b/wrk/rust/hello/Cargo.toml index fb1ec2c..c9ba87b 100644 --- a/wrk/rust/hello/Cargo.toml +++ b/wrk/rust/hello/Cargo.toml @@ -6,3 +6,4 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +# crossbeam = { version = "0.8.2", features = ["crossbeam-channel"] } diff --git a/wrk/rust/hello/src/lib.rs b/wrk/rust/hello/src/lib.rs index 936d014..6e5d88d 100644 --- a/wrk/rust/hello/src/lib.rs +++ b/wrk/rust/hello/src/lib.rs @@ -1,14 +1,20 @@ -use std::{ - sync::{mpsc, Arc, Mutex}, - thread, -}; +//Crossbeam should, but does not make this faster. +//use crossbeam::channel::bounded; +use std::{net::TcpStream, sync::mpsc, thread}; +type Job = (fn(TcpStream), TcpStream); + +type Sender = mpsc::Sender; +//type Sender = crossbeam::channel::Sender; + +type Receiver = mpsc::Receiver; +//type Receiver = crossbeam::channel::Receiver; pub struct ThreadPool { workers: Vec, - sender: Option>, -} + senders: Vec, -type Job = Box; + next_sender: usize, +} impl ThreadPool { /// Create a new ThreadPool. @@ -21,39 +27,40 @@ impl ThreadPool { pub fn new(size: usize) -> ThreadPool { assert!(size > 0); - let (sender, receiver) = mpsc::channel(); - - let receiver = Arc::new(Mutex::new(receiver)); - let mut workers = Vec::with_capacity(size); + let mut senders = Vec::with_capacity(size); for id in 0..size { - workers.push(Worker::new(id, Arc::clone(&receiver))); + //let (sender, receiver) = bounded(2); + let (sender, receiver) = mpsc::channel(); + senders.push(sender); + workers.push(Worker::new(id, receiver)); } ThreadPool { workers, - sender: Some(sender), + senders, + next_sender: 0, } } - - pub fn execute(&self, f: F) - where - F: FnOnce() + Send + 'static, - { - let job = Box::new(f); - - self.sender.as_ref().unwrap().send(job).unwrap(); + /// round robin over available workers to ensure we never have to buffer requests + pub fn execute(&mut self, handler: fn(TcpStream), stream: TcpStream) { + let job = (handler, stream); + self.senders[self.next_sender].send(job).unwrap(); + //self.senders[self.next_sender].try_send(job).unwrap(); + self.next_sender += 1; + if self.next_sender == self.senders.len() { + self.next_sender = 0; + } } } impl Drop for ThreadPool { fn drop(&mut self) { - drop(self.sender.take()); + self.senders.clear(); for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); - if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } @@ -67,26 +74,28 @@ struct Worker { } impl Worker { - fn new(id: usize, receiver: Arc>>) -> Worker { - let thread = thread::spawn(move || loop { - let message = receiver.lock().unwrap().recv(); - - match message { - Ok(job) => { - // println!("Worker got a job; executing."); - - job(); - } - Err(_) => { - // println!("Worker disconnected; shutting down."); - break; - } - } - }); + fn new(id: usize, receiver: Receiver) -> Worker { + let thread = thread::spawn(move || Self::work(receiver)); Worker { id, thread: Some(thread), } } + + fn work(receiver: Receiver) { + loop { + let message = receiver.recv(); + match message { + Ok((handler, stream)) => { + // println!("Worker got a job; executing."); + handler(stream); + } + Err(_) => { + // println!("Worker disconnected; shutting down."); + break; + } + } + } + } } diff --git a/wrk/rust/hello/src/main.rs b/wrk/rust/hello/src/main.rs index 34a7474..d6138d9 100644 --- a/wrk/rust/hello/src/main.rs +++ b/wrk/rust/hello/src/main.rs @@ -1,31 +1,30 @@ use hello::ThreadPool; -// use std::fs; use std::io::prelude::*; use std::net::TcpListener; use std::net::TcpStream; -// use std::thread; -// use std::time::Duration; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); - let pool = ThreadPool::new(4); + //Creating a massive amount of threads so we can always have one ready to go. + let mut pool = ThreadPool::new(128); // for stream in listener.incoming().take(2) { for stream in listener.incoming() { let stream = stream.unwrap(); - - pool.execute(|| { - handle_connection(stream); - }); + //handle_connection(stream); + pool.execute(handle_connection, stream); } println!("Shutting down."); } fn handle_connection(mut stream: TcpStream) { + stream.set_nodelay(true).expect("set_nodelay call failed"); let mut buffer = [0; 1024]; - stream.read(&mut buffer).unwrap(); - + let nbytes = stream.read(&mut buffer).unwrap(); + if nbytes == 0 { + return; + } let status_line = "HTTP/1.1 200 OK"; @@ -39,5 +38,4 @@ fn handle_connection(mut stream: TcpStream) { ); stream.write_all(response.as_bytes()).unwrap(); - stream.flush().unwrap(); }