新感覚なELTツール「Meltano」を使ってSlackのデータをDWHに連携してみた

メール頼める?
2021.09.22

大阪オフィスの玉井です。

今回はMeltanoというツールをご紹介します。

Meltanoとは?

公式から引用すると「DataOps時代におけるEL(T)ツール」だそうです。

…私の個人的な感覚で説明しますが、絶妙な位置づけのEL(T)ツールです。やることはELT(メインはEL)なのですが、とにかく位置づけが絶妙なのです。

Meltanoの絶妙なポジション

データをE(抽出)してL(ロード)する…という仕組みを行いたいとき、ざっくり分けると、下記のどちらかを選ぶと思います。

  • 手動で開発する(Pythonなど)
  • そういうサービスを導入する(Fivetranなど)

前者は何でもできますが、人と時間のコストが半端じゃないです。後者はめちゃくちゃ楽ですが、ちょっとカスタマイズしたいみたいな時に、あまり身動きがとれません(融通が効きづらい)。

Meltanoは上記の中間に位置する感じです。コードやコマンドで設定を行うのですが、データの抽出部分やロード部分はパッケージングされており、あとはそれを組み合わせるだけです、面倒な部分は作らなくてよくて、設定部分はコードベースで自由に管理する、非常に新感覚なET(T)ツールとなっています。

Tは?

さっきから「EL(T)」という謎めいた表現をしていますが、これには理由があります。

Meltanoが単体で保有している機能はE(抽出)とL(ロード)だけです。ではT(変換)はどうするのかというと、dbtと連携して実現します。要するに、Tはdbtに一任するということです。この発想も「ほう!」ってなりました。

Singer

MeltanoはSingerという技術が利用されています。

データ移行を簡単に行うためのオープンソースのツールです。ファイル統合サービスでおなじみのStitchでも、このSingerを使うことが出来ます。

Singer tapと呼ばれる、抽出やロードをパッケージングしたものを組み合わせてデータ移行の仕組みを構築することができます。つまり、上述したMeltanoの絶妙な位置づけは、このSingerによるところが大きいといえるでしょう。

従来のSingerユーザーであれば、めちゃくちゃ簡単にMeltanoを利用することができると思います(Singer知らなくてもかなり簡単に使えます)。

やってみた

下記を参考にします。このチュートリアルはGitlabのデータを使用していますが、私はSlack(社内で使っているやつ)のデータを使いました。

検証環境

  • macOS Big Sur 11.5.2
  • fish 3.3.1
  • Python 3.9.7
  • pip 21.2.4
  • meltano 1.80.1

やることまとめ

Slackにある私の分報チャネルに関するデータをDWH(BQ)にロードします(集計もちょっとやりたい)。

Meltanoのインストール

Dockerコンテナも用意されていますが、今後のコマンドの実行が面倒そうだったので、ローカルにインストールしました。

ドキュメントにならって(Pythonのバージョンはドキュメントに記載されているものと異なりますが)、専用ディレクトリを用意し、venvを使用した仮想環境内にmeltanoをインストールすることにします。

> mkdir meltano-projects
> cd meltano-projects
> python3 -m venv .venv
> source .venv/bin/activate.fish

ここまで出来たら下準備は完了です。meltanoをインストールします。

> pip3 install meltano

動作確認します。

> meltano --version
meltano, version 1.80.1

Meltano用のプロジェクトを作成する

インストールが完了したら、最初にやることは、Meltano Projectの用意です。下記のコマンドで生成できます。

> meltano init test_pj

こんな感じの構造をしたディレクトリ群が作成されます。Meltano Projectの実態はコードなので、Gitでバージョン管理することもできます。

.
├── README.md
├── analyze
├── extract
├── load
├── meltano.yml
├── model
├── notebook
├── orchestrate
├── requirements.txt
└── transform

後述しますが、meltano.ymlがProjectの核となるファイルです。ここにデータ連携の設定が記述されていきます。

extractorの設定を行う

Projectを作ったら、次にやることは、「何のデータを持ってくるか(抽出するか、Extract)」です。Meltanoはextractorというものを設定して、データの抽出元を定義します。

どういうものに対応しているのかは、下記のコマンドで確認できます。

> meltano discover extractors
Extractors
tap-adwords
tap-ask-nicely
tap-bigquery
tap-bing-ads
tap-chargebee
tap-csv
tap-facebook
tap-fastly
tap-gitlab
tap-github
...

GithubやZendeskといったアプリケーションのデータや、csvファイル等に対応しています。この中から、自分が使用したいextractorを選んでインストールする形となります。自分が求めているものがない場合は自作することも可能だそうです。

extractorのインストール

今回は、冒頭で述べたとおり、Slackのデータを使いたいので、tap-slackというextractorをインストールします。

> meltano add extractor tap-slack
Added extractor 'tap-slack' to your Meltano project
Repository:     https://github.com/Mashey/tap-slack
Documentation:  https://hub.meltano.com/extractors/slack.html

Installing extractor 'tap-slack'...
Installed extractor 'tap-slack'

To learn more about extractor 'tap-slack', visit https://hub.meltano.com/extractors/slack.html

インストールしたextractorの使い方はhelpオプションで呼び出すことが出来ます。

> meltano invoke tap-slack --help
usage: tap-slack [-h] -c CONFIG [-s STATE] [-p PROPERTIES] [--catalog CATALOG] [-d]

optional arguments:
  -h, --help            show this help message and exit
  -c CONFIG, --config CONFIG
                        Config file
  -s STATE, --state STATE
                        State file
  -p PROPERTIES, --properties PROPERTIES
                        Property selections: DEPRECATED, Please use --catalog instead
  --catalog CATALOG     Catalog file
  -d, --discover        Do schema discovery

extractorの設定値のセット

extractorには、それぞれ設定できる項目(環境変数みたいな)のようなものがあり、それをセットしていくことで、設定を行います。中には設定必須な項目もあり、それを設定するまでは、extractorは正しく動作しません。

設定項目については、下記のコマンドで確認することができます。

> meltano config tap-slack list
api_token [env: TAP_SLACK_API_TOKEN] current value: None (default)
start_date [env: TAP_SLACK_START_DATE] current value: None (default)
        Sync Start Date: Determines how much historical data will be extracted. Please be aware that the larger the time period and amount of data, the longer the initial extraction can be expected to take.
channels [env: TAP_SLACK_CHANNELS] current value: None (default)
        By default the tap will sync all channels it has been invited to, but this can be overriden to limit it ot specific channels. Note this needs to be channel ID, not the name, as recommended by the Slack API. To get the ID for a channel, either use the Slack API or find it in the URL.
private_channels [env: TAP_SLACK_PRIVATE_CHANNELS] current value: True (default)
        Join Private Channels: Specifies whether to sync private channels or not. Default is true.
join_public_channels [env: TAP_SLACK_JOIN_PUBLIC_CHANNELS] current value: False (default)
        Join Public Channels: Specifies whether to have the tap auto-join all public channels in your ogranziation. Default is false.
archived_channels [env: TAP_SLACK_ARCHIVED_CHANNELS] current value: False (default)
        Sync Archived Channels: Specifies whether the tap will sync archived channels or not. Note that a bot cannot join an archived channel, so unless the bot was added to the channel prior to it being archived it will not be able to sync the data from that channel. Default is false.
date_window_size [env: TAP_SLACK_DATE_WINDOW_SIZE] current value: 7 (default)
        Date Window Size: Specifies the window size for syncing certain streams (messages, files, threads). The default is 7 days.

To learn more about extractor 'tap-slack' and its settings, visit https://hub.meltano.com/extractors/slack.html

…実は大体のextractorはドキュメントが用意されていて、こっちを見たほうが早いです…。

tap-slackに関しては、SlackのAPIを経由してデータを取得するので、SlackのAPIトークンは必須です。項目に値をセットするコマンドは下記の通り。トークン以外にも、抽出対象とするチャネルや(IDを指定する)、抽出するデータの開始時期を指定しました(テスト的な利用なので過去1年分のみを抽出するように設定)。

> meltano config tap-slack set token <トークン>
Extractor 'tap-slack' setting 'token' was set in `meltano.yml`: '<トークン>'

> meltano config tap-slack set channels '["チャネルID"]'
Extractor 'tap-slack' setting 'channels' was set in `meltano.yml`: ['チャネルID']

> meltano config tap-slack set start_date 2020-10-01T00:00:00Z
Extractor 'tap-slack' setting 'start_date' was set in `meltano.yml`: '2020-10-01T00:00:00Z'

extractorで取得したいデータの設定

extractor自体の設定を終えたら、最後は「抽出したいデータ」を選びます。下記のコマンドで抽出できるデータの一覧が確認できます。ここから抽出したいデータを選んでいきます。

attributesの1つ1つがカラムになるイメージです。ドットの前の接頭辞(?)にあたる名前がテーブル名になるイメージです(channels.channel_id→channelsというテーブルにchannel_idというカラムができる)。

> meltano select tap-slack --list --all
Legend:
        selected
        excluded
        automatic

Enabled patterns:
        *.*

Selected attributes:
        [automatic] channel_members.channel_id
        [automatic] channel_members.user_id
        [selected ] channels.channel_id
        [selected ] channels.created
        [selected ] channels.creator
        [automatic] channels.id
        [selected ] channels.is_archived
        [selected ] channels.is_channel
        [selected ] channels.is_ext_shared
        [selected ] channels.is_general
        [selected ] channels.is_group
        [selected ] channels.is_im
        [selected ] channels.is_member
        [selected ] channels.is_mpim
        [selected ] channels.is_org_shared
        [selected ] channels.is_pending_ext_shared
...

抽出したいデータの選択は、下記のコマンドで行います。

# 1つだけ選ぶ場合
> meltano select tap-slack channels channel_id

# カテゴリ下全てを選ぶ場合
> meltano select tap-slack messages "*"

# 逆に除外する場合
> meltano select tap-slack --exclude channels.channel_id

選んだデータの確認。

> meltano select tap-slack --list

loaderの設定を行う

抽出(E)の設定を終えたら、今度はロード(L)の設定を行います。基本的な流れはextractorと一緒です。

loaderのインストール

まず、どのようなloaderがあるのか確認します。主要なDWHやDBは網羅されている印象です。csvファイルとして出力することもできます。

> meltano discover loaders
Loaders
target-bigquery
target-csv, variants: hotgluexyz (default), singer-io
target-jsonl
target-postgres, variants: datamill-co (default), transferwise, meltano
target-snowflake, variants: datamill-co (default), transferwise, meltano
target-sqlite
target-redshift

今回はBigQueryにロードしたいので、下記コマンドでBQ用のloaderをインストールします。

> meltano add loader target-bigquery

loaderの設定値のセット

extractorと同様、設定値をセットしてきます。ドキュメントは下記。

ドキュメントによれば、必須設定値は(BQの)Project IDCredentials Pathです。それぞれsetコマンドでセットしてきます。Credentials Pathというのは、BQに対してアクセスできるサービスアカウントのclient_secretsファイルをフルパスで指定します。

> meltano config target-bigquery set credentials_path <ファイルのフルパス>

参考までに、必須じゃない設定として、ロケーションも設定しました(未設定時の値はUS)

> meltano config target-bigquery set location asia-northeast1

EL(T)ジョブの実行

抽出(E)とロード(L)の準備ができたので、いよいよ実際にジョブを実行します(今回はTは未設定)。

meltano.ymlの確認

実は、今までコマンドで設定してきた値などは、全てmeltano.ymlに書き込まれています。これ、実は、直接編集しても反映されるため、慣れてきたら、いちいちコマンドを打たなくても、直接ファイルをダーッと書いて設定することも可能です。

今まで設定した私のymlファイルは下記のようになりました。

version: 1
send_anonymous_usage_stats: true
project_id: 3a916aba-7b90-4dbe-b708-13ed3fc81d15
plugins:
  extractors:
  - name: tap-slack
    pip_url: git+https://github.com/Mashey/tap-slack.git
    config:
      channels:
      - <チャネルID>
      start_date: '2020-10-01T00:00:00Z'
      token: <トークン>
  # 今回ロードするSlackのデータ  
    select:
    - messages.*
    - users.*
    - user_groups.*
    - threads.*
    # 下記のデータだけなぜかエラーになるため除外
    - '!messages.last_read'
  loaders:
  # 一応CSV出力もできるように入れました
  - name: target-csv
    variant: hotgluexyz
    pip_url: git+https://github.com/hotgluexyz/target-csv.git@0.3.3
    config:
      destination_path: <csvを出力するパス>
  # こっちがBQ
  - name: target-bigquery
    variant: adswerve
    pip_url: git+https://github.com/adswerve/target-bigquery.git@v0.10.2
    config:
      project_id: tamai-rei
      location: asia-northeast1
      credentials_path: <ファイルのパス>

上記ファイル、現在のところ未解決の課題があります。それは、SlackのAPIトークンがベタ書きされているということです。これはただの検証だからいいのですが、本番運用を想定して、Gitでバージョン管理しようとしたとき、当然このymlファイルは管理の対象になるため、このままあげちゃうのは非常によろしくありません。ドキュメントには、コマンドで設定すれば、「Git管理外のファイルにひっそり設定される」という旨が書かれてあったのですが、その状態だと「トークンがない」というエラーで動作しませんでした。現状、ymlファイルに直接書かないと動作できておらず、ここは後日しっかり確認したいと思います。

ジョブの実行

設定が問題ないことを確認したら、あとはジョブを実行するだけです。

eltというコマンドで実行できます。その際、extractorとloaderを指定します。複数回実行するとき、job_idが同じ場合、インクリメンタルにデータを同期してくれるようです。あとログも、このID別に記録されます。

> meltano elt tap-slack target-bigquery --job_id=slack-to-bq
meltano         | Running extract & load...
meltano         | Found state from 2021-09-22 03:16:32.538159.
tap-slack       | INFO Starting Sync..

(中略)

meltano         | Incremental state has been updated at 2021-09-22 03:18:08.793729.
meltano         | Extract & load complete!
meltano         | Transformation skipped.
meltano         | INFO Starting Discovery..INFO Finished Discovery..⏎

データを確認してみる

ジョブが終わったので、BQを見てみます。

データセット名は指定しなかったので(loader側で設定可能)、extractorの名前がそのまま使われています。その下には、extractorで指定したデータがロードされています。

中身はこんな感じ。

行の感じが怪しい…。

まさかの半構造型のデータがありました(RECORD型のREPEATED)。なので、実はこれをCSVに出力すると、ガタガタなデータで出てきてしまいます。他のDWHだとどうなるのやら。

実際の書き込みテキストももちろんあります。最後は他人のブログを嘲笑しているように見えますが、これは自分のブログを自嘲しています(ヒマな方はURLを直打ちして確認してみてください)。

集計してみる

せっかくなので、自分のチャネルで一番多く使われたリアクション(絵文字)を集計してみます。

WITH unnest_reaction AS(
    SELECT
        channel_id,
        ts,
        reactions.name AS name,
        reactions.COUNT AS COUNT
    FROM
        `tamai-rei.tap_slack.messages`,
        UNNEST(reactions) AS reactions
)
SELECT
    name,
    SUM(COUNT) AS rcount
FROM
    unnest_reaction
GROUP BY
    name
ORDER BY
    rcount DESC

結果は:kusa:が一番多かったです。これは他のチャネルでも見られる傾向なので、私の分報が嘲笑われているわけではありません、たぶん。

おわりに

今度がdbtを連携して「T」(Transform)まで一緒にやってみたいです。