Перейти к основному содержимому

Object Storage

Введение

Этот пример показывает, как использовать триггер объектного хранилища с функцией Go. Функция запускается при появлении нового объекта в бакете. Функция читает объект, предполагая, что это изображение, и изменяет его размер до 100x100 пикселей, помещая миниатюру в тот же бакет в папку thumbnails.

Эта функция также использует libvips для демонстрации того, как предоставлять бинарные зависимости функции вместе с функцией. Если вы не загрузите библиотеку вместе с функцией, вы получите следующую ошибку:

# pkg-config --cflags  -- vips
Package vips was not found in the pkg-config search path.
Perhaps you should add the directory containing `vips.pc'
to the PKG_CONFIG_PATH environment variable
No package 'vips' found

Чтобы решить эту проблему, нам нужно скачать библиотеку и загрузить ее вместе с функцией. Важно предоставить версию библиотеки, совместимую с ОС, где будет выполняться функция. Для этого мы можем найти версию ОС в документации и загрузить библиотеку из официального репозитория. Затем нам нужно найти библиотеку в репозитории Ubuntu и скачать:

Что бы собрать функцию, выполните следующую команду:

docker build --platform linux/amd64 \
-t ycf-go:1.21.9 \
-f ./Dockerfile .
mkdir ./build || true
docker run --rm \
--platform linux/amd64 \
-v "./function:/function" \
-v "./build:/build" \
ycf-go:1.21.9 \
/bin/sh -c "cd function && ./build.sh"

В билд-скрипте мы сначала собираем функцию как плагин, а затем, используя утилиту ldd, находим все зависимости. Затем мы копируем все зависимости в папку build/shared-libs и архивируем ее. Архив будет загружен в Object Storage, так как он превысит лимит размера для прямой загрузки — 3.5 МБ.

Версия Go должна совпадать с версией, используемой в Yandex Cloud Functions, с точностью до патча. В противном случае функция не сможет загрузиться.

Код

package main

import (
"context"
"fmt"
"io"
"log"
"net/url"
"strings"
"sync"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
smithyendpoints "github.com/aws/smithy-go/endpoints"
)

// Handler handles an object storage event.
// It creates a new S3 client, retrieves the object involved in the event, and returns a response.
func Handler(ctx context.Context, event *ObjectStorageEvent) (*ObjectStorageResponse, error) {
// Load the AWS configuration with the custom endpoint resolver.
cfg, err := config.LoadDefaultConfig(ctx,
config.WithDefaultRegion("ru-central1"),
)
if err != nil {
log.Fatal(err)
}

// Create a new S3 client.
s3Client := s3.NewFromConfig(cfg, func(o *s3.Options) {
o.Region = "ru-central1"
o.EndpointResolverV2 = &resolverV2{}
})
// Initialize a WaitGroup to manage the goroutine.
wg := sync.WaitGroup{}
// Add a task to the WaitGroup.
wg.Add(len(event.Messages))

for _, message := range event.Messages {
// Get the object involved in the event.
object, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(message.Details.BucketID),
Key: aws.String(message.Details.ObjectID),
})
if err != nil {
return nil, fmt.Errorf("failed to get object: %w", err)
}

// Print the size of the object to stdout.
fmt.Printf("Object size: %d", *object.ContentLength)
thumbnailKey := "thumbnail/" + strings.TrimPrefix(message.Details.ObjectID, "uploads/")

pipeReader, pipeWriter := io.Pipe()

// Start a new goroutine to handle the object storage operation.
go func() {
// Attempt to put the object into the bucket.
_, err = s3Client.PutObject(ctx, &s3.PutObjectInput{
// The name of the bucket to put the object into.
Bucket: aws.String(message.Details.BucketID),

// The key to store the object under, prefixed with "thumbnail-".
Key: aws.String(thumbnailKey),

// The data to store in the object.
Body: pipeReader,

// The MIME type of the object.
ContentType: object.ContentType,
})

// If an error occurred while putting the object, panic.
if err != nil {
panic(fmt.Errorf("failed to upload object: %w", err))
}

// Signal to the WaitGroup that the task is done.
wg.Done()
}()

// Create a thumbnail of the object.
defer object.Body.Close()
err = Thumbnail(object.Body, pipeWriter)
if err != nil {
return nil, err
}
}
// Wait for all goroutins to finish.
wg.Wait()

// Return a successful response.
return &ObjectStorageResponse{
StatusCode: 200,
}, nil
}

type resolverV2 struct {
// you could inject additional application context here as well
}

func (*resolverV2) ResolveEndpoint(ctx context.Context, params s3.EndpointParameters) (
smithyendpoints.Endpoint, error,
) {
u, err := url.Parse("https://storage.yandexcloud.net")
if err != nil {
return smithyendpoints.Endpoint{}, err
}
u.Path += "/" + *params.Bucket
return smithyendpoints.Endpoint{
URI: *u,
}, nil
}

На строках 22-24 мы создаем конфиг s3-клиента, который будет использоваться для чтения и записи объектов. Он предполагает, что ключи доступа к Object Storage передаются через переменные окружения AWS_ACCESS_KEY_ID и AWS_SECRET_ACCESS_KEY.

cfg, err := config.LoadDefaultConfig(ctx,
config.WithDefaultRegion("ru-central1"),
)

Так как в этом примере мы используем EndpointResolverV2, нам нужно создать объект resoverV2, удовлетворяющий этому интерфейсу и передать его в конфиг сервиса, так как в отличие от V1 разрешение эндпоинтов перенесли с глобального уровня SDK на уровень сервиса.

// Create a new S3 client.
s3Client := s3.NewFromConfig(cfg, func(o *s3.Options) {
o.Region = "ru-central1"
o.EndpointResolverV2 = &resolverV2{}
})

Далее, на строках 41-44 мы получаем объект из бакета.

object, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(message.Details.BucketID),
Key: aws.String(message.Details.ObjectID),
})

На строках 56-79 мы задаем горутину, которая будет читать io.Pipe и записывать его содержимое в S3.

go func() {
// Attempt to put the object into the bucket.
_, err = s3Client.PutObject(ctx, &s3.PutObjectInput{
// The name of the bucket to put the object into.
Bucket: aws.String(message.Details.BucketID),

// The key to store the object under, prefixed with "thumbnail-".
Key: aws.String(thumbnailKey),

// The data to store in the object.
Body: pipeReader,

// The MIME type of the object.
ContentType: object.ContentType,
})

// If an error occurred while putting the object, panic.
if err != nil {
panic(fmt.Errorf("failed to upload object: %w", err))
}

// Signal to the WaitGroup that the task is done.
wg.Done()
}()

Но для этого нам нужно писать данные в этот пайп. Это делается на строке 83.

err = Thumbnail(object.Body, pipeWriter)

Здесь вызывается функция Thumbnail обертка над libvips, которая изменяет размер изображения и записывает его в пайп.

package main

import (
"io"

"github.com/davidbyttow/govips/v2/vips"
)

const (
width = 100
height = 100
)

func Thumbnail(input io.Reader, output io.WriteCloser) error {
image, err := vips.NewImageFromReader(input)
if err != nil {
return err
}
if err = image.Thumbnail(width, height, vips.InterestingCentre); err != nil {
return err
}
data, _, err := image.ExportNative()
if err != nil {
return err
}
defer output.Close()
if _, err = output.Write(data); err != nil {
return err
}
return nil
}

Развертывание

Для развертывания этой функции в Yandex Cloud вы можете воспользоваться Terraform описанным в примере.

Для этого мы сначала определяем ресурс archive_file для создания zip-архива с функцией.

data "archive_file" "function_code" {
output_path = local.archive_output_path
source_dir = "../build"
type = "zip"
}

А затем ресурс yandex_function для создания функции в Yandex Cloud, куда передаем zip-архив с функцией.

resource "yandex_function" "storage-handler" {
name = "storage-handler"
user_hash = data.archive_file.function_code.output_sha
runtime = "golang121"
entrypoint = "handler.Handler"
memory = "128"
execution_timeout = "10"
package {
bucket_name = yandex_storage_bucket.for-deploy.bucket
object_name = "function.zip"
# sha_256 = archive_file.function_code.output_sha256
}
service_account_id = yandex_iam_service_account.sa_storage_editor.id
environment = {
# The trigger will provide the name of the bucket and object key, but not actual content of the object
# So we need to get the content of the object ourselves
"AWS_ACCESS_KEY_ID" = yandex_iam_service_account_static_access_key.sa_storage_editor.access_key
"AWS_SECRET_ACCESS_KEY" = yandex_iam_service_account_static_access_key.sa_storage_editor.secret_key
}
depends_on = [
yandex_storage_object.function_code
]
}

Дополнительно, мы передаем переменные окружения AWS_ACCESS_KEY_ID и AWS_SECRET_ACCESS_KEY для доступа к Object Storage.

Также мы создаем триггер для функции, который будет вызывать функцию при появлении нового объекта в бакете.

resource "yandex_function_trigger" "storage-trigger" {
name = "storage-trigger"

object_storage {
bucket_id = yandex_storage_bucket.for-uploads.bucket
prefix = "uploads/"
batch_cutoff = 1
create = true
update = true
}
function {
id = yandex_function.storage-handler.id
service_account_id = yandex_iam_service_account.trigger_sa.id
}
}

Запуск

Теперь, когда функция развернута, мы можем загрузить изображение в бакет и увидеть миниатюру в папке thumbnails.

BUCKET=$(terraform -chdir=./tf output -raw bucket)

aws s3 cp --endpoint-url=https://storage.yandexcloud.net \
./image.jpg \
s3://$BUCKET/uploads/image.jpg