Deep EVM #29:Async Rustでのセマフォ — デッドロックハンティングとFire-and-Forgetパターン
Engineering Team
Async Rustでセマフォが必要な理由
高スループットパイプライン — ブロックあたり180,000のアービトラージチェーンを処理するMEVボット、10,000の同時リクエストを処理するAPIサーバー、数百万行を書き込むETLジョブ — を実行すると、必ずリソースの上限に達します。データベースコネクションプールが枯渇し、RPCプロバイダーがレート制限をかけ、50,000のtokioタスクをスポーンしてメモリが膨張します。
ナイーブなアプローチは無制限の並行性:すべての作業単位にtokio::spawnして、ランタイムが何とかすることを期待します。しかし、そうはなりません。プロダクションでは270万のスポーンされたタスクから15.4GBのメモリ使用を観測しました。修正はセマフォベースのバックプレッシャーによるバッチ並行性で、メモリを0.8GBに削減しました。
セマフォは、完全にシリアライズせずに並行操作数を制限する必要がある場合に適切なプリミティブです。ミューテックス(正確に1つ)とは異なり、セマフォはN個の同時アクセサーを許可します。
用途:
- データベース書き込み並行性:コネクションプールサイズに制限(例:20の同時書き込み)
- RPCレート制限:429レスポンスを避けるために発信リクエストを制限
- メモリバックプレッシャー:利用可能なパーミットでゲートして無制限タスクスポーンを防止
- バッチ処理:共有ノードに対して並列実行するシミュレーションバッチ数を制御
tokio::sync::Semaphoreの基本
tokio::sync::Semaphoreは非同期コード用のカウンティングセマフォです。利用可能なパーミットの内部カウンターを維持します。タスクは処理前にパーミットを取得し、完了時に解放します。
use std::sync::Arc;
use tokio::sync::Semaphore;
let semaphore = Arc::new(Semaphore::new(20));
for chain in chains_to_persist {
let sem = semaphore.clone();
let db_pool = db_pool.clone();
tokio::spawn(async move {
let _permit = sem.acquire().await.unwrap();
sqlx::query("UPDATE chains SET profit = $1 WHERE id = $2")
.bind(chain.profit)
.bind(chain.id)
.execute(&db_pool)
.await
.ok();
// _permitはここでドロップ — 自動的に解放
});
}
重要な設計選択:acquire()はRAIIガードを返します。ガードがドロップされるとパーミットが解放されます。パニック、早期リターン、?オペレーターのベイルアウトでも自動クリーンアップが得られます。
Fire-and-Forget書き込みパターン
高スループットシステムでは、ホットパスをブロックせずにデータを永続化したいことがよくあります。パターン:バックグラウンドタスクをスポーンし、セマフォパーミットを取得し、書き込みを実行し、解放。呼び出し側は結果を待ちません。
pub struct AsyncChainStore<S: ChainStore> {
inner: Arc<S>,
semaphore: Arc<Semaphore>,
}
impl<S: ChainStore + Send + Sync + 'static> AsyncChainStore<S> {
pub fn save_profits_async(&self, chains: Vec<ChainProfit>) {
let inner = self.inner.clone();
let sem = self.semaphore.clone();
tokio::spawn(async move {
let _permit = match sem.acquire().await {
Ok(p) => p,
Err(_) => {
tracing::warn!("セマフォクローズ、書き込みドロップ");
return;
}
};
if let Err(e) = inner.batch_update_profits(&chains).await {
tracing::error!(error = %e, count = chains.len(), "fire-and-forget書き込み失敗");
}
});
}
}
なぜDBプールの組み込み接続制限を使わないのか?セマフォは別のノブを提供するからです。プールに50接続があっても、fire-and-forget書き込みには最大20を使い、30をレイテンシクリティカルな読み取りに予約できます。
デッドロックシナリオ
セマフォデッドロックは陰湿です。パニックやエラーは発生せず、プログラムが単に進行を停止します。
シナリオ1:早期リターンでパーミット未解放
RAIIガードはほとんどの早期リターンから保護します。危険なのはパーミットを長寿命の構造体に移動する場合です。
シナリオ2:ネストされたAcquire(セルフデッドロック)
let _outer = sem.acquire().await?;
let _inner = sem.acquire().await?; // N=1ならデッドロック
修正:別の関心事には別のセマフォを使用。
シナリオ3:Select内のawaitポイントでパーミット保持
select!マクロは負けたブランチのfutureをドロップしてキャンセルしますが、そのfuture内でスポーンされたタスクは実行を続けます。
セマフォデッドロックの診断
トレーシングベースの診断
acquire/releaseを構造化ログで計装。「acquiring」が出るが「acquired」が出ない場合、すべてのパーミットがどこかで保持されています。
Prometheusメトリクス
利用可能パーミットのゲージメトリクスを公開。ゼロに落ちて留まるゲージはデッドロック。
tokio-console
実行中のtokioアプリケーションにアタッチし、セマフォacquireでIdleステートのタスクを表示。
プロダクション強化ソリューション
ソリューション1:OwnedSemaphorePermitとArc
タスクスポーン時はacquire_owned()を使用。ライフタイム問題を完全に回避。
ソリューション2:タイムアウト付きAcquire
let permit = timeout(Duration::from_secs(30), semaphore.acquire()).await
.map_err(|_| anyhow!("セマフォacquireタイムアウト — デッドロックの可能性"))?
.map_err(|_| anyhow!("セマフォクローズ"))?;
ソリューション3:JoinSetによる構造化並行性
無制限のtokio::spawnの代わりにJoinSetを使用。バウンドされた並行性、サイレントタスク失敗なし、すべての子の完了を把握。
ソリューション4:関心事ごとに別のセマフォ
pub struct ConcurrencyLimits {
pub simulation: Arc<Semaphore>, // ノードへのRPCシミュレーション
pub db_writes: Arc<Semaphore>, // データベース書き込み
pub mempool: Arc<Semaphore>, // メンプールハンドラー
}
ネストされたacquireデッドロックを完全に排除。
パフォーマンス:セマフォオーバーヘッド
tokio::sync::Semaphoreはアトミックカウンターと侵入型リンクリストで実装。非競合パーミットの取得は単一のfetch_sub — ナノ秒。
| 操作 | セマフォなし | セマフォあり(20パーミット) |
|---|---|---|
| 10,000 DB書き込み | 1,340ms(プール枯渇エラー) | 1,580ms(制御済み、エラーゼロ) |
| 500 RPCシミュレーション | 8,200ms(ノード過負荷) | 9,100ms(4並列、タイムアウトゼロ) |
| メモリ(270万タスク) | 15.4 GB | 0.8 GB(JoinSetでバッチ) |
10-15%のウォールクロックオーバーヘッドは、エラー、タイムアウト、OOMクラッシュの排除と比較して無視できます。
まとめ
Async Rustでのセマフォは見かけ上シンプルです — acquire、作業実行、パーミットドロップ。複雑さはプロダクションで現れます:長寿命構造体にリークしたパーミット、コールスタック間のネストされたacquire、select!キャンセレーション境界で保持されたパーミット。
防御的プレイブック:
- タスクスポーン時は常に
acquire_owned()を使用 - acquireには常にタイムアウトをラップ
- 同じセマフォでのacquireを決してネストしない
- 別のリソースプールには別のセマフォを分離
- メトリクスとトレーシングで利用可能パーミットを計装
- 無制限
tokio::spawnの代わりにJoinSetを使用
これらのパターンは、数百万のブロック処理、数十万の同時タスク、そしてこれらを採用して以降のプロダクションでのゼロデッドロックで実証されています。