Deep EVM #29: Semáforos en Rust Asíncrono — Caza de Deadlocks y Patrones Fire-and-Forget
Engineering Team
Por qué semáforos en Rust asíncrono
Cuando ejecutas un pipeline de alto rendimiento — un bot de MEV procesando 180.000 cadenas de arbitraje por bloque, un servidor API manejando 10.000 solicitudes concurrentes, o un job ETL escribiendo millones de filas — inevitablemente llegas a un techo de recursos. Los pools de conexiones a base de datos se agotan. Los proveedores de RPC te limitan. La memoria se dispara porque lanzaste 50.000 tareas tokio, cada una con datos de una cadena.
El enfoque ingenuo es concurrencia ilimitada: tokio::spawn para cada unidad de trabajo y esperar que el runtime lo resuelva. No lo hace. En producción, observamos 15,4 GB de uso de memoria de 2,7 millones de tareas spawneadas, cada una con un Vec<Hop> más contexto de simulación. La solución fue concurrencia por lotes con contrapresión basada en semáforos, que redujo la memoria a 0,8 GB.
Un semáforo es la primitiva correcta cuando necesitas limitar el número de operaciones concurrentes sin serializarlas completamente. A diferencia de un mutex (que permite exactamente una), un semáforo permite N accesos concurrentes. Esto lo hace perfecto para:
- Concurrencia de escritura a BD: limitar al tamaño del pool de conexiones (ej. 20 escrituras concurrentes)
- Limitación de velocidad RPC: respetar los límites de tasa de los proveedores (ej. 100 solicitudes/segundo)
- Control de memoria: prevenir que las tareas spawneadas se acumulen sin límite
- Control de backpressure: propagar presión de retorno desde los recursos downstream
La API de tokio::sync::Semaphore
La API del semáforo de tokio es mínima pero poderosa:
use tokio::sync::Semaphore;
use std::sync::Arc;
// Crear un semáforo con N permisos
let sem = Arc::new(Semaphore::new(20));
// Adquirir un permiso (espera si todos están ocupados)
let permit = sem.acquire().await.unwrap();
// ... hacer trabajo ...
drop(permit); // Liberar el permiso
// Intentar adquirir sin esperar
let permit = sem.try_acquire();
// Adquirir con timeout
let permit = tokio::time::timeout(
Duration::from_secs(5),
sem.acquire()
).await;
Patrón: escritura fire-and-forget con semáforo
Un patrón común en bots de MEV es la escritura fire-and-forget: enviar resultados a la base de datos sin esperar confirmación, pero con contrapresión para no desbordar el pool de conexiones.
struct Writer {
pool: PgPool,
semaphore: Arc<Semaphore>,
}
impl Writer {
fn new(pool: PgPool, max_concurrent: usize) -> Self {
Self {
pool,
semaphore: Arc::new(Semaphore::new(max_concurrent)),
}
}
fn write(&self, data: WritePayload) {
let pool = self.pool.clone();
let sem = self.semaphore.clone();
tokio::spawn(async move {
// Esperar por un permiso (backpressure aquí)
let _permit = match tokio::time::timeout(
Duration::from_secs(10),
sem.acquire()
).await {
Ok(Ok(permit)) => permit,
_ => {
tracing::warn!("Write semaphore timeout, dropping payload");
return;
}
};
// Ejecutar la escritura
if let Err(e) = insert_data(&pool, &data).await {
tracing::error!("Write failed: {}", e);
}
// _permit se libera aquí (RAII)
});
}
}
Este patrón es no-bloqueante para el caller — la escritura se ejecuta en una tarea separada. Pero el semáforo previene que se acumulen más escrituras de las que el pool puede manejar.
Diagnóstico de deadlocks
Los deadlocks con semáforos ocurren cuando:
- Adquisición anidada: una tarea que ya tiene un permiso intenta adquirir otro del mismo semáforo
- Dependencia circular: tarea A espera permiso de semáforo B, tarea B espera permiso de semáforo A
- Permiso retenido a través de await: una tarea adquiere un permiso, hace un await que nunca resuelve, y nunca libera el permiso
Detección con tokio-console
// En main.rs
console_subscriber::init();
// Ejecutar con:
// RUSTFLAGS="--cfg tokio_unstable" cargo run
// tokio-console
tokio-console muestra:
- Tareas bloqueadas esperando semáforos
- Duración del bloqueo
- Stack traces de las tareas bloqueadas
Detección con tracing
use tracing::{instrument, warn};
#[instrument(skip(sem), fields(available = sem.available_permits()))]
async fn acquire_with_diagnostics(
sem: &Semaphore,
label: &str,
) -> Result<SemaphorePermit<'_>, AcquireError> {
let start = Instant::now();
let permit = sem.acquire().await?;
let elapsed = start.elapsed();
if elapsed > Duration::from_secs(1) {
warn!(
label = label,
elapsed_ms = elapsed.as_millis(),
"Slow semaphore acquisition detected"
);
}
Ok(permit)
}
Patrón: semáforo con timeout obligatorio
En producción, nunca adquieras un semáforo sin timeout:
async fn safe_acquire(
sem: &Semaphore,
timeout_secs: u64,
) -> Result<SemaphorePermit<'_>> {
tokio::time::timeout(
Duration::from_secs(timeout_secs),
sem.acquire()
)
.await
.map_err(|_| anyhow!("Semaphore acquire timed out after {}s", timeout_secs))?
.map_err(|e| anyhow!("Semaphore closed: {}", e))
}
Permisos RAII: nunca perder un permiso
El patrón RAII (Resource Acquisition Is Initialization) de Rust garantiza que los permisos se liberen incluso si la tarea paniquea:
async fn process_with_permit(sem: Arc<Semaphore>, data: Data) {
let _permit = sem.acquire().await.unwrap();
// Si process() paniquea, _permit aún se libera
// porque Rust ejecuta drop() al salir del scope
process(data).await;
}
Esto es fundamentalmente más seguro que el patrón acquire/release manual de otros lenguajes, donde un panic entre acquire y release pierde el permiso permanentemente.
Patrones avanzados
Semáforo ponderado
Para operaciones con diferente peso de recurso:
let sem = Semaphore::new(100); // 100 unidades de capacidad
// Operación pequeña: 1 unidad
let _permit = sem.acquire_many(1).await?;
// Operación grande: 10 unidades
let _permit = sem.acquire_many(10).await?;
Rate limiter basado en semáforo
struct RateLimiter {
semaphore: Arc<Semaphore>,
}
impl RateLimiter {
fn new(max_per_second: usize) -> Self {
let sem = Arc::new(Semaphore::new(max_per_second));
let sem_clone = sem.clone();
// Reponer permisos cada segundo
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let deficit = max_per_second - sem_clone.available_permits();
sem_clone.add_permits(deficit);
}
});
Self { semaphore: sem }
}
async fn acquire(&self) -> SemaphorePermit<'_> {
self.semaphore.acquire().await.unwrap()
}
}
Métricas de producción
Monitorea estos indicadores:
- available_permits: si está constantemente en 0, necesitas más capacidad
- acquire_duration_p99: latencia de adquisición en el percentil 99
- timeout_count: número de timeouts de adquisición por minuto
- held_duration_p99: cuánto tiempo se retienen los permisos
use prometheus::{Histogram, IntGauge};
let permits_available = IntGauge::new(
"semaphore_permits_available",
"Number of available semaphore permits"
).unwrap();
let acquire_duration = Histogram::new(
"semaphore_acquire_duration_seconds",
"Time to acquire a semaphore permit"
).unwrap();
Caso de estudio: bot de MEV
Nuestro bot de MEV usa tres semáforos:
- RPC semaphore (50 permisos): limita llamadas concurrentes al nodo Ethereum
- Simulation semaphore (20 permisos): limita simulaciones EVM concurrentes (intensivas en CPU)
- DB write semaphore (10 permisos): limita escrituras concurrentes al pool PostgreSQL
struct MevBot {
rpc_sem: Arc<Semaphore>,
sim_sem: Arc<Semaphore>,
db_sem: Arc<Semaphore>,
}
impl MevBot {
async fn process_opportunity(&self, opp: Opportunity) {
// Fase 1: leer estado on-chain
let _rpc = self.rpc_sem.acquire().await.unwrap();
let state = self.read_state(&opp).await;
drop(_rpc);
// Fase 2: simular
let _sim = self.sim_sem.acquire().await.unwrap();
let result = self.simulate(&opp, state).await;
drop(_sim);
// Fase 3: escribir resultado (fire-and-forget)
if result.is_profitable() {
let db_sem = self.db_sem.clone();
tokio::spawn(async move {
let _db = db_sem.acquire().await.unwrap();
save_result(result).await;
});
}
}
}
Cada fase libera su permiso antes de que la siguiente lo adquiera, maximizando el paralelismo entre fases.
Conclusión
Los semáforos son la primitiva de concurrencia esencial para sistemas de alto rendimiento en Rust asíncrono. Proporcionan control de backpressure sin serialización completa, previenen la exhaustión de recursos, y con el patrón RAII de Rust, garantizan la liberación de permisos incluso ante errores. Siempre usa timeouts de adquisición en producción, monitorea los permisos disponibles, y diagnostica los bloqueos con tokio-console y tracing instrumentado.