Deep EVM #29: Semaphores in Async Rust — Deadlock Hunting and Fire-and-Forget Patterns
Engineering Team
Why Semaphores in Async Rust
When you run a high-throughput pipeline — an MEV bot processing 180,000 arbitrage chains per block, an API server handling 10,000 concurrent requests, or an ETL job writing millions of rows — you inevitably hit a resource ceiling. Database connection pools exhaust. RPC providers rate-limit you. Memory balloons because you spawned 50,000 tokio tasks, each holding a chain’s worth of data.
The naive approach is unbounded concurrency: tokio::spawn for every unit of work and hope the runtime sorts it out. It does not. In production, we observed 15.4 GB memory usage from 2.7 million spawned tasks, each holding a Vec<Hop> plus simulation context. The fix was batched concurrency with semaphore-based backpressure, which brought memory down to 0.8 GB.
A semaphore is the right primitive when you need to limit the number of concurrent operations without serializing them entirely. Unlike a mutex (which allows exactly one), a semaphore allows N concurrent accessors. This makes it perfect for:
- Database write concurrency: limit to the connection pool size (e.g., 20 concurrent writes)
- RPC rate limiting: cap outgoing requests to avoid 429 responses from providers
- Memory backpressure: prevent unbounded task spawning by gating on available permits
- Batch processing: control how many simulation batches run in parallel against a shared node
tokio::sync::Semaphore Basics
The tokio::sync::Semaphore is a counting semaphore designed for async code. It maintains an internal counter of available permits. Tasks acquire permits before proceeding and release them when done.
use std::sync::Arc;
use tokio::sync::Semaphore;
// Allow up to 20 concurrent database writes
let semaphore = Arc::new(Semaphore::new(20));
for chain in chains_to_persist {
let sem = semaphore.clone();
let db_pool = db_pool.clone();
tokio::spawn(async move {
// Acquire a permit — blocks if all 20 are in use
let _permit = sem.acquire().await.unwrap();
// Do the write — permit is held for this scope
sqlx::query("UPDATE chains SET profit = $1 WHERE id = $2")
.bind(chain.profit)
.bind(chain.id)
.execute(&db_pool)
.await
.ok();
// _permit is dropped here — automatically released
});
}
Key API surface:
Semaphore::new(permits)— create with N permitsacquire(&self)— async, waits until a permit is available, returnsSemaphorePermit(RAII guard)try_acquire(&self)— non-async, returnsErr(TryAcquireError)immediately if no permits availableacquire_owned(self: Arc<Self>)— returnsOwnedSemaphorePermitthat owns theArc, useful when the permit must outlive the borrowadd_permits(&self, n)— dynamically increase the permit countclose(&self)— close the semaphore; all pendingacquirecalls returnErr(AcquireError)available_permits(&self)— inspect current count (useful for metrics)
The critical design choice: acquire() returns an RAII guard. When the guard is dropped, the permit is released. This means you get automatic cleanup even on panics, early returns, and ? operator bailouts — as long as the guard lives on the stack.
Fire-and-Forget Write Pattern
In high-throughput systems, you often want to persist data without blocking the hot path. The pattern: spawn a background task that acquires a semaphore permit, performs the write, and releases. The caller does not await the result.
In a well-architected system, this is implemented as a decorator that wraps the inner storage service:
use std::sync::Arc;
use tokio::sync::Semaphore;
pub struct AsyncChainStore<S: ChainStore> {
inner: Arc<S>,
semaphore: Arc<Semaphore>,
}
impl<S: ChainStore + Send + Sync + 'static> AsyncChainStore<S> {
pub fn new(inner: Arc<S>, max_concurrent_writes: usize) -> Self {
Self {
inner,
semaphore: Arc::new(Semaphore::new(max_concurrent_writes)),
}
}
/// Fire-and-forget: persist chain profits without blocking the caller.
/// Backpressure is enforced by the semaphore — if all permits are
/// taken, the spawned task waits, but the caller returns immediately.
pub fn save_profits_async(&self, chains: Vec<ChainProfit>) {
let inner = self.inner.clone();
let sem = self.semaphore.clone();
tokio::spawn(async move {
// Acquire permit — this is where backpressure happens
let _permit = match sem.acquire().await {
Ok(p) => p,
Err(_) => {
tracing::warn!("semaphore closed, dropping write");
return;
}
};
if let Err(e) = inner.batch_update_profits(&chains).await {
tracing::error!(
error = %e,
count = chains.len(),
"fire-and-forget write failed"
);
}
// _permit dropped here — next queued task proceeds
});
}
}
This decorator pattern (R-004 in our codebase conventions) keeps the inner ChainStore pure — it knows nothing about concurrency control. The decorator handles semaphore management, error logging, and task spawning. The ServiceLocator wires them together:
// In locator/mod.rs
let chain_store = Arc::new(PgChainStore::new(db_pool.clone()));
let async_chain_store = AsyncChainStore::new(chain_store, 20);
Why not just use the database pool’s built-in connection limit? Because the semaphore gives you a separate knob. Your pool might have 50 connections, but you want fire-and-forget writes to use at most 20, reserving 30 for latency-critical reads. The semaphore enforces this partitioning at the application level.
Deadlock Scenarios
Semaphore deadlocks are insidious because they don’t cause a panic or an error — the program simply stops making progress. Here are the patterns we’ve encountered in production.
Scenario 1: Permit Not Released on Early Return
async fn process_batch(sem: &Semaphore, batch: &[Chain]) -> Result<()> {
let permit = sem.acquire().await?;
// Early return if batch is empty — BUT permit is still held!
if batch.is_empty() {
return Ok(()); // permit dropped here — actually fine in this case
}
// The real danger: storing the permit in a struct that outlives the scope
let ctx = ProcessingContext {
permit: Some(permit), // permit moved into struct
batch,
};
// If process() stores ctx somewhere long-lived, permit is leaked
process(ctx).await?;
Ok(())
}
The RAII guard protects you against most early returns. The danger comes when you move the permit into a struct that escapes the expected lifetime — stored in a HashMap, sent through a channel, or held in a static.
Scenario 2: Nested Acquire (Self-Deadlock)
async fn simulate_and_persist(
sem: &Semaphore,
chain: &Chain,
) -> Result<()> {
let _outer = sem.acquire().await?; // Takes 1 of N permits
let result = simulate(chain).await?;
// BUG: acquiring the SAME semaphore inside a held permit
let _inner = sem.acquire().await?; // If N=1, deadlock. If N>1, reduces throughput
persist(result).await?;
Ok(())
}
With N=1, this is an instant deadlock. With N>1, it works until load increases and all permits are held by tasks waiting for their inner acquire. The fix: use separate semaphores for separate concerns, or restructure to avoid nested acquisition.
Scenario 3: Permit Held Across Await Points in Select
async fn process_with_timeout(sem: &Semaphore) -> Result<()> {
let permit = sem.acquire().await?;
tokio::select! {
result = do_work() => {
result?;
}
_ = tokio::time::sleep(Duration::from_secs(30)) => {
tracing::warn!("timeout, but permit is still held until drop");
// If do_work() is cancelled but its future holds resources
// that reference the permit indirectly, you get a leak
}
}
// permit dropped here — this is actually correct
// BUT: if do_work() spawns a sub-task that captures the permit, the
// cancellation of do_work() does NOT cancel the sub-task
Ok(())
}
The select! macro cancels the losing branch by dropping its future, but any tasks spawned inside that future continue running. If those tasks captured a reference to the permit (or a clone of an OwnedSemaphorePermit), the permit is not released when expected.
Diagnosing Semaphore Deadlocks
When your system stops making progress, how do you identify a semaphore deadlock versus a slow dependency?
Tracing-Based Diagnosis
Instrument acquire/release with structured logging:
let permits_before = semaphore.available_permits();
tracing::debug!(
available = permits_before,
"acquiring semaphore permit"
);
let _permit = semaphore.acquire().await?;
tracing::debug!(
available = semaphore.available_permits(),
"acquired semaphore permit"
);
If your logs show “acquiring” but never “acquired”, all permits are held somewhere. Correlate with the last N “acquired” log entries to find who is holding them.
Prometheus Metrics
Expose a gauge metric for available permits:
use metrics::gauge;
// In your decorator or middleware
gauge!("semaphore_available_permits", semaphore.available_permits() as f64);
A gauge that drops to zero and stays there is a deadlock. A gauge that hovers near zero but fluctuates is contention (high load, possibly needs more permits).
tokio-console
tokio-console is a diagnostic tool that attaches to a running tokio application and shows task states in real time:
cargo add --dev console-subscriber
// In main.rs (debug builds only)
#[cfg(debug_assertions)]
console_subscriber::init();
Run tokio-console in another terminal and look for tasks stuck in “Idle” state on a semaphore acquire. The tool shows you exactly which task is waiting and for how long.
Production-Hardened Solutions
Solution 1: Always Use OwnedSemaphorePermit with Arc
When spawning tasks, prefer acquire_owned() over acquire(). The owned variant takes Arc<Semaphore> and returns a permit that does not borrow the semaphore — it owns a clone of the Arc. This avoids lifetime issues entirely:
let semaphore = Arc::new(Semaphore::new(20));
for item in items {
let permit = semaphore.clone().acquire_owned().await?;
tokio::spawn(async move {
// permit is moved into the task — no borrow issues
do_work(item).await;
drop(permit); // explicit drop for clarity, or let scope handle it
});
}
Solution 2: Acquire with Timeout
Never wait indefinitely for a permit in production. Use tokio::time::timeout to detect deadlocks early:
use tokio::time::{timeout, Duration};
let permit = timeout(
Duration::from_secs(30),
semaphore.acquire(),
).await
.map_err(|_| anyhow!("semaphore acquire timed out — possible deadlock"))?
.map_err(|_| anyhow!("semaphore closed"))?;
When the timeout fires, log the available permits, the number of waiting tasks, and a stack trace. This gives you immediate visibility into the deadlock without bringing down the process.
Solution 3: Structured Concurrency with JoinSet
Instead of unbounded tokio::spawn, use tokio::task::JoinSet to maintain ownership of spawned tasks and pair it with a semaphore:
use tokio::task::JoinSet;
let semaphore = Arc::new(Semaphore::new(20));
let mut join_set = JoinSet::new();
for batch in batches {
let permit = semaphore.clone().acquire_owned().await?;
let db_pool = db_pool.clone();
join_set.spawn(async move {
let result = process_batch(&db_pool, &batch).await;
drop(permit); // release before the JoinSet collects
result
});
}
// Drain all tasks — collect errors instead of silently losing them
while let Some(result) = join_set.join_next().await {
match result {
Ok(Ok(())) => {}
Ok(Err(e)) => tracing::error!(error = %e, "batch processing failed"),
Err(e) => tracing::error!(error = %e, "task panicked"),
}
}
This pattern ensures: (1) bounded concurrency via semaphore, (2) no silent task failures, (3) the parent task knows when all children complete, (4) permits are always released even if a task panics (because JoinSet::join_next returns JoinError for panicked tasks, and the permit is dropped with the task’s state).
Solution 4: Separate Semaphores for Separate Concerns
Never share a single semaphore across unrelated operations. In our MEV pipeline, we use separate semaphores for:
pub struct ConcurrencyLimits {
/// Limits concurrent RPC simulation calls to the node
pub simulation: Arc<Semaphore>,
/// Limits concurrent database writes for chain persistence
pub db_writes: Arc<Semaphore>,
/// Limits concurrent mempool subscription handlers
pub mempool: Arc<Semaphore>,
}
impl ConcurrencyLimits {
pub fn new() -> Self {
Self {
simulation: Arc::new(Semaphore::new(4)), // node can handle 4 parallel batches
db_writes: Arc::new(Semaphore::new(20)), // 20 of 50 pool connections
mempool: Arc::new(Semaphore::new(100)), // 100 concurrent tx handlers
}
}
}
This eliminates the nested acquire deadlock entirely — a task acquires simulation permit, then db_writes permit, and these are independent pools.
Performance: Semaphore Overhead
Is the semaphore itself a bottleneck? In practice, no. tokio::sync::Semaphore is implemented with an atomic counter and an intrusive linked list of waiters. Acquiring an uncontested permit is a single fetch_sub — nanoseconds. Even under contention, the overhead is the waker notification (also nanoseconds) versus the milliseconds your actual I/O takes.
We benchmarked semaphore overhead in our pipeline:
| Operation | Without Semaphore | With Semaphore (20 permits) |
|---|---|---|
| 10,000 DB writes | 1,340ms (all concurrent, pool exhaustion errors) | 1,580ms (controlled, zero errors) |
| 500 RPC simulations | 8,200ms (node overloaded, timeouts) | 9,100ms (4 at a time, zero timeouts) |
| Memory (2.7M tasks) | 15.4 GB | 0.8 GB (batched with JoinSet) |
The 10-15% wall-clock overhead is negligible compared to eliminating errors, timeouts, and OOM crashes.
Conclusion
Semaphores in async Rust are deceptively simple — acquire, do work, drop the permit. The complexity emerges in production: permits leaked into long-lived structs, nested acquires across call stacks, permits held across select! cancellation boundaries.
The defensive playbook:
- Always use
acquire_owned()when spawning tasks - Always wrap acquire in a timeout
- Never nest acquires on the same semaphore
- Separate semaphores for separate resource pools
- Instrument available permits with metrics and tracing
- Use JoinSet instead of unbounded
tokio::spawnto maintain structured concurrency
These patterns have held up across millions of blocks processed, hundreds of thousands of concurrent tasks, and zero deadlocks in production since adopting them.