Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Yield การควบคุมให้ Runtime

จำได้จากส่วน “โปรแกรม Async แรกของเรา” ว่าที่แต่ละ await point, Rust ให้ runtime โอกาสที่จะ pause task และ สลับไปยังอีกอันถ้า future ที่ await ไม่พร้อม สิ่งตรงข้ามก็จริง — Rust เพียง pause block async และส่งการควบคุมกลับให้ runtime ที่ await point ทุกอย่างระหว่าง await point เป็น synchronous

นั่นหมายความว่าถ้าคุณทำงานเยอะใน block async โดยไม่มี await point future นั้นจะ block future อื่นจากการก้าวหน้า บางครั้งคุณอาจได้ยิน นี่ถูกอ้างถึงว่า future หนึ่ง starve future อื่น ในบางกรณี นั่น อาจไม่เป็นเรื่องใหญ่ อย่างไรก็ตาม ถ้าคุณกำลังทำ setup ราคาสูงหรืองาน ที่รันนาน หรือถ้าคุณมี future ที่จะทำงานเฉพาะตลอดไป คุณจะต้องคิด ว่าเมื่อและที่ไหนที่จะส่งการควบคุมกลับให้ runtime

มา simulate operation ที่รันนานเพื่อแสดงปัญหา starvation จากนั้น สำรวจวิธีแก้มัน Listing 17-14 แนะนำฟังก์ชัน slow

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::block_on(async {
        // We will call `slow` here later
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
Listing 17-14: ใช้ thread::sleep เพื่อ simulate operation ที่ช้า

โค้ดนี้ใช้ std::thread::sleep แทน trpl::sleep เพื่อให้การเรียก slow จะ block เธรดปัจจุบันสำหรับจำนวน millisecond บางอย่าง เราใช้ slow แทน operation ในโลกจริงที่ทั้งรันนานและ blocking

ใน Listing 17-15 เราใช้ slow เพื่อจำลองการทำงาน CPU-bound ประเภท นี้ในคู่ของ future

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::block_on(async {
        let a = async {
            println!("'a' started.");
            slow("a", 30);
            slow("a", 10);
            slow("a", 20);
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            slow("b", 10);
            slow("b", 15);
            slow("b", 350);
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'b' finished.");
        };

        trpl::select(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
Listing 17-15: เรียกฟังก์ชัน slow เพื่อ simulate operation ที่ช้า

แต่ละ future ส่งการควบคุมกลับให้ runtime เพียง หลัง ดำเนินการ operation ช้าเยอะ ถ้าคุณรันโค้ดนี้ คุณจะเห็น output นี้:

'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.

เช่นเดียวกับ Listing 17-5 ที่เราใช้ trpl::select เพื่อแข่ง future fetch สอง URL select ยังเสร็จทันทีที่ a เสร็จ ไม่มี interleaving ระหว่างการเรียก slow ในสอง future อย่างไรก็ตาม future a ทำงาน ทั้งหมดจนกว่าการเรียก trpl::sleep ถูก await จากนั้น future b ทำงานทั้งหมดจนกว่าการเรียก trpl::sleep ของมันเองถูก await และ สุดท้าย future a เสร็จ เพื่ออนุญาตให้ทั้งสอง future ก้าวหน้า ระหว่าง task ช้าของพวกมัน เราต้องการ await point เพื่อเราส่งการ ควบคุมกลับให้ runtime ได้ นั่นหมายความว่าเราต้องการอะไรที่ await ได้!

เราเห็นการส่งต่อประเภทนี้เกิดใน Listing 17-15 ได้แล้ว — ถ้าเราลบ trpl::sleep ที่ท้ายสุดของ future a มันจะเสร็จโดยที่ future b ไม่ เคย รัน ลองใช้ฟังก์ชัน trpl::sleep เป็นจุดเริ่มเพื่อให้ operation สลับการก้าวหน้า ดังที่แสดงใน Listing 17-16

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::block_on(async {
        let one_ms = Duration::from_millis(1);

        let a = async {
            println!("'a' started.");
            slow("a", 30);
            trpl::sleep(one_ms).await;
            slow("a", 10);
            trpl::sleep(one_ms).await;
            slow("a", 20);
            trpl::sleep(one_ms).await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            trpl::sleep(one_ms).await;
            slow("b", 10);
            trpl::sleep(one_ms).await;
            slow("b", 15);
            trpl::sleep(one_ms).await;
            slow("b", 350);
            trpl::sleep(one_ms).await;
            println!("'b' finished.");
        };

        trpl::select(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
Listing 17-16: ใช้ trpl::sleep เพื่อให้ operation สลับการก้าวหน้า

เราเพิ่มการเรียก trpl::sleep กับ await point ระหว่างแต่ละการเรียก slow ตอนนี้งานของ future ทั้งสอง interleave:

'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.

future a ยังรันสักครู่ก่อนส่งการควบคุมให้ b เพราะมันเรียก slow ก่อนเรียก trpl::sleep แต่หลังจากนั้น future สลับไปมาแต่ละครั้งที่ หนึ่งในพวกมันชน await point ในกรณีนี้ เราทำเช่นนั้นหลังทุกการเรียก slow แต่เราแบ่งงานในวิธีที่สมเหตุสมผลที่สุดสำหรับเราได้

เราไม่อยาก sleep ที่นี่จริง ๆ — เราต้องการก้าวหน้าเร็วที่สุดเท่า ที่ทำได้ เราเพียงต้องส่งการควบคุมกลับให้ runtime เราทำสิ่งนั้นได้ โดยตรงโดยใช้ฟังก์ชัน trpl::yield_now ใน Listing 17-17 เราแทนที่ การเรียก trpl::sleep ทั้งหมดเหล่านั้นด้วย trpl::yield_now

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::block_on(async {
        let a = async {
            println!("'a' started.");
            slow("a", 30);
            trpl::yield_now().await;
            slow("a", 10);
            trpl::yield_now().await;
            slow("a", 20);
            trpl::yield_now().await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            trpl::yield_now().await;
            slow("b", 10);
            trpl::yield_now().await;
            slow("b", 15);
            trpl::yield_now().await;
            slow("b", 350);
            trpl::yield_now().await;
            println!("'b' finished.");
        };

        trpl::select(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
Listing 17-17: ใช้ yield_now เพื่อให้ operation สลับการก้าวหน้า

โค้ดนี้ทั้งชัดเจนเกี่ยวกับเจตนาจริงและเร็วกว่าการใช้ sleep อย่าง มีนัยสำคัญ เพราะ timer เช่นที่ sleep ใช้มักมีขีดจำกัดเกี่ยวกับ ความละเอียดของพวกมันได้ version ของ sleep ที่เราใช้ ตัวอย่างเช่น จะ sleep อย่างน้อยหนึ่ง millisecond เสมอ แม้เราจะส่งมัน Duration ของหนึ่ง nanosecond อีกครั้ง คอมพิวเตอร์สมัยใหม่ เร็ว — พวกมัน ทำได้มากในหนึ่ง millisecond!

นี่หมายความว่า async มีประโยชน์แม้สำหรับ task compute-bound ขึ้นกับ สิ่งที่โปรแกรมของคุณกำลังทำอย่างอื่น เพราะมันให้เครื่องมือที่มี ประโยชน์ในการจัดโครงสร้างความสัมพันธ์ระหว่างส่วนต่าง ๆ ของโปรแกรม (แต่ที่ค่าของ overhead ของ state machine async) นี่คือรูปแบบของ cooperative multitasking ที่แต่ละ future มีอำนาจในการตัดสินเมื่อ มันส่งการควบคุมผ่าน await point แต่ละ future ดังนั้นยังมีความรับ ผิดชอบที่จะหลีกเลี่ยงการ block นานเกินไป ในบาง OS embedded ที่ใช้ Rust นี่คือ เท่านั้น ประเภทของ multitasking!

ในโค้ดโลกจริง คุณจะไม่สลับการเรียกฟังก์ชันกับ await point บนทุก บรรทัดเดียวโดยปกติ แน่นอน ในขณะที่การ yield การควบคุมในวิธีนี้ ราคาถูกค่อนข้างมาก มันไม่ฟรี ในหลายกรณี การพยายามแบ่ง task compute-bound อาจทำให้มันช้าลงอย่างมีนัยสำคัญ ดังนั้นบางครั้งดีกว่า สำหรับ performance โดยรวม ที่จะให้ operation block สั้น ๆ วัด เสมอเพื่อดูว่า performance bottleneck จริงของโค้ดของคุณคืออะไร dynamic ที่ underlying สำคัญที่จะเก็บในใจ อย่างไรก็ตาม ถ้าคุณ กำลัง เห็นงานเยอะเกิดใน serial ที่คุณคาดว่าจะเกิด concurrent!

สร้าง Async Abstraction ของเราเอง

เรายัง compose future ด้วยกันเพื่อสร้าง pattern ใหม่ได้ ตัวอย่าง เช่น เรา build ฟังก์ชัน timeout ด้วย building block async ที่เรา มีแล้วได้ เมื่อเราเสร็จ ผลจะเป็น building block อีกอันที่เราใช้ สร้าง async abstraction เพิ่มได้

Listing 17-18 แสดงว่าเราคาดว่า timeout นี้จะทำงานกับ future ช้า อย่างไร

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_secs(2)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}
Listing 17-18: ใช้ timeout ที่จินตนาการของเราเพื่อรัน operation ช้ากับขีดจำกัดเวลา

มา implement สิ่งนี้! เพื่อเริ่ม มาคิดเกี่ยวกับ API สำหรับ timeout:

  • มันต้องเป็นฟังก์ชัน async เองเพื่อให้เรา await มันได้
  • parameter แรกของมันควรเป็น future ที่จะรัน เราทำให้มัน generic เพื่ออนุญาตให้มันทำงานกับ future ใดก็ได้
  • parameter ที่สองของมันจะเป็นเวลาสูงสุดที่จะรอ ถ้าเราใช้ Duration นั่นจะทำให้ง่ายในการส่งต่อให้ trpl::sleep
  • มันควร return Result ถ้า future เสร็จสำเร็จ Result จะเป็น Ok กับค่าที่ผลิตโดย future ถ้า timeout เลยก่อน Result จะเป็น Err กับ duration ที่ timeout รอ

Listing 17-19 แสดงการประกาศนี้

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_secs(2)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    // Here is where our implementation will go!
}
Listing 17-19: นิยาม signature ของ timeout

นั่นพอใจเป้าหมายของเราสำหรับ type ตอนนี้มาคิดเกี่ยวกับ พฤติกรรม ที่เราต้องการ — เราต้องการแข่ง future ที่ส่งเข้ากับ duration เราใช้ trpl::sleep เพื่อสร้าง future timer จาก duration และใช้ trpl::select เพื่อรัน timer นั้นกับ future ที่ caller ส่งเข้าได้

ใน Listing 17-20 เรา implement timeout โดย match บนผลของการ await trpl::select

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

use trpl::Either;

// --snip--

fn main() {
    trpl::block_on(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_secs(2)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    match trpl::select(future_to_try, trpl::sleep(max_time)).await {
        Either::Left(output) => Ok(output),
        Either::Right(_) => Err(max_time),
    }
}
Listing 17-20: นิยาม timeout ด้วย select และ sleep

implementation ของ trpl::select ไม่ fair — มัน poll อาร์กิวเมนต์ ในลำดับที่พวกมันส่ง (implementation select อื่นจะสุ่มเลือก อาร์กิวเมนต์ที่จะ poll ก่อน) ดังนั้นเราส่ง future_to_try ให้ select ก่อนเพื่อให้มันมีโอกาสเสร็จแม้ max_time เป็น duration สั้นมาก ถ้า future_to_try เสร็จก่อน select จะ return Left กับ output จาก future_to_try ถ้า timer เสร็จก่อน select จะ return Right กับ output ของ timer เป็น ()

ถ้า future_to_try สำเร็จและเราได้ Left(output) เรา return Ok(output) ถ้า sleep timer เลยแทนและเราได้ Right(()) เรา ignore () ด้วย _ และ return Err(max_time) แทน

ด้วยนั้น เรามี timeout ทำงานที่ build ออกจากสอง async helper อื่น ถ้าเรารันโค้ดของเรา มันจะ print failure mode หลัง timeout:

Failed after 2 seconds

เพราะ future compose กับ future อื่น คุณสร้างเครื่องมือทรงพลังจริงๆ โดยใช้ building block async เล็กกว่าได้ ตัวอย่างเช่น คุณใช้แนวทาง เดียวกันนี้เพื่อรวม timeout กับ retry และในทางกลับใช้พวกนั้นกับ operation เช่นการเรียก network (เช่นที่ใน Listing 17-5)

ในการปฏิบัติ คุณจะทำงานโดยตรงกับ async และ await โดยปกติ และ รองด้วยฟังก์ชันเช่น select และมาโครเช่นมาโคร join! เพื่อควบคุม วิธีที่ future ภายนอกสุดถูก execute

เราเห็นวิธีหลายอย่างในการทำงานกับ future หลายตัวพร้อมกันแล้ว ถัดไป เราจะดูว่าเราทำงานกับ future หลายตัวในลำดับเหนือเวลาด้วย stream ยังไงได้