データ統合基盤CSアナリティクスでpythonコードを実行してみる

2020.08.06

弊社では、短期間かつお手軽にAWS上にデータ分析基盤を導入可能なCSA(Customer Story Analytics)というデータ統合基盤パッケージを提供しています。

今回は、CSAのジョブ管理ツールであるJMC(Job Management Console)を使ってPythonプログラムを実行したいと思います。JMCでは、ジョブの構成要素として「データ連携」、「プログラム」、「SQL」の3つを定義することが可能で、その構成要素の一つ、あるいは複数を組み合わせる事によってジョブとして構成し、実行します。今回は、その中でも「プログラム」を紹介します。(参考:JMCの概要説明

なお、JMC含め、CSAの全体像の説明については、こちらのブログでも触れていますので、是非ご一読ください。

目次

構成要素「プログラム」とは?

    • 任意のpythonコードを実行することが可能です。
    • ユースケースとしては、外部APIからデータを取得してS3上にCSVとして格納したり、或いは、ETL処理を行う場合等です。
    • 通常、その他の構成要素「データ連携」、「SQL」と組み合わせて利用します。(たとえば、「プログラム」でAPIを呼んで、JSONでデータ取得し、CSVに変換、S3に格納。そのCSVファイルを「データ連携」でRedshiftに反映させ、そのRedshift上のデータを用いて「SQL」でデータマートを作成するいった流れです。)

サンプルコード

実際に動かしてみるサンプルコードは以下のとおりです。

まず、アップロードするプログラムについての「お作法」として、引数無しの「main」という名前の関数を必ず用意する必要があります。

「csa_env」というのは、CSA環境独自のライブラリです。コードの中では、このライブラリを使って、設定されているS3バケット名( 「 all_vars["s3"]["bucket"] 」部分 )と、のちほど説明する、実行時の引数( 「 all_vars["vars"].get("sample_various") 」部分 )を出力します。

また、今回は「csa_env」以外のライブラリは使用しませんが、CSAでは、以下サードパーティーライブラリを利用することが可能です。

  • boto3 ( AWS SDK for Python )
  • pandas ( データ分析 )
  • psycopg2 ( PostgreSQL系DBアクセス )
  • requests ( HTTP )
  • jinja2 ( テンプレートエンジン )
  • scipy ( 科学技術計算 )
import csa_env

def main():

  all_vars = csa_env.get()

  print('↓設定されているS3バケットのバケット名')
  print(all_vars["s3"]["bucket"])

  print('↓ジョブ設定時に定義可能な外部変数')
  print(all_vars["vars"].get("sample_various"))

なお、上記の変数も含め、csa_env.get()で取得可能な変数は以下のとおりです。

変数名 説明 サンプル値 備考
today ジョブ実行日付 2019-03-04
today_nodash ジョブ実行日付(「-」無し) 20190304
today_slash ジョブ実行日付(「/」区切り) 2019/03/04
yesterday ジョブ実行日付の前日 2019-03-03
yesterday_nodash ジョブ実行日付の前日(「-」無し) 20190303
yesterday_slash ジョブ実行日付の前日(「/」区切り) 2019/03/03
tomorrow ジョブ実行日付の翌日 2019-03-05
tomorrow_nodash ジョブ実行日付の翌日(「-」無し) 20190305
tomorrow_slash ジョブ実行日付の翌日(「/」区切り) 2019/03/05
now ジョブ実行日時 2019-03-04 15:06:07
now_slash ジョブ実行日時(「/」区切り) 2019/03/04 15:06:07
current_year ジョブ実行年 2019
current_month ジョブ実行月 03
current_date ジョブ実行日 04
current_hour ジョブ実行時間 15
current_minute ジョブ実行時間 06
current_second ジョブ実行秒 07
job_name 実行ジョブ名 サンプルジョブ
schedule_unit 実行サイクル名 daily
iam.redshift_role RedshiftのIAMロール名 arn:aws:iam::1234567890:role/xxxxxxx
s3.bucket S3のバケット名 csa-sample-s3
run_type ジョブ実行種別 scheduled 手動実行の場合「manual」,スケジュール実行の場合「scheduled」

JMCにサンプルコードをアップロードする

JMCにログインして、ヘッダーメニューの「構成要素」から「プログラム」を選択します。

「フォルダを選択」を押して、対象のプログラムが入っているローカル上のフォルダを選択します。

対象フォルダを選択すると、下記のように、同フォルダ内に格納されているファイルが表示されます。「アップロード」を押すとファイルがアップロードされます。

プログラムの一覧画面にアップロードしたプログラムが加わっているのがわかります。

アップロード先はS3の対象バケット中の「program」フォルダ配下になります。

ジョブ設定

ヘッダーメニューの「ジョブ」から「ジョブ一覧」を選択します。

「ジョブ一覧」画面の「ジョブの追加」ボタンをクリックします。

下記のモーダル画面が表示されるので、任意のジョブ名を入力し、「ジョブ実行種別」から「構成要素を実行」を選択し、「追加」をクリックします。

ジョブの設定画面で、「編集」をクリックします。

構成要素の編集画面では、対象のプログラムを選んで、ドラッグ&ドロップ、あるいは、「+」ボタンを押して右側に移動させます。右側に移動させると「引数」のボタンが現れるので、クリックします。

下記画面で引数を定義することができます。前述のとおり、コード側では、「sample_various」という名前で受ける事にしています( 「 all_vars["vars"].get("sample_various") 」の部分 )ので、「キー」に「sample_various」、「値」に任意の文字列を入力し、「完了」を押します。

引数を一つ登録したことで「引数(1)」となったことが確認できます。「保存」を押して、ジョブを更新します。

ジョブ実行・結果確認

対象のジョブがジョブの一覧画面に加わっています。左端のOn/Offボタンを「On」にした上で、右から三番目の手動実行ボタンをクリックします。

下記のとおり、「実行種別」、「実行結果の通知」の確認が表示されるので、選択した上で、「確認」を押し、実行します。

なお、ここでは、「通知しない」を選択していますが、「通知する」を選択した場合は、ジョブの成功/失敗時に通知メールが指定のメールアドレスに発信されることになります。通知メールの設定については、下記の投稿をご参照下さい。

参考:データ統合基盤 CSアナリティクスのメール通知機能

実行後、成功した場合は、下記のように4つのインジケーターのうち、左端のインジケータが緑色に表示されます。このインジケータをクリックし、ログを確認します。

なお、ジョブの実行に失敗した場合でも、「ジョブ実行履歴」から、失敗したジョブをリトライすることが可能です。(下記の枠線で囲ったボタンをクリックすることで、ジョブのリトライが可能です。)

ログを確認すると、コードの中で、printで書いた4行が表示されていることがわかります。

2行目には、「 print(all_vars["s3"]["bucket"]) 」に相対した形で、S3のバケット名が表示されています。

5行目には、「 print(all_vars["vars"]["sample_various"]) 」に相対した形で、ジョブ設定時に手動で設定した引数( ”サンプル実行時引数” )が表示されています。

↓設定されているS3バケットのバケット名
v4demo-csa-data-bucket

↓ジョブ設定時に定義可能な外部変数
サンプル実行時引数

まとめ

構成要素「プログラム」を使うことで、APIからのデータの取得、データの加工、CSVへの変換、S3へのデータ格納等、様々なデータの処理を実現することが可能となります。

構成要素「プログラム」はその他の構成要素である「データ連携」、「SQL」と組み合わせて利用されるケースが多いです。「データ連携」については、別のブログで説明していますので、併せてご確認頂けますと幸いです。

参考:データ統合基盤CSアナリティクスを使ってS3上のデータをRedshiftに定期連携してみる

今後もCSAの機能について定期的に発信していきます。