Amundsenにロードしたデータを削除するスクリプトを検証してみた

2021.09.22

どーもsutoです。

今回はAmundsenにロードしたデータを削除するスクリプトを検証して、実際にデータ削除を行う手順をご紹介します。

Amundsenとは

概要やアーキテクチャについてはこちらの記事が参考になります。

【参考】データロードの記事

セットアップからデータテーブルのロードまでの記事はこちら

公式のREADMEを見てみる

Amundsenに取り込んだデータを削除するための方法はGitの以下のREADME.md内「Removing stale data in Neo4j」に記述があります。

実行スクリプトとなるのはtaskフォルダにあるneo4j_staleness_removal_task.pyのようです。

削除方法は2種類あります。

  • 「published_tag」を使用

    • 例では、一言で表すと2020-03-31以前に取り込んだデータを削除する処理となります。
task = Neo4jStalenessRemovalTask()
job_config_dict = {
    'job.identifier': 'remove_stale_data_job',
    'task.remove_stale_data.neo4j_endpoint': neo4j_endpoint,
    'task.remove_stale_data.neo4j_user': neo4j_user,
    'task.remove_stale_data.neo4j_password': neo4j_password,
    'task.remove_stale_data.staleness_max_pct': 10,
    'task.remove_stale_data.target_nodes': ['Table', 'Column'],
    'task.remove_stale_data.job_publish_tag': '2020-03-31'
}
job_config = ConfigFactory.from_dict(job_config_dict)
job = DefaultJob(conf=job_config, task=task)
job.launch()
  • 「publisher_last_updated_epoch_ms」を使用

    • 例では、特定のノードまたはリレーションが過去3日間公開されていないデータを削除するものです。
task = Neo4jStalenessRemovalTask()
job_config_dict = {
    'job.identifier': 'remove_stale_data_job',
    'task.remove_stale_data.neo4j_endpoint': neo4j_endpoint,
    'task.remove_stale_data.neo4j_user': neo4j_user,
    'task.remove_stale_data.neo4j_password': neo4j_password,
    'task.remove_stale_data.staleness_max_pct': 10,
    'task.remove_stale_data.target_relations': ['READ', 'READ_BY'],
    'task.remove_stale_data.milliseconds_to_expire': 86400000 * 3
}
job_config = ConfigFactory.from_dict(job_config_dict)
job = DefaultJob(conf=job_config, task=task)
job.launch()

またjob_config_dict内で使用しているパラメータのデフォルト値がneo4j_staleness_removal_task.pyのなかに記述されています。

DEFAULT_CONFIG = ConfigFactory.from_dict({BATCH_SIZE: 100,
​                                          NEO4J_MAX_CONN_LIFE_TIME_SEC: 50,
​                                          NEO4J_ENCRYPTED: True,
​                                          NEO4J_VALIDATE_SSL: False,
​                                          STALENESS_MAX_PCT: 5,
​                                          TARGET_NODES: [],
​                                          TARGET_RELATIONS: [],
​                                          STALENESS_PCT_MAX_DICT: {},
​                                          MIN_MS_TO_EXPIRE: 86400000,
                                          DRY_RUN: False})

このなかでも注目するパラメータがSTALENESS_MAX_PCT: 5で、これは「削除対象となるデータがAmundsenに取り込んでいる全データの5%を超えていた場合、削除が実行されずタスクが中止される」ことを意味します。

つまり開発中や運用中の誤動作や設定ミスなどによるデータ損失リスクを回避するためのセーフティ機能の役割を果たしています。削除する場合はこの値を調整しながら実行していく必要があります。

また、公式にも書いていますがデータ削除の実行はリスクのある行為なので、とくに本番運用中の場合はDRY_RUN: Trueにしておくことを推奨しているようですね。

実際にやってみた

今回は、以前の記事で検証としてRedshiftやGlueからデータを取り込んでいたので、そのクリーンアップを兼ねて全データの一括削除を行ってみました。

Amundsenのコンテナ起動済みで、RedshiftやGlueのデータがロードされている状態からスタートします。

まずはロード処理の際モジュールをインストール等を行った仮想環境に入り、neo4j_staleness_removal_task.pyを開きます。

cd amundsen/databuilder
source venv/bin/activate

vi databuilder/task/neo4j_staleness_removal_task.py

以下のようにjob_config_dictに使用する変数(neo4j_endpointなど)の定義を追記、メイン関数にデータ削除ジョブを実行するコードを追記します。

import logging
import textwrap
# ~~省略~~
MARKER_VAR_NAME = 'marker'

# 変数定義でneo4jプロキシの接続情報を格納
neo_host = None
NEO4J_ENDPOINT = f'bolt://{neo_host or "localhost"}:7687'
neo4j_endpoint = NEO4J_ENDPOINT
neo4j_user = 'neo4j'
neo4j_password = 'test'

class Neo4jStalenessRemovalTask(Task):
# ~~省略~~
        finally:
            LOGGER.debug('Cypher query execution elapsed for %i seconds', time.time() - start)

# データ削除ジョブの呼び出しを追記
if __name__ == "__main__":
    task = Neo4jStalenessRemovalTask()
    job_config_dict = {
        'job.identifier': 'remove_stale_data_job',
        'task.remove_stale_data.neo4j_endpoint': neo4j_endpoint,
        'task.remove_stale_data.neo4j_user': neo4j_user,
        'task.remove_stale_data.neo4j_password': neo4j_password,
        'task.remove_stale_data.staleness_max_pct': 101,
        'task.remove_stale_data.target_nodes': ['Table', 'Column'],
        'task.remove_stale_data.job_publish_tag': '2021-09-13'
    }
    job_config = ConfigFactory.from_dict(job_config_dict)
    job = DefaultJob(conf=job_config, task=task)
    job.launch()

今回のneo4jプロキシのパラメータはデフォルト値です。また、job_publish_tagにはデータをロードしたのは2021-09-12以前の日付でしたので、コードには2021-09-13にしています。

ここで全データを削除するためstaleness_max_pctを101%にしています。(理由は後述)

編集内容を保存したらスクリプトを実行してみます。

python3 databuilder/task/neo4j_staleness_removal_task.py

以下画面のようにロードしてしたデータが削除されました。

検索結果がコンテナサーバのキャッシュ上に残っている可能性がありますので、表示がおかしい場合はキャッシュクリアやDockerの再起動などを試してみてください。

ちょっとハマったところ

staleness_max_pctですが、ここは100%で良いのでは?と思って実行してみたのですが、以下のように上限によるエラーとなってしまいました。

(venv) [ec2-user@ip-10-0-0-198 databuilder]$ python3 databuilder/task/neo4j_staleness_removal_task.py
Traceback (most recent call last):
  File "databuilder/task/neo4j_staleness_removal_task.py", line 296, in <module>
    job.launch()
  File "/home/ec2-user/amundsen/databuilder/venv/lib64/python3.7/site-packages/amundsen_databuilder-6.0.3-py3.7.egg/databuilder/job/job.py", line 76,in launch
  File "/home/ec2-user/amundsen/databuilder/venv/lib64/python3.7/site-packages/amundsen_databuilder-6.0.3-py3.7.egg/databuilder/job/job.py", line 66,in launch
  File "databuilder/task/neo4j_staleness_removal_task.py", line 121, in run
    self.validate()
  File "databuilder/task/neo4j_staleness_removal_task.py", line 131, in validate
    self._validate_node_staleness_pct()
  File "databuilder/task/neo4j_staleness_removal_task.py", line 241, in _validate_node_staleness_pct
    types=self.target_nodes)
  File "databuilder/task/neo4j_staleness_removal_task.py", line 217, in _validate_staleness_pct
    raise Exception(f'Staleness percentage of {type_str} is {stale_pct} %. '
Exception: Staleness percentage of Table is 100.0 %. Stopping due to over threshold 100 %

100%(全データ削除)のため安全装置としてタスクが自動停止したんですね。

開発側の仕様としてそのようになっているので、強制的に全データを削除したい場合は100%より高い数値を設定してしまえばタスク停止を回避して全データを削除することができました。(percentの定義としてなんだか裏技にようになってしまいましたが。。。)