Stream — Future ในลำดับ
จำได้ว่าเราใช้ receiver สำหรับ channel async ของเราก่อนหน้านี้ใน
บทในส่วน “Message Passing” เมธอด
async recv ผลิตลำดับของ item เหนือเวลา นี่คือ instance ของ
pattern ทั่วไปมากกว่าที่รู้จักว่า stream แนวคิดหลายอย่างถูกแทน
โดยธรรมชาติเป็น stream — item ที่กลายเป็นใช้ได้ใน queue, chunk
ของข้อมูลที่ถูกดึงเป็น increment จาก filesystem เมื่อชุดข้อมูลเต็ม
ใหญ่เกินไปสำหรับ memory ของคอมพิวเตอร์ หรือข้อมูลที่มาถึงผ่าน
network เหนือเวลา เพราะ stream เป็น future เราใช้พวกมันกับประเภท
อื่นของ future และรวมพวกมันในวิธีที่น่าสนใจได้ ตัวอย่างเช่น เรา
batch event เพื่อหลีกเลี่ยงการ trigger การเรียก network มากเกินไป
ตั้ง timeout บนลำดับของ operation ที่รันนาน หรือ throttle event
user interface เพื่อหลีกเลี่ยงการทำงานที่ไม่จำเป็นได้
เราเห็นลำดับของ item ในบทที่ 13 เมื่อเราดูที่ trait Iterator ใน
ส่วน “Trait Iterator และเมธอด next”
แต่มีสองความแตกต่างระหว่าง iterator และ receiver channel async
ความแตกต่างแรกคือเวลา — iterator เป็น synchronous ในขณะที่ receiver
channel เป็น asynchronous ความแตกต่างที่สองคือ API เมื่อทำงานโดยตรง
กับ Iterator เราเรียกเมธอด synchronous next ของมัน ด้วย stream
trpl::Receiver โดยเฉพาะ เราเรียกเมธอด asynchronous recv แทน
มิฉะนั้น API เหล่านี้รู้สึกคล้ายมาก และความคล้ายนั้นไม่ใช่ความ
บังเอิญ stream เหมือนรูปแบบ asynchronous ของ iteration ในขณะที่
trpl::Receiver รอเฉพาะเพื่อรับข้อความ อย่างไรก็ตาม API stream
จุดประสงค์ทั่วไปกว้างกว่ามาก — มันให้ item ถัดไปในแบบที่ Iterator
ทำ แต่แบบ asynchronous
ความคล้ายระหว่าง iterator และ stream ใน Rust หมายความว่าเราสร้าง
stream จาก iterator ใดก็ได้จริง ๆ เช่นเดียวกับ iterator เราทำงาน
กับ stream โดยเรียกเมธอด next ของมันและจากนั้น await output ดัง
ใน Listing 17-21 ซึ่งจะยังไม่คอมไพล์
extern crate trpl; // required for mdbook test
fn main() {
trpl::block_on(async {
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(value) = stream.next().await {
println!("The value was: {value}");
}
});
}
เราเริ่มด้วย array ของตัวเลข ซึ่งเราแปลงเป็น iterator แล้วเรียก
map บนเพื่อ double ค่าทั้งหมด จากนั้นเราแปลง iterator เป็น stream
โดยใช้ฟังก์ชัน trpl::stream_from_iter ถัดไป เรา loop ผ่าน item
ใน stream เมื่อพวกมันมาถึงด้วย loop while let
โชคไม่ดี เมื่อเราพยายามรันโค้ด มันไม่คอมไพล์แต่แทนรายงานว่าไม่มี
เมธอด next ใช้ได้:
error[E0599]: no method named `next` found for struct `tokio_stream::iter::Iter` in the current scope
--> src/main.rs:10:40
|
10 | while let Some(value) = stream.next().await {
| ^^^^
|
= help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
|
1 + use crate::trpl::StreamExt;
|
1 + use futures_util::stream::stream::StreamExt;
|
1 + use std::iter::Iterator;
|
1 + use std::str::pattern::Searcher;
|
help: there is a method `try_next` with a similar name
|
10 | while let Some(value) = stream.try_next().await {
| ~~~~~~~~
ดังที่ output นี้อธิบาย เหตุผลสำหรับ error compiler คือเราต้องการ
trait ที่ถูกต้องใน scope เพื่อใช้เมธอด next ได้ จากการพูดของเรา
จนถึงตอนนี้ คุณอาจคาดหวังตามเหตุผลว่า trait นั้นเป็น Stream แต่
มันจริง ๆ คือ StreamExt ย่อสำหรับ extension Ext เป็น pattern
ทั่วไปใน community Rust สำหรับขยาย trait หนึ่งด้วยอีก trait
trait Stream นิยาม interface ระดับต่ำที่รวม trait Iterator และ
Future อย่างมีประสิทธิภาพ StreamExt ให้ชุด API ระดับสูงกว่าด้าน
บน Stream รวมเมธอด next รวมถึงเมธอด utility อื่นคล้ายกับที่
trait Iterator ให้ Stream และ StreamExt ยังไม่เป็นส่วนของ
standard library ของ Rust แต่ ecosystem crate ส่วนใหญ่ใช้นิยาม
คล้ายกัน
การแก้ error compiler คือเพิ่ม statement use สำหรับ
trpl::StreamExt ดังใน Listing 17-22
extern crate trpl; // required for mdbook test
use trpl::StreamExt;
fn main() {
trpl::block_on(async {
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
// --snip--
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(value) = stream.next().await {
println!("The value was: {value}");
}
});
}
ด้วยชิ้นส่วนเหล่านั้นทั้งหมดใส่รวมกัน โค้ดนี้ทำงานในแบบที่เรา
ต้องการ! ที่มากกว่านี้ ตอนนี้เรามี StreamExt ใน scope เราใช้
เมธอด utility ทั้งหมดของมัน เพียงเหมือนกับ iterator