はじめに
こんにちは。データアナリティクス事業本部ビッグデータチームのyosh-kです。
今回は、ExcelのTable定義書からRedshiftのDDLを作成するPython実装を行ってみたいと思います。
前提条件
- 以下リンクのExcelファイルを対象とします。
- Redshiftの対応データ型は以下AWS公式記載の19個を対象とします。
- spectrumやviewのddlには対応していないので、別途回収していただく必要があります。
No.1-19では、それぞれのデータ型のNot NULL制約、デフォルト値の確認用のテストデータ。No.20-38では、PRIMARY KEY制約の確認用のテストデータ。No.39-57では、上記制約を除いた確認用のテストデータとなります。
実装
実装したソースコードはGithubに格納してあります。
フォルダ構成は以下となります。
.
├── README.md
├── input
│ └── 正常系table定義.xlsx
├── lib
│ ├── environments.py
│ ├── excel_processing.py
│ ├── sql_processing.py
│ └── table_info.py
├── main.py
├── output
│ └── dwh.test.sql
└── requirements.txt
3 directories, 9 files
まず、必要なライブラリをrequrements.txtからinstallします。
pip install -r requirements.txt
main.py
main.pyでは、他のライブラリを呼び出すmain処理を実装しています。 処理の流れとしては、大きく以下になります。
- 環境変数読み込み
- Excelファイル読み込み
- フォルダ配下のファイル数分繰り返し、pandasを用いてデータを取得。
- table情報からDDLを整形
- sqlファイルとして出力
import glob
import warnings
import traceback
from lib.table_info import TableInfo
from lib.environments import get_env
from lib.excel_processing import extract_schema_table_name_from_excel, extract_table_data_from_excel
from lib.sql_processing import make_column_definition
# Excelファイルにデータ検証(Data Validation)の拡張が含まれているけど、
# その拡張は openpyxl ライブラリでサポートされていないという意味です。
# そのため、読み込むExcelファイルがこの拡張を必要としない場合には問題ありません。
warnings.filterwarnings("ignore", message="Data Validation extension is not supported and will be removed")
def write_to_file(s_name: str, t_name: str, ddl: str) -> None:
""" "SQL文をファイルに書き込み"""
with open(f"./output/{s_name}.{t_name}.sql", "w", encoding="utf-8") as file:
file.write(ddl)
def main():
try:
env = get_env()
files = [file for file in glob.glob("./input/*.xlsx") if "~$" not in file]
if not files:
raise ValueError("file did not exist.")
print("files:", files)
for file in files:
schema_name, table_name = extract_schema_table_name_from_excel(file, env)
extract_table_data = extract_table_data_from_excel(file, env)
column_lengths = extract_table_data[env["COLUMN_NAME"]].apply(lambda x: len(str(x)))
max_column_length = max(column_lengths)
ddl = f"CREATE TABLE IF NOT EXISTS {schema_name}.{table_name}(\n"
table_info = TableInfo(ddl, max_column_length)
for _, row in extract_table_data.iterrows():
table_info.column_name = row[env["COLUMN_NAME"]]
table_info.data_type = row[env["DATA_TYPE"]]
table_info.digits = row[env["DIGITS"]]
table_info.decimal_part = row[env["DECIMAL_PART"]]
table_info.primary_key = row[env["PRIMARY_KEY"]]
table_info.is_not_null = row[env["NOT_NULL"]]
table_info.default_value = row[env["DEFAULT_VALUE"]]
# 全て含む場合
if {
table_info.column_name,
table_info.decimal_part,
table_info.data_type,
table_info.digits,
table_info.primary_key,
table_info.default_value,
} == {"-"}:
continue
elif table_info.data_type in env["REDSHIFT_DATA_TYPES"].split(","):
if table_info.primary_key != "-":
table_info.primary_key_list = table_info.column_name
make_column_definition(table_info)
else:
print("error record:", vars(table_info))
raise ValueError("There is no data type. Tracking your self or ask the administorator")
if table_info.primary_key_list != []:
table_info.ddl += " PRIMARY KEY (" + ", ".join(table_info.primary_key_list) + ")\n"
table_info.ddl = table_info.ddl.rstrip(",\n") + "\n)\nDISTSTYLE AUTO\nSORTKEY AUTO;"
write_to_file(schema_name, table_name, table_info.ddl)
print("successfull")
except Exception:
print("error file:", file)
traceback.print_exc()
if __name__ == "__main__":
main()
要点だけ確認していきます。 openpyxl ライブラリは、Excelファイルの読み込み時に、「Data Validation extension is not supported and will be removed」という警告を出します。 これは、Excelファイルにデータ検証(Data Validation)の拡張が含まれていますが、その拡張はopenpyxl ライブラリでサポートされていないという意味になります。読み込むExcelファイルがこの拡張を必要としない場合には問題ないため、警告を無視するように実装しています。
warnings.filterwarnings("ignore", message="Data Validation extension is not supported and will be removed")
globを用いて、input配下の拡張子xlsx
のファイルを抽出します。抽出時に~$
というExcelファイルをOpen時に作成されるtmpファイルは除くようにしています。
files = [file for file in glob.glob("./input/*.xlsx") if "~$" not in file]
PandasのDataFrameの特定の列(env["COLUMN_NAME"]
という環境変数で指定されている列)を対象に、その各行の要素(値)の文字列長を計算し、その結果を新たなcolumn_lengthsとして返しています。
具体的には、apply
関数を使って lambda x: len(str(x)) という無名関数(lambda関数)を各行に適用しています。このlambda関数はその引数 x を文字列に変換 str(x) し、その長さ len() を返すものです。
column_lengths = table[env["COLUMN_NAME"]].apply(lambda x: len(str(x)))
全てのtable_infoが-
の場合は、空レコードなのでcontinueし、環境変数で定義した、Redshift データ型にMatchする場合にddlを作成します。Matchしない行がある場合は、例外処理をします。
# 全て含む場合
if {
table_info.column_name,
table_info.decimal_part,
table_info.data_type,
table_info.digits,
table_info.primary_key,
table_info.default_value,
} == {"-"}:
continue
elif table_info.data_type in env["REDSHIFT_DATA_TYPES"].split(","):
if table_info.primary_key != "-":
table_info.primary_key_list = table_info.column_name
make_column_definition(table_info)
else:
print("error record:", vars(table_info))
raise ValueError("There is no data type. Tracking your self or ask the administorator")
主キーが存在する場合は、主キーの記載を行います。DDLの最後にDISTSTYLE AUTO
とSORTKEY AUTO
を付与していますが、必要に応じて適切な設定を選択することが重要となります。データ分散スタイル(DISTSTYLE)はテーブルのデータがRedshiftクラスタ内のノードにどのように分散されるかを決定します。AUTOを指定すると、Redshiftはテーブルのサイズ(行数やデータ量)を見て、ALL、KEY、EVENの最適なものに自動的に切り替えます。ソートキー(SORTKEY)AUTOを指定すると、Redshiftは実行されるクエリのパターンを見て、一連のソートキーを自動的に調整してテーブルをソートします。そのため、データが増加してもクエリパフォーマンスを保てます。
if table_info.primary_key_list != []:
table_info.ddl += " PRIMARY KEY (" + ", ".join(table_info.primary_key_list) + ")\n"
table_info.ddl = table_info.ddl.rstrip(",\n") + "\n)\nDISTSTYLE AUTO\nSORTKEY AUTO;"
environments.py
下記ファイルでは、table定義書の環境変数をそれぞれ記載していきます。また、REDSHIFT_DATA_TYPES
にて対応のRedshiftデータ型を定義しています。
import os
def get_env():
# Excelシート名
os.environ["SHEET_NAME"] = os.environ.get("SHEET_NAME", "table定義")
# カラム名の列名
os.environ["COLUMN_NAME"] = os.environ.get("COLUMN_NAME", "カラム名(物理)")
# データ型の列名
os.environ["DATA_TYPE"] = os.environ.get("DATA_TYPE", "データ型")
# 桁数の列名
os.environ["DIGITS"] = os.environ.get("DIGITS", "桁数")
# 小数点以下桁数の列名
os.environ["DECIMAL_PART"] = os.environ.get("DECIMAL_PART", "小数点以下桁数")
# 主キーの列名
os.environ["PRIMARY_KEY"] = os.environ.get("PRIMARY_KEY", "PRIMARY KEY")
# Not NULL制約の列名
os.environ["NOT_NULL"] = os.environ.get("NOT_NULL", "NOT NULL")
# デフォルト値の列名
os.environ["DEFAULT_VALUE"] = os.environ.get("DEFAULT_VALUE", "デフォルト値")
# table nameが格納されている列番号
os.environ["TABLE_NAME_LOCATION"] = os.environ.get("TABLE_NAME_LOCATION", "1")
# schema nameが格納されている列番号
os.environ["SCHEMA_NAME_LOCATION"] = os.environ.get("SCHEMA_NAME_LOCATION", "5")
os.environ["REDSHIFT_DATA_TYPES"] = ",".join(
[
"SMALLINT",
"INTEGER",
"BIGINT",
"DECIMAL",
"REAL",
"DOUBLE PRECISION",
"BOOLEAN",
"CHAR",
"VARCHAR",
"DATE",
"TIMESTAMP",
"TIMESTAMPTZ",
"GEOMETRY",
"GEOGRAPHY",
"HLLSKETCH",
"SUPER",
"TIME",
"TIMETZ",
"VARBYTE",
]
)
return os.environ
excel_processing.py
下記ファイルでは、Excelからデータを抽出、加工する処理を実装しています。
import pandas as pd
def extract_schema_table_name_from_excel(file: str, env):
"""Excelファイルからtable_name,schema_nameを抽出"""
header_df = pd.read_excel(file, sheet_name=env["SHEET_NAME"], nrows=0)
schema_name = header_df.columns[int(env["SCHEMA_NAME_LOCATION"])]
table_name = header_df.columns[int(env["TABLE_NAME_LOCATION"])]
return schema_name, table_name
def extract_table_data_from_excel(file: str, env) -> pd.DataFrame:
"""Excelファイルからデータフレームを抽出"""
extract_df = pd.read_excel(file, sheet_name=env["SHEET_NAME"], header=2, dtype=str)
col_map = {
"column_name": env["COLUMN_NAME"],
"data_type": env["DATA_TYPE"],
"digits": env["DIGITS"],
"decimal_part": env["DECIMAL_PART"],
"primary_key": env["PRIMARY_KEY"],
"not_null": env["NOT_NULL"],
"default_value": env["DEFAULT_VALUE"],
}
extracted_data = extract_df[[col_map[key] for key in col_map if key in col_map]]
return clean_table(extracted_data)
def clean_table(
df: pd.DataFrame,
) -> pd.DataFrame:
"""データフレームのクリーニング"""
# 文字列型のデータの欠損値を"-"に変換します
return df.fillna("-")
nrows=0
と指定することで先頭行のみを抽出しています。
header_df = pd.read_excel(file, sheet_name=env["SHEET_NAME"], nrows=0)
header=2
と指定し、2行目をheaderとし、dtype=str
とすることで全てstringで抽出するようにしています。
extract_df = pd.read_excel(file, sheet_name=env["SHEET_NAME"], header=2, dtype=str)
mapに含まれているカラムのみをdfで使用するようにします。
col_map = {
"column_name": env["COLUMN_NAME"],
"data_type": env["DATA_TYPE"],
"digits": env["DIGITS"],
"decimal_part": env["DECIMAL_PART"],
"primary_key": env["PRIMARY_KEY"],
"not_null": env["NOT_NULL"],
"default_value": env["DEFAULT_VALUE"],
}
extracted_data = extract_df[[col_map[key] for key in col_map if key in col_map]]
sql_processing.py
下記ファイルで、ddl定義を行っています。{file_info.column_name:<{file_info.max_column_length}}
と最初に定義することで、それぞれのカラムごとにフォーマットのばらつきが出ないように調整しています。file_info.digits(文字桁数)
、file_info.decimal_part(小数点以下桁数)
、file_info.is_not_null(Not Null)
, file_info.default_value(デフォルト値)
にそれぞれ値がある場合は、追記するような条件分岐となっています。
def make_column_definition(file_info):
sql_line = f" {file_info.column_name:<{file_info.max_column_length}} {file_info.data_type}"
# Add digits and decimal part if needed
if file_info.digits != "-":
if file_info.decimal_part != "-":
sql_line += f"({int(file_info.digits)},{file_info.decimal_part})"
else:
sql_line += f"({int(file_info.digits)})"
# Add NOT NULL if needed
if file_info.is_not_null != "-":
sql_line += " NOT NULL"
# Add default value if needed
if file_info.default_value != "-":
sql_line += f" DEFAULT '{file_info.default_value}'"
file_info.ddl += sql_line + ",\n"
table_info.py
下記ファイルでは、データのset,get用のTableInfo
Classを定義しています。
class TableInfo:
def __init__(self, ddl, max_column_length):
self.__ddl = ddl
self.__data_type = ""
self.__digits = ""
self.__is_not_null = ""
self.__column_name = ""
self.__max_column_length = max_column_length
self.__decimal_part = 0
self.__primary_key = ""
self.__default_value = ""
self.__primary_key_list = []
# gettter method
@property
def ddl(self):
return self.__ddl
@property
def data_type(self):
return self.__data_type
@property
def digits(self):
return self.__digits
@property
def is_not_null(self):
return self.__is_not_null
@property
def column_name(self):
return self.__column_name
@property
def max_column_length(self):
return self.__max_column_length
@property
def decimal_part(self):
return self.__decimal_part
@property
def primary_key(self):
return self.__primary_key
@property
def primary_key_list(self):
return self.__primary_key_list
@property
def default_value(self):
return self.__default_value
# setter method
@ddl.setter
def ddl(self, ddl):
self.__ddl = ddl
@data_type.setter
def data_type(self, data_type):
self.__data_type = data_type
@digits.setter
def digits(self, digits):
self.__digits = digits
@is_not_null.setter
def is_not_null(self, is_not_null):
self.__is_not_null = is_not_null
@column_name.setter
def column_name(self, column_name):
self.__column_name = column_name
@max_column_length.setter
def max_column_length(self, max_column_length):
self.__max_column_length = max_column_length
@decimal_part.setter
def decimal_part(self, decimal_part):
self.__decimal_part = decimal_part
@primary_key.setter
def primary_key(self, primary_key):
self.__primary_key = primary_key
@primary_key_list.setter
def primary_key_list(self, primary_key_list):
self.__primary_key_list.append(primary_key_list)
@default_value.setter
def default_value(self, default_value):
self.__default_value = default_value
実行結果
それではmain.pyを実行してみます。
(ddl_env) kasama.yoshiki@ 30_create_redshift_ddls % python main.py
files: ['./input/正常系table定義.xlsx']
successfull
(ddl_env) kasama.yoshiki@30_create_redshift_ddls %
outputフォルダに対象のddlが作成されました。以下の観点でも問題ないことを確認できました。
col_column_name_1
:Not NULL
,デフォルト値
、桁数``小数点以下桁数
col_column_name_2
:primary key
col_column_name_3
: 制約なし
CREATE TABLE IF NOT EXISTS dwh.test(
col_smallint_1 SMALLINT NOT NULL DEFAULT '0',
col_integer_1 INTEGER NOT NULL DEFAULT '0',
col_bigint_1 BIGINT NOT NULL DEFAULT '0',
col_decimal_1 DECIMAL(5,2) NOT NULL DEFAULT '0.0',
col_real_1 REAL NOT NULL DEFAULT '0.0',
col_double_1 DOUBLE PRECISION NOT NULL DEFAULT '0.0',
col_boolean_1 BOOLEAN NOT NULL DEFAULT 'True',
col_char_1 CHAR(10) NOT NULL,
col_varchar_1 VARCHAR(255) NOT NULL,
col_date_1 DATE NOT NULL DEFAULT '2020-01-01',
col_timestamp_1 TIMESTAMP NOT NULL DEFAULT '2000-01-01 00:00:00',
col_timestamptz_1 TIMESTAMPTZ NOT NULL DEFAULT '2000-01-01 00:00:00+09',
col_geometry_1 GEOMETRY NOT NULL,
col_geography_1 GEOGRAPHY NOT NULL,
col_hllsketch_1 HLLSKETCH NOT NULL,
col_super_1 SUPER NOT NULL DEFAULT '{"key": "value"}',
col_time_1 TIME NOT NULL DEFAULT '00:00:00',
col_timetz_1 TIMETZ NOT NULL DEFAULT '00:00:00+09',
col_varbyte_1 VARBYTE NOT NULL,
col_smallint_2 SMALLINT,
col_integer_2 INTEGER,
col_bigint_2 BIGINT,
col_decimal_2 DECIMAL(5,2),
col_real_2 REAL,
col_double_2 DOUBLE PRECISION,
col_boolean_2 BOOLEAN,
col_char_2 CHAR(10),
col_varchar_2 VARCHAR(255),
col_date_2 DATE,
col_timestamp_2 TIMESTAMP,
col_timestamptz_2 TIMESTAMPTZ,
col_geometry_2 GEOMETRY,
col_geography_2 GEOGRAPHY,
col_hllsketch_2 HLLSKETCH,
col_super_2 SUPER,
col_time_2 TIME,
col_timetz_2 TIMETZ,
col_varbyte_2 VARBYTE,
col_smallint_3 SMALLINT,
col_integer_3 INTEGER,
col_bigint_3 BIGINT,
col_decimal_3 DECIMAL(5,2),
col_real_3 REAL,
col_double_3 DOUBLE PRECISION,
col_boolean_3 BOOLEAN,
col_char_3 CHAR(10),
col_varchar_3 VARCHAR(255),
col_date_3 DATE,
col_timestamp_3 TIMESTAMP,
col_timestamptz_3 TIMESTAMPTZ,
col_geometry_3 GEOMETRY,
col_geography_3 GEOGRAPHY,
col_hllsketch_3 HLLSKETCH,
col_super_3 SUPER,
col_time_3 TIME,
col_timetz_3 TIMETZ,
col_varbyte_3 VARBYTE,
PRIMARY KEY (col_smallint_2, col_integer_2, col_bigint_2, col_decimal_2, col_real_2, col_double_2, col_boolean_2, col_char_2, col_varchar_2, col_date_2, col_timestamp_2, col_timestamptz_2, col_time_2, col_timetz_2, col_varbyte_2)
)
DISTSTYLE AUTO
SORTKEY AUTO;
このddlをRedshift Serverlessで実行してみたいと思います。
table定義書で指定したdwh
スキーマは今回は作成していないため、schema指定を外して実行しましたが、問題なく成功しました。
table definitionからも正常に作成されていることを確認できました。
最後に
table定義書のExcelファイルはあくまでサンプルですので、それぞれのプロジェクトに合わせた内容にpandas実装、environments.py
などを修正し、役立てていただければと思います。