Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

โอนข้อมูลระหว่าง Thread ด้วย Message Passing

แนวทางที่เป็นที่นิยมมากขึ้นในการรับประกัน concurrency ที่ปลอดภัยคือ message passing ที่เธรดหรือ actor สื่อสารโดยส่งข้อความกันที่บรรจุ ข้อมูล นี่คือไอเดียใน slogan จาก documentation ของภาษา Go — “อย่าสื่อสารโดยแชร์ memory แทน แชร์ memory โดยสื่อสาร”

เพื่อทำ concurrency แบบ message-sending standard library ของ Rust ให้ implementation ของ channel channel คือแนวคิด programming ทั่วไปที่ข้อมูลถูกส่งจากเธรดหนึ่งไปยังอีกเธรด

คุณจินตนาการ channel ใน programming เป็นเหมือน channel ของน้ำที่มี ทิศทาง เช่นลำธารหรือแม่น้ำ ถ้าคุณใส่อะไรเช่นเป็ดยางในแม่น้ำ มันจะ เดินทางลงไปยังท้ายสุดของทางน้ำ

channel มีสองครึ่ง — transmitter และ receiver ครึ่ง transmitter คือที่ upstream ที่คุณใส่เป็ดยางลงในแม่น้ำ และครึ่ง receiver คือที่ ที่เป็ดยางลงเอย downstream ส่วนหนึ่งของโค้ดของคุณเรียกเมธอดบน transmitter ด้วยข้อมูลที่คุณต้องการส่ง และอีกส่วนตรวจสอบปลายรับสำหรับ ข้อความที่มาถึง channel ถูกพูดว่า ปิด ถ้าครึ่ง transmitter หรือ receiver ถูก drop

ที่นี่ เราจะสร้างต่อจนถึงโปรแกรมที่มีเธรดหนึ่งสร้างค่าและส่งพวกมันลง channel และอีกเธรดที่จะรับค่าและ print พวกมันออก เราจะส่งค่าง่ายระหว่าง เธรดโดยใช้ channel เพื่อแสดงฟีเจอร์ เมื่อคุณคุ้นเคยกับเทคนิคแล้ว คุณใช้ channel สำหรับเธรดใดที่ต้องสื่อสารกันได้ เช่นระบบ chat หรือ ระบบที่หลายเธรดทำส่วนของการคำนวณและส่งส่วนไปยังเธรดหนึ่งที่ aggregate ผล

ก่อนอื่น ใน Listing 16-6 เราจะสร้าง channel แต่ไม่ทำอะไรกับมัน สังเกตว่านี่จะยังไม่คอมไพล์เพราะ Rust บอกไม่ได้ว่า type ของค่าใดที่ เราต้องการส่งผ่าน channel

Filename: src/main.rs
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}
Listing 16-6: สร้าง channel และ assign สองครึ่งให้ tx และ rx

เราสร้าง channel ใหม่โดยใช้ฟังก์ชัน mpsc::channelmpsc ย่อมา จาก multiple producer, single consumer สั้น ๆ วิธีที่ standard library ของ Rust implement channel หมายความว่า channel มีหลายปลาย ส่ง ที่สร้างค่าได้ แต่มีเพียงหนึ่งปลาย รับ ที่ consume ค่าเหล่า นั้น จินตนาการหลายลำธารไหลรวมกันเป็นแม่น้ำใหญ่ — ทุกสิ่งที่ส่งลง ลำธารใดจะลงเอยในแม่น้ำเดียวที่ปลายสุด เราจะเริ่มด้วย producer เดียว ตอนนี้ แต่เราจะเพิ่ม producer หลายตัวเมื่อเราทำให้ตัวอย่างนี้ทำงาน

ฟังก์ชัน mpsc::channel return tuple ที่ element แรกคือปลายส่ง — transmitter — และ element ที่สองคือปลายรับ — receiver คำย่อ tx และ rx ถูกใช้ตามธรรมเนียมในหลายสาขาสำหรับ transmitter และ receiver ตามลำดับ ดังนั้นเราตั้งชื่อตัวแปรของเราเช่นนั้นเพื่อระบุ แต่ละปลาย เรากำลังใช้ statement let กับ pattern ที่ destructure tuple — เราจะพูดถึงการใช้ pattern ใน statement let และ destructuring ในบทที่ 19 ตอนนี้ รู้ว่าการใช้ statement let ใน วิธีนี้เป็นแนวทางสะดวกในการดึงชิ้นส่วนของ tuple ที่ return โดย mpsc::channel

มาย้ายปลายส่งเข้าเธรดที่ spawn และให้มันส่งหนึ่ง string เพื่อให้ เธรดที่ spawn สื่อสารกับเธรดหลัก ดังที่แสดงใน Listing 16-7 นี่ เหมือนใส่เป็ดยางในแม่น้ำ upstream หรือส่งข้อความ chat จากเธรดหนึ่ง ไปยังอีกเธรด

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}
Listing 16-7: ย้าย tx ไปยังเธรดที่ spawn และส่ง "hi"

อีกครั้ง เรากำลังใช้ thread::spawn เพื่อสร้างเธรดใหม่และจากนั้น ใช้ move เพื่อย้าย tx เข้า closure เพื่อให้เธรดที่ spawn own tx เธรดที่ spawn ต้อง own transmitter เพื่อสามารถส่งข้อความผ่าน channel

transmitter มีเมธอด send ที่รับค่าที่เราต้องการส่ง เมธอด send return type Result<T, E> ดังนั้นถ้า receiver ถูก drop แล้วและไม่ มีที่จะส่งค่า การ operation send จะ return error ในตัวอย่างนี้ เรา กำลังเรียก unwrap เพื่อ panic ในกรณี error แต่ใน application จริง เราจะจัดการมันอย่างเหมาะสม — กลับไปบทที่ 9 เพื่อทบทวนกลยุทธ์สำหรับ การจัดการ error ที่เหมาะสม

ใน Listing 16-8 เราจะรับค่าจาก receiver ในเธรดหลัก นี่เหมือนดึงเป็ด ยางจากน้ำที่ปลายแม่น้ำหรือรับข้อความ chat

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}
Listing 16-8: รับค่า "hi" ในเธรดหลักและ print มัน

receiver มีสองเมธอดที่มีประโยชน์ — recv และ try_recv เรากำลัง ใช้ recv ย่อสำหรับ receive ซึ่งจะ block execution ของเธรดหลัก และรอจนกว่าค่าถูกส่งลง channel เมื่อค่าถูกส่ง recv จะ return มัน ใน Result<T, E> เมื่อ transmitter ปิด recv จะ return error เพื่อ ส่งสัญญาณว่าไม่มีค่ามากกว่าที่จะมา

เมธอด try_recv ไม่ block แต่จะ return Result<T, E> ทันที — ค่า Ok ที่เก็บข้อความถ้ามี และค่า Err ถ้าไม่มีข้อความใดคราวนี้ การ ใช้ try_recv มีประโยชน์ถ้าเธรดนี้มีงานอื่นที่จะทำในขณะที่รอข้อความ — เราเขียน loop ที่เรียก try_recv เป็นบางครั้ง จัดการข้อความถ้ามี และมิฉะนั้นทำงานอื่นสักพักจนถึงตรวจสอบอีกครั้งได้

เราใช้ recv ในตัวอย่างนี้เพื่อความง่าย — เราไม่มีงานอื่นให้เธรด หลักทำนอกจากรอข้อความ ดังนั้นการ block เธรดหลักเหมาะสม

เมื่อเรารันโค้ดใน Listing 16-8 เราจะเห็นค่าที่ print จากเธรดหลัก:

Got: hi

สมบูรณ์แบบ!

โอน Ownership ผ่าน Channel

กฎ ownership มีบทบาทสำคัญในการส่งข้อความเพราะพวกมันช่วยให้คุณเขียน โค้ด concurrent ที่ปลอดภัย การป้องกัน error ใน concurrent programming คือข้อดีของการคิดเกี่ยวกับ ownership ตลอดโปรแกรม Rust ของคุณ มาทำ การทดลองเพื่อแสดงว่า channel และ ownership ทำงานร่วมกันเพื่อป้องกัน ปัญหายังไง — เราจะพยายามใช้ค่า val ในเธรดที่ spawn หลัง ที่เรา ได้ส่งมันลง channel ลองคอมไพล์โค้ดใน Listing 16-9 เพื่อดูทำไมโค้ด นี้ไม่ได้รับอนุญาต

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {val}");
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}
Listing 16-9: พยายามใช้ val หลังที่เราได้ส่งมันลง channel

ที่นี่ เราพยายาม print val หลังที่เราได้ส่งมันลง channel ผ่าน tx.send การอนุญาตสิ่งนี้จะเป็นความคิดที่แย่ — เมื่อค่าถูกส่งไปยัง อีกเธรด เธรดนั้นแก้หรือ drop มันก่อนเราพยายามใช้ค่าอีกได้ อาจ การ แก้ของเธรดอื่นก่อให้เกิด error หรือผลที่ไม่คาดหวังเนื่องจากข้อมูล ไม่สอดคล้องหรือไม่มี อย่างไรก็ตาม Rust ให้ error แก่เราถ้าเราพยายาม คอมไพล์โค้ดใน Listing 16-9:

$ cargo run
   Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:27
   |
 8 |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
 9 |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {val}");
   |                           ^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)

For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error

ความผิดพลาด concurrency ของเราก่อให้เกิด error ที่ compile-time ฟังก์ชัน send รับ ownership ของ parameter ของมัน และเมื่อค่าถูก ย้าย receiver รับ ownership ของมัน นี่หยุดเราจากการใช้ค่าโดยบังเอิญ อีกหลังส่งมัน — ระบบ ownership ตรวจสอบว่าทุกอย่างโอเค

ส่งค่าหลายค่า

โค้ดใน Listing 16-8 คอมไพล์และรัน แต่ไม่ได้แสดงเราชัดเจนว่าสองเธรด แยกกำลังคุยกันผ่าน channel

ใน Listing 16-10 เราได้ทำการแก้บางอย่างที่จะพิสูจน์ว่าโค้ดใน Listing 16-8 กำลังรันแบบ concurrent — เธรดที่ spawn ตอนนี้จะส่งหลายข้อความ และ pause หนึ่งวินาทีระหว่างแต่ละข้อความ

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}
Listing 16-10: ส่งหลายข้อความและ pause ระหว่างแต่ละข้อความ

คราวนี้ เธรดที่ spawn มี vector ของ string ที่เราต้องการส่งไปยัง เธรดหลัก เรา iterate ผ่านพวกมัน ส่งแต่ละตัวแยก และ pause ระหว่าง แต่ละโดยเรียกฟังก์ชัน thread::sleep กับค่า Duration ของหนึ่งวินาที

ในเธรดหลัก เราไม่ได้เรียกฟังก์ชัน recv ชัดเจนอีก — แทน เรากำลัง ปฏิบัติ rx เป็น iterator สำหรับแต่ละค่าที่ได้รับ เรากำลัง print มัน เมื่อ channel ถูกปิด iteration จะจบ

เมื่อรันโค้ดใน Listing 16-10 คุณควรเห็น output ต่อไปนี้กับ pause หนึ่งวินาทีระหว่างแต่ละบรรทัด:

Got: hi
Got: from
Got: the
Got: thread

เพราะเราไม่มีโค้ดที่ pause หรือ delay ใน loop for ในเธรดหลัก เรา บอกได้ว่าเธรดหลักกำลังรอที่จะรับค่าจากเธรดที่ spawn

สร้าง Producer หลายตัว

ก่อนหน้านี้เรากล่าวว่า mpsc เป็นคำย่อสำหรับ multiple producer, single consumer มาใช้ mpsc และขยายโค้ดใน Listing 16-10 เพื่อสร้าง หลายเธรดที่ทุกเธรดส่งค่าไปยัง receiver เดียวกัน เราทำเช่นนั้นได้ โดย clone transmitter ดังที่แสดงใน Listing 16-11

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }

    // --snip--
}
Listing 16-11: ส่งหลายข้อความจาก producer หลายตัว

คราวนี้ ก่อนที่เราสร้างเธรดที่ spawn แรก เราเรียก clone บน transmitter นี่จะให้เรา transmitter ใหม่ที่เราส่งให้เธรดที่ spawn แรกได้ เราส่ง transmitter เดิมให้เธรดที่ spawn ที่สอง นี่ให้เรา สองเธรด แต่ละเธรดส่งข้อความต่างกันไปยัง receiver เดียว

เมื่อคุณรันโค้ด output ของคุณควรดูประมาณนี้:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

คุณอาจเห็นค่าในลำดับอื่น ขึ้นกับระบบของคุณ นี่คือสิ่งที่ทำให้ concurrency น่าสนใจเช่นเดียวกับยาก ถ้าคุณทดลองกับ thread::sleep ให้ค่าต่าง ๆ ในเธรดต่าง ๆ แต่ละการรันจะมี nondeterministic มากขึ้น และสร้าง output ต่างกันแต่ละครั้ง

ตอนนี้เราดูว่า channel ทำงานยังไงแล้ว มาดูวิธี concurrency อีกแบบ