WeWork製のMarquezで構築するメタデータ・エコシステムの概要とデモ

どうも!DA部の春田です。

OSSデータカタログといえば、Lyft社のAmundsenがメジャーになりつつありますが、個人的により好みのOSSを見つけました。WeWork社製のMarquezです。

本ブログでは、実際のデモをお見せしながらMarquezの特徴を紹介していきます。

Marquezの概要

Marquezは一言でいえば、メタデータの収集・連携・可視化のエコシステムを構築するためのツールです。データがどのように生成・消費されたかの繋がり(Lineage)を管理し、ジョブのランタイムやデータセットへのアクセス頻度、データのライフサイクル管理などの全体像を可視化できるようになります。

Marquezを導入する目的は、組織内のチームが別チームのデータセットをシームレスに共有でき、安全に信頼して使用できるようなデータのエコシステムを促進することです。細かい特徴については、Marquez公式ページを和訳して掲載しておきます。

  • メタデータの一元管理する機能群
    • データ・リネージ(Data lineage)
    • データ・ガバナンス(Data governance)
    • データのヘルス状態(Data health)
    • データの探索性(Data discovery + exploration)
  • 詳細で高次元のデータモデル
    • ジョブ
    • データセット
  • メタデータAPIを通して、メタデータを簡単に収集
    • 2021年2月現在では、PythonとJavaのクライアントが付属
  • 第一級オブジェクトとしてのデータセット
  • ジョブやデータセットの所有情報の強制化
  • シンプルな運用と、最小限の依存関係になるようなデザイン
  • RESTful APIで他のシステムとスムーズに統合可能

パッとわかる魅力としては、データセットやジョブのLineageが簡単・キレイに表示されることでしょうか。データのリネージを可視化したい!という要件にはピッタリのツールです。

個人的に印象的だったのが、メタデータをMarquezへ流す入口として用意されているAPIと、シンプルなデザインです。REST APIで連携することができるので、ほとんどのデータ分析基盤に対して簡単に導入することができそうです。どのDBMSやETLツールに対応しているのかを特段考えなくて良いのが魅力的です。

Marquezのデータモデル

Marquezを導入するためには、メタデータモデルの理解が必須です。重要な定義は以下の5つです。

  • Namespace
    • 関連するDatasetやJobのメタデータを管理する
    • Namespace内で、DatasetとJobはユニーク
  • Source
    • Datasetの物理的な場所(DB内のテーブルなのか、クラウドストレージのファイルなのか?)
    • それぞれのDatasetは必ず一つのSourceと紐づく
  • Dataset
    • 既存のSourceと紐づく
    • 論理名(sourceName)と物理名(physicalName)の両方を持つ
    • 変更履歴がバージョンとして管理される
    • Dataset全体とField(カラム)に対してタグ付けが可能
  • Job
    • InputとOutputにNamespaceとDatasetを定義する
    • 変更履歴がバージョンとして管理される
  • Run
    • Jobの実行履歴
    • Create → Start → Complete or Failed or Aborted の3ステップでAPIを叩いてジョブ実行を記録する

Marquezには簡単なQuick Startが用意されているので、データを登録する流れだけ確認しておきましょう。

  1. Namespaceを作成する
  2. Sourceを作成する
  3. NamespaceにDatasetを加える
  4. NamespaceにJobを加える
  5. Runを作成する
  6. Runを実行する
  7. Runを完了させる

Quick Startでは要素が少なく、あまりデータのリネージが表現できていないので、今回は別のデモを用意しました。

環境構築

デモを始める前に、Marquezの環境構築を行います。MarquezProject/marquezからリポジトリをクローンし、./docker/up.shを叩くだけです。

git clone https://github.com/MarquezProject/marquez.git
cd ./marquez/
./docker/up.sh

http://localhost:3000に行き、以下のようなUIが表示されていればOKです。初期状態では何もデータが入っていません。

デモ

ここまででお気づきかとは思いますが、MarquezはAPI越しにメタデータ受け付けて管理するだけのツールなので、実際に稼働中のETL環境がなくてもデモは作れてしまいます。今回は、以下のようなETL環境があると“想定”します。(構成図で色々簡略しているのはご了承ください。)

オンプレ環境のPOSの売上データと、外部APIサーバーの気象データを使用して、Redshift上で結合して関係性を分析したい、みたいな分析です。この場合、Marquezに登録するデータは以下のようになります。

  • Namespace
    • sales-insight-project
  • Source
    • オンプレのMySQL
    • 外部APIサーバー
    • S3
    • Redshift
  • Dataset
    • sales(MySQL)
    • sales(S3)
    • sales(Redshift)
    • weather(API)
    • weather(Redshift)
    • sales_insight(Redshift)
  • Job
    • MySQLからS3へのエクスポート
    • Pythonプログラム
    • S3からRedshiftへのCOPY
    • salesとweatherを結合するSQL

さっそくAPIを叩いてデータを登録していきましょう。各APIの仕様については、Marquez API Referenceを確認してください。

まずはNamespaceから作成します。Namespaceで登録できる情報は、名前、所有者、概要のみです。curlコマンドで専用APIにPUTしましょう。

curl -X PUT http://localhost:5000/api/v1/namespaces/sales-insight-project -H 'Content-Type: application/json' -d '{"ownerName": "Takumi Haruta", "description": "Data analytics platform for sales insights"}'

続いて、Sourceを作成します。Sourceには、名前、タイプ(RDBの種類など)、ソースへのURL、概要が登録できます。

curl -X PUT http://localhost:5000/api/v1/sources/on-premise -H 'Content-Type: application/json' -d '{"type": "MySQL", "connectionUrl": "jdbc:mysql://onpremise-host:3306/dev", "description": "On-premise MySQL DB"}'
curl -X PUT http://localhost:5000/api/v1/sources/public-api -H 'Content-Type: application/json' -d '{"type": "API", "connectionUrl": "https://example-api/api/v1", "description": "Public API Server"}'
curl -X PUT http://localhost:5000/api/v1/sources/amazon-s3 -H 'Content-Type: application/json' -d '{"type": "Amazon S3", "connectionUrl": "s3://my-bucket"}'
curl -X PUT http://localhost:5000/api/v1/sources/amazon-redshift -H 'Content-Type: application/json' -d '{"type": "Amazon Redshift", "connectionUrl": "jdbc:redshift://examplecluster.abc123xyz789.us-west-2.redshift.amazonaws.com:5439/dev"}'

NamespaceSourceだけでは、Marquezの画面には何も変化がないかと思います。次に続くDatasetを追加していくことで、リッチになっていきます。Datasetでは、作成したNamespaceの配下に、Sourceを紐づけながら、タイプ(テーブルなど)、物理名、論理名、カラム情報、タグ、概要などを追加できます。(2021年3月現在、typeDB_TABLESTREAMしか選べないようです。)

curl -X PUT http://localhost:5000/api/v1/namespaces/sales-insight-project/datasets/sales-on-premise -H 'Content-Type: application/json' -d '{"type": "DB_TABLE", "physicalName": "public.sales", "sourceName": "on-premise", "fields": [{"name": "id", "type": "INTEGER"}, {"name": "saletime", "type": "TIMESTAMP"}, {"name": "location", "type": "STRING"}, {"name": "price", "type": "INTEGER"}], "description": "On-premise POS data"}'
curl -X PUT http://localhost:5000/api/v1/namespaces/sales-insight-project/datasets/sales-amazon-s3 -H 'Content-Type: application/json' -d '{"type": "DB_TABLE", "physicalName": "sales.csv", "sourceName": "amazon-s3", "fields": [{"name": "id", "type": "INTEGER"}, {"name": "saletime", "type": "TIMESTAMP"}, {"name": "location", "type": "STRING"}, {"name": "price", "type": "INTEGER"}], "description": "POS data exported to S3"}'
curl -X PUT http://localhost:5000/api/v1/namespaces/sales-insight-project/datasets/sales-amazon-redshift -H 'Content-Type: application/json' -d '{"type": "DB_TABLE", "physicalName": "public.sales", "sourceName": "amazon-redshift", "fields": [{"name": "id", "type": "INTEGER"}, {"name": "saletime", "type": "TIMESTAMP"}, {"name": "location", "type": "STRING"}, {"name": "price", "type": "INTEGER"}], "description": "POS data in Redshift"}'
curl -X PUT http://localhost:5000/api/v1/namespaces/sales-insight-project/datasets/weather-public-api -H 'Content-Type: application/json' -d '{"type": "STREAM", "physicalName": "weather", "sourceName": "public-api", "schemaLocation": "https://example-api/api/v1/weather", "fields": [{"name": "datetime", "type": "TIMESTAMP"}, {"name": "location", "type": "STRING"}, {"name": "weather", "type": "STRING"}, {"name": "temperature", "type": "INTEGER"}], "description": "Public weather API"}'
curl -X PUT http://localhost:5000/api/v1/namespaces/sales-insight-project/datasets/weather-amazon-redshift -H 'Content-Type: application/json' -d '{"type": "DB_TABLE", "physicalName": "weather", "sourceName": "amazon-redshift", "fields": [{"name": "datetime", "type": "TIMESTAMP"}, {"name": "location", "type": "STRING"}, {"name": "weather", "type": "STRING"}, {"name": "temperature", "type": "INTEGER"}], "description": "Public API weather data in Redshift"}'
curl -X PUT http://localhost:5000/api/v1/namespaces/sales-insight-project/datasets/sales-insight -H 'Content-Type: application/json' -d '{"type": "DB_TABLE", "physicalName": "public.sales_insights", "sourceName": "amazon-redshift", "fields": [{"name": "id", "type": "INTEGER"}, {"name": "saletime", "type": "TIMESTAMP"}, {"name": "location", "type": "STRING"}, {"name": "price", "type": "INTEGER"}, {"name": "weather", "type": "STRING"}, {"name": "temperature", "type": "INTEGER"}], "description": "Sales Insights Mart in Redshift"}'

続いて、Jobを作成していきます。これによって、データセット間のLineageが表現できるようになります。Jobでは、タイプ(バッチやストリームなど)、インプット、アウトプット、概要などを入力できます。

curl -X PUT http://localhost:5000/api/v1/namespaces/sales-insight-project/jobs/export-onpremise-to-s3 -H 'Content-Type: application/json' -d '{"type": "BATCH", "inputs": [{"namespace": "sales-insight-project", "name": "sales-on-premise"}], "outputs": [{"namespace": "sales-insight-project", "name": "sales-amazon-s3"}], "description": "Export On-premise POS data to Amazon S3"}'
curl -X PUT http://localhost:5000/api/v1/namespaces/sales-insight-project/jobs/get-weather-api-and-load -H 'Content-Type: application/json' -d '{"type": "BATCH", "inputs": [{"namespace": "sales-insight-project", "name": "weather-public-api"}], "outputs": [{"namespace": "sales-insight-project", "name": "weather-amazon-redshift"}], "description": "Get weather data from public API and load to Redshift"}'
curl -X PUT http://localhost:5000/api/v1/namespaces/sales-insight-project/jobs/load-s3-to-redshift -H 'Content-Type: application/json' -d '{"type": "BATCH", "inputs": [{"namespace": "sales-insight-project", "name": "sales-amazon-s3"}], "outputs": [{"namespace": "sales-insight-project", "name": "sales-amazon-redshift"}], "description": "Load CSV data on S3 to Redshift"}'
curl -X PUT http://localhost:5000/api/v1/namespaces/sales-insight-project/jobs/merge-sales-and-weather -H 'Content-Type: application/json' -d '{"type": "BATCH", "inputs": [{"namespace": "sales-insight-project", "name": "sales-amazon-redshift"}, {"namespace": "sales-insight-project", "name": "weather-amazon-redshift"}], "outputs": [{"namespace": "sales-insight-project", "name": "sales-insight"}], "description": "Join sales and weather table"}'

この状態でWeb UIにアクセスすると、最初の画面にはsales-amazon-redshiftのデータセットのみ表示されているかと思います。(多分アルファベット順で一番上に来るデータセットを初期状態で表示する仕様?)この画面でsales-amazon-redshift以外のデータセットをクリックすると、先ほど作成した全てのデータセットとジョブが紐づいて表示されます。

Jobはあくまで定義ですので、実行履歴を残すにはRunを作成し、実行・終了させる必要があります。作成するとジョブに青いマークがつきます。

curl -X POST http://localhost:5000/api/v1/namespaces/sales-insight-project/jobs/export-onpremise-to-s3/runs -H 'Content-Type: application/json' -d '{}'
# 50bc8938-f7db-4ab7-a86a-27c44b7d3699
curl -X POST http://localhost:5000/api/v1/namespaces/sales-insight-project/jobs/get-weather-api-and-load/runs -H 'Content-Type: application/json' -d '{}'
# 2ce5cfd4-4de0-45ad-969a-eee0fffbeb19
curl -X POST http://localhost:5000/api/v1/namespaces/sales-insight-project/jobs/load-s3-to-redshift/runs -H 'Content-Type: application/json' -d '{}'
# 159f2620-9924-48dc-9b45-fffd07c0d8c6
curl -X POST http://localhost:5000/api/v1/namespaces/sales-insight-project/jobs/merge-sales-and-weather/runs -H 'Content-Type: application/json' -d '{}'
# 195692c5-3bca-4876-81cb-d1444bab4607

レスポンスのIDを指定して実行すると、黄色いマークに変わります。

curl -X POST http://localhost:5000/api/v1/jobs/runs/50bc8938-f7db-4ab7-a86a-27c44b7d3699/start
curl -X POST http://localhost:5000/api/v1/jobs/runs/2ce5cfd4-4de0-45ad-969a-eee0fffbeb19/start
curl -X POST http://localhost:5000/api/v1/jobs/runs/159f2620-9924-48dc-9b45-fffd07c0d8c6/start
curl -X POST http://localhost:5000/api/v1/jobs/runs/195692c5-3bca-4876-81cb-d1444bab4607/start

最後に完了させると、緑のマークに変わります。

curl -X POST http://localhost:5000/api/v1/jobs/runs/50bc8938-f7db-4ab7-a86a-27c44b7d3699/complete
curl -X POST http://localhost:5000/api/v1/jobs/runs/2ce5cfd4-4de0-45ad-969a-eee0fffbeb19/complete
curl -X POST http://localhost:5000/api/v1/jobs/runs/159f2620-9924-48dc-9b45-fffd07c0d8c6/complete
curl -X POST http://localhost:5000/api/v1/jobs/runs/195692c5-3bca-4876-81cb-d1444bab4607/complete

このようにデータセットとジョブの関係性を可視化しながら、ジョブの実行状況・履歴が見れるのがMarquezの特徴です。

まとめ

  • 良さ
    • シンプルさと統合のしやすさに焦点が当てられている思想がとても良い
    • リネージの画面がオシャレ
  • 課題
    • 所有者がNamespaceにしか設定できない。本来はDataset単位で設定できるようにするべき。
    • URLもとい、ユニークの設計が中途半端。
    • UIの初期状態がいまいち。大規模システムになると対応しきれなさそう。

実際にデモを動かしてきましたが、課題は散見されるものの、思想がとても好みのOSSデータカタログでした。今後のアップデートが楽しみです。