High-Throughput Data Pipelines: Batch Insert, COPY, and Conflict Resolution
Engineering Team
The Challenge: Millions of Inserts per Hour
When your application needs to insert hundreds of thousands or millions of rows per hour (event logging, IoT data, blockchain indexing), inserting one row at a time will not keep up. You need bulk-loading techniques.
Comparing Insert Methods
| Method | Throughput | Latency | Complexity |
|---|---|---|---|
| Single INSERT | ~500/sec | Low | Low |
| Batch INSERT | ~10,000/sec | Medium | Medium |
| COPY | ~100,000/sec | High | Medium |
| COPY BINARY | ~150,000/sec | High | High |
Batch INSERT
Combine multiple rows into a single INSERT statement:
use sqlx::PgPool;

// Event and DbError are application-defined types.
async fn batch_insert(
    pool: &PgPool,
    events: &[Event],
) -> Result<(), DbError> {
    if events.is_empty() { return Ok(()); }

    // Build one multi-row INSERT with numbered placeholders: four per row.
    let mut query = String::from(
        "INSERT INTO events (id, event_type, payload, created_at) VALUES "
    );
    for i in 0..events.len() {
        if i > 0 { query.push_str(", "); }
        let base = i * 4;
        query.push_str(&format!(
            "(${}, ${}, ${}, ${})",
            base + 1, base + 2, base + 3, base + 4
        ));
    }
    query.push_str(" ON CONFLICT (id) DO NOTHING");

    // Bind the parameters in the same order the placeholders were generated.
    let mut q = sqlx::query(&query);
    for event in events {
        q = q.bind(&event.id)
            .bind(&event.event_type)
            .bind(&event.payload)
            .bind(&event.created_at);
    }
    q.execute(pool).await?;
    Ok(())
}
Performance tip: batch sizes of 100-1000 rows give the best throughput. Beyond about 1000 rows, SQL parsing overhead cancels out the gains (and with wide rows you also approach PostgreSQL's limit of 65,535 bind parameters per statement).
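In practice, the simplest way to respect that limit is to chunk the incoming slice before handing it to batch_insert. A minimal sketch, reusing the batch_insert function above (the batch size is left as a parameter; anything in the 100-1000 range is a reasonable starting point):
// Split a large slice into bounded chunks so each statement stays within
// the optimal batch size (and well under the 65,535 bind-parameter limit).
async fn insert_chunked(
    pool: &PgPool,
    events: &[Event],
    batch_size: usize, // e.g. 500
) -> Result<(), DbError> {
    for chunk in events.chunks(batch_size) {
        batch_insert(pool, chunk).await?;
    }
    Ok(())
}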
COPY Protocol
COPY is PostgreSQL's protocol optimized for bulk loading. It is much faster than INSERT because it bypasses per-statement SQL parsing and planning:
use bytes::Bytes;
use futures_util::{pin_mut, SinkExt};

async fn copy_insert(
    client: &tokio_postgres::Client,
    events: &[Event],
) -> Result<u64, DbError> {
    // Start a COPY ... FROM STDIN in CSV format.
    let sink = client
        .copy_in("COPY events (id, event_type, payload, created_at) FROM STDIN WITH (FORMAT csv)")
        .await?;
    pin_mut!(sink);

    // Serialize all rows into an in-memory CSV buffer.
    let mut csv_writer = csv::Writer::from_writer(Vec::new());
    for event in events {
        csv_writer.write_record(&[
            event.id.to_string(),
            event.event_type.clone(),
            serde_json::to_string(&event.payload)?,
            event.created_at.to_rfc3339(),
        ])?;
    }
    let data = csv_writer.into_inner()?;

    // Stream the buffer into the sink and finalize the COPY.
    sink.send(Bytes::from(data)).await?;
    let rows = sink.finish().await?;
    Ok(rows)
}
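The comparison table also lists COPY BINARY. With tokio-postgres that path goes through tokio_postgres::binary_copy::BinaryCopyInWriter. The sketch below assumes the events columns are uuid, text, jsonb, and timestamptz and that payload is a serde_json::Value; adjust the Type list and conversions to the real schema, and note that the uuid, chrono, and serde_json integration features of tokio-postgres must be enabled:
use futures_util::pin_mut;
use tokio_postgres::binary_copy::BinaryCopyInWriter;
use tokio_postgres::types::{ToSql, Type};

async fn copy_insert_binary(
    client: &tokio_postgres::Client,
    events: &[Event],
) -> Result<u64, DbError> {
    let sink = client
        .copy_in("COPY events (id, event_type, payload, created_at) FROM STDIN WITH (FORMAT binary)")
        .await?;
    // Column types must match the table definition (assumed here).
    let writer = BinaryCopyInWriter::new(
        sink,
        &[Type::UUID, Type::TEXT, Type::JSONB, Type::TIMESTAMPTZ],
    );
    pin_mut!(writer);
    for event in events {
        // Each value must implement ToSql for the corresponding column type.
        let row: [&(dyn ToSql + Sync); 4] =
            [&event.id, &event.event_type, &event.payload, &event.created_at];
        writer.as_mut().write(&row).await?;
    }
    // finish() flushes the stream and returns the number of rows copied.
    let rows = writer.finish().await?;
    Ok(rows)
}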
ON CONFLICT: Upsert Pattern
-- Insert or update
INSERT INTO metrics (sensor_id, timestamp, value)
VALUES ($1, $2, $3)
ON CONFLICT (sensor_id, timestamp) DO UPDATE SET
    value = EXCLUDED.value,
    updated_at = NOW();

-- Insert or skip
INSERT INTO events (id, event_type, payload)
VALUES ($1, $2, $3)
ON CONFLICT (id) DO NOTHING;
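Bound from Rust with sqlx, the upsert form is a single parameterized statement. A minimal sketch, assuming a Metric struct with sensor_id, timestamp, and value fields:
// Insert a reading, or overwrite the value if the (sensor_id, timestamp)
// pair already exists.
async fn upsert_metric(pool: &PgPool, m: &Metric) -> Result<(), DbError> {
    sqlx::query(
        "INSERT INTO metrics (sensor_id, timestamp, value)
         VALUES ($1, $2, $3)
         ON CONFLICT (sensor_id, timestamp) DO UPDATE SET
             value = EXCLUDED.value,
             updated_at = NOW()",
    )
    .bind(&m.sensor_id)
    .bind(&m.timestamp)
    .bind(m.value)
    .execute(pool)
    .await?;
    Ok(())
}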
Buffering Pattern in Rust
use std::time::Duration;
use tokio::sync::mpsc;

struct InsertBuffer {
    buffer: Vec<Event>,
    capacity: usize,
    pool: PgPool,
    flush_interval: Duration,
}

impl InsertBuffer {
    fn new(pool: PgPool, capacity: usize, flush_interval: Duration) -> Self {
        Self {
            buffer: Vec::with_capacity(capacity),
            capacity,
            pool,
            flush_interval,
        }
    }

    // Buffer an event; flush as soon as the batch is full.
    async fn add(&mut self, event: Event) -> Result<(), DbError> {
        self.buffer.push(event);
        if self.buffer.len() >= self.capacity {
            self.flush().await?;
        }
        Ok(())
    }

    // Drain whatever has accumulated and write it in one batch.
    async fn flush(&mut self) -> Result<(), DbError> {
        if self.buffer.is_empty() { return Ok(()); }
        let events: Vec<Event> = self.buffer.drain(..).collect();
        batch_insert(&self.pool, &events).await
    }
}
// With an automatic flush timer
async fn run_buffer(mut buffer: InsertBuffer, mut rx: mpsc::Receiver<Event>) {
    let mut interval = tokio::time::interval(buffer.flush_interval);
    loop {
        tokio::select! {
            maybe_event = rx.recv() => {
                match maybe_event {
                    Some(event) => buffer.add(event).await.unwrap(),
                    // Channel closed: flush the remainder and stop.
                    None => {
                        buffer.flush().await.unwrap();
                        break;
                    }
                }
            }
            _ = interval.tick() => {
                buffer.flush().await.unwrap();
            }
        }
    }
}
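Wiring it up from the application side can look like this. A sketch; the channel capacity, batch size, and flush interval are arbitrary values to tune under load:
// Spawn the buffering task and hand the sender to producers.
fn start_ingest(pool: PgPool) -> mpsc::Sender<Event> {
    let (tx, rx) = mpsc::channel(10_000);
    let buffer = InsertBuffer::new(pool, 500, Duration::from_millis(200));
    tokio::spawn(run_buffer(buffer, rx));
    tx
}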
Additional Optimizations
1. Drop Indexes Temporarily
-- For a large bulk load, drop secondary indexes first and rebuild them afterwards
DROP INDEX IF EXISTS idx_events_created_at;
-- Load data...
CREATE INDEX idx_events_created_at ON events (created_at);
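Driven from the application, the same sequence is just two statements around the load. A sketch with tokio-postgres, reusing the copy_insert function above; the index definition is assumed:
// Drop a secondary index, run the bulk load, then rebuild the index.
async fn load_without_index(
    client: &tokio_postgres::Client,
    events: &[Event],
) -> Result<u64, DbError> {
    client
        .batch_execute("DROP INDEX IF EXISTS idx_events_created_at")
        .await?;
    let rows = copy_insert(client, events).await?;
    client
        .batch_execute("CREATE INDEX idx_events_created_at ON events (created_at)")
        .await?;
    Ok(rows)
}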
2. Unlogged Table
-- Skips WAL writes: 2-3x faster, but not crash-safe
CREATE UNLOGGED TABLE staging_events (LIKE events);
-- Load into staging, then: INSERT INTO events SELECT * FROM staging_events;
3. WAL Tuning
# postgresql.conf for bulk loading (wal_level and max_wal_senders require a restart)
wal_level = minimal
max_wal_senders = 0
fsync = off  # WARNING: risk of data loss on a crash!
Monitoring Throughput
-- Cumulative rows inserted since the last stats reset. Sample this
-- periodically and diff consecutive readings to derive an inserts-per-second
-- rate; a single snapshot cannot compute the rate on its own.
SELECT
    relname,
    n_tup_ins AS total_inserts
FROM pg_stat_user_tables
WHERE relname = 'events';
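To turn the cumulative counter into a rate, sample it from the application and diff consecutive readings. A sketch using sqlx; the 10-second period and the println reporting are placeholders:
// Poll pg_stat_user_tables and report the insert rate for the events table.
async fn monitor_insert_rate(pool: &PgPool) -> Result<(), DbError> {
    let period = Duration::from_secs(10);
    let mut previous: Option<i64> = None;
    loop {
        let current: i64 = sqlx::query_scalar(
            "SELECT n_tup_ins FROM pg_stat_user_tables WHERE relname = 'events'",
        )
        .fetch_one(pool)
        .await?;
        if let Some(prev) = previous {
            let rate = (current - prev) as f64 / period.as_secs_f64();
            println!("events: {rate:.0} inserts/sec");
        }
        previous = Some(current);
        tokio::time::sleep(period).await;
    }
}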
Conclusion
High-throughput data pipelines need batch inserts or COPY to avoid per-row overhead. Application-side buffering accumulates events before each flush, ON CONFLICT handles duplicates, and PostgreSQL tuning (WAL, indexes, unlogged tables) squeezes out the remaining throughput. For extreme cases, binary COPY delivers the highest throughput with minimal parsing overhead.