Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Graceful Shutdown และ Cleanup

โค้ดใน Listing 21-20 กำลังตอบกับ request แบบ asynchronous ผ่านการใช้ thread pool ตามที่เราตั้งใจ เราได้ warning บ้างเกี่ยวกับ field workers, id และ thread ที่เราไม่ได้ใช้ในวิธีโดยตรงที่เตือนเรา ว่าเราไม่ได้ cleanup อะไร เมื่อเราใช้เมธอด ctrl-C ที่ไม่ elegant เพื่อหยุดเธรดหลัก เธรดอื่นทั้งหมดถูกหยุดทันทีด้วย แม้ พวกมันอยู่กลาง serve request

ถัดไป เราจะ implement trait Drop เพื่อเรียก join บนแต่ละเธรดใน pool เพื่อให้พวกมัน finish request ที่พวกมันกำลังทำก่อนปิด แล้ว เราจะ implement วิธีบอกเธรดให้พวกมันควรหยุดยอมรับ request ใหม่และ shut down เพื่อเห็นโค้ดนี้ในการ action เราจะแก้ server ของเราให้ยอมรับเพียง สอง request ก่อน graceful shut down thread pool ของมัน

สิ่งหนึ่งที่ต้องสังเกตขณะเราไป — ไม่มีของนี้กระทบส่วนของโค้ดที่จัดการ execute closure ดังนั้นทุกอย่างที่นี่จะเหมือนกันถ้าเรากำลังใช้ thread pool สำหรับ async runtime

Implement Trait Drop บน ThreadPool

มาเริ่มด้วยการ implement Drop บน thread pool ของเรา เมื่อ pool ถูก drop เธรดของเราควร join ทั้งหมดเพื่อให้แน่ใจว่าพวกมัน finish งานของ พวกมัน Listing 21-22 แสดงความพยายามแรกที่ implementation Drop — โค้ดนี้ยังไม่ค่อยทำงาน

Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}
Listing 21-22: Join แต่ละเธรดเมื่อ thread pool ออกจาก scope

ก่อนอื่น เรา loop ผ่านแต่ละ workers ของ thread pool เราใช้ &mut สำหรับสิ่งนี้เพราะ self คือ mutable reference และเรายังต้องสามารถ mutate worker สำหรับแต่ละ worker เรา print ข้อความบอกว่า instance Worker เฉพาะนี้กำลัง shutting down และแล้วเราเรียก join บนเธรด ของ instance Worker นั้น ถ้าการเรียก join fail เราใช้ unwrap เพื่อทำให้ Rust panic และเข้าไปใน shutdown ที่ไม่ graceful

นี่คือ error ที่เราได้เมื่อเรา compile โค้ดนี้:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
  --> src/lib.rs:52:13
   |
52 |             worker.thread.join().unwrap();
   |             ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
   |             |
   |             move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
   |
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
  --> /rustc/1159e78c4747b02ef996e55082b704c09b970588/library/std/src/thread/mod.rs:1921:17

For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` (lib) due to 1 previous error

error บอกเราว่าเราไม่สามารถเรียก join เพราะเราเพียงมี mutable borrow ของแต่ละ worker และ join รับ ownership ของ argument ของมัน เพื่อแก้ปัญหานี้ เราต้องย้ายเธรดออกจาก instance Worker ที่เป็นเจ้าของ thread เพื่อให้ join consume เธรดได้ วิธีหนึ่งในการทำสิ่งนี้คือ รับแนวทางเดียวกับที่เรารับใน Listing 18-15 ถ้า Worker ถือ Option<thread::JoinHandle<()>> เราเรียกเมธอด take บน Option เพื่อย้ายค่าออกจาก variant Some และทิ้ง variant None ในที่ของมัน ได้ ในคำพูดอื่น Worker ที่กำลังรันจะมี variant Some ใน thread และเมื่อเราต้องการ cleanup Worker เราจะแทน Some ด้วย None เพื่อ ให้ Worker ไม่มีเธรดที่จะรัน

อย่างไรก็ตาม เวลา เดียว ที่นี่จะมาคือเมื่อ drop Worker แลกกับนั้น เราจะต้องจัดการกับ Option<thread::JoinHandle<()>> ที่ใดที่เราเข้า ถึง worker.thread Idiomatic Rust ใช้ Option ค่อนข้างเยอะ แต่เมื่อ คุณพบตัวเองห่อบางอย่างที่คุณรู้จะมีอยู่เสมอใน Option เป็น workaround แบบนี้ มันคือไอเดียดีที่จะมองหาแนวทางทางเลือกเพื่อทำให้โค้ดของคุณ สะอาดกว่าและเสี่ยง error น้อยกว่า

ในกรณีนี้ ทางเลือกที่ดีกว่ามีอยู่ — เมธอด Vec::drain มันรับ parameter range เพื่อระบุ item ใดที่จะลบจาก vector และ return iterator ของ item เหล่านั้น ส่ง syntax range .. จะลบ ทุก ค่าจาก vector

ดังนั้น เราต้องอัพเดท implementation drop ของ ThreadPool แบบนี้:

Filename: src/lib.rs
#![allow(unused)]
fn main() {
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in self.workers.drain(..) {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}
}

นี่แก้ error compiler และไม่ต้องการการเปลี่ยนแปลงอื่นใดให้โค้ดของเรา สังเกตว่า เพราะ drop สามารถถูกเรียกเมื่อ panic, unwrap ก็ panic ได้ และสาเหตุ double panic ซึ่งทันที crash โปรแกรมและจบ cleanup ใดที่ กำลังดำเนิน นี่ okay สำหรับโปรแกรมตัวอย่าง แต่มันไม่แนะนำสำหรับโค้ด production

Signal ให้เธรดหยุดฟังสำหรับ Job

ด้วยการเปลี่ยนแปลงทั้งหมดที่เราทำ โค้ดของเรา compile โดยไม่มี warning ใด อย่างไรก็ตาม ข่าวร้ายคือโค้ดนี้ยังไม่ทำงานในวิธีที่เราต้องการ key คือ logic ใน closure ที่รันโดยเธรดของ instance Worker — ที่ขณะนี้ เราเรียก join แต่นั่นจะไม่ shut down เธรด เพราะพวกมัน loop ตลอด ไปมองหา job ถ้าเราพยายาม drop ThreadPool ของเรากับ implementation ปัจจุบันของเราของ drop เธรดหลักจะ block ตลอดไป รอเธรดแรก finish

เพื่อ fix ปัญหานี้ เราจะต้องการการเปลี่ยนแปลงใน implementation drop ของ ThreadPool และแล้วการเปลี่ยนแปลงใน loop ของ Worker

ก่อนอื่น เราจะเปลี่ยน implementation drop ของ ThreadPool ให้ drop sender อย่างชัดเจนก่อนรอเธรด finish Listing 21-23 แสดงการ เปลี่ยนแปลงให้ ThreadPool เพื่อ drop sender อย่างชัดเจน ต่างจาก กรณีของเธรด ที่นี่เรา ต้อง ใช้ Option เพื่อให้สามารถย้าย sender ออกจาก ThreadPool กับ Option::take

Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}
// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        // --snip--

        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in self.workers.drain(..) {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}
Listing 21-23: Drop sender อย่างชัดเจนก่อน join เธรด Worker

Drop sender ปิด channel ซึ่งบ่งบอกว่าจะไม่มีข้อความถูกส่งอีก เมื่อ นั้นเกิด การเรียก recv ทั้งหมดที่ instance Worker ทำใน loop infinite จะ return error ใน Listing 21-24 เราเปลี่ยน loop Worker ให้ exit loop graceful ในกรณีนั้น ซึ่งหมายความว่าเธรดจะ finish เมื่อ implementation drop ของ ThreadPool เรียก join บนพวกมัน

Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in self.workers.drain(..) {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let message = receiver.lock().unwrap().recv();

                match message {
                    Ok(job) => {
                        println!("Worker {id} got a job; executing.");

                        job();
                    }
                    Err(_) => {
                        println!("Worker {id} disconnected; shutting down.");
                        break;
                    }
                }
            }
        });

        Worker { id, thread }
    }
}
Listing 21-24: Break ออกจาก loop อย่างชัดเจนเมื่อ recv return error

เพื่อเห็นโค้ดนี้ในการ action มาแก้ main เพื่อยอมรับเพียงสอง request ก่อน graceful shut down server ดังที่แสดงใน Listing 21-25

Filename: src/main.rs
use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-25: Shut down server หลัง serve สอง request โดย exit loop

คุณจะไม่ต้องการให้ web server โลกจริง shut down หลัง serve เพียงสอง request โค้ดนี้เพียงสาธิตว่า graceful shutdown และ cleanup อยู่ใน state ทำงาน

เมธอด take ถูกนิยามใน trait Iterator และ limit iteration ให้สอง item แรกที่สุด ThreadPool จะออกจาก scope ที่ตอนจบของ main และ implementation drop จะรัน

เริ่ม server กับ cargo run และทำสาม request request ที่สามควร error และใน terminal ของคุณ คุณควรเห็น output คล้ายกับนี้:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3

คุณอาจเห็น ordering ต่างของ Worker ID และข้อความที่ print เราเห็นได้ ว่าโค้ดนี้ทำงานยังไงจากข้อความ — instance Worker 0 และ 3 ได้สอง request แรก Server หยุดยอมรับ connection หลัง connection ที่สอง และ implementation Drop บน ThreadPool เริ่ม execute ก่อนที่ Worker 3 จะเริ่ม job ของมันด้วยซ้ำ Drop sender disconnect instance Worker ทั้งหมดและบอกพวกมันให้ shut down instance Worker แต่ละ print ข้อความเมื่อพวกมัน disconnect และแล้ว thread pool เรียก join เพื่อ รอแต่ละเธรด Worker finish

สังเกตหนึ่งแง่มุมที่น่าสนใจของการ execute เฉพาะนี้ — ThreadPool drop sender และก่อน Worker ใดรับ error เราพยายาม join Worker 0 Worker 0 ยังไม่ได้ error จาก recv ดังนั้นเธรดหลัก block รอ Worker 0 finish ในระหว่าง Worker 3 รับ job และแล้วเธรดทั้งหมดรับ error เมื่อ Worker 0 finish เธรดหลักรอ instance Worker ที่เหลือ finish ที่จุดนั้น พวกมัน exit loop ของพวกมันและหยุดทั้งหมด

ยินดีด้วย! เรา complete project ของเราแล้ว — เรามี web server พื้นฐานที่ ใช้ thread pool เพื่อตอบแบบ asynchronous เราสามารถทำ graceful shutdown ของ server ซึ่ง cleanup เธรดทั้งหมดใน pool

นี่คือโค้ดเต็มสำหรับ reference:

Filename: src/main.rs
use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let message = receiver.lock().unwrap().recv();

                match message {
                    Ok(job) => {
                        println!("Worker {id} got a job; executing.");

                        job();
                    }
                    Err(_) => {
                        println!("Worker {id} disconnected; shutting down.");
                        break;
                    }
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

เราทำเพิ่มที่นี่ได้! ถ้าคุณต้องการดำเนินปรับปรุง project นี้ นี่คือ ไอเดียบางอย่าง:

  • เพิ่ม documentation เพิ่มให้ ThreadPool และเมธอด public ของมัน
  • เพิ่ม test ของ functionality ของ library
  • เปลี่ยนการเรียก unwrap เป็นการจัดการ error ที่ robust กว่า
  • ใช้ ThreadPool เพื่อทำงานอื่นนอกจากการ serve web request
  • หา crate thread pool บน crates.io และ implement web server คล้ายกันโดยใช้ crate แทน แล้ว เปรียบเทียบ API และ robustness ของมันกับ thread pool ที่เรา implement

สรุป

ทำได้ดี! คุณได้มาถึงจุดจบของหนังสือ! เราต้องการขอบคุณคุณสำหรับการเข้า ร่วมเราในการเดินทางของ Rust นี้ คุณตอนนี้พร้อมที่จะ implement project Rust ของตัวเองและช่วย project ของคนอื่น จำในใจว่ามี community ที่ ต้อนรับของ Rustacean อื่นที่ยินดีช่วยคุณกับความท้าทายใดที่คุณเจอใน การเดินทาง Rust ของคุณ