Step Functions の個別ステータスを Mackerel で監視してみよう
Introduction
Step Functions は様々な AWS リソースを組み合わせてビジネスに必要なワークフローを実現してくれるサービスです。主にデータパイプライン処理に使われているケースが一般的だと思っていて、今ジョインしている IoT プロジェクトでもデータのクレンジングと分析に Step Functions を使っています。JSON ベースのステートメント言語を書くだけで複雑なワークフローが実現でき、開発リソースをもっと効率的に使わせていただいております。
その中で複数の Glue Job を並列で実行している箇所で各 Glue Job の結果をそれぞれ監視する要望ができたんですが、現状 Mackerel のドキュメントを見る限りステータス単位のメトリクスはサポートしていないみたいだったので、Mackerel サーバーにメトリクスを送る Lambda を実装し Step Functions に連携する対策に取り組みました。
Goal
並列で動いている複数の Glue Job が終わるたびに Mackerel Server にメトリクスを送り
登場人物
- Step Functions
- Glue
- Job
- Lambda
- Go
- Secret Manager
- IAM
前提条件
- モニタリングツールとして Mackerel を利用している
- Glue Job (make_rawdata / make_alarm / make_order) は get_next_order で取得した Iterator のデータを並列処理
Getting Started
- バージョン
- Terraform: 0.12.28
- Go: 1.15.2
Secret Manager に Mackerel API key を追加
- Lambda 側で Mackerel クライアントライブラリーを利用するためには API key が必要なので、Mackerel GUI (*1) から API Key を取得し安全に参照するため Secret Manager に登録します。ちなみに、API Key が見れる画面は
Manager
権限以上ではないと見れないので、権限を昇格してもらう必要があります。
resource "aws_secretsmanager_secret" "mackerel_credentials" { name = replace("${var.system_name_prefix}_mackerel_credentials", "_", "-") }
- 今回は AWS Console から手動で登録したので、Terraform 側の import を実行する必要があります。下を実行することで Terraform が管理するリソースとして判定されリソース更新対象から外れます。import せずにそのまま
terraform apply
を実行すると、既に存在しているリソースだと判定されエラーになります。
$ terraform import aws_secretsmanager_secret.mackerel_credentials arn:aws:secretsmanager:REGION:ACCOUNT_ID:secret:SECRET_ARN
Mackerel Server にサービスメトリクスを送る Lambda を実装
$ cd mackerel $ tree . ├── Makefile ├── README.md ├── go.mod ├── go.sum └── metrics └── put ├── main.go └── main_test.go 2 directories, 6 files
- ディレクトリ構成
package main import ( "context" "encoding/json" "os" "time" "github.com/aws/aws-lambda-go/lambda" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/secretsmanager" "github.com/mackerelio/mackerel-client-go" ) type SecretData struct { MackerelAPIKey string `json:"MACKEREL_API_KEY"` } type Event struct { ServiceName string `json:"service-name"` MetricName string `json:"metric-name"` MetricValue int32 `json:"metric-value"` } var ( region string = "ap-northeast-1" client *mackerel.Client ) func init() { secretName := os.Getenv("MACKEREL_API_KEY") if len(secretName) == 0 { panic("SECRET_NAME is empty string") } sess := session.Must(session.NewSession()) svc := secretsmanager.New( sess, aws.NewConfig().WithRegion(region), ) input := &secretsmanager.GetSecretValueInput{ SecretId: aws.String(secretName), } result, err := svc.GetSecretValue(input) if err != nil { panic(err.Error()) } var secretString string if result.SecretString != nil { secretString = *result.SecretString } var secretData SecretData err = json.Unmarshal([]byte(secretString), &secretData) if err != nil { panic(err.Error()) } client = mackerel.NewClient(secretData.MackerelAPIKey) } func HandleRequest(ctx context.Context, event Event) (bool, error) { err := client.PostServiceMetricValues(event.ServiceName, []*mackerel.MetricValue{ { Name: event.MetricName, Time: time.Now().Unix(), Value: event.MetricValue, }, }) if err != nil { return false, err } return true, nil } func main() { lambda.Start(HandleRequest) }
- クライアントオブジェクトは 1回生成して置いて再利用するのが一般的なので、handler がロードされる時のみ実行される init() の内部でインスタンス化する必要があります。
build: GOOS=linux GOARCH=amd64 go build -o bin/metrics/put/main metrics/put/main.go clean: rm -rf ./bin
- せっかくなのでビルドコマンドと掃除コマンドを用意して置きましょう。
Lambda の設定を IaC 化する
... data "archive_file" "put_metrics_mackerel_lambda_zip" { type = "zip" source_dir = "${path.module}/../../../mackerel/bin/metrics/put" output_path = "bin/metrics/put.zip" } resource "aws_lambda_function" "put_metrics_mackerel" { function_name = replace("${var.system_name_prefix}_put_metrics_mackerel", "_", "-") filename = data.archive_file.put_metrics_mackerel_lambda_zip.output_path handler = "main" runtime = "go1.x" memory_size = 128 timeout = 5 reserved_concurrent_executions = 10 role = aws_iam_role.lambda_role.arn source_code_hash = data.archive_file.put_metrics_mackerel_lambda_zip.output_base64sha256 environment { variables = { MACKEREL_API_KEY = aws_secretsmanager_secret.mackerel_credentials.name } } tags = { SystemName = var.system_name_prefix } }
make build
を通して生成された Go バイナリファイルの経路をsource_dir
に指定し、output_path
には圧縮された zip ファイルを格納する経路が必要です。この設定によってterraform plan
とかで勝手に圧縮処理が走るので楽だし、data.archive_file.put_metrics_mackerel_lambda_zip.output_path
のように変数で参照できるようになり、コード管理がやりやすくなります。
{ "error": "Lambda.TooManyRequestsException", "cause": "Rate Exceeded. (Service: AWSLambda; Status Code: 429; Error Code: TooManyRequestsException; Request ID: 1f7b2de8-ae88-4b83-aedb-b9a4f27a2303; Proxy: null)" }
- reserved_concurrent_executions オプションなんですが、Lambda の同時実行数を制御する役割なので 1とかに設定すると
TooManyRequestsException
が出てくるので適当な数値を設定する必要があります。
refs. https://aws.amazon.com/jp/premiumsupport/knowledge-center/lambda-troubleshoot-throttling
... resource "aws_iam_role_policy" "lambda_access_policy" { name = replace("${var.system_name_prefix}_lambda_access_policy", "_", "-") role = aws_iam_role.lambda_role.id policy = <<POLICY { "Version": "2012-10-17", "Statement": [ ... }, { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue" ], "Resource": [ "${aws_secretsmanager_secret.mackerel_credentials.arn}" ] } ] }
- そして Lambda 側で Secret Manager を参照するため、
secretsmanager:GetSecretValue
の許可も忘れずに入れて置きましょう。
Step Functions に Lambda を連携
resource "aws_sfn_state_machine" "workflow" { name = replace("${var.system_name_prefix}_bmm_wf", "_", "-") role_arn = aws_iam_role.state_machine_role.arn definition = <<EOF { "StartAt": "make_baseinfo", "States": { ..., "Parallel": { "Type": "Parallel", "Next": "Done", "ResultPath": null, "Branches": [ { "StartAt": "make_order", "States": { "make_order":{ "Type": "Task", "ResultPath": "$.glueresult", "Resource": "arn:aws:states:::glue:startJobRun.sync", "Parameters": { "JobName": "${aws_glue_job.make_order.name}", "Arguments": { "--TARGET_ORDER.$": "$.iterator.target_order" } }, "Next": "Succeed_make_order", "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "Failed_make_order", "ResultPath": "$.error-info" } ] }, "Failed_make_order": { "Type": "Pass", "Result": { "service-name": "ORGANIZATION", "metric-name": "DOMAIN-ENVIRONMENT.glue.job.make_order.failed", "metric-value": 1 }, "ResultPath": "$", "Next": "Notify_make_order" }, "Succeed_make_order": { "Type": "Pass", "Result": { "service-name": "ORGANIZATION", "metric-name": "DOMAIN-ENVIRONMENT.glue.job.make_order.failed", "metric-value": 0 }, "ResultPath": "$", "Next": "Notify_make_order" }, "Notify_make_order": { "Type": "Task", "Resource": "${aws_lambda_function.put_metrics_mackerel.arn}", "InputPath": "$", "ResultPath": "$", "End": true } } }, ... ] } } } EOF tags = { SystemName = var.system_name_prefix } }
- Glue Job (make_order) の結果をもとに異なるメトリクス数値を Makcerel Server に送る必要があります。Glue Job が失敗したら 1、成功したら 0を Mackerel Server に送ることで継続的な監視ができるようになります。Step Function と Lambda 間のデータ渡しには Type: Pass を利用しました。
Type: Pass
は簡単に突っ込むことができるのでデバッグとかにも良く使われそうです。
Deploy & Confirm
$ cd mackerel $ make build
- まず、Go バイナリファイルを生成します。
$ cd terraform/envs/prd $ terraform plan $ terraform apply
- その後、Assume Role を確認し terraform 命令で AWS リソースを更新します。
- Step Functions の実行が終わってから Mackerel GUI を見るとサービスメトリクスが取得できているのが分かります。
Bonus
Monitors → New Monitor → Service metric monitor
- サービスメトリクスにもアラート条件の設定が可能なので、上のように設定して置きましょう。今回のメトリクスだと boolean に近いので
Critical Threshold
は 0 超過を設定しAverage of the past
は個別メトリクス単位で検知されるよう 1 points を設定しました。
Summary
今回初めて Step Functions を触ったので、最初はステートメント言語を見てかなり絶望したんですが、ステータスの種類と役割とかステータス同士でのデータを渡す方法などをちゃんと理解できるようになったので、今回のタスクも良い勉強になった気がします。
この記事が誰かのお役に立てれば幸いです。
以上、CX事業本部 MADチーム、キム (@sano3071) でした。