Context Propagation in Async Rust: Deadlines, Cancellation, and Tracing
Engineering Team
The Context Problem in Async Rust
In an async application, a single HTTP request triggers a chain of operations: database queries, calls to external APIs, computation, and more. Each of these operations needs to know the request's context: how much time remains (the deadline), whether the request has already been cancelled, and the request ID used for tracing.
Rust has no standard context type comparable to Go's context.Context, which bundles the deadline, cancellation signal, and request-scoped values into a single value. We have to assemble and pass these pieces explicitly, but the result is more type-safe.
Deadlines and Timeouts
tokio::time::timeout
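use reqwest::Response;
use tokio::time::{timeout, Duration};

async fn fetch_with_timeout(url: &str) -> Result<Response, AppError> {
    timeout(Duration::from_secs(5), reqwest::get(url))
        .await
        // Outer error: the 5-second timer elapsed.
        .map_err(|_| AppError::Timeout)?
        // Inner error: the request itself failed (assumes `From<reqwest::Error> for AppError`).
        .map_err(AppError::from)
}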
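The chained `.map_err(...)?.map_err(...)` calls above work because `timeout` wraps the operation's own result in an outer `Result` whose error means the timer fired. The std-only sketch below mimics that two-layer shape without tokio. `Elapsed`, `IoFailure`, `flatten`, and this `AppError` are hypothetical stand-ins for illustration, not the real tokio or reqwest types.

```rust
/// Hypothetical stand-in for the "timer elapsed" error of a timeout wrapper.
#[derive(Debug, PartialEq)]
struct Elapsed;

/// Hypothetical stand-in for the inner operation's error (e.g. an HTTP failure).
#[derive(Debug, PartialEq)]
struct IoFailure(String);

/// Hypothetical application error with one variant per failure layer.
#[derive(Debug, PartialEq)]
enum AppError {
    Timeout,
    Upstream(String),
}

impl From<IoFailure> for AppError {
    fn from(e: IoFailure) -> Self {
        AppError::Upstream(e.0)
    }
}

/// Collapses the two error layers into a single `AppError`.
fn flatten<T>(nested: Result<Result<T, IoFailure>, Elapsed>) -> Result<T, AppError> {
    nested
        .map_err(|_| AppError::Timeout)? // outer layer: the timer fired
        .map_err(AppError::from) // inner layer: the operation itself failed
}
```

Calling `flatten(Ok(Ok(7)))` yields `Ok(7)`, while an outer `Err(Elapsed)` becomes `AppError::Timeout` and an inner failure becomes `AppError::Upstream`. The shorter `.await??` form relies on the same layering, with `From` conversions doing the mapping.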
Propagating the Deadline
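use std::time::Instant;
use uuid::Uuid;

// `Clone` lets the context be stored in request extensions or handed to spawned tasks.
#[derive(Clone)]
struct RequestContext {
    deadline: Instant,
    request_id: Uuid,
    span: tracing::Span,
}

impl RequestContext {
    // Time left before the deadline; zero once the deadline has passed.
    fn remaining(&self) -> Duration {
        self.deadline.saturating_duration_since(Instant::now())
    }

    fn is_expired(&self) -> bool {
        Instant::now() >= self.deadline
    }
}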
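async fn process_order(ctx: &RequestContext, order: Order) -> Result<(), AppError> {
    if ctx.is_expired() {
        return Err(AppError::DeadlineExceeded);
    }
    // Propagate the remaining time to each sub-operation.
    // The first `?` handles the timeout (assumes `From<Elapsed> for AppError`),
    // the second handles the operation's own error.
    let remaining = ctx.remaining();
    timeout(remaining, save_to_database(&order)).await??;
    // Recompute: the database call has already used part of the budget.
    let remaining = ctx.remaining();
    timeout(remaining, notify_warehouse(&order)).await??;
    Ok(())
}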
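To make the shrinking budget concrete, here is a minimal std-only sketch of the deadline half of `RequestContext`. The `Deadline` type and its `step_timeout` helper are hypothetical additions for illustration: `step_timeout` caps a single sub-operation so that one slow step cannot consume the entire remaining budget.

```rust
use std::time::{Duration, Instant};

/// Minimal deadline holder mirroring the `deadline` field of `RequestContext`
/// (the request ID and span are omitted to keep this std-only).
struct Deadline {
    at: Instant,
}

impl Deadline {
    /// Creates a deadline `budget` from now.
    fn after(budget: Duration) -> Self {
        Deadline { at: Instant::now() + budget }
    }

    /// Time left before the deadline; zero once it has passed.
    fn remaining(&self) -> Duration {
        self.at.saturating_duration_since(Instant::now())
    }

    /// Hypothetical helper: the timeout to hand to one sub-operation,
    /// never more than `cap` and never more than what is left overall.
    fn step_timeout(&self, cap: Duration) -> Duration {
        self.remaining().min(cap)
    }
}
```

Each call to `remaining()` returns a smaller value than the previous one, and once the deadline has passed both methods return `Duration::ZERO`, so a timeout built from them fails fast instead of waiting.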
Cooperative Cancellation
CancellationToken
use tokio_util::sync::CancellationToken;

async fn long_running_task(cancel: CancellationToken) {
    loop {
        tokio::select! {
            _ = cancel.cancelled() => {
                tracing::info!("Task cancelled");
                cleanup().await;
                return;
            }
            // If cancellation wins, the in-flight `do_work()` future is dropped,
            // so `do_work` must tolerate being abandoned at an await point.
            result = do_work() => {
                match result {
                    Ok(()) => continue,
                    Err(e) => {
                        tracing::error!("Error: {:?}", e);
                        return;
                    }
                }
            }
        }
    }
}
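use axum::{http::StatusCode, response::{IntoResponse, Response}};

// In the HTTP handler:
async fn handle_request() -> Response {
    let cancel = CancellationToken::new();
    // Cancelling the parent token also cancels every child token.
    let child_cancel = cancel.child_token();
    let task = tokio::spawn(long_running_task(child_cancel));
    // If the client disconnects, cancel the task
    tokio::select! {
        _ = task => { /* the task finished on its own */ }
        _ = client_disconnected() => {
            // The spawned task keeps running just long enough to do its cleanup.
            cancel.cancel();
        }
    }
    // Placeholder response; assumes `Response` is axum's response type.
    StatusCode::OK.into_response()
}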
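The core idea of cooperative cancellation is that the worker itself checks a shared signal at safe points and runs its own cleanup. The sketch below shows that idea with only the standard library, swapping in threads and an `AtomicBool` for async tasks and `tokio_util`'s `CancellationToken`. `CancelFlag` and `run_worker` are hypothetical names, and unlike `cancelled().await` this version polls the flag between units of work rather than being woken when cancellation happens.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical, simplified stand-in for a cancellation token.
/// Clones share the same underlying flag.
#[derive(Clone, Default)]
struct CancelFlag(Arc<AtomicBool>);

impl CancelFlag {
    fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Performs small units of work until cancelled, then runs cleanup.
/// Returns how many units completed.
fn run_worker(cancel: CancelFlag, cleaned_up: Arc<AtomicBool>) -> usize {
    let mut units = 0;
    // The flag is checked between units, never in the middle of one.
    while !cancel.is_cancelled() {
        thread::sleep(Duration::from_millis(5)); // one unit of work
        units += 1;
    }
    // Cleanup runs on the worker's side once it observes the cancellation.
    cleaned_up.store(true, Ordering::SeqCst);
    units
}
```

A caller would spawn `run_worker` on a thread with a clone of the flag, call `cancel()` when the work is no longer needed, and join the thread to wait for cleanup to finish. The async version with `CancellationToken` follows the same contract but can react at any await point.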
Distributed Tracing
use tracing::{info_span, instrument, Instrument};

// `skip(pool)` keeps the connection pool out of the span's recorded fields.
#[instrument(skip(pool), fields(request_id = %request_id))]
async fn handle_order(
    pool: &PgPool,
    request_id: Uuid,
    order: CreateOrder,
) -> Result<Order, AppError> {
    // `.instrument(...)` attaches a child span to each awaited future.
    let order = create_order(pool, &order)
        .instrument(info_span!("db.create_order"))
        .await?;
    send_notification(&order)
        .instrument(info_span!("notify.send"))
        .await?;
    Ok(order)
}
Integration with OpenTelemetry
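use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::prelude::*;

fn setup_tracing() {
    // Note: the `opentelemetry-jaeger` crate is deprecated upstream in favor of
    // OTLP export (`opentelemetry-otlp`); this pipeline applies to older releases.
    let tracer = opentelemetry_jaeger::new_agent_pipeline()
        .with_service_name("my-api")
        .install_simple()
        .expect("failed to install the Jaeger pipeline");
    tracing_subscriber::registry()
        .with(tracing_subscriber::fmt::layer()) // human-readable logs
        .with(OpenTelemetryLayer::new(tracer)) // export spans through OpenTelemetry
        .init();
}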
Axum Middleware for Context
use axum::{extract::Request, middleware::{self, Next}, response::Response};

async fn context_middleware(
    mut request: Request,
    next: Next,
) -> Response {
    let request_id = Uuid::new_v4();
    let deadline = Instant::now() + Duration::from_secs(30);
    // One span per request, shared by the context and the handler chain.
    let span = info_span!("request", id = %request_id);
    let ctx = RequestContext {
        deadline,
        request_id,
        span: span.clone(),
    };
    // Handlers can read the context back from the request extensions.
    // With axum 0.7+ (http 1.x), extension values must also be `Clone`.
    request.extensions_mut().insert(ctx);
    next.run(request).instrument(span).await
}
Conclusion
Context propagation in async Rust calls for an explicit approach: a RequestContext struct for the deadline, CancellationToken for cooperative cancellation, and tracing spans for distributed tracing. Although this takes more code than an implicit context mechanism, the result is more type-safe and easier to debug.