この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
AWSチームのすずきです。
日本卸電力取引所(JEPX) がCSV形式で公開している、電力売買市場のスポット取引価格情報。 その確認を容易にするために、AWS Lambda を利用してJSON形式への変換を行う機会がありましたので、紹介させていただきます。
- スポット市場取引結果(CSV内容表示)
課題
- CSVの1行目にデコード不能なデータが存在する。Glue、S3 Select等のツールではヘッダ情報を利用できない。
- 日時情報が日本時間の年月日と、1日を48分割した時刻コードで構成されている。既存の日付型データとの結合処理には加工が必須。
- 毎年4/1開始、年度末には1年分のレコードが蓄積(約3MB)する仕様。直近データのみ必要とする場合に非効率。
Lambda
以下の処理を行うLambda関数を用意しました。
- CSVファイルのダウンロード
- 時刻情報の加工
- 改行区切りのJSONL形式への整形
- データのGZIP圧縮
- S3書き込み時のACL、メタ情報設定
- 小容量版データの出力
import datetime
import json
import os
import urllib.request
import boto3
import re
import gzip
def handler(event, context):
# CSVのURLは環境変数で受取(http://www.jepx.org/market/excel/spot_2021.csv)
a = urllib.request.urlopen(os.environ['JpexSpotCsvUrl'])
#改行区切でロード
b = a.read().splitlines()
#ヘッダ行を撤去、直近日付レコードを処理対象とするため逆順に
b.pop(0)
b.reverse()
y = []
for c in b:
# 「,」区切りのCSVを配列に展開
d = c.decode().split(',')
z = {}
# 1カラム目「YYYY/MM/DD」の日付データのみを処理。
# データ中のエスケープ対応は不要な前提
if len(d) > 0:
if re.compile("\d.../\d./\d.").search(d[0]):
#CSVの日付をUnix時刻(UTC)に変換
csv_date_unixtime = int(datetime.datetime.strptime(d[0], '%Y/%m/%d').timestamp()) - 32400
#CSVの時刻コード(1日を48分割、1単位30分)をUnix時刻に変換した値に合算、ISO8601形式に変換
z['time'] = datetime.datetime.fromtimestamp(1800 * int(d[1]) + csv_date_unixtime).strftime('%Y-%m-%dT%H:%M:%SZ')
#ISO8601形式のJST時刻情報を用意
z['time_jst'] = datetime.datetime.fromtimestamp(1800 * int(d[1]) + csv_date_unixtime + 32400).strftime('%Y-%m-%dT%H:%M:%S+0900')
#地域別エリアプライスを抽出
z['hepco'] = float(d[6])
z['tohoku'] = float(d[7])
z['tepco'] = float(d[8])
z['chuden'] = float(d[9])
z['rikuden'] = float(d[10])
z['kepco'] = float(d[11])
z['energia'] = float(d[12])
z['yonden'] = float(d[13])
z['kyuden'] = float(d[14])
#システムプライス、入札量情報
z['system'] = float(d[5])
z['sell_volume'] = int(d[2])
z['buy_volume'] = int(d[3])
z['total_amount'] = int(d[4])
#1レコードのJSON生成し配列に追加。(S3書き込み時、改行区切のJSONLとして利用)
y.append(json.dumps(z))
#S3出力
# 出力先のS3バケットは環境変数を利用
s3_bucket = boto3.resource('s3').Bucket(os.environ['S3BucketOutput'])
# 全レコード出力
# 改行区切りのJSONL、GZIP圧縮して出力
# ContentType、ContentEncoding を明示
s3_object = s3_bucket.put_object(
Key = 'jpex-spot-price-2021.gz',
Body = gzip.compress('\n'.join(y).encode()),
ContentType = "application/x-gzip",
ContentEncoding = "gzip"
)
# 直近(3日分)、144レコードのみ処理
# ACLで「public-read」を付与、認証認可無しの参照を許可
s3_object = s3_bucket.put_object(
Key = 'jpex-spot-price-3days.gz',
Body = gzip.compress('\n'.join(y[:144]).encode()),
ContentType = "application/x-gzip",
ContentEncoding = "gzip",
ACL= 'public-read'
)
結果
S3 Select
S3 Select、JSONの項目名を利用したクエリ処理が可能になりました。
SELECT s."time_jst", s."tepco" FROM s3object s where s."time_jst" like '2021-06-12%' LIMIT 5
curl
curlコマンドで「--compressed」オプションを利用する事で、gzip圧縮されたデータの自動展開が可能になりました。
$ curl --silent --compressed https://test-###.s3.ap-northeast-1.amazonaws.com/jpex-spot-price-3days.gz \
| head -n1
{"time": "2021-06-14T15:00:00Z", "time_jst": "2021-06-15T00:00:00+0900", "hepco": 6.73, "tohoku": 6.73, "tepco": 6.73, "chuden": 7.27, "rikuden": 7.27, "kepco": 7.27, "energia": 7.27, "yonden": 7.27, "kyuden": 7.27, "system": 7.22, "sell_volume": 18515450, "buy_volume": 20034350, "total_amount": 15979150}
curl + jq
「jq」を利用したクエリ処理も可能になりました。
$ curl --silent --compressed https://test-####.s3.ap-northeast-1.amazonaws.com/jpex-spot-price-3days.gz \
| jq -r '. | select(.time_jst| startswith("2021-06-12"))| [.time,.tepco]| @csv' | head -n 6
"2021-06-12T14:30:00Z",6.26
"2021-06-12T14:00:00Z",6.5
"2021-06-12T13:30:00Z",6.5
"2021-06-12T13:00:00Z",8.32
"2021-06-12T12:30:00Z",8.32
"2021-06-12T12:00:00Z",8.08
まとめ
簡単なLambda関数ですが、システムで取り扱いやすいデータを用意する事が出来ました。
参照元のCSVファイルの変更追従、EventBridgeによる定期実行の仕組みなどを追加した上で、 BIサービスの「Amazon QuickSight」、時系列データベースサービス「Amazon Timestream」、時系列予測サービスの「Amazon Forecast」などとの連携も試してみたいと思います。