はじめに
データアナリティクス事業本部のkobayashiです。
クラスメソッドが展開しているデータ統合基盤サービス『CSアナリティクス』(以下CSA)のプロダクト群の1つにCSA JMC(移行JMC)があります。JMCでは、データ連携という機能を使用してS3上のファイルをDWHに取り込むことができます。公式にサポートしているソースファイル形式はCSV/TSV、JSON、Parquetです。これらの形式でソースファイルを直接DWHに取り込むことができますが、企業ではExcel形式でデータを作成・保存していることが多いかと思います。しかし、Pythonを使用することでJMCでもExcelデータを取り込むことができますので、その方法をまとめます。
CSA JMCでExcelデータをDWHに取り込む
ExcelファイルはJMCでデータをDWHに取り込む機能である「データ連携」では直接取り込むことができません。したがって、これを実現するには以下の流れでDWHに取り込む必要があります。
- JMCのデータ連携を作成する(CSV取り込み用のジョブを作成する)
- JMCのプログラム実行を作成する(Pythonスクリプトを作成する)
- S3から元データを取得する
- ExcelデータをCSVへ変換する
- S3へCSVをアップロードする
では上記の手順を順に説明したいと思います。
今回扱うExcelファイルは以下のようなファイルmajor_results_2020.xlsx
を扱います。
このファイルのA10以降で1列目から11列目までを取り込んでみたいと思います。
またJMCの実行予定単位はdaily
で、ソースファイルもS3のバケット配下のdaily/cm_kobayashi_test/raw_major_results_2020/2023/05/01/major_results_2020.xlsx
にある想定です。
JMCのデータ連携を作成する
はじめにExcelから変換したCSVを取り込むデータ連携の設定を行います。
今回行うのは対象のExcelの1列目から11列目なので予めRedshiftに取り込み先のテーブルを以下のSQLで作成しておきます。
create table cm_kobayashi_test.major_results_2020
(
pref_name varchar(64),
city_name varchar(64),
city_name_eng varchar(64),
city_code varchar(4),
population decimal(10, 1),
population_male decimal(10, 1),
population_female decimal(10, 1),
population_2015 decimal(10, 1),
population_variation decimal(10, 1),
population_var_ratio decimal(10, 5),
area decimal(8, 2)
);
次にデータ連携の設定をJMCの管理画面から行います。構成要素 > データ連携
から作成を選び、スキーマ名とテーブル名を作成したものに設定します。またS3ファイルパスは変換後のcsvをアップロードするパスを指定しておきます。その他の設定はCSVの形式に関する設定であり、出力するCSVの形式に合わせて調整しますが、今回の方法ではデフォルトの設定のままで問題ありません。
この状態で保存を行えば変換後のCSVをRedshiftに取り込むための構成要素が作成されます。
JMCのプログラム実行用のPythonスクリプトを作成する
次にJMCのプログラム実行で使うPythonスクリプトを作成します。スクリプトではS3上のファイルをダウンロードしてからExcelをCSVに変換し再度S3へアップロードする処理を記述します。
convert_ex2csv.py
import pandas as pd
import boto3
from openpyxl import load_workbook
import tempfile
# csa用のモジュールを読み込み
import csa_env
# JMCで扱うS3バケットを指定
S3_BUCKET = '{データ連携用バケット}'
s3 = boto3.client('s3')
def main():
try:
# JMC実行時の環境編集を取得する
env = csa_env.get()
schedule_unit = env['schedule_unit'] # スケジュールユニットを取得
today_slash = env['today_slash'] # 実行日時をスラッシュ区切りで取得
# 実行時の環境変数を使ってソースパスを指定
key = '{}/cm_kobayashi_test/raw_major_results_2020/{}/'.format(schedule_unit, today_slash)
# start converting
response = s3.list_objects(Bucket=S3_BUCKET, Prefix=key)
if not response.get('Contents'):
raise Exception("{}/{}ファイルが存在しません。".format(S3_BUCKET, key))
# xlsxファイルを全て処理
for file_path in [
v['Key'] for v in response.get('Contents') if v['Key'].endswith(".xlsx")
]:
file_name = file_path.split("/")[-1]
# 一時ディレクトリを作成
with tempfile.TemporaryDirectory(dir="/tmp/", prefix="csajmc-") as tmpdir:
print("TempDirName:{}".format(tmpdir))
local_file_path = tmpdir + "/{}".format(file_name)
# S3からファイルをダウンロード
s3.download_file(S3_BUCKET, file_path, local_file_path)
# Excelファイルを読み込み
wb = load_workbook(local_file_path, data_only=True)
df_excel = pd.DataFrame(wb["第1面事項_2020年"].values)
# Excelからデータを抽出
# ソースとなるエクセルファイルの先頭列から11列目までを取得する
ls_out = []
# 9行目まではヘッダーなので10行目から1行ずつ取得する
for i in range(9, len(df_excel), 1):
ls_out.append(
[
df_excel.at[i, 0],
df_excel.at[i, 1],
df_excel.at[i, 2],
df_excel.at[i, 3],
df_excel.at[i, 4] if df_excel.at[i, 4] != "-" else None, # Excelではデータがない場合 - が入るのでNoneに置換する
df_excel.at[i, 5] if df_excel.at[i, 5] != "-" else None,
df_excel.at[i, 6] if df_excel.at[i, 6] != "-" else None,
df_excel.at[i, 7] if df_excel.at[i, 7] != "-" else None,
df_excel.at[i, 8] if df_excel.at[i, 8] != "-" else None,
df_excel.at[i, 9] if df_excel.at[i, 9] != "-" else None,
df_excel.at[i, 10] if df_excel.at[i, 10] != "-" else None,
]
)
# リストをデータフレームに変換してからcsvとして保存する
pd.DataFrame(ls_out).to_csv(local_file_path + '.csv', header=True, index=False)
# S3にファイルをアップロード
s3.upload_file(
local_file_path + '.csv',
S3_BUCKET,
'{}/cm_kobayashi_test/major_results_2020/{}/{}'.format(schedule_unit, today_slash, file_name) + '.csv',
)
except Exception as e:
raise Exception(e)
if __name__ == '__main__':
main()
細かくはスクリプト中のコメントを読んでいただければ良いのですが、大まかには、
- JMCの実行時の環境変数からS3の取得元パスを作り
- S3から一時ディレクトリにExcelファイルをダウンロードする
- Excelファイルをopenpyxlで読み込んで対象部分のみを抽出する(必要があれば値を変換する)
- ローカルでCSVに保存して
- S3のJMCが読み込むパスにアップロードする
といった流れです。
このスクリプトをS3のJMCでプログラム実行するためのパスs3://{バケット名}/program/
にアップロードします。
JMCジョブを作成する
下準備は整ったのであとはJMCのジョブを作成します。JMCの管理画面のジョブ > ジョブ一覧
でジョブの追加を選びます。
ジョブの設定画面が開くので実行予定時間を日次
にして時間を適当に設定します。
次に編集
を押下して、先に作成したPythonスクリプトとデータ連携の構成要素の順に設定します。
以上でジョブの作成は完了です。
あとはジョブを実行すればExcelのデータをDWHに取り込めますが、ExcelをCSVに変換するためにはopenpyxlライブラリを使っていますので、JMCのプログラム実行でopenpyxlライブラリを使う設定が必要です。
JMCの管理画面よりサイト管理 > プログラム実行設定
にてPythonライブラリでopenpyxl
を入力し保存を押します。しばらくするとJMCのプログラム実行でopenpyxl
が使えるようになります。
これですべての準備は完了です。あとは実行日時を2023/05/01
で実行すればRedshiftへデータが取り込まれます。
Redshiftの対象のテーブルを見てみるとExcelのデータが取り込まれていることがわかります。
> SELECT t.* FROM cm_kobayashi_test.major_results_2020 t;
pref_name city_name city_name_eng city_code population population_male population_female population_2015 population_variation population_var_ratio area
00_全国 00000_全国 Japan a 126146099.0 61349581.0 64796518.0 127094745.0 -948646.0 -0.74641 377976.41
01_北海道 01000_北海道 Hokkaido a 5224614.0 2465088.0 2759526.0 5381733.0 -157119.0 -2.91949 83424.44
01_北海道 01100_札幌市 Sapporo-shi 1 1973395.0 918682.0 1054713.0 1952356.0 21039.0 1.07762 1121.26
01_北海道 01101_札幌市中央区 Sapporo-shi Chuo-ku 0 248680.0 112853.0 135827.0 237627.0 11053.0 4.65141 46.42
まとめ
CSA JMCをつかってExcel形式のデータをDWHに取り込んでみました。Pythonのスクリプトを少し調整する必要がありますが、JMCでも問題なくExcelファイルをDWHに取り込むことができます。データ分析基盤をお探しの場合はぜひ一度CSA JMCをご検討ください。
最後まで読んで頂いてありがとうございました。