Про асинхронность в Cloud Functions
Сегодня я хотел бы разобрать модель работы облачных функций в Яндекс Облаке.
Функции в облаке запускаются синхронно и в один момент времени обрабатывают ровно один запрос.
В конце 2023 года появилась возможность запускать функции асинхронно. Подробнее можно почитать в отдельном посте.
Upd: В конце 2024 в Яндекс Облаке появилась конкурентность в функциях. Таким образом, теперь функции могут обрабатывать несколько запросов одновременно.
Это значит, что вы можете быть уверены, что все переменные, созданные внутри обработчика будут относиться именно к текущему контексту. Т.е. если вы в обработчике идете в базу, чтобы получить текущего пользователя, это будет именно тот пользователь, чей id пришел в запросе.
Однако, экземпляр рантайма может быть переиспользован для обработки разных запросов. То есть, когда закончится обработка первого запроса, в тот же рантайм может быть передан для обработки следующий запрос. Ему могут быть доступны переменные определенные вне функции-обработчика.
Это одновременно позволяет нам кешировать между вызовами объекты, на чью инициализацию может потребоваться время (например, подключения к базе данных), но так же заставляет нас внимательнее следить, что мы кешируем. Например, не стоит в глобальные переменные помещать данные про пользователя, так как следующий запрос может относиться к другому пользователю.
Теперь подробнее разберем, что значит синхронность запуска функции.
Для этого возьмем простую функцию на Python, которая будет делать только одно: ждать 1 секунду и возвращать ответ Ok.
import time
def handler(event, context):
time.sleep(1)
return {
'statusCode': 200,
'body': 'Ok',
}
Теперь, если мы вызовем ее при помощи curl
, то через 1 секунду получим ответ Ok
.
curl 'https://functions.yandexcloud.net/d4eic18eqks97gmifd3l'
Но, что будет, если мы будем ждать ответа не 1 секунду, 500мс?
curl -m .5 'https://functions.yandexcloud.net/d4eic18eqks97gmifd3l'
В ответ мы увидим вот что:
curl: (28) Operation timed out after 504 milliseconds with 0 bytes received
А в логах функции:
Что же произошло?
Рантайм функций увидел, что клиент разорвал соединение и больше не ждет ответа, значит функция не сможет вернуть результат своего выполнения, вызвавшему её коду, а значит можно больше не тратить деньги клиента Облака (ведь облачные функции тарифицируются именно за время исполнения) и прервать функцию. В логах такое поведение можно найти по коду ответа 499. Этот нестандартный код ответа говорит, что клиент не дождался и отменил запрос.
Поднимать таймаут функции в этом случае не имеет смысла, т.к. это не проблема в вашем коде, а в том, что клиент не дождался ответа. Если у вас есть возможность, то лучше всего обработать эту ошибку в клиентском коде. Если же это невозможно, например, если вашу функцию в качестве веб-хука вызывает сторонний сервис, то вам придется только оптимизировать функцию, чтобы она успевала отвечать в рамках таймаута.
При этом эта ошибка в логах необязательно говорит о каких-то проблемах. Ведь логика клиентского приложения может быть построена так, что оно отменяет запросы в которых более не нуждается, чтобы тем самым снизить нагрузку и на сервер. То есть, например, пользователь решил не ждать загрузки витрины. Отлично, можно отменить этот запрос, ведь нам не понадобится его результат для отрисовки страницы в браузере.
Что же это значит для разработчика, который пишет приложение в serverless модели? Нужно постараться не использовать синхронные вызовы одной функции из другой.
Потому что когда наступит таймаут у внешней функции, то и вызванная из неё функция завершится с ошибкой. И тут либо аккуратно следить за настройками функций, поддерживая таймауты у внешних чуть больше, чем у внутренних, либо использовать асинхронные подходы.
К тому же, вызывая одну функцию из другой, вы заплатите вдвойне, т.к. будет работать два экземпляра рантаймов. Так же придется следить за квотами.
В итоге если функции написаны на одном языке программирования, то проще вызвать код вложенной функции нативно: включив его в сборку внешней функции.
Асинхронность в функциях.
Первое, о чем вы могли подумать: «Точно просто вместо обычной функции-обработчика сделаем её асинхронной!»
const {
setTimeout,
} = require('node:timers/promises');
module.exports.handler = async function (event, context) {
await setTimeout(1000);
return {
statusCode: 200,
body: 'ok',
};
};
Но это не поможет. Да обработчик стал асинхронным, теперь мы можем использовать в нём синтаксический сахар async/await
, но вызов функции так и останется синхронным. То есть, если вы вызывая функцию по http, не дождетесь окончания ее исполнения, то она точно так же как и в случае с синхронным обработчиком вернёт 499 код ошибки.
Так что же делать, если вы хотите выполнить в облачной функции какую-то длительную работу и не хотите ждать ее завершения, и вас бы устроило, чтобы например результат был записан в базу данных?
Для этого можно использовать очереди сообщений, таким образом развязать постановку задачи и ее выполнение. А заодно и распараллелить выполнение задач.
Как положить сообщение в очередь?
У нас есть как минимум два похода:
- Мы можем напрямую взаимодействовать с очередью через AWS SDK (т.к. YMQ поддерживает протокол AWS SQS);
const AWS = require('aws-sdk');
const QUEUE_URL = process.env.QUEUE_URL ?? "";
const AWS_ACCESS_KEY_ID = process.env.AWS_ACCESS_KEY_ID ?? "";
const AWS_SECRET_ACCESS_KEY = process.env.AWS_SECRET_ACCESS_KEY ?? "";
const sqs = new AWS.SQS({
region: "ru-central1",
endpoint: "https://message-queue.api.cloud.yandex.net",
credentials: {
accessKeyId: AWS_ACCESS_KEY_ID,
secretAccessKey: AWS_SECRET_ACCESS_KEY,
}
});
function messageSendResultHandler(err, data) {
if (err) {
console.log("Error", err);
} else {
console.log("Success", data.MessageId);
}
}
module.exports.handler = async function (event, context) {
sqs.sendMessage({
MessageBody: JSON.stringify({"foo": "bar"}),
QueueUrl: QUEUE_URL
}, messageSendResultHandler);
return {
statusCode: 200,
body: 'ok',
};
};
Пример отправки сообщения в YMQ
- Воспользоваться интеграцией API Gateway.
После этого нужно будет лишь настроить триггер, который будет обрабатывать сообщения из очереди и готово.
Таким образом, если у вас есть долгая операция и требование от внешней системы быстро отвечать на запросы, вы сможете выполнить оба условия. В первой функции быстро складывать данные в очередь задач и возможно проверять по ключу в базе готова ли задача, а другой отдельной асинхронной функции (кстати, это может быть даже и не функция, а виртуальная машина, в том случае, если вам не хватает ограничений serverless среды и нужны, например большие ресурсы, вычисления на GPU или доступ к сервисам из внутренней VPC) выполнять отложенные задачи.
Ограничения YMQ.
У вас может возникнуть желание применить описанный выше подход и для FIFO-очередей. К сожалению в текущей реализации триггеров вы не сможете создать триггер для FIFO-очереди YMQ. В таком случае у вас есть опции использовать для разбора очереди приложение развернутое на ВМ, в нём вы сможете читать любые YMQ очереди. Или же, если вы хотите оставаться serverless, вы можете использовать Yandex Data Streams.
Это сервис, который реализует API совместимое с AWS Kinesis. Вы точно так же, как и в случае с YMQ, сможете писать в него при помощи либо AWS SDK, либо интеграции с API Gateway. Yandex Data Streams предоставит вам гарантии порядка обработки сообщений записанных в стрим. Вам лишь нужно будет определить ключ, по которому будут шардироваться сообщения и помнить, что порядок гарантируется только в рамках сегмента, который задается ключом.
То есть, если вы хотите гарантировать, что сообщения, относящиеся к одному пользователю обрабатывались в порядке записи, то вам стоит включить в ключ шардирования например идентификатор пользователя, чтобы убедиться, что они попадут в один сегмент.