この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
大阪オフィスの玉井です。
以前、Materializeというツールを紹介しました(私が書いた記事の中では比較的多く読まれているようです)。
今回、そのMaterializeをクラウド越し利用できるMaterialize Cloudがオープンβ版として登場しました。無料でトライアル出来るとのことで、早速使ってみました。
やってみた
下記のチュートリアルを参考にします。
検証環境
- macOS Big Sur11.5.2
- psql 13.4
Materialize Cloudの準備
上記ウェブサイトからRegisterします。無料です。
アカウントを設定して、メールで送られてきたURL先でログインすると、あっという間にMaterialize Cloudに入ることができます。
ちなみに本当は、最初に、Materialize社の方が、オンボーディングのためのMTGをセッティングしてくれるのですが(というか、そのMTGを経ないと利用できない)、「英語が話せないから、先にセルフで環境触らせてくれへん?」って英語でサポートにメールしたら、快諾してくれました。ありがとうございますMaterializeのみなさん。
Materialize Cloudは、Materializeのインスタンスを起動→そのインスタンスに接続して使用、という流れで使います。ということで、まずはインスタンス作成画面に移動します。
無料のオープンβ版なので、インスタンスサイズはXSのみ、インスタンスが稼働するパブリッククラウドはAWSのみとなっております。ちなみに、インスタンスの名前は自動で付与してくれます(もちろん好きな名前にすることができますが、面倒だったので、このままでいきます)。
インスタンス作成ボタンをポチッと押すと、あっという間にインスタンスが立ち上がります。インスタンスの詳細画面を確認すると、インスタンスに対する接続情報を確認することができます。
Materialize Cloudに接続する
今回はpsqlで接続します(ネイティブで対応しています)。Materialize Cloudの画面に、コマンドまでご丁寧に用意してくれているので、それをそのまま使用して、Materialize Cloudインスタンスに接続します。ちなみに、SSL接続になるので、先に証明書ファイルをダウンロードしておく必要があります。忘れないようにしましょう。
> psql "postgresql://materialize@xxxxxxxx.materialize.cloud:6875/materialize?sslmode=require&sslcert=materialize.crt&sslkey=materialize.key&sslrootcert=ca.crt"
psql (13.4, server 9.5.0)
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off)
Type "help" for help.
materialize=>
ストリーミングデータに対してSQLを実行してみる
では、ここからは、Materializeの基本にして真骨頂である、ストリーミングデータをDBみたいにSQLで問い合わせる、という部分を、実際に試してみたいと思います。
データソースの用意
ストリーミングデータって実際に用意するのがクソ面倒難しいのですが、公式ドキュメントでは、PubNubというサービスのMarket Orders Data Streamを利用する形になっていました。アカウントを作る必要もなく、ウェブサイトにある接続情報を利用するだけで、一瞬でストリーミングデータを利用できるので非常に便利です。
PubNubのマーケットストリーミングデータの情報を確認したら、それをもとにCREATE SOURCE
文を実行します。このSOURCEという概念が、Materializeにおけるデータソースになりますので、Materializeは基本的にはこのクエリから始まると思います。
CREATE SOURCE market_orders_raw
FROM
PUBNUB SUBSCRIBE KEY 'sub-c-5e072e06-1a7c-11ec-9ca7-5693d1c31269'
CHANNEL 'pubnub-market-orders'
SHOWクエリでSOURCEの情報を確認することができます。
materialize=> SHOW COLUMNS FROM market_orders_raw;
name | nullable | type
------+----------+------
text | f | text
(1 row)
当然ながら、Apache KafkaやAmazon Kinesisといった、主要なストリーミングデータ基盤にも対応しています(CREATE SOURCEできる)。詳しくはドキュメントをどうぞ。
ストリーミングデータを対象にしたマテビューを作ってみる
SOURCEを作り、PubNubから流れ続けるデータをMaterializeで補足する準備が整いました。これを基にして、マテビューを作っていきます。
まずは、普通のVIEWを作ります。これはまだマテビューではないので、実態はただのクエリです(ストリーミングデータのjsonb型を行列形式に変換しているだけ)。
CREATE VIEW market_orders AS
SELECT
((text::jsonb)->>'bid_price')::float AS bid_price,
(text::jsonb)->>'order_quantity' AS order_quantity,
(text::jsonb)->>'symbol' AS symbol,
(text::jsonb)->>'trade_type' AS trade_type,
to_timestamp(((text::jsonb)->'timestamp')::bigint) AS ts
FROM
market_orders_raw
上記のVIEWを基に、マテビューを作成します。
CREATE MATERIALIZED VIEW avg_bid AS
SELECT
symbol,
AVG(bid_price) AS AVG
FROM
market_orders
GROUP BY
symbol
マテビューを参照してみます。
materialize=> SELECT * FROM avg_bid;
symbol | avg
-------------+--------------------
Apple | 191.27232718467712
Google | 308.4405372142792
Elerium | 178.60857986211778
Bespin Gas | 172.4710697134336
Linen Cloth | 225.99615881840387
(5 rows)
みなさん御存知の通り、マテビューというのは、クエリだけでなく、実際の結果も保存しておく部分が、普通のVIEWとの違いです。ただ、このMaterializeのマテビューのすごいところは、流れ続けるストリーミングデータを常に追い続けるようになってるところです。結果を常に更新し続けているマテビュー、というイメージです。
ですので、下記のように実行するたびにマテビューの結果も変わります。常に最新の結果が取得できます。まさにリアルタイム。
ちなみに、TAIL
という関数を使用すると、そのマテビューの(ストリーミングによる)変更状態を監視することができます。
COPY(TAIL avg_bid) TO stdout
結合を試してみる
Materializeは一般的なSQLが使えますが、もちろんJOINも使用できます。
まずは結合を試すためのテーブルを作ります。見て分かる通り、これはただの固定値が入ってるだけです。
CREATE TABLE symbols(
symbol text,
ticker text
)
INSERT INTO symbols
SELECT
*
FROM
(VALUES('Apple', 'AAPL'),('Google', 'GOOG'),('Elerium', 'ELER'),('Bespin Gas', 'BGAS'),('Linen Cloth', 'LCLO'))
そして、マテビューを作ります。ここで先程作ったテーブルを、symbol
というキーで結合しています。
CREATE MATERIALIZED VIEW cnt_ticker AS
SELECT s.ticker AS ticker,
COUNT(*) AS cnt
FROM market_orders m
JOIN symbols s ON m.symbol = s.symbol
GROUP BY s.ticker;
結合していても、Materializeのマテビューは、常に最新の結果を返してくれます。
マテビューにフィルタリングを仕込む
一般的なSQLが使用できると言っている以上、WHERE句も、もちろん使用できます。
下記のマテビューでは、データ全体の件数ではなく、(現在から数えて)過去1分間の件数を出すようにしています。mz_logical_timestamp()
というのは、Materializeでいう現在時間を出す関数です(now()
と同じ)。
CREATE MATERIALIZED VIEW cnt_sliding AS
SELECT
symbol,
COUNT(*) AS cnt
FROM
market_orders m
WHERE EXTRACT(EPOCH FROM (ts + INTERVAL '1 minute'))::bigint * 1000 > mz_logical_timestamp()
GROUP BY
symbol
もうGIF動画はありませんが、こちらも以前と同様、実行するたび、最新の過去1分間の結果を返してくれます。
materialize=> SELECT * FROM cnt_sliding;
symbol | cnt
-------------+-----
Apple | 46
Google | 46
Elerium | 40
Bespin Gas | 55
Linen Cloth | 47
(5 rows)
おわりに
意味不明なスピード(褒め言葉)でストリーミングデータの最新結果を取得できるMaterializeですが、クラウド上で簡単に準備することができるようになったことで、より気軽に利用できるようになりました。インフラの運用も考えなくていいので、リアルタイムなデータ分析を求めている方は、まずはテスト的に使用してみてはどうでしょうか。