Перейти к основному содержимому

Distributed tracing в Yandex Cloud Functions с OpenTelemetry и Monium

· 8 мин. чтения

Monium недавно представил хранение трейсов для пользователей Облака. Я решил это протестировать и собрал пример из трёх функций на TypeScript: orders, payment и inventory. Orders принимает HTTP-запрос, синхронно вызывает payment и асинхронно отправляет сообщение в YMQ, которое подхватывает inventory.

Стек: TypeScript, OpenTelemetry SDK for Node.js, Yandex Cloud Functions, YMQ, Lockbox.

Настройка экспортера трейсов

Первым делом нужно настроить отправку спанов в Monium. Monium принимает трейсы по протоколу OTLP/gRPC. Для аутентификации используется API-ключ, который передаётся через gRPC-метаданные вместе с идентификатором проекта:

import { NodeTracerProvider } from "@opentelemetry/sdk-trace-node";
import { BatchSpanProcessor } from "@opentelemetry/sdk-trace-base";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-grpc";
import { Resource } from "@opentelemetry/resources";
import { ATTR_SERVICE_NAME } from "@opentelemetry/semantic-conventions";
import { W3CTraceContextPropagator } from "@opentelemetry/core";
import * as grpc from "@grpc/grpc-js";

const serviceName = process.env.OTEL_SERVICE_NAME || "unknown-service";
const folderId = process.env.FOLDER_ID || "";
const apiKey = process.env.MONIUM_API_KEY || "";

const metadata = new grpc.Metadata();
metadata.set("authorization", `Api-Key ${apiKey}`);
metadata.set("x-monium-project", `folder__${folderId}`);

const exporter = new OTLPTraceExporter({
url: "https://ingest.monium.yandex.cloud:443",
metadata,
});

const provider = new NodeTracerProvider({
resource: new Resource({
[ATTR_SERVICE_NAME]: serviceName,
cluster: "default",
}),
spanProcessors: [new BatchSpanProcessor(exporter)],
});
provider.register({ propagator: new W3CTraceContextPropagator() });

Пара моментов:

  • API-ключ хранится в Lockbox и инжектится в функцию через блок secrets в Terraform — не в переменных окружения открытым текстом.
  • x-monium-project — идентификатор проекта в формате folder__<folder_id>. Без него Monium не будет знать, куда складывать трейсы.
  • W3CTraceContextPropagator — стандартный пропагатор, который умеет инжектить и извлекать traceparent / tracestate заголовки.

Как работает контекст в трейсинге

Прежде чем смотреть на хендлеры, разберёмся как OpenTelemetry понимает, какой спан к какому трейсу относится — особенно когда функция обрабатывает несколько запросов одновременно.

Ключевое: спаны привязаны к контексту, а не к трейсеру. getTracer() возвращает один и тот же объект-трейсер по имени сервиса. Но когда вы вызываете startActiveSpan, спан создаётся в рамках текущего активного контекста.

Под капотом OpenTelemetry SDK для Node.js использует AsyncLocalStorage. Каждая асинхронная цепочка (async chain) автоматически несёт свой контекст. Это работает без дополнительных усилий:

// context.with() устанавливает parent-контекст для callback
context.with(extractedContext, () =>
tracer.startActiveSpan("handle-request", async (span) => {
// здесь span — child extractedContext
// всё, что выполняется внутри, "видит" этот span как текущий
}),
);

Почему это важно? Yandex Cloud Functions поддерживают параметр concurrency — если его выставить, один экземпляр функции обрабатывает несколько вызовов одновременно (доступно для Node.js, Go, Java, Bash, Kotlin). Каждый вызов получает уникальный RequestID. Благодаря AsyncLocalStorage каждый concurrent вызов несёт свой контекст, и спаны не перемешиваются между параллельными запросами.

Тот же принцип работает при обработке batch-сообщений. Когда функция обработки соббщений из YMQ получает пачку из 5 сообщений, каждое несёт свой traceparent. В цикле мы восстанавливаем parent-контекст для каждого сообщения отдельно — и спаны корректно привязываются к разным трейсам.

Простой пример: синхронный вызов функции

Начнём с самого простого — payment-сервис. Он принимает HTTP-запрос и возвращает ответ:

import { withTracing } from './tracing';

export const handler = withTracing(async (span) => {
await new Promise((resolve) => setTimeout(resolve, 50));
span.addEvent('processing-complete');

return {
statusCode: 200,
body: JSON.stringify({
message: 'Callee processed successfully',
traceId: span.spanContext().traceId,
}),
headers: { 'Content-Type': 'application/json' },
};
});

Весь трейсинг спрятан в обёртке withTracing. Вот что она делает:

export function withTracing(
fn: (span: Span, event: Http.Event) => Promise<Http.Result>,
): TracedHttpHandler {
return async (event) => {
const tracer = getTracer();

// 1. Извлекаем trace-контекст из входящих HTTP-заголовков
const carrier: Record<string, string> = {};
if (event.headers) {
for (const [key, value] of Object.entries(event.headers)) {
carrier[key.toLowerCase()] = value;
}
}
const extractedContext = propagation.extract(context.active(), carrier);

try {
// 2. Устанавливаем извлечённый контекст как текущий
return await context.with(extractedContext, () =>
// 3. Создаём SERVER-спан
tracer.startActiveSpan(
"handle-request",
{ kind: SpanKind.SERVER },
async (span: Span) => {
span.setAttribute("http.method", event.httpMethod || "GET");
try {
const result = await fn(span, event);
span.setStatus({ code: SpanStatusCode.OK });
span.end();
return result;
} catch (err) {
span.setStatus({ code: SpanStatusCode.ERROR, message: String(err) });
span.end();
throw err;
}
},
),
);
} catch (err) {
return { statusCode: 500, body: JSON.stringify({ error: String(err) }) };
} finally {
// 4. Обязательно flush перед завершением — иначе спаны потеряются
await flushTracing();
}
};
}

Последовательность:

  1. propagation.extract() — достаём traceparent из входящих заголовков. Если заголовка нет, создастся новый корневой трейс.
  2. context.with() — устанавливаем извлечённый контекст как parent.
  3. startActiveSpan — создаём спан типа SERVER.
  4. flushTracing() в finallyкритично для serverless. BatchSpanProcessor копит спаны и отправляет пачками. Без явного flush функция завершится раньше, чем спаны уйдут в Monium.

Расширенный пример: передача через YMQ

Теперь orders-сервис. Он делает две вещи: вызывает payment по HTTP и отправляет сообщение в YMQ для inventory.

import {
withTracing, getTracer, sqsClient, SendMessageCommand,
propagation, context as otelContext, SpanStatusCode, SpanKind,
} from "./tracing";

const calleeUrl = process.env.CALLEE_URL || "";
const ordersQueueUrl = process.env.ORDERS_QUEUE_URL || "";

export const handler = withTracing(async (span) => {
const orderId = `order-${Date.now()}`;

// 1. Вызываем payment-сервис с пробросом trace-контекста
const calleeBody = await getTracer().startActiveSpan(
"invoke-payment-service",
{ kind: SpanKind.CLIENT },
async (clientSpan) => {
const headers: Record<string, string> = {};
propagation.inject(otelContext.active(), headers);

const response = await fetch(calleeUrl, { method: "GET", headers });
const body = await response.text();
clientSpan.setAttribute("http.status_code", response.status);
clientSpan.setStatus({ code: SpanStatusCode.OK });
clientSpan.end();
return body;
},
);

// 2. Отправляем в YMQ — AWS SDK автоинструментирован
await sqsClient.send(
new SendMessageCommand({
QueueUrl: ordersQueueUrl,
MessageBody: JSON.stringify({ orderId, status: "confirmed" }),
}),
);

span.setAttribute("order.id", orderId);

return {
statusCode: 200,
body: JSON.stringify({
message: "Order placed",
orderId,
traceId: span.spanContext().traceId,
paymentResponse: JSON.parse(calleeBody),
}),
headers: { "Content-Type": "application/json" },
};
});

Два способа передачи контекста в одном хендлере:

HTTP-вызовpropagation.inject(otelContext.active(), headers) кладёт traceparent в HTTP-заголовки. Payment-сервис на той стороне извлечёт его через withTracing.

YMQ — вызов sqsClient.send() автоматически инструментирован через AwsInstrumentation. Библиотека сама создаёт child-спан для SQS-операции и прокидывает trace-контекст в MessageAttributes сообщения.

На стороне inventory всё чуть интереснее. Функция получает batch сообщений через YMQ-триггер:

import {
withMqTracing, getTracer, extractContextFromSqsMessageAttributes,
context as otelContext, SpanStatusCode, SpanKind,
} from "./tracing";

export const handler = withMqTracing(async (_span, messages) => {
const tracer = getTracer();
for (const message of messages) {
const body = JSON.parse(message.details.message.body) as {
orderId: string;
status: string;
};

// Восстанавливаем trace-контекст из атрибутов сообщения
const parentContext = extractContextFromSqsMessageAttributes(
message.details.message.message_attributes,
);

// Создаём child-спан в контексте оригинального трейса
await otelContext.with(parentContext, () =>
tracer.startActiveSpan(
"reserve-inventory",
{ kind: SpanKind.CONSUMER },
async (span) => {
span.setAttribute("order.id", body.orderId);
span.setAttribute("order.status", body.status);
await new Promise((resolve) => setTimeout(resolve, 20));
span.addEvent("inventory-reserved");
span.setStatus({ code: SpanStatusCode.OK });
span.end();
},
),
);
}
});

Ключевой момент — extractContextFromSqsMessageAttributes вызывается для каждого сообщения в цикле. Каждое сообщение может прийти из разного трейса, и context.with(parentContext, ...) гарантирует, что спан reserve-inventory станет child'ом правильного родителя.

Итоговая цепочка спанов в Monium:

orders: handle-request (SERVER)
├── invoke-payment-service (CLIENT)
│ └── payment: handle-request (SERVER)
└── SQS.SendMessage (PRODUCER)
└── inventory: reserve-inventory (CONSUMER)

Нюанс: инструментирование AWS SDK и порядок импортов

AwsInstrumentation работает через monkey-patching: она подменяет методы AWS SDK, оборачивая их в спаны. Но для этого патч должен быть установлен до того, как SDK будет импортирован.

В TypeScript есть ловушка: все import statements hoisted — компилятор перемещает их в начало файла. Если написать:

import { registerInstrumentations } from "@opentelemetry/instrumentation";
import { AwsInstrumentation } from "@opentelemetry/instrumentation-aws-sdk";
import { SQSClient } from "@aws-sdk/client-sqs"; // ← будет вверху!

registerInstrumentations({
instrumentations: [new AwsInstrumentation()],
});

...то в скомпилированном JS @aws-sdk/client-sqs загрузится раньше, чем registerInstrumentations успеет применить патч. Автоинструментирование просто не заработает.

Решение — использовать require() для AWS SDK:

// 1. Сначала все OTel-импорты (обычные import)
import { registerInstrumentations } from "@opentelemetry/instrumentation";
import { AwsInstrumentation } from "@opentelemetry/instrumentation-aws-sdk";

// 2. Регистрируем инструментацию
registerInstrumentations({
instrumentations: [new AwsInstrumentation()],
});

// 3. Только теперь загружаем AWS SDK через require()
const {
SQSClient,
SendMessageCommand: SendMessageCommandCtor,
} = require("@aws-sdk/client-sqs") as typeof import("@aws-sdk/client-sqs");

require() выполняется в runtime в том месте, где написан — TypeScript его не перемещает. Это гарантирует правильный порядок: сначала патч, потом загрузка SDK.

Этот паттерн актуален для любой auto-instrumentation библиотеки, не только AWS — та же проблема будет с @opentelemetry/instrumentation-http, instrumentation-express и т.д.

Альтернатива: ручное пробрасывание traceId через MessageAttributes

Автоинструментирование AWS SDK — удобно, но не всегда доступно. Может быть другой язык, кастомный транспорт, или вы просто хотите больше контроля. Для таких случаев в примере есть ручные функции.

На стороне отправителяinjectTraceToSqsMessageAttributes:

export function injectTraceToSqsMessageAttributes(
messageAttributes: Record<string, SqsMessageAttribute> = {},
): Record<string, SqsMessageAttribute> {
const carrier: Record<string, string> = {};
propagation.inject(context.active(), carrier);

for (const [key, value] of Object.entries(carrier)) {
messageAttributes[key] = { DataType: "String", StringValue: value };
}

return messageAttributes;
}

Это берёт текущий активный контекст, сериализует его через propagator (получается traceparent и опционально tracestate) и кладёт в формат SQS MessageAttributes.

Использование в orders:

await sqsClient.send(
new SendMessageCommand({
QueueUrl: ordersQueueUrl,
MessageBody: JSON.stringify({ orderId, status: "confirmed" }),
MessageAttributes: injectTraceToSqsMessageAttributes(),
}),
);

На стороне получателяextractContextFromSqsMessageAttributes:

export function extractContextFromSqsMessageAttributes(
messageAttributes: Record<string, { string_value?: string }> = {},
) {
const carrier: Record<string, string> = {};
for (const key of ["traceparent", "tracestate", "baggage"]) {
const attr = messageAttributes[key];
if (attr && typeof attr.string_value === "string") {
carrier[key] = attr.string_value;
}
}
return propagation.extract(context.active(), carrier);
}

Этот ручной подход и автоинструментирование делают одно и то же. Если AwsInstrumentation работает корректно, ручную инъекцию можно не использовать. Но иметь fallback полезно — особенно для отладки или если вы работаете с языком, где auto-instrumentation для AWS SDK недоступна.

Заключение

Monium + OpenTelemetry SDK дают distributed tracing из коробки для Cloud Functions. Главное:

  • flushTracing() в finally — без этого спаны потеряются в serverless-среде
  • Порядок импортовrequire() для AWS SDK после registerInstrumentations(), иначе monkey-patching не сработает
  • context.with() — ключ к корректной привязке спанов, особенно при batch-обработке и concurrent-вызовах
  • Ручной fallback через MessageAttributes — когда автоинструментирование недоступно или не подходит

Полный код примера: examples/typescript/otel.