โอนข้อมูลระหว่าง Thread ด้วย Message Passing
แนวทางที่เป็นที่นิยมมากขึ้นในการรับประกัน concurrency ที่ปลอดภัยคือ message passing ที่เธรดหรือ actor สื่อสารโดยส่งข้อความกันที่บรรจุ ข้อมูล นี่คือไอเดียใน slogan จาก documentation ของภาษา Go — “อย่าสื่อสารโดยแชร์ memory แทน แชร์ memory โดยสื่อสาร”
เพื่อทำ concurrency แบบ message-sending standard library ของ Rust ให้ implementation ของ channel channel คือแนวคิด programming ทั่วไปที่ข้อมูลถูกส่งจากเธรดหนึ่งไปยังอีกเธรด
คุณจินตนาการ channel ใน programming เป็นเหมือน channel ของน้ำที่มี ทิศทาง เช่นลำธารหรือแม่น้ำ ถ้าคุณใส่อะไรเช่นเป็ดยางในแม่น้ำ มันจะ เดินทางลงไปยังท้ายสุดของทางน้ำ
channel มีสองครึ่ง — transmitter และ receiver ครึ่ง transmitter คือที่ upstream ที่คุณใส่เป็ดยางลงในแม่น้ำ และครึ่ง receiver คือที่ ที่เป็ดยางลงเอย downstream ส่วนหนึ่งของโค้ดของคุณเรียกเมธอดบน transmitter ด้วยข้อมูลที่คุณต้องการส่ง และอีกส่วนตรวจสอบปลายรับสำหรับ ข้อความที่มาถึง channel ถูกพูดว่า ปิด ถ้าครึ่ง transmitter หรือ receiver ถูก drop
ที่นี่ เราจะสร้างต่อจนถึงโปรแกรมที่มีเธรดหนึ่งสร้างค่าและส่งพวกมันลง channel และอีกเธรดที่จะรับค่าและ print พวกมันออก เราจะส่งค่าง่ายระหว่าง เธรดโดยใช้ channel เพื่อแสดงฟีเจอร์ เมื่อคุณคุ้นเคยกับเทคนิคแล้ว คุณใช้ channel สำหรับเธรดใดที่ต้องสื่อสารกันได้ เช่นระบบ chat หรือ ระบบที่หลายเธรดทำส่วนของการคำนวณและส่งส่วนไปยังเธรดหนึ่งที่ aggregate ผล
ก่อนอื่น ใน Listing 16-6 เราจะสร้าง channel แต่ไม่ทำอะไรกับมัน สังเกตว่านี่จะยังไม่คอมไพล์เพราะ Rust บอกไม่ได้ว่า type ของค่าใดที่ เราต้องการส่งผ่าน channel
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
}
tx และ rxเราสร้าง channel ใหม่โดยใช้ฟังก์ชัน mpsc::channel — mpsc ย่อมา
จาก multiple producer, single consumer สั้น ๆ วิธีที่ standard
library ของ Rust implement channel หมายความว่า channel มีหลายปลาย
ส่ง ที่สร้างค่าได้ แต่มีเพียงหนึ่งปลาย รับ ที่ consume ค่าเหล่า
นั้น จินตนาการหลายลำธารไหลรวมกันเป็นแม่น้ำใหญ่ — ทุกสิ่งที่ส่งลง
ลำธารใดจะลงเอยในแม่น้ำเดียวที่ปลายสุด เราจะเริ่มด้วย producer เดียว
ตอนนี้ แต่เราจะเพิ่ม producer หลายตัวเมื่อเราทำให้ตัวอย่างนี้ทำงาน
ฟังก์ชัน mpsc::channel return tuple ที่ element แรกคือปลายส่ง —
transmitter — และ element ที่สองคือปลายรับ — receiver คำย่อ tx
และ rx ถูกใช้ตามธรรมเนียมในหลายสาขาสำหรับ transmitter และ
receiver ตามลำดับ ดังนั้นเราตั้งชื่อตัวแปรของเราเช่นนั้นเพื่อระบุ
แต่ละปลาย เรากำลังใช้ statement let กับ pattern ที่ destructure
tuple — เราจะพูดถึงการใช้ pattern ใน statement let และ
destructuring ในบทที่ 19 ตอนนี้ รู้ว่าการใช้ statement let ใน
วิธีนี้เป็นแนวทางสะดวกในการดึงชิ้นส่วนของ tuple ที่ return โดย
mpsc::channel
มาย้ายปลายส่งเข้าเธรดที่ spawn และให้มันส่งหนึ่ง string เพื่อให้ เธรดที่ spawn สื่อสารกับเธรดหลัก ดังที่แสดงใน Listing 16-7 นี่ เหมือนใส่เป็ดยางในแม่น้ำ upstream หรือส่งข้อความ chat จากเธรดหนึ่ง ไปยังอีกเธรด
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
}
tx ไปยังเธรดที่ spawn และส่ง "hi"อีกครั้ง เรากำลังใช้ thread::spawn เพื่อสร้างเธรดใหม่และจากนั้น
ใช้ move เพื่อย้าย tx เข้า closure เพื่อให้เธรดที่ spawn own
tx เธรดที่ spawn ต้อง own transmitter เพื่อสามารถส่งข้อความผ่าน
channel
transmitter มีเมธอด send ที่รับค่าที่เราต้องการส่ง เมธอด send
return type Result<T, E> ดังนั้นถ้า receiver ถูก drop แล้วและไม่
มีที่จะส่งค่า การ operation send จะ return error ในตัวอย่างนี้ เรา
กำลังเรียก unwrap เพื่อ panic ในกรณี error แต่ใน application จริง
เราจะจัดการมันอย่างเหมาะสม — กลับไปบทที่ 9 เพื่อทบทวนกลยุทธ์สำหรับ
การจัดการ error ที่เหมาะสม
ใน Listing 16-8 เราจะรับค่าจาก receiver ในเธรดหลัก นี่เหมือนดึงเป็ด ยางจากน้ำที่ปลายแม่น้ำหรือรับข้อความ chat
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}
"hi" ในเธรดหลักและ print มันreceiver มีสองเมธอดที่มีประโยชน์ — recv และ try_recv เรากำลัง
ใช้ recv ย่อสำหรับ receive ซึ่งจะ block execution ของเธรดหลัก
และรอจนกว่าค่าถูกส่งลง channel เมื่อค่าถูกส่ง recv จะ return มัน
ใน Result<T, E> เมื่อ transmitter ปิด recv จะ return error เพื่อ
ส่งสัญญาณว่าไม่มีค่ามากกว่าที่จะมา
เมธอด try_recv ไม่ block แต่จะ return Result<T, E> ทันที — ค่า
Ok ที่เก็บข้อความถ้ามี และค่า Err ถ้าไม่มีข้อความใดคราวนี้ การ
ใช้ try_recv มีประโยชน์ถ้าเธรดนี้มีงานอื่นที่จะทำในขณะที่รอข้อความ
— เราเขียน loop ที่เรียก try_recv เป็นบางครั้ง จัดการข้อความถ้ามี
และมิฉะนั้นทำงานอื่นสักพักจนถึงตรวจสอบอีกครั้งได้
เราใช้ recv ในตัวอย่างนี้เพื่อความง่าย — เราไม่มีงานอื่นให้เธรด
หลักทำนอกจากรอข้อความ ดังนั้นการ block เธรดหลักเหมาะสม
เมื่อเรารันโค้ดใน Listing 16-8 เราจะเห็นค่าที่ print จากเธรดหลัก:
Got: hi
สมบูรณ์แบบ!
โอน Ownership ผ่าน Channel
กฎ ownership มีบทบาทสำคัญในการส่งข้อความเพราะพวกมันช่วยให้คุณเขียน
โค้ด concurrent ที่ปลอดภัย การป้องกัน error ใน concurrent programming
คือข้อดีของการคิดเกี่ยวกับ ownership ตลอดโปรแกรม Rust ของคุณ มาทำ
การทดลองเพื่อแสดงว่า channel และ ownership ทำงานร่วมกันเพื่อป้องกัน
ปัญหายังไง — เราจะพยายามใช้ค่า val ในเธรดที่ spawn หลัง ที่เรา
ได้ส่งมันลง channel ลองคอมไพล์โค้ดใน Listing 16-9 เพื่อดูทำไมโค้ด
นี้ไม่ได้รับอนุญาต
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is {val}");
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}
val หลังที่เราได้ส่งมันลง channelที่นี่ เราพยายาม print val หลังที่เราได้ส่งมันลง channel ผ่าน
tx.send การอนุญาตสิ่งนี้จะเป็นความคิดที่แย่ — เมื่อค่าถูกส่งไปยัง
อีกเธรด เธรดนั้นแก้หรือ drop มันก่อนเราพยายามใช้ค่าอีกได้ อาจ การ
แก้ของเธรดอื่นก่อให้เกิด error หรือผลที่ไม่คาดหวังเนื่องจากข้อมูล
ไม่สอดคล้องหรือไม่มี อย่างไรก็ตาม Rust ให้ error แก่เราถ้าเราพยายาม
คอมไพล์โค้ดใน Listing 16-9:
$ cargo run
Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
--> src/main.rs:10:27
|
8 | let val = String::from("hi");
| --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9 | tx.send(val).unwrap();
| --- value moved here
10 | println!("val is {val}");
| ^^^ value borrowed here after move
|
= note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error
ความผิดพลาด concurrency ของเราก่อให้เกิด error ที่ compile-time
ฟังก์ชัน send รับ ownership ของ parameter ของมัน และเมื่อค่าถูก
ย้าย receiver รับ ownership ของมัน นี่หยุดเราจากการใช้ค่าโดยบังเอิญ
อีกหลังส่งมัน — ระบบ ownership ตรวจสอบว่าทุกอย่างโอเค
ส่งค่าหลายค่า
โค้ดใน Listing 16-8 คอมไพล์และรัน แต่ไม่ได้แสดงเราชัดเจนว่าสองเธรด แยกกำลังคุยกันผ่าน channel
ใน Listing 16-10 เราได้ทำการแก้บางอย่างที่จะพิสูจน์ว่าโค้ดใน Listing 16-8 กำลังรันแบบ concurrent — เธรดที่ spawn ตอนนี้จะส่งหลายข้อความ และ pause หนึ่งวินาทีระหว่างแต่ละข้อความ
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
}
คราวนี้ เธรดที่ spawn มี vector ของ string ที่เราต้องการส่งไปยัง
เธรดหลัก เรา iterate ผ่านพวกมัน ส่งแต่ละตัวแยก และ pause ระหว่าง
แต่ละโดยเรียกฟังก์ชัน thread::sleep กับค่า Duration ของหนึ่งวินาที
ในเธรดหลัก เราไม่ได้เรียกฟังก์ชัน recv ชัดเจนอีก — แทน เรากำลัง
ปฏิบัติ rx เป็น iterator สำหรับแต่ละค่าที่ได้รับ เรากำลัง print
มัน เมื่อ channel ถูกปิด iteration จะจบ
เมื่อรันโค้ดใน Listing 16-10 คุณควรเห็น output ต่อไปนี้กับ pause หนึ่งวินาทีระหว่างแต่ละบรรทัด:
Got: hi
Got: from
Got: the
Got: thread
เพราะเราไม่มีโค้ดที่ pause หรือ delay ใน loop for ในเธรดหลัก เรา
บอกได้ว่าเธรดหลักกำลังรอที่จะรับค่าจากเธรดที่ spawn
สร้าง Producer หลายตัว
ก่อนหน้านี้เรากล่าวว่า mpsc เป็นคำย่อสำหรับ multiple producer,
single consumer มาใช้ mpsc และขยายโค้ดใน Listing 16-10 เพื่อสร้าง
หลายเธรดที่ทุกเธรดส่งค่าไปยัง receiver เดียวกัน เราทำเช่นนั้นได้
โดย clone transmitter ดังที่แสดงใน Listing 16-11
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// --snip--
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
// --snip--
}
คราวนี้ ก่อนที่เราสร้างเธรดที่ spawn แรก เราเรียก clone บน
transmitter นี่จะให้เรา transmitter ใหม่ที่เราส่งให้เธรดที่ spawn
แรกได้ เราส่ง transmitter เดิมให้เธรดที่ spawn ที่สอง นี่ให้เรา
สองเธรด แต่ละเธรดส่งข้อความต่างกันไปยัง receiver เดียว
เมื่อคุณรันโค้ด output ของคุณควรดูประมาณนี้:
Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you
คุณอาจเห็นค่าในลำดับอื่น ขึ้นกับระบบของคุณ นี่คือสิ่งที่ทำให้
concurrency น่าสนใจเช่นเดียวกับยาก ถ้าคุณทดลองกับ thread::sleep
ให้ค่าต่าง ๆ ในเธรดต่าง ๆ แต่ละการรันจะมี nondeterministic มากขึ้น
และสร้าง output ต่างกันแต่ละครั้ง
ตอนนี้เราดูว่า channel ทำงานยังไงแล้ว มาดูวิธี concurrency อีกแบบ