AWS LambdaとPyArrow3.0.0を使ってサクッとParquetファイルに変換する

2021.03.24

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Apache Arrowは目覚ましい進化を遂げて、今年の1月にVersion3.0がリリースされ、今月から最新のpyarrow 3.0.0もpipコマンドで普通にインストールできるようになりました。以前ブログで取り上げた2年前のpyarrow 0.13.0は、decimal型やdate型のサポートされていませんでした。

本日は、2年前と同じテーマですが、最新のApache Arrow(PyArrow3.0.0)とLambda Functionを使って、S3にファイル出力したイベントで即座にParquetに変換してデータレイクへ配置する方法をご紹介します。

Apache Arrow(PyArrow)とは

Apache Arrowは、インメモリの列指向データフォーマットです。Hadoop/Spark(Hadoopのエコシステム)やPandas(PyDataのエコシステム)をはじめとするツール間でゼロコピーデータアクセスを目指し、Apache Arrowがその共通データフォーマットになるように設計されています。つまり、Apache Arrowに読み込むと他のツールとデータが共有できたり、様々なデータファイルフォーマットに出力できるようになります。

img

PyArrow3.0.0によるParquetファイルに変換

PyArrowから直接CSVファイルを読み込んでParquet出力する方法を用いて変換し、サポートしているデータ型をそれぞれ検証します。

サポートしているデータ型

Amazon AthenaとAmazon Redshift Spectrumの両方でサポートしているデータ型の主要なデータ型が、Apache Arrow(PyArrow3.0.0)でサポートされています。

※ Apache Arrowがサポートしている全てのデータ型は、Data Types and Schemasをご覧ください。

Python3 PyArrow Amazon Athena Amazon Redshift Spectrum 対応状況
int int16 smallint smallint
int int32 int integer
int int64 bigint bigint
float float32 float real
float float64 double double precision
decimal decimal decimal(x, y) decimal
bool bool_ boolean boolean
str string string varchar(65535)
str string char(n) char(n)
str string varchar(n) varchar(n)
date date date date
datetime timestamp timestamp timestamp

サンプルデータ

今回は、この組み合わせを試すために以下のカラムのデータ(sales.tsv)を用意しました。

No カラム名 PyArroのデータ型の定義 実際のデータ
1 id int16() 1
2 price int32() 20000
3 total int64() 300000000
4 price_profit float32() 4.56
5 total_profit float64() 67.89
6 discount decimal(10, 2) 789012.34
7 visible bool_() True
8 name string() QuietComfort 35
9 created date32() 2019-06-14
10 updated timestamp('s') 2019-06-14 23:59:59
id  price   total   price_profit    total_profit    discount    visible name    created updated
1   20000   300000000   4.56    67.89   789012.34   True    QuietComfort 35 2019-06-14  2019-06-14 23:59:59

PyArrow3.0.0用のLambda Layerを作成する

Lambda動作環境の選定

今回は、TSVファイルに軽量・高速に変換するためAWS Lambdaを用います。Lambdaは、パッケージフォーマットとして、従来どおりLambda関数を用いる方法に加えて、コンテナイメージがサポートされました。複数のLambdaアプリケーションや関数から再利用されることを考慮して、デプロイパッケージは、Layerを用います。

Lambdaの制約事項

デプロイパッケージを問わず、以下の制約がありますので、このリソース条件で変換するように実装します。

  • ローカルのストレージ(/tmp)は512MB
  • メモリは最大10GB(メモリ10GBの場合は、vCPU x 6)

現在のPyArrow3.0.0は、S3から直接読み込みはサポートされていませんので、boto3で/tmpにコピーしたファイルをarrow形式でメモリに読み込んだ後、Parquet形式で/tmpに書き出してS3のデータレイクストレージにboto3でコピーします。そのため、入出力ファイルは512MBが上限になります。この上限を超えたいのなら、EFSでストレージをマウントするなどご検討ください。

Lambda Layerを作成

作成するために最新のAmazon Linux2のEC2インスタンス上で作成します。今日現在のEC2インスタンスのPython3のデフォルトバージョンであるPython3.7(3.7.9)をインストールします。

  • Amazon Linux 2(t3.nano)
  • Python 3 (3.7.9)

コンパイラや開発用ライブラリを事前にインストールして準備します。

[ec2-user@ip-10-0-0-247 ~]$ sudo yum install gcc gcc-c++ kernel-devel python-devel libxslt-devel libffi-devel openssl-devel
読み込んだプラグイン:extras_suggestions, langpacks, priorities, update-motd
amzn2-core                                                                                                                           | 3.7 kB  00:00:00
パッケージ python-devel-2.7.18-1.amzn2.0.3.x86_64 はインストール済みか最新バージョンです
依存性の解決をしています
--> トランザクションの確認を実行しています。
---> パッケージ gcc.x86_64 0:7.3.1-12.amzn2 を インストール
:
:
完了しました!

本題のLambda Layerを作成します。Layerのランタイムのサイズは130MBです。(Layerのランタイムの展開後の最大サイズは150MB未満)

[ec2-user@ip-10-0-0-247 ~]$ mkdir python
[ec2-user@ip-10-0-0-247 ~]$ pip3 install -t ./python pyarrow
[ec2-user@ip-10-0-0-247 ~]$ ll python/
合計 8
drwxrwxr-x  2 ec2-user ec2-user   66  3月 23 07:41 bin
drwxrwxr-x 18 ec2-user ec2-user 4096  3月 23 07:41 numpy
drwxrwxr-x  2 ec2-user ec2-user  158  3月 23 07:41 numpy-1.20.1.dist-info
drwxrwxr-x  2 ec2-user ec2-user  154  3月 23 07:41 numpy.libs
drwxrwxr-x  7 ec2-user ec2-user 4096  3月 23 07:41 pyarrow
drwxrwxr-x  2 ec2-user ec2-user  165  3月 23 07:41 pyarrow-3.0.0.dist-info
[ec2-user@ip-10-0-0-247 ~]$ du -s python
130412	python

[ec2-user@ip-10-0-0-247 ~]$ zip -r pyarrow.zip python
  adding: python/ (stored 0%)
  adding: python/numpy.libs/ (stored 0%)
  adding: python/numpy.libs/libz-eb09ad1d.so.1.2.3 (deflated 50%)
:
:
  adding: python/bin/f2py3.7 (deflated 25%)
  adding: python/bin/plasma_store (deflated 31%)

[ec2-user@ip-10-0-0-247 ~]$ ll pyarrow.zip
-rw-rw-r-- 1 ec2-user ec2-user 39123022  3月 23 07:42 pyarrow.zip

このランタイム(pyarrow.zip)をpyarrowというレイヤ名で登録します。

Lambda関数の作成

データの型指定

今回用いるデータ型を先頭でインポートしています。column_typesにカラム名とデータ型をKEY/VALUEで指定します。上記の一覧表に記載したデータ型をすべて指定しています。

from pyarrow import int16, int32, int64, float64, float32, bool_, date32, date64, decimal128, timestamp, string, Table, parquet as pq
:
:
    column_types = {
        'id': int16(),
        'price': int32(),
        'total': int64(),
        'price_profit': decimal128(5,2),
        'total_profit': float32(),
        'discount': float64(),
        'visible': bool_(),
        'name': string(),
        'created': date32(),
        'updated': timestamp('s')
    }

入力データの読み込み

  • ReadOptions()
    • スレッド数、ブロックサイズ、メモリプール、などの入力ファイルの読み込みの挙動を指定します。
    • (?)このTSVファイルは、先頭はヘッダ行なので、skip_rows=1が必要だが指定しなくてもスキップされた。
  • ParseOptions()
    • 入力ファイルのフォーマットを指定します。
  • ConvertOptions()
    • 入力データのデータ型指定、スキーマ検証、NULLの取り扱いなどを指定します。
  • read_csv()
    • 入力ファイルパスや上記のオプションを指定して、arrow形式のメモリに読みます。
    • tsvの場合はread_csv()ですが、jsonの場合はread_json()を用います。
def get_pyarrow_table(input_files: str, column_types: dict) -> Table:
    readoptions = ReadOptions(
        use_threads=True,  # 複数の読み取りスレッドの利用
        block_size=1000  # 読み取りブロック数
    )
    parseoptions = ParseOptions(
        delimiter='\t',  # タブ区切り指定
        double_quote=False,  # ダブルクオートで括らない
        escape_char='\'',  # エスケープ文字の指定
    )
    convertoptions = ConvertOptions(
        check_utf8=True,  # 文字列カラムのUTF-8妥当性をチェック
        column_types=column_types,  # 列のデータ型を辞書型で渡す
        null_values=['']  # データ内のNULLを表す文字列
    )
    pyarrow_table = read_csv(
        input_file=input_files,
        read_options=readoptions,
        parse_options=parseoptions,
        convert_options=convertoptions
    )
    return pyarrow_table

出力データの書き込み

arrow形式のメモリを指定したファイル形式やオプションに従い、Parquetファイル出力します。

    pq.write_table(
        arrow_table,
        output_file,
        compression='snappy',  # snappyで圧縮
        flavor=['spark'],  # spark互換の設定
        version='1.0',
    )

デプロイした関数

実際にデプロイしたLambda関数(S3EventCSVtoParquet)は以下のとおりです。

import os
import urllib.parse
from pyarrow.csv import read_csv, ReadOptions, ParseOptions, ConvertOptions
from pyarrow import int16, int32, int64, float64, float32, bool_, date32, date64, decimal128, timestamp, string, Table, parquet as pq
from botocore.exceptions import ClientError
import boto3

#
# Functions
#
def get_pyarrow_table(input_files: str, column_types: dict) -> Table:
    readoptions = ReadOptions(
        use_threads=True,  # 複数の読み取りスレッドの利用
        block_size=1000  # 読み取りブロック数
    )
    parseoptions = ParseOptions(
        delimiter='\t',  # タブ区切り指定
        double_quote=False,  # ダブルクオートで括らない
        escape_char='\'',  # エスケープ文字の指定
    )
    convertoptions = ConvertOptions(
        check_utf8=True,  # 文字列カラムのUTF-8妥当性をチェック
        column_types=column_types,  # 列のデータ型を辞書型で渡す
        null_values=['']  # データ内のNULLを表す文字列
    )
    pyarrow_table = read_csv(
        input_file=input_files,
        read_options=readoptions,
        parse_options=parseoptions,
        convert_options=convertoptions
    )
    return pyarrow_table


def tsv2parquet(input_file: str, output_file: str, column_types: dict):
    arrow_table = get_pyarrow_table(
        input_file,
        column_types
    )
    if os.path.exists(input_file):
        os.remove(input_file)
    pq.write_table(
        arrow_table,
        output_file,
        compression='snappy',  # snappyで圧縮
        flavor=['spark'],  # spark互換の設定
        version='1.0',
    )

    
def get_target_key(source_key: str):
    source_tok = source_key.split('/')
    source_file = source_tok[-1]
    file_prefix = source_file.split('.')[0]
    target_key = source_tok[0] + '/target/' + file_prefix  + '.parquet'
    return target_key


def lambda_handler(event, context):
    source_bucket = event['Records'][0]['s3']['bucket']['name']
    source_key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    s3resource = boto3.resource('s3')

    target_bucket = source_bucket
    target_key = get_target_key(source_key)

    source_files = '/tmp/' + source_key.split('/')[-1]
    target_file = '/tmp/' + target_key.split('/')[-1]

    try:
        s3resource.Bucket(source_bucket).download_file(Filename=source_files, Key=source_key)
        column_types = {
            'id': int16(),
            'price': int32(),
            'total': int64(),
            'price_profit': decimal128(5,2),
            'total_profit': float32(),
            'discount': float64(),
            'visible': bool_(),
            'name': string(),
            'created': date32(),
            'updated': timestamp('s')
        }
        tsv2parquet(source_files, target_file, column_types)
        s3resource.Bucket(target_bucket).upload_file(Filename=target_file, Key=target_key)
        if os.path.exists(target_file):
            os.remove(target_file)
    except ClientError as e:
        print(e)

S3イベントの設定

上記のLambda LayerとLambda関数と登録した後、入力データファイルのS3バケットとPrefix、Suffixを指定して、オブジェクト作成イベント(All object create events)で、このLambda関数を起動するイベントを登録します。あとは、入力ファイルをS3にコピーすると自動的にLambda関数が呼び出され、Parquetファイルが出力されます。

動作検証

変換結果の確認

Parquetにフォーマット変換されたsales.parquetをGlue Crawlerで自動認識させてクエリした結果は以下のとおり、正しく変換されました。

もちろん、AthenaのDDLからもその結果が確認できます。

CREATE EXTERNAL TABLE `target`(
  `id` smallint, 
  `price` int, 
  `total` bigint, 
  `price_profit` float, 
  `total_profit` double, 
  `discount` decimal(10,2), 
  `visible` boolean, 
  `name` string, 
  `created` date, 
  `updated` timestamp)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://cm-bucket/pyarrowutil/target/'
TBLPROPERTIES (
  'CrawlerSchemaDeserializerVersion'='1.0', 
  'CrawlerSchemaSerializerVersion'='1.0', 
  'UPDATED_BY_CRAWLER'='sales', 
  'averageRecordSize'='778', 
  'classification'='parquet', 
  'compressionType'='none', 
  'objectCount'='1', 
  'recordCount'='1', 
  'sizeKey'='3528', 
  'typeOfData'='file')

パフォーマンス検証

実用性を検証するため、上記のソースを書き換えて約100MBデータ(customer0002_part_00.gz)をParquetファイルに変換してみます。

メモリとタイムアウトの変更

データサイズが大きいので、メモリを128MBから5120MB、タイムアウトを3秒から3分に変更しました。(この値は、あくまでも私の動物の「感」で、何の根拠もありません。)

カラムの定義の変更

        column_types = {
            'c_custkey': int32(),
            'c_name': string(),
            'c_address': string(),
            'c_city': string(),
            'c_nation': string(),
            'c_region': string(),
            'c_phone': string(),
            'c_mktsegment': string(),
        }

入力フォーマットの定義

PSVファイルは、先頭からデータ行なので、skip_rows=0などつけなくても済むはずだが、先頭行はカラム名として設定されてしまう現象が発生。そのため、columnnames_optionsをReadOptionsにcolumnnames_options指定して、カラム名を設定しています。

また、今回はPSV(パイプ文字区切りファイル)のため、delimiter='|'と指定しています。

def get_pyarrow_table(input_files: str, column_types: dict) -> Table:
    columnnames_options = [
        'c_custkey',
        'c_name',
        'c_address',
        'c_city',
        'c_nation',
        'c_region',
        'c_phone',
        'c_mktsegment',
    ]
    readoptions = ReadOptions(
        use_threads=True,  # 複数の読み取りスレッドの利用
        block_size=1000,  # 読み取りブロック数
        column_names=columnnames_options, #column_names
    )
    parseoptions = ParseOptions(
        delimiter='|',  # パイプ文字区切り指定
        double_quote=False,  # ダブルクオートで括らない
        escape_char='\'',  # エスケープ文字の指定
    )
:
:  
    return pyarrow_table

変換結果(100MBの.gzファイルの変換)

ファイルサイズ

変換のみなら、この圧縮ファイル(customer0002_part_00.gz)100MBを展開すると286.5MBあり、変換後のParquetファイルは118.4MBでした。

処理時間
  • 30.01秒:入出力ファイルの変換のみ
  • 33.47秒:入出力ファイルの変換、S3のダウンロード・アップロード
AWS利用費の試算

5,120MB(0.0000000833USD/ms)なので、AWS料金を試算すると0.002788051(USD)です。Glueと比較すると圧倒的な速さと安さです。

0.002788051(USD) = 0.0000000833 ✕ 1000 ✕ 33.47

まとめ

2年前のpyarrow 0.13.0は、decimal型やdate型のサポートされていませんでしたが、pyarrow 3.0.0では、主要なデータ型をサポートし、AthenaやRedshift Spectrumが正常に認識できるParquetファイルが生成できることを確認できました。

これまではAWS GlueのETLジョブを用いてParquetを生成していましたが、単純なフォーマット変換だけであれば、AWS LambdaとPyArrowの組み合わせで代用できるようになり、データレイクなどのサーバレスアナリティクスが加速します。Glue(Spark)は、データを変換して、DataFrameをソートしたり、ParquetファイルをPartitioningやBucketingして出力も可能ですので、利用目的に応じて使い分けると良いでしょう。

データ分析界のスタープレーヤーが集まって生み出されたApache Arrowですが、まだまだ進化が終わる気配がありません。機能やファイルフォーマットサポートなど、今後も引き続きウオッチしたいと思います。

合わせて読みたい