CSA JMCでExcelファイルを取り込んでみる

2023.05.24

はじめに

データアナリティクス事業本部のkobayashiです。

クラスメソッドが展開しているデータ統合基盤サービス『CSアナリティクス』(以下CSA)のプロダクト群の1つにCSA JMC(移行JMC)があります。JMCでは、データ連携という機能を使用してS3上のファイルをDWHに取り込むことができます。公式にサポートしているソースファイル形式はCSV/TSV、JSON、Parquetです。これらの形式でソースファイルを直接DWHに取り込むことができますが、企業ではExcel形式でデータを作成・保存していることが多いかと思います。しかし、Pythonを使用することでJMCでもExcelデータを取り込むことができますので、その方法をまとめます。

CSA JMCでExcelデータをDWHに取り込む

ExcelファイルはJMCでデータをDWHに取り込む機能である「データ連携」では直接取り込むことができません。したがって、これを実現するには以下の流れでDWHに取り込む必要があります。

  1. JMCのデータ連携を作成する(CSV取り込み用のジョブを作成する)
  2. JMCのプログラム実行を作成する(Pythonスクリプトを作成する)
    1. S3から元データを取得する
    2. ExcelデータをCSVへ変換する
    3. S3へCSVをアップロードする

では上記の手順を順に説明したいと思います。

今回扱うExcelファイルは以下のようなファイルmajor_results_2020.xlsxを扱います。

このファイルのA10以降で1列目から11列目までを取り込んでみたいと思います。

またJMCの実行予定単位はdailyで、ソースファイルもS3のバケット配下のdaily/cm_kobayashi_test/raw_major_results_2020/2023/05/01/major_results_2020.xlsxにある想定です。

JMCのデータ連携を作成する

はじめにExcelから変換したCSVを取り込むデータ連携の設定を行います。

今回行うのは対象のExcelの1列目から11列目なので予めRedshiftに取り込み先のテーブルを以下のSQLで作成しておきます。

create table cm_kobayashi_test.major_results_2020
(
    pref_name            varchar(64),
    city_name            varchar(64),
    city_name_eng        varchar(64),
    city_code            varchar(4),
    population           decimal(10, 1),
    population_male      decimal(10, 1),
    population_female    decimal(10, 1),
    population_2015      decimal(10, 1),
    population_variation decimal(10, 1),
    population_var_ratio decimal(10, 5),
    area                 decimal(8, 2)
);

次にデータ連携の設定をJMCの管理画面から行います。構成要素 > データ連携から作成を選び、スキーマ名とテーブル名を作成したものに設定します。またS3ファイルパスは変換後のcsvをアップロードするパスを指定しておきます。その他の設定はCSVの形式に関する設定であり、出力するCSVの形式に合わせて調整しますが、今回の方法ではデフォルトの設定のままで問題ありません。

この状態で保存を行えば変換後のCSVをRedshiftに取り込むための構成要素が作成されます。

JMCのプログラム実行用のPythonスクリプトを作成する

次にJMCのプログラム実行で使うPythonスクリプトを作成します。スクリプトではS3上のファイルをダウンロードしてからExcelをCSVに変換し再度S3へアップロードする処理を記述します。

convert_ex2csv.py

import pandas as pd
import boto3
from openpyxl import load_workbook
import tempfile

# csa用のモジュールを読み込み
import csa_env

# JMCで扱うS3バケットを指定
S3_BUCKET = '{データ連携用バケット}'

s3 = boto3.client('s3')


def main():
    try:
        # JMC実行時の環境編集を取得する
        env = csa_env.get()
        schedule_unit = env['schedule_unit'] # スケジュールユニットを取得
        today_slash = env['today_slash'] # 実行日時をスラッシュ区切りで取得

        # 実行時の環境変数を使ってソースパスを指定
        key = '{}/cm_kobayashi_test/raw_major_results_2020/{}/'.format(schedule_unit, today_slash)

        # start converting
        response = s3.list_objects(Bucket=S3_BUCKET, Prefix=key)
        if not response.get('Contents'):
            raise Exception("{}/{}ファイルが存在しません。".format(S3_BUCKET, key))

        # xlsxファイルを全て処理
        for file_path in [
            v['Key'] for v in response.get('Contents') if v['Key'].endswith(".xlsx")
        ]:
            file_name = file_path.split("/")[-1]
            # 一時ディレクトリを作成
            with tempfile.TemporaryDirectory(dir="/tmp/", prefix="csajmc-") as tmpdir:
                print("TempDirName:{}".format(tmpdir))
                local_file_path = tmpdir + "/{}".format(file_name)

                # S3からファイルをダウンロード
                s3.download_file(S3_BUCKET, file_path, local_file_path)

                # Excelファイルを読み込み
                wb = load_workbook(local_file_path, data_only=True)
                df_excel = pd.DataFrame(wb["第1面事項_2020年"].values)

                # Excelからデータを抽出
                # ソースとなるエクセルファイルの先頭列から11列目までを取得する
                ls_out = []
                # 9行目まではヘッダーなので10行目から1行ずつ取得する
                for i in range(9, len(df_excel), 1):
                    ls_out.append(
                        [
                            df_excel.at[i, 0],
                            df_excel.at[i, 1],
                            df_excel.at[i, 2],
                            df_excel.at[i, 3],
                            df_excel.at[i, 4] if df_excel.at[i, 4] != "-" else None, # Excelではデータがない場合 - が入るのでNoneに置換する
                            df_excel.at[i, 5] if df_excel.at[i, 5] != "-" else None,
                            df_excel.at[i, 6] if df_excel.at[i, 6] != "-" else None,
                            df_excel.at[i, 7] if df_excel.at[i, 7] != "-" else None,
                            df_excel.at[i, 8] if df_excel.at[i, 8] != "-" else None,
                            df_excel.at[i, 9] if df_excel.at[i, 9] != "-" else None,
                            df_excel.at[i, 10] if df_excel.at[i, 10] != "-" else None,
                        ]
                    )

                # リストをデータフレームに変換してからcsvとして保存する
                pd.DataFrame(ls_out).to_csv(local_file_path + '.csv', header=True, index=False)

                # S3にファイルをアップロード
                s3.upload_file(
                    local_file_path + '.csv',
                    S3_BUCKET,
                    '{}/cm_kobayashi_test/major_results_2020/{}/{}'.format(schedule_unit, today_slash, file_name) + '.csv',
                )

    except Exception as e:
        raise Exception(e)


if __name__ == '__main__':
    main()

細かくはスクリプト中のコメントを読んでいただければ良いのですが、大まかには、

  1. JMCの実行時の環境変数からS3の取得元パスを作り
  2. S3から一時ディレクトリにExcelファイルをダウンロードする
  3. Excelファイルをopenpyxlで読み込んで対象部分のみを抽出する(必要があれば値を変換する)
  4. ローカルでCSVに保存して
  5. S3のJMCが読み込むパスにアップロードする

といった流れです。

このスクリプトをS3のJMCでプログラム実行するためのパスs3://{バケット名}/program/にアップロードします。

JMCジョブを作成する

下準備は整ったのであとはJMCのジョブを作成します。JMCの管理画面のジョブ > ジョブ一覧でジョブの追加を選びます。

ジョブの設定画面が開くので実行予定時間を日次にして時間を適当に設定します。

次に編集を押下して、先に作成したPythonスクリプトとデータ連携の構成要素の順に設定します。

以上でジョブの作成は完了です。

あとはジョブを実行すればExcelのデータをDWHに取り込めますが、ExcelをCSVに変換するためにはopenpyxlライブラリを使っていますので、JMCのプログラム実行でopenpyxlライブラリを使う設定が必要です。

JMCの管理画面よりサイト管理 > プログラム実行設定にてPythonライブラリでopenpyxlを入力し保存を押します。しばらくするとJMCのプログラム実行でopenpyxlが使えるようになります。

これですべての準備は完了です。あとは実行日時を2023/05/01で実行すればRedshiftへデータが取り込まれます。

Redshiftの対象のテーブルを見てみるとExcelのデータが取り込まれていることがわかります。

> SELECT t.* FROM cm_kobayashi_test.major_results_2020 t;

pref_name   city_name   city_name_eng   city_code   population  population_male population_female   population_2015 population_variation    population_var_ratio    area
00_全国   00000_全国    Japan   a   126146099.0 61349581.0  64796518.0  127094745.0 -948646.0   -0.74641    377976.41
01_北海道  01000_北海道   Hokkaido    a   5224614.0   2465088.0   2759526.0   5381733.0   -157119.0   -2.91949    83424.44
01_北海道  01100_札幌市   Sapporo-shi 1   1973395.0   918682.0    1054713.0   1952356.0   21039.0 1.07762 1121.26
01_北海道  01101_札幌市中央区    Sapporo-shi Chuo-ku 0   248680.0    112853.0    135827.0    237627.0    11053.0 4.65141 46.42

まとめ

CSA JMCをつかってExcel形式のデータをDWHに取り込んでみました。Pythonのスクリプトを少し調整する必要がありますが、JMCでも問題なくExcelファイルをDWHに取り込むことができます。データ分析基盤をお探しの場合はぜひ一度CSA JMCをご検討ください。

最後まで読んで頂いてありがとうございました。