Yield การควบคุมให้ Runtime
จำได้จากส่วน “โปรแกรม Async แรกของเรา” ว่าที่แต่ละ await point, Rust ให้ runtime โอกาสที่จะ pause task และ สลับไปยังอีกอันถ้า future ที่ await ไม่พร้อม สิ่งตรงข้ามก็จริง — Rust เพียง pause block async และส่งการควบคุมกลับให้ runtime ที่ await point ทุกอย่างระหว่าง await point เป็น synchronous
นั่นหมายความว่าถ้าคุณทำงานเยอะใน block async โดยไม่มี await point future นั้นจะ block future อื่นจากการก้าวหน้า บางครั้งคุณอาจได้ยิน นี่ถูกอ้างถึงว่า future หนึ่ง starve future อื่น ในบางกรณี นั่น อาจไม่เป็นเรื่องใหญ่ อย่างไรก็ตาม ถ้าคุณกำลังทำ setup ราคาสูงหรืองาน ที่รันนาน หรือถ้าคุณมี future ที่จะทำงานเฉพาะตลอดไป คุณจะต้องคิด ว่าเมื่อและที่ไหนที่จะส่งการควบคุมกลับให้ runtime
มา simulate operation ที่รันนานเพื่อแสดงปัญหา starvation จากนั้น
สำรวจวิธีแก้มัน Listing 17-14 แนะนำฟังก์ชัน slow
extern crate trpl; // required for mdbook test
use std::{thread, time::Duration};
fn main() {
trpl::block_on(async {
// We will call `slow` here later
});
}
fn slow(name: &str, ms: u64) {
thread::sleep(Duration::from_millis(ms));
println!("'{name}' ran for {ms}ms");
}
thread::sleep เพื่อ simulate operation ที่ช้าโค้ดนี้ใช้ std::thread::sleep แทน trpl::sleep เพื่อให้การเรียก
slow จะ block เธรดปัจจุบันสำหรับจำนวน millisecond บางอย่าง เราใช้
slow แทน operation ในโลกจริงที่ทั้งรันนานและ blocking
ใน Listing 17-15 เราใช้ slow เพื่อจำลองการทำงาน CPU-bound ประเภท
นี้ในคู่ของ future
extern crate trpl; // required for mdbook test
use std::{thread, time::Duration};
fn main() {
trpl::block_on(async {
let a = async {
println!("'a' started.");
slow("a", 30);
slow("a", 10);
slow("a", 20);
trpl::sleep(Duration::from_millis(50)).await;
println!("'a' finished.");
};
let b = async {
println!("'b' started.");
slow("b", 75);
slow("b", 10);
slow("b", 15);
slow("b", 350);
trpl::sleep(Duration::from_millis(50)).await;
println!("'b' finished.");
};
trpl::select(a, b).await;
});
}
fn slow(name: &str, ms: u64) {
thread::sleep(Duration::from_millis(ms));
println!("'{name}' ran for {ms}ms");
}
slow เพื่อ simulate operation ที่ช้าแต่ละ future ส่งการควบคุมกลับให้ runtime เพียง หลัง ดำเนินการ operation ช้าเยอะ ถ้าคุณรันโค้ดนี้ คุณจะเห็น output นี้:
'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.
เช่นเดียวกับ Listing 17-5 ที่เราใช้ trpl::select เพื่อแข่ง future
fetch สอง URL select ยังเสร็จทันทีที่ a เสร็จ ไม่มี interleaving
ระหว่างการเรียก slow ในสอง future อย่างไรก็ตาม future a ทำงาน
ทั้งหมดจนกว่าการเรียก trpl::sleep ถูก await จากนั้น future b
ทำงานทั้งหมดจนกว่าการเรียก trpl::sleep ของมันเองถูก await และ
สุดท้าย future a เสร็จ เพื่ออนุญาตให้ทั้งสอง future ก้าวหน้า
ระหว่าง task ช้าของพวกมัน เราต้องการ await point เพื่อเราส่งการ
ควบคุมกลับให้ runtime ได้ นั่นหมายความว่าเราต้องการอะไรที่ await ได้!
เราเห็นการส่งต่อประเภทนี้เกิดใน Listing 17-15 ได้แล้ว — ถ้าเราลบ
trpl::sleep ที่ท้ายสุดของ future a มันจะเสร็จโดยที่ future b
ไม่ เคย รัน ลองใช้ฟังก์ชัน trpl::sleep เป็นจุดเริ่มเพื่อให้
operation สลับการก้าวหน้า ดังที่แสดงใน Listing 17-16
extern crate trpl; // required for mdbook test
use std::{thread, time::Duration};
fn main() {
trpl::block_on(async {
let one_ms = Duration::from_millis(1);
let a = async {
println!("'a' started.");
slow("a", 30);
trpl::sleep(one_ms).await;
slow("a", 10);
trpl::sleep(one_ms).await;
slow("a", 20);
trpl::sleep(one_ms).await;
println!("'a' finished.");
};
let b = async {
println!("'b' started.");
slow("b", 75);
trpl::sleep(one_ms).await;
slow("b", 10);
trpl::sleep(one_ms).await;
slow("b", 15);
trpl::sleep(one_ms).await;
slow("b", 350);
trpl::sleep(one_ms).await;
println!("'b' finished.");
};
trpl::select(a, b).await;
});
}
fn slow(name: &str, ms: u64) {
thread::sleep(Duration::from_millis(ms));
println!("'{name}' ran for {ms}ms");
}
trpl::sleep เพื่อให้ operation สลับการก้าวหน้าเราเพิ่มการเรียก trpl::sleep กับ await point ระหว่างแต่ละการเรียก
slow ตอนนี้งานของ future ทั้งสอง interleave:
'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.
future a ยังรันสักครู่ก่อนส่งการควบคุมให้ b เพราะมันเรียก slow
ก่อนเรียก trpl::sleep แต่หลังจากนั้น future สลับไปมาแต่ละครั้งที่
หนึ่งในพวกมันชน await point ในกรณีนี้ เราทำเช่นนั้นหลังทุกการเรียก
slow แต่เราแบ่งงานในวิธีที่สมเหตุสมผลที่สุดสำหรับเราได้
เราไม่อยาก sleep ที่นี่จริง ๆ — เราต้องการก้าวหน้าเร็วที่สุดเท่า
ที่ทำได้ เราเพียงต้องส่งการควบคุมกลับให้ runtime เราทำสิ่งนั้นได้
โดยตรงโดยใช้ฟังก์ชัน trpl::yield_now ใน Listing 17-17 เราแทนที่
การเรียก trpl::sleep ทั้งหมดเหล่านั้นด้วย trpl::yield_now
extern crate trpl; // required for mdbook test
use std::{thread, time::Duration};
fn main() {
trpl::block_on(async {
let a = async {
println!("'a' started.");
slow("a", 30);
trpl::yield_now().await;
slow("a", 10);
trpl::yield_now().await;
slow("a", 20);
trpl::yield_now().await;
println!("'a' finished.");
};
let b = async {
println!("'b' started.");
slow("b", 75);
trpl::yield_now().await;
slow("b", 10);
trpl::yield_now().await;
slow("b", 15);
trpl::yield_now().await;
slow("b", 350);
trpl::yield_now().await;
println!("'b' finished.");
};
trpl::select(a, b).await;
});
}
fn slow(name: &str, ms: u64) {
thread::sleep(Duration::from_millis(ms));
println!("'{name}' ran for {ms}ms");
}
yield_now เพื่อให้ operation สลับการก้าวหน้าโค้ดนี้ทั้งชัดเจนเกี่ยวกับเจตนาจริงและเร็วกว่าการใช้ sleep อย่าง
มีนัยสำคัญ เพราะ timer เช่นที่ sleep ใช้มักมีขีดจำกัดเกี่ยวกับ
ความละเอียดของพวกมันได้ version ของ sleep ที่เราใช้ ตัวอย่างเช่น
จะ sleep อย่างน้อยหนึ่ง millisecond เสมอ แม้เราจะส่งมัน Duration
ของหนึ่ง nanosecond อีกครั้ง คอมพิวเตอร์สมัยใหม่ เร็ว — พวกมัน
ทำได้มากในหนึ่ง millisecond!
นี่หมายความว่า async มีประโยชน์แม้สำหรับ task compute-bound ขึ้นกับ สิ่งที่โปรแกรมของคุณกำลังทำอย่างอื่น เพราะมันให้เครื่องมือที่มี ประโยชน์ในการจัดโครงสร้างความสัมพันธ์ระหว่างส่วนต่าง ๆ ของโปรแกรม (แต่ที่ค่าของ overhead ของ state machine async) นี่คือรูปแบบของ cooperative multitasking ที่แต่ละ future มีอำนาจในการตัดสินเมื่อ มันส่งการควบคุมผ่าน await point แต่ละ future ดังนั้นยังมีความรับ ผิดชอบที่จะหลีกเลี่ยงการ block นานเกินไป ในบาง OS embedded ที่ใช้ Rust นี่คือ เท่านั้น ประเภทของ multitasking!
ในโค้ดโลกจริง คุณจะไม่สลับการเรียกฟังก์ชันกับ await point บนทุก บรรทัดเดียวโดยปกติ แน่นอน ในขณะที่การ yield การควบคุมในวิธีนี้ ราคาถูกค่อนข้างมาก มันไม่ฟรี ในหลายกรณี การพยายามแบ่ง task compute-bound อาจทำให้มันช้าลงอย่างมีนัยสำคัญ ดังนั้นบางครั้งดีกว่า สำหรับ performance โดยรวม ที่จะให้ operation block สั้น ๆ วัด เสมอเพื่อดูว่า performance bottleneck จริงของโค้ดของคุณคืออะไร dynamic ที่ underlying สำคัญที่จะเก็บในใจ อย่างไรก็ตาม ถ้าคุณ กำลัง เห็นงานเยอะเกิดใน serial ที่คุณคาดว่าจะเกิด concurrent!
สร้าง Async Abstraction ของเราเอง
เรายัง compose future ด้วยกันเพื่อสร้าง pattern ใหม่ได้ ตัวอย่าง
เช่น เรา build ฟังก์ชัน timeout ด้วย building block async ที่เรา
มีแล้วได้ เมื่อเราเสร็จ ผลจะเป็น building block อีกอันที่เราใช้
สร้าง async abstraction เพิ่มได้
Listing 17-18 แสดงว่าเราคาดว่า timeout นี้จะทำงานกับ future ช้า
อย่างไร
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let slow = async {
trpl::sleep(Duration::from_secs(5)).await;
"Finally finished"
};
match timeout(slow, Duration::from_secs(2)).await {
Ok(message) => println!("Succeeded with '{message}'"),
Err(duration) => {
println!("Failed after {} seconds", duration.as_secs())
}
}
});
}
timeout ที่จินตนาการของเราเพื่อรัน operation ช้ากับขีดจำกัดเวลามา implement สิ่งนี้! เพื่อเริ่ม มาคิดเกี่ยวกับ API สำหรับ timeout:
- มันต้องเป็นฟังก์ชัน async เองเพื่อให้เรา await มันได้
- parameter แรกของมันควรเป็น future ที่จะรัน เราทำให้มัน generic เพื่ออนุญาตให้มันทำงานกับ future ใดก็ได้
- parameter ที่สองของมันจะเป็นเวลาสูงสุดที่จะรอ ถ้าเราใช้
Durationนั่นจะทำให้ง่ายในการส่งต่อให้trpl::sleep - มันควร return
Resultถ้า future เสร็จสำเร็จResultจะเป็นOkกับค่าที่ผลิตโดย future ถ้า timeout เลยก่อนResultจะเป็นErrกับ duration ที่ timeout รอ
Listing 17-19 แสดงการประกาศนี้
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let slow = async {
trpl::sleep(Duration::from_secs(5)).await;
"Finally finished"
};
match timeout(slow, Duration::from_secs(2)).await {
Ok(message) => println!("Succeeded with '{message}'"),
Err(duration) => {
println!("Failed after {} seconds", duration.as_secs())
}
}
});
}
async fn timeout<F: Future>(
future_to_try: F,
max_time: Duration,
) -> Result<F::Output, Duration> {
// Here is where our implementation will go!
}
timeoutนั่นพอใจเป้าหมายของเราสำหรับ type ตอนนี้มาคิดเกี่ยวกับ พฤติกรรม
ที่เราต้องการ — เราต้องการแข่ง future ที่ส่งเข้ากับ duration เราใช้
trpl::sleep เพื่อสร้าง future timer จาก duration และใช้
trpl::select เพื่อรัน timer นั้นกับ future ที่ caller ส่งเข้าได้
ใน Listing 17-20 เรา implement timeout โดย match บนผลของการ await
trpl::select
extern crate trpl; // required for mdbook test
use std::time::Duration;
use trpl::Either;
// --snip--
fn main() {
trpl::block_on(async {
let slow = async {
trpl::sleep(Duration::from_secs(5)).await;
"Finally finished"
};
match timeout(slow, Duration::from_secs(2)).await {
Ok(message) => println!("Succeeded with '{message}'"),
Err(duration) => {
println!("Failed after {} seconds", duration.as_secs())
}
}
});
}
async fn timeout<F: Future>(
future_to_try: F,
max_time: Duration,
) -> Result<F::Output, Duration> {
match trpl::select(future_to_try, trpl::sleep(max_time)).await {
Either::Left(output) => Ok(output),
Either::Right(_) => Err(max_time),
}
}
timeout ด้วย select และ sleepimplementation ของ trpl::select ไม่ fair — มัน poll อาร์กิวเมนต์
ในลำดับที่พวกมันส่ง (implementation select อื่นจะสุ่มเลือก
อาร์กิวเมนต์ที่จะ poll ก่อน) ดังนั้นเราส่ง future_to_try ให้
select ก่อนเพื่อให้มันมีโอกาสเสร็จแม้ max_time เป็น duration
สั้นมาก ถ้า future_to_try เสร็จก่อน select จะ return Left
กับ output จาก future_to_try ถ้า timer เสร็จก่อน select จะ
return Right กับ output ของ timer เป็น ()
ถ้า future_to_try สำเร็จและเราได้ Left(output) เรา return
Ok(output) ถ้า sleep timer เลยแทนและเราได้ Right(()) เรา
ignore () ด้วย _ และ return Err(max_time) แทน
ด้วยนั้น เรามี timeout ทำงานที่ build ออกจากสอง async helper
อื่น ถ้าเรารันโค้ดของเรา มันจะ print failure mode หลัง timeout:
Failed after 2 seconds
เพราะ future compose กับ future อื่น คุณสร้างเครื่องมือทรงพลังจริงๆ โดยใช้ building block async เล็กกว่าได้ ตัวอย่างเช่น คุณใช้แนวทาง เดียวกันนี้เพื่อรวม timeout กับ retry และในทางกลับใช้พวกนั้นกับ operation เช่นการเรียก network (เช่นที่ใน Listing 17-5)
ในการปฏิบัติ คุณจะทำงานโดยตรงกับ async และ await โดยปกติ และ
รองด้วยฟังก์ชันเช่น select และมาโครเช่นมาโคร join! เพื่อควบคุม
วิธีที่ future ภายนอกสุดถูก execute
เราเห็นวิธีหลายอย่างในการทำงานกับ future หลายตัวพร้อมกันแล้ว ถัดไป เราจะดูว่าเราทำงานกับ future หลายตัวในลำดับเหนือเวลาด้วย stream ยังไงได้