Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

จาก Server แบบ Single-Threaded ไป Multithreaded

ตอนนี้ server จะ process แต่ละ request ตามลำดับ หมายความว่ามันจะไม่ process connection ที่สองจนกว่า connection แรก finish process ถ้า server รับ request มากขึ้น ๆ การ execute serial นี้จะ optimal น้อยลง และน้อยลง ถ้า server รับ request ที่ใช้เวลานานในการ process, request ตามมาจะต้องรอจนกระทั่ง request ยาว finish แม้ request ใหม่ถูก process ได้เร็ว เราจะต้อง fix นี่ แต่ก่อนอื่นเราจะดูที่ปัญหาในการ action

จำลอง Request ช้า

เราจะดูว่า request ที่ process ช้าจะส่งผลกับ request อื่นที่ทำกับ implementation server ปัจจุบันของเรายังไง Listing 21-10 implement การ จัดการ request ไปยัง /sleep ด้วย response ช้าจำลองที่จะทำให้ server sleep ห้าวินาทีก่อนตอบ

Filename: src/main.rs
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
// --snip--

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    // --snip--

    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    // --snip--

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-10: จำลอง request ช้าโดย sleep ห้าวินาที

เรา switch จาก if ไป match ตอนนี้เรามีสามกรณี เราต้องอย่างชัดเจน match บน slice ของ request_line เพื่อ pattern-match กับค่า string literal — match ไม่ทำการ referencing และ dereferencing อัตโนมัติ แบบที่เมธอด equality ทำ

arm แรกเหมือนกับ block if จาก Listing 21-9 arm ที่สอง match request ไปยัง /sleep เมื่อ request นั้นถูกรับ server จะ sleep ห้าวินาทีก่อน render หน้า HTML success arm ที่สามเหมือนกับ block else จาก Listing 21-9

คุณเห็นได้ว่า server ของเราดั้งเดิมแค่ไหน — library จริงจะจัดการการ จำแนกหลาย request ในวิธี verbose น้อยกว่ามาก!

เริ่ม server โดยใช้ cargo run แล้ว เปิดสอง browser window — หนึ่ง สำหรับ http://127.0.0.1:7878 และอีกอันสำหรับ http://127.0.0.1:7878/sleep ถ้าคุณใส่ URI / ไม่กี่ครั้ง เหมือนก่อน คุณจะเห็นมันตอบเร็ว แต่ถ้าคุณใส่ /sleep แล้วโหลด / คุณจะเห็นว่า / รอจนกว่า sleep ได้ sleep เต็มห้าวินาทีของมันก่อนโหลด

มีหลายเทคนิคที่เราใช้เพื่อหลีกเลี่ยง request backing up หลัง request ช้าได้ รวมการใช้ async อย่างที่เราทำในบทที่ 17 — อันที่เราจะ implement คือ thread pool

ปรับปรุง Throughput ด้วย Thread Pool

thread pool คือกลุ่มของ spawn เธรดที่พร้อมและรอจัดการงาน เมื่อ โปรแกรมรับงานใหม่ มัน assign หนึ่งในเธรดใน pool ให้งาน และเธรดนั้นจะ process งาน เธรดที่เหลือใน pool ใช้ได้เพื่อจัดการงานอื่นใดที่เข้ามาขณะ เธรดแรก process อยู่ เมื่อเธรดแรกเสร็จ process งานของมัน มันถูก return ไปยัง pool ของเธรดที่ว่าง พร้อมจัดการงานใหม่ Thread pool อนุญาตให้คุณ process connection พร้อมกัน เพิ่ม throughput ของ server ของคุณ

เราจะ limit จำนวนของเธรดใน pool ให้จำนวนเล็กเพื่อปกป้องเราจากการโจมตี DoS — ถ้าเราให้โปรแกรมของเราสร้างเธรดใหม่สำหรับแต่ละ request เมื่อมัน เข้ามา ใครสักคนทำ 10 ล้าน request ให้ server ของเราอาจสร้างความ เสียหายโดยใช้ resource ทั้งหมดของ server ของเราหมดและทำให้การ process request หยุดได้

แทนการ spawn เธรดไม่จำกัด เราจะมีจำนวนคงที่ของเธรดที่รอใน pool Request ที่เข้ามาถูกส่งให้ pool สำหรับ process Pool จะ maintain queue ของ request ที่เข้ามา แต่ละเธรดใน pool จะ pop request จาก queue นี้ จัดการ request และแล้วถามให้ queue สำหรับ request อื่น ด้วยการออกแบบนี้ เรา process ได้ถึง N request พร้อมกัน ที่ N คือจำนวนของเธรด ถ้าแต่ละเธรดกำลังตอบกับ request ที่รันนาน, request ตามมายังสามารถ back up ใน queue ได้ แต่เราเพิ่มจำนวนของ request ที่รันนานที่เราจัดการ ได้ก่อนถึงจุดนั้น

เทคนิคนี้เป็นเพียงหนึ่งในหลายวิธีในการปรับปรุง throughput ของ web server ตัวเลือกอื่นที่คุณอาจสำรวจคือ model fork/join, model async I/O แบบ single-threaded และ model async I/O แบบ multithreaded ถ้าคุณสนใจ ในหัวข้อนี้ คุณอ่านเพิ่มเกี่ยวกับวิธีแก้อื่นและพยายาม implement พวก มันได้ — กับภาษาระดับต่ำเช่น Rust ตัวเลือกเหล่านี้ทั้งหมดเป็นไปได้

ก่อนเราเริ่ม implement thread pool มาพูดเกี่ยวกับสิ่งที่ใช้ pool ควร ดูเหมือน เมื่อคุณกำลังพยายามออกแบบโค้ด เขียน client interface ก่อน ช่วยนำทางการออกแบบของคุณได้ เขียน API ของโค้ดให้มันถูกจัดโครงสร้างใน วิธีที่คุณต้องการเรียกมัน — แล้ว implement functionality ภายใน โครงสร้างนั้นแทนการ implement functionality และแล้วออกแบบ public API

คล้ายกับวิธีที่เราใช้ test-driven development ใน project ในบทที่ 12 เราจะใช้ compiler-driven development ที่นี่ เราจะเขียนโค้ดที่เรียก ฟังก์ชันที่เราต้องการ และแล้วเราจะดูที่ error จาก compiler เพื่อ ตัดสินว่าเราควรเปลี่ยนอะไรถัดไปเพื่อให้โค้ดทำงาน อย่างไรก็ตาม ก่อน เราทำสิ่งนั้น เราจะสำรวจเทคนิคที่เราจะไม่ใช้เป็นจุดเริ่มต้น

Spawn เธรดสำหรับแต่ละ Request

ก่อนอื่น มาสำรวจว่าโค้ดของเราอาจดูยังไงถ้ามันสร้างเธรดใหม่สำหรับทุก connection ตามที่กล่าวก่อนหน้า นี่ไม่ใช่แผนสุดท้ายของเราเพราะปัญหา กับการ spawn จำนวนเธรดไม่จำกัด แต่มันเป็นจุดเริ่มต้นเพื่อได้ server แบบ multithreaded ทำงานก่อน แล้ว เราจะเพิ่ม thread pool เป็นการ ปรับปรุง และเปรียบเทียบสองวิธีแก้จะง่ายขึ้น

Listing 21-11 แสดงการเปลี่ยนแปลงที่จะทำกับ main เพื่อ spawn เธรดใหม่ ในการจัดการแต่ละ stream ภายใน loop for

Filename: src/main.rs
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-11: Spawn เธรดใหม่สำหรับแต่ละ stream

ตามที่คุณเรียนในบทที่ 16 thread::spawn จะสร้างเธรดใหม่และแล้วรันโค้ด ใน closure ในเธรดใหม่ ถ้าคุณรันโค้ดนี้และโหลด /sleep ใน browser ของ คุณ แล้ว / ในสอง browser tab อื่น คุณจะเห็นจริง ๆ ว่า request ไปยัง / ไม่ต้องรอให้ /sleep finish อย่างไรก็ตาม ตามที่เรากล่าว นี่จะ ในที่สุด overwhelm ระบบเพราะคุณจะทำเธรดใหม่โดยไม่มี limit

คุณอาจจำจากบทที่ 17 ว่านี่คือกรณีที่ async และ await ส่องสว่างจริง ๆ! จำสิ่งนั้นในใจขณะเราสร้าง thread pool และคิดเกี่ยวกับวิธีที่สิ่งต่าง ๆ จะดูต่างหรือเหมือนกับ async

สร้างจำนวนเธรดที่จำกัด

เราต้องการให้ thread pool ของเราทำงานในวิธีคล้ายกัน คุ้นเคยเพื่อให้ การ switch จากเธรดไปยัง thread pool ไม่ต้องการการเปลี่ยนแปลงใหญ่ให้ โค้ดที่ใช้ API ของเรา Listing 21-12 แสดง interface สมมติสำหรับ struct ThreadPool ที่เราต้องการใช้แทน thread::spawn

Filename: src/main.rs
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-12: interface ThreadPool ในอุดมคติของเรา

เราใช้ ThreadPool::new เพื่อสร้าง thread pool ใหม่กับจำนวนเธรดที่ configurable ในกรณีนี้สี่ แล้ว ใน loop for, pool.execute มี interface คล้ายกับ thread::spawn ในที่มันรับ closure ที่ pool ควร รันสำหรับแต่ละ stream เราต้อง implement pool.execute ให้มันรับ closure และให้มันกับเธรดใน pool เพื่อรัน โค้ดนี้จะยังไม่ compile แต่ เราจะลองเพื่อให้ compiler นำทางเราในวิธี fix มัน

Build ThreadPool ใช้ Compiler-Driven Development

ทำการเปลี่ยนแปลงใน Listing 21-12 ให้ src/main.rs และแล้วมาใช้ error compiler จาก cargo check เพื่อนำพัฒนาของเรา นี่คือ error แรก ที่เราได้:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:11:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error

ยอดเยี่ยม! error นี้บอกเราเราต้องการ type หรือโมดูล ThreadPool ดังนั้นเราจะ build อันหนึ่งตอนนี้ Implementation ThreadPool ของเรา จะอิสระจากชนิดของงานที่ web server ของเรากำลังทำ ดังนั้น มา switch crate hello จาก binary crate เป็น library crate เพื่อบรรจุ implementation ThreadPool ของเรา หลังเราเปลี่ยนไปยัง library crate เรายังใช้ library thread pool แยกสำหรับงานใดที่เราต้องการทำโดยใช้ thread pool ได้ ไม่เพียงสำหรับการ serve web request

สร้างไฟล์ src/lib.rs ที่บรรจุต่อไปนี้ ซึ่งคือนิยามง่ายที่สุดของ struct ThreadPool ที่เรามีตอนนี้ได้:

Filename: src/lib.rs
pub struct ThreadPool;

แล้ว แก้ไฟล์ main.rs เพื่อนำ ThreadPool เข้า scope จาก library crate โดยเพิ่มโค้ดต่อไปนี้ที่ด้านบนของ src/main.rs:

Filename: src/main.rs
use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

โค้ดนี้ยังไม่ทำงาน แต่มา check มันอีกเพื่อได้ error ถัดไปที่เราต้อง address:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
  --> src/main.rs:12:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

error นี้บ่งบอกว่าถัดไปเราต้องสร้าง associated function ชื่อ new สำหรับ ThreadPool เรารู้ด้วยว่า new ต้องมีหนึ่ง parameter ที่รับ 4 เป็น argument ได้และควร return instance ThreadPool มา implement ฟังก์ชัน new ง่ายที่สุดที่จะมีลักษณะเหล่านั้น:

Filename: src/lib.rs
pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

เราเลือก usize เป็น type ของ parameter size เพราะเรารู้ว่าจำนวน เธรดที่ติดลบไม่สมเหตุสมผล เรารู้ด้วยว่าเราจะใช้ 4 นี้เป็นจำนวนของ element ใน collection ของเธรด ซึ่งคือสิ่งที่ type usize มีไว้ ตาม ที่พูดถึงในส่วน “Type Integer” ในบท ที่ 3

มา check โค้ดอีก:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |         -----^^^^^^^ method not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

ตอนนี้ error เกิดเพราะเราไม่มีเมธอด execute บน ThreadPool จำจาก ส่วน “สร้างจำนวนเธรดที่จำกัด” ว่าเราตัดสินใจว่า thread pool ของเราควรมี interface คล้ายกับ thread::spawn นอกจากนี้ เราจะ implement ฟังก์ชัน execute ให้มันรับ closure ที่มันได้และให้มันกับเธรดที่ว่างใน pool เพื่อรัน

เราจะนิยามเมธอด execute บน ThreadPool เพื่อรับ closure เป็น parameter จำจาก “ย้ายค่าที่ Capture ออกจาก Closure” ในบทที่ 13 ว่าเรารับ closure เป็น parameter กับสาม trait ต่างกันได้ — Fn, FnMut และ FnOnce เราต้องตัดสินว่าชนิดของ closure ใดที่ใช้ที่นี่ เรารู้เราจะ ลงเอยทำสิ่งคล้ายกับ implementation thread::spawn ของ standard library ดังนั้นเราดูที่ bound ใดที่ signature ของ thread::spawn มีบน parameter ของมัน Documentation แสดงเราต่อไปนี้:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

type parameter F คือสิ่งที่เรากังวลที่นี่ — type parameter T เกี่ยวกับ return value และเราไม่กังวลกับนั้น เราเห็นว่า spawn ใช้ FnOnce เป็น trait bound บน F นี่น่าจะเป็นสิ่งที่เราต้องการด้วย เพราะเราจะในที่สุดส่ง argument ที่เราได้ใน execute ให้ spawn เรา มั่นใจเพิ่มได้ว่า FnOnce คือ trait ที่เราต้องการใช้เพราะเธรดสำหรับ รัน request จะ execute closure ของ request นั้นเพียงครั้งเดียว ซึ่ง match Once ใน FnOnce

type parameter F ก็มี trait bound Send และ lifetime bound 'static ซึ่งมีประโยชน์ในสถานการณ์ของเรา — เราต้องการ Send เพื่อ transfer closure จากเธรดหนึ่งไปยังอีกอันและ 'static เพราะเราไม่รู้ ว่าเธรดจะใช้เวลานานเท่าไหร่ในการ execute มาสร้างเมธอด execute บน ThreadPool ที่จะรับ generic parameter ของ type F กับ bound เหล่านี้:

Filename: src/lib.rs
pub struct ThreadPool;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

เรายังใช้ () หลัง FnOnce เพราะ FnOnce นี้ represent closure ที่ รับไม่มี parameter และ return unit type () เหมือนนิยามฟังก์ชัน return type ถูกละจาก signature ได้ แต่แม้เราไม่มี parameter เรายัง ต้องการวงเล็บ

อีกครั้ง นี่คือ implementation ง่ายที่สุดของเมธอด execute — มันไม่ ทำอะไร แต่เรากำลังพยายามทำให้โค้ดของเรา compile เพียง มา check มันอีก:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s

มัน compile! แต่สังเกตว่าถ้าคุณลอง cargo run และทำ request ใน browser คุณจะเห็น error ใน browser ที่เราเห็นที่ตอนเริ่มของบท Library ของเรายังไม่ได้เรียก closure ที่ส่งให้ execute จริง ๆ!

Note: คำพูดที่คุณอาจได้ยินเกี่ยวกับภาษากับ compiler เข้มงวด เช่น Haskell และ Rust คือ “ถ้าโค้ด compile มันทำงาน” แต่คำพูดนี้ไม่จริง universal Project ของเรา compile แต่มันทำอะไรไม่เลย! ถ้าเรากำลัง build project จริง สมบูรณ์ นี่จะเป็นเวลาดีที่จะเริ่มเขียน unit test เพื่อ check ว่าโค้ด compile และ มีพฤติกรรมที่เราต้องการ

พิจารณา — อะไรจะต่างที่นี่ถ้าเราจะ execute future แทน closure?

Validate จำนวนเธรดใน new

เราไม่ได้ทำอะไรกับ parameter ของ new และ execute มา implement body ของฟังก์ชันเหล่านี้ด้วยพฤติกรรมที่เราต้องการ เพื่อเริ่ม มาคิด เกี่ยวกับ new ก่อนหน้าเราเลือก type ที่ไม่มี sign สำหรับ parameter size เพราะ pool กับจำนวนเธรดที่ติดลบไม่สมเหตุสมผล อย่างไรก็ตาม pool กับศูนย์เธรดก็ไม่สมเหตุสมผล แต่ศูนย์เป็น usize valid อย่างสมบูรณ์ เราจะเพิ่มโค้ดเพื่อ check ว่า size มากกว่าศูนย์ก่อนเรา return instance ThreadPool และเราจะให้โปรแกรม panic ถ้ามันรับศูนย์โดยใช้ macro assert! ดังที่แสดงใน Listing 21-13

Filename: src/lib.rs
pub struct ThreadPool;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --snip--
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}
Listing 21-13: Implement ThreadPool::new ให้ panic ถ้า size คือศูนย์

เรายังเพิ่ม documentation บ้างสำหรับ ThreadPool ของเราด้วย doc comment สังเกตว่าเราตาม practice documentation ดีโดยเพิ่มส่วนที่ระบุ สถานการณ์ที่ฟังก์ชันของเรา panic ได้ ตามที่พูดถึงในบทที่ 14 ลองรัน cargo doc --open และคลิก struct ThreadPool เพื่อดูว่า docs ที่ generate สำหรับ new ดูเหมือนอะไร!

แทนการเพิ่ม macro assert! ตามที่เราทำที่นี่ เราเปลี่ยน new เป็น build และ return Result แบบที่เราทำกับ Config::build ใน project I/O ใน Listing 12-9 ได้ แต่เราตัดสินในกรณีนี้ว่าพยายามสร้าง thread pool โดยไม่มีเธรดควรเป็น error ที่ไม่กู้คืนได้ ถ้าคุณรู้สึก ambitious ลองเขียนฟังก์ชันชื่อ build กับ signature ต่อไปนี้เพื่อเปรียบเทียบ กับฟังก์ชัน new:

pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {

สร้างพื้นที่เพื่อเก็บเธรด

ตอนนี้เรามีวิธีรู้ว่าเรามีจำนวนเธรด valid ที่จะเก็บใน pool เราสามารถ สร้างเธรดเหล่านั้นและเก็บพวกมันใน struct ThreadPool ก่อน return struct แต่เรา “เก็บ” เธรดยังไง? มาดูที่ signature thread::spawn อีก:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

ฟังก์ชัน spawn return JoinHandle<T> ที่ T คือ type ที่ closure return มาลองใช้ JoinHandle ด้วยและดูสิ่งที่เกิดขึ้น ในกรณีของเรา closure ที่เราส่งให้ thread pool จะจัดการ connection และไม่ return อะไร ดังนั้น T จะเป็น unit type ()

โค้ดใน Listing 21-14 จะ compile แต่มันยังไม่สร้างเธรดใด เราเปลี่ยน นิยามของ ThreadPool เพื่อบรรจุ vector ของ instance thread::JoinHandle<()> initialize vector กับ capacity ของ size ตั้ง loop for ที่จะรันโค้ดบางอย่างเพื่อสร้างเธรด และ return instance ThreadPool ที่บรรจุพวกมัน

Filename: src/lib.rs
use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool { threads }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}
Listing 21-14: สร้าง vector สำหรับ ThreadPool เพื่อบรรจุเธรด

เรานำ std::thread เข้า scope ใน library crate เพราะเรากำลังใช้ thread::JoinHandle เป็น type ของ item ใน vector ใน ThreadPool

เมื่อ size valid ถูกรับ ThreadPool ของเราสร้าง vector ใหม่ที่บรรจุ size item ได้ ฟังก์ชัน with_capacity ทำงานเดียวกับ Vec::new แต่ ด้วยความแตกต่างสำคัญ — มัน pre-allocate พื้นที่ใน vector เพราะเรารู้ ว่าเราต้องเก็บ size element ใน vector การทำ allocation นี้ล่วงหน้า มีประสิทธิภาพมากกว่าใช้ Vec::new เล็กน้อย ซึ่ง resize ตัวเองเมื่อ element ถูกใส่

เมื่อคุณรัน cargo check อีก มันควรสำเร็จ

ส่งโค้ดจาก ThreadPool ไปเธรด

เราทิ้ง comment ใน loop for ใน Listing 21-14 เกี่ยวกับการสร้างเธรด ที่นี่ เราจะดูว่าเราสร้างเธรดยังไงจริง ๆ Standard library ให้ thread::spawn เป็นวิธีสร้างเธรด และ thread::spawn คาดได้โค้ดบาง อย่างที่เธรดควรรันทันทีที่เธรดถูกสร้าง อย่างไรก็ตาม ในกรณีของเรา เรา ต้องการสร้างเธรดและให้พวกมัน รอ โค้ดที่เราจะส่งภายหลัง Implementation ของเธรดของ standard library ไม่รวมวิธีทำสิ่งนั้นใด — เราต้อง implement มันโดยมือ

เราจะ implement พฤติกรรมนี้โดยแนะนำโครงสร้างข้อมูลใหม่ระหว่าง ThreadPool และเธรดที่จะจัดการพฤติกรรมใหม่นี้ เราจะเรียกโครงสร้าง ข้อมูลนี้ Worker ซึ่งคือ term ปกติใน implementation pooling Worker หยิบโค้ดที่ต้องรันและรันโค้ดในเธรดของมัน

คิดถึงคนทำงานในครัวที่ร้านอาหาร — worker รอจนกระทั่ง order มาจากลูกค้า และแล้วพวกเขารับผิดชอบรับ order เหล่านั้นและเติม

แทนการเก็บ vector ของ instance JoinHandle<()> ใน thread pool เรา จะเก็บ instance ของ struct Worker แต่ละ Worker จะเก็บ instance JoinHandle<()> เดียว แล้ว เราจะ implement เมธอดบน Worker ที่จะรับ closure ของโค้ดเพื่อรันและส่งมันให้เธรดที่กำลังรันอยู่แล้วเพื่อ execute เราจะให้แต่ละ Worker id ด้วยเพื่อให้เราแยกระหว่างแต่ละ instance ต่างกันของ Worker ใน pool เมื่อ logging หรือ debugging

นี่คือกระบวนการใหม่ที่จะเกิดเมื่อเราสร้าง ThreadPool เราจะ implement โค้ดที่ส่ง closure ให้เธรดหลังเรามี Worker ตั้งในวิธีนี้:

  1. นิยาม struct Worker ที่บรรจุ id และ JoinHandle<()>
  2. เปลี่ยน ThreadPool ให้บรรจุ vector ของ instance Worker
  3. นิยามฟังก์ชัน Worker::new ที่รับเลข id และ return instance Worker ที่บรรจุ id และเธรดที่ spawn กับ closure ว่าง
  4. ใน ThreadPool::new ใช้ counter ของ loop for เพื่อ generate id สร้าง Worker ใหม่กับ id นั้น และเก็บ Worker ใน vector

ถ้าคุณพร้อมสำหรับความท้าทาย ลอง implement การเปลี่ยนแปลงเหล่านี้ด้วย ตัวเองก่อนดูที่โค้ดใน Listing 21-15

พร้อม? นี่คือ Listing 21-15 กับวิธีหนึ่งในการทำการแก้ไขที่กล่าวก่อนหน้า

Filename: src/lib.rs
use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}
Listing 21-15: แก้ ThreadPool ให้บรรจุ instance Worker แทนการบรรจุเธรดโดยตรง

เราเปลี่ยนชื่อของ field บน ThreadPool จาก threads เป็น workers เพราะมันตอนนี้บรรจุ instance Worker แทน instance JoinHandle<()> เราใช้ counter ใน loop for เป็น argument ให้ Worker::new และเรา เก็บแต่ละ Worker ใหม่ใน vector ชื่อ workers

โค้ดภายนอก (เช่น server ของเราใน src/main.rs) ไม่ต้องรู้รายละเอียด implementation เกี่ยวกับการใช้ struct Worker ภายใน ThreadPool ดังนั้นเราทำ struct Worker และฟังก์ชัน new ของมัน private ฟังก์ชัน Worker::new ใช้ id ที่เราให้มันและเก็บ instance JoinHandle<()> ที่สร้างโดย spawn เธรดใหม่โดยใช้ closure ว่าง

Note: ถ้า operating system ไม่สามารถสร้างเธรดเพราะไม่มี resource ระบบพอ thread::spawn จะ panic นั่นจะทำให้ server ของเราทั้งหมด panic แม้การสร้างเธรดบางตัวอาจสำเร็จ เพื่อความเรียบง่าย พฤติกรรมนี้ okay แต่ใน implementation thread pool production คุณน่าจะต้องการ ใช้ std::thread::Builder และเมธอด spawn ของมันที่ return Result แทน

โค้ดนี้จะ compile และจะเก็บจำนวนของ instance Worker ที่เราระบุเป็น argument ให้ ThreadPool::new แต่เรา ยัง ไม่ process closure ที่ เราได้ใน execute มาดูวิธีทำนั้นถัดไป

ส่ง Request ไปเธรดผ่าน Channel

ปัญหาถัดไปที่เราจะแก้คือ closure ที่ให้กับ thread::spawn ไม่ทำอะไร เลย ปัจจุบัน เราได้ closure ที่เราต้องการ execute ในเมธอด execute แต่เราต้องให้ thread::spawn closure เพื่อรันเมื่อเราสร้างแต่ละ Worker ระหว่างการสร้าง ThreadPool

เราต้องการให้ struct Worker ที่เราเพิ่งสร้างดึงโค้ดเพื่อรันจาก queue ที่บรรจุใน ThreadPool และส่งโค้ดนั้นให้เธรดของมันเพื่อรัน

Channel ที่เราเรียนเกี่ยวกับในบทที่ 16 — วิธีง่ายในการสื่อสารระหว่าง สองเธรด — จะเหมาะสำหรับ use case นี้ เราจะใช้ channel ในการทำหน้าที่ เป็น queue ของ job และ execute จะส่ง job จาก ThreadPool ไปยัง instance Worker ซึ่งจะส่ง job ให้เธรดของมัน นี่คือแผน:

  1. ThreadPool จะสร้าง channel และถือ sender
  2. แต่ละ Worker จะถือ receiver
  3. เราจะสร้าง struct Job ใหม่ที่จะบรรจุ closure ที่เราต้องการส่งลง channel
  4. เมธอด execute จะส่ง job ที่มันต้องการ execute ผ่าน sender
  5. ในเธรดของมัน Worker จะ loop ผ่าน receiver ของมันและ execute closure ของ job ใดที่มันรับ

มาเริ่มโดยสร้าง channel ใน ThreadPool::new และถือ sender ใน instance ThreadPool ดังที่แสดงใน Listing 21-16 struct Job ไม่ บรรจุอะไรตอนนี้แต่จะเป็น type ของ item ที่เรากำลังส่งลง channel

Filename: src/lib.rs
use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}
Listing 21-16: แก้ ThreadPool เพื่อเก็บ sender ของ channel ที่ transmit instance Job

ใน ThreadPool::new เราสร้าง channel ใหม่ของเราและให้ pool ถือ sender นี่จะ compile สำเร็จ

มาลองส่ง receiver ของ channel เข้าแต่ละ Worker เมื่อ thread pool สร้าง channel เรารู้เราต้องการใช้ receiver ในเธรดที่ instance Worker spawn ดังนั้นเราจะอ้างถึง parameter receiver ใน closure โค้ดใน Listing 21-17 จะยังไม่ค่อย compile

Filename: src/lib.rs
use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--


struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}
Listing 21-17: ส่ง receiver ให้แต่ละ Worker

เราทำการเปลี่ยนแปลงเล็กและตรงไปตรงมาบ้าง — เราส่ง receiver เข้า Worker::new และแล้วเราใช้มันภายใน closure

เมื่อเราพยายาม check โค้ดนี้ เราได้ error นี้:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:26:42
   |
21 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 |         for id in 0..size {
   |         ----------------- inside of this loop
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in previous iteration of loop
   |
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
  --> src/lib.rs:47:33
   |
47 |     fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
   |        --- in this method       ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
   |
25 ~         let mut value = Worker::new(id, receiver);
26 ~         for id in 0..size {
27 ~             workers.push(value);
   |

For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error

โค้ดกำลังพยายามส่ง receiver ให้หลาย instance Worker นี่จะไม่ทำงาน ตามที่คุณจำจากบทที่ 16 — implementation channel ที่ Rust ให้คือ multiple producer, single consumer นี่หมายความว่าเราไม่สามารถเพียง clone end ที่ consume ของ channel เพื่อ fix โค้ดนี้ เรายังไม่ต้องการ ส่งข้อความหลายครั้งให้หลาย consumer — เราต้องการหนึ่ง list ของข้อความ กับหลาย instance Worker เพื่อให้แต่ละข้อความถูก process ครั้งเดียว

นอกจากนี้ การรับ job ออกจาก queue channel เกี่ยวข้องกับการ mutate receiver ดังนั้นเธรดต้องการวิธี safe ในการแชร์และแก้ receiver — มิฉะนั้น เราอาจได้ race condition (ตามที่ครอบคลุมในบทที่ 16)

จำ smart pointer thread-safe ที่พูดถึงในบทที่ 16 — เพื่อแชร์ ownership ทั่วหลายเธรดและอนุญาตให้เธรด mutate ค่า เราต้องใช้ Arc<Mutex<T>> type Arc จะให้หลาย instance Worker เป็นเจ้าของ receiver และ Mutex จะรับประกันว่าเพียงหนึ่ง Worker ได้ job จาก receiver ในเวลา เดียว Listing 21-18 แสดงการเปลี่ยนแปลงที่เราต้องทำ

Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};
// --snip--

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}
Listing 21-18: แชร์ receiver ระหว่าง instance Worker โดยใช้ Arc และ Mutex

ใน ThreadPool::new เราใส่ receiver ใน Arc และ Mutex สำหรับแต่ละ Worker ใหม่ เรา clone Arc เพื่อ bump reference count เพื่อให้ instance Worker แชร์ ownership ของ receiver ได้

ด้วยการเปลี่ยนแปลงเหล่านี้ โค้ด compile! เรากำลังไปถึง!

Implement เมธอด execute

มาในที่สุด implement เมธอด execute บน ThreadPool เราจะเปลี่ยน Job จาก struct เป็น type alias สำหรับ trait object ที่บรรจุ type ของ closure ที่ execute รับด้วย ตามที่พูดถึงในส่วน “Type Synonym และ Type Alias” ในบทที่ 20 type alias อนุญาตให้เราทำ type ยาวสั้นกว่าเพื่อความง่ายในการใช้ ดูที่ Listing 21-19

Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}
Listing 21-19: สร้าง type alias Job สำหรับ Box ที่บรรจุแต่ละ closure และแล้วส่ง job ลง channel

หลังจากสร้าง instance Job ใหม่โดยใช้ closure ที่เราได้ใน execute เราส่ง job นั้นลง end การส่งของ channel เรากำลังเรียก unwrap บน send สำหรับกรณีที่การส่ง fail นี่อาจเกิดถ้า ตัวอย่างเช่น เราหยุด เธรดทั้งหมดของเราจากการ execute หมายความว่า end ที่รับหยุดรับข้อความ ใหม่ ที่ขณะนี้ เราไม่สามารถหยุดเธรดของเราจากการ execute — เธรดของเรา ดำเนิน execute ตราบใดที่ pool มีอยู่ เหตุผลที่เราใช้ unwrap คือเรา รู้ว่ากรณี failure จะไม่เกิด แต่ compiler ไม่รู้

แต่เรายังไม่ค่อยเสร็จ! ใน Worker ของเรา closure ของเราที่ถูกส่งให้ thread::spawn ยังเพียง อ้างอิง end การรับของ channel แทน เรา ต้องการให้ closure loop ตลอดไป ขอ end การรับของ channel สำหรับ job และรัน job เมื่อมันได้อันหนึ่ง มาทำการเปลี่ยนแปลงที่แสดงใน Listing 21-20 ให้ Worker::new

Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}
Listing 21-20: รับและ execute job ในเธรดของ instance Worker

ที่นี่ เราเรียก lock บน receiver ก่อนเพื่อรับ mutex และแล้วเราเรียก unwrap เพื่อ panic บน error ใด การรับ lock อาจ fail ถ้า mutex อยู่ ใน state poisoned ซึ่งเกิดได้ถ้าเธรดอื่นบางตัว panic ขณะถือ lock แทน การปล่อย lock ในสถานการณ์นี้ การเรียก unwrap ให้เธรดนี้ panic คือ action ถูกต้องที่จะทำ รู้สึกอิสระที่จะเปลี่ยน unwrap นี้เป็น expect กับข้อความ error ที่มีความหมายให้คุณ

ถ้าเราได้ lock บน mutex เราเรียก recv เพื่อรับ Job จาก channel unwrap สุดท้ายข้ามผ่าน error ใดที่นี่ด้วย ซึ่งอาจเกิดถ้าเธรดที่ถือ sender shut down คล้ายกับวิธีที่เมธอด send return Err ถ้า receiver shut down

การเรียก recv block ดังนั้นถ้าไม่มี job ยัง เธรดปัจจุบันจะรอจนกว่า job available Mutex<T> รับประกันว่าเพียงหนึ่งเธรด Worker ในเวลาเดียว กำลังพยายามขอ job

Thread pool ของเราตอนนี้อยู่ใน state ทำงาน! ให้มัน cargo run และทำ request บ้าง:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field `workers` is never read
 --> src/lib.rs:7:5
  |
6 | pub struct ThreadPool {
  |            ---------- field in this struct
7 |     workers: Vec<Worker>,
  |     ^^^^^^^
  |
  = note: `#[warn(dead_code)]` on by default

warning: fields `id` and `thread` are never read
  --> src/lib.rs:48:5
   |
47 | struct Worker {
   |        ------ fields in this struct
48 |     id: usize,
   |     ^^
49 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^

warning: `hello` (lib) generated 2 warnings
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 4.91s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.

สำเร็จ! เราตอนนี้มี thread pool ที่ execute connection แบบ asynchronous ไม่เคยมีมากกว่าสี่เธรดถูกสร้าง ดังนั้นระบบของเราจะไม่ overload ถ้า server รับ request เยอะ ถ้าเราทำ request ไปยัง /sleep server จะสามารถ serve request อื่นโดยมีเธรดอื่นรันพวกมัน

Note: ถ้าคุณเปิด /sleep ในหลาย browser window พร้อมกัน พวกมันอาจ โหลดทีละอันในช่วงห้าวินาที web browser บางตัว execute หลาย instance ของ request เดียวกันตามลำดับสำหรับเหตุผล caching ข้อจำกัดนี้ไม่ได้ ถูกสาเหตุโดย web server ของเรา

นี่คือเวลาดีที่จะหยุดและพิจารณาว่าโค้ดใน Listing 21-18, 21-19 และ 21-20 จะต่างยังไงถ้าเรากำลังใช้ future แทน closure สำหรับงานที่จะทำ type ใดจะเปลี่ยน? signature เมธอดจะต่างยังไง ถ้ามี? ส่วนใดของโค้ดจะ ยังเหมือนเดิม?

หลังเรียนเกี่ยวกับ loop while let ในบทที่ 17 และบทที่ 19 คุณอาจ สงสัยทำไมเราไม่เขียนโค้ดเธรด Worker ดังที่แสดงใน Listing 21-21

Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}
Listing 21-21: implementation ทางเลือกของ Worker::new ใช้ while let

โค้ดนี้ compile และรันแต่ไม่ผลให้พฤติกรรม threading ที่ต้องการ — request ช้าจะยังทำให้ request อื่นรอเพื่อ process เหตุผลคือบ้าง subtle — struct Mutex ไม่มีเมธอด public unlock เพราะ ownership ของ lock ตามกับ lifetime ของ MutexGuard<T> ภายใน LockResult<MutexGuard<T>> ที่เมธอด lock return ที่ compile time, borrow checker บังคับกฎที่ resource ที่ guard โดย Mutex ไม่สามารถเข้าถึงได้เว้นแต่เราถือ lock ได้แล้ว อย่างไรก็ตาม implementation นี้ก็ผลให้ lock ถูกถือยาวกว่าที่ ตั้งใจได้ถ้าเราไม่ระวัง lifetime ของ MutexGuard<T>

โค้ดใน Listing 21-20 ที่ใช้ let job = receiver.lock().unwrap().recv().unwrap(); ทำงานเพราะกับ let ค่า ชั่วคราวใดที่ใช้ใน expression ทางขวาของเครื่องหมายเท่ากับถูก drop ทันที เมื่อ statement let จบ อย่างไรก็ตาม while let (และ if let และ match) ไม่ drop ค่าชั่วคราวจนจบของ block ที่ associate ใน Listing 21-21 lock ยังถูกถือสำหรับระยะเวลาของการเรียก job() หมายความว่า instance Worker อื่นไม่สามารถรับ job