SQSのメッセージ数に応じてオートスケールするスポットフリートをTerraformとLambda関数で実装する

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

2016年12月30日追記
追記が遅れてしまいましたが、公式にスポットフリートがオートスケーリンググループに対応したようです。そのため、こちらに記載している情報は基本的に利用しない方が良いと思います。

はじめに

こんにちは、中山です。

今回スポットフリートをオートスケールさせる仕組みを作ってみたのでご紹介します。

スポットフリートとは何か

簡単にいうと、スポットフリートとはスポットリクエストのまとまりです。複数のスポットリクエストをまとめて管理し、あるリクエストが無効になっても、他のリクエストを利用して全体の数(capacity)を維持してくれる仕組みです。コスト削減(スポットリクエスト)と安全性(capacity)を両立してくれるのでとても便利です。

しかし、スポットフリートはまだ比較的新し目のサービスなためか、AWSの他サービスとの連携が不足している感があります。例えば、CloudWatch Alarmとの連携に対応していないため、閾値を超過した場合にcapacityを増減させるといったASGのようなことはデフォルト機能ではできません。自分で仕組みを実装する必要があります。今回はこの仕組をLambda関数で実装してみました。

仕組みを図にすると以下のような感じです。

aws

SQSのメッセージ数に応じてスポットフリートを増減させます。いわゆるJob Observerパターンです。

Terraformでスポットフリートを利用する方法

Terraformではスポットフリートに未対応でしたが、v.0.7.0 から対応する予定です。aws_spot_fleet_requestというリソースがそれです。ありがたい。今回はこれを利用します。

コード

GitHubに上げておきました。ご自由にお使いください。

ディレクトリ構造

Lambda関数のデプロイにはApexを利用しているで、ディレクトリ構造もそちらに寄せています。

lambda-auto-scale-spot-fleet/
├── functions
│   └── auto_scale_spot_fleet
│       └── main.py
├── infrastructure
│   ├── dev
│   │   ├── main.tf
│   │   ├── outputs.tf
│   │   └── variables.tf
│   └── modules
│       ├── cloudwatch
│       │   ├── cloudwatch.tf
│       │   └── variables.tf
│       ├── iam
│       │   ├── iam.tf
│       │   └── outputs.tf
│       ├── sns
│       │   ├── outputs.tf
│       │   ├── sns.tf
│       │   └── variables.tf
│       ├── spot_fleet
│       │   ├── assume_role_policy.json
│       │   ├── compute.tf
│       │   ├── iam.tf
│       │   ├── keys
│       │   ├── network.tf
│       │   ├── outputs.tf
│       │   ├── security_group.tf
│       │   └── variables.tf
│       └── sqs
│           ├── outputs.tf
│           ├── sqs.tf
│           └── variables.tf
└── project.json

コードの解説

主要なコードを抜粋してご紹介します。

functions/auto_scale_spot_fleet/main.py

Lambda関数のコードです。Boto3を利用して、modify_spot_fleet_requestメソッドでスポットフリートのcapacityを変更しています。結果はpublishメソッドで通知するようにしています。

import os
import json
import boto3
import pprint
pp = pprint.PrettyPrinter()


class SpotFleet(object):
    def __init__(self, ec2_client, sns_client, alarm_name):
        self.ec2_client = ec2_client
        self.sns_client = sns_client
        self.alarm_name = alarm_name
        self.spot_fleet_request_id = os.environ['SpotFleetRequestId']
        self.topic_arn = os.environ['TopicArn']

    def describe_spot_fleet_target_capacity(self):
        resp = self.ec2_client.describe_spot_fleet_requests(
                DryRun=False,
                SpotFleetRequestIds=[self.spot_fleet_request_id]
                )
        return resp['SpotFleetRequestConfigs'][0]['SpotFleetRequestConfig']['TargetCapacity']

    def modify_spot_fleet_target_capacity(self, target_capacity):
        self.ec2_client.modify_spot_fleet_request(
                SpotFleetRequestId=self.spot_fleet_request_id,
                TargetCapacity=target_capacity
                )

    def publish2topic(self, msg):
        req = {}
        req.update({
            'TopicArn': self.topic_arn,
            'Message': msg,
            'Subject': '[ALART] auto scale spot fleet'
            })
        return self.sns_client.publish(**req)


def handle(event, context):
    spot_fleet = SpotFleet(
            boto3.client('ec2'),
            boto3.client('sns'),
            json.loads(event['Records'][0]['Sns']['Message'])['AlarmName']
            )
    try:
        current_target_capacity = spot_fleet.describe_spot_fleet_target_capacity()
    except Exception as e:
        pp.pprint(e)
        spot_fleet.publish2topic(e)

    if spot_fleet.alarm_name == 'scale_out':
        target_capacity = current_target_capacity + 1
        if target_capacity >= 5:
            return
    else:
        target_capacity = current_target_capacity - 1
        if target_capacity <= 2:
            return

    try:
        spot_fleet.modify_spot_fleet_target_capacity(target_capacity)
    except Exception as e:
        pp.pprint(e)
        spot_fleet.publish2topic(e)

    spot_fleet.publish2topic(
            'Modify target capacity {} to {}'.format(
                current_target_capacity,
                target_capacity
                )
            )

infrastructure/modules/spot_fleet/compute.tf

スポットフリートの設定をしているファイルです。

resource "aws_spot_fleet_request" "fleet" {
  iam_fleet_role                      = "${aws_iam_role.spot_fleet_role.arn}"
  spot_price                          = "${var.spot_prices["max"]}"
  allocation_strategy                 = "lowestPrice"
  terminate_instances_with_expiration = true
  excess_capacity_termination_policy  = "Default"
  target_capacity                     = 3
  valid_until                         = "2017-07-14T06:22:35Z"

  launch_specification {
    instance_type               = "${var.instance_types["m3_medium"]}"
    ami                         = "${data.aws_ami.amazon_linux.id}"
    key_name                    = "${aws_key_pair.site_key.key_name}"
    spot_price                  = "${var.spot_prices["m3_medium"]}"
    availability_zone           = "${data.aws_availability_zones.azs.names[0]}"
    subnet_id                   = "${aws_subnet.public.0.id}"
    vpc_security_group_ids      = ["${aws_security_group.sg.id}"]
    associate_public_ip_address = true

    root_block_device {
      volume_size = "8"
      volume_type = "gp2"
    }
  }

  launch_specification {
    instance_type               = "${var.instance_types["m3_large"]}"
    ami                         = "${data.aws_ami.amazon_linux.id}"
    key_name                    = "${aws_key_pair.site_key.key_name}"
    spot_price                  = "${var.spot_prices["m3_large"]}"
    availability_zone           = "${data.aws_availability_zones.azs.names[1]}"
    subnet_id                   = "${aws_subnet.public.1.id}"
    vpc_security_group_ids      = ["${aws_security_group.sg.id}"]
    associate_public_ip_address = true
    weighted_capacity           = 35

    root_block_device {
      volume_size = "8"
      volume_type = "gp2"
    }
  }
}

aws_spot_fleet_request で設定しているそれぞれの引数の意味は以下の通りです。

引数 意味
iam_fleet_role スポットフリートに紐付けるIAM Role。EC2インスタンスをterminateさせる権限などが必要。
spot_price 最大の入札価格。
allocation_strategy 起動させるインスタンスの配置方法。 lowestPrice の場合はスポット価格が一番低くなるようなインスタンスを起動させる。 diviersified はスポットリクエストが平均的に分散されるよう起動させる。
terminate_instances_with_expiration スポットリクエストの有効期限が切れた後もインスタンスを起動させ続けるか。
excess_capacity_termination_policy capacityが現在の値よりも減少した場合に、インスタンスをterminateさせるかどうか。デフォルトはterminateさせる。
target_capacity capacityの数。どれだけインスタンスを起動させるか。
valid_until リクエストの有効期限。
launch_specification SpotFleetLaunchSpecificationの設定。

infrastructure/modules/cloudwatch/cloudwatch.tf

CloudWatch Alarmの設定をしているファイルです。SQSの ApproximateNumberOfMessagesVisible を対象としています。アラームの設定はスケールアウト/スケールイン両方を作成しています。

resource "aws_cloudwatch_metric_alarm" "scale_out" {
  alarm_name          = "scale_out"
  comparison_operator = "GreaterThanOrEqualToThreshold"
  evaluation_periods  = 1
  metric_name         = "ApproximateNumberOfMessagesVisible"
  period              = 300
  namespace           = "AWS/SQS"
  statistic           = "Average"
  threshold           = 100
  alarm_actions       = ["${var.topic_arn}"]

  dimensions {
    QueueName = "${var.name}-queue"
  }
}

resource "aws_cloudwatch_metric_alarm" "scale_in" {
  alarm_name          = "scale_in"
  comparison_operator = "LessThanOrEqualToThreshold"
  evaluation_periods  = 1
  metric_name         = "ApproximateNumberOfMessagesVisible"
  period              = 300
  namespace           = "AWS/SQS"
  statistic           = "Average"
  threshold           = 20
  alarm_actions       = ["${var.topic_arn}"]

  dimensions {
    QueueName = "${var.name}-queue"
  }
}

コードの使い方

Apexの実行方法は以前のエントリに書きました。詳細はそちらを参照してください。

実行する前に project.jsoninfrastructure/dev/variables.tf の修正が必要です。また、Terraformのv0.7.0は現時点(2016年7月14日)ではまだリリースされていないため自分でコンパイルする必要があります。方法は以下のエントリを参照してください。

以下ではLambda関数のデプロイとTerraformの実行が完了した後の作業について記述します。

スポットフリートをスケールアウトさせる

スケールアウトはキューのメッセージ数を閾値としているので、awscliを利用してキューにメッセージをenqueueさせます。 <queue-url> は自分の環境に合うよう適宜変更してください。

$ while true; do
  count=$(( count + 1 ))
  aws sqs send-message \
    --queue-url <queue-url> \
    --message-body "message-$count"
  sleep 1
done

以下のコマンドで現在のおおよそのメッセージ数を確認できます。

$ aws sqs get-queue-attributes \
    --queue-url <queue-url> \
    --attribute-names ApproximateNumberOfMessages
{
    "Attributes": {
        "ApproximateNumberOfMessages": "39"
    }
}

しばらくすると、SNSでcapacityを増加させた旨のログが表示されると思います。

スポットフリートをスケールインさせる

今度は逆にキューからメッセージを削除しましょう。以下のコマンドを実行してください。

$ aws sqs purge-queue \
  --queue-url <queue-url>

メッセージ数を確認してみます。

$ aws sqs get-queue-attributes \
    --queue-url <queue-url> \
    --attribute-names ApproximateNumberOfMessages
{
    "Attributes": {
        "ApproximateNumberOfMessages": "0"
    }
}

しばらくすると、SNSでcapacityを減少させた旨のログが表示されると思います。

まとめ

いかがだったでしょうか。

スポットフリートは便利な半面、まだまだ機能が不足している部分があります。不足した機能をLambda関数で補うことによりASGのような仕組みを実装することが可能です。Lambda便利すぎる。

本エントリがみなさんの参考になれば幸いです。

参考リンク