Перейти к основному содержимому
ИнженерияMar 28, 2026

Deep EVM #29: Семафоры в async Rust — охота на дедлоки и fire-and-forget паттерны

OS
Open Soft Team

Engineering Team

Зачем семафоры в асинхронном Rust

Когда вы запускаете высоконагруженный конвейер — MEV-бот, обрабатывающий 180 000 арбитражных цепочек за блок, API-сервер с 10 000 одновременных запросов или ETL-процесс, записывающий миллионы строк — вы неизбежно упираетесь в потолок ресурсов. Пулы подключений к базе исчерпываются. RPC-провайдеры отвечают rate-limit ошибками. Память раздувается, потому что вы заспаунили 50 000 tokio-задач, каждая из которых удерживает данные цепочки.

Наивный подход — неограниченная конкурентность: tokio::spawn на каждую единицу работы в надежде, что рантайм разберётся. Не разберётся. В продакшне мы наблюдали потребление 15,4 ГБ памяти от 2,7 миллиона порождённых задач, каждая из которых держала Vec<Hop> плюс контекст симуляции. Решением стала пакетная конкурентность с backpressure на семафорах, что снизило потребление до 0,8 ГБ.

Семафор — правильный примитив, когда нужно ограничить число одновременных операций, не сериализуя их полностью. В отличие от мьютекса (который допускает ровно одного), семафор допускает N одновременных обращений. Это идеально для:

  • Конкурентность записи в БД: ограничение до размера пула соединений (например, 20 одновременных записей)
  • Rate limiting RPC: ограничение исходящих запросов, чтобы избежать 429-ответов от провайдеров
  • Backpressure по памяти: предотвращение неограниченного порождения задач через гейтинг по доступным пермитам
  • Пакетная обработка: контроль числа параллельных батчей симуляции против общего узла

Основы tokio::sync::Semaphore

tokio::sync::Semaphore — это счётный семафор, спроектированный для асинхронного кода. Он поддерживает внутренний счётчик доступных пермитов. Задачи захватывают пермиты перед выполнением и освобождают по завершении.

use std::sync::Arc;
use tokio::sync::Semaphore;

// Разрешаем до 20 одновременных записей в БД
let semaphore = Arc::new(Semaphore::new(20));

for chain in chains_to_persist {
    let sem = semaphore.clone();
    let db_pool = db_pool.clone();

    tokio::spawn(async move {
        // Захватываем пермит — блокируемся, если все 20 заняты
        let _permit = sem.acquire().await.unwrap();

        // Выполняем запись — пермит удерживается в этой области
        sqlx::query("UPDATE chains SET profit = $1 WHERE id = $2")
            .bind(chain.profit)
            .bind(chain.id)
            .execute(&db_pool)
            .await
            .ok();

        // _permit дропается здесь — автоматически освобождается
    });
}

Основной API:

  • Semaphore::new(permits) — создание с N пермитами
  • acquire(&self) — асинхронный, ждёт доступного пермита, возвращает SemaphorePermit (RAII-гард)
  • try_acquire(&self) — синхронный, немедленно возвращает Err(TryAcquireError) при отсутствии пермитов
  • acquire_owned(self: Arc<Self>) — возвращает OwnedSemaphorePermit, владеющий Arc, полезен когда пермит должен пережить заимствование
  • add_permits(&self, n) — динамическое увеличение счётчика
  • close(&self) — закрытие семафора; все ожидающие acquire получают Err(AcquireError)
  • available_permits(&self) — инспекция текущего счётчика (полезно для метрик)

Ключевое проектное решение: acquire() возвращает RAII-гард. При дропе гарда пермит освобождается. Это означает автоматическую очистку даже при паниках, ранних возвратах и операторе ? — пока гард живёт на стеке.

Fire-and-Forget паттерн записи

В высоконагруженных системах часто нужно персистить данные, не блокируя горячий путь. Паттерн: порождаем фоновую задачу, которая захватывает пермит семафора, выполняет запись и освобождает. Вызывающий код не ждёт результата.

В хорошо спроектированной системе это реализуется как декоратор, оборачивающий внутренний сервис хранения:

use std::sync::Arc;
use tokio::sync::Semaphore;

pub struct AsyncChainStore<S: ChainStore> {
    inner: Arc<S>,
    semaphore: Arc<Semaphore>,
}

impl<S: ChainStore + Send + Sync + 'static> AsyncChainStore<S> {
    pub fn new(inner: Arc<S>, max_concurrent_writes: usize) -> Self {
        Self {
            inner,
            semaphore: Arc::new(Semaphore::new(max_concurrent_writes)),
        }
    }

    /// Fire-and-forget: персистим прибыль цепочек, не блокируя вызывающий код.
    /// Backpressure обеспечивается семафором — если все пермиты заняты,
    /// порождённая задача ждёт, но вызывающий код возвращается немедленно.
    pub fn save_profits_async(&self, chains: Vec<ChainProfit>) {
        let inner = self.inner.clone();
        let sem = self.semaphore.clone();

        tokio::spawn(async move {
            // Захватываем пермит — здесь срабатывает backpressure
            let _permit = match sem.acquire().await {
                Ok(p) => p,
                Err(_) => {
                    tracing::warn!("семафор закрыт, дропаем запись");
                    return;
                }
            };

            if let Err(e) = inner.batch_update_profits(&chains).await {
                tracing::error!(
                    error = %e,
                    count = chains.len(),
                    "fire-and-forget запись не удалась"
                );
            }
            // _permit дропается здесь — следующая задача в очереди продолжает
        });
    }
}

Этот паттерн декоратора (R-004 в наших соглашениях кодовой базы) сохраняет внутренний ChainStore чистым — он ничего не знает об управлении конкурентностью. Декоратор занимается управлением семафором, логированием ошибок и порождением задач. ServiceLocator связывает их:

// В locator/mod.rs
let chain_store = Arc::new(PgChainStore::new(db_pool.clone()));
let async_chain_store = AsyncChainStore::new(chain_store, 20);

Почему бы просто не использовать встроенное ограничение пула БД? Потому что семафор даёт отдельную ручку настройки. Пул может иметь 50 соединений, но вы хотите, чтобы fire-and-forget записи использовали максимум 20, оставив 30 для критичных по задержке чтений. Семафор обеспечивает это разделение на уровне приложения.

Сценарии дедлоков

Дедлоки на семафорах коварны: они не вызывают паники или ошибки — программа просто перестаёт продвигаться. Вот паттерны, с которыми мы столкнулись в продакшне.

Сценарий 1: Пермит не освобождается при раннем возврате

async fn process_batch(sem: &Semaphore, batch: &[Chain]) -> Result<()> {
    let permit = sem.acquire().await?;

    // Ранний возврат если батч пуст — НО пермит всё ещё удерживается!
    if batch.is_empty() {
        return Ok(()); // пермит дропается здесь — на самом деле ок в этом случае
    }

    // Реальная опасность: сохранение пермита в структуре, которая переживает скоуп
    let ctx = ProcessingContext {
        permit: Some(permit), // пермит перемещён в структуру
        batch,
    };

    // Если process() сохраняет ctx куда-то долгоживущее, пермит утекает
    process(ctx).await?;
    Ok(())
}

RAII-гард защищает от большинства ранних возвратов. Опасность возникает, когда вы перемещаете пермит в структуру, которая покидает ожидаемое время жизни — хранится в HashMap, отправляется через канал или удерживается в static.

Сценарий 2: Вложенный Acquire (Самодедлок)

async fn simulate_and_persist(
    sem: &Semaphore,
    chain: &Chain,
) -> Result<()> {
    let _outer = sem.acquire().await?; // Берём 1 из N пермитов

    let result = simulate(chain).await?;

    // БАГ: захватываем ТОТ ЖЕ семафор внутри удерживаемого пермита
    let _inner = sem.acquire().await?; // При N=1 — дедлок. При N>1 — снижение пропускной способности
    persist(result).await?;

    Ok(())
}

При N=1 это мгновенный дедлок. При N>1 это работает, пока нагрузка не вырастет и все пермиты не окажутся заняты задачами, ждущими внутреннего acquire. Решение: используйте отдельные семафоры для разных задач или реструктурируйте код, чтобы избежать вложенного захвата.

Сценарий 3: Пермит удерживается через await-точки в Select

async fn process_with_timeout(sem: &Semaphore) -> Result<()> {
    let permit = sem.acquire().await?;

    tokio::select! {
        result = do_work() => {
            result?;
        }
        _ = tokio::time::sleep(Duration::from_secs(30)) => {
            tracing::warn!("таймаут, но пермит всё ещё удерживается до дропа");
        }
    }
    // пермит дропается здесь — это корректно
    // НО: если do_work() порождает подзадачу, захватившую пермит,
    // отмена do_work() НЕ отменяет подзадачу
    Ok(())
}

Макрос select! отменяет проигравшую ветку, дропая её future, но любые задачи, порождённые внутри этого future, продолжают работать. Если эти задачи захватили ссылку на пермит (или клон OwnedSemaphorePermit), пермит не освобождается когда ожидалось.

Диагностика дедлоков на семафорах

Когда система перестаёт продвигаться, как отличить дедлок на семафоре от медленной зависимости?

Диагностика через трейсинг

Инструментируйте acquire/release структурированным логированием:

let permits_before = semaphore.available_permits();
tracing::debug!(
    available = permits_before,
    "захватываем пермит семафора"
);

let _permit = semaphore.acquire().await?;

tracing::debug!(
    available = semaphore.available_permits(),
    "пермит семафора захвачен"
);

Если логи показывают «захватываем», но никогда «захвачен» — все пермиты удерживаются где-то. Сопоставьте с последними N записями «захвачен», чтобы найти виновника.

Метрики Prometheus

Экспортируйте gauge-метрику доступных пермитов:

use metrics::gauge;

// В декораторе или middleware
gauge!("semaphore_available_permits", semaphore.available_permits() as f64);

Gauge, упавший до нуля и оставшийся там — это дедлок. Gauge, колеблющийся около нуля — это contention (высокая нагрузка, возможно нужно больше пермитов).

tokio-console

tokio-console — диагностический инструмент, подключающийся к работающему tokio-приложению и показывающий состояние задач в реальном времени:

cargo add --dev console-subscriber
// В main.rs (только debug-сборки)
#[cfg(debug_assertions)]
console_subscriber::init();

Запустите tokio-console в другом терминале и ищите задачи, застрявшие в состоянии “Idle” на acquire семафора. Инструмент показывает, какая именно задача ждёт и как долго.

Закалённые продакшн-решения

Решение 1: Всегда используйте OwnedSemaphorePermit с Arc

При порождении задач предпочитайте acquire_owned() вместо acquire(). Owned-вариант принимает Arc<Semaphore> и возвращает пермит, который не заимствует семафор — он владеет клоном Arc. Это полностью устраняет проблемы с временами жизни:

let semaphore = Arc::new(Semaphore::new(20));

for item in items {
    let permit = semaphore.clone().acquire_owned().await?;

    tokio::spawn(async move {
        // пермит перемещён в задачу — никаких проблем с заимствованиями
        do_work(item).await;
        drop(permit); // явный drop для ясности, или пусть скоуп обработает
    });
}

Решение 2: Acquire с таймаутом

Никогда не ждите пермит бесконечно в продакшне. Используйте tokio::time::timeout для раннего обнаружения дедлоков:

use tokio::time::{timeout, Duration};

let permit = timeout(
    Duration::from_secs(30),
    semaphore.acquire(),
).await
    .map_err(|_| anyhow!("таймаут acquire семафора — возможен дедлок"))?
    .map_err(|_| anyhow!("семафор закрыт"))?;

Когда таймаут срабатывает, логируйте доступные пермиты, число ожидающих задач и стек вызовов. Это даёт немедленную видимость дедлока без падения процесса.

Решение 3: Структурированная конкурентность с JoinSet

Вместо неограниченного tokio::spawn используйте tokio::task::JoinSet для сохранения владения порождёнными задачами в паре с семафором:

use tokio::task::JoinSet;

let semaphore = Arc::new(Semaphore::new(20));
let mut join_set = JoinSet::new();

for batch in batches {
    let permit = semaphore.clone().acquire_owned().await?;
    let db_pool = db_pool.clone();

    join_set.spawn(async move {
        let result = process_batch(&db_pool, &batch).await;
        drop(permit); // освобождаем перед тем как JoinSet соберёт результат
        result
    });
}

// Дренируем все задачи — собираем ошибки вместо тихой потери
while let Some(result) = join_set.join_next().await {
    match result {
        Ok(Ok(())) => {}
        Ok(Err(e)) => tracing::error!(error = %e, "обработка батча не удалась"),
        Err(e) => tracing::error!(error = %e, "задача запаниковала"),
    }
}

Этот паттерн гарантирует: (1) ограниченную конкурентность через семафор, (2) отсутствие тихих падений задач, (3) родительская задача знает, когда все дочерние завершились, (4) пермиты всегда освобождаются, даже если задача паникует (потому что JoinSet::join_next возвращает JoinError для запаниковавших задач, а пермит дропается вместе с состоянием задачи).

Решение 4: Отдельные семафоры для отдельных задач

Никогда не используйте один семафор для несвязанных операций. В нашем MEV-конвейере мы используем отдельные семафоры:

pub struct ConcurrencyLimits {
    /// Ограничивает конкурентные RPC-вызовы симуляции к узлу
    pub simulation: Arc<Semaphore>,
    /// Ограничивает конкурентные записи в БД для персистенции цепочек
    pub db_writes: Arc<Semaphore>,
    /// Ограничивает конкурентные обработчики подписки на мемпул
    pub mempool: Arc<Semaphore>,
}

impl ConcurrencyLimits {
    pub fn new() -> Self {
        Self {
            simulation: Arc::new(Semaphore::new(4)),   // узел выдерживает 4 параллельных батча
            db_writes: Arc::new(Semaphore::new(20)),   // 20 из 50 соединений пула
            mempool: Arc::new(Semaphore::new(100)),    // 100 конкурентных обработчиков tx
        }
    }
}

Это полностью устраняет дедлок вложенного acquire — задача захватывает пермит simulation, затем пермит db_writes, и это независимые пулы.

Производительность: накладные расходы семафора

Является ли сам семафор узким местом? На практике — нет. tokio::sync::Semaphore реализован на атомарном счётчике и интрузивном связном списке ожидающих. Захват неоспариваемого пермита — один fetch_sub, наносекунды. Даже при contention накладные расходы — это уведомление waker’а (тоже наносекунды) по сравнению с миллисекундами вашего реального I/O.

Мы замерили накладные расходы семафора в нашем конвейере:

ОперацияБез семафораС семафором (20 пермитов)
10 000 записей в БД1 340 мс (все конкурентно, ошибки исчерпания пула)1 580 мс (контролируемо, ноль ошибок)
500 RPC-симуляций8 200 мс (узел перегружен, таймауты)9 100 мс (по 4 за раз, ноль таймаутов)
Память (2,7М задач)15,4 ГБ0,8 ГБ (батчами с JoinSet)

10-15% накладных расходов по wall-clock времени пренебрежимо малы по сравнению с устранением ошибок, таймаутов и OOM-крешей.

Заключение

Семафоры в асинхронном Rust обманчиво просты — acquire, выполняем работу, дропаем пермит. Сложность проявляется в продакшне: пермиты, утёкшие в долгоживущие структуры, вложенные acquire через стек вызовов, пермиты, удерживаемые через границы отмены select!.

Защитный плейбук:

  1. Всегда используйте acquire_owned() при порождении задач
  2. Всегда оборачивайте acquire в таймаут
  3. Никогда не делайте вложенный acquire на одном семафоре
  4. Разделяйте семафоры для разных пулов ресурсов
  5. Инструментируйте доступные пермиты метриками и трейсингом
  6. Используйте JoinSet вместо неограниченного tokio::spawn для структурированной конкурентности

Эти паттерны выдержали миллионы обработанных блоков, сотни тысяч конкурентных задач и ноль дедлоков в продакшне с момента их внедрения.