Deep EVM #29: Семафоры в async Rust — охота на дедлоки и fire-and-forget паттерны
Engineering Team
Зачем семафоры в асинхронном Rust
Когда вы запускаете высоконагруженный конвейер — MEV-бот, обрабатывающий 180 000 арбитражных цепочек за блок, API-сервер с 10 000 одновременных запросов или ETL-процесс, записывающий миллионы строк — вы неизбежно упираетесь в потолок ресурсов. Пулы подключений к базе исчерпываются. RPC-провайдеры отвечают rate-limit ошибками. Память раздувается, потому что вы заспаунили 50 000 tokio-задач, каждая из которых удерживает данные цепочки.
Наивный подход — неограниченная конкурентность: tokio::spawn на каждую единицу работы в надежде, что рантайм разберётся. Не разберётся. В продакшне мы наблюдали потребление 15,4 ГБ памяти от 2,7 миллиона порождённых задач, каждая из которых держала Vec<Hop> плюс контекст симуляции. Решением стала пакетная конкурентность с backpressure на семафорах, что снизило потребление до 0,8 ГБ.
Семафор — правильный примитив, когда нужно ограничить число одновременных операций, не сериализуя их полностью. В отличие от мьютекса (который допускает ровно одного), семафор допускает N одновременных обращений. Это идеально для:
- Конкурентность записи в БД: ограничение до размера пула соединений (например, 20 одновременных записей)
- Rate limiting RPC: ограничение исходящих запросов, чтобы избежать 429-ответов от провайдеров
- Backpressure по памяти: предотвращение неограниченного порождения задач через гейтинг по доступным пермитам
- Пакетная обработка: контроль числа параллельных батчей симуляции против общего узла
Основы tokio::sync::Semaphore
tokio::sync::Semaphore — это счётный семафор, спроектированный для асинхронного кода. Он поддерживает внутренний счётчик доступных пермитов. Задачи захватывают пермиты перед выполнением и освобождают по завершении.
use std::sync::Arc;
use tokio::sync::Semaphore;
// Разрешаем до 20 одновременных записей в БД
let semaphore = Arc::new(Semaphore::new(20));
for chain in chains_to_persist {
let sem = semaphore.clone();
let db_pool = db_pool.clone();
tokio::spawn(async move {
// Захватываем пермит — блокируемся, если все 20 заняты
let _permit = sem.acquire().await.unwrap();
// Выполняем запись — пермит удерживается в этой области
sqlx::query("UPDATE chains SET profit = $1 WHERE id = $2")
.bind(chain.profit)
.bind(chain.id)
.execute(&db_pool)
.await
.ok();
// _permit дропается здесь — автоматически освобождается
});
}
Основной API:
Semaphore::new(permits)— создание с N пермитамиacquire(&self)— асинхронный, ждёт доступного пермита, возвращаетSemaphorePermit(RAII-гард)try_acquire(&self)— синхронный, немедленно возвращаетErr(TryAcquireError)при отсутствии пермитовacquire_owned(self: Arc<Self>)— возвращаетOwnedSemaphorePermit, владеющийArc, полезен когда пермит должен пережить заимствованиеadd_permits(&self, n)— динамическое увеличение счётчикаclose(&self)— закрытие семафора; все ожидающиеacquireполучаютErr(AcquireError)available_permits(&self)— инспекция текущего счётчика (полезно для метрик)
Ключевое проектное решение: acquire() возвращает RAII-гард. При дропе гарда пермит освобождается. Это означает автоматическую очистку даже при паниках, ранних возвратах и операторе ? — пока гард живёт на стеке.
Fire-and-Forget паттерн записи
В высоконагруженных системах часто нужно персистить данные, не блокируя горячий путь. Паттерн: порождаем фоновую задачу, которая захватывает пермит семафора, выполняет запись и освобождает. Вызывающий код не ждёт результата.
В хорошо спроектированной системе это реализуется как декоратор, оборачивающий внутренний сервис хранения:
use std::sync::Arc;
use tokio::sync::Semaphore;
pub struct AsyncChainStore<S: ChainStore> {
inner: Arc<S>,
semaphore: Arc<Semaphore>,
}
impl<S: ChainStore + Send + Sync + 'static> AsyncChainStore<S> {
pub fn new(inner: Arc<S>, max_concurrent_writes: usize) -> Self {
Self {
inner,
semaphore: Arc::new(Semaphore::new(max_concurrent_writes)),
}
}
/// Fire-and-forget: персистим прибыль цепочек, не блокируя вызывающий код.
/// Backpressure обеспечивается семафором — если все пермиты заняты,
/// порождённая задача ждёт, но вызывающий код возвращается немедленно.
pub fn save_profits_async(&self, chains: Vec<ChainProfit>) {
let inner = self.inner.clone();
let sem = self.semaphore.clone();
tokio::spawn(async move {
// Захватываем пермит — здесь срабатывает backpressure
let _permit = match sem.acquire().await {
Ok(p) => p,
Err(_) => {
tracing::warn!("семафор закрыт, дропаем запись");
return;
}
};
if let Err(e) = inner.batch_update_profits(&chains).await {
tracing::error!(
error = %e,
count = chains.len(),
"fire-and-forget запись не удалась"
);
}
// _permit дропается здесь — следующая задача в очереди продолжает
});
}
}
Этот паттерн декоратора (R-004 в наших соглашениях кодовой базы) сохраняет внутренний ChainStore чистым — он ничего не знает об управлении конкурентностью. Декоратор занимается управлением семафором, логированием ошибок и порождением задач. ServiceLocator связывает их:
// В locator/mod.rs
let chain_store = Arc::new(PgChainStore::new(db_pool.clone()));
let async_chain_store = AsyncChainStore::new(chain_store, 20);
Почему бы просто не использовать встроенное ограничение пула БД? Потому что семафор даёт отдельную ручку настройки. Пул может иметь 50 соединений, но вы хотите, чтобы fire-and-forget записи использовали максимум 20, оставив 30 для критичных по задержке чтений. Семафор обеспечивает это разделение на уровне приложения.
Сценарии дедлоков
Дедлоки на семафорах коварны: они не вызывают паники или ошибки — программа просто перестаёт продвигаться. Вот паттерны, с которыми мы столкнулись в продакшне.
Сценарий 1: Пермит не освобождается при раннем возврате
async fn process_batch(sem: &Semaphore, batch: &[Chain]) -> Result<()> {
let permit = sem.acquire().await?;
// Ранний возврат если батч пуст — НО пермит всё ещё удерживается!
if batch.is_empty() {
return Ok(()); // пермит дропается здесь — на самом деле ок в этом случае
}
// Реальная опасность: сохранение пермита в структуре, которая переживает скоуп
let ctx = ProcessingContext {
permit: Some(permit), // пермит перемещён в структуру
batch,
};
// Если process() сохраняет ctx куда-то долгоживущее, пермит утекает
process(ctx).await?;
Ok(())
}
RAII-гард защищает от большинства ранних возвратов. Опасность возникает, когда вы перемещаете пермит в структуру, которая покидает ожидаемое время жизни — хранится в HashMap, отправляется через канал или удерживается в static.
Сценарий 2: Вложенный Acquire (Самодедлок)
async fn simulate_and_persist(
sem: &Semaphore,
chain: &Chain,
) -> Result<()> {
let _outer = sem.acquire().await?; // Берём 1 из N пермитов
let result = simulate(chain).await?;
// БАГ: захватываем ТОТ ЖЕ семафор внутри удерживаемого пермита
let _inner = sem.acquire().await?; // При N=1 — дедлок. При N>1 — снижение пропускной способности
persist(result).await?;
Ok(())
}
При N=1 это мгновенный дедлок. При N>1 это работает, пока нагрузка не вырастет и все пермиты не окажутся заняты задачами, ждущими внутреннего acquire. Решение: используйте отдельные семафоры для разных задач или реструктурируйте код, чтобы избежать вложенного захвата.
Сценарий 3: Пермит удерживается через await-точки в Select
async fn process_with_timeout(sem: &Semaphore) -> Result<()> {
let permit = sem.acquire().await?;
tokio::select! {
result = do_work() => {
result?;
}
_ = tokio::time::sleep(Duration::from_secs(30)) => {
tracing::warn!("таймаут, но пермит всё ещё удерживается до дропа");
}
}
// пермит дропается здесь — это корректно
// НО: если do_work() порождает подзадачу, захватившую пермит,
// отмена do_work() НЕ отменяет подзадачу
Ok(())
}
Макрос select! отменяет проигравшую ветку, дропая её future, но любые задачи, порождённые внутри этого future, продолжают работать. Если эти задачи захватили ссылку на пермит (или клон OwnedSemaphorePermit), пермит не освобождается когда ожидалось.
Диагностика дедлоков на семафорах
Когда система перестаёт продвигаться, как отличить дедлок на семафоре от медленной зависимости?
Диагностика через трейсинг
Инструментируйте acquire/release структурированным логированием:
let permits_before = semaphore.available_permits();
tracing::debug!(
available = permits_before,
"захватываем пермит семафора"
);
let _permit = semaphore.acquire().await?;
tracing::debug!(
available = semaphore.available_permits(),
"пермит семафора захвачен"
);
Если логи показывают «захватываем», но никогда «захвачен» — все пермиты удерживаются где-то. Сопоставьте с последними N записями «захвачен», чтобы найти виновника.
Метрики Prometheus
Экспортируйте gauge-метрику доступных пермитов:
use metrics::gauge;
// В декораторе или middleware
gauge!("semaphore_available_permits", semaphore.available_permits() as f64);
Gauge, упавший до нуля и оставшийся там — это дедлок. Gauge, колеблющийся около нуля — это contention (высокая нагрузка, возможно нужно больше пермитов).
tokio-console
tokio-console — диагностический инструмент, подключающийся к работающему tokio-приложению и показывающий состояние задач в реальном времени:
cargo add --dev console-subscriber
// В main.rs (только debug-сборки)
#[cfg(debug_assertions)]
console_subscriber::init();
Запустите tokio-console в другом терминале и ищите задачи, застрявшие в состоянии “Idle” на acquire семафора. Инструмент показывает, какая именно задача ждёт и как долго.
Закалённые продакшн-решения
Решение 1: Всегда используйте OwnedSemaphorePermit с Arc
При порождении задач предпочитайте acquire_owned() вместо acquire(). Owned-вариант принимает Arc<Semaphore> и возвращает пермит, который не заимствует семафор — он владеет клоном Arc. Это полностью устраняет проблемы с временами жизни:
let semaphore = Arc::new(Semaphore::new(20));
for item in items {
let permit = semaphore.clone().acquire_owned().await?;
tokio::spawn(async move {
// пермит перемещён в задачу — никаких проблем с заимствованиями
do_work(item).await;
drop(permit); // явный drop для ясности, или пусть скоуп обработает
});
}
Решение 2: Acquire с таймаутом
Никогда не ждите пермит бесконечно в продакшне. Используйте tokio::time::timeout для раннего обнаружения дедлоков:
use tokio::time::{timeout, Duration};
let permit = timeout(
Duration::from_secs(30),
semaphore.acquire(),
).await
.map_err(|_| anyhow!("таймаут acquire семафора — возможен дедлок"))?
.map_err(|_| anyhow!("семафор закрыт"))?;
Когда таймаут срабатывает, логируйте доступные пермиты, число ожидающих задач и стек вызовов. Это даёт немедленную видимость дедлока без падения процесса.
Решение 3: Структурированная конкурентность с JoinSet
Вместо неограниченного tokio::spawn используйте tokio::task::JoinSet для сохранения владения порождёнными задачами в паре с семафором:
use tokio::task::JoinSet;
let semaphore = Arc::new(Semaphore::new(20));
let mut join_set = JoinSet::new();
for batch in batches {
let permit = semaphore.clone().acquire_owned().await?;
let db_pool = db_pool.clone();
join_set.spawn(async move {
let result = process_batch(&db_pool, &batch).await;
drop(permit); // освобождаем перед тем как JoinSet соберёт результат
result
});
}
// Дренируем все задачи — собираем ошибки вместо тихой потери
while let Some(result) = join_set.join_next().await {
match result {
Ok(Ok(())) => {}
Ok(Err(e)) => tracing::error!(error = %e, "обработка батча не удалась"),
Err(e) => tracing::error!(error = %e, "задача запаниковала"),
}
}
Этот паттерн гарантирует: (1) ограниченную конкурентность через семафор, (2) отсутствие тихих падений задач, (3) родительская задача знает, когда все дочерние завершились, (4) пермиты всегда освобождаются, даже если задача паникует (потому что JoinSet::join_next возвращает JoinError для запаниковавших задач, а пермит дропается вместе с состоянием задачи).
Решение 4: Отдельные семафоры для отдельных задач
Никогда не используйте один семафор для несвязанных операций. В нашем MEV-конвейере мы используем отдельные семафоры:
pub struct ConcurrencyLimits {
/// Ограничивает конкурентные RPC-вызовы симуляции к узлу
pub simulation: Arc<Semaphore>,
/// Ограничивает конкурентные записи в БД для персистенции цепочек
pub db_writes: Arc<Semaphore>,
/// Ограничивает конкурентные обработчики подписки на мемпул
pub mempool: Arc<Semaphore>,
}
impl ConcurrencyLimits {
pub fn new() -> Self {
Self {
simulation: Arc::new(Semaphore::new(4)), // узел выдерживает 4 параллельных батча
db_writes: Arc::new(Semaphore::new(20)), // 20 из 50 соединений пула
mempool: Arc::new(Semaphore::new(100)), // 100 конкурентных обработчиков tx
}
}
}
Это полностью устраняет дедлок вложенного acquire — задача захватывает пермит simulation, затем пермит db_writes, и это независимые пулы.
Производительность: накладные расходы семафора
Является ли сам семафор узким местом? На практике — нет. tokio::sync::Semaphore реализован на атомарном счётчике и интрузивном связном списке ожидающих. Захват неоспариваемого пермита — один fetch_sub, наносекунды. Даже при contention накладные расходы — это уведомление waker’а (тоже наносекунды) по сравнению с миллисекундами вашего реального I/O.
Мы замерили накладные расходы семафора в нашем конвейере:
| Операция | Без семафора | С семафором (20 пермитов) |
|---|---|---|
| 10 000 записей в БД | 1 340 мс (все конкурентно, ошибки исчерпания пула) | 1 580 мс (контролируемо, ноль ошибок) |
| 500 RPC-симуляций | 8 200 мс (узел перегружен, таймауты) | 9 100 мс (по 4 за раз, ноль таймаутов) |
| Память (2,7М задач) | 15,4 ГБ | 0,8 ГБ (батчами с JoinSet) |
10-15% накладных расходов по wall-clock времени пренебрежимо малы по сравнению с устранением ошибок, таймаутов и OOM-крешей.
Заключение
Семафоры в асинхронном Rust обманчиво просты — acquire, выполняем работу, дропаем пермит. Сложность проявляется в продакшне: пермиты, утёкшие в долгоживущие структуры, вложенные acquire через стек вызовов, пермиты, удерживаемые через границы отмены select!.
Защитный плейбук:
- Всегда используйте
acquire_owned()при порождении задач - Всегда оборачивайте acquire в таймаут
- Никогда не делайте вложенный acquire на одном семафоре
- Разделяйте семафоры для разных пулов ресурсов
- Инструментируйте доступные пермиты метриками и трейсингом
- Используйте JoinSet вместо неограниченного
tokio::spawnдля структурированной конкурентности
Эти паттерны выдержали миллионы обработанных блоков, сотни тысяч конкурентных задач и ноль дедлоков в продакшне с момента их внедрения.