Amazon Managed Service for Apache Flink Studioでユーザー定義関数(UDF)を作成してみた
はじめに
コンサルティング部の神野です。
Amazon Managed Service for Apache Flink Studio(以下Flink)内でSQLを実行する際に頻繁に使用するロジックや、標準の関数では提供されていない処理を拡張したいなと思ったことはありませんか?
FlinkはJVM言語(JavaやScala)またはPythonでユーザー定義関数を作成できるので、今回はPythonで作成してみます。
ユーザー定義関数(UDF)について
ユーザー定義関数(以下UDF)は、Flinkの標準機能として提供されていない独自の処理や計算ロジックを作成できる機能です。例えば、特定の業務ロジックや複雑な計算を関数として定義し、SQLやPythonコード内で再利用可能な形で利用できます。
ただし、Flinkの標準機能として用意されている関数と比べると処理速度は劣る可能性があり、
使用する際はパフォーマンスとのトレードオフを意識する必要があります。
準備
前提
下記ブログの環境が作成されているものとします。Flink StudioやInputとOutputのStream、各種テーブルなどが作成されているものとします。
ランタイムバージョン
- ランタイム
Apache Flink 1.15, Apache Zeppelin 0.10
実装
Notebook作成
まずはFlinkの画面を開き、新規Notebookを作成します。
convert_celsius_to_fahrenheit
という名前で新しいNotebookを作成します。
テーブル作成
テーブルは前提で作成したものを使用するため作成不要です。
UDF実装
今回はわかりやすく摂氏から華氏に変換するUDFを作成します。
udf
デコレーターを使用して、UDF用の関数を作成します。
デコレーターの引数result_type
には戻り値の型を指定します。
関数自体は普通のPythonで作成する関数と同様に引数と戻り値を定義すればOKです。
st_env
はStreamTableEnvironment for blink plannerの変数でテーブル環境のリソースとなっています。
関数register_function
でUDFを登録することで、後のSectionでSQLからUDFの呼び出しを可能とします。
%flink.pyflink
from pyflink.table import DataTypes
from pyflink.table.udf import udf
# 摂氏から華氏に変換するUDFの定義
@udf(result_type=DataTypes.FLOAT())
def celsius_to_fahrenheit(celsius):
# 変換式: °F = (°C × 9/5) + 32
return (celsius * 9/5) + 32
# UDFをテーブル環境に登録
st_env.register_function("celsius_to_fahrenheit", celsius_to_fahrenheit)
コードが書けたので早速実行します!
上記のように特にエラーなど出ていなければ問題なく実行できています!
補足
st_env
:StreamTableEnvironment for blink plannerの詳細については下記ドキュメントをご参照ください。
Output用のStreamへ出力処理の実装
Output用のStreamoutput_temperature_data
へ、作成したUDFcelsius_to_fahrenheit
を使って摂氏から華氏に温度を変換してデータを出力します。
%flink.ssql
INSERT INTO output_temperature_data
SELECT
`time`,
-- UDFを使って摂氏から華氏変換
celsius_to_fahrenheit(`value`) as `value`
FROM temperature_data
こちらも実行してみます!
継続時間が表示されて正常に実行できていますね!
Input用のStreamへデータを送信
前回の記事でも使用した摂氏平均30度の温度を送信するスクリプトをCloudShellから実行します。
スクリプトの中身や実行の詳細については前回の記事をご参照ください。
スクリプト実行
python3 send_temp.py
これで一通り完了です!
実際にOutput用のStreamデータを確認してみます。
動作確認
出力先のOut-Flink-Studio-Stream
を確認してみます。
問題なく摂氏から華氏へ変換されたデータが出力されていますね!!
おわりに
Pythonを使ってAmazon Managed Service for Apache Flink Studio上でユーザー定義関数を作成する方法はいかがだったでしょうか。
シンプルに実装可能で、SQLで複雑なロジックも使用できるため活用したいシーンでは使用していきたいと思いました。
ただ、パフォーマンスの影響もあるので処理スピードとのトレードオフを考慮する必要はありますが、使用する際は少しでも参考になりましたら幸いです。
最後までご覧いただきありがとうございました!