Pythonで利用できるSQLのリネージュ分析ツール「SQLLineage」を試してみた
こんにちは!DA(データアナリティクス)事業本部 サービスソリューション部の大高です。
データウェアハウスにおいて、データを「あるテーブル」から「あるテーブル」へSQLで投入するときに「データの流れを知りたい」というケースがあります。これは一般的に「データカタログ」と呼ばれる製品の中の機能として存在する「データリネージュ」と呼ばれるものになります。
この「データリネージュ」に関して、Pythonで利用できるSQLリネージュ分析ツールとして「SQLLineage」というツールを見つけたので、試してみたいと思います。
前提
今回利用する環境は以下のような環境です。
事前準備
まずは適当なフォルダを作成して、Pythonの仮想環境を作成します。 私はpoetryを利用しているので、以下のように作成していますが、適宜必要な設定で仮想環境が作成できればOKです。
$ mkdir hello-sqllineage $ cd hello-sqllineage $ poetry init $ pyenv local 3.9.7 $ poetry config --local virtualenvs.in-project true $ poetry install
SQLLineageのセットアップ
SQLLineage自体は、以下のようにpipでsqllineage
をインストールするだけです。
$ pip install sqllineage
私はpoetryを利用しているので、以下のようにインストールしました。
$ poetry add sqllineage
SQLを解析してみる
インストールできたので、実際に解析を試してみましょう。
単純なINSERT-SELECT
まずは簡単な以下のクエリをファイルとして作成します。
INSERT INTO ootaka_sandbox_db.public.region SELECT r_regionkey, r_name, r_comment FROM snowflake_sample_data.tpch_sf1.region ;
ファイルを保存したら、以下のコマンドで解析してみます。
$ sqllineage -f query-1.sql Statements(#): 1 Source Tables: snowflake_sample_data.tpch_sf1.region Target Tables: ootaka_sandbox_db.public.region
「ソーステーブル」はsnowflake_sample_data.tpch_sf1.region
、「ターゲットテーブル」はootaka_sandbox_db.public.region
であると解析されましたね!
複数テーブルをJOINしてINSERT
今度はもう少し複雑にして、複数テーブルをJOINしてINSERTしてみましょう。
INSERT INTO ootaka_sandbox_db.public.region_and_nation SELECT r_regionkey, r_name, r_comment, n_nationkey, n_name, n_comment FROM snowflake_sample_data.tpch_sf1.region INNER JOIN snowflake_sample_data.tpch_sf1.nation ON region.r_regionkey = nation.n_regionkey ;
$ sqllineage -f query-2.sql Statements(#): 1 Source Tables: snowflake_sample_data.tpch_sf1.nation snowflake_sample_data.tpch_sf1.region Target Tables: ootaka_sandbox_db.public.region_and_nation
こちらも全く問題ないですね。
With句を利用した複雑なSELECT文
次に、Snowflakeの下記のドキュメントからクエリを拝借して、試してみます。
use schema snowflake_sample_data.tpcds_sf10tcl; -- QID=TPC-DS_query57 with v1 as( select i_category, i_brand, cc_name, d_year, d_moy, sum(cs_sales_price) sum_sales, avg(sum(cs_sales_price)) over (partition by i_category, i_brand, cc_name, d_year) avg_monthly_sales, rank() over (partition by i_category, i_brand, cc_name order by d_year, d_moy) rn from item, catalog_sales, date_dim, call_center where cs_item_sk = i_item_sk and cs_sold_date_sk = d_date_sk and cc_call_center_sk= cs_call_center_sk and ( d_year = 1999 or ( d_year = 1999-1 and d_moy =12) or ( d_year = 1999+1 and d_moy =1) ) group by i_category, i_brand, cc_name , d_year, d_moy), v2 as( select v1.i_category ,v1.d_year, v1.d_moy ,v1.avg_monthly_sales ,v1.sum_sales, v1_lag.sum_sales psum, v1_lead.sum_sales nsum from v1, v1 v1_lag, v1 v1_lead where v1.i_category = v1_lag.i_category and v1.i_category = v1_lead.i_category and v1.i_brand = v1_lag.i_brand and v1.i_brand = v1_lead.i_brand and v1.cc_name = v1_lag.cc_name and v1.cc_name = v1_lead.cc_name and v1.rn = v1_lag.rn + 1 and v1.rn = v1_lead.rn - 1) select * from v2 where d_year = 1999 and avg_monthly_sales > 0 and case when avg_monthly_sales > 0 then abs(sum_sales - avg_monthly_sales) / avg_monthly_sales else null end > 0.1 order by sum_sales - avg_monthly_sales, 3 limit 100;
$ sqllineage -f query-3.sql Statements(#): 2 Source Tables: <default>.call_center <default>.catalog_sales <default>.date_dim <default>.item Target Tables:
このクエリファイルは2つのステートメントを持っており、そのうちの2つ目のSELECTステートメントが解析されたことが分かります。
with句を利用して、v1
とv2
を作成し、最終的にはv2
からSELECTしていますが、ソーステーブルは、結果として表示されているテーブルを利用しているので想定どおりです。
また、今回はINSERTをしていないので、ターゲットテーブルには何も出力されていませんね。
一時的にデータを別テーブルに入れるクエリ
最後に、一時的にデータを別テーブルに入れるクエリを試してみましょう。これも複数ステートメントのクエリファイルにします。
query-2.sql
とほぼ同じですが、違いとしてsnowflake_sample_data.tpch_sf1.region
テーブルのデータを、一時的にootaka_sandbox_db.public.region
にINSERTし、その後にootaka_sandbox_db.public.region
とsnowflake_sample_data.tpch_sf1.nation
をJOINしてootaka_sandbox_db.public.region_and_nation
テーブルにINSERTしています。
INSERT INTO ootaka_sandbox_db.public.region SELECT r_regionkey, r_name, r_comment FROM snowflake_sample_data.tpch_sf1.region ; INSERT INTO ootaka_sandbox_db.public.region_and_nation SELECT r_regionkey, r_name, r_comment, n_nationkey, n_name, n_comment FROM ootaka_sandbox_db.public.region INNER JOIN snowflake_sample_data.tpch_sf1.nation ON region.r_regionkey = nation.n_regionkey ;
$ sqllineage -f query-4.sql Statements(#): 2 Source Tables: snowflake_sample_data.tpch_sf1.nation snowflake_sample_data.tpch_sf1.region Target Tables: ootaka_sandbox_db.public.region_and_nation Intermediate Tables: ootaka_sandbox_db.public.region
解析結果に、これまでとは違いIntermediate Tables
が表示されました。一度ootaka_sandbox_db.public.region
テーブルを経由していることが分かりますね。
更に、-v
のverbose
オプションを付けるとこんな感じになります。
$ sqllineage -v -f query-4.sql Statement #1: INSERT INTO ootaka_sandbox_db.public.regionSELECT... table read: [Table: snowflake_sample_data.tpch_sf1.region] table write: [Table: ootaka_sandbox_db.public.region] table rename: [] table drop: [] table intermediate: [] Statement #2: INSERT INTO ootaka_sandbox_db.public.region_and_n... table read: [Table: ootaka_sandbox_db.public.region, Table: snowflake_sample_data.tpch_sf1.nation] table write: [Table: ootaka_sandbox_db.public.region_and_nation] table rename: [] table drop: [] table intermediate: [] ========== Summary: Statements(#): 2 Source Tables: snowflake_sample_data.tpch_sf1.nation snowflake_sample_data.tpch_sf1.region Target Tables: ootaka_sandbox_db.public.region_and_nation Intermediate Tables: ootaka_sandbox_db.public.region
-v
を付けないと全体的な解析結果のみでしたが、-v
を付けることにより「クエリごとのread
テーブルとwrite
テーブルが、どのテーブルに対する処理なのか」まで分かるようになります。
可視化してみる
更にSQLLineageの便利な機能として、可視化をすることもできます。
使い方は簡単で-g
オプションを付けるだけです。
$ sqllineage -g -f query-4.sql * SQLLineage Running on http://localhost:5000/?f=query-4.sql
上記のように表示されるので、ブラウザでアクセスします。すると以下のようにリネージュが表示されました。
このように可視化されると、データフローがとても分かりやすいですね。
この他にも、GithubのプロジェクトにおけるREADME.mdに記載のとおり以下のデモサイトが用意されているので、こちらのデモサイトでも、どのような動作になるのかが確認できます。
余談ですがREADMEの、この一文が凄く素敵だなと思いました!
Talk is cheap, show me a demo.
パッケージとしての利用
今回試したのはコマンドラインだけでしたが、Pythonのパッケージとしても利用が可能です。具体的にはドキュメントの以下に記載されているように利用できるようです。
まとめ
以上、Pythonで利用できるSQLのリネージュ分析ツール「SQLLineage」を試してみました。
データカタログにおいて「リネージュ」は大事な要素だと思いますが、SQLのリネージュに関しては、このようなツールを使うと分析しやすくなりそうですね。
どなたかのお役に立てば幸いです。それでは!