Deep EVM #29 : Sémaphores en Rust async — Chasse aux deadlocks et patterns fire-and-forget
Engineering Team
Quand la concurrence rencontre les limites de ressources
Quand vous exécutez un pipeline à haut débit — un bot MEV traitant 180 000 chaînes d’arbitrage par bloc, un serveur API gérant 10 000 requêtes simultanées, ou un job ETL écrivant des millions de lignes — vous finissez inévitablement par atteindre un plafond de ressources. Les pools de connexions à la base de données s’épuisent. Les fournisseurs RPC vous limitent en débit. La mémoire gonfle parce que vous avez créé 50 000 tâches tokio, chacune détenant les données d’une chaîne entière.
Le sémaphore est la primitive qui résout ce problème : il limite le nombre d’opérations concurrentes à un maximum donné.
tokio::sync::Semaphore : les bases
use tokio::sync::Semaphore;
use std::sync::Arc;
let semaphore = Arc::new(Semaphore::new(10)); // Maximum 10 opérations concurrentes
for item in items {
let permit = semaphore.clone().acquire_owned().await.unwrap();
tokio::spawn(async move {
process(item).await;
drop(permit); // Libérer le permit
});
}
acquire_owned() attend qu’un permit soit disponible, puis le retourne. Le permit est un guard RAII — il est automatiquement libéré quand il est droppé. Cela garantit que le sémaphore ne fuit jamais, même en cas de panique.
Permits RAII : pourquoi c’est important
Le pattern RAII (Resource Acquisition Is Initialization) est crucial pour les sémaphores. Comparez :
// DANGEREUX : le permit peut fuir si process() panique
let permit = semaphore.acquire().await.unwrap();
process(item).await;
permit.forget(); // Oubli intentionnel — rare mais parfois nécessaire
// SÛR : le permit est libéré même en cas de panique
let _permit = semaphore.acquire().await.unwrap();
process(item).await;
// _permit droppé automatiquement ici
Le pattern fire-and-forget
Dans les systèmes de production, vous avez souvent besoin de lancer des opérations d’écriture sans attendre leur achèvement :
struct FireAndForgetWriter {
semaphore: Arc<Semaphore>,
db_pool: PgPool,
}
impl FireAndForgetWriter {
fn new(max_concurrent: usize, db_pool: PgPool) -> Self {
Self {
semaphore: Arc::new(Semaphore::new(max_concurrent)),
db_pool,
}
}
fn write(&self, data: Vec<Record>) {
let semaphore = self.semaphore.clone();
let pool = self.db_pool.clone();
tokio::spawn(async move {
// Acquérir avec timeout pour éviter les deadlocks
let permit = match tokio::time::timeout(
Duration::from_secs(30),
semaphore.acquire_owned()
).await {
Ok(Ok(permit)) => permit,
Ok(Err(_)) => {
tracing::error!("Semaphore fermé");
return;
}
Err(_) => {
tracing::warn!("Timeout d'acquisition du sémaphore");
return;
}
};
if let Err(e) = bulk_insert(&pool, &data).await {
tracing::error!(error = %e, "Échec de l'écriture en lot");
}
drop(permit);
});
}
}
Ce pattern est idéal pour les écritures de métriques, de logs et d’événements où vous ne pouvez pas vous permettre de bloquer le chemin critique.
Chasse aux deadlocks
Les deadlocks avec les sémaphores surviennent quand :
- Acquisition imbriquée — Une tâche détient un permit et essaie d’en acquérir un autre du même sémaphore
- Ordre d’acquisition incohérent — Deux sémaphores acquis dans des ordres différents par différentes tâches
- Fuite de permits — Des permits qui ne sont jamais libérés (panic sans RAII, ou forget())
Diagnostic avec tracing
#[instrument(skip(semaphore))]
async fn process_with_tracing(
semaphore: &Semaphore,
item: &Item,
) -> Result<()> {
tracing::debug!(
available_permits = semaphore.available_permits(),
"En attente de permit"
);
let _permit = semaphore.acquire().await?;
tracing::debug!(
available_permits = semaphore.available_permits(),
"Permit acquis, traitement en cours"
);
process(item).await?;
tracing::debug!("Traitement terminé, permit sera libéré");
Ok(())
}
Diagnostic avec tokio-console
tokio-console est un débogueur TUI pour les applications tokio :
# Installer
cargo install tokio-console
# Activer dans votre application
# Cargo.toml: tokio = { features = ["tracing"] }
console_subscriber::init();
# Lancer tokio-console dans un autre terminal
tokio-console
tokio-console affiche :
- Les tâches en attente et depuis combien de temps
- Les ressources (sémaphores, mutexes) et qui les détient
- Les relations d’attente qui révèlent les deadlocks
Patterns avancés
Sémaphore pondéré
Pour limiter par coût plutôt que par nombre d’opérations :
let semaphore = Arc::new(Semaphore::new(1_000_000)); // 1MB de budget
// Chaque opération acquiert des permits proportionnels à sa taille
let data_size = data.len();
let _permit = semaphore.acquire_many(data_size as u32).await?;
process(data).await;
try_acquire pour le non-bloquant
match semaphore.try_acquire() {
Ok(permit) => {
// Traiter immédiatement
tokio::spawn(async move {
process(item).await;
drop(permit);
});
}
Err(_) => {
// Capacité pleine — mettre en file d'attente ou rejeter
tracing::warn!("Contrepression : tâche rejetée");
}
}
Sémaphore avec timeout
async fn acquire_with_timeout(
semaphore: &Semaphore,
timeout_duration: Duration,
) -> Result<SemaphorePermit<'_>> {
match tokio::time::timeout(timeout_duration, semaphore.acquire()).await {
Ok(Ok(permit)) => Ok(permit),
Ok(Err(_)) => Err(Error::SemaphoreClosed),
Err(_) => Err(Error::AcquireTimeout),
}
}
Concurrence structurée
Combinez les sémaphores avec la concurrence structurée pour un contrôle complet :
use tokio::task::JoinSet;
async fn process_batch(
items: Vec<Item>,
max_concurrent: usize,
) -> Vec<Result<Output>> {
let semaphore = Arc::new(Semaphore::new(max_concurrent));
let mut join_set = JoinSet::new();
for item in items {
let sem = semaphore.clone();
join_set.spawn(async move {
let _permit = sem.acquire().await.unwrap();
process(item).await
});
}
let mut results = Vec::new();
while let Some(result) = join_set.join_next().await {
results.push(result.unwrap());
}
results
}
JoinSet garantit que toutes les tâches sont attendues, et le sémaphore limite la concurrence. Pas de fuite de tâches, pas de fuite de permits.
Règles de production
- Toujours utiliser RAII — Ne jamais appeler
forget()sur un permit sauf si vous savez exactement ce que vous faites - Toujours mettre un timeout —
acquire()sans timeout peut bloquer indéfiniment - Monitorer les permits disponibles — Exposez
available_permits()comme métrique Prometheus - Dimensionner le sémaphore — Basez-le sur la capacité réelle de la ressource (taille du pool de connexions, limites du fournisseur RPC)
- Tester sous charge — Les deadlocks ne se manifestent souvent que sous forte concurrence
Quand utiliser un sémaphore vs d’autres primitives
| Primitive | Cas d’usage |
|---|---|
| Semaphore | Limiter N opérations concurrentes |
| Mutex | Accès exclusif à une ressource partagée |
| RwLock | Lecture concurrente, écriture exclusive |
| Channel (bounded) | File d’attente producteur-consommateur |
| Barrier | Synchronisation de N tâches à un point |
Conclusion
Le sémaphore tokio est la primitive fondamentale pour le contrôle de contrepression en Rust async. Les permits RAII garantissent l’absence de fuite, les timeouts préviennent les deadlocks, et la concurrence structurée avec JoinSet assure que toutes les tâches sont correctement gérées. Pour les systèmes de production à haut débit, le pattern fire-and-forget avec sémaphore est l’équilibre idéal entre performance et fiabilité.