Deep EVM #28: 고처리량 데이터 파이프라인 — 배치 삽입, COPY, 충돌 해결
Engineering Team
고처리량 파이프라인의 과제
블록체인 인덱서를 운영하고 있습니다. 각 블록에는 평균 200개의 트랜잭션이 있고, 12초마다 새 블록이 도착합니다. 각 트랜잭션에서 여러 이벤트를 추출하므로, 초당 약 500-1000개의 행을 PostgreSQL에 삽입해야 합니다. 동시에 이 데이터를 쿼리하는 API도 서비스해야 합니다.
단순한 행 단위 INSERT로는 처리량이 부족합니다. 네트워크 왕복, 트랜잭션 오버헤드, WAL 쓰기가 각 INSERT마다 발생하기 때문입니다.
배치 삽입
가장 간단한 최적화는 여러 행을 하나의 INSERT 문으로 묶는 것입니다:
// 나쁨: 행 단위 삽입 (행당 1 왕복)
for tx in &transactions {
sqlx::query(
"INSERT INTO transactions (hash, from_addr, to_addr, value) VALUES ($1, $2, $3, $4)"
)
.bind(&tx.hash)
.bind(&tx.from)
.bind(&tx.to)
.bind(&tx.value)
.execute(&pool)
.await?;
}
// 좋음: 배치 삽입 (배치당 1 왕복)
let mut query_builder = sqlx::QueryBuilder::new(
"INSERT INTO transactions (hash, from_addr, to_addr, value) "
);
query_builder.push_values(&transactions, |mut b, tx| {
b.push_bind(&tx.hash)
.push_bind(&tx.from)
.push_bind(&tx.to)
.push_bind(&tx.value);
});
query_builder.build().execute(&pool).await?;
성능 비교:
| 방법 | 1000행 삽입 시간 | 초당 처리량 |
|---|---|---|
| 개별 INSERT | 847ms | 1,181 행/초 |
| 배치 INSERT (100행) | 23ms | 43,478 행/초 |
| COPY 프로토콜 | 8ms | 125,000 행/초 |
COPY 프로토콜: 최대 처리량
COPY는 PostgreSQL의 최고 속도 데이터 로딩 방법입니다. SQL 파싱과 실행 계획을 우회하고, 바이너리 형식으로 직접 데이터를 스트리밍합니다:
use tokio_postgres::types::ToSql;
async fn bulk_insert_copy(
client: &tokio_postgres::Client,
transactions: &[Transaction],
) -> Result<u64> {
let sink = client.copy_in(
"COPY transactions (hash, from_addr, to_addr, value, block_number) FROM STDIN WITH (FORMAT binary)"
).await?;
let writer = BinaryCopyInWriter::new(
sink,
&[
Type::BYTEA, // hash
Type::BYTEA, // from_addr
Type::BYTEA, // to_addr
Type::NUMERIC, // value
Type::INT8, // block_number
],
);
pin_mut!(writer);
for tx in transactions {
writer.as_mut().write(&[
&tx.hash as &(dyn ToSql + Sync),
&tx.from,
&tx.to,
&tx.value,
&tx.block_number,
]).await?;
}
writer.finish().await?;
Ok(transactions.len() as u64)
}
COPY가 빠른 이유:
- SQL 파서 우회
- 실행 계획 없음
- 배치로 WAL에 쓰기
- 바이너리 형식으로 인코딩/디코딩 오버헤드 최소화
벌크 업서트: ON CONFLICT
중복 데이터가 도착할 수 있는 경우(블록 재구성, 리트라이 등), 업서트가 필요합니다:
INSERT INTO transactions (hash, from_addr, to_addr, value, block_number)
VALUES
($1, $2, $3, $4, $5),
($6, $7, $8, $9, $10),
...
ON CONFLICT (hash) DO UPDATE SET
block_number = EXCLUDED.block_number,
value = EXCLUDED.value;
대규모 업서트 최적화:
-- 임시 테이블을 스테이징 영역으로 사용
CREATE TEMP TABLE staging_transactions (
LIKE transactions INCLUDING DEFAULTS
) ON COMMIT DROP;
-- COPY로 스테이징 테이블에 빠르게 로드
COPY staging_transactions FROM STDIN WITH (FORMAT binary);
-- 스테이징에서 실제 테이블로 업서트
INSERT INTO transactions
SELECT * FROM staging_transactions
ON CONFLICT (hash) DO UPDATE SET
block_number = EXCLUDED.block_number,
value = EXCLUDED.value;
이 패턴은 COPY의 속도와 ON CONFLICT의 안전성을 결합합니다.
WAL 튜닝
Write-Ahead Log(WAL)는 데이터 내구성을 보장하지만, 쓰기 성능의 주요 병목입니다:
# postgresql.conf — 고처리량 최적화
wal_level = replica # minimal이면 WAL 감소
max_wal_size = 4GB # 기본 1GB -> 체크포인트 빈도 감소
min_wal_size = 1GB
checkpoint_completion_target = 0.9 # 체크포인트를 분산
wal_compression = zstd # WAL 크기 30-50% 감소
commit_delay = 100 # 마이크로초, 커밋 배치
commit_siblings = 5 # 동시 트랜잭션이 5개 이상이면 배치
대량 로드 시 일시적 조정:
-- 대량 로드 전
ALTER TABLE transactions SET UNLOGGED; -- WAL 비활성화 (주의!)
DROP INDEX idx_transactions_block; -- 인덱스 제거
-- 대량 로드 실행
-- ...
-- 대량 로드 후
ALTER TABLE transactions SET LOGGED; -- WAL 재활성화
CREATE INDEX CONCURRENTLY idx_transactions_block ON transactions (block_number);
커넥션 풀링과 PgBouncer
고처리량 시스템에서 커넥션 관리는 핵심입니다:
# pgbouncer.ini
[databases]
mydb = host=localhost port=5432 dbname=mydb
[pgbouncer]
listen_port = 6432
pool_mode = transaction
max_client_conn = 500
default_pool_size = 25
min_pool_size = 10
reserve_pool_size = 5
server_idle_timeout = 300
log_connections = 0
log_disconnections = 0
Rust 애플리케이션에서 PgBouncer를 통한 연결:
let pool = PgPoolOptions::new()
.max_connections(50)
.min_connections(10)
.acquire_timeout(Duration::from_secs(3))
.idle_timeout(Duration::from_secs(300))
.connect("postgres://user:pass@localhost:6432/mydb")
.await?;
모니터링: pg_stat_statements
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
-- 쓰기 처리량이 높은 쿼리
SELECT
LEFT(query, 80) AS query,
calls,
ROUND(total_exec_time::numeric / 1000, 2) AS total_sec,
ROUND(mean_exec_time::numeric, 2) AS avg_ms,
rows AS total_rows,
ROUND(rows::numeric / calls, 0) AS rows_per_call
FROM pg_stat_statements
WHERE query LIKE 'INSERT%' OR query LIKE 'COPY%'
ORDER BY total_exec_time DESC
LIMIT 10;
파이프라인 아키텍처
전체 파이프라인을 조합하면:
[블록체인 노드] -> [WebSocket 수신]
|
v
[이벤트 디코더]
|
v
[배치 버퍼] (100행 또는 100ms마다 플러시)
|
v
[COPY Writer] -> [PgBouncer] -> [PostgreSQL]
|
v
[WAL -> 리플리카]
이 아키텍처로 초당 100,000행 이상의 안정적인 삽입 처리량을 달성할 수 있습니다.
결론
고처리량 PostgreSQL 데이터 파이프라인 구축의 핵심은: 행 단위 INSERT 대신 배치 삽입 또는 COPY 프로토콜 사용, ON CONFLICT를 통한 안전한 업서트, WAL 튜닝으로 쓰기 오버헤드 감소, PgBouncer를 통한 커넥션 다중화, pg_stat_statements를 통한 지속적 모니터링. 이러한 기법을 조합하면, PostgreSQL은 전문 시계열 데이터베이스에 필적하는 쓰기 처리량을 달성할 수 있습니다.