Deep EVM #24: Context Propagation in Async Rust — Deadlines, Cancellation, and Tracing
Engineering Team
The Missing Context
Go has context.Context — a request-scoped value that carries deadlines, cancellation signals, and key-value pairs across API boundaries and goroutines. Every well-written Go function takes a ctx context.Context as its first parameter.
Rust has no built-in equivalent. The Tokio ecosystem provides cancellation via CancellationToken (from the tokio-util crate) and timeouts via tokio::time::timeout, but there is no unified context type that propagates deadlines, cancellation, and metadata through an async call chain. Let us build one.
Building a Context Type
Our context needs three capabilities: deadlines, cancellation, and key-value metadata:
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
/// Request-scoped context carrying an optional deadline, a cancellation
/// token, and shared metadata.
#[derive(Clone)]
pub struct Context {
/// Instant after which the context counts as done; `None` means no deadline.
deadline: Option<Instant>,
/// Token that is checked and awaited to detect cancellation.
cancel_token: CancellationToken,
/// Request metadata, held behind an `Arc` so derived contexts can share it.
metadata: Arc<ContextMetadata>,
}
/// Identifying metadata attached to a `Context`.
struct ContextMetadata {
/// Identifier for the request; `Context::background` fills it with a random v4 UUID.
request_id: String,
/// Optional trace identifier; `Context::background` leaves it unset.
trace_id: Option<String>,
/// Metadata of an enclosing context, if any.
/// NOTE(review): only ever initialised to `None` in the visible code — confirm intended use.
parent: Option<Arc<ContextMetadata>>,
/// Free-form string key-value pairs.
/// NOTE(review): no accessor is visible here — confirm how callers read or write these.
values: std::collections::HashMap<String, String>,
}
impl Context {
    /// Create a root context with no deadline.
    ///
    /// The context gets a fresh cancellation token and a randomly generated
    /// request ID. It is only cancelled if [`Context::cancel`] is called on it.
    pub fn background() -> Self {
        Self {
            deadline: None,
            cancel_token: CancellationToken::new(),
            metadata: Arc::new(ContextMetadata {
                request_id: uuid::Uuid::new_v4().to_string(),
                trace_id: None,
                parent: None,
                values: std::collections::HashMap::new(),
            }),
        }
    }

    /// Create a child context that expires at `deadline`.
    ///
    /// A child can never outlive its parent: if the parent already has an
    /// earlier deadline, that earlier deadline is kept. The child receives a
    /// child cancellation token and shares the parent's metadata.
    pub fn with_deadline(&self, deadline: Instant) -> Self {
        let effective_deadline = match self.deadline {
            Some(existing) => existing.min(deadline),
            None => deadline,
        };
        Self {
            deadline: Some(effective_deadline),
            cancel_token: self.cancel_token.child_token(),
            metadata: Arc::clone(&self.metadata),
        }
    }

    /// Create a child context that expires `duration` from now.
    ///
    /// `Instant + Duration` panics when the sum cannot be represented, so a
    /// very large `duration` (for example `Duration::MAX`) is treated as
    /// "no additional deadline": the child then simply inherits the parent's
    /// deadline, if any.
    pub fn with_timeout(&self, duration: Duration) -> Self {
        match Instant::now().checked_add(duration) {
            Some(deadline) => self.with_deadline(deadline),
            None => Self {
                deadline: self.deadline,
                cancel_token: self.cancel_token.child_token(),
                metadata: Arc::clone(&self.metadata),
            },
        }
    }

    /// Cancel this context and all children.
    pub fn cancel(&self) {
        self.cancel_token.cancel();
    }

    /// Returns `true` once the context has been cancelled or its deadline
    /// has been reached.
    pub fn is_done(&self) -> bool {
        if self.cancel_token.is_cancelled() {
            return true;
        }
        match self.deadline {
            Some(deadline) => Instant::now() >= deadline,
            None => false,
        }
    }

    /// Time remaining until the deadline, or `None` if there is no deadline.
    ///
    /// Returns a zero duration when the deadline has already passed.
    pub fn remaining(&self) -> Option<Duration> {
        self.deadline
            .map(|deadline| deadline.saturating_duration_since(Instant::now()))
    }

    /// Get the request ID for tracing.
    pub fn request_id(&self) -> &str {
        &self.metadata.request_id
    }
}
Deadline-Aware Operations
Wrap async operations with context-aware timeouts:
impl Context {
    /// Drive `future` to completion unless this context finishes first.
    ///
    /// # Errors
    ///
    /// * [`ContextError::Cancelled`] if the context is already cancelled when
    ///   called, or is cancelled while `future` is still pending.
    /// * [`ContextError::DeadlineExceeded`] if the deadline has already passed
    ///   when called, or passes while `future` is still pending.
    pub async fn run<F, T>(&self, future: F) -> Result<T, ContextError>
    where
        F: std::future::Future<Output = T>,
    {
        // Report the real cause for a context that is already finished:
        // cancellation must not be disguised as a deadline error.
        if self.cancel_token.is_cancelled() {
            return Err(ContextError::Cancelled);
        }
        match self.deadline {
            Some(deadline) => {
                let timeout = deadline.saturating_duration_since(Instant::now());
                // A zero remainder means the deadline is now or in the past.
                if timeout.is_zero() {
                    return Err(ContextError::DeadlineExceeded);
                }
                tokio::select! {
                    result = future => Ok(result),
                    _ = tokio::time::sleep(timeout) => {
                        Err(ContextError::DeadlineExceeded)
                    }
                    _ = self.cancel_token.cancelled() => {
                        Err(ContextError::Cancelled)
                    }
                }
            }
            None => {
                tokio::select! {
                    result = future => Ok(result),
                    _ = self.cancel_token.cancelled() => {
                        Err(ContextError::Cancelled)
                    }
                }
            }
        }
    }
}
/// Reasons a context-bound operation stops before its future completes.
#[derive(Debug, thiserror::Error)]
pub enum ContextError {
/// The context's deadline elapsed.
#[error("context deadline exceeded")]
DeadlineExceeded,
/// The context's cancellation token was cancelled.
#[error("context cancelled")]
Cancelled,
}
Usage in a service:
/// Load a single user row by `id`, bounded by the deadline and cancellation
/// state of `ctx`.
///
/// Context failures are turned into descriptive errors; a database error is
/// converted into the function's error type unchanged.
async fn fetch_user(ctx: &Context, db: &PgPool, id: i64) -> Result<User> {
    // Build the query future first; nothing runs until `ctx.run` polls it.
    let query = async {
        sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
            .bind(id)
            .fetch_one(db)
            .await
    };

    // Outer layer: did the context let the query finish?
    let fetched = ctx.run(query).await.map_err(|reason| match reason {
        ContextError::DeadlineExceeded => {
            anyhow::anyhow!("Database query timed out")
        }
        ContextError::Cancelled => {
            anyhow::anyhow!("Request was cancelled")
        }
    })?;

    // Inner layer: did the query itself succeed?
    Ok(fetched?)
}
Cancellation with CancellationToken
The tokio-util crate’s CancellationToken supports hierarchical cancellation — cancelling a parent token automatically cancels all of its child tokens:
use tokio_util::sync::CancellationToken;
/// Demonstrates hierarchical cancellation: a subtask races its work against
/// a child token, and cancelling the parent stops the subtask.
async fn handle_request(parent_token: CancellationToken) {
    let worker_token = parent_token.child_token();

    // The subtask finishes with whichever happens first: the work completing
    // or its token being cancelled.
    let worker = async move {
        tokio::select! {
            result = do_expensive_work() => {
                tracing::info!("Work completed: {:?}", result);
            }
            _ = worker_token.cancelled() => {
                tracing::info!("Work cancelled");
            }
        }
    };
    let handle = tokio::spawn(worker);

    // After five seconds, cancel the parent; the child token derived from it
    // is cancelled as well.
    tokio::time::sleep(Duration::from_secs(5)).await;
    parent_token.cancel();

    handle.await.unwrap();
}
Graceful Shutdown with Cancellation
Use a root cancellation token for coordinated shutdown across all services:
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let shutdown = CancellationToken::new();
// Spawn services with child tokens
let http_server = tokio::spawn(
run_http_server(shutdown.child_token())
);
let block_watcher = tokio::spawn(
run_block_watcher(shutdown.child_token())
);
let metrics_server = tokio::spawn(
run_metrics_server(shutdown.child_token())
);
// Wait for shutdown signal
tokio::signal::ctrl_c().await?;
tracing::info!("Shutdown signal received");
// Cancel all services
shutdown.cancel();
// Wait for graceful shutdown with a timeout
let timeout = Duration::from_secs(30);
tokio::time::timeout(timeout, async {
let _ = tokio::join!(http_server, block_watcher, metrics_server);
}).await.ok();
tracing::info!("Shutdown complete");
Ok(())
}
tokio::select! for Cancellation Patterns
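tokio::select! is the primary mechanism for responding to cancellation. Context::run wraps it, so a processing loop only needs to check the context between items and run each item through the context: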
/// Process `items` in order, stopping early once `ctx` is done.
///
/// The context is checked before each item. If it has already finished, the
/// items processed so far are returned as a partial result.
///
/// # Errors
///
/// Returns an error if the context is cancelled or expires *while* an item is
/// being processed; in that case no partial results are returned.
async fn process_with_cancellation(
    ctx: &Context,
    items: Vec<Item>,
) -> Result<Vec<ProcessedItem>> {
    // Capture the length up front: the `for` loop below consumes `items`,
    // so it can no longer be queried inside the loop body.
    let total = items.len();
    let mut results = Vec::with_capacity(total);

    for item in items {
        // Check context before each item
        if ctx.is_done() {
            tracing::warn!(
                processed = results.len(),
                remaining = total - results.len(),
                "Context expired, returning partial results"
            );
            break;
        }

        let result = ctx.run(process_item(&item)).await?;
        results.push(result);
    }

    Ok(results)
}
The Drop Guard Pattern
Ensure cleanup happens even when a future is cancelled:
/// Guard that releases the wrapped resource when dropped, unless the slot
/// has been emptied first.
struct CleanupGuard {
/// Resource to release on drop; `None` disarms the guard.
resource: Option<TempResource>,
}
impl Drop for CleanupGuard {
    /// Releases the held resource, if the guard still owns one.
    fn drop(&mut self) {
        match self.resource.take() {
            // Guard was disarmed: nothing left to clean up.
            None => {}
            Some(held) => {
                // Drop cannot await, so the release is synchronous.
                held.release();
                tracing::debug!("Resource cleaned up on drop");
            }
        }
    }
}
/// Acquire a resource, do work with it, and commit it. A `CleanupGuard`
/// releases the resource if the work fails or this future is dropped midway.
///
/// # Errors
///
/// Propagates any error from acquiring the resource, doing the work, or
/// committing.
async fn work_with_cleanup() -> Result<()> {
    let resource = acquire_resource().await?;

    // The binding must be `mut` because the guard is disarmed below.
    let mut guard = CleanupGuard {
        resource: Some(resource.clone()),
    };

    // Even if this future is cancelled (dropped),
    // the guard's Drop runs and cleans up
    do_work(&resource).await?;

    // Successful completion — prevent double cleanup
    // NOTE(review): the guard is disarmed before `commit` is awaited, so a
    // failure or cancellation during `commit` skips `release` — confirm this
    // ordering is intended.
    guard.resource = None;
    resource.commit().await?;
    Ok(())
}
Tracing Spans Across Async Boundaries
Tracing spans do not automatically propagate across tokio::spawn boundaries. You must explicitly carry them:
use tracing::{info_span, Instrument};
/// Handle a request inside a `request` span and run background work in a
/// spawned subtask whose span is explicitly parented to the request span.
async fn handle_request(req: Request) -> Response {
    let span = info_span!(
        "request",
        method = %req.method(),
        path = %req.uri().path(),
        request_id = %uuid::Uuid::new_v4(),
    );

    async move {
        // This span is active in the current task
        tracing::info!("Processing request");

        // Spawn a subtask — must explicitly attach the span
        let current_span = tracing::Span::current();
        let handle = tokio::spawn(
            async move {
                // This subtask runs inside a span parented to the request span
                tracing::info!("Subtask running");
                do_background_work().await
            }
            .instrument(info_span!(parent: &current_span, "subtask")),
        );

        // NOTE(review): `?` only compiles if `Response` is a `Result`-like
        // type whose error converts from the join error — confirm against
        // the real `Response` definition.
        let result = handle.await?;
        tracing::info!("Request complete");
        result
    }
    .instrument(span)
    .await
}
Structured Concurrency
Combine context propagation with structured concurrency to ensure all spawned work is tracked and cancellable:
/// A set of spawned tasks that share one cancellation token.
struct TaskGroup {
/// Join handles of every task spawned through this group.
handles: Vec<tokio::task::JoinHandle<anyhow::Result<()>>>,
/// Token from which each task's child token is derived.
cancel: CancellationToken,
}
impl TaskGroup {
    /// Build an empty group whose tasks are cancelled through `cancel`.
    fn new(cancel: CancellationToken) -> Self {
        let handles = Vec::new();
        Self { handles, cancel }
    }

    /// Spawn `future` as a tracked task inside a `task` span carrying `name`.
    ///
    /// The task ends with the future's own result, or with `Ok(())` if its
    /// child cancellation token fires first.
    fn spawn<F>(&mut self, name: &str, future: F)
    where
        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
    {
        let task_cancel = self.cancel.child_token();
        let task_span = info_span!("task", name = name);

        // Race the caller's future against cancellation of this task's token.
        let supervised = async move {
            tokio::select! {
                result = future => result,
                _ = task_cancel.cancelled() => {
                    tracing::info!("Task cancelled");
                    Ok(())
                }
            }
        };

        self.handles
            .push(tokio::spawn(supervised.instrument(task_span)));
    }

    /// Wait for every task in the group, then surface the first failure.
    ///
    /// All handles are awaited before any result is inspected. The first join
    /// error or task error, in spawn order, is returned.
    async fn join_all(self) -> anyhow::Result<()> {
        let outcomes = futures::future::join_all(self.handles).await;
        outcomes
            .into_iter()
            .try_for_each(|joined| -> anyhow::Result<()> { joined? })
    }
}
Usage:
// One root token controls the lifetime of every task in the group.
let shutdown = CancellationToken::new();
let mut tasks = TaskGroup::new(shutdown.clone());

tasks.spawn("block_processor", process_blocks(db.clone()));
tasks.spawn("price_oracle", update_prices(cache.clone()));
tasks.spawn("metrics", publish_metrics(registry.clone()));

// On Ctrl-C, cancel every task, then wait for all of them to finish.
tokio::signal::ctrl_c().await?;
shutdown.cancel();
tasks.join_all().await?;
Putting It All Together
Here is a complete example combining Context, CancellationToken, and tracing spans in an Axum handler:
/// Axum handler that loads a user and their orders under a 5-second budget,
/// skipping order enrichment when less than one second of the budget is left.
async fn api_handler(
    State(state): State<AppState>,
    req: Request,
) -> Result<Json<ApiResponse>, AppError> {
    // Every step of the request shares this 5-second deadline.
    let ctx = Context::background().with_timeout(Duration::from_secs(5));

    let span = info_span!(
        "api",
        request_id = %ctx.request_id(),
        remaining_ms = ctx.remaining()
            .map_or(0, |left| left.as_millis() as u64),
    );

    async move {
        // NOTE(review): `user_id` is not defined anywhere in the visible code
        // and `req` is never read — presumably the id should be extracted
        // from the request; confirm.
        let user = fetch_user(&ctx, &state.db, user_id).await?;
        let orders = fetch_orders(&ctx, &state.db, user_id).await?;

        // Enrichment is the expensive step; only attempt it with >= 1s left.
        let low_on_time = matches!(
            ctx.remaining(),
            Some(left) if left < Duration::from_secs(1)
        );
        if low_on_time {
            tracing::warn!("Less than 1s remaining, skipping enrichment");
            return Ok(Json(ApiResponse::partial(user, orders)));
        }

        let enriched_orders = enrich_orders(&ctx, &state.cache, orders).await?;
        Ok(Json(ApiResponse::full(user, enriched_orders)))
    }
    .instrument(span)
    .await
}
Conclusion
Context propagation in async Rust requires explicit effort but delivers enormous benefits: deadline-aware operations that fail fast instead of hanging, hierarchical cancellation that cleanly shuts down complex systems, and tracing spans that survive async boundaries. Build a Context type, use CancellationToken for lifecycle management, instrument with tracing spans, and wrap spawned work in TaskGroups for structured concurrency. These patterns transform scattered async tasks into a coherent, observable, and controllable system.