Embulk on DockerでCloudFrontログをParquet形式に変換する

ども、大瀧です。
CloudFrontのログをAthenaで効率良く見たいと思い、Parquet形式に変換する手段としてOSSのバルクローダ EmbulkをEC2上のDockerで実行してみました。その様子をレポートします。

Apache Drillで変換する様子は石川のエントリーを参照ください。

Dockerイメージの設計、作成

Dockerイメージの作成はそれぞれのDockerfileをGitHubリポジトリにアップロードし、Docker Hub Automated Buildを利用しました。 作成されたDockerイメージは公開されているので、試したい方は記事後半の動作確認の手順に従い、イメージをダウンロードしてご利用ください。

Embulk実行について、以下のポリシーで設計しました。コンテナ内部には極力データを置かない方針です。

  • 元データ、変換データともにAmazon S3とし、Dockerコンテナ内やコンテナを実行するホストにはデータを持たない
  • Amazon S3のバケット名など構成情報はイメージに含めず、環境変数もしくはIAMロールなどで外出しする
  • AWSのAPIキーなど認証情報はイメージに含めず、環境変数もしくはIAMロールなどで外出しする

また、Dockerイメージの作成についても、以下の方針としました。

  • takipone/embulk(Docker Hub): Embulkのバイナリを持つDockerイメージ
  • takipone/embulk-cloudfrontlogs-parquet(Docker Hub): FROMにバイナリを持つイメージを指定し、Embulkのコンフィグファイルを持つDockerイメージ

Embulkのコンフィグファイル(config.yml/config.yml.liquid)をイメージに含むかは悩んだのですが、今回はイメージに含め指定した部分のみコンテナの実行時に環境変数で指定する形にしました。この点において、EmbulkがLiquidテンプレートをサポートしているのが非常に役立ちました。これが無いと、エントリポイントのシェルスクリプト内でsedを連打する羽目になるので(笑)。コンテナ実行時に-vオプションなどでホストのコンフィグファイルを読み込むのもアリだと思います。

というわけで、それぞれのイメージを解説してみます。

takipone/embulk

こちらはEmbulkの実行ファイルを準備するシンプルなイメージです。

FROM openjdk:8-alpine

ENV EMBULK_VERSION 0.8.17

RUN apk --update add --virtual build-dependencies \
    curl \
    && mkdir /embulk \
    && curl -o  /embulk/embulk -L "https://dl.bintray.com/embulk/maven/embulk-$EMBULK_VERSION.jar" \
    && chmod +x /embulk/embulk \
    && apk del build-dependencies
# Install libc6-compat for Embulk Plugins to use JNI
# cf : https://github.com/jruby/jruby/wiki/JRuby-on-Alpine-Linux
RUN apk --update add libc6-compat

COPY docker-entrypoint.sh /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]

Embulk公式の配布元(Bintray)からjarファイルをダウンロードしていますね。libc6-compatパッケージは大半のプラグインで必要になるかなと思い、共通コンポーネントとしてこちらのイメージでインストールしました。

#!/bin/sh

# Install Embulk Plug ins
if [ -n "$PLUGINS" ] ; then
  /embulk/embulk gem install $PLUGINS
fi

/embulk/embulk "$@"

こちらを参考に、コンテナ実行時の引数がエントリポイントのスクリプトに反映されるように工夫しました。こんな感じで実行できます(今回はembulk helpの体でヘルプを表示)。

$ docker run --rm -ti takipone/embulk help
2017-03-02 05:37:39.532 +0000: Embulk v0.8.17
Embulk v0.8.17
Usage: embulk [-vm-options] <command> [--options]
Commands:
   mkbundle   <directory>                             # create a new plugin bundle environment.
   bundle     [directory]                             # update a plugin bundle environment.
   run        <config.yml>                            # run a bulk load transaction.
   cleanup    <config.yml>                            # cleanup resume state.
   preview    <config.yml>                            # dry-run the bulk load without output and show preview.
   guess      <partial-config.yml> -o <output.yml>    # guess missing parameters to create a complete configuration file.
   gem        <install | list | help>                 # install a plugin or show installed plugins.
                                                      # plugin path is /root/.embulk/jruby/2.3.0
   new        <category> <name>                       # generates new plugin template
   migrate    <path>                                  # modify plugin code to use the latest Embulk plugin API
   example    [path]                                  # creates an example config file and csv file to try embulk.
   selfupdate [version]                               # upgrades embulk to the latest released version or to the specified version.

VM options:
   -J-O                             Disable JVM optimizations to speed up startup time (enabled by default if command is 'run')
   -J+O                             Enable JVM optimizations to speed up throughput
   -J...                            Set JVM options (use -J-help to see available options)
   -R...                            Set JRuby options (use -R--help to see available options)

error: Unknown subcommand "help".
$

とりあえず動くことが確認できました。

takipone/embulk-cloudfrontlogs-parquet

こちらのイメージでは、実際の用途に合わせてEmbulkのプラグインと構成ファイルを作り込んでいきます。まずはDockerfileから。

FROM takipone/embulk:latest

RUN /embulk/embulk gem install embulk-input-s3 embulk-output-parquet

COPY ./config.yml.liquid /config.yml.liquid

CMD ["run", "/config.yml.liquid"]

Embulkプラグインのインストールと構成ファイルのコピーをしています。CMDでは、構成ファイルを読んでEmbulkを実行する形にしました。構成ファイルは以下になります。

in:
  type: s3
  bucket: {{ env.IN_S3_BUCKET }}
  path_prefix: {{ env.IN_S3_PREFIX }}
  auth_method: instance
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: "\t"
    quote: ''
    escape: ''
    skip_header_lines: 2
    columns:
    - {name: date, type: string}
    - {name: time, type: string}
    - {name: edge_location, type: string}
    - {name: sc_bytes, type: long}
    - {name: c_ip, type: string}
    - {name: cs_method, type: string}
    - {name: cs_host, type: string}
    - {name: cs_uri_stem, type: string}
    - {name: sc_status, type: string}
    - {name: cs_referer, type: string}
    - {name: cs_user_agent, type: string}
    - {name: cs_uri_query, type: string}
    - {name: cs_cookie, type: string}
    - {name: edge_result_type, type: string}
    - {name: edge_result_id, type: string}
    - {name: host_header, type: string}
    - {name: cs_protocol, type: string}
    - {name: cs_bytes, type: long}
    - {name: time_taken, type: double}
    - {name: x_forwarded_for, type: string}
    - {name: ssl_protocol, type: string}
    - {name: ssl_cipher, type: string}
    - {name: x_edge_response_result_type, type: string}
    - {name: cs_protocol_version, type: string}
out:
  type: parquet
  path_prefix: s3a://{{ env.OUT_S3_BUCKET }}/{{ env.OUT_S3_PREFIX }}

S3にあるCloudFrontログを読んで、同じくS3にParquet形式で書き出す部分を決め打ちにし、それぞれS3のバケット、プレフィックスを以下の環境変数から読む感じです。

  • IN_S3_BUCKET: CloudFrontログのあるS3バケット名
  • IN_S3_PREFIX: 対象ログのプレフィックス。ディレクトリ名の他、ファイルの接頭辞でもOK
  • OUT_S3_BUCKET: Parquet形式のファイルを出力するS3バケット名
  • OUT_S3_PREFIX: 出力するParquet形式のファイルの接頭辞

なお、Amazon S3への認証はEC2インスタンスに付与するIAMロールを前提にしました。入力はconfig.yml.liquidの5行目で指定、出力はembulk-output-parquetプラグインの既定でIAMロールが対応しています。

Dockerイメージの作成は、Docker HubのAutomated Buildを利用しました。イメージは公開されているので、試したい方は以下の動作確認に従い、イメージをダウンロードしてご利用ください。手元でやってみたい方はGitHubリポジトリをgit cloneで持って来て、自前で以下のようにビルドを実行しても良いでしょう。

$ docker build -t myembulk-output-parquet .

動作確認

では、作成したDockerイメージでEmbulkの実行を試してみます。今回は以下のようなCloudFrontログを用意しました。

  • S3バケット名: ipv6deployedtoyouredgelocation.com-log
  • プレフィックス: raw/
$ aws s3 ls s3://ipv6deployedtoyouredgelocation.com-log/raw/ | head
2016-12-01 17:03:42          0
2016-12-01 18:39:44       1263 E32N7SRESB34ZS.2016-10-22-13.b6393ff5.gz
2016-12-01 18:39:44       1239 E32N7SRESB34ZS.2016-10-22-14.60b415a7.gz
2016-12-01 18:39:45       1092 E32N7SRESB34ZS.2016-10-22-14.e4a8b4db.gz
2016-12-01 18:39:44       1667 E32N7SRESB34ZS.2016-10-22-20.dbe00b1c.gz
2016-12-01 18:39:44       2415 E32N7SRESB34ZS.2016-10-22-23.082bb0d8.gz
2016-12-01 18:39:44       1134 E32N7SRESB34ZS.2016-10-23-02.40208676.gz
2016-12-01 18:39:44       3138 E32N7SRESB34ZS.2016-10-23-02.4c3fca73.gz
2016-12-01 18:39:44       2824 E32N7SRESB34ZS.2016-10-23-05.d5a4c495.gz
2016-12-01 18:39:44       3916 E32N7SRESB34ZS.2016-10-23-07.4ea726b5.gz
$

変換結果は同じS3バケットのparquet/ディレクトリ以下にtestの接頭辞を付けて出力してみます。以下のようなコマンドラインをDockerインストール済みのEC2で実行しました(Dockerイメージは初回実行時に自動でダウンロードされます)。

$ sudo docker run --rm -ti \
  -e IN_S3_BUCKET=ipv6deployedtoyouredgelocation.com-log \
  -e IN_S3_PREFIX=raw \
  -e OUT_S3_BUCKET=ipv6deployedtoyouredgelocation.com-log \
  -e OUT_S3_PREFIX=parquet/test \
  takipone/embulk-cloudfrontlogs-parquet:latest
2017-03-02 08:22:44.039 +0000: Embulk v0.8.17
2017-03-02 08:22:47.973 +0000 [INFO] (0001:transaction): Loaded plugin embulk-input-s3 (0.2.8)
2017-03-02 08:22:48.166 +0000 [INFO] (0001:transaction): Loaded plugin embulk-output-parquet (0.4.0)
2017-03-02 08:22:50.443 +0000 [WARN] (0001:transaction): Setting '' (empty string) to "quote" option is obsoleted. Currently it becomes '"' automatically but this behavior will be removed. Please set '"' explicitly.
2017-03-02 08:22:50.448 +0000 [WARN] (0001:transaction): Setting '' (empty string) to "escape" option is obsoleted. Currently it becomes null automatically but this behavior will be removed. Please set "escape: null" explicitly.
2017-03-02 08:22:50.573 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=4 / tasks=2023
2017-03-02 08:22:50.615 +0000 [INFO] (0001:transaction): {done:  0 / 2023, running: 0}
  : (略)
$

S3バケットのparquet/ディレクトリ以下を見てみると。。。

$ aws s3 ls s3://ipv6deployedtoyouredgelocation.com-log/parquet/ | head
2017-03-02 00:16:32       6542 test.000.parquet
2017-03-02 00:16:32       6360 test.001.parquet
2017-03-02 00:16:32       6405 test.002.parquet
2017-03-02 00:16:32       7005 test.003.parquet
2017-03-02 00:16:33       8586 test.004.parquet
2017-03-02 00:16:33       6431 test.005.parquet
2017-03-02 00:16:33       9786 test.006.parquet
2017-03-02 00:16:33       8769 test.007.parquet
2017-03-02 00:16:34       9926 test.008.parquet
2017-03-02 00:16:34       8734 test.009.parquet
$

変換されたファイルが出来ていますね!Athenaで以下のDDLを流して、検索することができました。

CREATE EXTERNAL TABLE IF NOT EXISTS cloudfront (
  date STRING,
  time STRING,
  edge_location STRING,
  sc_bytes INT,
  c_ip STRING,
  cs_method STRING,
  cs_host STRING,
  cs_uri_stem STRING,
  sc_status STRING,
  cs_referer STRING,
  cs_user_agent STRING,
  cs_uri_query STRING,
  cs_cookie_type STRING,
  edge_result_type STRING,
  edge_result_id STRING,
  host_header STRING,
  cs_protocol STRING,
  cs_bytes INT,
  time_taken DOUBLE,
  x_forwarded_for STRING,
  ssl_protocol STRING,
  ssl_cipher STRING,
  x_edge_response_result_type STRING,
  cs_protocol_version STRING
  )
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
) LOCATION 's3://ipv6deployedtoyouredgelocation.com-log/parquet/'

目的が達成されました!

まとめ

EmbulkをDockerで実行し、CloudFrontのログをParquet形式に変換する様子をご紹介しました。 Embulkには豊富なプラグインがあるので、ログファイルのリネームや他の形式への変換、AuroraやRedshiftへの投入などにも応用できそうです。

参考URL