Graceful Shutdown และ Cleanup
โค้ดใน Listing 21-20 กำลังตอบกับ request แบบ asynchronous ผ่านการใช้
thread pool ตามที่เราตั้งใจ เราได้ warning บ้างเกี่ยวกับ field
workers, id และ thread ที่เราไม่ได้ใช้ในวิธีโดยตรงที่เตือนเรา
ว่าเราไม่ได้ cleanup อะไร เมื่อเราใช้เมธอด ctrl-C
ที่ไม่ elegant เพื่อหยุดเธรดหลัก เธรดอื่นทั้งหมดถูกหยุดทันทีด้วย แม้
พวกมันอยู่กลาง serve request
ถัดไป เราจะ implement trait Drop เพื่อเรียก join บนแต่ละเธรดใน
pool เพื่อให้พวกมัน finish request ที่พวกมันกำลังทำก่อนปิด แล้ว เราจะ
implement วิธีบอกเธรดให้พวกมันควรหยุดยอมรับ request ใหม่และ shut
down เพื่อเห็นโค้ดนี้ในการ action เราจะแก้ server ของเราให้ยอมรับเพียง
สอง request ก่อน graceful shut down thread pool ของมัน
สิ่งหนึ่งที่ต้องสังเกตขณะเราไป — ไม่มีของนี้กระทบส่วนของโค้ดที่จัดการ execute closure ดังนั้นทุกอย่างที่นี่จะเหมือนกันถ้าเรากำลังใช้ thread pool สำหรับ async runtime
Implement Trait Drop บน ThreadPool
มาเริ่มด้วยการ implement Drop บน thread pool ของเรา เมื่อ pool ถูก
drop เธรดของเราควร join ทั้งหมดเพื่อให้แน่ใจว่าพวกมัน finish งานของ
พวกมัน Listing 21-22 แสดงความพยายามแรกที่ implementation Drop —
โค้ดนี้ยังไม่ค่อยทำงาน
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
ก่อนอื่น เรา loop ผ่านแต่ละ workers ของ thread pool เราใช้ &mut
สำหรับสิ่งนี้เพราะ self คือ mutable reference และเรายังต้องสามารถ
mutate worker สำหรับแต่ละ worker เรา print ข้อความบอกว่า instance
Worker เฉพาะนี้กำลัง shutting down และแล้วเราเรียก join บนเธรด
ของ instance Worker นั้น ถ้าการเรียก join fail เราใช้ unwrap
เพื่อทำให้ Rust panic และเข้าไปใน shutdown ที่ไม่ graceful
นี่คือ error ที่เราได้เมื่อเรา compile โค้ดนี้:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
--> src/lib.rs:52:13
|
52 | worker.thread.join().unwrap();
| ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
| |
| move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
|
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
--> /rustc/1159e78c4747b02ef996e55082b704c09b970588/library/std/src/thread/mod.rs:1921:17
For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` (lib) due to 1 previous error
error บอกเราว่าเราไม่สามารถเรียก join เพราะเราเพียงมี mutable
borrow ของแต่ละ worker และ join รับ ownership ของ argument ของมัน
เพื่อแก้ปัญหานี้ เราต้องย้ายเธรดออกจาก instance Worker ที่เป็นเจ้าของ
thread เพื่อให้ join consume เธรดได้ วิธีหนึ่งในการทำสิ่งนี้คือ
รับแนวทางเดียวกับที่เรารับใน Listing 18-15 ถ้า Worker ถือ
Option<thread::JoinHandle<()>> เราเรียกเมธอด take บน Option
เพื่อย้ายค่าออกจาก variant Some และทิ้ง variant None ในที่ของมัน
ได้ ในคำพูดอื่น Worker ที่กำลังรันจะมี variant Some ใน thread
และเมื่อเราต้องการ cleanup Worker เราจะแทน Some ด้วย None เพื่อ
ให้ Worker ไม่มีเธรดที่จะรัน
อย่างไรก็ตาม เวลา เดียว ที่นี่จะมาคือเมื่อ drop Worker แลกกับนั้น
เราจะต้องจัดการกับ Option<thread::JoinHandle<()>> ที่ใดที่เราเข้า
ถึง worker.thread Idiomatic Rust ใช้ Option ค่อนข้างเยอะ แต่เมื่อ
คุณพบตัวเองห่อบางอย่างที่คุณรู้จะมีอยู่เสมอใน Option เป็น workaround
แบบนี้ มันคือไอเดียดีที่จะมองหาแนวทางทางเลือกเพื่อทำให้โค้ดของคุณ
สะอาดกว่าและเสี่ยง error น้อยกว่า
ในกรณีนี้ ทางเลือกที่ดีกว่ามีอยู่ — เมธอด Vec::drain มันรับ parameter
range เพื่อระบุ item ใดที่จะลบจาก vector และ return iterator ของ item
เหล่านั้น ส่ง syntax range .. จะลบ ทุก ค่าจาก vector
ดังนั้น เราต้องอัพเดท implementation drop ของ ThreadPool แบบนี้:
#![allow(unused)]
fn main() {
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
}
นี่แก้ error compiler และไม่ต้องการการเปลี่ยนแปลงอื่นใดให้โค้ดของเรา สังเกตว่า เพราะ drop สามารถถูกเรียกเมื่อ panic, unwrap ก็ panic ได้ และสาเหตุ double panic ซึ่งทันที crash โปรแกรมและจบ cleanup ใดที่ กำลังดำเนิน นี่ okay สำหรับโปรแกรมตัวอย่าง แต่มันไม่แนะนำสำหรับโค้ด production
Signal ให้เธรดหยุดฟังสำหรับ Job
ด้วยการเปลี่ยนแปลงทั้งหมดที่เราทำ โค้ดของเรา compile โดยไม่มี warning
ใด อย่างไรก็ตาม ข่าวร้ายคือโค้ดนี้ยังไม่ทำงานในวิธีที่เราต้องการ key
คือ logic ใน closure ที่รันโดยเธรดของ instance Worker — ที่ขณะนี้
เราเรียก join แต่นั่นจะไม่ shut down เธรด เพราะพวกมัน loop ตลอด
ไปมองหา job ถ้าเราพยายาม drop ThreadPool ของเรากับ implementation
ปัจจุบันของเราของ drop เธรดหลักจะ block ตลอดไป รอเธรดแรก finish
เพื่อ fix ปัญหานี้ เราจะต้องการการเปลี่ยนแปลงใน implementation drop
ของ ThreadPool และแล้วการเปลี่ยนแปลงใน loop ของ Worker
ก่อนอื่น เราจะเปลี่ยน implementation drop ของ ThreadPool ให้
drop sender อย่างชัดเจนก่อนรอเธรด finish Listing 21-23 แสดงการ
เปลี่ยนแปลงให้ ThreadPool เพื่อ drop sender อย่างชัดเจน ต่างจาก
กรณีของเธรด ที่นี่เรา ต้อง ใช้ Option เพื่อให้สามารถย้าย sender
ออกจาก ThreadPool กับ Option::take
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
// --snip--
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
sender อย่างชัดเจนก่อน join เธรด WorkerDrop sender ปิด channel ซึ่งบ่งบอกว่าจะไม่มีข้อความถูกส่งอีก เมื่อ
นั้นเกิด การเรียก recv ทั้งหมดที่ instance Worker ทำใน loop
infinite จะ return error ใน Listing 21-24 เราเปลี่ยน loop Worker
ให้ exit loop graceful ในกรณีนั้น ซึ่งหมายความว่าเธรดจะ finish เมื่อ
implementation drop ของ ThreadPool เรียก join บนพวกมัน
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} got a job; executing.");
job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
}
});
Worker { id, thread }
}
}
recv return errorเพื่อเห็นโค้ดนี้ในการ action มาแก้ main เพื่อยอมรับเพียงสอง request
ก่อน graceful shut down server ดังที่แสดงใน Listing 21-25
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
คุณจะไม่ต้องการให้ web server โลกจริง shut down หลัง serve เพียงสอง request โค้ดนี้เพียงสาธิตว่า graceful shutdown และ cleanup อยู่ใน state ทำงาน
เมธอด take ถูกนิยามใน trait Iterator และ limit iteration ให้สอง
item แรกที่สุด ThreadPool จะออกจาก scope ที่ตอนจบของ main และ
implementation drop จะรัน
เริ่ม server กับ cargo run และทำสาม request request ที่สามควร error
และใน terminal ของคุณ คุณควรเห็น output คล้ายกับนี้:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3
คุณอาจเห็น ordering ต่างของ Worker ID และข้อความที่ print เราเห็นได้
ว่าโค้ดนี้ทำงานยังไงจากข้อความ — instance Worker 0 และ 3 ได้สอง
request แรก Server หยุดยอมรับ connection หลัง connection ที่สอง และ
implementation Drop บน ThreadPool เริ่ม execute ก่อนที่ Worker 3
จะเริ่ม job ของมันด้วยซ้ำ Drop sender disconnect instance Worker
ทั้งหมดและบอกพวกมันให้ shut down instance Worker แต่ละ print
ข้อความเมื่อพวกมัน disconnect และแล้ว thread pool เรียก join เพื่อ
รอแต่ละเธรด Worker finish
สังเกตหนึ่งแง่มุมที่น่าสนใจของการ execute เฉพาะนี้ — ThreadPool
drop sender และก่อน Worker ใดรับ error เราพยายาม join Worker 0
Worker 0 ยังไม่ได้ error จาก recv ดังนั้นเธรดหลัก block รอ
Worker 0 finish ในระหว่าง Worker 3 รับ job และแล้วเธรดทั้งหมดรับ
error เมื่อ Worker 0 finish เธรดหลักรอ instance Worker ที่เหลือ
finish ที่จุดนั้น พวกมัน exit loop ของพวกมันและหยุดทั้งหมด
ยินดีด้วย! เรา complete project ของเราแล้ว — เรามี web server พื้นฐานที่ ใช้ thread pool เพื่อตอบแบบ asynchronous เราสามารถทำ graceful shutdown ของ server ซึ่ง cleanup เธรดทั้งหมดใน pool
นี่คือโค้ดเต็มสำหรับ reference:
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} got a job; executing.");
job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
เราทำเพิ่มที่นี่ได้! ถ้าคุณต้องการดำเนินปรับปรุง project นี้ นี่คือ ไอเดียบางอย่าง:
- เพิ่ม documentation เพิ่มให้
ThreadPoolและเมธอด public ของมัน - เพิ่ม test ของ functionality ของ library
- เปลี่ยนการเรียก
unwrapเป็นการจัดการ error ที่ robust กว่า - ใช้
ThreadPoolเพื่อทำงานอื่นนอกจากการ serve web request - หา crate thread pool บน crates.io และ implement web server คล้ายกันโดยใช้ crate แทน แล้ว เปรียบเทียบ API และ robustness ของมันกับ thread pool ที่เรา implement
สรุป
ทำได้ดี! คุณได้มาถึงจุดจบของหนังสือ! เราต้องการขอบคุณคุณสำหรับการเข้า ร่วมเราในการเดินทางของ Rust นี้ คุณตอนนี้พร้อมที่จะ implement project Rust ของตัวเองและช่วย project ของคนอื่น จำในใจว่ามี community ที่ ต้อนรับของ Rustacean อื่นที่ยินดีช่วยคุณกับความท้าทายใดที่คุณเจอใน การเดินทาง Rust ของคุณ