Skip to main content
Engineering — Mar 28, 2026

Deep EVM #24: Context Propagation in Async Rust — Deadlines, Cancellation, and Tracing

OS
Open Soft Team

Engineering Team

The Missing Context

Go has context.Context — a request-scoped value that carries deadlines, cancellation signals, and key-value pairs across API boundaries and goroutines. Every well-written Go function takes a ctx context.Context as its first parameter.

Rust has no built-in equivalent. Tokio provides cancellation via CancellationToken and timeouts via tokio::time::timeout, but there is no unified context type that propagates deadlines, cancellation, and metadata through an async call chain. Let us build one.

Building a Context Type

Our context needs three capabilities: deadlines, cancellation, and key-value metadata:

use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;

/// A request-scoped context carrying a deadline, a cancellation signal,
/// and shared metadata — modelled after Go's `context.Context`.
///
/// Cloning is cheap: the token is reference-counted internally and the
/// metadata is shared behind an `Arc`.
#[derive(Clone)]
pub struct Context {
    // Absolute instant after which the context is "done"; None = no deadline.
    deadline: Option<Instant>,
    // Hierarchical token: cancelling a parent cancels all of its children.
    cancel_token: CancellationToken,
    // Shared per-request metadata (request id, trace id, key/value pairs).
    metadata: Arc<ContextMetadata>,
}

/// Metadata attached to a [`Context`], shared across clones via `Arc`.
struct ContextMetadata {
    // Unique per-request id (a UUID v4 string in `Context::background`).
    request_id: String,
    // Distributed-tracing id when one was propagated in. Never set in this
    // snippet — presumably populated by an extractor elsewhere; confirm.
    trace_id: Option<String>,
    // Link to the parent context's metadata, if any. Unused in this snippet.
    parent: Option<Arc<ContextMetadata>>,
    // Arbitrary request-scoped key/value pairs (analogue of Go's ctx.Value).
    values: std::collections::HashMap<String, String>,
}

impl Context {
    /// Create a background context with no deadline or cancellation.
    pub fn background() -> Self {
        let metadata = ContextMetadata {
            request_id: uuid::Uuid::new_v4().to_string(),
            trace_id: None,
            parent: None,
            values: std::collections::HashMap::new(),
        };
        Self {
            deadline: None,
            cancel_token: CancellationToken::new(),
            metadata: Arc::new(metadata),
        }
    }

    /// Create a child context with a deadline.
    ///
    /// The child's deadline is never later than the parent's: if the parent
    /// already carries an earlier deadline, the earlier one wins.
    pub fn with_deadline(&self, deadline: Instant) -> Self {
        let effective = self
            .deadline
            .map_or(deadline, |existing| existing.min(deadline));
        Self {
            deadline: Some(effective),
            // Child token: cancelled when the parent is, but not vice versa.
            cancel_token: self.cancel_token.child_token(),
            metadata: Arc::clone(&self.metadata),
        }
    }

    /// Create a child context with a timeout relative to now.
    pub fn with_timeout(&self, duration: Duration) -> Self {
        self.with_deadline(Instant::now() + duration)
    }

    /// Cancel this context and all children.
    pub fn cancel(&self) {
        self.cancel_token.cancel();
    }

    /// Check if the context is done: either cancelled or past its deadline.
    pub fn is_done(&self) -> bool {
        if self.cancel_token.is_cancelled() {
            return true;
        }
        match self.deadline {
            Some(deadline) => Instant::now() >= deadline,
            None => false,
        }
    }

    /// Time remaining until deadline, or None if no deadline.
    /// Saturates at zero once the deadline has passed.
    pub fn remaining(&self) -> Option<Duration> {
        let now = Instant::now();
        self.deadline.map(|d| d.saturating_duration_since(now))
    }

    /// Get the request ID for tracing.
    pub fn request_id(&self) -> &str {
        &self.metadata.request_id
    }
}

Deadline-Aware Operations

Wrap async operations with context-aware timeouts:

impl Context {
    /// Run a future under this context's deadline and cancellation.
    ///
    /// # Errors
    /// - [`ContextError::Cancelled`] if the context is (or becomes) cancelled.
    /// - [`ContextError::DeadlineExceeded`] if the deadline passes before the
    ///   future completes.
    pub async fn run<F, T>(&self, future: F) -> Result<T, ContextError>
    where
        F: std::future::Future<Output = T>,
    {
        // Fail fast — and report the *right* cause. The previous fast-path
        // used `is_done()` (true for both causes) and always returned
        // DeadlineExceeded, misreporting a cancelled context.
        if self.cancel_token.is_cancelled() {
            return Err(ContextError::Cancelled);
        }
        if matches!(self.deadline, Some(d) if Instant::now() >= d) {
            return Err(ContextError::DeadlineExceeded);
        }

        let cancel = self.cancel_token.clone();

        match self.deadline {
            Some(deadline) => {
                // Saturating: if the deadline just passed, sleep(0) fires
                // on the first poll and we report DeadlineExceeded.
                let timeout = deadline.saturating_duration_since(Instant::now());
                tokio::select! {
                    result = future => Ok(result),
                    _ = tokio::time::sleep(timeout) => {
                        Err(ContextError::DeadlineExceeded)
                    }
                    _ = cancel.cancelled() => {
                        Err(ContextError::Cancelled)
                    }
                }
            }
            None => {
                // No deadline — only cancellation can interrupt the future.
                tokio::select! {
                    result = future => Ok(result),
                    _ = cancel.cancelled() => {
                        Err(ContextError::Cancelled)
                    }
                }
            }
        }
    }
}

/// Errors returned by [`Context::run`] when a wrapped future is aborted.
#[derive(Debug, thiserror::Error)]
pub enum ContextError {
    /// The context's deadline passed before the future completed.
    #[error("context deadline exceeded")]
    DeadlineExceeded,
    /// The context (or one of its ancestors) was explicitly cancelled.
    #[error("context cancelled")]
    Cancelled,
}

Usage in a service:

/// Fetch a user by id, bounded by the context's deadline/cancellation.
async fn fetch_user(ctx: &Context, db: &PgPool, id: i64) -> Result<User> {
    // The query itself; `ctx.run` races it against deadline and cancellation.
    let query = async {
        sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
            .bind(id)
            .fetch_one(db)
            .await
    };

    // Outer error: the context aborted the query.
    let outcome = ctx.run(query).await.map_err(|e| match e {
        ContextError::DeadlineExceeded => {
            anyhow::anyhow!("Database query timed out")
        }
        ContextError::Cancelled => {
            anyhow::anyhow!("Request was cancelled")
        }
    })?;

    // Inner error: the query itself failed — convert into anyhow::Error.
    outcome.map_err(Into::into)
}

Cancellation with CancellationToken

Tokio’s CancellationToken supports hierarchical cancellation — cancelling a parent token automatically cancels all children:

use tokio_util::sync::CancellationToken;

/// Demonstrates hierarchical cancellation: cancelling the parent token
/// automatically cancels the child token held by the spawned subtask.
async fn handle_request(parent_token: CancellationToken) {
    let child_token = parent_token.child_token();

    // The subtask races its work against cancellation of the child token.
    let worker = async move {
        tokio::select! {
            result = do_expensive_work() => {
                tracing::info!("Work completed: {:?}", result);
            }
            _ = child_token.cancelled() => {
                tracing::info!("Work cancelled");
            }
        }
    };
    let handle = tokio::spawn(worker);

    // Cancelling the parent propagates to every child token.
    tokio::time::sleep(Duration::from_secs(5)).await;
    parent_token.cancel(); // Cancels child_token too

    handle.await.unwrap();
}

Graceful Shutdown with Cancellation

Use a root cancellation token for coordinated shutdown across all services:

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Root token: one cancel fans out to every service's child token.
    let shutdown = CancellationToken::new();

    // Each service runs under its own child of the shutdown root.
    let http_server = tokio::spawn(run_http_server(shutdown.child_token()));
    let block_watcher = tokio::spawn(run_block_watcher(shutdown.child_token()));
    let metrics_server = tokio::spawn(run_metrics_server(shutdown.child_token()));

    // Block until Ctrl-C arrives.
    tokio::signal::ctrl_c().await?;
    tracing::info!("Shutdown signal received");

    // Signal every service to stop.
    shutdown.cancel();

    // Give the services up to 30 seconds to drain; if they don't finish,
    // stop waiting (ok() discards the Elapsed error) and exit anyway.
    let grace = Duration::from_secs(30);
    let drain = async {
        let _ = tokio::join!(http_server, block_watcher, metrics_server);
    };
    tokio::time::timeout(grace, drain).await.ok();

    tracing::info!("Shutdown complete");
    Ok(())
}

tokio::select! for Cancellation Patterns

tokio::select! is the primary mechanism for responding to cancellation:

/// Process `items` one at a time, stopping early (with partial results)
/// once the context is cancelled or its deadline passes.
async fn process_with_cancellation(
    ctx: &Context,
    items: Vec<Item>,
) -> Result<Vec<ProcessedItem>> {
    // Capture the length up front: `for item in items` below moves `items`,
    // so `items.len()` inside the loop body would be a use-after-move and
    // fail to compile.
    let total = items.len();
    let mut results = Vec::with_capacity(total);

    for item in items {
        // Check context before each item so we stop promptly.
        if ctx.is_done() {
            tracing::warn!(
                processed = results.len(),
                remaining = total - results.len(),
                "Context expired, returning partial results"
            );
            break;
        }

        // Each item's processing is itself bounded by the context.
        let result = ctx.run(process_item(&item)).await?;
        results.push(result);
    }

    Ok(results)
}

The Drop Guard Pattern

Ensure cleanup happens even when a future is cancelled:

/// Holds a resource and releases it on drop, unless disarmed first by
/// setting `resource` to `None` after successful completion.
struct CleanupGuard {
    // `None` once the resource has been released or the guard disarmed.
    resource: Option<TempResource>,
}

impl Drop for CleanupGuard {
    /// Release the held resource, if any. Runs even when the owning future
    /// is cancelled (i.e. dropped mid-await).
    fn drop(&mut self) {
        // take() leaves None behind, so a second drop path is a no-op.
        let Some(resource) = self.resource.take() else {
            return;
        };
        // Synchronous cleanup only — Drop cannot await.
        resource.release();
        tracing::debug!("Resource cleaned up on drop");
    }
}

/// Acquire a resource and do work with it; the guard guarantees release
/// even if this future is cancelled partway through.
async fn work_with_cleanup() -> Result<()> {
    let resource = acquire_resource().await?;
    // The guard must be bound `mut` so it can be disarmed on success; the
    // previous version bound it as immutable `_guard` and then assigned to
    // its field, which does not compile.
    let mut guard = CleanupGuard {
        resource: Some(resource.clone()),
    };

    // Even if this future is cancelled (dropped),
    // the guard's Drop runs and cleans up
    do_work(&resource).await?;

    // Successful completion — disarm the guard to prevent double cleanup
    guard.resource = None;
    resource.commit().await?;
    Ok(())
}

Tracing Spans Across Async Boundaries

Tracing spans do not automatically propagate across tokio::spawn boundaries. You must explicitly carry them:

use tracing::{info_span, Instrument};

/// Handle a request under a tracing span, explicitly carrying the span
/// across a `tokio::spawn` boundary (spans do not propagate on their own).
async fn handle_request(req: Request) -> Response {
    let span = info_span!(
        "request",
        method = %req.method(),
        path = %req.uri().path(),
        request_id = %uuid::Uuid::new_v4(),
    );

    async move {
        // This span is active in the current task
        tracing::info!("Processing request");

        // Spawn a subtask — must explicitly attach the span, using the
        // current span as the parent of the new "subtask" span.
        let current_span = tracing::Span::current();
        let handle = tokio::spawn(
            async move {
                // This subtask inherits the parent span
                tracing::info!("Subtask running");
                do_background_work().await
            }
            .instrument(info_span!(parent: &current_span, "subtask"))
        );

        // `?` cannot be used here: this async block's output type is the
        // handler's `Response`, not a `Result`, so the JoinError from an
        // aborted/panicked subtask must be handled explicitly. A panic in
        // the subtask is a bug — surface it.
        let result = handle.await.expect("subtask panicked");
        tracing::info!("Request complete");
        result
    }
    .instrument(span)
    .await
}

Structured Concurrency

Combine context propagation with structured concurrency to ensure all spawned work is tracked and cancellable:

/// A group of spawned tasks sharing one cancellation root, so they can be
/// cancelled together and awaited together (structured concurrency).
struct TaskGroup {
    // Join handle for every task spawned through this group.
    handles: Vec<tokio::task::JoinHandle<anyhow::Result<()>>>,
    // Root token; each spawned task runs under a child of it.
    cancel: CancellationToken,
}

impl TaskGroup {
    /// Create an empty group rooted at `cancel`.
    fn new(cancel: CancellationToken) -> Self {
        Self {
            handles: Vec::new(),
            cancel,
        }
    }

    /// Spawn `future` as a named, cancellable member of the group.
    /// The task resolves with Ok(()) if it is cancelled before finishing.
    fn spawn<F>(&mut self, name: &str, future: F)
    where
        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
    {
        let token = self.cancel.child_token();
        let task_span = info_span!("task", name = name);

        // Race the task's future against cancellation of its child token.
        let wrapped = async move {
            tokio::select! {
                result = future => result,
                _ = token.cancelled() => {
                    tracing::info!("Task cancelled");
                    Ok(())
                }
            }
        };

        self.handles.push(tokio::spawn(wrapped.instrument(task_span)));
    }

    /// Await every task in the group; the first error — including a
    /// JoinError from a panicked task — is propagated.
    async fn join_all(self) -> anyhow::Result<()> {
        for outcome in futures::future::join_all(self.handles).await {
            outcome??;
        }
        Ok(())
    }
}

Usage:

// Root token shared between the group and the shutdown path below.
let cancel = CancellationToken::new();
let mut group = TaskGroup::new(cancel.clone());

// Each spawn runs under a child token and its own "task" tracing span.
group.spawn("block_processor", process_blocks(db.clone()));
group.spawn("price_oracle", update_prices(cache.clone()));
group.spawn("metrics", publish_metrics(registry.clone()));

// Cancel all tasks on shutdown
tokio::signal::ctrl_c().await?;
cancel.cancel();
group.join_all().await?;

Putting It All Together

Here is a complete example combining Context, CancellationToken, and tracing spans in an Axum handler:

/// Axum handler combining Context, deadline checks, and a tracing span.
async fn api_handler(
    State(state): State<AppState>,
    req: Request,
) -> Result<Json<ApiResponse>, AppError> {
    // Create context with 5-second deadline
    let ctx = Context::background()
        .with_timeout(Duration::from_secs(5));

    // Span records the request id plus the time budget left at entry.
    let span = info_span!(
        "api",
        request_id = %ctx.request_id(),
        remaining_ms = ctx.remaining()
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0),
    );

    async move {
        // NOTE(review): `user_id` is not defined anywhere in this snippet —
        // presumably extracted from `req` (path or query); confirm against
        // the full handler.
        let user = fetch_user(&ctx, &state.db, user_id).await?;
        let orders = fetch_orders(&ctx, &state.db, user_id).await?;

        // Check remaining time before expensive operation
        if ctx.remaining().map(|d| d < Duration::from_secs(1)).unwrap_or(false) {
            tracing::warn!("Less than 1s remaining, skipping enrichment");
            // Degrade gracefully: return what we have instead of timing out.
            return Ok(Json(ApiResponse::partial(user, orders)));
        }

        let enriched = enrich_orders(&ctx, &state.cache, orders).await?;
        Ok(Json(ApiResponse::full(user, enriched)))
    }
    .instrument(span)
    .await
}

Conclusion

Context propagation in async Rust requires explicit effort but delivers enormous benefits: deadline-aware operations that fail fast instead of hanging, hierarchical cancellation that cleanly shuts down complex systems, and tracing spans that survive async boundaries. Build a Context type, use CancellationToken for lifecycle management, instrument with tracing spans, and wrap spawned work in TaskGroups for structured concurrency. These patterns transform scattered async tasks into a coherent, observable, and controllable system.