Langsung ke konten utama
RekayasaMar 28, 2026

Deep EVM #29: Semaphore di Async Rust — Berburu Deadlock dan Pola Fire-and-Forget

OS
Open Soft Team

Engineering Team

Mengapa Semaphore di Async Rust

Ketika Anda menjalankan pipeline throughput tinggi — bot MEV yang memproses 180.000 rantai arbitrase per block, server API yang menangani 10.000 request bersamaan, atau job ETL yang menulis jutaan baris — Anda pasti akan menabrak batas sumber daya. Connection pool database habis. Provider RPC membatasi rate Anda. Memory membengkak karena Anda men-spawn 50.000 task tokio, masing-masing memegang data sebesar satu rantai.

Pendekatan naif adalah concurrency tanpa batas: tokio::spawn untuk setiap unit kerja dan berharap runtime mengurusnya. Tidak demikian. Dalam produksi, kami mengamati penggunaan memory 15,4 GB dari 2,7 juta task yang di-spawn, masing-masing memegang Vec<Hop> plus konteks simulasi. Perbaikannya adalah concurrency berbatch dengan backpressure berbasis semaphore, yang menurunkan memory ke 0,8 GB.

Semaphore adalah primitif yang tepat ketika Anda perlu membatasi jumlah operasi bersamaan tanpa men-serialize mereka sepenuhnya. Tidak seperti mutex (yang mengizinkan tepat satu), semaphore mengizinkan N aksesor bersamaan. Ini membuatnya sempurna untuk:

  • Concurrency write database: batasi ke ukuran connection pool (misalnya 20 write bersamaan)
  • Rate limiting RPC: batasi request keluar untuk menghindari respons 429 dari provider
  • Backpressure memory: cegah spawning task tanpa batas dengan gating pada permit yang tersedia
  • Pemrosesan batch: kontrol berapa banyak batch simulasi yang berjalan paralel terhadap shared node

Dasar-Dasar tokio::sync::Semaphore

tokio::sync::Semaphore adalah counting semaphore yang dirancang untuk kode async. Ia mempertahankan counter internal dari permit yang tersedia. Task mengakuisisi permit sebelum melanjutkan dan melepasnya ketika selesai.

use std::sync::Arc;
use tokio::sync::Semaphore;

// Izinkan hingga 20 write database bersamaan
let semaphore = Arc::new(Semaphore::new(20));

for chain in chains_to_persist {
    let sem = semaphore.clone();
    let db_pool = db_pool.clone();

    tokio::spawn(async move {
        // Akuisisi permit — memblokir jika semua 20 sedang digunakan
        let _permit = sem.acquire().await.unwrap();

        // Lakukan write — permit dipegang untuk scope ini
        sqlx::query("UPDATE chains SET profit = $1 WHERE id = $2")
            .bind(chain.profit)
            .bind(chain.id)
            .execute(&db_pool)
            .await
            .ok();

        // _permit di-drop di sini — otomatis dilepas
    });
}

API surface kunci:

  • Semaphore::new(permits) — buat dengan N permit
  • acquire(&self) — async, menunggu sampai permit tersedia, mengembalikan SemaphorePermit (guard RAII)
  • try_acquire(&self) — non-async, mengembalikan Err(TryAcquireError) langsung jika tidak ada permit
  • acquire_owned(self: Arc<Self>) — mengembalikan OwnedSemaphorePermit yang memiliki Arc, berguna ketika permit harus outlive borrow
  • add_permits(&self, n) — tingkatkan jumlah permit secara dinamis
  • close(&self) — tutup semaphore; semua panggilan acquire yang menunggu mengembalikan Err(AcquireError)
  • available_permits(&self) — inspeksi jumlah saat ini (berguna untuk metrik)

Pilihan desain kritis: acquire() mengembalikan guard RAII. Ketika guard di-drop, permit dilepas. Ini berarti Anda mendapat cleanup otomatis bahkan pada panic, early return, dan bailout operator ? — selama guard hidup di stack.

Pola Write Fire-and-Forget

Dalam sistem throughput tinggi, Anda sering ingin mempersist data tanpa memblokir hot path. Polanya: spawn task background yang mengakuisisi permit semaphore, melakukan write, dan melepas. Pemanggil tidak menunggu hasilnya.

Dalam sistem yang dirancang dengan baik, ini diimplementasikan sebagai decorator yang membungkus service storage inner:

use std::sync::Arc;
use tokio::sync::Semaphore;

pub struct AsyncChainStore<S: ChainStore> {
    inner: Arc<S>,
    semaphore: Arc<Semaphore>,
}

impl<S: ChainStore + Send + Sync + 'static> AsyncChainStore<S> {
    pub fn new(inner: Arc<S>, max_concurrent_writes: usize) -> Self {
        Self {
            inner,
            semaphore: Arc::new(Semaphore::new(max_concurrent_writes)),
        }
    }

    /// Fire-and-forget: persist profit rantai tanpa memblokir pemanggil.
    /// Backpressure ditegakkan oleh semaphore — jika semua permit
    /// terpakai, task yang di-spawn menunggu, tetapi pemanggil kembali segera.
    pub fn save_profits_async(&self, chains: Vec<ChainProfit>) {
        let inner = self.inner.clone();
        let sem = self.semaphore.clone();

        tokio::spawn(async move {
            // Akuisisi permit — di sinilah backpressure terjadi
            let _permit = match sem.acquire().await {
                Ok(p) => p,
                Err(_) => {
                    tracing::warn!("semaphore ditutup, membuang write");
                    return;
                }
            };

            if let Err(e) = inner.batch_update_profits(&chains).await {
                tracing::error!(
                    error = %e,
                    count = chains.len(),
                    "write fire-and-forget gagal"
                );
            }
            // _permit di-drop di sini — task berikutnya yang mengantre melanjutkan
        });
    }
}

Pola decorator ini menjaga ChainStore inner tetap murni — ia tidak tahu apa-apa tentang kontrol concurrency. Decorator menangani manajemen semaphore, logging error, dan spawning task. ServiceLocator menghubungkan mereka:

// Di locator/mod.rs
let chain_store = Arc::new(PgChainStore::new(db_pool.clone()));
let async_chain_store = AsyncChainStore::new(chain_store, 20);

Mengapa tidak cukup menggunakan batas koneksi bawaan database pool? Karena semaphore memberikan knob terpisah. Pool Anda mungkin memiliki 50 koneksi, tetapi Anda ingin write fire-and-forget menggunakan paling banyak 20, menyisakan 30 untuk read yang sensitif latensi. Semaphore menegakkan partisi ini di tingkat aplikasi.

Skenario Deadlock

Deadlock semaphore berbahaya karena tidak menyebabkan panic atau error — program hanya berhenti membuat progress. Berikut pola yang kami temui di produksi.

Skenario 1: Permit Tidak Dilepas pada Early Return

async fn process_batch(sem: &Semaphore, batch: &[Chain]) -> Result<()> {
    let permit = sem.acquire().await?;

    // Early return jika batch kosong — tetapi permit masih dipegang!
    if batch.is_empty() {
        return Ok(()); // permit di-drop di sini — sebenarnya tidak masalah
    }

    // Bahaya sebenarnya: menyimpan permit di struct yang outlive scope
    let ctx = ProcessingContext {
        permit: Some(permit), // permit dipindahkan ke struct
        batch,
    };

    // Jika process() menyimpan ctx di tempat yang berumur panjang, permit bocor
    process(ctx).await?;
    Ok(())
}

Guard RAII melindungi Anda dari kebanyakan early return. Bahaya datang ketika Anda memindahkan permit ke struct yang keluar dari lifetime yang diharapkan — disimpan di HashMap, dikirim melalui channel, atau dipegang di static.

Skenario 2: Nested Acquire (Self-Deadlock)

async fn simulate_and_persist(
    sem: &Semaphore,
    chain: &Chain,
) -> Result<()> {
    let _outer = sem.acquire().await?; // Mengambil 1 dari N permit

    let result = simulate(chain).await?;

    // BUG: mengakuisisi semaphore YANG SAMA di dalam permit yang dipegang
    let _inner = sem.acquire().await?; // Jika N=1, deadlock. Jika N>1, mengurangi throughput
    persist(result).await?;

    Ok(())
}

Dengan N=1, ini deadlock instan. Dengan N>1, bekerja sampai beban meningkat dan semua permit dipegang oleh task yang menunggu inner acquire mereka. Perbaikan: gunakan semaphore terpisah untuk concern terpisah, atau restrukturisasi untuk menghindari akuisisi bersarang.

Skenario 3: Permit Dipegang Melintasi Await Point di Select

async fn process_with_timeout(sem: &Semaphore) -> Result<()> {
    let permit = sem.acquire().await?;

    tokio::select! {
        result = do_work() => {
            result?;
        }
        _ = tokio::time::sleep(Duration::from_secs(30)) => {
            tracing::warn!("timeout, tetapi permit masih dipegang sampai drop");
        }
    }
    // permit di-drop di sini — ini sebenarnya benar
    // TETAPI: jika do_work() men-spawn sub-task yang menangkap permit,
    // pembatalan do_work() TIDAK membatalkan sub-task
    Ok(())
}

Makro select! membatalkan branch yang kalah dengan men-drop future-nya, tetapi task yang di-spawn di dalam future tersebut terus berjalan. Jika task tersebut menangkap referensi ke permit, permit tidak dilepas ketika diharapkan.

Mendiagnosis Deadlock Semaphore

Ketika sistem Anda berhenti membuat progress, bagaimana mengidentifikasi deadlock semaphore versus dependency yang lambat?

Diagnosis Berbasis Tracing

Instrumentasi acquire/release dengan structured logging:

let permits_before = semaphore.available_permits();
tracing::debug!(
    available = permits_before,
    "mengakuisisi permit semaphore"
);

let _permit = semaphore.acquire().await?;

tracing::debug!(
    available = semaphore.available_permits(),
    "permit semaphore diakuisisi"
);

Jika log Anda menunjukkan “mengakuisisi” tetapi tidak pernah “diakuisisi”, semua permit dipegang di suatu tempat.

Metrik Prometheus

Ekspos gauge metric untuk permit yang tersedia:

use metrics::gauge;

gauge!("semaphore_available_permits", semaphore.available_permits() as f64);

Gauge yang turun ke nol dan tetap di sana adalah deadlock. Gauge yang berfluktuasi dekat nol adalah contention.

tokio-console

tokio-console adalah alat diagnostik yang terhubung ke aplikasi tokio yang berjalan dan menampilkan state task secara real time:

cargo add --dev console-subscriber
#[cfg(debug_assertions)]
console_subscriber::init();

Jalankan tokio-console di terminal lain dan cari task yang stuck di state “Idle” pada semaphore acquire.

Solusi Produksi

Solusi 1: Selalu Gunakan OwnedSemaphorePermit dengan Arc

Ketika spawning task, lebih baik gunakan acquire_owned() daripada acquire(). Varian owned mengambil Arc<Semaphore> dan mengembalikan permit yang tidak meminjam semaphore — ia memiliki clone dari Arc:

let semaphore = Arc::new(Semaphore::new(20));

for item in items {
    let permit = semaphore.clone().acquire_owned().await?;

    tokio::spawn(async move {
        // permit dipindahkan ke task — tanpa masalah lifetime
        do_work(item).await;
        drop(permit); // drop eksplisit untuk kejelasan
    });
}

Solusi 2: Acquire dengan Timeout

Jangan pernah menunggu permit tanpa batas di produksi. Gunakan tokio::time::timeout untuk mendeteksi deadlock lebih awal:

use tokio::time::{timeout, Duration};

let permit = timeout(
    Duration::from_secs(30),
    semaphore.acquire(),
).await
    .map_err(|_| anyhow!("acquire semaphore timeout — kemungkinan deadlock"))?
    .map_err(|_| anyhow!("semaphore ditutup"))?;

Ketika timeout menyala, log permit yang tersedia, jumlah task yang menunggu, dan stack trace.

Solusi 3: Structured Concurrency dengan JoinSet

Alih-alih tokio::spawn tanpa batas, gunakan tokio::task::JoinSet untuk mempertahankan ownership dari task yang di-spawn dan padukan dengan semaphore:

use tokio::task::JoinSet;

let semaphore = Arc::new(Semaphore::new(20));
let mut join_set = JoinSet::new();

for batch in batches {
    let permit = semaphore.clone().acquire_owned().await?;
    let db_pool = db_pool.clone();

    join_set.spawn(async move {
        let result = process_batch(&db_pool, &batch).await;
        drop(permit); // lepas sebelum JoinSet mengumpulkan
        result
    });
}

// Drain semua task — kumpulkan error alih-alih kehilangan diam-diam
while let Some(result) = join_set.join_next().await {
    match result {
        Ok(Ok(())) => {}
        Ok(Err(e)) => tracing::error!(error = %e, "pemrosesan batch gagal"),
        Err(e) => tracing::error!(error = %e, "task panic"),
    }
}

Pola ini memastikan: (1) concurrency terbatas via semaphore, (2) tidak ada kegagalan task yang diam, (3) task induk tahu kapan semua anak selesai, (4) permit selalu dilepas meskipun task panic.

Solusi 4: Semaphore Terpisah untuk Concern Terpisah

Jangan pernah berbagi satu semaphore untuk operasi yang tidak terkait. Dalam pipeline MEV kami, kami menggunakan semaphore terpisah:

pub struct ConcurrencyLimits {
    /// Membatasi panggilan simulasi RPC bersamaan ke node
    pub simulation: Arc<Semaphore>,
    /// Membatasi write database bersamaan untuk persistensi rantai
    pub db_writes: Arc<Semaphore>,
    /// Membatasi handler langganan mempool bersamaan
    pub mempool: Arc<Semaphore>,
}

impl ConcurrencyLimits {
    pub fn new() -> Self {
        Self {
            simulation: Arc::new(Semaphore::new(4)),   // node menangani 4 batch paralel
            db_writes: Arc::new(Semaphore::new(20)),   // 20 dari 50 koneksi pool
            mempool: Arc::new(Semaphore::new(100)),    // 100 handler tx bersamaan
        }
    }
}

Ini menghilangkan deadlock nested acquire sepenuhnya — task mengakuisisi permit simulation, lalu permit db_writes, dan ini adalah pool independen.

Performa: Overhead Semaphore

Apakah semaphore itu sendiri bottleneck? Dalam praktik, tidak. tokio::sync::Semaphore diimplementasikan dengan counter atomik dan linked list intrusif dari waiter. Mengakuisisi permit yang tidak dikontestasi adalah satu fetch_sub — nanosecond. Bahkan di bawah contention, overhead-nya adalah notifikasi waker (juga nanosecond) versus milidetik yang diambil I/O aktual Anda.

Kami membenchmark overhead semaphore di pipeline kami:

OperasiTanpa SemaphoreDengan Semaphore (20 permit)
10.000 write DB1.340ms (semua bersamaan, error pool habis)1.580ms (terkontrol, zero error)
500 simulasi RPC8.200ms (node overload, timeout)9.100ms (4 sekaligus, zero timeout)
Memory (2,7 juta task)15,4 GB0,8 GB (berbatch dengan JoinSet)

Overhead wall-clock 10-15% dapat diabaikan dibandingkan menghilangkan error, timeout, dan crash OOM.

Kesimpulan

Semaphore di async Rust terlihat sederhana — acquire, lakukan kerja, drop permit. Kompleksitas muncul di produksi: permit yang bocor ke struct berumur panjang, nested acquire lintas call stack, permit yang dipegang melintasi batas pembatalan select!.

Playbook pertahanan:

  1. Selalu gunakan acquire_owned() ketika spawning task
  2. Selalu bungkus acquire dalam timeout
  3. Jangan pernah nest acquire pada semaphore yang sama
  4. Pisahkan semaphore untuk pool sumber daya yang berbeda
  5. Instrumentasi permit yang tersedia dengan metrik dan tracing
  6. Gunakan JoinSet alih-alih tokio::spawn tanpa batas untuk structured concurrency

Pola-pola ini telah bertahan melintasi jutaan block yang diproses, ratusan ribu task bersamaan, dan zero deadlock di produksi sejak mengadopsinya.