Pythonを使ってAlteryx Server APIを呼び出す

こんにちは、小澤です。

Alteryx ServerにはREST APIを利用してワークフローを実行する機能があります。

この機能を使えば任意の他のプログラムの中にAlteryxのワークフロー実行を組み込むことができます。 今回は、このAPIをPythonから使う方法を紹介します。

なぜAPIが必要なの?

ワークフローをAPIから実行したい場面はいくつか考えられますが、その中でもよくある利用シーンとしてスケジューラ機能ではできないことの補完があげられます。 Alteryx Serverにはスケジューラ機能がありますが、実行のタイミングを特定の日時またはその繰り返しでしか指定できません。 実際の利用シーンでは、

  • 処理対象となるデータが揃ったが確認できたらワークフロー1を実行
  • ワークフロー1の処理が完了したらワークフロー2を実行
  • ワークフロー1の処理が失敗したらワークフロー3を実行

といったように、Alteryxの外側で行われている処理への依存やワークフロー同士の実行有無やタイミングなどを調整してやる必要がある場合があります。 そういった仕組みを作るにあたってAlteryx Serverの機能のみでは実現できないので、全体の流れを制御するジョブ管理システムのようなものを利用する必要があります。 その際にジョブ管理をしている側からワークフローを実行するために、このAPI機能が利用できます。

Pythonから使う

冒頭で紹介した記事では、Alteryxに含まれるサンプルを利用していました。 じゃあ、普通にバッチ処理のプロセスに含めたりしたい時にどうすんだよ?ということで、今回はPythonを使っていちから処理を実装していきましょう。

今回は、

  • OAuth1.0の認証を行う
  • 自分のStudioのワークフロー一覧を取得し実行したいワークフローを選択する
  • ワークフローの実行
  • 実行ステータスの取得
  • 実行結果の取得

という流れをみてみたいと思います。

必要なライブラリのインストール

requests_oauthlibを使って認証を行うのであらかじめ必要なライブラリをインストールしておきます。 また、必須ではありませんが、動かして見る際に便利なので、Pandasも入れておきます。

$ pip install requests requests_oauthlib
$ pip install pandas

OAuth1.0の認証を行う

まず最初に認証のための情報を渡して、APIを使える状態にします。

import requests
from requests_oauthlib import OAuth1

client_key = '<Your Client Key>'
client_secret = '<Your Client Secret>'
gallery_url = 'http://<Your Alteryx Server URL>/gallery/api'

oauth = OAuth1(client_key, client_secret, signature_type='query')

必要な情報を渡して認証用のOAuth1オブジェクトを取得しています。

この認証情報を使ってまずはワークフローの一覧を取得してみます。 見通しをよくするために、戻り値をJSONで受け取ったのち、PandasのDataFrameに入れてみました。

import pandas as pd
studios = pd.DataFrame(requests.get(gallery_url + '/v1/workflows/subscription/', auth=oauth).json())

workflows変数の中身は以下のようになっています。

ワークフローの実行

ワークフローを実行するには、対象となるワークフローのidを指定します。 先ほど取得したワークフロー一覧の中から確認できるのでidを指定してワークフローを実行してみます。

本来であれば、結果を処理して必要な値を取得するのがいいのですが、今回は手抜きして表示されてるものからidをコピペで貼り付けました。

app_id = '5bd01e72ebedcc096cc51873'

q = {
  "questions": [
    {
      "name": "",
      "value": ""
    }
  ]
}
requests.post(gallery_url + '/v1/workflows/{appId}/jobs/'.format(appId=app_id), auth=oauth, json=q).json()

q変数はワークフロー実行時の入れる項目になります。 Analytic Appを使う場合は、フォームに入れる内容相当のものをここで作成します。 今回は通常のワークフローなので、空のままにしています。

結果は以下のようなJSONが返ってきます。 実行しているジョブのidはこの後の処理でも使うので変数に入れておくなどしておく必要があります。

{'id': '5bd12d88da85720818007d66',
 'appId': None,
 'createDate': '2018-10-25T02:42:16.9875535Z',
 'status': 'Queued',
 'disposition': 'None',
 'outputs': [],
 'messages': [],
 'priority': 0,
 'workerTag': ''}

実行ステータスの取得

ワークフローの実行は、処理が完了するまで待つわけではありません。 "実行してくれと命令した"状態になっています。

先ほどの結果に含まれるstatusは現在の状態になっており、Queuedは実行待ち行列にジョブが投入された状態であることを示しています。 実行結果として出力データを得るにはジョブが完了している必要があるので、適宜ステータスを確認しながら完了を待ちます。 ステータスの確認は以下のコードで行えるので、これをtime.sleepをはさみつつループ処理させるなどして、完了するまで待ちます。

job_id = '5bd12d88da85720818007d66'
requests.get(gallery_url + '/v1/jobs/{jobId}/'.format(jobId=job_id), auth=oauth).json()

完了すると以下のようなstatusがCompleteとなった結果が得られます。

{'id': '5bd12d88da85720818007d66',
 'appId': None,
 'createDate': '2018-10-25T02:42:16Z',
 'status': 'Completed',
 'disposition': 'Success',
 'outputs': [{'id': '5bd12d89da85720818007d69',
   'formats': ['Shp', 'Tab', 'Mif', 'Dbf', 'Csv', 'Raw', 'Tde'],
   'name': '_externals\\2\\iris.yxdb'}],
 'messages': [{'status': 8,
   'text': '_externals\\1\\iris.yxdb|150 records were read from "_externals\\1\\iris.yxdb"',
   'toolId': 1},
  {'status': 9,
   'text': '_externals\\2\\iris.yxdb|150 records were written to "_externals\\2\\iris.yxdb"',
   'toolId': 2}],
 'priority': 0,
 'workerTag': None}

なお、失敗した場合は別な処理をする、のようなことをしたい場合もstatusの値をみて判断することが可能です。

実行結果の取得

ジョブのidと先ほどの結果のoutputsにあるidを使って出力結果を取得することが可能です。 ワークフローの内容によっては複数の出力ある場合もあるので、outputs以下は配列になっています。

Alteryxでは様々なフォーマットでの出力に対応していますが、プログラムからバイナリ形式のものを扱うのは少々面倒なのでCSV形式のファイルを取得してみたいと思います。 今回実行したワークフローはyxdb形式で出力しているので取得時にformatでCsvを指定することで、CSV形式で結果を取得できます。

output_id = '5bd12d89da85720818007d69'

result = requests.get(
    gallery_url + '/v1/jobs/{jobId}/output/{outputId}/'.format(jobId=job_id, outputId=output_id), 
    auth=oauth, 
    params={'format': 'Csv'}).text

結果をPandasのDataFrameにれて確認してみましょう。

data = [line.split(',') for line in result.split('\r\n')]
header = data[0]
data = data[1:]

pd.DataFrame(data, columns=header)

正しく取得できていることが確認できます。

おわりに

今回は、Alteryx ServerのAPIをPythonから使ってみました。

スケジュール機能だけでは実現できない仕組みを作る時に「この方法でできるよねー」とはなるものの、プログラムを書く必要があり、Alteryxユーザにとってはなかなかハードルが高いものでした。 今回はPythonを使っていますが、OAuth1.0とRESTの各種メソッドに対応したライブラリがあれば他の言語でもほぼ同じような記述で実現できます。

Alteryxの導入なら、クラスメソッドにおまかせください

日本初のAlteryxビジネスパートナーであるクラスメソッドが、Alteryxの導入から活用方法までサポートします。14日間の無料トライアルも実施中ですので、お気軽にご相談ください。

alteryx_960x400