XplentyのAPIを使って複数のジョブ実行を試してみた

2020.07.28

はじめに

データアナリティクス事業本部のkobayashiです。

XplentyはETL、ELTツールとして様々なデータソースを扱え、また処理もXplentyのGUIで直感的に作成できます。

XplentyにもAPIが用意されておりXplentyのマネージメントコンソールでAPI用のトークンを発行してそれを使うと便利にXplentyを扱えます。

Xplenty | Simplified ETL & ELT to BigQuery, Snowflake, Redshift & Azure

今回はこのAPIをPythonスクリプトから実行しクラスターの作成と複数のジョブを実行してみたいと思います。

APIで取得するデータ

前回作成したe-statのデータを取得するパッケージをXplentyのAPIで使って実行します。Xplentyのパッケージを作成するときに取得する統計データの統計表IDをユーザー変数にしました。これをジョブごとに変更し複数の統計データを取得します。

前回の記事

XplentyのAPIを使って複数ジョブを実行する

XplentyのAPIに関するドキュメントは以下にありますのでまずこれを確認します。 大抵のXplentyの操作はここに記載されているAPIから実行可能です。

今回使うAPIは以下ものを使います。

今回は上記のAPIを使って以下の流れで進めます。

  1. アクセストークンを発行する。
  2. 実行したいパッケージのパッケージIDを確認する。併せて変数として渡す内容を確認する。
  3. 「クラスターを作成」、「クラスターが立ち上がるまで待つ」、「ジョブを実行する」Pythonスクリプトを作成し実行する。

アクセストークンの発行

はじめにXplentyのアクセストークンを発行します。Xplentyのマネージメントコンソールより取得します。

手順1). サイドバーのSettingsを押下すると設定画面に進むのでメニューからDeveloper Settings > Access tokensを選択し設定画面に進む

手順2). Enter your password below to reveal your API key:の欄があるのでアクセストークンが未発行の場合はログインパスワードを入力しReveal API keyを押下する

手順3).遷移先の画面にアクセストークンが表示される

アクセストークンが発行されたのでこれを使ってAPIを呼び出せます。

APIの呼び出し方法

認証は先に発行したアクセストークンをユーザー名、パスワードは空欄としてBasic認証にて行われます。

対象のAPIによりhttpメソッドは変わりますが、基本的な呼び出し方は以下になります。

curl -X POST -u {アクセストークン}: "https://api.xplenty.com/{アカウントID}/api/{API名}" \
  -H "Accept: application/vnd.xplenty+json; version=2" \
  -H "Content-Type: application/json" \
  -d '{}'

アカウントIDについてはXplentyのマネージメントコンソールのSettings > Your Settings > Accountsと進んだ先に表示されているAccount IDになります。

パッケージIDの確認

ジョブを実行する際にはクラスターIDと実行するパッケージIDが必要になります。クラスターIDに関してはスクリプト中で取得しますが、パッケージIDについてはを予めAPIのパッケージリストを取得するAPI(list-packages)を使って確認しておきます。

  • パッケージリストを確認するAPI呼び出し
curl -X GET -u {アクセストークン}: https://api.xplenty.com/classmethod-inc/api/packages?status=active
  -H "Accept: application/vnd.xplenty+json; version=2" \
  -H "Content-Type: application/json" 
  • レスポンス
[
  {
    "id": 128360,
    "name": "cm-kobayashi_test-kd",
    "description": null,
    "flow_type": "dataflow",
    "flow_version": "2.0.0",
    "owner_id": 11131,
    "created_at": "2020-06-13T07:40:46Z",
    "updated_at": "2020-07-01T21:30:56Z",
    "status": "active",
    "version_description": null,
    "package_version": "1.115",
    "author": "",
    "variables": {
      "appId": "'{e-statのアクセストークン}'",
      "statsDataId": "'8003000051'"
    },
    "url": "https://api.xplenty.com/{アカウントID}/api/packages/128360",
    "html_url": "https://dashboard.xplenty.com/{アカウントID}/packages/128360/edit",
    "version": 116,
    "item_id": 128360
  },
...
]

レスポンスから対象のパッケージのidを確認します。これを使ってジョブを実行します。

また併せてジョブ実行時に必要となるvariablesの内容も確認しておきます。Xplentyのパッケージを作成する際にユーザー変数を使うとAPIの呼び出し時にこの変数を上書きすることができます。この上書き機能を使うことで複数のジョブを簡単に実行することができます。

複数ジョブを実行するPythonスクリプトの作成

早速なのですが、以下が今回実行するスクリプトになります。

import pprint
import requests
import json
import time

URL = "https://api.xplenty.com/{アカウントID}/api"
TOKEN = "{Xplentyのアクセストークン}"


class XplentyApi():
    def __init__(self):
        self.session = requests.Session()
        self.session.auth = (TOKEN, None)

    # クラスターリストを取得するメソッド
    def get_clusters(self):
        resp = self.session.get(URL + "/clusters?status=available",
                                headers={
                                    "Accept": "application/vnd.xplenty+json",
                                    "Content-Type": "application/json"
                                })
        return resp.json()

    # クラスターを作成するメソッド
    def create_cluster(self):
        param = {
            "nodes": 1,
            "type": "sandbox",
            "terminate_on_idle": True,
            "time_to_idle": 120
        }
        print(param)

        resp = self.session.post(
            URL + "/clusters",
            json.dumps(param),
            headers={
                "Accept": "application/vnd.xplenty+json; version=2",
                "Content-Type": "application/json",
            })

        return resp.json()

    # ジョブを実行するメソッド    
    def create_jobs(self, cluster_id, package_id, variables):
        param = {
            "cluster_id": cluster_id,
            "package_id": package_id,
            "variables": variables
        }

        resp = self.session.post(
            URL + "/jobs",
            json.dumps(param),
            headers={
                "Accept": "application/vnd.xplenty+json; version=2",
                "Content-Type": "application/json",
            })

        return resp.json()

    # 有効なクラスターを取得するメソッド 
    def get_avalable_cluster_id(self):
        cluster_id = None
        clusters = self.get_clusters()
        if len(clusters) > 0:
            cluster_id = clusters[0]["id"]
        else:
            self.create_cluster()
            print("クラスターを起動しました。")
            # availableになるまで待機
            while cluster_id is None:
                clusters = self.get_clusters()
                if len(clusters) > 0:
                    cluster_id = clusters[0]["id"]
                else:
                    print("クラスターを起動中")
                    time.sleep(10)
        return cluster_id

def main():
    session = XplentyApi()
    cluster_id = session.get_avalable_cluster_id()

    # 取得するe-statの統計データの統計表ID
    # 平成27年国勢調査 小地域 年齢(5歳階級、4区分)別、男女別人口 の47都道府県分のデータ
    statsDataIds = [
        8003000047
        , 8003000048
        , 8003000049
        , 8003000050
        , 8003000051
        , 8003000052
        , 8003000053
        , 8003000054
        , 8003000055
        , 8003000056
        , 8003000057
        , 8003000058
        , 8003000059
        , 8003000060
        , 8003000061
        , 8003000062
        , 8003000063
        , 8003000064
        , 8003000065
        , 8003000066
        , 8003000067
        , 8003000068
        , 8003000069
        , 8003000070
        , 8003000071
        , 8003000072
        , 8003000073
        , 8003000074
        , 8003000075
        , 8003000076
        , 8003000077
        , 8003000078
        , 8003000079
        , 8003000080
        , 8003000081
        , 8003000082
        , 8003000083
        , 8003000084
        , 8003000085
        , 8003000086
        , 8003000087
        , 8003000088
        , 8003000089
        , 8003000090
        , 8003000091
        , 8003000092
        , 8003000093
    ]
    for v in statsDataIds:
        variables = {
            "appId": "'{e-statのアクセストークン}'",
            "statsDataId": "'{}'".format(v)
        }
        resp = session.create_jobs(cluster_id, 128360, variables)
        pprint.pprint(resp)

if __name__ == '__main__':
    main()

ポイントとなる箇所の解説を行います。

セッションをインスタンス変数へ格納

class XplentyApi():
    def __init__(self):
        self.session = requests.Session()
        self.session.auth = (TOKEN, None)

いくつかhttpリクエストをいつくか送るのでその都度認証して…となると冗長になってしまうのでXplentyApiクラスのインスタンス化時にセッションを使い回せるようにします。

XplentyのAPIを呼び出すメソッドの作成

# クラスターリストを取得するメソッド
    def get_clusters(self):
        resp = self.session.get(URL + "/clusters?status=available",
                                headers={
                                    "Accept": "application/vnd.xplenty+json",
                                    "Content-Type": "application/json"
                                })
        return resp.json()

    # クラスターを作成するメソッド
    def create_cluster(self):
        param = {
            "nodes": 1,
            "type": "sandbox",
            "terminate_on_idle": True,
            "time_to_idle": 120
        }
        print(param)

        resp = self.session.post(
            URL + "/clusters",
            json.dumps(param),
            headers={
                "Accept": "application/vnd.xplenty+json; version=2",
                "Content-Type": "application/json",
            })

        return resp.json()

    # ジョブを実行するメソッド    
    def create_jobs(self, cluster_id, package_id, variables):
        param = {
            "cluster_id": cluster_id,
            "package_id": package_id,
            "variables": variables
        }

        resp = self.session.post(
            URL + "/jobs",
            json.dumps(param),
            headers={
                "Accept": "application/vnd.xplenty+json; version=2",
                "Content-Type": "application/json",
            })

        return resp.json()

以下のそれぞれのXplentyのAPIを呼び出すメソッドを作成しています。リクエストの内容は詳しくは下記のドキュメントを参考にしてください。

今回は以下の条件になりますが、他にも設定はできますので公式ドキュメントをご確認ください。

  • クラスターリストを取得
    • ステータスが利用可能なクラスター
  • クラスターを作成
    • サンドボックス用のクラスター
    • 無可動時間が2分でクラスターを削除
  • ジョブを実行
    • クラスターID、ジョブID、ユーザー変数を指定

有効なクラスターを取得するメソッド

# 有効なクラスターを取得するメソッド 
    def get_avalable_cluster_id(self):
        cluster_id = None
        clusters = self.get_clusters()
        if len(clusters) > 0:
            cluster_id = clusters[0]["id"]
        else:
            self.create_cluster()
            print("クラスターを起動しました。")
            # availableになるまで待機
            while cluster_id is None:
                clusters = self.get_clusters()
                if len(clusters) > 0:
                    cluster_id = clusters[0]["id"]
                else:
                    print("クラスターを起動中")
                    time.sleep(10)
        return cluster_id

ジョブを実行できるクラスターがあればそのクラスターIDを返し、なければクラスターを作成して利用できるまで待機した後にクラスターIDを返すメソッドになります。

実際にジョブを実行するブロック

def main():
    session = XplentyApi()
    cluster_id = session.get_avalable_cluster_id()

    # 取得するe-statの統計データの統計表ID
    # 平成27年国勢調査 小地域 年齢(5歳階級、4区分)別、男女別人口 の47都道府県分のデータ
    statsDataIds = [
        8003000047
        ...
        , 8003000093
    ]
    for v in statsDataIds:
        variables = {
            "appId": "'{e-statのアクセストークン}'",
            "statsDataId": "'{}'".format(v)
        }
        resp = session.create_jobs(cluster_id, 128360, variables)
        pprint.pprint(resp)

XplentyApiをインスタンス化し有効なクラスターIDを取得します。取得したクラスターIDを用いて統計表IDのリストから順次統計IDを取得し、それをXplentyのAPIを呼び出す際にユーザー変数として与えて目的の統計データを取得するジョブ(パッケージID:128360)を実行しています。

Pythonスクリプトの実行と確認

上記のスクリプトを実行すると以下の様な形でジョブが実行されます。

$ python xplenty-api.py

{'nodes': 1, 'type': 'sandbox', 'terminate_on_idle': True, 'time_to_idle': 120}
クラスターを起動しました。
クラスターを起動中
クラスターを起動中
クラスターを起動中
クラスターを起動中
クラスターを起動中

{'cluster_id': 1272794,
 'completed_at': None,
 'component': {'name': '', 'type': ''},
 'created_at': '2020-07-02T07:16:17Z',
 'errors': None,
 'failed_at': None,
 'id': 35966114,
 'outputs': [],
 'outputs_count': 0,
 'owner_id': 11131,
 'package_id': 128360,
 'progress': 0.0,
 'runtime_in_seconds': 0,
 'started_at': None,
 'status': 'pending',
 'updated_at': '2020-07-02T07:16:17Z',
 'variables': {
      'appId': "'{e-statのアクセストークン}'",
      'statsDataId': "'8003000047'"
 }
}

ジョブの状況をXplentyのマネージメントコンソールから確認すると複数のジョブが実行されていることが確認できます。

まとめ

XplentyのAPIを使ってクラスターの作成とジョブの実行を行ってみました。同じ処理だけど一部の条件が違うジョブを複数実行したい場合は、

  • パッケージ作成時にユーザー変数として登録する
  • XplentyのAPIを呼び出す先にユーザー変数を逐次変更してジョブを実行する

で可能でした。

最後まで読んで頂いてありがとうございました。