Deep EVM #24: Пропагация контекста в async Rust — дедлайны, отмена и трейсинг
Engineering Team
Проблема контекста в async Rust
В синхронном мире контекст запроса живёт в стеке вызовов — каждая функция имеет доступ к данным вызывающей стороны через параметры или thread-local storage. В async Rust всё сложнее: future’ы могут переключаться между потоками, task’и могут создавать дочерние task’и, а один запрос может порождать десятки асинхронных операций.
Как передать request ID, дедлайн, данные аутентификации и трейсинг-контекст через всю цепочку async-вызовов?
Подход 1: Явная передача через параметры
Самый простой и идиоматичный подход:
struct RequestContext {
request_id: String,
deadline: Instant,
user_id: Option<Uuid>,
trace_id: String,
}
async fn handle_request(
ctx: RequestContext,
pool: &PgPool,
) -> Result<Response> {
let articles = fetch_articles(&ctx, pool).await?;
let enriched = enrich_articles(&ctx, &articles).await?;
Ok(Response::new(enriched))
}
async fn fetch_articles(
ctx: &RequestContext,
pool: &PgPool,
) -> Result<Vec<Article>> {
// Проверяем дедлайн
if Instant::now() > ctx.deadline {
return Err(Error::DeadlineExceeded);
}
tracing::info!(
request_id = %ctx.request_id,
"Fetching articles"
);
sqlx::query_as!(Article, "SELECT * FROM articles")
.fetch_all(pool).await
.map_err(Into::into)
}
Плюсы: прозрачность, compile-time проверка. Минусы: загрязняет сигнатуры функций.
Подход 2: tracing::Span как носитель контекста
tracing Span’ы автоматически распространяются через async-вызовы:
use tracing::Instrument;
async fn handle_request(
req: Request,
) -> Result<Response> {
let span = tracing::info_span!(
"request",
request_id = %Uuid::new_v4(),
method = %req.method(),
path = %req.uri().path(),
);
async {
let articles = fetch_articles().await?;
let enriched = enrich_articles(&articles).await?;
Ok(Response::new(enriched))
}
.instrument(span)
.await
}
Все логи внутри инструментированного блока автоматически включают request_id, method и path. Это работает через все уровни вложенности.
Подход 3: task_local! для request-scoped данных
Для данных, которые нужны везде, но неудобно передавать через параметры:
tokio::task_local! {
static REQUEST_ID: String;
static DEADLINE: Instant;
}
async fn middleware(req: Request, next: Next) -> Response {
let request_id = Uuid::new_v4().to_string();
let deadline = Instant::now() + Duration::from_secs(30);
REQUEST_ID.scope(request_id, async {
DEADLINE.scope(deadline, async {
next.run(req).await
}).await
}).await
}
// В любом месте обработки:
async fn deep_function() {
let rid = REQUEST_ID.with(|id| id.clone());
tracing::info!(request_id = %rid, "Deep operation");
}
Плюсы: не загрязняет сигнатуры. Минусы: паника при доступе вне scope, невидимые зависимости.
Дедлайны и таймауты
Дедлайн — это абсолютное время, к которому запрос должен завершиться. В отличие от таймаута (относительное время), дедлайн правильно распространяется:
async fn with_deadline<F, T>(
deadline: Instant,
future: F,
) -> Result<T, Error>
where
F: Future<Output = Result<T, Error>>,
{
tokio::select! {
result = future => result,
_ = tokio::time::sleep_until(deadline.into()) => {
Err(Error::DeadlineExceeded)
}
}
}
Использование:
let deadline = Instant::now() + Duration::from_secs(5);
let result = with_deadline(deadline, async {
let a = fetch_from_db(deadline).await?;
let b = call_external_api(deadline).await?;
Ok((a, b))
}).await;
Каждая подоперация получает тот же дедлайн и может оценить оставшееся время.
Graceful отмена задач
В Rust async отмена происходит через drop Future. Но это может оставить ресурсы в неконсистентном состоянии:
// Используем CancellationToken из tokio-util
use tokio_util::sync::CancellationToken;
let token = CancellationToken::new();
let child_token = token.child_token();
// Дочерняя задача
tokio::spawn(async move {
loop {
tokio::select! {
_ = child_token.cancelled() => {
// Cleanup
tracing::info!("Task cancelled, cleaning up");
break;
}
_ = do_work() => {}
}
}
});
// Отмена всех дочерних задач
token.cancel();
Распределённый трейсинг с OpenTelemetry
Для микросервисной архитектуры контекст нужно передавать между сервисами:
use opentelemetry::global;
use tracing_opentelemetry::OpenTelemetrySpanExt;
// Извлечение контекста из входящего запроса
fn extract_context(headers: &HeaderMap) -> Context {
global::get_text_map_propagator(|propagator| {
propagator.extract(&HeaderExtractor(headers))
})
}
// Инжекция контекста в исходящий запрос
fn inject_context(headers: &mut HeaderMap) {
global::get_text_map_propagator(|propagator| {
propagator.inject_context(
&Span::current().context(),
&mut HeaderInjector(headers),
);
});
}
Это обеспечивает сквозной трейсинг от HTTP-запроса клиента через все микросервисы до базы данных.
Middleware для Axum
Соберём всё вместе в Axum-middleware:
async fn context_middleware(
req: Request,
next: Next,
) -> Response {
let request_id = req.headers()
.get("x-request-id")
.and_then(|v| v.to_str().ok())
.unwrap_or(&Uuid::new_v4().to_string())
.to_string();
let span = tracing::info_span!(
"request",
request_id = %request_id,
method = %req.method(),
path = %req.uri().path(),
);
async move {
let start = Instant::now();
let response = next.run(req).await;
let duration = start.elapsed();
tracing::info!(
status = %response.status(),
duration_ms = %duration.as_millis(),
"Request completed"
);
response
}
.instrument(span)
.await
}
Заключение
Пропагация контекста в async Rust требует осознанного подхода. Для трейсинга используйте tracing::Span с .instrument(). Для дедлайнов — явную передачу Instant. Для отмены — CancellationToken. Для распределённых систем — OpenTelemetry. Комбинация этих инструментов обеспечивает полную наблюдаемость и контроль над асинхронными операциями.