Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Stream — Future ในลำดับ

จำได้ว่าเราใช้ receiver สำหรับ channel async ของเราก่อนหน้านี้ใน บทในส่วน “Message Passing” เมธอด async recv ผลิตลำดับของ item เหนือเวลา นี่คือ instance ของ pattern ทั่วไปมากกว่าที่รู้จักว่า stream แนวคิดหลายอย่างถูกแทน โดยธรรมชาติเป็น stream — item ที่กลายเป็นใช้ได้ใน queue, chunk ของข้อมูลที่ถูกดึงเป็น increment จาก filesystem เมื่อชุดข้อมูลเต็ม ใหญ่เกินไปสำหรับ memory ของคอมพิวเตอร์ หรือข้อมูลที่มาถึงผ่าน network เหนือเวลา เพราะ stream เป็น future เราใช้พวกมันกับประเภท อื่นของ future และรวมพวกมันในวิธีที่น่าสนใจได้ ตัวอย่างเช่น เรา batch event เพื่อหลีกเลี่ยงการ trigger การเรียก network มากเกินไป ตั้ง timeout บนลำดับของ operation ที่รันนาน หรือ throttle event user interface เพื่อหลีกเลี่ยงการทำงานที่ไม่จำเป็นได้

เราเห็นลำดับของ item ในบทที่ 13 เมื่อเราดูที่ trait Iterator ใน ส่วน “Trait Iterator และเมธอด next แต่มีสองความแตกต่างระหว่าง iterator และ receiver channel async ความแตกต่างแรกคือเวลา — iterator เป็น synchronous ในขณะที่ receiver channel เป็น asynchronous ความแตกต่างที่สองคือ API เมื่อทำงานโดยตรง กับ Iterator เราเรียกเมธอด synchronous next ของมัน ด้วย stream trpl::Receiver โดยเฉพาะ เราเรียกเมธอด asynchronous recv แทน มิฉะนั้น API เหล่านี้รู้สึกคล้ายมาก และความคล้ายนั้นไม่ใช่ความ บังเอิญ stream เหมือนรูปแบบ asynchronous ของ iteration ในขณะที่ trpl::Receiver รอเฉพาะเพื่อรับข้อความ อย่างไรก็ตาม API stream จุดประสงค์ทั่วไปกว้างกว่ามาก — มันให้ item ถัดไปในแบบที่ Iterator ทำ แต่แบบ asynchronous

ความคล้ายระหว่าง iterator และ stream ใน Rust หมายความว่าเราสร้าง stream จาก iterator ใดก็ได้จริง ๆ เช่นเดียวกับ iterator เราทำงาน กับ stream โดยเรียกเมธอด next ของมันและจากนั้น await output ดัง ใน Listing 17-21 ซึ่งจะยังไม่คอมไพล์

Filename: src/main.rs
extern crate trpl; // required for mdbook test

fn main() {
    trpl::block_on(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
    });
}
Listing 17-21: สร้าง stream จาก iterator และ print ค่าของมัน

เราเริ่มด้วย array ของตัวเลข ซึ่งเราแปลงเป็น iterator แล้วเรียก map บนเพื่อ double ค่าทั้งหมด จากนั้นเราแปลง iterator เป็น stream โดยใช้ฟังก์ชัน trpl::stream_from_iter ถัดไป เรา loop ผ่าน item ใน stream เมื่อพวกมันมาถึงด้วย loop while let

โชคไม่ดี เมื่อเราพยายามรันโค้ด มันไม่คอมไพล์แต่แทนรายงานว่าไม่มี เมธอด next ใช้ได้:

error[E0599]: no method named `next` found for struct `tokio_stream::iter::Iter` in the current scope
  --> src/main.rs:10:40
   |
10 |         while let Some(value) = stream.next().await {
   |                                        ^^^^
   |
   = help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
   |
1  + use crate::trpl::StreamExt;
   |
1  + use futures_util::stream::stream::StreamExt;
   |
1  + use std::iter::Iterator;
   |
1  + use std::str::pattern::Searcher;
   |
help: there is a method `try_next` with a similar name
   |
10 |         while let Some(value) = stream.try_next().await {
   |                                        ~~~~~~~~

ดังที่ output นี้อธิบาย เหตุผลสำหรับ error compiler คือเราต้องการ trait ที่ถูกต้องใน scope เพื่อใช้เมธอด next ได้ จากการพูดของเรา จนถึงตอนนี้ คุณอาจคาดหวังตามเหตุผลว่า trait นั้นเป็น Stream แต่ มันจริง ๆ คือ StreamExt ย่อสำหรับ extension Ext เป็น pattern ทั่วไปใน community Rust สำหรับขยาย trait หนึ่งด้วยอีก trait

trait Stream นิยาม interface ระดับต่ำที่รวม trait Iterator และ Future อย่างมีประสิทธิภาพ StreamExt ให้ชุด API ระดับสูงกว่าด้าน บน Stream รวมเมธอด next รวมถึงเมธอด utility อื่นคล้ายกับที่ trait Iterator ให้ Stream และ StreamExt ยังไม่เป็นส่วนของ standard library ของ Rust แต่ ecosystem crate ส่วนใหญ่ใช้นิยาม คล้ายกัน

การแก้ error compiler คือเพิ่ม statement use สำหรับ trpl::StreamExt ดังใน Listing 17-22

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use trpl::StreamExt;

fn main() {
    trpl::block_on(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        // --snip--
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
    });
}
Listing 17-22: ใช้ iterator เป็นพื้นฐานสำหรับ stream สำเร็จ

ด้วยชิ้นส่วนเหล่านั้นทั้งหมดใส่รวมกัน โค้ดนี้ทำงานในแบบที่เรา ต้องการ! ที่มากกว่านี้ ตอนนี้เรามี StreamExt ใน scope เราใช้ เมธอด utility ทั้งหมดของมัน เพียงเหมือนกับ iterator