Aller au contenu principal
IngénierieMar 28, 2026

Deep EVM #29 : Sémaphores en Rust async — Chasse aux deadlocks et patterns fire-and-forget

OS
Open Soft Team

Engineering Team

Quand la concurrence rencontre les limites de ressources

Quand vous exécutez un pipeline à haut débit — un bot MEV traitant 180 000 chaînes d’arbitrage par bloc, un serveur API gérant 10 000 requêtes simultanées, ou un job ETL écrivant des millions de lignes — vous finissez inévitablement par atteindre un plafond de ressources. Les pools de connexions à la base de données s’épuisent. Les fournisseurs RPC vous limitent en débit. La mémoire gonfle parce que vous avez créé 50 000 tâches tokio, chacune détenant les données d’une chaîne entière.

Le sémaphore est la primitive qui résout ce problème : il limite le nombre d’opérations concurrentes à un maximum donné.

tokio::sync::Semaphore : les bases

use tokio::sync::Semaphore;
use std::sync::Arc;

let semaphore = Arc::new(Semaphore::new(10)); // Maximum 10 opérations concurrentes

for item in items {
    let permit = semaphore.clone().acquire_owned().await.unwrap();
    tokio::spawn(async move {
        process(item).await;
        drop(permit); // Libérer le permit
    });
}

acquire_owned() attend qu’un permit soit disponible, puis le retourne. Le permit est un guard RAII — il est automatiquement libéré quand il est droppé. Cela garantit que le sémaphore ne fuit jamais, même en cas de panique.

Permits RAII : pourquoi c’est important

Le pattern RAII (Resource Acquisition Is Initialization) est crucial pour les sémaphores. Comparez :

// DANGEREUX : le permit peut fuir si process() panique
let permit = semaphore.acquire().await.unwrap();
process(item).await;
permit.forget(); // Oubli intentionnel — rare mais parfois nécessaire

// SÛR : le permit est libéré même en cas de panique
let _permit = semaphore.acquire().await.unwrap();
process(item).await;
// _permit droppé automatiquement ici

Le pattern fire-and-forget

Dans les systèmes de production, vous avez souvent besoin de lancer des opérations d’écriture sans attendre leur achèvement :

struct FireAndForgetWriter {
    semaphore: Arc<Semaphore>,
    db_pool: PgPool,
}

impl FireAndForgetWriter {
    fn new(max_concurrent: usize, db_pool: PgPool) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(max_concurrent)),
            db_pool,
        }
    }

    fn write(&self, data: Vec<Record>) {
        let semaphore = self.semaphore.clone();
        let pool = self.db_pool.clone();

        tokio::spawn(async move {
            // Acquérir avec timeout pour éviter les deadlocks
            let permit = match tokio::time::timeout(
                Duration::from_secs(30),
                semaphore.acquire_owned()
            ).await {
                Ok(Ok(permit)) => permit,
                Ok(Err(_)) => {
                    tracing::error!("Semaphore fermé");
                    return;
                }
                Err(_) => {
                    tracing::warn!("Timeout d'acquisition du sémaphore");
                    return;
                }
            };

            if let Err(e) = bulk_insert(&pool, &data).await {
                tracing::error!(error = %e, "Échec de l'écriture en lot");
            }

            drop(permit);
        });
    }
}

Ce pattern est idéal pour les écritures de métriques, de logs et d’événements où vous ne pouvez pas vous permettre de bloquer le chemin critique.

Chasse aux deadlocks

Les deadlocks avec les sémaphores surviennent quand :

  1. Acquisition imbriquée — Une tâche détient un permit et essaie d’en acquérir un autre du même sémaphore
  2. Ordre d’acquisition incohérent — Deux sémaphores acquis dans des ordres différents par différentes tâches
  3. Fuite de permits — Des permits qui ne sont jamais libérés (panic sans RAII, ou forget())

Diagnostic avec tracing

#[instrument(skip(semaphore))]
async fn process_with_tracing(
    semaphore: &Semaphore,
    item: &Item,
) -> Result<()> {
    tracing::debug!(
        available_permits = semaphore.available_permits(),
        "En attente de permit"
    );

    let _permit = semaphore.acquire().await?;

    tracing::debug!(
        available_permits = semaphore.available_permits(),
        "Permit acquis, traitement en cours"
    );

    process(item).await?;

    tracing::debug!("Traitement terminé, permit sera libéré");
    Ok(())
}

Diagnostic avec tokio-console

tokio-console est un débogueur TUI pour les applications tokio :

# Installer
cargo install tokio-console

# Activer dans votre application
# Cargo.toml: tokio = { features = ["tracing"] }
console_subscriber::init();

# Lancer tokio-console dans un autre terminal
tokio-console

tokio-console affiche :

  • Les tâches en attente et depuis combien de temps
  • Les ressources (sémaphores, mutexes) et qui les détient
  • Les relations d’attente qui révèlent les deadlocks

Patterns avancés

Sémaphore pondéré

Pour limiter par coût plutôt que par nombre d’opérations :

let semaphore = Arc::new(Semaphore::new(1_000_000)); // 1MB de budget

// Chaque opération acquiert des permits proportionnels à sa taille
let data_size = data.len();
let _permit = semaphore.acquire_many(data_size as u32).await?;
process(data).await;

try_acquire pour le non-bloquant

match semaphore.try_acquire() {
    Ok(permit) => {
        // Traiter immédiatement
        tokio::spawn(async move {
            process(item).await;
            drop(permit);
        });
    }
    Err(_) => {
        // Capacité pleine — mettre en file d'attente ou rejeter
        tracing::warn!("Contrepression : tâche rejetée");
    }
}

Sémaphore avec timeout

async fn acquire_with_timeout(
    semaphore: &Semaphore,
    timeout_duration: Duration,
) -> Result<SemaphorePermit<'_>> {
    match tokio::time::timeout(timeout_duration, semaphore.acquire()).await {
        Ok(Ok(permit)) => Ok(permit),
        Ok(Err(_)) => Err(Error::SemaphoreClosed),
        Err(_) => Err(Error::AcquireTimeout),
    }
}

Concurrence structurée

Combinez les sémaphores avec la concurrence structurée pour un contrôle complet :

use tokio::task::JoinSet;

async fn process_batch(
    items: Vec<Item>,
    max_concurrent: usize,
) -> Vec<Result<Output>> {
    let semaphore = Arc::new(Semaphore::new(max_concurrent));
    let mut join_set = JoinSet::new();

    for item in items {
        let sem = semaphore.clone();
        join_set.spawn(async move {
            let _permit = sem.acquire().await.unwrap();
            process(item).await
        });
    }

    let mut results = Vec::new();
    while let Some(result) = join_set.join_next().await {
        results.push(result.unwrap());
    }
    results
}

JoinSet garantit que toutes les tâches sont attendues, et le sémaphore limite la concurrence. Pas de fuite de tâches, pas de fuite de permits.

Règles de production

  1. Toujours utiliser RAII — Ne jamais appeler forget() sur un permit sauf si vous savez exactement ce que vous faites
  2. Toujours mettre un timeoutacquire() sans timeout peut bloquer indéfiniment
  3. Monitorer les permits disponibles — Exposez available_permits() comme métrique Prometheus
  4. Dimensionner le sémaphore — Basez-le sur la capacité réelle de la ressource (taille du pool de connexions, limites du fournisseur RPC)
  5. Tester sous charge — Les deadlocks ne se manifestent souvent que sous forte concurrence

Quand utiliser un sémaphore vs d’autres primitives

PrimitiveCas d’usage
SemaphoreLimiter N opérations concurrentes
MutexAccès exclusif à une ressource partagée
RwLockLecture concurrente, écriture exclusive
Channel (bounded)File d’attente producteur-consommateur
BarrierSynchronisation de N tâches à un point

Conclusion

Le sémaphore tokio est la primitive fondamentale pour le contrôle de contrepression en Rust async. Les permits RAII garantissent l’absence de fuite, les timeouts préviennent les deadlocks, et la concurrence structurée avec JoinSet assure que toutes les tâches sont correctement gérées. Pour les systèmes de production à haut débit, le pattern fire-and-forget avec sémaphore est l’équilibre idéal entre performance et fiabilité.