异步编程
异步编程
.NET 与 Rust 都支持异步编程,使用体验在表面上很相似。
例如 C#:
async Task<string> PrintDelayed(string message, CancellationToken cancellationToken)
{
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
return $"Message: {message}";
}Rust(示例中 sleep 来自 async-std):
use std::time::Duration;
use async_std::task::sleep;
async fn format_delayed(message: &str) -> String {
sleep(Duration::from_secs(1)).await;
format!("Message: {}", message)
}关键差异:
- Rust 的
async会把代码块编译为实现Future的状态机; C# 编译器也会把async方法转为状态机。 - C# 的返回类型通常写成
Task<T>/ValueTask<T>;Rust 异步函数签名只写“内部值类型” (如String),具体 Future 类型由编译器生成。 await位置不同:C# 是前缀await expr;Rust 是后缀expr.await。
另见:
执行任务
C# 中,即便不 await,Task 也可能已经开始执行:
var cancellationToken = CancellationToken.None;
PrintDelayed("message", cancellationToken); // 1 秒后打印
await Task.Delay(TimeSpan.FromSeconds(2), cancellationToken);
async Task PrintDelayed(string message, CancellationToken cancellationToken)
{
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
Console.WriteLine(message);
}Rust 不一样。仅调用异步函数不会自动执行 Future:
use async_std::task::sleep;
use std::time::Duration;
#[tokio::main]
async fn main() {
print_delayed("message"); // 不会打印
sleep(Duration::from_secs(2)).await;
}
async fn print_delayed(message: &str) {
sleep(Duration::from_secs(1)).await;
println!("{}", message);
}原因是 Future 默认惰性,只有被 poll(最常见是 .await)才推进执行。
当 Future 阻塞时会让出线程,待可继续时由执行器恢复。
Rust 标准库不自带异步运行时,因此通常依赖 async runtimes(如 Tokio)。
#[tokio::main] 会把异步 main 注册为运行时入口。
任务取消
.NET 常把 CancellationToken 作为异步方法参数,实现可取消操作。
Rust 里取消机制建立在“Future 惰性 + 所有权”之上:
当 Future 被 drop 后,它将不再推进,并释放当前已构造的状态。
因此 Rust 里很多异步函数不需要显式传“取消令牌”。
如果需要更贴近 .NET 体验的可传播取消信号,可使用
tokio_util::sync::CancellationToken。
执行多个任务
在 .NET 中,Task.WhenAny / Task.WhenAll 很常见。
Tokio 的 tokio::select! 可近似对应 Task.WhenAny:
var cancellationToken = CancellationToken.None;
var result =
await Task.WhenAny(Delay(TimeSpan.FromSeconds(2), cancellationToken),
Delay(TimeSpan.FromSeconds(1), cancellationToken));
Console.WriteLine(result.Result); // Waited 1 second(s).
async Task<string> Delay(TimeSpan delay, CancellationToken cancellationToken)
{
await Task.Delay(delay, cancellationToken);
return $"Waited {delay.TotalSeconds} second(s).";
}use std::time::Duration;
use tokio::{select, time::sleep};
#[tokio::main]
async fn main() {
let result = select! {
result = delay(Duration::from_secs(2)) => result,
result = delay(Duration::from_secs(1)) => result,
};
println!("{}", result); // Waited 1 second(s).
}
async fn delay(delay: Duration) -> String {
sleep(delay).await;
format!("Waited {} second(s).", delay.as_secs())
}语义差异:tokio::select! 会取消其余分支;Task.WhenAny 本身不会自动取消其他任务。
Task.WhenAll 则可近似用 tokio::join!。
多消费者
.NET 的同一个 Task 可被多个消费者等待。Rust 的 Future 默认不可复制,
且 await 会消耗所有权。
可使用 futures::FutureExt::shared 把 Future 包装为可克隆句柄,再分发给多个消费者:
use futures::FutureExt;
use std::time::Duration;
use tokio::{select, time::sleep, signal};
use tokio_util::sync::CancellationToken;
#[tokio::main]
async fn main() {
let token = CancellationToken::new();
let child_token = token.child_token();
let bg_operation = background_operation(child_token);
let bg_operation_done = bg_operation.shared();
let bg_operation_final = bg_operation_done.clone();
select! {
_ = bg_operation_done => {},
_ = signal::ctrl_c() => {
token.cancel();
},
}
bg_operation_final.await;
}
async fn background_operation(cancellation_token: CancellationToken) {
select! {
_ = sleep(Duration::from_secs(2)) => println!("Background operation completed."),
_ = cancellation_token.cancelled() => println!("Background operation cancelled."),
}
}异步迭代
.NET 有 IAsyncEnumerable<T> 和
IAsyncEnumerator<T>。Rust 标准库尚未内建等价 API,
通常借助 Stream trait(来自 futures)实现。
C# 异步迭代:
await foreach (int item in RangeAsync(10, 3).WithCancellation(CancellationToken.None))
Console.Write(item + " "); // Prints "10 11 12".
async IAsyncEnumerable<int> RangeAsync(int start, int count)
{
for (int i = 0; i < count; i++)
{
await Task.Delay(TimeSpan.FromSeconds(i));
yield return start + i;
}
}Rust 可使用 Stream + 宏(如 async-stream):
use async_stream::stream;
use futures_core::stream::Stream;
use futures_util::{pin_mut, stream::StreamExt};
use std::{
io::{stdout, Write},
time::Duration,
};
use tokio::time::sleep;
#[tokio::main]
async fn main() {
let stream = range(10, 3);
pin_mut!(stream); // 迭代前需要 pin
while let Some(result) = stream.next().await {
print!("{} ", result); // Prints "10 11 12".
stdout().flush().unwrap();
}
}
fn range(start: i32, count: i32) -> impl Stream<Item = i32> {
stream! {
for i in 0..count {
sleep(Duration::from_secs(i as _)).await;
yield start + i;
}
}
}