この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
こんにちは、中山です。
最近DataPipelineとEFSを触る機会がありました。EFSは非常にシンプルなサービスなので比較的理解しやすいですが、DataPipelineの方は少しとっつきにくい印象があります。各サービスを連携する用途で利用されることを想定している点、またビックデータ系の知識が必要な点が主な理由なのかなと考えています。確かにDataPipelineをより深く理解するにはそれらの知識を習得する必要があるのですが、「定期的にデータをバックアップする」といった初歩的な用途であればそこまで難しくはない印象です。今回はDataPipelineを理解するために、以下のドキュメントに記載されている内容を参考にしてバックアップ環境を作ってみたので、本エントリでご紹介したいと思います。
なお、環境構築にはTerraformとCloudFromationを使っています。もちろんマネジメントコンソールでも作れるのですが、「便利すぎる」のでちゃんと理解してなくても使えてしまう場合があるからです。自分の手を動かしてコードに落とし込むと、マネジメントコンソールではいい感じに隠蔽されている内容をより良く理解できる印象が私にはあります。また、試行錯誤している過程で簡単に環境を破棄/作成できるのも便利なポイントです。そのため、今回はマネジメントコンソールを使わずに環境をセットアップしてみます。
作成したTerraformとCloudFromationのコードはGitHubにアップロードしておきました。ご自由にお使いください。
EFSのバックアップについて
本題に入る前に、EFSのバックアップについて記載します。
EFSは高耐久性/高可用性/スケーラブルといった特徴のある低コストかつシンプルなファイルシステム単位のストレージサービスです。インターフェースが非常にシンプルなので、特に迷うことなく使い始めることができると思います。詳細はAWSが公開している以下の資料に詳しいです。
非常に高い信頼性があるサービスなので、そのそも「EFSのバックアップを取得する必要があるのか」という点は事前に考えておく必要があります。バックアップ環境を作るとなると、今回のようにある程度の作り込みが必要になるためです。また、基本的にコストも増加します。ただし、EFSの障害でデータが消える可能性は極めて低いですが、オペミスなどの利用者側に起因するデータの消失はどの組織にも起きてしまう可能性はあります。どのようなストレージサービスを利用するにせよ、万が一に備えて消失しては困るデータをバックアップしておくのは重要なプラクティスです。
バックアップ環境について
今回は以下のような環境を作ってみます。
ワークフローは単純です。
- スケジュールに基づき定期的にリソース(EC2インスタンス)を起動
- アクティビティで定義したシェルコマンドをインスタンス上で実行
- シェルの中でrsyncを実行し、Production用EFSからBackup用EFSへデータ転送
- アクティビティの実行ログはS3に保存
- アクティビティの成否をアクションで定義したSNSトピックにパブリッシュ
DataPipelineにはビジュアライズにワークフローを表示してくれる機能があります。パイプラインを作成すると以下のような図になりました。
パイプラインオブジェクト
今回利用している各オブジェクトについて説明します。なお、DataPipelineそのものについては以下のエントリに詳しくまとまっているので、参照するとより理解が深まるかと思います。
まずCloudFromationでDataPipelineを定義する際のポイントを記載します。CloudFromationではAWS::DataPipeline::Pipelineリソースを利用することでDataPipelineを作成します。プロパティにいろいろな値を設定できるのですが、特に重要なのは以下の3つです。
- PipelineObjects
- パイプラインを構成する各オブジェクトを定義
- ParameterObjects
- 各オブジェクトに渡すパラメータのオブジェクトを定義
- ParameterValues
- パラメータに入れる実際の値を定義
基本的に PipelineObjects
を記述して、そのオブジェクトでパラメータを利用する場合に "#{mySomeParam}"
という書式で参照するパラメータを指定させます。 ParameterObjects
では Id: mySomeParam
という定義でパラメータの説明や、デフォルト値などを指定します。そのパラメータに実際に渡される値を ParameterValues
に Id: mySomeParam
で指定します。つまり、パラメータを定義するには Id
に同じ文字列を複数指定することになります。
Default
パイプラインでデフォルト値として利用される値を定義しています。ここで定義された設定が各オブジェクトで利用されます。
- Id: Default
Name: Default
Fields:
- Key: type
StringValue: Default
- Key: scheduleType
StringValue: cron
- Key: failureAndRerunMode
StringValue: CASCADE
- Key: schedule
RefValue: DefaultSchedule
- Key: role
StringValue: "#{myDataPipelineRole}"
- Key: resourceRole
StringValue: "#{myDataPipelineResourceRole}"
- Key: pipelineLogUri
StringValue: "#{myS3LogBucket}"
Key | Value | 意味 |
---|---|---|
type |
Default |
デフォルト値であることを指定。 |
scheduleType |
cron |
アクティビティをcron形式で実行する。 |
failureAndRerunMode |
CASCADE |
Cascading Failures and Rerunsを指定。 |
schedule |
DefaultSchedule |
Scheduleの設定を参照。 |
role |
"#{myDataPipelineRole}" |
パイプラインを管理するためのIAM Role名。 |
resourceRole |
"#{myDataPipelineResourceRole}" |
リソースオブジェクトを管理するためのIAM Role名。 |
pipelineLogUri |
"#{myS3LogBucket}" |
パイプラインのログ出力先バケット名。 |
各IAM RoleはTerraformで定義しています。
data "aws_iam_policy_document" "datapipeline_resource" {
statement {
sid = "EC2AssumeRole"
effect = "Allow"
actions = ["sts:AssumeRole"]
principals = {
type = "Service"
identifiers = ["ec2.amazonaws.com"]
}
}
}
resource "aws_iam_role" "datapipeline_resource" {
name = "${var.name}-datapipeline-resource-role"
assume_role_policy = "${data.aws_iam_policy_document.datapipeline_resource.json}"
}
resource "aws_iam_policy_attachment" "datapipeline_resource" {
name = "DataPipelineDefaultResourceRole"
roles = ["${aws_iam_role.datapipeline_resource.name}"]
policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonEC2RoleforDataPipelineRole"
}
resource "aws_iam_instance_profile" "datapipeline_resource" {
name = "${var.name}-datapipeline-resource"
roles = ["${aws_iam_role.datapipeline_resource.name}"]
}
data "aws_iam_policy_document" "datapipeline_role" {
statement {
sid = "AssumeRole"
effect = "Allow"
actions = ["sts:AssumeRole"]
principals = {
type = "Service"
identifiers = [
"elasticmapreduce.amazonaws.com",
"datapipeline.amazonaws.com",
]
}
}
}
resource "aws_iam_role" "datapipeline_role" {
name = "${var.name}-datapipeline-role"
assume_role_policy = "${data.aws_iam_policy_document.datapipeline_role.json}"
}
resource "aws_iam_policy_attachment" "datapipeline_role" {
name = "AWSDataPipelineRole"
roles = ["${aws_iam_role.datapipeline_role.name}"]
policy_arn = "arn:aws:iam::aws:policy/service-role/AWSDataPipelineRole"
}
DataPipelineのように複数のAWSリソースを連携させるサービスの場合、IAMの設定が複雑になりがちです。幸い、AWSは一般的に利用されるであろうパーミッションを含んだDataPipeline用ポリシーを用意してくれています。今回は下記2つのポリシーを利用しました。
arn:aws:iam::aws:policy/service-role/AWSDataPipelineRole
- パイプライン管理用ポリシー
arn:aws:iam::aws:policy/service-role/AmazonEC2RoleforDataPipelineRole
- リソースで起動したEC2インスタンスにアタッチするポリシー
インラインでポリシーを定義するのはそれなりの知識が必要なので、慣れないうちはマネージドポリシーを利用するとよいと思います。
Schedule
アクティビティの実行時間を定義しています。詳細なドキュメントはこちらです。
- Id: DefaultSchedule
Name: DefaultSchedule
Fields:
- Key: type
StringValue: Schedule
- Key: startAt
StringValue: FIRST_ACTIVATION_DATE_TIME
- Key: period
StringValue: "#{myPeriod}"
Key | Value | 意味 |
---|---|---|
type |
Schedule |
Scheduleタイプであることを指定。 |
startAt |
FIRST_ACTIVATION_DATE_TIME |
パイプラインがアクティブになった時点で処理を開始。 |
period |
"#{myPeriod}" |
パイプラインを実行させる頻度。デフォルトで1時間毎にしています。 |
Ec2Resource
アクティビティで定義した処理を実行させるリソースを指定しています。詳細なドキュメントはこちらです。今回はシンプルな設定にしているので、インスタンスを起動させる時に指定する内容とほぼ同じです。
- Id: EC2ResourceObj
Name: EC2ResourceObj
Fields:
- Key: type
StringValue: Ec2Resource
- Key: terminateAfter
StringValue: "30 Minutes"
- Key: instanceType
StringValue: "#{myInstanceType}"
- Key: securityGroupIds
StringValue: "#{mySecurityGroupId}"
- Key: subnetId
StringValue: "#{mySubnetId}"
- Key: associatePublicIpAddress
StringValue: "true"
- Key: imageId
StringValue: "#{myImageId}"
- Key: keyPair
StringValue: "#{myKeyPair}"
Key | Value | 意味 |
---|---|---|
type |
Ec2Resource |
リソースタイプをEC2インスタンスに指定。 |
terminateAfter |
"30 Minutes" |
インスタンスを30分後にTerminateさせる。 |
instanceType |
"#{myInstanceType}" |
利用するインスタンスタイプ。 |
securityGroupIds |
"#{mySecurityGroupId}" |
インスタンスにアタッチするセキュリティグループのID。 |
subnetId |
"#{mySubnetId}" |
インスタンスを起動させるサブネットのID。 |
associatePublicIpAddress |
"true" |
インスタンスにパブリックIPを付与する。 |
imageId |
"#{myImageId}" |
インスタンスの起動元AMI ID。 |
keyPair |
"#{myKeyPair}" |
インスタンスにアタッチするキーペア名。 |
ShellCommandActivity
リソースで実行させるアクティビティを定義しています。詳細なドキュメントはこちらです。バックアップの処理部分はAWSが公開しているこちらのスクリプトを参考にしました。
- Id: ShellCommandActivityObj
Name: ShellCommandActivityObj
Fields:
- Key: type
StringValue: ShellCommandActivity
- Key: runsOn
RefValue: EC2ResourceObj
- Key: command
StringValue: |
source="$1"
destination="$2"
efs_id="$3"
timezone="$4"
backup_dir="/mnt/backups/$efs_id/$(TZ=$timezone date +%Y-%m-%d-%H-%M)"
sudo yum -y install nfs-utils
[[ -d /backup ]] || sudo mkdir /backup
[[ -d /mnt/backups ]] || sudo mkdir /mnt/backups
if ! mount -l -t nfs4 | grep -qF $source; then
sudo mount -t nfs -o nfsvers=4.1 -o rsize=1048576 -o wsize=1048576 -o timeo=600 -o retrans=2 -o hard "$source" /backup
fi
if ! mount -l -t nfs4 | grep -qF $destination; then
sudo mount -t nfs -o nfsvers=4.1 -o rsize=1048576 -o wsize=1048576 -o timeo=600 -o retrans=2 -o hard "$destination" /mnt/backups
fi
sudo mkdir -p "$backup_dir"
sudo rsync -ah --stats --delete --numeric-ids --log-file=/tmp/efs-restore.log /backup/ "$backup_dir"
rsync_status="$?"
sudo cp /tmp/efs-restore.log "$backup_dir"
exit "$rsync_status"
- Key: scriptArgument
StringValue: "#{myEFSSource}"
- Key: scriptArgument
StringValue: "#{myEFSBackup}"
- Key: scriptArgument
StringValue: "#{myEFSId}"
- Key: scriptArgument
StringValue: "#{myTimeZone}"
- Key: onSuccess
RefValue: SuccessNotify
- Key: onFail
RefValue: FailureNotify
Key | Value | 意味 |
---|---|---|
type |
ShellCommandActivity |
アクティビティをシェルコマンドに指定。 |
runsOn |
EC2ResourceObj |
アクティビティを実行させるリソースを参照。 |
command |
上記参照 | 今回はシンプルにフルバックアップさせる処理にしています。 |
scriptArgument |
"#{myEFSSource}" / "#{myEFSBackup}" / "#{myEFSId}" / "#{myTimeZone}" |
command に渡す引数を参照。 |
onSuccess |
SuccessNotify |
処理が成功した場合に実施するアクションを参照。 |
onFail |
FailureNotify |
処理が失敗した場合に実施するアクションを参照。 |
SnsAlarm
アクティビティの実行結果をSNSトピックに通知させます。詳細なドキュメントはこちらです。今回は成功/失敗時に通知させているので、2つのアクションを定義しています。
- アクティビティの成功時
- Id: SuccessNotify
Name: SuccessNotify
Fields:
- Key: type
StringValue: SnsAlarm
- Key: topicArn
StringValue: "#{myTopicArn}"
- Key: subject
StringValue: "[Info] EFS Backup Succeeded"
- Key: message
StringValue: |
scheduledStartTime: "#{node.@scheduledStartTime}"
actualStartTime: "#{node.@actualStartTime}"
actualEndTime: "#{node.@actualEndTime}"
hostname: "#{node.hostname}"
- アクティビティの失敗時
- Id: FailureNotify
Name: FailureNotify
Fields:
- Key: type
StringValue: SnsAlarm
- Key: topicArn
StringValue: "#{myTopicArn}"
- Key: subject
StringValue: "[Alart] EFS Backup Failed"
- Key: message
StringValue: |
scheduledStartTime: "#{node.@scheduledStartTime}"
actualStartTime: "#{node.@actualStartTime}"
actualEndTime: "#{node.@actualEndTime}"
hostname: "#{node.hostname}"
Key | Value | 意味 |
---|---|---|
type |
SnsAlarm |
アクションをSNSに指定。 |
topicArn |
"#{myTopicArn}" |
SNSトピックのARNを指定。 |
subject |
上記参照 | トピックのサブジェクト。 |
message |
上記参照 | トピックのメッセージ。 |
message
に "#{node.<filed名>}"
という書式でこのアクションを参照しているアクティビティやノードの情報を取得できます。取得可能な値は各ドキュメントにまとまっています。
まとめ
いかがだったでしょうか。
DataPipelineを利用したEFSのバックアップ環境について、TerraformやCloudFormationと絡めてご紹介しました。
最後まで読んでいただいた方の中には、「わざわざDataPipelineを使わなくても同じ環境は作れるんじゃないか」と思った方も多いと思います。正直この程度であれば、例えばスケジュール実行させたオートスケーリンググループでも同じようなことは可能だと思います。ただし、その場合、失敗時のリトライ/SNSへの通知/実行結果の保存などの「本来やりたいことにまつわる雑用」部分を自分で実装する必要が出てきます。
AWSを使う上では基本的にフルマネージドサービスを活用しつつ、そこから漏れる部分を自分たちで作り込むという姿勢が正しいあり方です。DataPipelineに当てはめると、「雑用」は事前に用意されている各機能に任せつつ、バックアップ取得処理部分の開発のみに集中できます。
今回DataPipelineを学んでいく中で、現在実施しているバッチ処理などをDataPipelineに載せることができれば、多くのメリットがあるなと思いました。みなさんも、一度触ってみるとよいのではないでしょうか。
本エントリがみなさんの参考になれば幸いに思います。