DataPipelineを利用してEFSのバックアップ環境を作ってみた

AWS Data Pipeline

はじめに

こんにちは、中山です。

最近DataPipelineとEFSを触る機会がありました。EFSは非常にシンプルなサービスなので比較的理解しやすいですが、DataPipelineの方は少しとっつきにくい印象があります。各サービスを連携する用途で利用されることを想定している点、またビックデータ系の知識が必要な点が主な理由なのかなと考えています。確かにDataPipelineをより深く理解するにはそれらの知識を習得する必要があるのですが、「定期的にデータをバックアップする」といった初歩的な用途であればそこまで難しくはない印象です。今回はDataPipelineを理解するために、以下のドキュメントに記載されている内容を参考にしてバックアップ環境を作ってみたので、本エントリでご紹介したいと思います。

なお、環境構築にはTerraformとCloudFromationを使っています。もちろんマネジメントコンソールでも作れるのですが、「便利すぎる」のでちゃんと理解してなくても使えてしまう場合があるからです。自分の手を動かしてコードに落とし込むと、マネジメントコンソールではいい感じに隠蔽されている内容をより良く理解できる印象が私にはあります。また、試行錯誤している過程で簡単に環境を破棄/作成できるのも便利なポイントです。そのため、今回はマネジメントコンソールを使わずに環境をセットアップしてみます。

作成したTerraformとCloudFromationのコードはGitHubにアップロードしておきました。ご自由にお使いください。

EFSのバックアップについて

本題に入る前に、EFSのバックアップについて記載します。

EFSは高耐久性/高可用性/スケーラブルといった特徴のある低コストかつシンプルなファイルシステム単位のストレージサービスです。インターフェースが非常にシンプルなので、特に迷うことなく使い始めることができると思います。詳細はAWSが公開している以下の資料に詳しいです。

非常に高い信頼性があるサービスなので、そのそも「EFSのバックアップを取得する必要があるのか」という点は事前に考えておく必要があります。バックアップ環境を作るとなると、今回のようにある程度の作り込みが必要になるためです。また、基本的にコストも増加します。ただし、EFSの障害でデータが消える可能性は極めて低いですが、オペミスなどの利用者側に起因するデータの消失はどの組織にも起きてしまう可能性はあります。どのようなストレージサービスを利用するにせよ、万が一に備えて消失しては困るデータをバックアップしておくのは重要なプラクティスです。

バックアップ環境について

今回は以下のような環境を作ってみます。

aws

ワークフローは単純です。

  1. スケジュールに基づき定期的にリソース(EC2インスタンス)を起動
  2. アクティビティで定義したシェルコマンドをインスタンス上で実行
  3. シェルの中でrsyncを実行し、Production用EFSからBackup用EFSへデータ転送
  4. アクティビティの実行ログはS3に保存
  5. アクティビティの成否をアクションで定義したSNSトピックにパブリッシュ

DataPipelineにはビジュアライズにワークフローを表示してくれる機能があります。パイプラインを作成すると以下のような図になりました。

aws2

パイプラインオブジェクト

今回利用している各オブジェクトについて説明します。なお、DataPipelineそのものについては以下のエントリに詳しくまとまっているので、参照するとより理解が深まるかと思います。

まずCloudFromationでDataPipelineを定義する際のポイントを記載します。CloudFromationではAWS::DataPipeline::Pipelineリソースを利用することでDataPipelineを作成します。プロパティにいろいろな値を設定できるのですが、特に重要なのは以下の3つです。

  • PipelineObjects
    • パイプラインを構成する各オブジェクトを定義
  • ParameterObjects
    • 各オブジェクトに渡すパラメータのオブジェクトを定義
  • ParameterValues
    • パラメータに入れる実際の値を定義

基本的に PipelineObjects を記述して、そのオブジェクトでパラメータを利用する場合に "#{mySomeParam}" という書式で参照するパラメータを指定させます。 ParameterObjects では Id: mySomeParam という定義でパラメータの説明や、デフォルト値などを指定します。そのパラメータに実際に渡される値を ParameterValuesId: mySomeParam で指定します。つまり、パラメータを定義するには Id に同じ文字列を複数指定することになります。

Default

パイプラインでデフォルト値として利用される値を定義しています。ここで定義された設定が各オブジェクトで利用されます。

        - Id: Default
          Name: Default
          Fields:
            - Key: type
              StringValue: Default
            - Key: scheduleType
              StringValue: cron
            - Key: failureAndRerunMode
              StringValue: CASCADE
            - Key: schedule
              RefValue: DefaultSchedule
            - Key: role
              StringValue: "#{myDataPipelineRole}"
            - Key: resourceRole
              StringValue: "#{myDataPipelineResourceRole}"
            - Key: pipelineLogUri
              StringValue: "#{myS3LogBucket}"
Key Value 意味
type Default デフォルト値であることを指定。
scheduleType cron アクティビティをcron形式で実行する。
failureAndRerunMode CASCADE Cascading Failures and Rerunsを指定。
schedule DefaultSchedule Scheduleの設定を参照。
role "#{myDataPipelineRole}" パイプラインを管理するためのIAM Role名。
resourceRole "#{myDataPipelineResourceRole}" リソースオブジェクトを管理するためのIAM Role名。
pipelineLogUri "#{myS3LogBucket}" パイプラインのログ出力先バケット名。

各IAM RoleはTerraformで定義しています。

data "aws_iam_policy_document" "datapipeline_resource" {
  statement {
    sid     = "EC2AssumeRole"
    effect  = "Allow"
    actions = ["sts:AssumeRole"]

    principals = {
      type        = "Service"
      identifiers = ["ec2.amazonaws.com"]
    }
  }
}

resource "aws_iam_role" "datapipeline_resource" {
  name               = "${var.name}-datapipeline-resource-role"
  assume_role_policy = "${data.aws_iam_policy_document.datapipeline_resource.json}"
}

resource "aws_iam_policy_attachment" "datapipeline_resource" {
  name       = "DataPipelineDefaultResourceRole"
  roles      = ["${aws_iam_role.datapipeline_resource.name}"]
  policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonEC2RoleforDataPipelineRole"
}

resource "aws_iam_instance_profile" "datapipeline_resource" {
  name  = "${var.name}-datapipeline-resource"
  roles = ["${aws_iam_role.datapipeline_resource.name}"]
}

data "aws_iam_policy_document" "datapipeline_role" {
  statement {
    sid     = "AssumeRole"
    effect  = "Allow"
    actions = ["sts:AssumeRole"]

    principals = {
      type = "Service"

      identifiers = [
        "elasticmapreduce.amazonaws.com",
        "datapipeline.amazonaws.com",
      ]
    }
  }
}

resource "aws_iam_role" "datapipeline_role" {
  name               = "${var.name}-datapipeline-role"
  assume_role_policy = "${data.aws_iam_policy_document.datapipeline_role.json}"
}

resource "aws_iam_policy_attachment" "datapipeline_role" {
  name       = "AWSDataPipelineRole"
  roles      = ["${aws_iam_role.datapipeline_role.name}"]
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSDataPipelineRole"
}

DataPipelineのように複数のAWSリソースを連携させるサービスの場合、IAMの設定が複雑になりがちです。幸い、AWSは一般的に利用されるであろうパーミッションを含んだDataPipeline用ポリシーを用意してくれています。今回は下記2つのポリシーを利用しました。

  • arn:aws:iam::aws:policy/service-role/AWSDataPipelineRole
    • パイプライン管理用ポリシー
  • arn:aws:iam::aws:policy/service-role/AmazonEC2RoleforDataPipelineRole
    • リソースで起動したEC2インスタンスにアタッチするポリシー

インラインでポリシーを定義するのはそれなりの知識が必要なので、慣れないうちはマネージドポリシーを利用するとよいと思います。

Schedule

アクティビティの実行時間を定義しています。詳細なドキュメントはこちらです。

        - Id: DefaultSchedule
          Name: DefaultSchedule
          Fields:
            - Key: type
              StringValue: Schedule
            - Key: startAt
              StringValue: FIRST_ACTIVATION_DATE_TIME
            - Key: period
              StringValue: "#{myPeriod}"
Key Value 意味
type Schedule Scheduleタイプであることを指定。
startAt FIRST_ACTIVATION_DATE_TIME パイプラインがアクティブになった時点で処理を開始。
period "#{myPeriod}" パイプラインを実行させる頻度。デフォルトで1時間毎にしています。

Ec2Resource

アクティビティで定義した処理を実行させるリソースを指定しています。詳細なドキュメントはこちらです。今回はシンプルな設定にしているので、インスタンスを起動させる時に指定する内容とほぼ同じです。

        - Id: EC2ResourceObj
          Name: EC2ResourceObj
          Fields:
            - Key: type
              StringValue: Ec2Resource
            - Key: terminateAfter
              StringValue: "30 Minutes"
            - Key: instanceType
              StringValue: "#{myInstanceType}"
            - Key: securityGroupIds
              StringValue: "#{mySecurityGroupId}"
            - Key: subnetId
              StringValue: "#{mySubnetId}"
            - Key: associatePublicIpAddress
              StringValue: "true"
            - Key: imageId
              StringValue: "#{myImageId}"
            - Key: keyPair
              StringValue: "#{myKeyPair}"
Key Value 意味
type Ec2Resource リソースタイプをEC2インスタンスに指定。
terminateAfter "30 Minutes" インスタンスを30分後にTerminateさせる。
instanceType "#{myInstanceType}" 利用するインスタンスタイプ。
securityGroupIds "#{mySecurityGroupId}" インスタンスにアタッチするセキュリティグループのID。
subnetId "#{mySubnetId}" インスタンスを起動させるサブネットのID。
associatePublicIpAddress "true" インスタンスにパブリックIPを付与する。
imageId "#{myImageId}" インスタンスの起動元AMI ID。
keyPair "#{myKeyPair}" インスタンスにアタッチするキーペア名。

ShellCommandActivity

リソースで実行させるアクティビティを定義しています。詳細なドキュメントはこちらです。バックアップの処理部分はAWSが公開しているこちらのスクリプトを参考にしました。

        - Id: ShellCommandActivityObj
          Name: ShellCommandActivityObj
          Fields:
            - Key: type
              StringValue: ShellCommandActivity
            - Key: runsOn
              RefValue: EC2ResourceObj
            - Key: command
              StringValue: |
                source="$1"
                destination="$2"
                efs_id="$3"
                timezone="$4"
                backup_dir="/mnt/backups/$efs_id/$(TZ=$timezone date +%Y-%m-%d-%H-%M)"
                sudo yum -y install nfs-utils
                [[ -d /backup ]] || sudo mkdir /backup
                [[ -d /mnt/backups ]] || sudo mkdir /mnt/backups
                if ! mount -l -t nfs4 | grep -qF $source; then
                  sudo mount -t nfs -o nfsvers=4.1 -o rsize=1048576 -o wsize=1048576 -o timeo=600 -o retrans=2 -o hard "$source" /backup
                fi
                if ! mount -l -t nfs4 | grep -qF $destination; then
                  sudo mount -t nfs -o nfsvers=4.1 -o rsize=1048576 -o wsize=1048576 -o timeo=600 -o retrans=2 -o hard "$destination" /mnt/backups
                fi
                sudo mkdir -p "$backup_dir"
                sudo rsync -ah --stats --delete --numeric-ids --log-file=/tmp/efs-restore.log /backup/ "$backup_dir"
                rsync_status="$?"
                sudo cp /tmp/efs-restore.log "$backup_dir"
                exit "$rsync_status"
            - Key: scriptArgument
              StringValue: "#{myEFSSource}"
            - Key: scriptArgument
              StringValue: "#{myEFSBackup}"
            - Key: scriptArgument
              StringValue: "#{myEFSId}"
            - Key: scriptArgument
              StringValue: "#{myTimeZone}"
            - Key: onSuccess
              RefValue: SuccessNotify
            - Key: onFail
              RefValue: FailureNotify
Key Value 意味
type ShellCommandActivity アクティビティをシェルコマンドに指定。
runsOn EC2ResourceObj アクティビティを実行させるリソースを参照。
command 上記参照 今回はシンプルにフルバックアップさせる処理にしています。
scriptArgument "#{myEFSSource}" / "#{myEFSBackup}" / "#{myEFSId}" / "#{myTimeZone}" command に渡す引数を参照。
onSuccess SuccessNotify 処理が成功した場合に実施するアクションを参照。
onFail FailureNotify 処理が失敗した場合に実施するアクションを参照。

SnsAlarm

アクティビティの実行結果をSNSトピックに通知させます。詳細なドキュメントはこちらです。今回は成功/失敗時に通知させているので、2つのアクションを定義しています。

  • アクティビティの成功時
        - Id: SuccessNotify
          Name: SuccessNotify
          Fields:
            - Key: type
              StringValue: SnsAlarm
            - Key: topicArn
              StringValue: "#{myTopicArn}"
            - Key: subject
              StringValue: "[Info] EFS Backup Succeeded"
            - Key: message
              StringValue: |
                scheduledStartTime: "#{node.@scheduledStartTime}"
                actualStartTime: "#{node.@actualStartTime}"
                actualEndTime: "#{node.@actualEndTime}"
                hostname: "#{node.hostname}"
  • アクティビティの失敗時
        - Id: FailureNotify
          Name: FailureNotify
          Fields:
            - Key: type
              StringValue: SnsAlarm
            - Key: topicArn
              StringValue: "#{myTopicArn}"
            - Key: subject
              StringValue: "[Alart] EFS Backup Failed"
            - Key: message
              StringValue: |
                scheduledStartTime: "#{node.@scheduledStartTime}"
                actualStartTime: "#{node.@actualStartTime}"
                actualEndTime: "#{node.@actualEndTime}"
                hostname: "#{node.hostname}"
Key Value 意味
type SnsAlarm アクションをSNSに指定。
topicArn "#{myTopicArn}" SNSトピックのARNを指定。
subject 上記参照 トピックのサブジェクト。
message 上記参照 トピックのメッセージ。

message"#{node.<filed名>}" という書式でこのアクションを参照しているアクティビティやノードの情報を取得できます。取得可能な値は各ドキュメントにまとまっています。

まとめ

いかがだったでしょうか。

DataPipelineを利用したEFSのバックアップ環境について、TerraformやCloudFormationと絡めてご紹介しました。

最後まで読んでいただいた方の中には、「わざわざDataPipelineを使わなくても同じ環境は作れるんじゃないか」と思った方も多いと思います。正直この程度であれば、例えばスケジュール実行させたオートスケーリンググループでも同じようなことは可能だと思います。ただし、その場合、失敗時のリトライ/SNSへの通知/実行結果の保存などの「本来やりたいことにまつわる雑用」部分を自分で実装する必要が出てきます。

AWSを使う上では基本的にフルマネージドサービスを活用しつつ、そこから漏れる部分を自分たちで作り込むという姿勢が正しいあり方です。DataPipelineに当てはめると、「雑用」は事前に用意されている各機能に任せつつ、バックアップ取得処理部分の開発のみに集中できます。

今回DataPipelineを学んでいく中で、現在実施しているバッチ処理などをDataPipelineに載せることができれば、多くのメリットがあるなと思いました。みなさんも、一度触ってみるとよいのではないでしょうか。

本エントリがみなさんの参考になれば幸いに思います。