跳到主要内容
工程Mar 28, 2026

Deep EVM #29:异步Rust中的信号量——死锁排查和即发即弃模式

OS
Open Soft Team

Engineering Team

为什么在异步Rust中使用信号量

当你运行高吞吐量管线时——MEV机器人每个区块处理180,000条套利链、API服务器处理10,000个并发请求,或ETL任务写入数百万行——你不可避免地会触及资源上限。数据库连接池耗尽。RPC提供者限制你的速率。因为你生成了50,000个tokio任务导致内存膨胀。

朴素的方法是无界并发:对每个工作单元使用tokio::spawn并希望运行时自行解决。它不会。在生产中,我们观察到270万个已生成任务导致15.4 GB的内存使用。修复方案是基于信号量背压的批量并发,将内存降至0.8 GB。

信号量是在不完全串行化操作的情况下限制并发数量的正确原语。与互斥锁(只允许一个)不同,信号量允许N个并发访问者。这使它非常适合:

  • 数据库写入并发:限制为连接池大小(如20个并发写入)
  • RPC速率限制:封顶出站请求以避免提供者的429响应
  • 内存背压:通过可用许可控制来防止无界任务生成
  • 批处理:控制多少模拟批次并行运行

tokio::sync::Semaphore基础

tokio::sync::Semaphore是为异步代码设计的计数信号量。它维护一个可用许可的内部计数器。任务在继续之前获取许可,完成后释放。

use std::sync::Arc;
use tokio::sync::Semaphore;

// 允许最多20个并发数据库写入
let semaphore = Arc::new(Semaphore::new(20));

for chain in chains_to_persist {
    let sem = semaphore.clone();
    let db_pool = db_pool.clone();

    tokio::spawn(async move {
        // 获取许可——如果所有20个都在使用中则阻塞
        let _permit = sem.acquire().await.unwrap();

        // 执行写入——许可在此作用域内持有
        sqlx::query("UPDATE chains SET profit = $1 WHERE id = $2")
            .bind(chain.profit)
            .bind(chain.id)
            .execute(&db_pool)
            .await
            .ok();

        // _permit在此处被丢弃——自动释放
    });
}

关键API:

  • Semaphore::new(permits) — 创建N个许可
  • acquire(&self) — 异步等待许可可用,返回SemaphorePermit(RAII守卫)
  • try_acquire(&self) — 非异步,无许可时立即返回Err
  • acquire_owned(self: Arc<Self>) — 返回拥有ArcOwnedSemaphorePermit
  • add_permits(&self, n) — 动态增加许可数
  • close(&self) — 关闭信号量

关键设计选择:acquire()返回RAII守卫。当守卫被丢弃时,许可被释放。这意味着即使在panic、提前返回和?操作符中断时也能自动清理。

即发即弃写入模式

在高吞吐量系统中,你经常想在不阻塞热路径的情况下持久化数据。模式:生成一个后台任务,获取信号量许可,执行写入,然后释放。调用者不等待结果。

pub struct AsyncChainStore<S: ChainStore> {
    inner: Arc<S>,
    semaphore: Arc<Semaphore>,
}

impl<S: ChainStore + Send + Sync + 'static> AsyncChainStore<S> {
    pub fn new(inner: Arc<S>, max_concurrent_writes: usize) -> Self {
        Self {
            inner,
            semaphore: Arc::new(Semaphore::new(max_concurrent_writes)),
        }
    }

    /// 即发即弃:不阻塞调用者地持久化链利润。
    pub fn save_profits_async(&self, chains: Vec<ChainProfit>) {
        let inner = self.inner.clone();
        let sem = self.semaphore.clone();

        tokio::spawn(async move {
            let _permit = match sem.acquire().await {
                Ok(p) => p,
                Err(_) => {
                    tracing::warn!("semaphore closed, dropping write");
                    return;
                }
            };

            if let Err(e) = inner.batch_update_profits(&chains).await {
                tracing::error!(
                    error = %e,
                    count = chains.len(),
                    "fire-and-forget write failed"
                );
            }
        });
    }
}

死锁场景

信号量死锁很隐蔽,因为它们不会导致panic或错误——程序只是停止进展。

场景1:提前返回时许可未释放

RAII守卫保护你免受大多数提前返回的影响。危险在于将许可移入一个超出预期生命周期的结构体中。

场景2:嵌套获取(自死锁)

async fn simulate_and_persist(
    sem: &Semaphore,
    chain: &Chain,
) -> Result<()> {
    let _outer = sem.acquire().await?; // 获取N个许可中的1个
    let result = simulate(chain).await?;
    // 错误:在持有许可的情况下获取同一信号量
    let _inner = sem.acquire().await?; // 如果N=1,死锁
    persist(result).await?;
    Ok(())
}

修复:为不同关注点使用独立的信号量。

场景3:在select中跨await点持有许可

select!宏通过丢弃future来取消失败分支,但在该future内部生成的任务继续运行。

诊断信号量死锁

基于tracing的诊断

使用结构化日志检测acquire/release:

let permits_before = semaphore.available_permits();
tracing::debug!(
    available = permits_before,
    "acquiring semaphore permit"
);
let _permit = semaphore.acquire().await?;
tracing::debug!(
    available = semaphore.available_permits(),
    "acquired semaphore permit"
);

Prometheus指标

暴露可用许可的gauge指标。降至零并保持不变就是死锁。

tokio-console

tokio-console是一个诊断工具,可以附加到运行中的tokio应用程序并实时显示任务状态。

生产级解决方案

解决方案1:始终使用OwnedSemaphorePermit

生成任务时,优先使用acquire_owned()而非acquire()

解决方案2:带超时的获取

在生产中永远不要无限期等待许可:

use tokio::time::{timeout, Duration};

let permit = timeout(
    Duration::from_secs(30),
    semaphore.acquire(),
).await
    .map_err(|_| anyhow!("semaphore acquire timed out — possible deadlock"))?
    .map_err(|_| anyhow!("semaphore closed"))?;

解决方案3:使用JoinSet的结构化并发

使用tokio::task::JoinSet代替无界的tokio::spawn

use tokio::task::JoinSet;

let semaphore = Arc::new(Semaphore::new(20));
let mut join_set = JoinSet::new();

for batch in batches {
    let permit = semaphore.clone().acquire_owned().await?;
    let db_pool = db_pool.clone();

    join_set.spawn(async move {
        let result = process_batch(&db_pool, &batch).await;
        drop(permit);
        result
    });
}

while let Some(result) = join_set.join_next().await {
    match result {
        Ok(Ok(())) => {}
        Ok(Err(e)) => tracing::error!(error = %e, "batch failed"),
        Err(e) => tracing::error!(error = %e, "task panicked"),
    }
}

解决方案4:为不同关注点使用独立信号量

pub struct ConcurrencyLimits {
    pub simulation: Arc<Semaphore>,  // 节点并行模拟
    pub db_writes: Arc<Semaphore>,   // 数据库写入
    pub mempool: Arc<Semaphore>,     // 内存池处理
}

impl ConcurrencyLimits {
    pub fn new() -> Self {
        Self {
            simulation: Arc::new(Semaphore::new(4)),
            db_writes: Arc::new(Semaphore::new(20)),
            mempool: Arc::new(Semaphore::new(100)),
        }
    }
}

性能:信号量开销

tokio::sync::Semaphore使用原子计数器和侵入式等待者链表实现。获取无竞争的许可是单个fetch_sub——纳秒级。即使在竞争下,开销与实际I/O相比也可忽略不计。

操作无信号量有信号量(20许可)
10,000次DB写入1,340ms(池耗尽错误)1,580ms(零错误)
500次RPC模拟8,200ms(节点过载)9,100ms(零超时)
内存(270万任务)15.4 GB0.8 GB

10-15%的墙钟开销与消除错误、超时和OOM崩溃相比微不足道。

总结

异步Rust中的信号量看似简单——获取、工作、丢弃许可。复杂性在生产中显现:许可泄漏到长生命周期结构体、跨调用栈的嵌套获取、跨select!取消边界持有的许可。

防御性方案:

  1. 始终生成任务时使用acquire_owned()
  2. 始终用超时包装acquire
  3. 绝不在同一信号量上嵌套获取
  4. 分离不同资源池使用不同信号量
  5. 检测使用指标和追踪监控可用许可
  6. 使用JoinSet代替无界tokio::spawn维护结构化并发