cratosw

异步编程

异步编程

.NET 与 Rust 都支持异步编程,使用体验在表面上很相似。
例如 C#:

async Task<string> PrintDelayed(string message, CancellationToken cancellationToken)
{
    await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
    return $"Message: {message}";
}

Rust(示例中 sleep 来自 async-std):

use std::time::Duration;
use async_std::task::sleep;

async fn format_delayed(message: &str) -> String {
    sleep(Duration::from_secs(1)).await;
    format!("Message: {}", message)
}

关键差异:

  1. Rust 的 async 会把代码块编译为实现 Future 的状态机; C# 编译器也会把 async 方法转为状态机。
  2. C# 的返回类型通常写成 Task<T>/ValueTask<T>;Rust 异步函数签名只写“内部值类型” (如 String),具体 Future 类型由编译器生成。
  3. await 位置不同:C# 是前缀 await expr;Rust 是后缀 expr.await

另见:

执行任务

C# 中,即便不 awaitTask 也可能已经开始执行:

var cancellationToken = CancellationToken.None;
PrintDelayed("message", cancellationToken); // 1 秒后打印
await Task.Delay(TimeSpan.FromSeconds(2), cancellationToken);

async Task PrintDelayed(string message, CancellationToken cancellationToken)
{
    await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
    Console.WriteLine(message);
}

Rust 不一样。仅调用异步函数不会自动执行 Future:

use async_std::task::sleep;
use std::time::Duration;

#[tokio::main]
async fn main() {
    print_delayed("message"); // 不会打印
    sleep(Duration::from_secs(2)).await;
}

async fn print_delayed(message: &str) {
    sleep(Duration::from_secs(1)).await;
    println!("{}", message);
}

原因是 Future 默认惰性,只有被 poll(最常见是 .await)才推进执行。
当 Future 阻塞时会让出线程,待可继续时由执行器恢复。

Rust 标准库不自带异步运行时,因此通常依赖 async runtimes(如 Tokio)。 #[tokio::main] 会把异步 main 注册为运行时入口。

任务取消

.NET 常把 CancellationToken 作为异步方法参数,实现可取消操作。

Rust 里取消机制建立在“Future 惰性 + 所有权”之上:
当 Future 被 drop 后,它将不再推进,并释放当前已构造的状态。 因此 Rust 里很多异步函数不需要显式传“取消令牌”。

如果需要更贴近 .NET 体验的可传播取消信号,可使用 tokio_util::sync::CancellationToken

执行多个任务

在 .NET 中,Task.WhenAny / Task.WhenAll 很常见。

Tokio 的 tokio::select! 可近似对应 Task.WhenAny

var cancellationToken = CancellationToken.None;

var result =
    await Task.WhenAny(Delay(TimeSpan.FromSeconds(2), cancellationToken),
                       Delay(TimeSpan.FromSeconds(1), cancellationToken));

Console.WriteLine(result.Result); // Waited 1 second(s).

async Task<string> Delay(TimeSpan delay, CancellationToken cancellationToken)
{
    await Task.Delay(delay, cancellationToken);
    return $"Waited {delay.TotalSeconds} second(s).";
}
use std::time::Duration;
use tokio::{select, time::sleep};

#[tokio::main]
async fn main() {
    let result = select! {
        result = delay(Duration::from_secs(2)) => result,
        result = delay(Duration::from_secs(1)) => result,
    };

    println!("{}", result); // Waited 1 second(s).
}

async fn delay(delay: Duration) -> String {
    sleep(delay).await;
    format!("Waited {} second(s).", delay.as_secs())
}

语义差异:tokio::select! 会取消其余分支;Task.WhenAny 本身不会自动取消其他任务。 Task.WhenAll 则可近似用 tokio::join!

多消费者

.NET 的同一个 Task 可被多个消费者等待。Rust 的 Future 默认不可复制, 且 await 会消耗所有权。
可使用 futures::FutureExt::shared 把 Future 包装为可克隆句柄,再分发给多个消费者:

use futures::FutureExt;
use std::time::Duration;
use tokio::{select, time::sleep, signal};
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let child_token = token.child_token();

    let bg_operation = background_operation(child_token);

    let bg_operation_done = bg_operation.shared();
    let bg_operation_final = bg_operation_done.clone();

    select! {
        _ = bg_operation_done => {},
        _ = signal::ctrl_c() => {
            token.cancel();
        },
    }

    bg_operation_final.await;
}

async fn background_operation(cancellation_token: CancellationToken) {
    select! {
        _ = sleep(Duration::from_secs(2)) => println!("Background operation completed."),
        _ = cancellation_token.cancelled() => println!("Background operation cancelled."),
    }
}

异步迭代

.NET 有 IAsyncEnumerable<T>IAsyncEnumerator<T>。Rust 标准库尚未内建等价 API, 通常借助 Stream trait(来自 futures)实现。

C# 异步迭代:

await foreach (int item in RangeAsync(10, 3).WithCancellation(CancellationToken.None))
    Console.Write(item + " "); // Prints "10 11 12".

async IAsyncEnumerable<int> RangeAsync(int start, int count)
{
    for (int i = 0; i < count; i++)
    {
        await Task.Delay(TimeSpan.FromSeconds(i));
        yield return start + i;
    }
}

Rust 可使用 Stream + 宏(如 async-stream):

use async_stream::stream;
use futures_core::stream::Stream;
use futures_util::{pin_mut, stream::StreamExt};
use std::{
    io::{stdout, Write},
    time::Duration,
};
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let stream = range(10, 3);
    pin_mut!(stream); // 迭代前需要 pin
    while let Some(result) = stream.next().await {
        print!("{} ", result); // Prints "10 11 12".
        stdout().flush().unwrap();
    }
}

fn range(start: i32, count: i32) -> impl Stream<Item = i32> {
    stream! {
        for i in 0..count {
            sleep(Duration::from_secs(i as _)).await;
            yield start + i;
        }
    }
}

On this page