JEPXが公開する電力市場価格のCSVをLambdaで整形してみた

JEPXが公開している電力売買市場情報のCSV、AWSサービスでの利用が容易なJSON形式への変換をLambdaで試みました。
2021.06.14

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

AWSチームのすずきです。

日本卸電力取引所(JEPX) がCSV形式で公開している、電力売買市場のスポット取引価格情報。 その確認を容易にするために、AWS Lambda を利用してJSON形式への変換を行う機会がありましたので、紹介させていただきます。

  • スポット市場取引結果(CSV内容表示)

課題

  • CSVの1行目にデコード不能なデータが存在する。Glue、S3 Select等のツールではヘッダ情報を利用できない。

  • 日時情報が日本時間の年月日と、1日を48分割した時刻コードで構成されている。既存の日付型データとの結合処理には加工が必須。

  • 毎年4/1開始、年度末には1年分のレコードが蓄積(約3MB)する仕様。直近データのみ必要とする場合に非効率。

Lambda

以下の処理を行うLambda関数を用意しました。

  • CSVファイルのダウンロード
  • 時刻情報の加工
  • 改行区切りのJSONL形式への整形
  • データのGZIP圧縮
  • S3書き込み時のACL、メタ情報設定
  • 小容量版データの出力
import datetime
import json
import os
import urllib.request
import boto3
import re
import gzip

def handler(event, context):
  # CSVのURLは環境変数で受取(http://www.jepx.org/market/excel/spot_2021.csv)
  a = urllib.request.urlopen(os.environ['JpexSpotCsvUrl'])

  #改行区切でロード
  b = a.read().splitlines()

  #ヘッダ行を撤去、直近日付レコードを処理対象とするため逆順に
  b.pop(0)
  b.reverse()

  y = []

  for c in b:

    # 「,」区切りのCSVを配列に展開
    d = c.decode().split(',')
    z = {}

    # 1カラム目「YYYY/MM/DD」の日付データのみを処理。
    # データ中のエスケープ対応は不要な前提
    if len(d) > 0:
      if re.compile("\d.../\d./\d.").search(d[0]):

        #CSVの日付をUnix時刻(UTC)に変換
        csv_date_unixtime = int(datetime.datetime.strptime(d[0], '%Y/%m/%d').timestamp()) - 32400

        #CSVの時刻コード(1日を48分割、1単位30分)をUnix時刻に変換した値に合算、ISO8601形式に変換
        z['time'] = datetime.datetime.fromtimestamp(1800 * int(d[1]) + csv_date_unixtime).strftime('%Y-%m-%dT%H:%M:%SZ')

        #ISO8601形式のJST時刻情報を用意
        z['time_jst'] = datetime.datetime.fromtimestamp(1800 * int(d[1]) + csv_date_unixtime + 32400).strftime('%Y-%m-%dT%H:%M:%S+0900')

        #地域別エリアプライスを抽出
        z['hepco'] = float(d[6])
        z['tohoku'] = float(d[7])
        z['tepco'] = float(d[8])
        z['chuden'] = float(d[9])
        z['rikuden'] = float(d[10])
        z['kepco'] = float(d[11])
        z['energia'] = float(d[12])
        z['yonden'] = float(d[13])
        z['kyuden'] = float(d[14])

        #システムプライス、入札量情報
        z['system'] = float(d[5])
        z['sell_volume'] = int(d[2])
        z['buy_volume'] = int(d[3])
        z['total_amount'] = int(d[4])

        #1レコードのJSON生成し配列に追加。(S3書き込み時、改行区切のJSONLとして利用)
        y.append(json.dumps(z))
    
  #S3出力
  # 出力先のS3バケットは環境変数を利用
  s3_bucket = boto3.resource('s3').Bucket(os.environ['S3BucketOutput'])

  # 全レコード出力
  # 改行区切りのJSONL、GZIP圧縮して出力
  # ContentType、ContentEncoding を明示
  s3_object = s3_bucket.put_object(
    Key = 'jpex-spot-price-2021.gz',
    Body = gzip.compress('\n'.join(y).encode()),
    ContentType = "application/x-gzip",
    ContentEncoding = "gzip"
  )

  # 直近(3日分)、144レコードのみ処理
  # ACLで「public-read」を付与、認証認可無しの参照を許可
  s3_object = s3_bucket.put_object(
    Key = 'jpex-spot-price-3days.gz',
    Body = gzip.compress('\n'.join(y[:144]).encode()),
    ContentType = "application/x-gzip",
    ContentEncoding = "gzip",
    ACL= 'public-read'
  )

結果

S3 Select

S3 Select、JSONの項目名を利用したクエリ処理が可能になりました。

SELECT s."time_jst", s."tepco" FROM s3object s where s."time_jst" like '2021-06-12%' LIMIT 5

curl

curlコマンドで「--compressed」オプションを利用する事で、gzip圧縮されたデータの自動展開が可能になりました。

$ curl --silent --compressed https://test-###.s3.ap-northeast-1.amazonaws.com/jpex-spot-price-3days.gz \
| head -n1
{"time": "2021-06-14T15:00:00Z", "time_jst": "2021-06-15T00:00:00+0900", "hepco": 6.73, "tohoku": 6.73, "tepco": 6.73, "chuden": 7.27, "rikuden": 7.27, "kepco": 7.27, "energia": 7.27, "yonden": 7.27, "kyuden": 7.27, "system": 7.22, "sell_volume": 18515450, "buy_volume": 20034350, "total_amount": 15979150}

curl + jq

「jq」を利用したクエリ処理も可能になりました。

$ curl --silent --compressed https://test-####.s3.ap-northeast-1.amazonaws.com/jpex-spot-price-3days.gz \
| jq -r '. | select(.time_jst| startswith("2021-06-12"))| [.time,.tepco]|  @csv' |  head -n 6
"2021-06-12T14:30:00Z",6.26
"2021-06-12T14:00:00Z",6.5
"2021-06-12T13:30:00Z",6.5
"2021-06-12T13:00:00Z",8.32
"2021-06-12T12:30:00Z",8.32
"2021-06-12T12:00:00Z",8.08

まとめ

簡単なLambda関数ですが、システムで取り扱いやすいデータを用意する事が出来ました。

参照元のCSVファイルの変更追従、EventBridgeによる定期実行の仕組みなどを追加した上で、 BIサービスの「Amazon QuickSight」、時系列データベースサービス「Amazon Timestream」、時系列予測サービスの「Amazon Forecast」などとの連携も試してみたいと思います。