色々なDBに使えるOSSデータカタログAmundsenからAmazon Athenaのメタデータを取得してみた

どうも!DA部の春田です。

Lyft社製のOSSデータカタログAmundsenの魅力の一つは、そのコネクタの豊富さです。すでにかなりの数のテーブル・コネクタ、ダッシュボード・コネクタが備わっているので、既存のETL基盤にサッと導入することができます。

amundsen-io/amundsen: Supported Integrations

今回はAmundsenをEC2インスタンス上でセットアップし、Amazon Athena内のテーブルメタデータを取得してみました。

Amundsenのセットアップ

下記事でローカルのMacにAmundsenをセットアップしていますが、今回も改めてセットアップ方法を記載しておきます。

今回使用するEC2インスタンスは、Ubuntu 20.04 LTSのt3.mediumでEBSを15GB使用します。パブリックIPを有効化し、セキュリティグループはSSH用の22番とWebフロント用の5000番を開けておきます。

インスタンスが立ち上がったらSSHでログインし、DockerおよびDocker Composeをインストールします。Dockerの公式サイトに沿って行いました。

sudo apt update
sudo apt upgrade

# Update the apt package index and install packages to allow apt to use a repository over HTTPS
sudo apt-get install \
    apt-transport-https \
    ca-certificates \
    curl \
    gnupg-agent \
    software-properties-common

# Add Docker’s official GPG key
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
sudo apt-key fingerprint 0EBFCD88

# Set up the stable repository
sudo add-apt-repository \
   "deb [arch=amd64] https://download.docker.com/linux/ubuntu \
   $(lsb_release -cs) \
   stable"

# Install the latest version of Docker Engine and containerd
sudo apt-get update
sudo apt-get install docker-ce docker-ce-cli containerd.io

# Verify that Docker Engine is installed correctly
sudo docker run hello-world

# Download the current stable release of Docker Compose
sudo curl -L "https://github.com/docker/compose/releases/download/1.28.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

# Apply executable permissions to the binary
sudo chmod +x /usr/local/bin/docker-compose

# Test the installation
docker-compose --version

コンテナを立ち上げる前に、Elasticsearchのメモリ上限を上げておきます。

# Write vm.max_map_count=262144
sudo vi /etc/sysctl.conf
# Reload settings
sudo sysctl -p

あとはAmundsenのGitリポジトリをクローンし、コンテナを立ち上げるだけです。

git clone --recursive https://github.com/amundsen-io/amundsen.git
cd amundsen/
sudo docker-compose -f docker-amundsen.yml up -d
sudo docker-compose -f docker-amundsen.yml logs -f

立ち上がったら、ブラウザからhttp://<amundsen public ip>:5000/でWebフロントにアクセスできます。データがないので、以下のような表示になっているかと思います。

以上でセットアップは完了です。

Amundsenのサンプルコードを読み解く

続いて、AmundsenからAmazon Athenaのテーブル・メタデータを収集していきます。まずはざっとコンセプトを解説していきます。

Amundsenのデータ作成やロードは全て、amundsen-io/amundsendatabuilderライブラリが担っています。普段のデータ分析基盤と同じように、メタデータに対してもETL処理が考えられており、それぞれに対応したクラスが用意されています。

  • Extractor
    • データソースから、データを抽出する
  • Transformer
    • Extractorが抽出したデータや、他のTransformerが変換したデータを変換する
  • Loader
    • 抽出・変換したデータをAmundsen内のデータベースなどに流す
  • Publisher
    • ジョブの不可分性(Atomicity)や、バルクロードを制御する

Athenaのデータロードにはサンプルスクリプトが用意されているので、こちらをベースに進めていきます。

amundsendatabuilder/athena_sample_dag.py at master · amundsen-io/amundsendatabuilder

上のサンプルはワークフロー・オーケストレーションツールのApache Airflowをベースに制御されています。Airflow自体はAmundsenには必要ないので、後で書き換えます。まず、大枠として実行されるのは以下のコードです。

with DAG('amundsen_databuilder', default_args=default_args, **dag_args) as dag:
    athena_table_extract_job = PythonOperator(
        task_id='athena_table_extract_job',
        python_callable=create_table_extract_job
    )

    athena_es_publisher_job = PythonOperator(
        task_id='athena_es_publisher_job',
        python_callable=create_es_publisher_sample_job
    )

    # elastic search update run after table metadata has been updated
    athena_table_extract_job >> athena_es_publisher_job

PythonOperatorクラスは、Airflowのオペレーターそのものです。こちらに、ジョブのcreate_table_extract_jobとパブリッシャーのcreate_es_publisher_sample_jobをコールバックに指定して実行させています。

def create_table_extract_job():
    where_clause_suffix = f"where table_schema in {SUPPORTED_SCHEMA_SQL_IN_CLAUSE}"

    tmp_folder = '/var/tmp/amundsen/table_metadata'
    node_files_folder = f'{tmp_folder}/nodes/'
    relationship_files_folder = f'{tmp_folder}/relationships/'

    job_config = ConfigFactory.from_dict({
        f'extractor.athena_metadata.{AthenaMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY}': where_clause_suffix,
        f'extractor.athena_metadata.extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connection_string(),
        f'extractor.athena_metadata.{AthenaMetadataExtractor.CATALOG_KEY}': "'AwsDataCatalog'",
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
        f'publisher.neo4j.{neo4j_csv_publisher.JOB_PUBLISH_TAG}': 'unique_tag',  # should use unique tag here like {ds}
    })
    job = DefaultJob(conf=job_config,
                     task=DefaultTask(extractor=AthenaMetadataExtractor(), loader=FsNeo4jCSVLoader(),
                                      transformer=NoopTransformer()),
                     publisher=Neo4jCsvPublisher())
    job.launch()

create_table_extract_jobでは、各種パラメータを指定した上でDefaultJobを作成し、実行させています。このDefaultJob内で、extractortransformerloaderを指定しており、意味的には以下のような感じでしょう。

  1. Athenaから取得したデータがCSVでローカルに保存
  2. データの変換は行わない
  3. ローカルのCSVをNeo4jにロードする
def create_es_publisher_sample_job():
    # loader saves data to this location and publisher reads it from here
    extracted_search_data_path = '/var/tmp/amundsen/search_data.json'

    task = DefaultTask(loader=FSElasticsearchJSONLoader(),
                       extractor=Neo4jSearchDataExtractor(),
                       transformer=NoopTransformer())

    # elastic search client instance
    elasticsearch_client = es
    # unique name of new index in Elasticsearch
    elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())
    # related to mapping type from /databuilder/publisher/elasticsearch_publisher.py#L38
    elasticsearch_new_index_key_type = 'table'
    # alias for Elasticsearch used in amundsensearchlibrary/search_service/config.py as an index
    elasticsearch_index_alias = 'table_search_index'

    job_config = ConfigFactory.from_dict({
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.GRAPH_URL_CONFIG_KEY}': neo4j_endpoint,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.MODEL_CLASS_CONFIG_KEY}':
            'databuilder.models.table_elasticsearch_document.TableESDocument',
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_USER}': neo4j_user,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_PW}': neo4j_password,
        f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
        f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY}': 'w',
        f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
        f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_MODE_CONFIG_KEY}': 'r',
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY}':
            elasticsearch_client,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY}':
            elasticsearch_new_index_key,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY}':
            elasticsearch_new_index_key_type,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY}':
            elasticsearch_index_alias
    })

    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=ElasticsearchPublisher())
    job.launch()

create_es_publisher_sample_jobは、データのロードが終わった後にElasticsearchを更新するためのジョブが定義されています。ここでも同様に、DefaultJob内で以下のようなETL処理が記述されています。

  1. Neo4jから検索用のデータをJSON形式でローカルに出力
  2. データの変換は行わない
  3. ローカルのJSONをElasticsearchにロードする

ジョブの定義は、かなりシンプルでわかりやすいですね。あとは、必要なパラメータは何かを考えていきます。

# TODO: user provides a list of schema for indexing
SUPPORTED_SCHEMAS = ['sampledb']
# String format - ('schema1', schema2', .... 'schemaN')
SUPPORTED_SCHEMA_SQL_IN_CLAUSE = "('{schemas}')".format(schemas="', '".join(SUPPORTED_SCHEMAS))

OPTIONAL_TABLE_NAMES = ''
AWS_ACCESS = 'YOUR_ACCESS_KEY'
AWS_SECRET = 'YOUR_SECRET_KEY'


def connection_string():
    access_key = AWS_ACCESS
    secret = AWS_SECRET
    host = 'athena.us-east-1.amazonaws.com'
    extras = 's3_staging_dir=s3://aws-athena-query-results-032106861074-us-east-1/'
    return "awsathena+rest://%s:%s@%s:443/?%s" % (access_key, secret, host, extras)

一番重要なのは、Athenaへ接続するための変数定義でしょう。参照するDBスキーマ、クエリを出力するS3バケット、認証情報などがベタ書きになっているので、環境に合わせて修正する必要があります。

これらの環境情報は最終的に、SQLAlchemyで使われるURLの形式で出力されます。ですので、EC2からAthenaへSQLAlchemyを使用して接続する方法を探せばいいわけですねー。

SQLAlchemyがAthenaへの接続で使用しているライブラリはpyathenaで、ドキュメントには認証に関して以下の記載があります。

The connection string has the following format:

awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com:443/{schema_name}?s3_staging_dir={s3_staging_dir}&...

If you do not specify aws_access_key_id and aws_secret_access_key using instance profile or boto3 configuration file:

awsathena+rest://:@athena.{region_name}.amazonaws.com:443/{schema_name}?s3_staging_dir={s3_staging_dir}&...

アクセスキーを指定しなければ、インスタンスプロファイルを見に行ってくれるようですね。IAM Roleで権限付与してしまえば大丈夫そうです。AmazonAthenaFullAccessAmazonS3FullAccessを付与したIAM RoleをEC2インスタンスにアタッチしましょう。

AmandsenからAthenaのメタデータを取得する

最終的に以下のようなスクリプトになりました!パラメータは引数で指定できるようにしています。

# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

import uuid
import argparse

from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory

from databuilder.extractor.athena_metadata_extractor import AthenaMetadataExtractor
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer

# NEO4J cluster endpoints
neo4j_endpoint = 'bolt://127.0.0.1:7687'
neo4j_user = 'neo4j'
neo4j_password = 'test'
es = Elasticsearch([
    {'host': '127.0.0.1'},
])


def create_table_extract_job(target_schema_sql, connection_string):
    where_clause_suffix = f"where table_schema in {target_schema_sql}"

    tmp_folder = '/var/tmp/amundsen/table_metadata'
    node_files_folder = f'{tmp_folder}/nodes/'
    relationship_files_folder = f'{tmp_folder}/relationships/'

    job_config = ConfigFactory.from_dict({
        f'extractor.athena_metadata.{AthenaMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY}': where_clause_suffix,
        f'extractor.athena_metadata.extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connection_string,
        f'extractor.athena_metadata.{AthenaMetadataExtractor.CATALOG_KEY}': "'AwsDataCatalog'",
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
        f'publisher.neo4j.{neo4j_csv_publisher.JOB_PUBLISH_TAG}': 'unique_tag',  # should use unique tag here like {ds}
    })
    job = DefaultJob(conf=job_config,
                     task=DefaultTask(extractor=AthenaMetadataExtractor(), loader=FsNeo4jCSVLoader(),
                                      transformer=NoopTransformer()),
                     publisher=Neo4jCsvPublisher())
    job.launch()


def create_es_publisher_sample_job():
    # loader saves data to this location and publisher reads it from here
    extracted_search_data_path = '/var/tmp/amundsen/search_data.json'

    task = DefaultTask(loader=FSElasticsearchJSONLoader(),
                       extractor=Neo4jSearchDataExtractor(),
                       transformer=NoopTransformer())

    # elastic search client instance
    elasticsearch_client = es
    # unique name of new index in Elasticsearch
    elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())
    # related to mapping type from /databuilder/publisher/elasticsearch_publisher.py#L38
    elasticsearch_new_index_key_type = 'table'
    # alias for Elasticsearch used in amundsensearchlibrary/search_service/config.py as an index
    elasticsearch_index_alias = 'table_search_index'

    job_config = ConfigFactory.from_dict({
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.GRAPH_URL_CONFIG_KEY}': neo4j_endpoint,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.MODEL_CLASS_CONFIG_KEY}':
            'databuilder.models.table_elasticsearch_document.TableESDocument',
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_USER}': neo4j_user,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_PW}': neo4j_password,
        f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
        f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY}': 'w',
        f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
        f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_MODE_CONFIG_KEY}': 'r',
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY}':
            elasticsearch_client,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY}':
            elasticsearch_new_index_key,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY}':
            elasticsearch_new_index_key_type,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY}':
            elasticsearch_index_alias
    })

    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=ElasticsearchPublisher())
    job.launch()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Fetch Athena metadata')
    parser.add_argument('--region', required=True)
    parser.add_argument('--s3output', required=True)
    parser.add_argument('--target_schema', nargs='+', required=True)
    args = parser.parse_args()

    connection_string = "awsathena+rest://@athena.%s.amazonaws.com:443/?s3_staging_dir=%s" % (args.region, args.s3output)
    target_schema_sql = "('{schemas}')".format(schemas="', '".join(args.target_schema))

    create_table_extract_job(target_schema_sql, connection_string)
    create_es_publisher_sample_job()

実際に実行させてみます。まずは下記のように、venvを切って必要なライブラリをインストールし、Pythonの実行環境を整えます。pyathenaのみrequirements.txtに入っていなかったため加えてインストールしておきます。

cd ~/amundsen/amundsendatabuilder
python3 -m venv venv
source venv/bin/activate  
pip3 install -r requirements.txt
pip3 install pyathena
python3 setup.py install

実行コマンドは以下です!

python3 example/dags/athena_sample_dag.py --region 'ap-northeast-1' --s3output 's3://my-s3-bucket/' --target_schema 'cm-haruta'

AmandsenのWebUIからathenaで検索すれば、ちゃんと反映されていることが確認できました!cm-haruta配下のテーブルは一つだけですが、複数あっても自動で取得してくれるので便利です。

所感

Amundsenは新興OSSデータカタログながらも、プロダクションでも簡単に導入でき、データの民主化を加速化できる強力なツールであることがわかりました。デフォルトだとユーザーがちゃんと設定されていないため、次回はユーザー周りを調節してみたいと思います。

How to setup user profiles - Amundsen