Amundsenに登録したテーブルに対してPythonプログラムからタグ付けする

どうも!DA部の春田です。

Lyft社製OSSデータカタログAmundsenに関して、今回はPythonプログラムからテーブルにタグを付与する方法をご紹介します。

環境構築

ローカル(macOS)の環境構築については下記事をご参照ください。

EC2インスタンス上での環境構築については下記事の通りです。

今回はEC2インスタンスに構築したAmundsenを使用します。データソースにAmazon Athenaを使用していますが、特に指定はないです。お好きなDBやサンプルデータでAmundsenにメタデータを作成しておいてください。

Pythonでタグを付与する時の流れ

そもそもAmundsenではUIからタグを付与することも可能です。

しかし、2021年1月28日現在のamundsenfrontendlibraryの最新バージョンv3.3.0では、UIから更新したタグはNeo4jには登録されますが、Elasticsearchの更新のトリガは行わないため検索機能に追加されない仕様になっています。つい先日この対応がmasterにマージされたので、近日中に修正されるとは思います。

fix: index tag info into elasticsearch immediately after ui change by tianruzhou-db · Pull Request #883 · amundsen-io/amundsenfrontendlibrary

ただフロントではなく、バックエンドのPythonなどから自動的にタグを付与したいケースはあると思うのでご紹介していきます。

タグ更新の起点は、table_tag_transformer.pyにあるTableTagTransformerクラスです。必要なパラメータ(ここではTAGS)をConfigFactoryで定義する設定一覧のdictに記述し、ジョブの方でtransformerも定義するという流れです。

amundsendatabuilder/table_tag_transformer.py at cb8142eecbb37e8d73d7aa7269b6df65e43d9536 · amundsen-io/amundsendatabuilder

このTAGSはカンマ区切りの形式で複数個記述できます。ソースがちょっとわかりにくいのですが、table_metadata.py_format_as_list関数で形式が共通化されています。

amundsendatabuilder/table_metadata.py at cb8142eecbb37e8d73d7aa7269b6df65e43d9536 · amundsen-io/amundsendatabuilder

ざっと要点を把握したところで、実際にPythonコードからタグを付与してみましょう!

Pythonからタグを付与してみた!

今回作成したPythonコードは以下の通りです。流れは以前こちらの記事でAthenaのデータを登録した時とほとんど変わりません。

異なる点は、transformer.table_tag.{TableTagTransformer.TAGS}job_configに入れている点と、DefaultJobtransformerとしてTableTagTransformer()を渡している点です。

  • create_table_extract_job()
    • extractor: Athenaから取得したデータがCSVでローカルに保存
    • transformer: タグを付与する
    • loader: ローカルのCSVをNeo4jにロードする
  • create_es_publisher_sample_job()
    • extractor: Neo4jから検索用のデータをJSON形式でローカルに出力
    • transformer: データの変換は行わない
    • loader: ローカルのJSONをElasticsearchにロードする
import uuid
import argparse

from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory

from databuilder.extractor.athena_metadata_extractor import AthenaMetadataExtractor
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.table_tag_transformer import TableTagTransformer
from databuilder.transformer.base_transformer import NoopTransformer

# NEO4J cluster endpoints
neo4j_endpoint = 'bolt://127.0.0.1:7687'
neo4j_user = 'neo4j'
neo4j_password = 'test'
es = Elasticsearch([
    {'host': '127.0.0.1'},
])


def create_table_extract_job(target_schema_sql, connection_string, tags):
    where_clause_suffix = f"where table_schema in {target_schema_sql}"

    tmp_folder = '/var/tmp/amundsen/table_metadata'
    node_files_folder = f'{tmp_folder}/nodes/'
    relationship_files_folder = f'{tmp_folder}/relationships/'

    job_config = ConfigFactory.from_dict({
        f'extractor.athena_metadata.{AthenaMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY}': where_clause_suffix,
        f'extractor.athena_metadata.extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connection_string,
        f'extractor.athena_metadata.{AthenaMetadataExtractor.CATALOG_KEY}': "'AwsDataCatalog'",
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
        f'publisher.neo4j.{neo4j_csv_publisher.JOB_PUBLISH_TAG}': 'unique_tag',  # should use unique tag here like {ds}
        f'transformer.table_tag.{TableTagTransformer.TAGS}': tags,
    })
    job = DefaultJob(
        conf=job_config,
        task=DefaultTask(
            extractor=AthenaMetadataExtractor(),
            loader=FsNeo4jCSVLoader(),
            transformer=TableTagTransformer()
        ),
        publisher=Neo4jCsvPublisher()
    )
    job.launch()


def create_es_publisher_sample_job():
    # loader saves data to this location and publisher reads it from here
    extracted_search_data_path = '/var/tmp/amundsen/search_data.json'

    # elastic search client instance
    elasticsearch_client = es
    # unique name of new index in Elasticsearch
    elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())
    # related to mapping type from /databuilder/publisher/elasticsearch_publisher.py#L38
    elasticsearch_new_index_key_type = 'table'
    # alias for Elasticsearch used in amundsensearchlibrary/search_service/config.py as an index
    elasticsearch_index_alias = 'table_search_index'

    job_config = ConfigFactory.from_dict({
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.GRAPH_URL_CONFIG_KEY}': neo4j_endpoint,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.MODEL_CLASS_CONFIG_KEY}':
            'databuilder.models.table_elasticsearch_document.TableESDocument',
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_USER}': neo4j_user,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_PW}': neo4j_password,
        f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
        f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY}': 'w',
        f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
        f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_MODE_CONFIG_KEY}': 'r',
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY}':
            elasticsearch_client,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY}':
            elasticsearch_new_index_key,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY}':
            elasticsearch_new_index_key_type,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY}':
            elasticsearch_index_alias
    })

    job = DefaultJob(
        conf=job_config,
        task=DefaultTask(
            loader=FSElasticsearchJSONLoader(),
            extractor=Neo4jSearchDataExtractor(),
            transformer=NoopTransformer()
        ),
        publisher=ElasticsearchPublisher()
    )
    job.launch()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Fetch Athena metadata')
    parser.add_argument('--region', required=True)
    parser.add_argument('--s3output', required=True)
    parser.add_argument('--target_schema', nargs='+', required=True)
    parser.add_argument('--tags', help='split by ","')
    args = parser.parse_args()

    connection_string = "awsathena+rest://@athena.%s.amazonaws.com:443/?s3_staging_dir=%s" % (args.region, args.s3output)
    target_schema_sql = "('{schemas}')".format(schemas="', '".join(args.target_schema))

    create_table_extract_job(target_schema_sql, connection_string, args.tags)
    create_es_publisher_sample_job()

なお、Pythonのvenvを切っていない方やamundsendatabuilderをインストールしていない場合は、事前に以下のコマンドを実行しておいてください。今回はAthenaのテーブルを対象とするので、別途pyathenaをインストールしています。

cd ~/amundsen/amundsendatabuilder
python3 -m venv venv
source venv/bin/activate  
pip3 install -r requirements.txt
pip3 install pyathena
python3 setup.py install

実行にはカンマ区切りのタグを引数に渡せばOKです。

python3 example/dags/athena_sample_dag.py --region 'ap-northeast-1' --s3output 's3://cm-haruta/athena/' --target_schema 'cm-haruta' --tags haruta,cm

実行後、タグが付与されたことがUIから確認できました!検索も可能です。

所感

AmundsenはメタデータETL用のPythonライブラリが、すでに結構揃っているので開発しやすいですねー。導入も簡単なのでぜひ触ってみてください!