انتقل إلى المحتوى الرئيسي
الهندسةMar 28, 2026

Deep EVM #29: السيمافورات في Rust غير المتزامن — مطاردة الجمود وأنماط أطلق وانسَ

OS
Open Soft Team

Engineering Team

لماذا السيمافورات في Rust غير المتزامن

عندما تشغّل خط أنابيب عالي الإنتاجية — روبوت MEV يعالج 180,000 سلسلة مراجحة لكل كتلة، أو خادم API يتعامل مع 10,000 طلب متزامن، أو مهمة ETL تكتب ملايين الصفوف — ستصطدم حتماً بسقف الموارد. تنفد اتصالات مجمع قاعدة البيانات. يحد مزودو RPC من معدل طلباتك. تنتفخ الذاكرة لأنك أطلقت 50,000 مهمة tokio، كل منها يحمل بيانات سلسلة.

النهج الساذج هو التزامن غير المحدود: tokio::spawn لكل وحدة عمل وتأمل أن يتدبر وقت التشغيل الأمر. لن يفعل. في الإنتاج، لاحظنا استخدام ذاكرة 15.4 جيجابايت من 2.7 مليون مهمة مُطلقة، كل منها يحمل Vec<Hop> بالإضافة إلى سياق المحاكاة. الحل كان التزامن المُجمّع مع ضغط خلفي قائم على السيمافور، مما أنزل الذاكرة إلى 0.8 جيجابايت.

السيمافور هو البنية الأساسية المناسبة عندما تحتاج للحد من عدد العمليات المتزامنة دون تسلسلها بالكامل. على عكس المُقفل (mutex) الذي يسمح بواحد فقط بالضبط، يسمح السيمافور بـ N متزامنين. هذا يجعله مثالياً لـ:

  • التزامن في كتابة قاعدة البيانات: الحد بحجم مجمع الاتصالات (مثلاً 20 كتابة متزامنة)
  • تحديد معدل RPC: تحديد سقف الطلبات الصادرة لتجنب استجابات 429
  • الضغط الخلفي للذاكرة: منع إطلاق المهام غير المحدود بالبوابة على التصاريح المتاحة
  • معالجة الدُفعات: التحكم في عدد دُفعات المحاكاة التي تعمل بالتوازي

أساسيات tokio::sync::Semaphore

tokio::sync::Semaphore هو سيمافور عدّاد مصمم للكود غير المتزامن. يحتفظ بعدّاد داخلي للتصاريح المتاحة. تستحوذ المهام على التصاريح قبل المتابعة وتحررها عند الانتهاء.

use std::sync::Arc;
use tokio::sync::Semaphore;

// السماح بحد أقصى 20 كتابة متزامنة في قاعدة البيانات
let semaphore = Arc::new(Semaphore::new(20));

for chain in chains_to_persist {
    let sem = semaphore.clone();
    let db_pool = db_pool.clone();

    tokio::spawn(async move {
        // استحواذ على تصريح — يحظر إذا كانت كل الـ 20 مستخدمة
        let _permit = sem.acquire().await.unwrap();

        // تنفيذ الكتابة — التصريح محتفظ به في هذا النطاق
        sqlx::query("UPDATE chains SET profit = $1 WHERE id = $2")
            .bind(chain.profit)
            .bind(chain.id)
            .execute(&db_pool)
            .await
            .ok();

        // يتم إسقاط _permit هنا — يُحرر تلقائياً
    });
}

واجهة برمجة التطبيقات الرئيسية:

  • Semaphore::new(permits) — إنشاء بـ N تصريح
  • acquire(&self) — غير متزامن، ينتظر حتى يتوفر تصريح، يُرجع SemaphorePermit (حارس RAII)
  • try_acquire(&self) — متزامن، يُرجع Err فوراً إذا لم تتوفر تصاريح
  • acquire_owned(self: Arc<Self>) — يُرجع OwnedSemaphorePermit يملك Arc
  • add_permits(&self, n) — زيادة عدد التصاريح ديناميكياً
  • close(&self) — إغلاق السيمافور
  • available_permits(&self) — فحص العدد الحالي

اختيار التصميم الحاسم: acquire() يُرجع حارس RAII. عندما يُسقط الحارس، يُحرر التصريح. هذا يعني تنظيفاً تلقائياً حتى مع الذعر والعودة المبكرة وعامل ?.

نمط الكتابة أطلق وانسَ

في الأنظمة عالية الإنتاجية، غالباً ما تريد حفظ البيانات بدون حظر المسار الساخن. النمط: إطلاق مهمة خلفية تستحوذ على تصريح سيمافور وتنفذ الكتابة وتحرره. المستدعي لا ينتظر النتيجة.

use std::sync::Arc;
use tokio::sync::Semaphore;

pub struct AsyncChainStore<S: ChainStore> {
    inner: Arc<S>,
    semaphore: Arc<Semaphore>,
}

impl<S: ChainStore + Send + Sync + 'static> AsyncChainStore<S> {
    pub fn new(inner: Arc<S>, max_concurrent_writes: usize) -> Self {
        Self {
            inner,
            semaphore: Arc::new(Semaphore::new(max_concurrent_writes)),
        }
    }

    pub fn save_profits_async(&self, chains: Vec<ChainProfit>) {
        let inner = self.inner.clone();
        let sem = self.semaphore.clone();

        tokio::spawn(async move {
            let _permit = match sem.acquire().await {
                Ok(p) => p,
                Err(_) => {
                    tracing::warn!("السيمافور مغلق، تجاهل الكتابة");
                    return;
                }
            };

            if let Err(e) = inner.batch_update_profits(&chains).await {
                tracing::error!(
                    error = %e,
                    count = chains.len(),
                    "فشلت كتابة أطلق وانسَ"
                );
            }
        });
    }
}

نمط المُزخرف هذا يبقي ChainStore الداخلي نقياً — لا يعرف شيئاً عن التحكم في التزامن. المُزخرف يتعامل مع إدارة السيمافور وتسجيل الأخطاء وإطلاق المهام.

سيناريوهات الجمود

جمود السيمافور غدّار لأنه لا يسبب ذعراً أو خطأً — البرنامج ببساطة يتوقف عن التقدم.

السيناريو 1: استحواذ متداخل (جمود ذاتي)

async fn simulate_and_persist(
    sem: &Semaphore,
    chain: &Chain,
) -> Result<()> {
    let _outer = sem.acquire().await?; // يأخذ 1 من N تصريح
    let result = simulate(chain).await?;
    let _inner = sem.acquire().await?; // خطأ: إذا N=1، جمود!
    persist(result).await?;
    Ok(())
}

مع N=1 هذا جمود فوري. مع N>1 يعمل حتى يزداد الحمل وتكون كل التصاريح محتجزة بمهام تنتظر استحواذها الداخلي.

السيناريو 2: تصريح محتفظ به عبر select!

async fn process_with_timeout(sem: &Semaphore) -> Result<()> {
    let permit = sem.acquire().await?;
    tokio::select! {
        result = do_work() => { result?; }
        _ = tokio::time::sleep(Duration::from_secs(30)) => {
            tracing::warn!("انتهت المهلة");
        }
    }
    Ok(())
}

ماكرو select! يلغي الفرع الخاسر بإسقاط مستقبله، لكن أي مهام أُطلقت داخل ذلك المستقبل تستمر في العمل.

تشخيص جمود السيمافور

التشخيص القائم على التتبع

let permits_before = semaphore.available_permits();
tracing::debug!(available = permits_before, "استحواذ على تصريح سيمافور");
let _permit = semaphore.acquire().await?;
tracing::debug!(available = semaphore.available_permits(), "تم الاستحواذ");

إذا أظهرت السجلات “استحواذ” بدون “تم الاستحواذ”، كل التصاريح محتجزة في مكان ما.

مقاييس Prometheus

use metrics::gauge;
gauge!("semaphore_available_permits", semaphore.available_permits() as f64);

مقياس ينخفض لصفر ويبقى هناك = جمود. مقياس يتأرجح قرب الصفر = تنافس.

tokio-console

tokio-console أداة تشخيص تتصل بتطبيق tokio قيد التشغيل وتعرض حالات المهام في الوقت الفعلي.

حلول جاهزة للإنتاج

الحل 1: استخدم دائماً acquire_owned

let semaphore = Arc::new(Semaphore::new(20));
for item in items {
    let permit = semaphore.clone().acquire_owned().await?;
    tokio::spawn(async move {
        do_work(item).await;
        drop(permit);
    });
}

الحل 2: استحواذ مع مهلة

use tokio::time::{timeout, Duration};
let permit = timeout(
    Duration::from_secs(30),
    semaphore.acquire(),
).await
    .map_err(|_| anyhow!("انتهت مهلة استحواذ السيمافور — جمود محتمل"))?
    .map_err(|_| anyhow!("السيمافور مغلق"))?;

الحل 3: التزامن المنظم مع JoinSet

use tokio::task::JoinSet;
let semaphore = Arc::new(Semaphore::new(20));
let mut join_set = JoinSet::new();

for batch in batches {
    let permit = semaphore.clone().acquire_owned().await?;
    let db_pool = db_pool.clone();
    join_set.spawn(async move {
        let result = process_batch(&db_pool, &batch).await;
        drop(permit);
        result
    });
}

while let Some(result) = join_set.join_next().await {
    match result {
        Ok(Ok(())) => {}
        Ok(Err(e)) => tracing::error!(error = %e, "فشلت معالجة الدُفعة"),
        Err(e) => tracing::error!(error = %e, "ذعر المهمة"),
    }
}

الحل 4: سيمافورات منفصلة لأغراض منفصلة

pub struct ConcurrencyLimits {
    pub simulation: Arc<Semaphore>,
    pub db_writes: Arc<Semaphore>,
    pub mempool: Arc<Semaphore>,
}

impl ConcurrencyLimits {
    pub fn new() -> Self {
        Self {
            simulation: Arc::new(Semaphore::new(4)),
            db_writes: Arc::new(Semaphore::new(20)),
            mempool: Arc::new(Semaphore::new(100)),
        }
    }
}

هذا يلغي جمود الاستحواذ المتداخل تماماً — مهمة تستحوذ على تصريح simulation، ثم تصريح db_writes، وهذه مجمعات مستقلة.

الأداء: حمل السيمافور

هل السيمافور نفسه عنق زجاجة؟ عملياً، لا. tokio::sync::Semaphore مبني على عدّاد ذري وقائمة مرتبطة تطفلية للمنتظرين. الاستحواذ على تصريح غير متنافس هو عملية fetch_sub واحدة — نانوثوانٍ.

قسنا حمل السيمافور في خط أنابيبنا:

العمليةبدون سيمافورمع سيمافور (20 تصريح)
10,000 كتابة DB1,340ms (أخطاء استنفاد المجمع)1,580ms (صفر أخطاء)
500 محاكاة RPC8,200ms (مهلات العقدة)9,100ms (صفر مهلات)
الذاكرة (2.7M مهمة)15.4 جيجابايت0.8 جيجابايت

الحمل 10-15% في وقت الجدار مهمل مقارنة بإلغاء الأخطاء والمهلات وانهيارات نفاد الذاكرة.

الخلاصة

السيمافورات في Rust غير المتزامن بسيطة بشكل مخادع — استحوذ، اعمل، أسقط التصريح. التعقيد يظهر في الإنتاج: تصاريح مسربة في هياكل طويلة العمر، استحواذات متداخلة عبر مكدسات الاستدعاء، تصاريح محتفظ بها عبر حدود إلغاء select!.

الدليل الدفاعي:

  1. استخدم دائماً acquire_owned() عند إطلاق المهام
  2. غلّف دائماً الاستحواذ بمهلة
  3. لا تستحوذ أبداً بشكل متداخل على نفس السيمافور
  4. افصل السيمافورات لمجمعات الموارد المنفصلة
  5. أجهّز التصاريح المتاحة بالمقاييس والتتبع
  6. استخدم JoinSet بدلاً من tokio::spawn غير المحدود

هذه الأنماط صمدت عبر ملايين الكتل المعالجة، ومئات الآلاف من المهام المتزامنة، وصفر حالات جمود في الإنتاج منذ اعتمادها.

الوسوم