ใช้ Concurrency กับ Async
ในส่วนนี้ เราจะใช้ async กับบางความท้าทาย concurrency เดียวกันที่เรา จัดการด้วยเธรดในบทที่ 16 เพราะเราพูดถึงแนวคิดหลักหลายอย่างที่นั่นแล้ว ในส่วนนี้เราจะโฟกัสที่สิ่งที่ต่างระหว่างเธรดและ future
ในหลายกรณี API สำหรับทำงานกับ concurrency โดยใช้ async คล้ายกับ สำหรับใช้เธรดมาก ในกรณีอื่น พวกมันลงเอยที่ต่างพอควร แม้เมื่อ API ดูเหมือน คล้ายกันระหว่างเธรดและ async พวกมันมักมีพฤติกรรมต่างกัน และพวกมันเกือบเสมอมีลักษณะ performance ต่างกัน
สร้าง Task ใหม่ด้วย spawn_task
operation แรกที่เราจัดการในส่วน
“สร้าง Thread ใหม่ด้วย spawn” ใน
บทที่ 16 คือการนับขึ้นบนสองเธรดแยก มาทำสิ่งเดียวกันโดยใช้ async
crate trpl ให้ฟังก์ชัน spawn_task ที่ดูคล้าย API thread::spawn
มาก และฟังก์ชัน sleep ที่เป็นเวอร์ชัน async ของ API thread::sleep
เราใช้พวกมันรวมกันเพื่อ implement ตัวอย่างการนับได้ ดังที่แสดงใน
Listing 17-6
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
trpl::spawn_task(async {
for i in 1..10 {
println!("hi number {i} from the first task!");
trpl::sleep(Duration::from_millis(500)).await;
}
});
for i in 1..5 {
println!("hi number {i} from the second task!");
trpl::sleep(Duration::from_millis(500)).await;
}
});
}
เป็นจุดเริ่มของเรา เราตั้งฟังก์ชัน main ของเราด้วย trpl::block_on
เพื่อให้ฟังก์ชันระดับสูงของเราเป็น async ได้
สังเกต — จากจุดนี้ไปข้างหน้าในบท ทุกตัวอย่างจะรวมโค้ด wrapping เดียวกันนี้กับ
trpl::block_onในmainดังนั้นเราจะข้ามมันบ่อย เหมือนที่เราทำกับmainจำที่จะรวมมันในโค้ดของคุณ!
จากนั้นเราเขียนสอง loop ภายใน block นั้น แต่ละ loop บรรจุการเรียก
trpl::sleep ซึ่งรอครึ่งวินาที (500 millisecond) ก่อนส่งข้อความ
ถัดไป เราใส่หนึ่ง loop ใน body ของ trpl::spawn_task และอีกอันใน
loop for ระดับสูง เรายังเพิ่ม await หลังการเรียก sleep
โค้ดนี้ทำตัวคล้าย implementation ที่ใช้เธรด — รวมข้อเท็จจริงว่าคุณ อาจเห็นข้อความปรากฏในลำดับต่างกันใน terminal ของคุณเองเมื่อคุณรัน มัน:
hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
เวอร์ชันนี้หยุดทันทีที่ loop for ใน body ของ block async หลักเสร็จ
เพราะ task ที่ spawn โดย spawn_task ถูก shut down เมื่อฟังก์ชัน
main จบ ถ้าคุณต้องการให้มันรันจนถึงเสร็จของ task คุณจะต้องใช้
join handle เพื่อรอ task แรกเสร็จ ด้วยเธรด เราใช้เมธอด join เพื่อ
“block” จนกว่าเธรดเสร็จรัน ใน Listing 17-7 เราใช้ await เพื่อทำ
สิ่งเดียวกันได้ เพราะ task handle เองเป็น future type Output ของ
มันคือ Result ดังนั้นเรายัง unwrap มันหลัง await มัน
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let handle = trpl::spawn_task(async {
for i in 1..10 {
println!("hi number {i} from the first task!");
trpl::sleep(Duration::from_millis(500)).await;
}
});
for i in 1..5 {
println!("hi number {i} from the second task!");
trpl::sleep(Duration::from_millis(500)).await;
}
handle.await.unwrap();
});
}
await กับ join handle เพื่อรัน task จนเสร็จเวอร์ชันที่อัพเดทนี้รันจนกว่า ทั้งสอง loop เสร็จ:
hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!
จนถึงตอนนี้ ดูเหมือน async และเธรดให้เราผลคล้ายกัน เพียงด้วย syntax
ต่างกัน — ใช้ await แทนการเรียก join บน join handle และ await
การเรียก sleep
ความแตกต่างใหญ่กว่าคือเราไม่ต้อง spawn เธรด OS อีกตัวเพื่อทำสิ่งนี้
จริง ๆ เราไม่ต้อง spawn task ที่นี่ด้วย เพราะ block async คอมไพล์
เป็น future anonymous เราใส่แต่ละ loop ใน block async และให้ runtime
รันทั้งสองจนเสร็จโดยใช้ฟังก์ชัน trpl::join ได้
ในส่วน “รอให้เธรดทั้งหมดเสร็จ” ในบท
ที่ 16 เราแสดงวิธีใช้เมธอด join บน type JoinHandle ที่ return
เมื่อคุณเรียก std::thread::spawn ฟังก์ชัน trpl::join คล้ายกัน
แต่สำหรับ future เมื่อคุณให้มันสอง future มันผลิต future ใหม่หนึ่ง
ที่ output ของมันคือ tuple ที่บรรจุ output ของแต่ละ future ที่คุณ
ส่งเข้าเมื่อพวกมัน ทั้งคู่ เสร็จ ดังนั้น ใน Listing 17-8 เราใช้
trpl::join เพื่อรอ fut1 และ fut2 เสร็จ เรา ไม่ await
fut1 และ fut2 แต่แทน future ใหม่ที่ผลิตโดย trpl::join เรา
ignore output เพราะมันเป็นเพียง tuple ที่บรรจุสองค่า unit
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let fut1 = async {
for i in 1..10 {
println!("hi number {i} from the first task!");
trpl::sleep(Duration::from_millis(500)).await;
}
};
let fut2 = async {
for i in 1..5 {
println!("hi number {i} from the second task!");
trpl::sleep(Duration::from_millis(500)).await;
}
};
trpl::join(fut1, fut2).await;
});
}
trpl::join เพื่อ await สอง future anonymousเมื่อเรารันนี้ เราเห็นทั้งสอง future รันจนเสร็จ:
hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!
ตอนนี้ คุณจะเห็นลำดับเดียวกันแน่นอนทุกครั้ง ซึ่งต่างมากจากที่เรา
เห็นกับเธรดและกับ trpl::spawn_task ใน Listing 17-7 นั่นเพราะ
ฟังก์ชัน trpl::join เป็น fair หมายความว่ามันตรวจสอบแต่ละ future
อย่างเท่ากันบ่อย สลับระหว่างพวกมัน และไม่ให้อันหนึ่งแข่งไปข้างหน้า
ถ้าอีกอันพร้อม ด้วยเธรด OS ตัดสินว่าจะตรวจสอบเธรดไหนและให้รันนาน
เท่าไร ด้วย async Rust runtime ตัดสินว่าจะตรวจสอบ task ไหน (ในการ
ปฏิบัติ รายละเอียดซับซ้อนเพราะ async runtime อาจใช้เธรด OS ใต้
ฝ่ามือเป็นส่วนของวิธีที่มันจัดการ concurrency ดังนั้นการรับประกัน
fairness งานเพิ่มสำหรับ runtime — แต่ยังเป็นไปได้!) Runtime ไม่ต้อง
รับประกัน fairness สำหรับ operation ใด และพวกมันมักเสนอ API ต่างกัน
เพื่อให้คุณเลือกว่าคุณต้องการ fairness หรือไม่
ลองรูปแบบบางอย่างเหล่านี้ในการ await future และดูว่าพวกมันทำอะไร:
- ลบ block async จากรอบ loop ใดอันหนึ่งหรือทั้งสอง
- Await แต่ละ block async ทันทีหลังนิยามมัน
- Wrap เพียง loop แรกใน block async และ await future ที่ได้หลัง body ของ loop ที่สอง
สำหรับความท้าทายเพิ่ม ดูว่าคุณคิดออกได้ไหมว่า output จะเป็นอะไรใน แต่ละกรณี ก่อน รันโค้ด!
ส่งข้อมูลระหว่างสอง Task โดยใช้ Message Passing
การแชร์ข้อมูลระหว่าง future ก็จะคุ้นเคย — เราจะใช้ message passing อีก แต่คราวนี้ด้วย type และฟังก์ชัน async version เราจะใช้ทาง ต่างเล็กน้อยจากที่เราทำในส่วน “โอนข้อมูลระหว่าง Thread ด้วย Message Passing” ในบทที่ 16 เพื่อแสดงบางความแตกต่างหลักระหว่าง concurrency ที่ใช้ เธรดและที่ใช้ future ใน Listing 17-9 เราจะเริ่มด้วยเพียง block async เดียว — ไม่ spawn task แยกเหมือนที่เรา spawn เธรดแยก
extern crate trpl; // required for mdbook test
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let val = String::from("hi");
tx.send(val).unwrap();
let received = rx.recv().await.unwrap();
println!("received '{received}'");
});
}
tx และ rxที่นี่ เราใช้ trpl::channel async version ของ API channel
multiple-producer, single-consumer ที่เราใช้กับเธรดในบทที่ 16
async version ของ API ต่างจาก thread-based version เพียงเล็กน้อย —
มันใช้ receiver rx แบบ mutable ไม่ใช่ immutable และเมธอด recv
ของมันผลิต future ที่เราต้อง await แทนการผลิตค่าโดยตรง ตอนนี้เรา
ส่งข้อความจาก sender ไปยัง receiver ได้ สังเกตว่าเราไม่ต้อง spawn
เธรดแยกหรือแม้แต่ task — เราเพียงต้อง await การเรียก rx.recv
เมธอด synchronous Receiver::recv ใน std::mpsc::channel block จน
กว่ามันรับข้อความ เมธอด trpl::Receiver::recv ไม่ทำ เพราะมันเป็น
async แทนการ block มันส่งการควบคุมกลับให้ runtime จนกว่าข้อความถูก
รับหรือฝั่ง send ของ channel ปิด เปรียบเทียบ เราไม่ await การเรียก
send เพราะมันไม่ block มันไม่ต้อง เพราะ channel ที่เราส่งเข้าเป็น
unbounded
สังเกต — เพราะโค้ด async ทั้งหมดนี้รันใน block async ในการเรียก
trpl::block_onทุกอย่างภายในมันหลีกเลี่ยงการ block ได้ อย่างไร ก็ตาม โค้ด ภายนอก มันจะ block บนฟังก์ชันblock_onreturn นั่น คือประเด็นทั้งหมดของฟังก์ชันtrpl::block_on— มันให้คุณ เลือก ที่ที่จะ block บนชุดของโค้ด async และดังนั้นที่ที่เปลี่ยนระหว่าง โค้ด sync และ async
สังเกตสองสิ่งเกี่ยวกับตัวอย่างนี้ ก่อนอื่น ข้อความจะมาถึงทันที ที่ สอง แม้เราใช้ future ที่นี่ ยังไม่มี concurrency ทุกอย่างใน listing เกิดในลำดับ เพียงเหมือนที่จะถ้าไม่มี future ที่เกี่ยวข้อง
มาแก้ส่วนแรกโดยส่งชุดของข้อความและ sleep ระหว่างพวกมัน ดังที่แสดงใน Listing 17-10
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
});
}
await ระหว่างแต่ละข้อความนอกจากส่งข้อความ เราต้องรับพวกมัน ในกรณีนี้ เพราะเรารู้ว่ามีกี่
ข้อความที่กำลังมา เราทำสิ่งนั้นด้วยมือโดยเรียก rx.recv().await
สี่ครั้งได้ ในโลกจริง อย่างไรก็ตาม เราจะรอจำนวน ไม่รู้ ของข้อความ
โดยทั่วไป ดังนั้นเราต้องรอต่อจนกว่าเราตัดสินว่าไม่มีข้อความเพิ่ม
ใน Listing 16-10 เราใช้ loop for เพื่อประมวลผล item ทั้งหมดที่
รับจาก channel synchronous Rust ยังไม่มีวิธีใช้ loop for กับชุด
ของ item ผลิตแบบ asynchronous อย่างไรก็ตาม ดังนั้นเราต้องใช้ loop
ที่เรายังไม่เห็น — loop conditional while let นี่คือ loop version
ของ construct if let ที่เราเห็นในส่วน
“Control Flow สั้น ๆ ด้วย if let และ let...else”
ในบทที่ 6 loop จะ execute ต่อตราบใดที่ pattern ที่มันระบุยัง match
ค่า
การเรียก rx.recv ผลิต future ซึ่งเรา await runtime จะ pause
future จนกว่ามันพร้อม เมื่อข้อความมาถึง future จะ resolve เป็น
Some(message) มากเท่าที่ข้อความมาถึง เมื่อ channel ปิด ไม่ว่า ใด
ข้อความมาถึงหรือไม่ future จะแทน resolve เป็น None เพื่อระบุว่า
ไม่มีค่าเพิ่มและดังนั้นเราควรหยุด polling — นั่นคือ หยุด await
loop while let ดึงทั้งหมดนี้ด้วยกัน ถ้าผลของการเรียก
rx.recv().await คือ Some(message) เราได้สิทธิ์เข้าถึงข้อความและ
ใช้มันใน body ของ loop ได้ เพียงเหมือนที่เราทำได้กับ if let ถ้า
ผลคือ None loop จบ ทุกครั้งที่ loop เสร็จ มันชน await point อีก
ดังนั้น runtime pause มันอีกจนกว่าข้อความอื่นมาถึง
โค้ดตอนนี้ส่งและรับข้อความทั้งหมดสำเร็จ โชคไม่ดี ยังมีสองสามปัญหา สำหรับสิ่งหนึ่ง ข้อความไม่มาถึงที่ช่วงครึ่งวินาที พวกมันมาถึงพร้อม กันทั้งหมด 2 วินาที (2,000 millisecond) หลังเราเริ่มโปรแกรม สำหรับ อีกอย่าง โปรแกรมนี้ก็ไม่ออกเลย! แทน มันรอตลอดสำหรับข้อความใหม่ คุณ จะต้อง shut down โดยใช้ ctrl-C
โค้ดภายใน Async Block หนึ่ง Execute แบบ Linear
มาเริ่มโดยตรวจสอบทำไมข้อความมาพร้อมกันหลังการ delay เต็ม ไม่ใช่
มาด้วย delay ระหว่างแต่ละอัน ภายใน block async ที่ให้ ลำดับที่
keyword await ปรากฏในโค้ดก็เป็นลำดับที่พวกมันถูก execute เมื่อ
โปรแกรมรัน
มีเพียงหนึ่ง block async ใน Listing 17-10 ดังนั้นทุกอย่างในมันรัน
linear ยังไม่มี concurrency การเรียก tx.send ทั้งหมดเกิด แทรกกับ
การเรียก trpl::sleep ทั้งหมดและ await point ที่ associate ของพวก
มัน เพียงจากนั้น loop while let จึงไปผ่าน await point ใดบนการ
เรียก recv
เพื่อรับพฤติกรรมที่เราต้องการ ที่ sleep delay เกิดระหว่างแต่ละ
ข้อความ เราต้องใส่ operation tx และ rx ใน block async ของพวก
มันเอง ดังที่แสดงใน Listing 17-11 จากนั้น runtime execute แต่ละ
พวกมันแยกโดยใช้ trpl::join ได้ เพียงเหมือนใน Listing 17-8 อีก
ครั้ง เรา await ผลของการเรียก trpl::join ไม่ใช่ future แต่ละตัว
ถ้าเรา await future แต่ละตัวในลำดับ เราจะลงเอยกลับใน flow ลำดับ —
แน่นอนสิ่งที่เราพยายาม ไม่ ทำ
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let tx_fut = async {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
trpl::join(tx_fut, rx_fut).await;
});
}
send และ recv เข้า block async ของพวกมันเองและ await future สำหรับ block เหล่านั้นด้วยโค้ดอัพเดทใน Listing 17-11 ข้อความถูก print ที่ช่วง 500 millisecond ไม่ใช่ทั้งหมดในรวดเดียวหลัง 2 วินาที
ย้าย Ownership เข้า Async Block
โปรแกรมยังไม่ออกเลย อย่างไรก็ตาม เพราะวิธีที่ loop while let
interact กับ trpl::join:
- future ที่ return จาก
trpl::joinเสร็จเฉพาะเมื่อ ทั้งสอง future ที่ส่งให้มันเสร็จ - future
tx_futเสร็จเมื่อมัน sleep เสร็จหลังส่งข้อความสุดท้ายในvals - future
rx_futจะไม่เสร็จจนกว่า loopwhile letจบ - loop
while letจะไม่จบจนกว่าการ awaitrx.recvผลิตNone - การ await
rx.recvจะ returnNoneเฉพาะเมื่อปลายอื่นของ channel ถูกปิด - channel จะปิดเฉพาะถ้าเราเรียก
rx.closeหรือเมื่อฝั่ง sender,txถูก drop - เราไม่เรียก
rx.closeที่ไหน และtxจะไม่ถูก drop จนกว่า block async ภายนอกสุดที่ส่งให้trpl::block_onจบ - block ไม่จบเพราะมันถูก block บน
trpl::joinเสร็จ ซึ่งพาเรากลับ ไปยังบนสุดของ list นี้
ตอนนี้ block async ที่เราส่งข้อความเพียง borrow tx เพราะการส่ง
ข้อความไม่ต้องการ ownership แต่ถ้าเรา ย้าย tx เข้า block async
นั้นได้ มันจะถูก drop เมื่อ block นั้นจบ ในส่วน
“จับ Reference หรือย้าย Ownership”
ในบทที่ 13 คุณเรียนวิธีใช้ keyword move กับ closure และดังที่
พูดในส่วน “ใช้ Closure move กับ Thread”
ในบทที่ 16 เราต้องย้ายข้อมูลเข้า closure เมื่อทำงานกับเธรดบ่อย
dynamic พื้นฐานเดียวกันใช้กับ block async ดังนั้น keyword move
ทำงานกับ block async เพียงเหมือนที่ทำกับ closure
ใน Listing 17-12 เราเปลี่ยน block ที่ใช้ส่งข้อความจาก async เป็น
async move
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let tx_fut = async move {
// --snip--
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
trpl::join(tx_fut, rx_fut).await;
});
}
เมื่อเรารัน นี้ version ของโค้ด มัน shut down อย่างสง่างามหลัง ข้อความสุดท้ายถูกส่งและรับ ถัดไป มาดูว่าอะไรจะต้องเปลี่ยนเพื่อส่ง ข้อมูลจากมากกว่าหนึ่ง future
Join จำนวน Future ด้วยมาโคร join!
channel async นี้ยังเป็น channel multiple-producer ดังนั้นเราเรียก
clone บน tx ได้ถ้าเราต้องการส่งข้อความจากหลาย future ดังที่
แสดงใน Listing 17-13
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = async move {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
let tx_fut = async move {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(1500)).await;
}
};
trpl::join!(tx1_fut, tx_fut, rx_fut);
});
}
ก่อนอื่น เรา clone tx สร้าง tx1 ภายนอก block async แรก เราย้าย
tx1 เข้า block นั้นเพียงเหมือนที่เราทำก่อนกับ tx จากนั้น ภายหลัง
เราย้าย tx ดั้งเดิมเข้า block async ใหม่ ที่เราส่งข้อความเพิ่ม
ใน delay ที่ช้ากว่าเล็กน้อย เราบังเอิญใส่ block async ใหม่นี้หลัง
block async สำหรับรับข้อความ แต่มันไปก่อนมันก็ได้เช่นกัน กุญแจคือ
ลำดับที่ future ถูก await ไม่ใช่ที่พวกมันถูกสร้าง
block async ทั้งสองสำหรับส่งข้อความต้องเป็น block async move
เพื่อให้ทั้ง tx และ tx1 ถูก drop เมื่อ block เหล่านั้นเสร็จ
มิฉะนั้น เราจะลงเอยกลับใน loop infinite เดียวกันที่เราเริ่ม
สุดท้าย เราเปลี่ยนจาก trpl::join เป็น trpl::join! เพื่อจัดการ
future เพิ่ม — มาโคร join! await จำนวน future ตามอำเภอใจที่เรารู้
จำนวน future ที่ compile time เราจะพูดถึงการ await collection ของ
จำนวนไม่รู้ของ future ภายหลังในบทนี้
ตอนนี้เราเห็นข้อความทั้งหมดจาก future ส่งทั้งสอง และเพราะ future ส่งใช้ delay ต่างเล็กน้อยหลังส่ง ข้อความถูกรับที่ช่วงต่างเหล่านั้น ด้วย:
received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'
เราสำรวจวิธีใช้ message passing ในการส่งข้อมูลระหว่าง future, วิธีที่ โค้ดภายใน block async รันลำดับ, วิธีย้าย ownership เข้า block async และวิธี join หลาย future ถัดไป มาพูดถึงวิธีและทำไมบอก runtime ว่า มันสลับไปยังอีก task ได้