ストリーミングデータをSQLでリアルタイムに分析できる「Materialize」を使ってみた

Rustで開発されているそうです
2021.08.18

大阪オフィスの玉井です。

データ分析にリアルタイムを求めている方は必見のツールを紹介します。

Materializeとは?

公式曰く「streaming SQL materialized view engine」です。

…わからん。

ざっくり説明

ストリーミングデータに対してリアルタイムにクエリできるDB(みたいなもの)です。

普通のDBやDWHの場合、テーブルなど(実体はストレージとかだと思いますが)にデータが格納されており、そこに対してクエリ(照会)します。Materializeも表面上はSQLなので、ビュー等の概念がありますが、実際のデータ自体は、静的なデータではなく、Kafka等のストリーミングデータ(常に増え続けているデータ)を対象にします。

つまり、増え続けるストリーミングデータに対して、あたかも普通のDBのようにクエリすることができます。しかも、結果は常にリアルタイムです。そして速い。

通常、ストリーミングデータを分析する場合、何らかのバッチ処理を動かして、DWH等に移している仕組みをとっているところが多いと思います。しかし、バッチ処理は、それを動かすインフラのコスト(運用面含む)があります。また、分析できるデータに、どうしてもタイムラグが発生してしまいます。

Materializeはリアルタイムデータをそのまま分析できるため、ストリーミングデータをバッチ処理でどこかに移し替えるという仕組み自体が不要になります。

もう少しだけ踏み込む

Materializeは、名前の通り、マテリアライズドビューがメインなのですが、他のDBにあるマテビューとは一味違います。

そもそもマテビューとは「ビューなんだけども、クエリの実行結果も持っとくわ」というものです。普通のビューであれば、照会するたびに、定義されたクエリが発行されますが、マテビューの場合は、既に結果も持っているため、照会したタイミングで結果を返すだけで済みます。ビューより効率がいいです。

当然、マテビューには課題もあります。それは「持っとく実行結果の更新(頻度)」です。実行結果を保持しておくのはいいのですが、ずーっと同じ結果を持ってる=新しいデータが反映されてない、ということになります。マテビューの元になっているデータが更新されれば、マテビュー自体の内容も新しいものになってないと困るので、どこかでマテビューが持っている結果を取り直す必要があります。その取り直し(リフレッシュ)の頻度とタイミングを検討する必要があります(例えば、10個くらいのテーブルを結合するようなごっついマテビューの場合、リフレッシュする際の負荷やコストがバカにできないため、いつどのくらいやるかは非常に大事です)。

しかし、Materializeでは上記の課題は存在しません。常にリアルタイムで結果を更新し続ける仕組みになっているため、もはやリフレッシュという概念がありません。クエリした時、常に最新のデータが取得できるようになっています。

「一体どんな仕組みやねん?」と思われた方は、下記にアーキテクチャの説明があるので、一読ください。

とりあえずやってみた

参考

下記を参考にした…つもりだったのですが、(実際に検証したときと)内容がガラッと変わっていました(なのでこのブログの内容は貴重かもしれない)。

環境

  • macOS Big Sur 11.5.1
  • Homebrew 3.2.6
  • psql 13.3
  • fish 3.3.1

Materializeのインストールと動作確認

インストール

インストールはめちゃくちゃ簡単です。homebrewでインストールするだけです(言い忘れてましたが、Materializeはオープンソースのツールなので、無料です)。

> brew install MaterializeInc/materialize/materialized

Materializeを起動する

簡単なコマンド一発で起動します。-w 1というのは、Materializeが使うワーカースレッドを1個に指定しています(指定しないと、マシンの物理コアの半分を持っていってしまう、みたいなことがドキュメントにあり)。

> materialized -w 1
=======================================================================
Thank you for trying Materialize!

We are interested in any and all feedback you have, which may be able
to improve both our software and your queries! Please reach out at:

    Web: https://materialize.com
    GitHub issues: https://github.com/MaterializeInc/materialize/issues
    Email: support@materialize.io
    Twitter: @MaterializeInc
=======================================================================

materialized v0.8.2 (c40f601d6) listening on 0.0.0.0:6875...

Materializeに接続する

起動したMaterializeは、普通のDBと同じ感覚で接続できます。下記のように、psqlを使って接続することができます(別途、新しいターミナルを開いて、そちらから実行しました)。

> psql -U materialize -h localhost -p 6875 materialize

Materialize上でのクエリをちょっと試す

Materializeと互換性のあるクライアント(psqlなど)で接続した後は、普通のSQLを実行することができます。

VIEWを作成します。

CREATE MATERIALIZED VIEW pseudo_source(
    key,
    value
) AS
    VALUES ('a', 1), ('a', 2), ('a', 3), ('a', 4),('b', 5), ('c', 6), ('c', 7);

VIEWをSELECT文で参照します。

materialize=> SELECT * FROM pseudo_source;
 key | value
-----+-------
 a   |     1
 a   |     2
 a   |     3
 a   |     4
 b   |     5
 c   |     6
 c   |     7
(7 rows)

集計もできます。

materialize=> SELECT key, sum(value) FROM pseudo_source GROUP BY key;
 key | sum
-----+-----
 a   |  10
 b   |   5
 c   |  13
(3 rows)

マテビューも作れます。

materialize=> CREATE MATERIALIZED VIEW key_sums AS
    SELECT key, sum(value) FROM pseudo_source GROUP BY key;

マテビューからさらに集計もできます。

materialize=> SELECT sum(sum) FROM key_sums;
 sum
-----
  28
(1 row)

ストリーミングデータに対してリアルタイムにクエリを実行する

ここからMaterializeの真骨頂です。毎秒増えていくデータに対して、SQLでリアルタイムの結果を取得します。

ストリーミングデータの準備

まず、Materialize側で、CREATE SOURCE文を実行し、ストリーミングデータ元を定義します。流れてくるデータの場所をMaterializeに教えてあげる感じですね。

CREATE SOURCE wikirecent
FROM FILE '/Users/tamai.rei/Downloads/wikirecent' WITH (tail = true)
FORMAT REGEX '^data: (?P<data>.*)';

上記のクエリを見ればわかると思いますが、今回のストリーミングデータが流れてくる場所は、自分のマシンのDownloadフォルダを指定しています。ですので、Downloadフォルダに移動して、下記を実行します(当方、シェルがFishなので、下記のような書き方になっています)。

while true;
  curl --max-time 9999999 -N https://stream.wikimedia.org/v2/stream/recentchange >> wikirecent;
end

上記のスクリプトは、ウィキメディアというウィキペディアの姉妹プロジェクトがあるのですが、そこが提供しているストリーミングデータを、(手動で止めるまで)永久に取得し続けるものです。これをリアルタイムデータとして、Materializeからクエリしていきます。

Materializeからリアルタイムデータをクエリする

先程CREATE SOURCEしたオブジェクトの内容を確認します。dataというカラムに、実データがリアルタイムで入り続けます。CREATE SOURCE文の最後でFORMAT REGEXという句がありますが、ここで実際に取り出すデータを正規表現で指定しています。今回はdata:で始まる行のデータを入れることになります。

materialize=> SHOW COLUMNS FROM wikirecent;
    name    | nullable |  type
------------+----------+--------
 data       | t        | text
 mz_line_no | f        | bigint
(2 rows)

では、このSOURCEからマテビューを作成します。SELECT句が少々特殊ですが、これは半構造化データを構造化に整形するための、Materialize用の書き方です。Snowflakeの半構造化データ用のクエリと近い感じがします。

CREATE MATERIALIZED VIEW recentchanges AS
SELECT
        val->>'$schema' AS r_schema,
        (val->'bot')::bool AS bot,
        val->>'comment' AS comment,
        (val->'id')::float::int AS id,
        (val->'length'->'new')::float::int AS length_new,
        (val->'length'->'old')::float::int AS length_old,
        val->'meta'->>'uri' AS meta_uri,
        val->'meta'->>'id' as meta_id,
        (val->'minor')::bool AS minor,
        (val->'namespace')::float AS namespace,
        val->>'parsedcomment' AS parsedcomment,
        (val->'revision'->'new')::float::int AS revision_new,
        (val->'revision'->'old')::float::int AS revision_old,
        val->>'server_name' AS server_name,
        (val->'server_script_path')::text AS server_script_path,
        val->>'server_url' AS server_url,
        (val->'timestamp')::float AS r_ts,
        val->>'title' AS title,
        val->>'type' AS type,
        val->>'user' AS user,
        val->>'wiki' AS wiki
FROM
    (
        SELECT
            data::jsonb AS val
        FROM
            wikirecent
    )

ここからさらに、もう一つマテビューを作成します。これはデータの総件数を集計するものです。今回は、このマテビューを使って、リアルタイム性を確認していきます。

CREATE MATERIALIZED VIEW counter AS
SELECT
    COUNT(*)
FROM
    recentchanges

クエリにかかった時間を表示するために、\timingを実行しておきます。これはMaterializeではなくpsqlのメタコマンドです。

materialize-> \timing
Timing is on.

マテビューをSELECTします。まず1回目。

materialize=> SELECT COUNT(*) FROM recentchanges;
 count
-------
   856
(1 row)

Time: 4.386 ms

数秒待って、2回目の実行です。体感ではほとんど1回目と同様、1瞬です。しかし、件数はちゃんと増えています。リアルタイムの結果が返ってきています。何もしていないのに、マテビューの結果が更新されているわけですね。

materialize=> SELECT COUNT(*) FROM recentchanges;
 count
-------
   967
(1 row)

Time: 5.466 ms

1分弱くらい待っての3回目。件数は増えているのに、クエリの速度がさっきより上がるという、いい意味で意味不明(?)な事象が発生しました。すごいぞMaterialize。

materialize=> SELECT * FROM counter;
 count
-------
  1345
(1 row)

Time: 0.811 ms

当然、他にも色々なクエリを、リアルタイムデータに対して、実行することができます。

CREATE MATERIALIZED VIEW useredits AS
SELECT
    user,
    count(*)
FROM
    recentchanges
GROUP BY
    user

ウィキメディアを編集した回数のユーザーランキングです。

materialize=> SELECT * FROM useredits ORDER BY count DESC;
                  user                  | count
----------------------------------------+-------
 NewsBots                               |   168
 Quesotiotyo                            |    20
 Umarbot                                |    13
 SuccuBot                               |    11
 Jeuwre                                 |    10
 Lsjbot                                 |     9
 Tibidibi                               |     8
 Maharajbhakt2014                       |     8
 KrBot                                  |     7
 WP 1.0 bot                             |     5
 NadandoBot                             |     4
 Twofivesixbot                          |     4
 Pbritti                                |     3
 NewNoYork                              |     3
 WingerBot                              |     3
 LogainmBot                             |     3
 SchlurcherBot                          |     3
 Kails                                  |     2
 리듬                                   |     2
 Ivanics                                |     2
 Zff19930930                            |     2
:

LIMIT句を使用してのTOP10ランキングを作ります。

CREATE MATERIALIZED VIEW top10 AS
SELECT
    *
FROM
    useredits
ORDER BY
    count DESC
LIMIT 10

(データがリアルタイムに増えているので、既に上記の結果と変動があります。2位の急浮上っぷりは何だ…)

materialize=> SELECT * FROM top10 ORDER BY count DESC;
     user     | count
--------------+-------
 NewsBots     |   739
 YoaR         |   172
 Quesotiotyo  |    82
 Umarbot      |    66
 SuccuBot     |    50
 Jeuwre       |    40
 MilHistBot   |    36
 KrBot        |    23
 Lsjbot       |    18
 Dreamer plox |    17
(10 rows)

Time: 2.384 ms

検討事項

Materializeのユースケース

実際のデータ分析の現場でどのように活用することができるでしょうか。ドキュメント等を読む限り、下記のケースがあるようです。

真のリアルタイムダッシュボード(BIツール)

バックエンドがMaterializeなので、更新するたびに最新の結果が表示されるダッシュボードが作れそうです。公式では、Metabaseを利用したデモがあります。

TableauやLookerに対する言及は見つかりませんでしたが、psqlで接続できるので、もしかしたらPostgreSQL扱いで接続できるかもしれません。

リアルタイムデータアプリケーション

データの可視化にとどまらず、その時点のデータ分析の結果によって、システムに何らかのアクションを起こさせる仕組み(アプリケーション)にも使えるかと思います。

IoTセンサーのデータなんかはリアルタイムで常に流れてくるでしょうし、そのセンサーのデータによって、色々なアクションを起こす、というユースケースは非常にありそうです。

Materializeじゃなくても、そういう仕組みを開発することは可能ですが、開発する際に一定のコストがかかりますし、運用し続ける面でも、インフラの管理や、ビジネスロジックが変わった際の改修、対象データが増えた時の改修など、結構色々なコストがかかってくると思います。Materializeだと、ビジネスロジックや対象データの増加は、SQLの中の話でしかないので、普通に開発するより時間とコストを節約できるのではないでしょうか。

DWHとの棲み分け

データ分析に非常に役立ちそうなMaterializeですが、かといって、DWHの代わりとして全データをMaterializeに移すというのは違うでしょう。

あくまでデータ分析基盤の中心はDWHですが、別途リアルタイム性が求められるデータに対してのみ、Materializeを使用するのがスタンダードになりそうです。その際、「Materializeで使用するデータは、別途DWHでも使用するのか」「だとしたらデータパイプラインはどうする?」みたいな検討が必要かもしれません。

おわりに

まさかのSaas版も準備されているとのことで、今後も注目していきたいプロダクトですね。