Node.js で Parquet データを操作してみた(parquetjs)

2023.06.20

こんにちは、CX事業本部 Delivery部の若槻です。

今回は、npm で配布されている parquetjs を使用して、 Node.js で Parquet データを操作してみました。

parquetjs とは

parquetjs を使用すると、列指向フォーマットである Parquet データの書き込みや読み取りを行うことができます。

注意点として、README の冒頭にもある通り本パッケージはアクティブにメンテナンスが行われていないため、プロダクトコードでは利用せず、テスト用データの作成などの用途に留めることをおすすめします。

試してみた

インストール

npm install parquetjs

TypeScript 環境で利用する場合は、型定義 @types/parquetjs も合わせてインストールします。

npm install -D @types/parquetjs

モジュールのインポート

// CommonJS
var parquet = require('parquetjs');

// ES Module
import * as parquet  from 'parquetjs';

ファイル書き込み

スキーマ定義

Parquet ファイルを作成するためには ParquetSchema を使ってスキーマを宣言する必要があります。

var schema = new parquet.ParquetSchema({
  name: { type: 'UTF8' },
  quantity: { type: 'INT64' },
  price: { type: 'DOUBLE' },
  date: { type: 'TIMESTAMP_MILLIS' },
  in_stock: { type: 'BOOLEAN' }
});

利用可能なデータ型の一覧は次のテーブルから確認できます。

書き込み

ファイルへの書き込みは ParquetWriter を使って行います。

// ファイル 'fruits.parquet' に書き込みを行う ParquetWriter の作成
var writer = await parquet.ParquetWriter.openFile(schema, 'fruits.parquet');

// ファイルへの行の追加
await writer.appendRow({name: 'apples', quantity: 10, price: 2.5, date: new Date(), in_stock: true});
await writer.appendRow({name: 'oranges', quantity: 10, price: 2.5, date: new Date(), in_stock: true});

// ファイルのクローズ
await writer.close();

作成されたファイルを VS コードプラグイン parquet-viewer で確認すると、データが書き込まれていることが確認できます。

{"name":"apples","quantity":10,"price":2.5,"date":"2023-06-20T15:07:33.957Z","in_stock":true}
{"name":"oranges","quantity":10,"price":2.5,"date":"2023-06-20T15:07:33.969Z","in_stock":true}

ファイル読み取り

Parquet ファイルの読み取りは ParquetReader を使って行います。

// 'fruits.parquet' の内容を読み取る ParquetReader の作成
let reader = await parquet.ParquetReader.openFile('fruits.parquet');

// カーソルの作成
let cursor = reader.getCursor();

// イテレーションによりレコードを取得
let record = null;
while (record = await cursor.next()) {
  console.log(record);
}

出力結果です。

output

{
  name: 'apples',
  quantity: 10,
  price: 2.5,
  date: 2023-06-20T15:07:33.957Z,
  in_stock: true
}
{
  name: 'oranges',
  quantity: 10,
  price: 2.5,
  date: 2023-06-20T15:07:33.969Z,
  in_stock: true
}

カーソル作成時にオプションで読み取る列のサブセットを指定できます。列指向なので効率的なクエリが期待できます。

let cursor = reader.getCursor(['name', 'price']);

output

{ name: 'apples', price: 2.5 }
{ name: 'oranges', price: 2.5 }

使用後はリーダーをクローズします。

await reader.close();

エンコーディング

Parquet フォーマットは各フィールドの値を連続した配列として保存し、さまざまなスキームを使用して圧縮/エンコードできます。

PLAIN エンコーディング

PLAIN では値が圧縮されず、そのままの形式で保存されます。デフォルト(BOOLEAN 以外)で使用されるエンコーディングです。

var schema = new parquet.ParquetSchema({
  name: { type: 'UTF8', encoding: 'PLAIN' },
});

RLE エンコーディング(エラーにより使用できなかった)

RLE (ランレングス)エンコーディングにより、連続する数値を非常に効率的に圧縮できます。使用できる型は BOOLEANINT32 および INT64 のみです。また使用時には bitWidth でフィールドの最大ビット数を指定する必要があります。

var schema = new parquet.ParquetSchema({
  age: { type: 'UINT_32', encoding: 'RLE', bitWidth: 7 },
});

しかし実際に RLE エンコーディングを試してみたところ、エラーとなりました。

script.ts

import { ParquetSchema, ParquetWriter } from 'parquetjs';

const schema = new ParquetSchema({
  age: { type: 'UINT_32', encoding: 'RLE', bitWidth: 7 },
});

const run = async () => {
  try {
    const writer = await ParquetWriter.openFile(schema, 'fruits2.parquet');

    // Loop 100 times
    for (let i = 0; i < 100; i++) {
      await writer.appendRow({
        age: 10,
      });
    }

    await writer.close();
  } catch (err) {
    console.error(err);
  }
};

run();

ERR_INVALID_ARG_VALUE という内部的なエラーが発生しているようです。

$ npx ts-node script.ts
RangeError [ERR_INVALID_ARG_VALUE]: The argument 'size' is invalid. Received NaN
    at Function.alloc (node:buffer:361:3)
    at encodeRunRepeated (/Users/wakatsuki.ryuta/projects/cm-rwakatsuki/cdk_sample_app/node_modules/parquetjs/lib/codec/rle.js:22:20)
    at Object.exports.encodeValues (/Users/wakatsuki.ryuta/projects/cm-rwakatsuki/cdk_sample_app/node_modules/parquetjs/lib/codec/rle.js:79:31)
    at encodeValues (/Users/wakatsuki.ryuta/projects/cm-rwakatsuki/cdk_sample_app/node_modules/parquetjs/lib/writer.js:287:34)
    at encodeDataPageV2 (/Users/wakatsuki.ryuta/projects/cm-rwakatsuki/cdk_sample_app/node_modules/parquetjs/lib/writer.js:346:19)
    at encodeColumnChunk (/Users/wakatsuki.ryuta/projects/cm-rwakatsuki/cdk_sample_app/node_modules/parquetjs/lib/writer.js:420:18)
    at encodeRowGroup (/Users/wakatsuki.ryuta/projects/cm-rwakatsuki/cdk_sample_app/node_modules/parquetjs/lib/writer.js:483:22)
    at ParquetEnvelopeWriter.writeRowGroup (/Users/wakatsuki.ryuta/projects/cm-rwakatsuki/cdk_sample_app/node_modules/parquetjs/lib/writer.js:201:18)
    at ParquetWriter.close (/Users/wakatsuki.ryuta/projects/cm-rwakatsuki/cdk_sample_app/node_modules/parquetjs/lib/writer.js:116:33)
    at run (/Users/wakatsuki.ryuta/projects/cm-rwakatsuki/cdk_sample_app/script.ts:18:18) {
  code: 'ERR_INVALID_ARG_VALUE'
}

Issue を探してみましたが該当のエラーは見つかりませんでした。アクティブにメンテナンスされていないパッケージなので、まあこういうこともあるでしょう。

フィールドのオプション指定

デフォルトでは、すべてのフィールドが各行で必須となりますが、オプションとすることもできます。

var schema = new parquet.ParquetSchema({
  name: { type: 'UTF8' },
  quantity: { type: 'INT64', optional: true },
});

var writer = await parquet.ParquetWriter.openFile(schema, 'fruits.parquet');
await writer.appendRow({name: 'apples', quantity: 10 });
await writer.appendRow({name: 'banana' }); // not in stock

fruits.parquet

{"name":"apples","quantity":10}
{"name":"banana"}

ネストされたデータ

次のようにネストされたデータを書き込むこともできます。

var schema = new parquet.ParquetSchema({
  name: { type: 'UTF8' },
  colours: { type: 'UTF8', repeated: true },
  stock: {
    repeated: true,
    fields: {
      price: { type: 'DOUBLE' },
      quantity: { type: 'INT64' },
    }
  }
});

var writer = await parquet.ParquetWriter.openFile(schema, 'fruits.parquet');

await writer.appendRow({
  name: 'banana',
  colours: ['yellow'],
  stock: [
    { price: 2.45, quantity: 16 },
    { price: 2.60, quantity: 420 }
  ]
});

await writer.appendRow({
  name: 'apple',
  colours: ['red', 'green'],
  stock: [
    { price: 1.20, quantity: 42 },
    { price: 1.30, quantity: 230 }
  ]
});

ネストされたデータのうち一部のフィールドのみを読み取ることができます。

let reader = await parquet.ParquetReader.openFile('fruits.parquet');

let cursor = reader.getCursor([['name'], ['stock', 'price']]);
let record = null;
while (record = await cursor.next()) {
  console.log(record);
}

await reader.close();

output

{ name: 'banana', stock: [ { price: 2.45 }, { price: 2.6 } ] }
{ name: 'apple', stock: [ { price: 1.2 }, { price: 1.3 } ] }

おわりに

parquetjs を使って Node.js で Parquet データを操作してみました。

E2Eテストなどで Parquet 形式のテストデータを作成したい場合には、Amazon Athena などで INSET INTO クエリを使って作成していましたが、parquetjs を使えばもっと簡単に作成することができそうですね。

以上