Pythonで利用できるSQLのリネージュ分析ツール「SQLLineage」を試してみた

2021.09.15

こんにちは!DA(データアナリティクス)事業本部 サービスソリューション部の大高です。

データウェアハウスにおいて、データを「あるテーブル」から「あるテーブル」へSQLで投入するときに「データの流れを知りたい」というケースがあります。これは一般的に「データカタログ」と呼ばれる製品の中の機能として存在する「データリネージュ」と呼ばれるものになります。

この「データリネージュ」に関して、Pythonで利用できるSQLリネージュ分析ツールとして「SQLLineage」というツールを見つけたので、試してみたいと思います。

前提

今回利用する環境は以下のような環境です。

事前準備

まずは適当なフォルダを作成して、Pythonの仮想環境を作成します。 私はpoetryを利用しているので、以下のように作成していますが、適宜必要な設定で仮想環境が作成できればOKです。

$ mkdir hello-sqllineage
$ cd hello-sqllineage
$ poetry init
$ pyenv local 3.9.7
$ poetry config --local virtualenvs.in-project true
$ poetry install

SQLLineageのセットアップ

SQLLineage自体は、以下のようにpipでsqllineageをインストールするだけです。

$ pip install sqllineage

私はpoetryを利用しているので、以下のようにインストールしました。

$ poetry add sqllineage

SQLを解析してみる

インストールできたので、実際に解析を試してみましょう。

単純なINSERT-SELECT

まずは簡単な以下のクエリをファイルとして作成します。

query-1.sql

INSERT INTO
  ootaka_sandbox_db.public.region
SELECT
  r_regionkey,
  r_name,
  r_comment
FROM
  snowflake_sample_data.tpch_sf1.region
;

ファイルを保存したら、以下のコマンドで解析してみます。

$ sqllineage -f query-1.sql
Statements(#): 1
Source Tables:
    snowflake_sample_data.tpch_sf1.region
Target Tables:
    ootaka_sandbox_db.public.region

「ソーステーブル」はsnowflake_sample_data.tpch_sf1.region、「ターゲットテーブル」はootaka_sandbox_db.public.regionであると解析されましたね!

複数テーブルをJOINしてINSERT

今度はもう少し複雑にして、複数テーブルをJOINしてINSERTしてみましょう。

query-2.sql

INSERT INTO
  ootaka_sandbox_db.public.region_and_nation
SELECT
  r_regionkey,
  r_name,
  r_comment,
  n_nationkey,
  n_name,
  n_comment
FROM
  snowflake_sample_data.tpch_sf1.region
INNER JOIN snowflake_sample_data.tpch_sf1.nation ON
  region.r_regionkey = nation.n_regionkey
;
$ sqllineage -f query-2.sql
Statements(#): 1
Source Tables:
    snowflake_sample_data.tpch_sf1.nation
    snowflake_sample_data.tpch_sf1.region
Target Tables:
    ootaka_sandbox_db.public.region_and_nation

こちらも全く問題ないですね。

With句を利用した複雑なSELECT文

次に、Snowflakeの下記のドキュメントからクエリを拝借して、試してみます。

query-3.sql

use schema snowflake_sample_data.tpcds_sf10tcl;

-- QID=TPC-DS_query57

with v1 as(
  select i_category, i_brand, cc_name, d_year, d_moy,
        sum(cs_sales_price) sum_sales,
        avg(sum(cs_sales_price)) over
          (partition by i_category, i_brand,
                     cc_name, d_year)
          avg_monthly_sales,
        rank() over
          (partition by i_category, i_brand,
                     cc_name
           order by d_year, d_moy) rn
  from item, catalog_sales, date_dim, call_center
  where cs_item_sk = i_item_sk and
       cs_sold_date_sk = d_date_sk and
       cc_call_center_sk= cs_call_center_sk and
       (
         d_year = 1999 or
         ( d_year = 1999-1 and d_moy =12) or
         ( d_year = 1999+1 and d_moy =1)
       )
  group by i_category, i_brand,
          cc_name , d_year, d_moy),
v2 as(
  select v1.i_category ,v1.d_year, v1.d_moy ,v1.avg_monthly_sales
        ,v1.sum_sales, v1_lag.sum_sales psum, v1_lead.sum_sales nsum
  from v1, v1 v1_lag, v1 v1_lead
  where v1.i_category = v1_lag.i_category and
       v1.i_category = v1_lead.i_category and
       v1.i_brand = v1_lag.i_brand and
       v1.i_brand = v1_lead.i_brand and
       v1.cc_name = v1_lag.cc_name and
       v1.cc_name = v1_lead.cc_name and
       v1.rn = v1_lag.rn + 1 and
       v1.rn = v1_lead.rn - 1)
select  *
from v2
where  d_year = 1999 and
        avg_monthly_sales > 0 and
        case when avg_monthly_sales > 0 then abs(sum_sales - avg_monthly_sales) / avg_monthly_sales else null end > 0.1
order by sum_sales - avg_monthly_sales, 3
limit 100;
$ sqllineage -f query-3.sql
Statements(#): 2
Source Tables:
    <default>.call_center
    <default>.catalog_sales
    <default>.date_dim
    <default>.item
Target Tables:

このクエリファイルは2つのステートメントを持っており、そのうちの2つ目のSELECTステートメントが解析されたことが分かります。

with句を利用して、v1v2を作成し、最終的にはv2からSELECTしていますが、ソーステーブルは、結果として表示されているテーブルを利用しているので想定どおりです。

また、今回はINSERTをしていないので、ターゲットテーブルには何も出力されていませんね。

一時的にデータを別テーブルに入れるクエリ

最後に、一時的にデータを別テーブルに入れるクエリを試してみましょう。これも複数ステートメントのクエリファイルにします。

query-2.sqlとほぼ同じですが、違いとしてsnowflake_sample_data.tpch_sf1.regionテーブルのデータを、一時的にootaka_sandbox_db.public.regionにINSERTし、その後にootaka_sandbox_db.public.regionsnowflake_sample_data.tpch_sf1.nationをJOINしてootaka_sandbox_db.public.region_and_nationテーブルにINSERTしています。

query-4.sql

INSERT INTO
  ootaka_sandbox_db.public.region
SELECT
  r_regionkey,
  r_name,
  r_comment
FROM
  snowflake_sample_data.tpch_sf1.region
;

INSERT INTO
  ootaka_sandbox_db.public.region_and_nation
SELECT
  r_regionkey,
  r_name,
  r_comment,
  n_nationkey,
  n_name,
  n_comment
FROM
  ootaka_sandbox_db.public.region
INNER JOIN snowflake_sample_data.tpch_sf1.nation ON
  region.r_regionkey = nation.n_regionkey
;
$ sqllineage -f query-4.sql
Statements(#): 2
Source Tables:
    snowflake_sample_data.tpch_sf1.nation
    snowflake_sample_data.tpch_sf1.region
Target Tables:
    ootaka_sandbox_db.public.region_and_nation
Intermediate Tables:
    ootaka_sandbox_db.public.region

解析結果に、これまでとは違いIntermediate Tablesが表示されました。一度ootaka_sandbox_db.public.regionテーブルを経由していることが分かりますね。

更に、-vverboseオプションを付けるとこんな感じになります。

$ sqllineage -v -f query-4.sql
Statement #1: INSERT INTO  ootaka_sandbox_db.public.regionSELECT...
    table read: [Table: snowflake_sample_data.tpch_sf1.region]
    table write: [Table: ootaka_sandbox_db.public.region]
    table rename: []
    table drop: []
    table intermediate: []
Statement #2: INSERT INTO  ootaka_sandbox_db.public.region_and_n...
    table read: [Table: ootaka_sandbox_db.public.region, Table: snowflake_sample_data.tpch_sf1.nation]
    table write: [Table: ootaka_sandbox_db.public.region_and_nation]
    table rename: []
    table drop: []
    table intermediate: []
==========
Summary:
Statements(#): 2
Source Tables:
    snowflake_sample_data.tpch_sf1.nation
    snowflake_sample_data.tpch_sf1.region
Target Tables:
    ootaka_sandbox_db.public.region_and_nation
Intermediate Tables:
    ootaka_sandbox_db.public.region

-vを付けないと全体的な解析結果のみでしたが、-vを付けることにより「クエリごとのreadテーブルとwriteテーブルが、どのテーブルに対する処理なのか」まで分かるようになります。

可視化してみる

更にSQLLineageの便利な機能として、可視化をすることもできます。

使い方は簡単で-gオプションを付けるだけです。

$ sqllineage -g -f query-4.sql
 * SQLLineage Running on http://localhost:5000/?f=query-4.sql

上記のように表示されるので、ブラウザでアクセスします。すると以下のようにリネージュが表示されました。

このように可視化されると、データフローがとても分かりやすいですね。

この他にも、GithubのプロジェクトにおけるREADME.mdに記載のとおり以下のデモサイトが用意されているので、こちらのデモサイトでも、どのような動作になるのかが確認できます。

余談ですがREADMEの、この一文が凄く素敵だなと思いました!

Talk is cheap, show me a demo.

パッケージとしての利用

今回試したのはコマンドラインだけでしたが、Pythonのパッケージとしても利用が可能です。具体的にはドキュメントの以下に記載されているように利用できるようです。

まとめ

以上、Pythonで利用できるSQLのリネージュ分析ツール「SQLLineage」を試してみました。

データカタログにおいて「リネージュ」は大事な要素だと思いますが、SQLのリネージュに関しては、このようなツールを使うと分析しやすくなりそうですね。

どなたかのお役に立てば幸いです。それでは!