はじめに
データアナリティクス事業本部ビッグデータチームのkasamaです。
今回はIICS CDIでリソースをGithubで管理し、チェックインした際にGithub ActionsでIICS JOBを自動テストすることが可能か検証していきたいと思います。
前提条件
以下の動画や記事などで紹介されていた内容を元に検証と実装を一部修正させていただきました。
事前に任意のマッピングタスク、マッピング、タスクフローを作成してあることとします。
また、IICS CDIとGithubでのソース管理が設定されていることとします。まだの場合は、以下のブログを参考にしてください。
実装
Automated Deployment of IICS Assets- CI/CD using Informatica API'sの添付ファイルの内容を元に実装しました。元のファイルでは、以下のような実装でした。
- IICSログイン
- 修正したマッピングタスクのテスト
- 別環境へのpullとpullしたマッピングタスクのテスト
タスクフローのテスト追加と別環境は無かった為、以下の形に修正しました。
- IICSログイン
- 修正したマッピングタスク or タスクフローのテスト
以下、実際のフォルダ構成になります。Explore上にIICSリソースが格納され、scriptsにpythonスクリプトを格納し、.github/workflowsディレクトリ配下のymlファイルで実際のworkflowの設定を定義します。
(base) kasama.yoshiki@iics % tree
.
└── .github
└── workflows
├── iics_deployment.yml
├── Explore
│ └── cm-kasama
│ ├── blog9
│ │ ├── blog9.Folder.json
│ │ ├── blog9_mapping.xxx.zip
│ │ └── blog9_taskflow.TASKFLOW.xml
│ └── cm-kasama.Project.json
└── scripts
├── infa_login.py
├── infa_main.py
└── lib
├── infa_get_mt_updates.py
├── infa_get_tf_updates.py
└── my_logger.py
├── README.md
├── requirements.txt
(base) kasama.yoshiki@iics %
それぞれのソースコードを確認します。
.github/workflows/iics_deployment.yml
name: DEPLOY_MAPPING_TASK
# Controls when the workflow will run
on:
push:
branches:
- main
paths:
- 'Explore/cm-kasama/**'
env:
IICS_LOGIN_URL: https://dm-us.informaticacloud.com
IICS_POD_URL: https://na1.dm-us.informaticacloud.com
IICS_USERNAME: ${{ secrets.IICS_USERNAME }}
IICS_PASSWORD: ${{ secrets.IICS_PASSWORD }}
MAX_CONCURRENT_THREADS: ${{ secrets.MAX_CONCURRENT_THREADS }}
# This is overriding until we connect the repository with a workflow_dispatch
COMMIT_HASH: ${{ github.sha }}
# A workflow run is made up of one or more jobs that can run sequentially or in parallel
jobs:
dev_build:
name: Review Development Code
environment:
name: development
# The type of runner that the job will run on
runs-on: ubuntu-latest
# Steps represent a sequence of tasks that will be executed as part of the job
steps:
# Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
- uses: actions/checkout@v3
# Install python and dependent modules
- uses: actions/setup-python@v3
with:
python-version: '3.x'
- name: Install python modules
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
# Runs a single command using the runners shell
- name: Login to development
run: |
python ./scripts/infa_login.py
- name: Test Committed Tasks
run: python ./scripts/infa_main.py
このGitHub Actionsのワークフローは、特定の条件で指定したタスクを実行するものです。以下に、簡単な解説を行います。
on
: mainブランチの該当IICSプロジェクト(cm-kasama)にpushされたことをtriggerとする設定env
: 環境変数の定義、secretsは後ほどGithub上で設定する値となります。jobs
:ubuntu-latest
という最新のUbuntuランナー上で実行されます。step
: 以下の順序でタスクが実行されます:- リポジトリをチェックアウトします。
- Pythonをセットアップし、
3.x
バージョンを使用します。 - 必要なPythonモジュールをインストールします。
./scripts/infa_login.py
スクリプトを実行して、開発環境へログインします。./scripts/infa_main.py
スクリプトを実行して、マッピングタスクをテストします。
scripts/infa_login.py
import os
import requests
URL = os.environ["IICS_LOGIN_URL"]
USERNAME = os.environ["IICS_USERNAME"]
PASSWORD = os.environ["IICS_PASSWORD"]
URL = "https://dm-us.informaticacloud.com/saas/public/core/v3/login"
BODY = {"username": USERNAME, "password": PASSWORD}
r = requests.post(url=URL, json=BODY)
if r.status_code != 200:
print("Caught exception: " + r.text)
# extracting data in json format
data = r.json()
# Set session tokens to the environment
env_file = os.getenv("GITHUB_ENV")
with open(env_file, "a") as myfile:
myfile.write("sessionId=" + data["userInfo"]["sessionId"] + "\n")
IICS上でマッピングタスクやタスクフローを実行するために、IICSにログインしsessionIdを環境変数に設定します。
scripts/infa_main.py
import os
import sys
import requests
import threading
from threading import Semaphore, Event
from lib.infa_get_tf_updates import run_tf_job
from lib.infa_get_mt_updates import run_mt_job
from lib.my_logger import MyLogger
def threaded_error_handler(semaphore, error_event, func, *args):
try:
func(*args)
except Exception as e:
error_event.set() # エラーが発生したらeventをset
finally:
semaphore.release()
def main():
my_logger = MyLogger(__name__)
logger = my_logger.logger
MAX_CONCURRENT_THREADS = int(os.environ.get("MAX_CONCURRENT_THREADS", ""))
SAAS_URL = os.environ.get("IICS_POD_URL", "") + "/saas"
SESSION_ID = os.environ.get("sessionId", "")
COMMIT_HASH = os.environ.get("COMMIT_HASH", "")
API_URL = os.environ.get("IICS_POD_URL", "")
HEADERS = {"Content-Type": "application/json; charset=utf-8", "INFA-SESSION-ID": SESSION_ID}
HEADERS_V2 = {"Accept": "application/json", "IDS-SESSION-ID": SESSION_ID}
# Semaphore to limit number of concurrent threads
semaphore = Semaphore(MAX_CONCURRENT_THREADS)
error_event = Event()
try:
logger.info(f"Getting all objects for the commit: {COMMIT_HASH}")
response = requests.get(SAAS_URL + "/public/core/v3/commit/" + COMMIT_HASH, headers=HEADERS)
logger.info(f"get /public/core/v3/commit/ status_code : {response.status_code}")
if response.status_code != 200:
raise Exception(f"Exception caught: {response.text}")
request_json = response.json()
threads = []
# Create a dictionary to map types to jobs
jobs = {"MTT": run_mt_job, "TASKFLOW": run_tf_job}
for iics_change in request_json["changes"]:
job = jobs.get(iics_change["type"])
if job:
semaphore.acquire()
t = threading.Thread(
target=threaded_error_handler,
args=(
semaphore,
error_event,
job,
iics_change,
SAAS_URL if iics_change["type"] == "MTT" else API_URL,
HEADERS_V2,
),
)
threads.append(t)
t.start()
else:
logger.info("Couldn't find the taskflow or mapping task")
for t in threads:
t.join()
requests.post(SAAS_URL + "/public/core/v3/logout", headers=HEADERS)
if error_event.is_set(): # エラーが発生していたら
sys.exit(99)
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
sys.exit(99)
if __name__ == "__main__":
main()
main処理では、異なる種類のジョブを並行して実行する処理を実装しています。
threaded_error_handler
- スレッドで実行する関数のエラーハンドラーです。
- 関数
func
がエラーなく完了した場合、セマフォをリリースします。 - エラーが発生した場合、エラーイベントをセットします。メインスレッドは、すべてのスレッドが終了した後でこのイベントを確認し、エラーが発生しているかどうかを判断します。
main関数
:Semaphore
: セマフォを使用して同時に実行できるスレッドの最大数をMAX_CONCURRENT_THREADS
の値に制御しています。これにより、リソースの過度な使用を避けることができます。- 取得したCOMMIT_HASHを使用して、関連する全てのオブジェクトを取得しようとします。
- 成功した場合、レスポンスJSONから変更を取得し、各変更のタイプ(マッピングタスク or タスクフロー)に基づいて特定のジョブをスレッドとして実行します。
- すべてのスレッドが完了するのを待ち、エラーイベントがセットされていれば、エラーコード
99
で終了します。
scripts/lib/infa_get_mt_updates.py
import time
import requests
from lib.my_logger import MyLogger
my_logger = MyLogger(__name__)
logger = my_logger.logger
def run_mt_job(iics_change, saas_url, headers_v2):
try:
BODY = {"@type": "job", "taskId": iics_change["appContextId"], "taskType": "MTT"}
logger.info(f"{saas_url} /api/v2/job/ {iics_change['name']}")
response = requests.post(saas_url + "/api/v2/job/", headers=headers_v2, json=BODY)
logger.info(f"post /api/v2/job/ status code : {response.status_code}")
if response.status_code != 200:
logger.error(f"Exception caught: {response.text}")
raise Exception(f"Exception caught: {response.text}")
test_json = response.json()
logger.info(test_json)
PARAMS = "?taskId=" + test_json["taskId"]
logger.info(f"PARAMS: {PARAMS}")
STATE = 0
polling_count = 0
while polling_count <= 6:
time.sleep(30)
activity_response = requests.get(saas_url + "/api/v2/activity/activityLog" + PARAMS, headers=headers_v2)
activity_log = activity_response.json()
STATE = activity_log[0]["state"]
logger.info(f"STATE: {STATE}")
if STATE == 1:
logger.info(f"Mapping task: {activity_log[0]['objectName']} completed successfully.")
break
polling_count += 1
if STATE != 1:
raise Exception(f"Mapping task: {activity_log[0]['objectName']} failed.")
except Exception as e:
logger.info(activity_log)
logger.error(f"Unexpected error: {str(e)}")
raise e
run_mt_job処理では、マッピングタスクを実行し、その実行結果を監視します。
/api/v2/job/
: BODYのtaskTypeでMTTを指定することでマッピングタスクを実行することができます。/api/v2/activity/activityLog
: 実行した経過を監視し、タスクが成功した場合は、STATEが1となります。
scripts/lib/infa_get_tf_updates.py
import time
import requests
from lib.my_logger import MyLogger
my_logger = MyLogger(__name__)
logger = my_logger.logger
def run_tf_job(iics_change, api_url, headers_v2):
try:
# This loop runs tests for each one of the mapping tasks
logger.info(f"api_url: {api_url} /active-bpel/rt/ {iics_change['name']}")
response = requests.post(api_url + "/active-bpel/rt/" + iics_change["name"], headers=headers_v2)
logger.info(f"post /ctive-bpel/rt/ status code : {response.status_code}")
if response.status_code != 200:
raise Exception(f"Exception caught: {response.text}")
test_json = response.json()
logger.info(f"test_json: {test_json}")
run_id = str(test_json["RunId"])
logger.info(f"run_id: {run_id}")
STATUS = "RUNNING"
while STATUS == "RUNNING":
time.sleep(30)
status_response = requests.get(api_url + "/active-bpel/services/tf/status/" + run_id, headers=headers_v2)
status_data = status_response.json()
logger.info(f"assetName: {status_data['assetName']}")
STATUS = status_data["status"]
logger.info(f"STATUS: {STATUS}")
if STATUS == "SUCCESS":
logger.info(f"TASKFLOW: {status_data['assetName']} completed successfully. ")
else:
logger.error(f"STATUS: {STATUS}")
raise Exception(f"TASKFLOW: {status_data['assetName']} failed. ")
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
raise e
run_tf_job処理では、タスクフローを実行し、その実行結果を監視します。
/active-bpel/rt/
: 指定したpathよりタスクフローを実行することができます。/active-bpel/services/tf/status/
: 実行した経過を監視し、タスクが成功した場合は、STATUSがSUCCESSとなります。
scripts/lib/my_logger.py
import sys
import logging, logging.handlers
class DebugFilter(logging.Filter):
def filter(self, record):
return record.levelno == logging.DEBUG
class InfoFilter(logging.Filter):
def filter(self, record):
return record.levelno == logging.INFO
class MyLogger:
def __init__(self, name):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.DEBUG)
formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - %(name)s - %(funcName)s - line:%(lineno)d - %(message)s"
)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)
info_filter = InfoFilter()
console_handler.addFilter(info_filter)
self.logger.addHandler(console_handler)
error_handler = logging.StreamHandler(sys.stderr)
error_handler.setLevel(logging.WARNING)
error_handler.setFormatter(formatter)
self.logger.addHandler(error_handler)
loggerでのlog出力のためのclassを定義したpythonファイルです。
requirements.txt
requests
処理の中で使用するrequestsモジュールを記載しています。
IICS設定
以下の記事記載の通り、タスクフローをAPI経由で起動させるためには、タスクフローのStart
プロパティで許可されたユーザー
または許可されたグループ
を追加し、publishする必要があるため、テスト用に作成したタスクフローに対して、自身のユーザーを設定し、publishします。
- How to start an IICS Taskflow via Rest/SOAP API?
- Advanced Taskflows not shown in monitor when triggered
Github secret設定
Github の secretを用いて、暗号化された環境変数を設定します。project → Setting → secrets and variables → Actions → New repository secret
を選択し、 IICS_PASSWORD
、IICS_USERNAME
、MAX_CONCURRENT_THREADS
のそれぞれを作成します。
実行
同時実行数を4としているため、同時実行数を超えた場合は、一つ実行が完了後にジョブが起動されることを確認したいため、1つのタスクフローと4つのマッピングタスクをチェックインします。
実行結果
チェックイン後は、ymlファイルに記載された定義に基づいてworkflowが起動し、マッピングタスクとタスクフローが実行されています。同時実行数は4であるため、4つのジョブが起動しています。
1つのジョブが正常に終了したら、残りのジョブが起動したことも確認できました。
全てのジョブが完了後に、workflowも完了となりました。
最後に
Github ActionsでIICS JOBを自動テストすること自体は可能でしたが、以下の点は引き続き検討が必要です。
- 実行したくないリソースの制御
- タスクフローとタスクフローに関連するマッピングタスクの同時実行制御
- タスクフローのみに絞る?
- テスト対象リソースのテストデータ
- 事前準備が必要
1点目については、特定のプロジェクト、フォルダの場合のみテストジョブを実行するように実装修正はできそうですが、その他については引き続き検討してきたいと思います。