【IICS】GitHub Actionsを使用したIICS JOB自動テスト

2023.08.27

はじめに

データアナリティクス事業本部ビッグデータチームのyosh-kです。
今回はIICS CDIでリソースをGithubで管理し、チェックインした際にGithub ActionsでIICS JOBを自動テストすることが可能か検証していきたいと思います。

前提条件

以下の動画や記事などで紹介されていた内容を元に検証と実装を一部修正させていただきました。

事前に任意のマッピングタスク、マッピング、タスクフローを作成してあることとします。

また、IICS CDIとGithubでのソース管理が設定されていることとします。まだの場合は、以下のブログを参考にしてください。

実装

Automated Deployment of IICS Assets- CI/CD using Informatica API'sの添付ファイルの内容を元に実装しました。元のファイルでは、以下のような実装でした。

  • IICSログイン
  • 修正したマッピングタスクのテスト
  • 別環境へのpullとpullしたマッピングタスクのテスト

タスクフローのテスト追加と別環境は無かった為、以下の形に修正しました。

  • IICSログイン
  • 修正したマッピングタスク or タスクフローのテスト

以下、実際のフォルダ構成になります。Explore上にIICSリソースが格納され、scriptsにpythonスクリプトを格納し、.github/workflowsディレクトリ配下のymlファイルで実際のworkflowの設定を定義します。

(base) kasama.yoshiki@iics % tree 
.
└── .github
    └── workflows
        ├── iics_deployment.yml
├── Explore
│   └── cm-kasama
│       ├── blog9
│       │   ├── blog9.Folder.json
│       │   ├── blog9_mapping.xxx.zip
│       │   └── blog9_taskflow.TASKFLOW.xml
│       └── cm-kasama.Project.json
└── scripts
    ├── infa_login.py
    ├── infa_main.py
    └── lib
        ├── infa_get_mt_updates.py
        ├── infa_get_tf_updates.py
        └── my_logger.py
├── README.md
├── requirements.txt

(base) kasama.yoshiki@iics %

それぞれのソースコードを確認します。

.github/workflows/iics_deployment.yml

name: DEPLOY_MAPPING_TASK

# Controls when the workflow will run
on:
  push:
    branches:
      - main
    paths:
      - 'Explore/cm-kasama/**'
env:
  IICS_LOGIN_URL: https://dm-us.informaticacloud.com
  IICS_POD_URL: https://na1.dm-us.informaticacloud.com
 
  IICS_USERNAME: ${{ secrets.IICS_USERNAME }}
  IICS_PASSWORD: ${{ secrets.IICS_PASSWORD }}
  MAX_CONCURRENT_THREADS: ${{ secrets.MAX_CONCURRENT_THREADS }}


  # This is overriding until we connect the repository with a workflow_dispatch

  COMMIT_HASH: ${{ github.sha }}

# A workflow run is made up of one or more jobs that can run sequentially or in parallel
jobs:
  dev_build:
    name: Review Development Code
    environment:
      name: development
    # The type of runner that the job will run on
    runs-on: ubuntu-latest

    # Steps represent a sequence of tasks that will be executed as part of the job
    steps:
      # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
      - uses: actions/checkout@v3

      # Install python and dependent modules
      - uses: actions/setup-python@v3
        with:
          python-version: '3.x'
      
      - name: Install python modules
        run: |
            python -m pip install --upgrade pip
            pip install -r requirements.txt

      # Runs a single command using the runners shell
      - name: Login to development
        run: |
            python ./scripts/infa_login.py

      - name: Test Committed Tasks
        run: python ./scripts/infa_main.py

このGitHub Actionsのワークフローは、特定の条件で指定したタスクを実行するものです。以下に、簡単な解説を行います。

  • on: mainブランチの該当IICSプロジェクト(cm-kasama)にpushされたことをtriggerとする設定
  • env: 環境変数の定義、secretsは後ほどGithub上で設定する値となります。
  • jobs: ubuntu-latestという最新のUbuntuランナー上で実行されます。
  • step: 以下の順序でタスクが実行されます:
    • リポジトリをチェックアウトします。
    • Pythonをセットアップし、3.xバージョンを使用します。
    • 必要なPythonモジュールをインストールします。
    • ./scripts/infa_login.pyスクリプトを実行して、開発環境へログインします。
    • ./scripts/infa_main.pyスクリプトを実行して、マッピングタスクをテストします。

scripts/infa_login.py

import os
import requests

URL = os.environ["IICS_LOGIN_URL"]
USERNAME = os.environ["IICS_USERNAME"]
PASSWORD = os.environ["IICS_PASSWORD"]


URL = "https://dm-us.informaticacloud.com/saas/public/core/v3/login"
BODY = {"username": USERNAME, "password": PASSWORD}

r = requests.post(url=URL, json=BODY)

if r.status_code != 200:
    print("Caught exception: " + r.text)


# extracting data in json format
data = r.json()

# Set session tokens to the environment
env_file = os.getenv("GITHUB_ENV")

with open(env_file, "a") as myfile:
    myfile.write("sessionId=" + data["userInfo"]["sessionId"] + "\n")

IICS上でマッピングタスクやタスクフローを実行するために、IICSにログインしsessionIdを環境変数に設定します。

scripts/infa_main.py

import os
import sys
import requests
import threading
from threading import Semaphore, Event
from lib.infa_get_tf_updates import run_tf_job
from lib.infa_get_mt_updates import run_mt_job
from lib.my_logger import MyLogger


def threaded_error_handler(semaphore, error_event, func, *args):
    try:
        func(*args)
    except Exception as e:
        error_event.set()  # エラーが発生したらeventをset
    finally:
        semaphore.release()


def main():
    my_logger = MyLogger(__name__)
    logger = my_logger.logger
    MAX_CONCURRENT_THREADS = int(os.environ.get("MAX_CONCURRENT_THREADS", ""))
    SAAS_URL = os.environ.get("IICS_POD_URL", "") + "/saas"
    SESSION_ID = os.environ.get("sessionId", "")
    COMMIT_HASH = os.environ.get("COMMIT_HASH", "")
    API_URL = os.environ.get("IICS_POD_URL", "")

    HEADERS = {"Content-Type": "application/json; charset=utf-8", "INFA-SESSION-ID": SESSION_ID}
    HEADERS_V2 = {"Accept": "application/json", "IDS-SESSION-ID": SESSION_ID}

    # Semaphore to limit number of concurrent threads
    semaphore = Semaphore(MAX_CONCURRENT_THREADS)
    error_event = Event()
    try:
        logger.info(f"Getting all objects for the commit: {COMMIT_HASH}")
        response = requests.get(SAAS_URL + "/public/core/v3/commit/" + COMMIT_HASH, headers=HEADERS)
        logger.info(f"get /public/core/v3/commit/ status_code : {response.status_code}")

        if response.status_code != 200:
            raise Exception(f"Exception caught: {response.text}")

        request_json = response.json()
        threads = []

        # Create a dictionary to map types to jobs
        jobs = {"MTT": run_mt_job, "TASKFLOW": run_tf_job}

        for iics_change in request_json["changes"]:
            job = jobs.get(iics_change["type"])
            if job:
                semaphore.acquire()
                t = threading.Thread(
                    target=threaded_error_handler,
                    args=(
                        semaphore,
                        error_event,
                        job,
                        iics_change,
                        SAAS_URL if iics_change["type"] == "MTT" else API_URL,
                        HEADERS_V2,
                    ),
                )
                threads.append(t)
                t.start()
            else:
                logger.info("Couldn't find the taskflow or mapping task")

        for t in threads:
            t.join()

        requests.post(SAAS_URL + "/public/core/v3/logout", headers=HEADERS)

        if error_event.is_set():  # エラーが発生していたら
            sys.exit(99)
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        sys.exit(99)


if __name__ == "__main__":
    main()

main処理では、異なる種類のジョブを並行して実行する処理を実装しています。

  • threaded_error_handler
    • スレッドで実行する関数のエラーハンドラーです。
    • 関数funcがエラーなく完了した場合、セマフォをリリースします。
    • エラーが発生した場合、エラーイベントをセットします。メインスレッドは、すべてのスレッドが終了した後でこのイベントを確認し、エラーが発生しているかどうかを判断します。
  • main関数:
    • Semaphore: セマフォを使用して同時に実行できるスレッドの最大数をMAX_CONCURRENT_THREADSの値に制御しています。これにより、リソースの過度な使用を避けることができます。
    • 取得したCOMMIT_HASHを使用して、関連する全てのオブジェクトを取得しようとします。
    • 成功した場合、レスポンスJSONから変更を取得し、各変更のタイプ(マッピングタスク or タスクフロー)に基づいて特定のジョブをスレッドとして実行します。
    • すべてのスレッドが完了するのを待ち、エラーイベントがセットされていれば、エラーコード99で終了します。

scripts/lib/infa_get_mt_updates.py

import time
import requests
from lib.my_logger import MyLogger

my_logger = MyLogger(__name__)
logger = my_logger.logger


def run_mt_job(iics_change, saas_url, headers_v2):
    try:
        BODY = {"@type": "job", "taskId": iics_change["appContextId"], "taskType": "MTT"}
        logger.info(f"{saas_url} /api/v2/job/ {iics_change['name']}")
        response = requests.post(saas_url + "/api/v2/job/", headers=headers_v2, json=BODY)
        logger.info(f"post /api/v2/job/  status code : {response.status_code}")
        if response.status_code != 200:
            logger.error(f"Exception caught: {response.text}")
            raise Exception(f"Exception caught: {response.text}")

        test_json = response.json()
        logger.info(test_json)
        PARAMS = "?taskId=" + test_json["taskId"]
        logger.info(f"PARAMS: {PARAMS}")
        STATE = 0
        polling_count = 0
        while polling_count <= 6:
            time.sleep(30)
            activity_response = requests.get(saas_url + "/api/v2/activity/activityLog" + PARAMS, headers=headers_v2)

            activity_log = activity_response.json()
            STATE = activity_log[0]["state"]
            logger.info(f"STATE: {STATE}")
            if STATE == 1:
                logger.info(f"Mapping task: {activity_log[0]['objectName']} completed successfully.")
                break
            polling_count += 1
        if STATE != 1:
            raise Exception(f"Mapping task: {activity_log[0]['objectName']} failed.")
    except Exception as e:
        logger.info(activity_log)
        logger.error(f"Unexpected error: {str(e)}")
        raise e

run_mt_job処理では、マッピングタスクを実行し、その実行結果を監視します。

  • /api/v2/job/: BODYのtaskTypeでMTTを指定することでマッピングタスクを実行することができます。
  • /api/v2/activity/activityLog: 実行した経過を監視し、タスクが成功した場合は、STATEが1となります。

scripts/lib/infa_get_tf_updates.py

import time
import requests
from lib.my_logger import MyLogger

my_logger = MyLogger(__name__)
logger = my_logger.logger


def run_tf_job(iics_change, api_url, headers_v2):
    try:
        # This loop runs tests for each one of the mapping tasks
        logger.info(f"api_url: {api_url} /active-bpel/rt/ {iics_change['name']}")
        response = requests.post(api_url + "/active-bpel/rt/" + iics_change["name"], headers=headers_v2)
        logger.info(f"post /ctive-bpel/rt/  status code : {response.status_code}")
        if response.status_code != 200:
            raise Exception(f"Exception caught: {response.text}")

        test_json = response.json()
        logger.info(f"test_json: {test_json}")
        run_id = str(test_json["RunId"])
        logger.info(f"run_id: {run_id}")

        STATUS = "RUNNING"

        while STATUS == "RUNNING":
            time.sleep(30)
            status_response = requests.get(api_url + "/active-bpel/services/tf/status/" + run_id, headers=headers_v2)

            status_data = status_response.json()
            logger.info(f"assetName: {status_data['assetName']}")
            STATUS = status_data["status"]
            logger.info(f"STATUS: {STATUS}")
        if STATUS == "SUCCESS":
            logger.info(f"TASKFLOW: {status_data['assetName']} completed successfully. ")
        else:
            logger.error(f"STATUS: {STATUS}")
            raise Exception(f"TASKFLOW: {status_data['assetName']} failed. ")
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        raise e

run_tf_job処理では、タスクフローを実行し、その実行結果を監視します。

scripts/lib/my_logger.py

import sys
import logging, logging.handlers


class DebugFilter(logging.Filter):
    def filter(self, record):
        return record.levelno == logging.DEBUG


class InfoFilter(logging.Filter):
    def filter(self, record):
        return record.levelno == logging.INFO


class MyLogger:
    def __init__(self, name):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.DEBUG)

        formatter = logging.Formatter(
            "%(asctime)s - %(levelname)s - %(name)s - %(funcName)s  - line:%(lineno)d - %(message)s"
        )

        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setLevel(logging.INFO)
        console_handler.setFormatter(formatter)
        info_filter = InfoFilter()
        console_handler.addFilter(info_filter)
        self.logger.addHandler(console_handler)

        error_handler = logging.StreamHandler(sys.stderr)
        error_handler.setLevel(logging.WARNING)
        error_handler.setFormatter(formatter)
        self.logger.addHandler(error_handler)

loggerでのlog出力のためのclassを定義したpythonファイルです。

requirements.txt

requests

処理の中で使用するrequestsモジュールを記載しています。

IICS設定

以下の記事記載の通り、タスクフローをAPI経由で起動させるためには、タスクフローのStartプロパティで許可されたユーザーまたは許可されたグループを追加し、publishする必要があるため、テスト用に作成したタスクフローに対して、自身のユーザーを設定し、publishします。

Github secret設定

Github の secretを用いて、暗号化された環境変数を設定します。project → Setting → secrets and variables → Actions → New repository secretを選択し、 IICS_PASSWORDIICS_USERNAMEMAX_CONCURRENT_THREADSのそれぞれを作成します。

暗号化されたシークレット

実行

同時実行数を4としているため、同時実行数を超えた場合は、一つ実行が完了後にジョブが起動されることを確認したいため、1つのタスクフローと4つのマッピングタスクをチェックインします。

実行結果

チェックイン後は、ymlファイルに記載された定義に基づいてworkflowが起動し、マッピングタスクとタスクフローが実行されています。同時実行数は4であるため、4つのジョブが起動しています。

1つのジョブが正常に終了したら、残りのジョブが起動したことも確認できました。

全てのジョブが完了後に、workflowも完了となりました。

最後に

Github ActionsでIICS JOBを自動テストすること自体は可能でしたが、以下の点は引き続き検討が必要です。

  • 実行したくないリソースの制御
  • タスクフローとタスクフローに関連するマッピングタスクの同時実行制御
    • タスクフローのみに絞る?
  • テスト対象リソースのテストデータ
    • 事前準備が必要

1点目については、特定のプロジェクト、フォルダの場合のみテストジョブを実行するように実装修正はできそうですが、その他については引き続き検討してきたいと思います。