Message Queue
Введение
В этом примере мы создадим две функции:
sender
- функция, которая будет отправлять сообщения в очередь входящих сообщений.receiver
- функция, которая будет получать сообщения из очереди при помощи триггера, обработать их и отправлять результат очередь результатов.
SDK
Для работы с Yandex Message Queue в Go мы будем использовать библиотеку aws-sdk-go-v2
.
В этом примере мы используем EndpointResolverV2
— новый протокол разрешения эндпоинтов. Он позволяет
задавать правила разрешения эндпоинтов для различных сервисов на уровне сервиса, не глобално для всего SDK.
Подробнее в документации AWS SDK.
Код
package main
import (
"context"
"fmt"
"net/url"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
smithyendpoints "github.com/aws/smithy-go/endpoints"
)
func sendMessageToQueue(
ctx context.Context,
ymqName string,
message string,
origin string,
delay int32,
) (*sqs.SendMessageOutput, error) {
// Load the SDK's configuration from environment and shared config
// In the serverless environment, the configuration is loaded from the environment variables
// AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
cfg, err := config.LoadDefaultConfig(
ctx,
)
if err != nil {
panic("configuration error, " + err.Error())
}
// Create an Amazon SQS service client
client := sqs.NewFromConfig(cfg, func(o *sqs.Options) {
o.Region = "ru-central1"
o.EndpointResolverV2 = &resolverV2{}
})
gQInput := &sqs.GetQueueUrlInput{
QueueName: &ymqName,
}
urlRes, err := client.GetQueueUrl(ctx, gQInput)
if err != nil {
fmt.Printf("Got an error getting the queue URL: %s", err)
return nil, err
}
sMInput := &sqs.SendMessageInput{
MessageAttributes: map[string]types.MessageAttributeValue{
"Origin": {
DataType: aws.String("String"),
StringValue: aws.String(origin),
},
},
MessageBody: aws.String(message),
QueueUrl: urlRes.QueueUrl,
DelaySeconds: delay,
}
resp, err := client.SendMessage(ctx, sMInput)
return resp, nil
}
type resolverV2 struct {
// you could inject additional application context here as well
}
func (*resolverV2) ResolveEndpoint(_ context.Context, _ sqs.EndpointParameters) (
smithyendpoints.Endpoint, error,
) {
u, err := url.Parse("https://message-queue.api.cloud.yandex.net")
if err != nil {
return smithyendpoints.Endpoint{}, err
}
return smithyendpoints.Endpoint{
URI: *u,
}, nil
}
На строках 32-35
мы создаем новый клиент для работы с Yandex Message Queue, передавая ему EndpointResolverV2
для
разрешения кастомного эндпоинта — https://message-queue.api.cloud.yandex.net
.
В остальном никаких больше особенностей по работе с очередями в Yandex Cloud нет, и вы можете использовать
стандартные методы работы с очередями из aws-sdk-go-v2
и любые примеры, которые вы найдете в интернете.
Развертывание
Для развертывания этой функции в Yandex Cloud вы можете воспользоваться Terraform описанным в примере.
resource "archive_file" "function_files" {
output_path = "./function.zip"
source_dir = "../function"
type = "zip"
}
resource "yandex_function" "ymq_sender" {
name = "ymq-sender"
user_hash = archive_file.function_files.output_sha256
runtime = "golang121"
entrypoint = "sender.Sender"
memory = "128"
execution_timeout = "10"
content {
zip_filename = archive_file.function_files.output_path
}
service_account_id = yandex_iam_service_account.ymq_writer.id
environment = {
"YMQ_NAME" = yandex_message_queue.input_queue.name
"AWS_ACCESS_KEY_ID" = yandex_iam_service_account_static_access_key.ymq_writer.access_key
"AWS_SECRET_ACCESS_KEY" = yandex_iam_service_account_static_access_key.ymq_writer.secret_key
}
}
// IAM binding for making function public
resource "yandex_function_iam_binding" "ymq_sender_binding" {
function_id = yandex_function.ymq_sender.id
role = "functions.functionInvoker"
members = ["system:allUsers"]
}
resource "yandex_function" "ymq_receiver" {
name = "ymq-receiver"
user_hash = archive_file.function_files.output_sha256
runtime = "golang121"
entrypoint = "receiver.Receiver"
memory = "128"
execution_timeout = "10"
content {
zip_filename = archive_file.function_files.output_path
}
// This function also need ability to write to message queue
// because that is the way it will return the result of execution
service_account_id = yandex_iam_service_account.ymq_writer.id
environment = {
"YMQ_NAME" = yandex_message_queue.response_queue.name
"AWS_ACCESS_KEY_ID" = yandex_iam_service_account_static_access_key.ymq_writer.access_key
"AWS_SECRET_ACCESS_KEY" = yandex_iam_service_account_static_access_key.ymq_writer.secret_key
}
}
resource "yandex_function_trigger" "ymq_trigger" {
name = "ymq-trigger"
message_queue {
queue_id = yandex_message_queue.input_queue.arn
batch_cutoff = "5"
batch_size = "5"
service_account_id = yandex_iam_service_account.trigger_sa.id
}
function {
id = yandex_function.ymq_receiver.id
service_account_id = yandex_iam_service_account.trigger_sa.id
}
}
Для этого мы сначала определяем ресурс archive_file
для создания zip-архива с функцией.
resource "archive_file" "function_files" {
output_path = "./function.zip"
source_dir = "../function"
type = "zip"
}
А затем ресурс yandex_function
для создания функции в Yandex Cloud, куда передаем zip-архив с функцией.
resource "yandex_function" "ymq_sender" {
name = "ymq-sender"
user_hash = archive_file.function_files.output_sha256
runtime = "golang121"
entrypoint = "sender.Sender"
memory = "128"
execution_timeout = "10"
content {
zip_filename = archive_file.function_files.output_path
}
service_account_id = yandex_iam_service_account.ymq_writer.id
environment = {
"YMQ_NAME" = yandex_message_queue.input_queue.name
"AWS_ACCESS_KEY_ID" = yandex_iam_service_account_static_access_key.ymq_writer.access_key
"AWS_SECRET_ACCESS_KEY" = yandex_iam_service_account_static_access_key.ymq_writer.secret_key
}
}
Дополнительно, мы передаем переменные окружения AWS_ACCESS_KEY_ID
и AWS_SECRET_ACCESS_KEY
для доступа к очереди,
а также YMQ_NAME
для указания имени очереди.
Чтобы сделать функцию доступной из интернета, мы можем создать ресурс yandex_function_iam_binding
что бы разрешить доступ к функции всем пользователям, тем самым сделав ее публичной.
resource "yandex_function_iam_binding" "ymq_sender_binding" {
function_id = yandex_function.ymq_sender.id
role = "functions.functionInvoker"
members = ["system:allUsers"]
}
Аналогично, мы создаем вторую функцию receiver
.
resource "yandex_function" "ymq_receiver" {
name = "ymq-receiver"
user_hash = archive_file.function_files.output_sha256
runtime = "golang121"
entrypoint = "receiver.Receiver"
memory = "128"
execution_timeout = "10"
content {
zip_filename = archive_file.function_files.output_path
}
// This function also need ability to write to message queue
// because that is the way it will return the result of execution
service_account_id = yandex_iam_service_account.ymq_writer.id
environment = {
"YMQ_NAME" = yandex_message_queue.response_queue.name
"AWS_ACCESS_KEY_ID" = yandex_iam_service_account_static_access_key.ymq_writer.access_key
"AWS_SECRET_ACCESS_KEY" = yandex_iam_service_account_static_access_key.ymq_writer.secret_key
}
}
И триггер для функции receiver
для того, чтобы она могла получать сообщения из очереди.
resource "yandex_function_trigger" "ymq_trigger" {
name = "ymq-trigger"
message_queue {
queue_id = yandex_message_queue.input_queue.arn
batch_cutoff = "5"
batch_size = "5"
service_account_id = yandex_iam_service_account.trigger_sa.id
}
function {
id = yandex_function.ymq_receiver.id
service_account_id = yandex_iam_service_account.trigger_sa.id
}
}
Запуск
Теперь, когда функция развернута, мы можем отправить сообщение в очередь при помощи функции sender
.
SEND_FUNC_ID=$(terraform -chdir=./tf output -raw sender_function_id)
curl -XPOST \
"https://functions.yandexcloud.net/$SEND_FUNC_ID?integration=raw" \
-d '{"message": "Hello, world", "number": 24}' \
-H "Content-Type: application/json"
И затем получить результат из очереди после обработки функцией receiver
.
QUEUE_URL=$(terraform output -raw ymq_id)
aws sqs receive-message --queue-url $QUEUE_URL --endpoint https://message-queue.api.cloud.yandex.net