انتقل إلى المحتوى الرئيسي
DevOpsMar 28, 2026

Deep EVM #28: خط أنابيب بيانات عالي الإنتاجية — إدراج دُفعات وCOPY وحل التعارضات

OS
Open Soft Team

Engineering Team

المشكلة: ملايين الصفوف في الثانية

عندما تحتاج لإدراج 100,000+ صف في الثانية — بيانات بلوكتشين، سجلات أحداث، بيانات استشعار — INSERT العادي لا يكفي.

مقارنة طرق الإدراج

INSERT الفردي

INSERT INTO events (id, data) VALUES ($1, $2);
-- ~1,000 صف/ثانية

إدراج الدُفعات

INSERT INTO events (id, data) VALUES
    ($1, $2), ($3, $4), ($5, $6), ...;
-- ~50,000 صف/ثانية

COPY Protocol

COPY events (id, data) FROM STDIN (FORMAT BINARY);
-- ~500,000 صف/ثانية

COPY مع sqlx في Rust

use sqlx::postgres::PgCopyIn;

async fn bulk_insert(pool: &PgPool, events: &[Event]) -> Result<()> {
    let mut conn = pool.acquire().await?;
    let mut copy = conn
        .copy_in_raw("COPY events (id, data, created_at) FROM STDIN WITH (FORMAT CSV)")
        .await?;
    
    for event in events {
        let line = format!("{},{},{}\n", event.id, event.data, event.created_at);
        copy.send(line.as_bytes()).await?;
    }
    
    copy.finish().await?;
    Ok(())
}

ON CONFLICT للوحدانية

INSERT INTO chain_profits (chain_id, profit, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (chain_id) DO UPDATE SET
    profit = EXCLUDED.profit,
    updated_at = EXCLUDED.updated_at
WHERE chain_profits.profit < EXCLUDED.profit;

نمط المُنتِج-المستهلِك للدُفعات

use tokio::sync::mpsc;

async fn batch_writer(
    pool: PgPool,
    mut rx: mpsc::Receiver<Event>,
    batch_size: usize,
    flush_interval: Duration,
) {
    let mut buffer = Vec::with_capacity(batch_size);
    let mut interval = tokio::time::interval(flush_interval);
    
    loop {
        tokio::select! {
            Some(event) = rx.recv() => {
                buffer.push(event);
                if buffer.len() >= batch_size {
                    flush(&pool, &mut buffer).await;
                }
            }
            _ = interval.tick() => {
                if !buffer.is_empty() {
                    flush(&pool, &mut buffer).await;
                }
            }
        }
    }
}

async fn flush(pool: &PgPool, buffer: &mut Vec<Event>) {
    if let Err(e) = bulk_insert(pool, buffer).await {
        tracing::error!("فشل الإدراج الجماعي: {e}");
    }
    buffer.clear();
}

مقاييس الأداء

الطريقةالصفوف/ثانيةالزمن (100K صف)
INSERT فردي1,000100 ثانية
INSERT دُفعات (100)50,0002 ثانية
INSERT دُفعات (1000)100,0001 ثانية
COPY500,0000.2 ثانية

الخلاصة

لخطوط أنابيب البيانات عالية الإنتاجية: استخدم COPY للسرعة القصوى، إدراج الدُفعات للمرونة، ON CONFLICT للوحدانية، ونمط المُنتِج-المستهلِك للتحكم في التدفق.