DevOpsMar 28, 2026
Deep EVM #28: خط أنابيب بيانات عالي الإنتاجية — إدراج دُفعات وCOPY وحل التعارضات
OS
Open Soft Team
Engineering Team
المشكلة: ملايين الصفوف في الثانية
عندما تحتاج لإدراج 100,000+ صف في الثانية — بيانات بلوكتشين، سجلات أحداث، بيانات استشعار — INSERT العادي لا يكفي.
مقارنة طرق الإدراج
INSERT الفردي
INSERT INTO events (id, data) VALUES ($1, $2);
-- ~1,000 صف/ثانية
إدراج الدُفعات
INSERT INTO events (id, data) VALUES
($1, $2), ($3, $4), ($5, $6), ...;
-- ~50,000 صف/ثانية
COPY Protocol
COPY events (id, data) FROM STDIN (FORMAT BINARY);
-- ~500,000 صف/ثانية
COPY مع sqlx في Rust
use sqlx::postgres::PgCopyIn;
async fn bulk_insert(pool: &PgPool, events: &[Event]) -> Result<()> {
let mut conn = pool.acquire().await?;
let mut copy = conn
.copy_in_raw("COPY events (id, data, created_at) FROM STDIN WITH (FORMAT CSV)")
.await?;
for event in events {
let line = format!("{},{},{}\n", event.id, event.data, event.created_at);
copy.send(line.as_bytes()).await?;
}
copy.finish().await?;
Ok(())
}
ON CONFLICT للوحدانية
INSERT INTO chain_profits (chain_id, profit, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (chain_id) DO UPDATE SET
profit = EXCLUDED.profit,
updated_at = EXCLUDED.updated_at
WHERE chain_profits.profit < EXCLUDED.profit;
نمط المُنتِج-المستهلِك للدُفعات
use tokio::sync::mpsc;
async fn batch_writer(
pool: PgPool,
mut rx: mpsc::Receiver<Event>,
batch_size: usize,
flush_interval: Duration,
) {
let mut buffer = Vec::with_capacity(batch_size);
let mut interval = tokio::time::interval(flush_interval);
loop {
tokio::select! {
Some(event) = rx.recv() => {
buffer.push(event);
if buffer.len() >= batch_size {
flush(&pool, &mut buffer).await;
}
}
_ = interval.tick() => {
if !buffer.is_empty() {
flush(&pool, &mut buffer).await;
}
}
}
}
}
async fn flush(pool: &PgPool, buffer: &mut Vec<Event>) {
if let Err(e) = bulk_insert(pool, buffer).await {
tracing::error!("فشل الإدراج الجماعي: {e}");
}
buffer.clear();
}
مقاييس الأداء
| الطريقة | الصفوف/ثانية | الزمن (100K صف) |
|---|---|---|
| INSERT فردي | 1,000 | 100 ثانية |
| INSERT دُفعات (100) | 50,000 | 2 ثانية |
| INSERT دُفعات (1000) | 100,000 | 1 ثانية |
| COPY | 500,000 | 0.2 ثانية |
الخلاصة
لخطوط أنابيب البيانات عالية الإنتاجية: استخدم COPY للسرعة القصوى، إدراج الدُفعات للمرونة، ON CONFLICT للوحدانية، ونمط المُنتِج-المستهلِك للتحكم في التدفق.