JEPXが公開する電力市場価格のCSVをLambdaで整形してみた
JEPXが公開している電力売買市場情報のCSV、AWSサービスでの利用が容易なJSON形式への変換をLambdaで試みました。
AWSチームのすずきです。
日本卸電力取引所(JEPX) がCSV形式で公開している、電力売買市場のスポット取引価格情報。 その確認を容易にするために、AWS Lambda を利用してJSON形式への変換を行う機会がありましたので、紹介させていただきます。
- スポット市場取引結果(CSV内容表示)
課題
- CSVの1行目にデコード不能なデータが存在する。Glue、S3 Select等のツールではヘッダ情報を利用できない。
- 日時情報が日本時間の年月日と、1日を48分割した時刻コードで構成されている。既存の日付型データとの結合処理には加工が必須。
- 毎年4/1開始、年度末には1年分のレコードが蓄積(約3MB)する仕様。直近データのみ必要とする場合に非効率。
Lambda
以下の処理を行うLambda関数を用意しました。
- CSVファイルのダウンロード
- 時刻情報の加工
- 改行区切りのJSONL形式への整形
- データのGZIP圧縮
- S3書き込み時のACL、メタ情報設定
- 小容量版データの出力
import datetime import json import os import urllib.request import boto3 import re import gzip def handler(event, context): # CSVのURLは環境変数で受取(http://www.jepx.org/market/excel/spot_2021.csv) a = urllib.request.urlopen(os.environ['JpexSpotCsvUrl']) #改行区切でロード b = a.read().splitlines() #ヘッダ行を撤去、直近日付レコードを処理対象とするため逆順に b.pop(0) b.reverse() y = [] for c in b: # 「,」区切りのCSVを配列に展開 d = c.decode().split(',') z = {} # 1カラム目「YYYY/MM/DD」の日付データのみを処理。 # データ中のエスケープ対応は不要な前提 if len(d) > 0: if re.compile("\d.../\d./\d.").search(d[0]): #CSVの日付をUnix時刻(UTC)に変換 csv_date_unixtime = int(datetime.datetime.strptime(d[0], '%Y/%m/%d').timestamp()) - 32400 #CSVの時刻コード(1日を48分割、1単位30分)をUnix時刻に変換した値に合算、ISO8601形式に変換 z['time'] = datetime.datetime.fromtimestamp(1800 * int(d[1]) + csv_date_unixtime).strftime('%Y-%m-%dT%H:%M:%SZ') #ISO8601形式のJST時刻情報を用意 z['time_jst'] = datetime.datetime.fromtimestamp(1800 * int(d[1]) + csv_date_unixtime + 32400).strftime('%Y-%m-%dT%H:%M:%S+0900') #地域別エリアプライスを抽出 z['hepco'] = float(d[6]) z['tohoku'] = float(d[7]) z['tepco'] = float(d[8]) z['chuden'] = float(d[9]) z['rikuden'] = float(d[10]) z['kepco'] = float(d[11]) z['energia'] = float(d[12]) z['yonden'] = float(d[13]) z['kyuden'] = float(d[14]) #システムプライス、入札量情報 z['system'] = float(d[5]) z['sell_volume'] = int(d[2]) z['buy_volume'] = int(d[3]) z['total_amount'] = int(d[4]) #1レコードのJSON生成し配列に追加。(S3書き込み時、改行区切のJSONLとして利用) y.append(json.dumps(z)) #S3出力 # 出力先のS3バケットは環境変数を利用 s3_bucket = boto3.resource('s3').Bucket(os.environ['S3BucketOutput']) # 全レコード出力 # 改行区切りのJSONL、GZIP圧縮して出力 # ContentType、ContentEncoding を明示 s3_object = s3_bucket.put_object( Key = 'jpex-spot-price-2021.gz', Body = gzip.compress('\n'.join(y).encode()), ContentType = "application/x-gzip", ContentEncoding = "gzip" ) # 直近(3日分)、144レコードのみ処理 # ACLで「public-read」を付与、認証認可無しの参照を許可 s3_object = s3_bucket.put_object( Key = 'jpex-spot-price-3days.gz', Body = gzip.compress('\n'.join(y[:144]).encode()), ContentType = "application/x-gzip", ContentEncoding = "gzip", ACL= 'public-read' )
結果
S3 Select
S3 Select、JSONの項目名を利用したクエリ処理が可能になりました。
SELECT s."time_jst", s."tepco" FROM s3object s where s."time_jst" like '2021-06-12%' LIMIT 5
curl
curlコマンドで「--compressed」オプションを利用する事で、gzip圧縮されたデータの自動展開が可能になりました。
$ curl --silent --compressed https://test-###.s3.ap-northeast-1.amazonaws.com/jpex-spot-price-3days.gz \ | head -n1 {"time": "2021-06-14T15:00:00Z", "time_jst": "2021-06-15T00:00:00+0900", "hepco": 6.73, "tohoku": 6.73, "tepco": 6.73, "chuden": 7.27, "rikuden": 7.27, "kepco": 7.27, "energia": 7.27, "yonden": 7.27, "kyuden": 7.27, "system": 7.22, "sell_volume": 18515450, "buy_volume": 20034350, "total_amount": 15979150}
curl + jq
「jq」を利用したクエリ処理も可能になりました。
$ curl --silent --compressed https://test-####.s3.ap-northeast-1.amazonaws.com/jpex-spot-price-3days.gz \ | jq -r '. | select(.time_jst| startswith("2021-06-12"))| [.time,.tepco]| @csv' | head -n 6 "2021-06-12T14:30:00Z",6.26 "2021-06-12T14:00:00Z",6.5 "2021-06-12T13:30:00Z",6.5 "2021-06-12T13:00:00Z",8.32 "2021-06-12T12:30:00Z",8.32 "2021-06-12T12:00:00Z",8.08
まとめ
簡単なLambda関数ですが、システムで取り扱いやすいデータを用意する事が出来ました。
参照元のCSVファイルの変更追従、EventBridgeによる定期実行の仕組みなどを追加した上で、 BIサービスの「Amazon QuickSight」、時系列データベースサービス「Amazon Timestream」、時系列予測サービスの「Amazon Forecast」などとの連携も試してみたいと思います。