MQTTに配信したメッセージをfluentdでS3に保存する

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

AWS IoT が使えるなら、Device Gateway の MQTT ブローカーに配信したメッセージは Rules 定義により簡単に S3 に保存できます。 今回は、AWS IoT が使えない制約があり、代わりに EC2 上で MQTT ブローカーの Mosquitto を動かしているようなケースにおいて、MQTT に配信したメッセージを fluentd を使って S3 に保存する手順を紹介します。

今回の手順では Mosquitto 固有の機能・設定は使っていないため、Mosquitto 以外の MQTT ブローカーでも同じ手順で動作するかと思います。

mqtt_s3_integration_with_fluentd

事前準備

EC2 上で Mosquitto をインストールしておきます。

同じインスタンスに td-agent をインストールし、S3 にオブジェクトを PUT します。 AWS クレデンシャルと権限管理をするために、EC2 インスタンスには S3 の Put 系更新権限がついたIAM Role 付きで 起動ししておきます。

fluentd エージェントのインストール

$ curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent2.sh | sh
$ td-agent --version
td-agent 0.12.12

プラグインのインストール

fluentd のプラグインに関して

を使います。

後者は標準でインストールされているため、前者をインストールします。

$ sudo td-agent-gem install fluent-plugin-mqtt
WARN: Unresolved specs during Gem::Specification.reset:
      json (>= 1.4.3)
WARN: Clearing out unresolved specs.
Please report a bug if this causes problems.
Fetching: mqtt-0.3.1.gem (100%)
Successfully installed mqtt-0.3.1
Fetching: fluent-plugin-mqtt-0.0.4.gem (100%)
Successfully installed fluent-plugin-mqtt-0.0.4
Parsing documentation for fluent-plugin-mqtt-0.0.4
Installing ri documentation for fluent-plugin-mqtt-0.0.4
Parsing documentation for mqtt-0.3.1
Installing ri documentation for mqtt-0.3.1
Done installing documentation for fluent-plugin-mqtt, mqtt after 0 seconds
2 gems installed

fluentd エージェントの設定変更

fluentd エージェントの設定ファイルは /etc/td-agent/td-agent.conf にあります。

インプットの定義

今回はトピックが "foo/" で始まるメッセージを転送の対象とします。

<source>
  type mqtt
  port 1883
  topic foo/#
</source>
  • type をプラグイン名 mqtt にします。
  • topic でトピックを "foo/#" で絞ります。

アウトプットの定義

<match foo.bar>
  type s3

  s3_bucket YOUR_BUCKET_NAME
  path mqtt/foo/bar
  s3_object_key_format %{path}/%{time_slice}_HOSTNAME_%{index}.%{file_extension}

  buffer_path /var/log/td-agent/s3_foo.bar

  time_slice_format %Y%m%d%H%M
  time_slice_wait 10m
  utc

  store_as text
</match>
<match foo.baz>
  type s3

  s3_bucket YOUR_BUCKET_NAME
  path mqtt/foo/baz
  s3_object_key_format %{path}/%{time_slice}_HOSTNAME_%{index}.%{file_extension}

  buffer_path /var/log/td-agent/s3_foo.baz

  time_slice_format %Y%m%d%H%M
  time_slice_wait 10m
  utc

  store_as text
</match>
  • MQTT のトピック foo/bar が fluentd のタグでは foo.bar とスラッシュがドットに変換されます。 そのため、トピック foo/bar に対するマッチ条件は match foo.bar と書きます。
  • type をプラグイン名 s3 にします。
  • S3 にはgzip圧縮せず、AS IS のまま保存するため store_as text とします。

S3 のキーは以下で定義します。

  • s3_bucket バケット名
  • s3_object_key_format カスタマイズしたキー名 (%{path}/%{time_slice}_HOSTNAME_%{index}.%{file_extension})

%{path}%{time_slice_format} はそれぞれのパラメーターで定義したものに置き換えられます。

最終的には /mqtt/foo/bar/201511221156_HOSTNAME_0.txt というようなキー名になります。

ロードバランスされた複数の MQTT サーバーから S3 にメッセージ格納するため、S3 キーが被らないように、キー名にホスト名情報(_HOSTNAME_ の箇所。適宜変更してください) を含めています。

実際に保存されたS3キーが以下です。

$ aws s3 ls s3://YOUR_BUCKET_NAME/mqtt/foo/bar/
2015-11-22 12:07:17         49 201511221156_HOSTNAME_0.txt
2015-11-22 12:12:17         55 201511221201_HOSTNAME_0.txt
2015-11-22 12:13:17         94 201511221202_HOSTNAME_0.txt
2015-11-22 12:15:17        103 201511221204_HOSTNAME_0.txt
2015-11-22 12:16:17         55 201511221205_HOSTNAME_0.txt
...

パラメーター buffer_path ではバッファリングに利用するファイルパスを指定します。 fluentd のバッファリング機能は、logstash に対する大きなメリットのため、特別な理由がない限り活用しましょう。

buffer_path /var/log/td-agent/s3_foo.bar のように定義すると /var/log/td-agent/s3_foo.bar.201511221525.b525253b0998677a0.log というようにバッファリングファイルが作成されます。

fluentd の設定を有効にする

ファイルの設定変更を有効にします。

$ sudo service td-agent restart
Retarting td-agent:               [  OK  ]

メッセージ送信&保存テスト

MQTT ブローカーにメッセージ送信

$ mosquitto_pub -d -t foo/baz -m a
$ mosquitto_pub -d -t foo/bar -m 1

ローカルサーバーのバッファリングファイルを確認

$ cat /var/log/td-agent/s3_foo.bar.201511221538.b5252568fc7610d66.log
2015-11-22T15:38:08Z    foo.bar {"message":"1"}

送信メッセージが含まれています。

S3 に PUT されたオブジェクトを確認

しばらく様子見して、S3 に反映されていることを確認します。

$ aws s3 ls s3://YOUR_BUCKET_NAME/mqtt/foo/bar/
...
2015-11-22 15:48:09        456 201511221537_HOSTNAME_0.txt
2015-11-22 15:49:09        362 201511221538_HOSTNAME_0.txt
2015-11-22 15:50:09        408 201511221539_HOSTNAME_0.txt
$ aws s3 cp s3://YOUR_BUCKET_NAME/mqtt/foo/bar/201511221538_HOSTNAME_0.txt -
2015-11-22T15:38:08Z    foo.bar {"message":"1"}
2015-11-22T15:38:12Z    foo.bar {"message":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}
2015-11-22T15:38:31Z    foo.bar {"message":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}
2015-11-22T15:38:46Z    foo.bar {"message":"aaaaaaaaaaa"}
2015-11-22T15:38:57Z    foo.bar {"message":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}

期待通り S3 に保存されています。

forest プラグインを使って match ルールをシンプルに書く

今回は対象のトピックが 2 つだけだったため、ほぼ同じ内容を2箇所にコピペして書きました。 トピックが5, 10 となると、設定の管理が手間ですね。

プラグイン forest を使うと、タグを動的に解釈させてすっきりと書けます。

先ほどの設定を forest を使って書き直してみましょう。

forest プラグインのインストール

まずは forest プラグインをインストールします。

$ sudo td-agent-gem install fluent-plugin-forest
WARN: Unresolved specs during Gem::Specification.reset:
      json (>= 1.4.3)
WARN: Clearing out unresolved specs.
Please report a bug if this causes problems.
Fetching: fluent-plugin-forest-0.3.0.gem (100%)
Successfully installed fluent-plugin-forest-0.3.0
Parsing documentation for fluent-plugin-forest-0.3.0
Installing ri documentation for fluent-plugin-forest-0.3.0
Done installing documentation for fluent-plugin-forest after 0 seconds
1 gem installed

アウトプットの定義

forest プラグインを使ってアウトプットの match ディレクティブを書き直したのが以下です。

<match foo.**>
  type forest
  subtype s3
  <template>
    s3_bucket YOUR_BUCKET_NAME
    path mqtt/foo/${tag_parts[1]}
    s3_object_key_format %{path}/%{time_slice}_HOSTNAME_%{index}.%{file_extension}

    buffer_path /var/log/td-agent/s3_${tag}

    time_slice_format %Y%m%d%H%M
    time_slice_wait 10m
    utc

    store_as text
  </template>
</match>

肝は template ディレクティブです。

ハードコードされていたタグ名を変数で置き換えています。

  • path mqtt/foo/barmqtt/foo/${tag_parts[1]}
  • buffer_path /var/log/td-agent/s3_foo.bar/var/log/td-agent/s3_${tag}

という具体です。

このようにしておくと、タグ(トピック)のバリエーションが増えても、fluentd の設定を変更することなく、S3 に保存されます。

まとめ

今回はトピックをキーに S3 保存先を切り替える設定例を紹介しました。 MQTT のトピックと fluentd のタグは非常に相性が良いことを感じていただけましたでしょうか?

参考