Перейти к основному содержимому

Message Queue

Введение

архитектурная диаграмма

В этом примере мы создадим две функции:

  • sender - функция, которая будет отправлять сообщения в очередь входящих сообщений.
  • receiver - функция, которая будет получать сообщения из очереди при помощи триггера, обработать их и отправлять результат очередь результатов.

SDK

Для работы с Yandex Message Queue в Go мы будем использовать библиотеку aws-sdk-go-v2. В этом примере мы используем EndpointResolverV2 — новый протокол разрешения эндпоинтов. Он позволяет задавать правила разрешения эндпоинтов для различных сервисов на уровне сервиса, не глобално для всего SDK. Подробнее в документации AWS SDK.

Код

package main

import (
"context"
"fmt"
"net/url"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
smithyendpoints "github.com/aws/smithy-go/endpoints"
)

func sendMessageToQueue(
ctx context.Context,
ymqName string,
message string,
origin string,
delay int32,
) (*sqs.SendMessageOutput, error) {
// Load the SDK's configuration from environment and shared config
// In the serverless environment, the configuration is loaded from the environment variables
// AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
cfg, err := config.LoadDefaultConfig(
ctx,
)
if err != nil {
panic("configuration error, " + err.Error())
}
// Create an Amazon SQS service client
client := sqs.NewFromConfig(cfg, func(o *sqs.Options) {
o.Region = "ru-central1"
o.EndpointResolverV2 = &resolverV2{}
})

gQInput := &sqs.GetQueueUrlInput{
QueueName: &ymqName,
}

urlRes, err := client.GetQueueUrl(ctx, gQInput)

if err != nil {
fmt.Printf("Got an error getting the queue URL: %s", err)
return nil, err
}

sMInput := &sqs.SendMessageInput{
MessageAttributes: map[string]types.MessageAttributeValue{
"Origin": {
DataType: aws.String("String"),
StringValue: aws.String(origin),
},
},
MessageBody: aws.String(message),
QueueUrl: urlRes.QueueUrl,
DelaySeconds: delay,
}
resp, err := client.SendMessage(ctx, sMInput)
return resp, nil
}

type resolverV2 struct {
// you could inject additional application context here as well
}

func (*resolverV2) ResolveEndpoint(_ context.Context, _ sqs.EndpointParameters) (
smithyendpoints.Endpoint, error,
) {
u, err := url.Parse("https://message-queue.api.cloud.yandex.net")
if err != nil {
return smithyendpoints.Endpoint{}, err
}
return smithyendpoints.Endpoint{
URI: *u,
}, nil
}

На строках 32-35 мы создаем новый клиент для работы с Yandex Message Queue, передавая ему EndpointResolverV2 для разрешения кастомного эндпоинта — https://message-queue.api.cloud.yandex.net.

В остальном никаких больше особенностей по работе с очередями в Yandex Cloud нет, и вы можете использовать стандартные методы работы с очередями из aws-sdk-go-v2 и любые примеры, которые вы найдете в интернете.

Развертывание

Для развертывания этой функции в Yandex Cloud вы можете воспользоваться Terraform описанным в примере.

resource "archive_file" "function_files" {
output_path = "./function.zip"
source_dir = "../function"
type = "zip"
}

resource "yandex_function" "ymq_sender" {
name = "ymq-sender"
user_hash = archive_file.function_files.output_sha256
runtime = "golang121"
entrypoint = "sender.Sender"
memory = "128"
execution_timeout = "10"
content {
zip_filename = archive_file.function_files.output_path
}
service_account_id = yandex_iam_service_account.ymq_writer.id
environment = {
"YMQ_NAME" = yandex_message_queue.input_queue.name
"AWS_ACCESS_KEY_ID" = yandex_iam_service_account_static_access_key.ymq_writer.access_key
"AWS_SECRET_ACCESS_KEY" = yandex_iam_service_account_static_access_key.ymq_writer.secret_key
}
}


// IAM binding for making function public
resource "yandex_function_iam_binding" "ymq_sender_binding" {
function_id = yandex_function.ymq_sender.id
role = "functions.functionInvoker"
members = ["system:allUsers"]
}


resource "yandex_function" "ymq_receiver" {
name = "ymq-receiver"
user_hash = archive_file.function_files.output_sha256
runtime = "golang121"
entrypoint = "receiver.Receiver"
memory = "128"
execution_timeout = "10"
content {
zip_filename = archive_file.function_files.output_path
}
// This function also need ability to write to message queue
// because that is the way it will return the result of execution
service_account_id = yandex_iam_service_account.ymq_writer.id
environment = {
"YMQ_NAME" = yandex_message_queue.response_queue.name
"AWS_ACCESS_KEY_ID" = yandex_iam_service_account_static_access_key.ymq_writer.access_key
"AWS_SECRET_ACCESS_KEY" = yandex_iam_service_account_static_access_key.ymq_writer.secret_key
}
}

resource "yandex_function_trigger" "ymq_trigger" {
name = "ymq-trigger"

message_queue {
queue_id = yandex_message_queue.input_queue.arn
batch_cutoff = "5"
batch_size = "5"
service_account_id = yandex_iam_service_account.trigger_sa.id
}
function {
id = yandex_function.ymq_receiver.id
service_account_id = yandex_iam_service_account.trigger_sa.id
}
}

Для этого мы сначала определяем ресурс archive_file для создания zip-архива с функцией.

resource "archive_file" "function_files" {
output_path = "./function.zip"
source_dir = "../function"
type = "zip"
}

А затем ресурс yandex_function для создания функции в Yandex Cloud, куда передаем zip-архив с функцией.

resource "yandex_function" "ymq_sender" {
name = "ymq-sender"
user_hash = archive_file.function_files.output_sha256
runtime = "golang121"
entrypoint = "sender.Sender"
memory = "128"
execution_timeout = "10"
content {
zip_filename = archive_file.function_files.output_path
}
service_account_id = yandex_iam_service_account.ymq_writer.id
environment = {
"YMQ_NAME" = yandex_message_queue.input_queue.name
"AWS_ACCESS_KEY_ID" = yandex_iam_service_account_static_access_key.ymq_writer.access_key
"AWS_SECRET_ACCESS_KEY" = yandex_iam_service_account_static_access_key.ymq_writer.secret_key
}
}

Дополнительно, мы передаем переменные окружения AWS_ACCESS_KEY_ID и AWS_SECRET_ACCESS_KEY для доступа к очереди, а также YMQ_NAME для указания имени очереди.

Чтобы сделать функцию доступной из интернета, мы можем создать ресурс yandex_function_iam_binding что бы разрешить доступ к функции всем пользователям, тем самым сделав ее публичной.

resource "yandex_function_iam_binding" "ymq_sender_binding" {
function_id = yandex_function.ymq_sender.id
role = "functions.functionInvoker"
members = ["system:allUsers"]
}

Аналогично, мы создаем вторую функцию receiver.

resource "yandex_function" "ymq_receiver" {
name = "ymq-receiver"
user_hash = archive_file.function_files.output_sha256
runtime = "golang121"
entrypoint = "receiver.Receiver"
memory = "128"
execution_timeout = "10"
content {
zip_filename = archive_file.function_files.output_path
}
// This function also need ability to write to message queue
// because that is the way it will return the result of execution
service_account_id = yandex_iam_service_account.ymq_writer.id
environment = {
"YMQ_NAME" = yandex_message_queue.response_queue.name
"AWS_ACCESS_KEY_ID" = yandex_iam_service_account_static_access_key.ymq_writer.access_key
"AWS_SECRET_ACCESS_KEY" = yandex_iam_service_account_static_access_key.ymq_writer.secret_key
}
}

И триггер для функции receiver для того, чтобы она могла получать сообщения из очереди.

resource "yandex_function_trigger" "ymq_trigger" {
name = "ymq-trigger"

message_queue {
queue_id = yandex_message_queue.input_queue.arn
batch_cutoff = "5"
batch_size = "5"
service_account_id = yandex_iam_service_account.trigger_sa.id
}
function {
id = yandex_function.ymq_receiver.id
service_account_id = yandex_iam_service_account.trigger_sa.id
}
}

Запуск

Теперь, когда функция развернута, мы можем отправить сообщение в очередь при помощи функции sender.

SEND_FUNC_ID=$(terraform -chdir=./tf output -raw sender_function_id)
curl -XPOST \
"https://functions.yandexcloud.net/$SEND_FUNC_ID?integration=raw" \
-d '{"message": "Hello, world", "number": 24}' \
-H "Content-Type: application/json"

И затем получить результат из очереди после обработки функцией receiver.

QUEUE_URL=$(terraform output -raw ymq_id)
aws sqs receive-message --queue-url $QUEUE_URL --endpoint https://message-queue.api.cloud.yandex.net