Object Storage
Введение
Этот пример показывает, как использовать триггер объектного хранилища с функцией Go. Функция запускается при появлении
нового объекта в бакете. Функция читает объект, предполагая, что это изображение, и изменяет его размер до 100x100 пикселей,
помещая миниатюру в тот же бакет в папку thumbnails
.
Эта функция также использует libvips
для демонстрации того, как предоставлять бинарные зависимости функции вместе с функцией.
Если вы не загрузите библиотеку вместе с функцией, вы получите следующую ошибку:
# pkg-config --cflags -- vips
Package vips was not found in the pkg-config search path.
Perhaps you should add the directory containing `vips.pc'
to the PKG_CONFIG_PATH environment variable
No package 'vips' found
Чтобы решить эту проблему, нам нужно скачать библиотеку и загрузить ее вместе с функцией. Важно предоставить версию библиотеки, совместимую с ОС, где будет выполняться функция. Для этого мы можем найти версию ОС в документации и загрузить библиотеку из официального репозитория. Затем нам нужно найти библиотеку в репозитории Ubuntu и скачать:
Что бы собрать функцию, выполните следующую команду:
docker build --platform linux/amd64 \
-t ycf-go:1.21.9 \
-f ./Dockerfile .
mkdir ./build || true
docker run --rm \
--platform linux/amd64 \
-v "./function:/function" \
-v "./build:/build" \
ycf-go:1.21.9 \
/bin/sh -c "cd function && ./build.sh"
В билд-скрипте мы сначала собираем функцию как плагин, а затем, используя утилиту ldd
, находим все зависимости.
Затем мы копируем все зависимости в папку build/shared-libs
и архивируем ее. Архив будет загружен в Object Storage,
так как он превысит лимит размера для прямой загрузки — 3.5 МБ.
Версия Go должна совпадать с версией, используемой в Yandex Cloud Functions, с точностью до патча. В противном случае функция не сможет загрузиться.
Код
package main
import (
"context"
"fmt"
"io"
"log"
"net/url"
"strings"
"sync"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
smithyendpoints "github.com/aws/smithy-go/endpoints"
)
// Handler handles an object storage event.
// It creates a new S3 client, retrieves the object involved in the event, and returns a response.
func Handler(ctx context.Context, event *ObjectStorageEvent) (*ObjectStorageResponse, error) {
// Load the AWS configuration with the custom endpoint resolver.
cfg, err := config.LoadDefaultConfig(ctx,
config.WithDefaultRegion("ru-central1"),
)
if err != nil {
log.Fatal(err)
}
// Create a new S3 client.
s3Client := s3.NewFromConfig(cfg, func(o *s3.Options) {
o.Region = "ru-central1"
o.EndpointResolverV2 = &resolverV2{}
})
// Initialize a WaitGroup to manage the goroutine.
wg := sync.WaitGroup{}
// Add a task to the WaitGroup.
wg.Add(len(event.Messages))
for _, message := range event.Messages {
// Get the object involved in the event.
object, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(message.Details.BucketID),
Key: aws.String(message.Details.ObjectID),
})
if err != nil {
return nil, fmt.Errorf("failed to get object: %w", err)
}
// Print the size of the object to stdout.
fmt.Printf("Object size: %d", *object.ContentLength)
thumbnailKey := "thumbnail/" + strings.TrimPrefix(message.Details.ObjectID, "uploads/")
pipeReader, pipeWriter := io.Pipe()
// Start a new goroutine to handle the object storage operation.
go func() {
// Attempt to put the object into the bucket.
_, err = s3Client.PutObject(ctx, &s3.PutObjectInput{
// The name of the bucket to put the object into.
Bucket: aws.String(message.Details.BucketID),
// The key to store the object under, prefixed with "thumbnail-".
Key: aws.String(thumbnailKey),
// The data to store in the object.
Body: pipeReader,
// The MIME type of the object.
ContentType: object.ContentType,
})
// If an error occurred while putting the object, panic.
if err != nil {
panic(fmt.Errorf("failed to upload object: %w", err))
}
// Signal to the WaitGroup that the task is done.
wg.Done()
}()
// Create a thumbnail of the object.
defer object.Body.Close()
err = Thumbnail(object.Body, pipeWriter)
if err != nil {
return nil, err
}
}
// Wait for all goroutins to finish.
wg.Wait()
// Return a successful response.
return &ObjectStorageResponse{
StatusCode: 200,
}, nil
}
type resolverV2 struct {
// you could inject additional application context here as well
}
func (*resolverV2) ResolveEndpoint(ctx context.Context, params s3.EndpointParameters) (
smithyendpoints.Endpoint, error,
) {
u, err := url.Parse("https://storage.yandexcloud.net")
if err != nil {
return smithyendpoints.Endpoint{}, err
}
u.Path += "/" + *params.Bucket
return smithyendpoints.Endpoint{
URI: *u,
}, nil
}
На строках 22-24
мы создаем конфиг s3-клиента, который будет использоваться для чтения и записи объектов.
Он предполагает, что ключи доступа к Object Storage передаются через переменные окружения AWS_ACCESS_KEY_ID
и AWS_SECRET_ACCESS_KEY
.
cfg, err := config.LoadDefaultConfig(ctx,
config.WithDefaultRegion("ru-central1"),
)
Так как в этом примере мы используем EndpointResolverV2
, нам нужно создать объект resoverV2
, удовлетворяющий
этому интерфейсу и передать его в конфиг сервиса, так как в отличие от V1 разрешение эндпоинтов перенесли с
глобального уровня
SDK на уровень сервиса.
// Create a new S3 client.
s3Client := s3.NewFromConfig(cfg, func(o *s3.Options) {
o.Region = "ru-central1"
o.EndpointResolverV2 = &resolverV2{}
})
Далее, на строках 41-44
мы получаем объект из бакета.
object, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(message.Details.BucketID),
Key: aws.String(message.Details.ObjectID),
})
На строках 56-79
мы задаем горутину, которая будет читать io.Pipe
и записывать его содержимое в S3.
go func() {
// Attempt to put the object into the bucket.
_, err = s3Client.PutObject(ctx, &s3.PutObjectInput{
// The name of the bucket to put the object into.
Bucket: aws.String(message.Details.BucketID),
// The key to store the object under, prefixed with "thumbnail-".
Key: aws.String(thumbnailKey),
// The data to store in the object.
Body: pipeReader,
// The MIME type of the object.
ContentType: object.ContentType,
})
// If an error occurred while putting the object, panic.
if err != nil {
panic(fmt.Errorf("failed to upload object: %w", err))
}
// Signal to the WaitGroup that the task is done.
wg.Done()
}()
Но для этого нам нужно писать данные в этот пайп. Это делается на строке 83
.
err = Thumbnail(object.Body, pipeWriter)
Здесь вызывается функция Thumbnail
обертка над libvips
, которая изменяет размер изображения и записывает его в пайп.
package main
import (
"io"
"github.com/davidbyttow/govips/v2/vips"
)
const (
width = 100
height = 100
)
func Thumbnail(input io.Reader, output io.WriteCloser) error {
image, err := vips.NewImageFromReader(input)
if err != nil {
return err
}
if err = image.Thumbnail(width, height, vips.InterestingCentre); err != nil {
return err
}
data, _, err := image.ExportNative()
if err != nil {
return err
}
defer output.Close()
if _, err = output.Write(data); err != nil {
return err
}
return nil
}
Развертывание
Для развертывания этой функции в Yandex Cloud вы можете воспользоваться Terraform описанным в примере.
Для этого мы сначала определяем ресурс archive_file
для создания zip-архива с функцией.
data "archive_file" "function_code" {
output_path = local.archive_output_path
source_dir = "../build"
type = "zip"
}
А затем ресурс yandex_function
для создания функции в Yandex Cloud, куда передаем zip-архив с функцией.
resource "yandex_function" "storage-handler" {
name = "storage-handler"
user_hash = data.archive_file.function_code.output_sha
runtime = "golang121"
entrypoint = "handler.Handler"
memory = "128"
execution_timeout = "10"
package {
bucket_name = yandex_storage_bucket.for-deploy.bucket
object_name = "function.zip"
# sha_256 = archive_file.function_code.output_sha256
}
service_account_id = yandex_iam_service_account.sa_storage_editor.id
environment = {
# The trigger will provide the name of the bucket and object key, but not actual content of the object
# So we need to get the content of the object ourselves
"AWS_ACCESS_KEY_ID" = yandex_iam_service_account_static_access_key.sa_storage_editor.access_key
"AWS_SECRET_ACCESS_KEY" = yandex_iam_service_account_static_access_key.sa_storage_editor.secret_key
}
depends_on = [
yandex_storage_object.function_code
]
}
Дополнительно, мы передаем переменные окружения AWS_ACCESS_KEY_ID
и AWS_SECRET_ACCESS_KEY
для доступа к Object Storage.
Также мы создаем триггер для функции, который будет вызывать функцию при появлении нового объекта в бакете.
resource "yandex_function_trigger" "storage-trigger" {
name = "storage-trigger"
object_storage {
bucket_id = yandex_storage_bucket.for-uploads.bucket
prefix = "uploads/"
batch_cutoff = 1
create = true
update = true
}
function {
id = yandex_function.storage-handler.id
service_account_id = yandex_iam_service_account.trigger_sa.id
}
}
Запуск
Теперь, когда функция развернута, мы можем загрузить изображение в бакет и увидеть миниатюру в папке thumbnails
.
BUCKET=$(terraform -chdir=./tf output -raw bucket)
aws s3 cp --endpoint-url=https://storage.yandexcloud.net \
./image.jpg \
s3://$BUCKET/uploads/image.jpg