Shared-State Concurrency
Message passing เป็นวิธีดีในการจัดการ concurrency แต่ไม่ใช่วิธีเดียว อีกวิธีจะเป็นการให้หลายเธรดเข้าถึงข้อมูลที่แชร์เดียวกัน พิจารณาส่วน นี้ของ slogan จาก documentation ภาษา Go อีก — “อย่าสื่อสารโดยแชร์ memory”
การสื่อสารโดยแชร์ memory จะดูเป็นยังไง? นอกจากนี้ ทำไมผู้ที่กระตือ รือร้นกับ message-passing เตือนไม่ให้ใช้การแชร์ memory?
ในแง่หนึ่ง channel ในภาษาโปรแกรมใดคล้ายกับ single ownership เพราะ เมื่อคุณโอนค่าลง channel คุณไม่ควรใช้ค่านั้นอีก Shared-memory concurrency เหมือน multiple ownership — หลายเธรดเข้าถึง memory location เดียวกันพร้อมกันได้ ดังที่คุณเห็นในบทที่ 15 ที่ smart pointer ทำให้ multiple ownership เป็นไปได้ multiple ownership เพิ่ม ความซับซ้อนได้เพราะ owner ต่างเหล่านี้ต้องจัดการ ระบบ type และกฎ ownership ของ Rust ช่วยอย่างมากในการจัดการสิ่งนี้ให้ถูกต้อง เป็น ตัวอย่าง มาดู mutex หนึ่งใน concurrency primitive ที่ทั่วไปกว่า สำหรับการแชร์ memory
ควบคุมการเข้าถึงด้วย Mutex
Mutex เป็นคำย่อสำหรับ mutual exclusion — mutex อนุญาตให้เพียง หนึ่งเธรดเข้าถึงข้อมูลในเวลาใดเวลาหนึ่ง ในการเข้าถึงข้อมูลใน mutex เธรดต้องส่งสัญญาณก่อนว่ามันต้องการการเข้าถึงโดยขอ acquire lock ของ mutex lock คือโครงสร้างข้อมูลที่เป็นส่วนของ mutex ที่ตามใครมี สิทธิ์เข้าถึงข้อมูลแบบ exclusive ปัจจุบัน ดังนั้น mutex ถูกอธิบาย ว่า ปกป้อง ข้อมูลที่มันเก็บผ่านระบบ locking
Mutex มีชื่อเสียงว่าใช้ยากเพราะคุณต้องจำกฎสองข้อ:
- คุณต้องพยายาม acquire lock ก่อนใช้ข้อมูล
- เมื่อคุณเสร็จกับข้อมูลที่ mutex ปกป้อง คุณต้อง unlock ข้อมูลเพื่อ ให้เธรดอื่น acquire lock ได้
สำหรับการเปรียบในโลกจริงสำหรับ mutex จินตนาการ panel discussion ที่ conference ที่มีเพียงหนึ่ง microphone ก่อนที่ panelist จะพูด พวกเขา ต้องขอหรือส่งสัญญาณว่าพวกเขาต้องการใช้ microphone เมื่อพวกเขาได้ microphone พวกเขาคุยได้นานเท่าที่ต้องการแล้วส่ง microphone ให้ panelist ถัดไปที่ขอพูด ถ้า panelist ลืมส่ง microphone เมื่อพวกเขา เสร็จกับมัน ไม่มีใครอื่นที่สามารถพูดได้ ถ้าการจัดการ microphone ที่แชร์ผิดพลาด panel จะไม่ทำงานตามที่วางแผน!
การจัดการ mutex ยากมากในการทำให้ถูก ซึ่งเป็นเหตุผลที่คนหลายคน กระตือรือร้นกับ channel อย่างไรก็ตาม ขอบคุณระบบ type และกฎ ownership ของ Rust คุณทำ locking และ unlocking ผิดไม่ได้
API ของ Mutex<T>
เป็นตัวอย่างของวิธีใช้ mutex มาเริ่มโดยใช้ mutex ใน context เธรด เดียว ดังที่แสดงใน Listing 16-12
use std::sync::Mutex;
fn main() {
let m = Mutex::new(5);
{
let mut num = m.lock().unwrap();
*num = 6;
}
println!("m = {m:?}");
}
Mutex<T> ใน context เธรดเดียวเพื่อความง่ายเช่นเดียวกับหลาย type เราสร้าง Mutex<T> โดยใช้ associated function
new ในการเข้าถึงข้อมูลภายใน mutex เราใช้เมธอด lock เพื่อ
acquire lock การเรียกนี้จะ block เธรดปัจจุบันเพื่อให้มันทำงานใดไม่
ได้จนกว่าจะถึงเทิร์นของเราที่จะมี lock
การเรียก lock จะ fail ถ้าเธรดอื่นที่ถือ lock panic ในกรณีนั้น
ไม่มีใครที่สามารถได้ lock ดังนั้นเราเลือก unwrap และให้เธรดนี้
panic ถ้าเราอยู่ในสถานการณ์นั้น
หลังจากเรา acquire lock เราปฏิบัติกับค่า return ที่ชื่อ num ใน
กรณีนี้ เป็น mutable reference ของข้อมูลภายในได้ ระบบ type รับ
ประกันว่าเรา acquire lock ก่อนใช้ค่าใน m type ของ m คือ
Mutex<i32> ไม่ใช่ i32 ดังนั้นเรา ต้อง เรียก lock เพื่อใช้
ค่า i32 ได้ เราลืมไม่ได้ — ระบบ type จะไม่ให้เราเข้าถึง i32
ภายในมิฉะนั้น
การเรียก lock return type ที่เรียก MutexGuard ที่ wrap ใน
LockResult ที่เราจัดการด้วยการเรียก unwrap type MutexGuard
implement Deref เพื่อชี้ไปยังข้อมูลภายในของเรา — type ยังมี
implementation Drop ที่ปล่อย lock อัตโนมัติเมื่อ MutexGuard
ออกจาก scope ซึ่งเกิดที่ท้ายสุดของ scope ภายใน ผลคือ เราไม่เสี่ยง
ลืมที่จะปล่อย lock และ block mutex จากการถูกใช้โดยเธรดอื่นเพราะการ
ปล่อย lock เกิดอัตโนมัติ
หลังจาก drop lock เรา print ค่า mutex และเห็นว่าเราสามารถเปลี่ยน
i32 ภายในเป็น 6 ได้
การเข้าถึง Mutex<T> แบบแชร์
ตอนนี้ลองแชร์ค่าระหว่างหลายเธรดโดยใช้ Mutex<T> เราจะ spin 10
เธรดและให้แต่ละตัวเพิ่มค่า counter โดย 1 ดังนั้น counter ไปจาก 0
ถึง 10 ตัวอย่างใน Listing 16-13 จะมี error compiler และเราจะใช้
error นั้นเพื่อเรียนรู้เพิ่มเกี่ยวกับการใช้ Mutex<T> และวิธีที่
Rust ช่วยเราใช้มันถูกต้อง
use std::sync::Mutex;
use std::thread;
fn main() {
let counter = Mutex::new(0);
let mut handles = vec![];
for _ in 0..10 {
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
Mutex<T>เราสร้างตัวแปร counter เพื่อเก็บ i32 ภายใน Mutex<T> ดังที่เรา
ทำใน Listing 16-12 ถัดไป เราสร้าง 10 เธรดโดย iterate ผ่านช่วงของ
ตัวเลข เราใช้ thread::spawn และให้เธรดทั้งหมด closure เดียวกัน —
อันที่ย้าย counter เข้าเธรด acquire lock บน Mutex<T> โดยเรียก
เมธอด lock แล้วเพิ่ม 1 ให้ค่าใน mutex เมื่อเธรดเสร็จรัน closure
ของมัน num จะออกจาก scope และปล่อย lock เพื่อให้เธรดอื่น acquire
มันได้
ในเธรดหลัก เรา collect join handle ทั้งหมด จากนั้น ดังที่เราทำใน
Listing 16-2 เราเรียก join บนแต่ละ handle เพื่อให้แน่ใจว่าเธรด
ทั้งหมดเสร็จ ที่จุดนั้น เธรดหลักจะ acquire lock และ print ผลของ
โปรแกรมนี้
เราใบ้ว่าตัวอย่างนี้จะไม่คอมไพล์ ตอนนี้มาหาว่าทำไม!
$ cargo run
Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0382]: borrow of moved value: `counter`
--> src/main.rs:21:29
|
5 | let counter = Mutex::new(0);
| ------- move occurs because `counter` has type `std::sync::Mutex<i32>`, which does not implement the `Copy` trait
...
8 | for _ in 0..10 {
| -------------- inside of this loop
9 | let handle = thread::spawn(move || {
| ------- value moved into closure here, in previous iteration of loop
...
21 | println!("Result: {}", *counter.lock().unwrap());
| ^^^^^^^ value borrowed here after move
|
help: consider moving the expression out of the loop so it is only moved once
|
8 ~ let mut value = counter.lock();
9 ~ for _ in 0..10 {
10 | let handle = thread::spawn(move || {
11 ~ let mut num = value.unwrap();
|
For more information about this error, try `rustc --explain E0382`.
error: could not compile `shared-state` (bin "shared-state") due to 1 previous error
ข้อความ error ระบุว่าค่า counter ถูกย้ายใน iteration ก่อนหน้าของ
loop Rust กำลังบอกเราว่าเราย้าย ownership ของ lock counter เข้า
หลายเธรดไม่ได้ มาแก้ error compiler ด้วยวิธี multiple-ownership ที่
เราพูดถึงในบทที่ 15
Multiple Ownership กับหลาย Thread
ในบทที่ 15 เราให้ค่าให้หลาย owner โดยใช้ smart pointer Rc<T>
เพื่อสร้างค่าที่นับ reference มาทำสิ่งเดียวกันที่นี่และดูสิ่งที่
เกิด เราจะ wrap Mutex<T> ใน Rc<T> ใน Listing 16-14 และ clone
Rc<T> ก่อนย้าย ownership ไปยังเธรด
use std::rc::Rc;
use std::sync::Mutex;
use std::thread;
fn main() {
let counter = Rc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Rc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
Rc<T> เพื่ออนุญาตให้หลายเธรด own Mutex<T>อีกครั้ง เราคอมไพล์และได้… error ต่างกัน! compiler กำลังสอนเรา มาก:
$ cargo run
Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0277]: `Rc<std::sync::Mutex<i32>>` cannot be sent between threads safely
--> src/main.rs:11:36
|
11 | let handle = thread::spawn(move || {
| ------------- ^------
| | |
| ______________________|_____________within this `{closure@src/main.rs:11:36: 11:43}`
| | |
| | required by a bound introduced by this call
12 | | let mut num = counter.lock().unwrap();
13 | |
14 | | *num += 1;
15 | | });
| |_________^ `Rc<std::sync::Mutex<i32>>` cannot be sent between threads safely
|
= help: within `{closure@src/main.rs:11:36: 11:43}`, the trait `Send` is not implemented for `Rc<std::sync::Mutex<i32>>`
note: required because it's used within this closure
--> src/main.rs:11:36
|
11 | let handle = thread::spawn(move || {
| ^^^^^^^
note: required by a bound in `spawn`
--> /rustc/1159e78c4747b02ef996e55082b704c09b970588/library/std/src/thread/mod.rs:723:1
For more information about this error, try `rustc --explain E0277`.
error: could not compile `shared-state` (bin "shared-state") due to 1 previous error
ว้าว ข้อความ error นั้นยาวมาก! นี่คือส่วนสำคัญที่จะโฟกัส —
`Rc<Mutex<i32>>` cannot be sent between threads safely
compiler ยังบอกเราเหตุผลทำไม — the trait `Send` is not implemented for `Rc<Mutex<i32>>` เราจะพูดถึง Send ในส่วนถัดไป — มันเป็น
หนึ่งใน trait ที่รับประกันว่า type ที่เราใช้กับเธรดมีไว้สำหรับใช้
ในสถานการณ์ concurrent
โชคไม่ดี Rc<T> ไม่ปลอดภัยที่จะแชร์ข้ามเธรด เมื่อ Rc<T> จัดการ
reference count มันเพิ่มให้ count สำหรับแต่ละการเรียก clone และ
ลบจาก count เมื่อ clone แต่ละตัวถูก drop แต่มันไม่ใช้ concurrency
primitive ใดเพื่อทำให้แน่ใจว่าการเปลี่ยน count ไม่ถูก interrupt
โดยเธรดอื่น สิ่งนี้นำไปสู่ count ที่ผิด — bug ที่ละเอียดอ่อนที่นำ
ไปสู่ memory leak หรือค่าถูก drop ก่อนเราเสร็จกับมันได้ สิ่งที่เรา
ต้องการคือ type ที่เหมือน Rc<T> แน่นอน แต่ที่ทำการเปลี่ยนแปลง
ของ reference count ในแบบที่ thread-safe
นับ Reference แบบ Atomic ด้วย Arc<T>
โชคดี Arc<T> เป็น type เหมือน Rc<T> ที่ปลอดภัยที่จะใช้ใน
สถานการณ์ concurrent a ย่อมาจาก atomic หมายความว่ามันเป็น type
ที่นับ reference แบบ atomic Atomic เป็น primitive ของ
concurrency เพิ่มที่เราจะไม่ครอบคลุมในรายละเอียดที่นี่ — ดู
documentation standard library สำหรับ std::sync::atomic
สำหรับรายละเอียดเพิ่ม ที่จุดนี้ คุณเพียงต้องรู้ว่า atomic ทำงาน
เหมือน primitive type แต่ปลอดภัยที่จะแชร์ข้ามเธรด
คุณอาจสงสัยว่าทำไม primitive type ทั้งหมดไม่ใช่ atomic และทำไม
type standard library ไม่ถูก implement ให้ใช้ Arc<T> เป็นค่า
เริ่มต้น เหตุผลคือ thread safety มาพร้อมบทลงโทษ performance ที่
คุณต้องการจ่ายเมื่อคุณต้องการจริง ๆ ถ้าคุณเพียงทำ operation บน
ค่าภายในเธรดเดียว โค้ดของคุณรันได้เร็วกว่าถ้ามันไม่ต้องบังคับใช้
การรับประกันที่ atomic ให้
มากลับไปยังตัวอย่างของเรา — Arc<T> และ Rc<T> มี API เดียวกัน
ดังนั้นเราแก้โปรแกรมของเราโดยเปลี่ยนบรรทัด use การเรียก new
และการเรียก clone โค้ดใน Listing 16-15 จะคอมไพล์และรันได้ในที่สุด
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
Arc<T> เพื่อ wrap Mutex<T> เพื่อสามารถแชร์ ownership ข้ามหลายเธรดโค้ดนี้จะ print ต่อไปนี้:
Result: 10
เราทำได้! เรานับจาก 0 ถึง 10 ซึ่งอาจไม่ดูประทับใจมาก แต่มันสอนเรา
มากเกี่ยวกับ Mutex<T> และ thread safety คุณยังใช้โครงสร้างของ
โปรแกรมนี้เพื่อทำ operation ที่ซับซ้อนกว่าเพียงเพิ่ม counter ได้
โดยใช้กลยุทธ์นี้ คุณแบ่งการคำนวณเป็นส่วนอิสระ แยกส่วนเหล่านั้นข้าม
เธรด แล้วใช้ Mutex<T> เพื่อให้แต่ละเธรดอัพเดทผลลัพธ์สุดท้ายด้วย
ส่วนของมันได้
สังเกตว่าถ้าคุณกำลังทำ operation ตัวเลขที่ง่าย มี type ที่ง่ายกว่า
type Mutex<T> ที่ให้โดย
โมดูล std::sync::atomic ของ standard library
type เหล่านี้ให้การเข้าถึง atomic ที่ปลอดภัยและ concurrent ของ
primitive type เราเลือกใช้ Mutex<T> กับ primitive type สำหรับ
ตัวอย่างนี้เพื่อให้เราสามารถโฟกัสที่ Mutex<T> ทำงานยังไง
เปรียบเทียบ RefCell<T>/Rc<T> และ Mutex<T>/Arc<T>
คุณอาจสังเกตเห็นว่า counter เป็น immutable แต่เราได้ mutable
reference ของค่าภายในมัน — นี่หมายความว่า Mutex<T> ให้ interior
mutability เช่นที่ตระกูล Cell ทำ ในแบบเดียวกับที่เราใช้
RefCell<T> ในบทที่ 15 เพื่ออนุญาตให้เรา mutate เนื้อหาภายใน
Rc<T> เราใช้ Mutex<T> เพื่อ mutate เนื้อหาภายใน Arc<T>
รายละเอียดอีกอย่างที่จะสังเกตคือ Rust ปกป้องคุณจาก error logic ทุก
ประเภทไม่ได้เมื่อคุณใช้ Mutex<T> จำได้จากบทที่ 15 ว่าการใช้ Rc<T>
มาพร้อมความเสี่ยงของการสร้าง reference cycle ที่ค่า Rc<T> สอง
ตัวอ้างถึงกัน ก่อให้เกิด memory leak ในทำนองเดียวกัน Mutex<T> มา
พร้อมความเสี่ยงของการสร้าง deadlock เหล่านี้เกิดเมื่อ operation
ต้อง lock สอง resource และสองเธรดได้ acquire หนึ่งใน lock แต่ละ
ก่อให้เกิดพวกมันรอกันตลอดไป ถ้าคุณสนใจ deadlock ลองสร้างโปรแกรม
Rust ที่มี deadlock — จากนั้น ค้นคว้ากลยุทธ์ mitigation deadlock
สำหรับ mutex ในภาษาใดและลอง implement พวกมันใน Rust API
documentation ของ standard library สำหรับ Mutex<T> และ
MutexGuard เสนอข้อมูลที่มีประโยชน์
เราจะปิดบทนี้โดยพูดถึง trait Send และ Sync และวิธีที่เราใช้
พวกมันกับ type กำหนดเองได้