Step Functions の個別ステータスを Mackerel で監視してみよう

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Introduction

Step Functions は様々な AWS リソースを組み合わせてビジネスに必要なワークフローを実現してくれるサービスです。主にデータパイプライン処理に使われているケースが一般的だと思っていて、今ジョインしている IoT プロジェクトでもデータのクレンジングと分析に Step Functions を使っています。JSON ベースのステートメント言語を書くだけで複雑なワークフローが実現でき、開発リソースをもっと効率的に使わせていただいております。

その中で複数の Glue Job を並列で実行している箇所で各 Glue Job の結果をそれぞれ監視する要望ができたんですが、現状 Mackerel のドキュメントを見る限りステータス単位のメトリクスはサポートしていないみたいだったので、Mackerel サーバーにメトリクスを送る Lambda を実装し Step Functions に連携する対策に取り組みました。

Goal

並列で動いている複数の Glue Job が終わるたびに Mackerel Server にメトリクスを送り

登場人物

  • Step Functions
  • Glue
    • Job
  • Lambda
    • Go
  • Secret Manager
  • IAM

前提条件

  • モニタリングツールとして Mackerel を利用している
  • Glue Job (make_rawdata / make_alarm / make_order) は get_next_order で取得した Iterator のデータを並列処理

Getting Started

  • バージョン
    • Terraform: 0.12.28
    • Go: 1.15.2

Secret Manager に Mackerel API key を追加

  • Lambda 側で Mackerel クライアントライブラリーを利用するためには API key が必要なので、Mackerel GUI (*1) から API Key を取得し安全に参照するため Secret Manager に登録します。ちなみに、API Key が見れる画面はManager権限以上ではないと見れないので、権限を昇格してもらう必要があります。

ssm.tf

resource "aws_secretsmanager_secret" "mackerel_credentials" {
  name = replace("${var.system_name_prefix}_mackerel_credentials", "_", "-")
}
  • 今回は AWS Console から手動で登録したので、Terraform 側の import を実行する必要があります。下を実行することで Terraform が管理するリソースとして判定されリソース更新対象から外れます。import せずにそのままterraform applyを実行すると、既に存在しているリソースだと判定されエラーになります。
$ terraform import aws_secretsmanager_secret.mackerel_credentials arn:aws:secretsmanager:REGION:ACCOUNT_ID:secret:SECRET_ARN

Mackerel Server にサービスメトリクスを送る Lambda を実装

$ cd mackerel
$ tree
.
├── Makefile
├── README.md
├── go.mod
├── go.sum
└── metrics
    └── put
        ├── main.go
        └── main_test.go
2 directories, 6 files
  • ディレクトリ構成

main.go

package main

import (
	"context"
	"encoding/json"
	"os"
	"time"

	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/secretsmanager"
	"github.com/mackerelio/mackerel-client-go"
)

type SecretData struct {
	MackerelAPIKey string `json:"MACKEREL_API_KEY"`
}

type Event struct {
	ServiceName string `json:"service-name"`
	MetricName  string `json:"metric-name"`
	MetricValue int32  `json:"metric-value"`
}

var (
	region string = "ap-northeast-1"

	client *mackerel.Client
)

func init() {
	secretName := os.Getenv("MACKEREL_API_KEY")
	if len(secretName) == 0 {
		panic("SECRET_NAME is empty string")
	}

	sess := session.Must(session.NewSession())
	svc := secretsmanager.New(
		sess,
		aws.NewConfig().WithRegion(region),
	)

	input := &secretsmanager.GetSecretValueInput{
		SecretId: aws.String(secretName),
	}

	result, err := svc.GetSecretValue(input)
	if err != nil {
		panic(err.Error())
	}

	var secretString string
	if result.SecretString != nil {
		secretString = *result.SecretString
	}

	var secretData SecretData
	err = json.Unmarshal([]byte(secretString), &secretData)
	if err != nil {
		panic(err.Error())
	}

	client = mackerel.NewClient(secretData.MackerelAPIKey)
}

func HandleRequest(ctx context.Context, event Event) (bool, error) {
	err := client.PostServiceMetricValues(event.ServiceName, []*mackerel.MetricValue{
		{
			Name:  event.MetricName,
			Time:  time.Now().Unix(),
			Value: event.MetricValue,
		},
	})

	if err != nil {
		return false, err
	}

	return true, nil
}

func main() {
	lambda.Start(HandleRequest)
}
  • クライアントオブジェクトは 1回生成して置いて再利用するのが一般的なので、handler がロードされる時のみ実行される init() の内部でインスタンス化する必要があります。

Makefile

build:
	GOOS=linux GOARCH=amd64 go build -o bin/metrics/put/main metrics/put/main.go

clean:
	rm -rf ./bin
  • せっかくなのでビルドコマンドと掃除コマンドを用意して置きましょう。

Lambda の設定を IaC 化する

lambda.tf

...

data "archive_file" "put_metrics_mackerel_lambda_zip" {
  type        = "zip"
  source_dir  = "${path.module}/../../../mackerel/bin/metrics/put"
  output_path = "bin/metrics/put.zip"
}

resource "aws_lambda_function" "put_metrics_mackerel" {
  function_name = replace("${var.system_name_prefix}_put_metrics_mackerel", "_", "-")

  filename                       = data.archive_file.put_metrics_mackerel_lambda_zip.output_path
  handler                        = "main"
  runtime                        = "go1.x"
  memory_size                    = 128
  timeout                        = 5
  reserved_concurrent_executions = 10
  role                           = aws_iam_role.lambda_role.arn
  source_code_hash               = data.archive_file.put_metrics_mackerel_lambda_zip.output_base64sha256

  environment {
    variables = {
      MACKEREL_API_KEY = aws_secretsmanager_secret.mackerel_credentials.name
    }
  }
  tags = {
    SystemName = var.system_name_prefix
  }
}
  • make buildを通して生成された Go バイナリファイルの経路をsource_dirに指定し、output_pathには圧縮された zip ファイルを格納する経路が必要です。この設定によってterraform plan とかで勝手に圧縮処理が走るので楽だし、data.archive_file.put_metrics_mackerel_lambda_zip.output_pathのように変数で参照できるようになり、コード管理がやりやすくなります。
{
  "error": "Lambda.TooManyRequestsException",
  "cause": "Rate Exceeded. (Service: AWSLambda; Status Code: 429; Error Code: TooManyRequestsException; Request ID: 1f7b2de8-ae88-4b83-aedb-b9a4f27a2303; Proxy: null)"
}
  • reserved_concurrent_executions オプションなんですが、Lambda の同時実行数を制御する役割なので 1とかに設定するとTooManyRequestsExceptionが出てくるので適当な数値を設定する必要があります。

refs. https://aws.amazon.com/jp/premiumsupport/knowledge-center/lambda-troubleshoot-throttling

iam.tf

...

resource "aws_iam_role_policy" "lambda_access_policy" {
  name   = replace("${var.system_name_prefix}_lambda_access_policy", "_", "-")
  role   = aws_iam_role.lambda_role.id
  policy = <<POLICY
{
  "Version": "2012-10-17",
  "Statement": [
    ...

    },
    {
      "Effect": "Allow",
      "Action": [
        "secretsmanager:GetSecretValue"
      ],
      "Resource": [
        "${aws_secretsmanager_secret.mackerel_credentials.arn}"
      ]
    }
  ]
}
  • そして Lambda 側で Secret Manager を参照するため、secretsmanager:GetSecretValueの許可も忘れずに入れて置きましょう。

Step Functions に Lambda を連携

stepfunctions.tf

resource "aws_sfn_state_machine" "workflow" {
  name       = replace("${var.system_name_prefix}_bmm_wf", "_", "-")
  role_arn   = aws_iam_role.state_machine_role.arn
  definition = <<EOF
{
  "StartAt": "make_baseinfo",
  "States": {
    ...,
    
    "Parallel": {
      "Type": "Parallel",
      "Next": "Done",
      "ResultPath": null,
      "Branches": [
        {
          "StartAt": "make_order",
          "States": {
            "make_order":{
              "Type": "Task",
              "ResultPath": "$.glueresult",
              "Resource": "arn:aws:states:::glue:startJobRun.sync",
              "Parameters": {
                "JobName": "${aws_glue_job.make_order.name}",
                "Arguments": {
                  "--TARGET_ORDER.$": "$.iterator.target_order"
                }
              },
              "Next": "Succeed_make_order",
              "Catch": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "Next": "Failed_make_order",
                  "ResultPath": "$.error-info"
                }
              ]
            },
            "Failed_make_order": {
              "Type": "Pass",
              "Result": {
                "service-name": "ORGANIZATION",
                "metric-name": "DOMAIN-ENVIRONMENT.glue.job.make_order.failed",
                "metric-value": 1
              },
              "ResultPath": "$",
              "Next": "Notify_make_order"
            },
            "Succeed_make_order": {
              "Type": "Pass",
              "Result": {
                "service-name": "ORGANIZATION",
                "metric-name": "DOMAIN-ENVIRONMENT.glue.job.make_order.failed",
                "metric-value": 0
              },
              "ResultPath": "$",
              "Next": "Notify_make_order"
            },
            "Notify_make_order": {
              "Type": "Task",
              "Resource": "${aws_lambda_function.put_metrics_mackerel.arn}",
              "InputPath": "$",
              "ResultPath": "$",
              "End": true
            }
          }
        },
        ...
      ]
    }
  }  
}
EOF

  tags = {
    SystemName = var.system_name_prefix
  }
}
  • Glue Job (make_order) の結果をもとに異なるメトリクス数値を Makcerel Server に送る必要があります。Glue Job が失敗したら 1、成功したら 0を Mackerel Server に送ることで継続的な監視ができるようになります。Step Function と Lambda 間のデータ渡しには Type: Pass を利用しました。Type: Passは簡単に突っ込むことができるのでデバッグとかにも良く使われそうです。

Deploy & Confirm

$ cd mackerel
$ make build
  • まず、Go バイナリファイルを生成します。
$ cd terraform/envs/prd
$ terraform plan
$ terraform apply
  • その後、Assume Role を確認し terraform 命令で AWS リソースを更新します。

  • Step Functions の実行が終わってから Mackerel GUI を見るとサービスメトリクスが取得できているのが分かります。

Bonus

Monitors → New Monitor → Service metric monitor

  • サービスメトリクスにもアラート条件の設定が可能なので、上のように設定して置きましょう。今回のメトリクスだと boolean に近いのでCritical Thresholdは 0 超過を設定し Average of the pastは個別メトリクス単位で検知されるよう 1 points を設定しました。

Summary

今回初めて Step Functions を触ったので、最初はステートメント言語を見てかなり絶望したんですが、ステータスの種類と役割とかステータス同士でのデータを渡す方法などをちゃんと理解できるようになったので、今回のタスクも良い勉強になった気がします。

この記事が誰かのお役に立てれば幸いです。

以上、CX事業本部 MADチーム、キム (@sano3071) でした。

Reference