Amazon Managed Service for Apache Flink Studioでユーザー定義関数(UDF)を作成してみた

Amazon Managed Service for Apache Flink Studioでユーザー定義関数(UDF)を作成してみた

Clock Icon2024.10.31

はじめに

コンサルティング部の神野です。

Amazon Managed Service for Apache Flink Studio(以下Flink)内でSQLを実行する際に頻繁に使用するロジックや、標準の関数では提供されていない処理を拡張したいなと思ったことはありませんか?
FlinkはJVM言語(JavaやScala)またはPythonでユーザー定義関数を作成できるので、今回はPythonで作成してみます。

ユーザー定義関数(UDF)について

ユーザー定義関数(以下UDF)は、Flinkの標準機能として提供されていない独自の処理や計算ロジックを作成できる機能です。例えば、特定の業務ロジックや複雑な計算を関数として定義し、SQLやPythonコード内で再利用可能な形で利用できます。

ただし、Flinkの標準機能として用意されている関数と比べると処理速度は劣る可能性があり、
使用する際はパフォーマンスとのトレードオフを意識する必要があります。

準備

前提

下記ブログの環境が作成されているものとします。Flink StudioやInputとOutputのStream、各種テーブルなどが作成されているものとします。

https://dev.classmethod.jp/articles/try-amazon-managed-service-for-apache-flink-studio/

ランタイムバージョン

  • ランタイム Apache Flink 1.15, Apache Zeppelin 0.10

実装

Notebook作成

まずはFlinkの画面を開き、新規Notebookを作成します。
CleanShot 2024-10-31 at 00.37.30@2x

convert_celsius_to_fahrenheitという名前で新しいNotebookを作成します。

CleanShot 2024-10-31 at 00.37.19@2x

テーブル作成

テーブルは前提で作成したものを使用するため作成不要です。

UDF実装

今回はわかりやすく摂氏から華氏に変換するUDFを作成します。

udfデコレーターを使用して、UDF用の関数を作成します。
デコレーターの引数result_typeには戻り値の型を指定します。
関数自体は普通のPythonで作成する関数と同様に引数と戻り値を定義すればOKです。

st_envはStreamTableEnvironment for blink plannerの変数でテーブル環境のリソースとなっています。
関数register_functionでUDFを登録することで、後のSectionでSQLからUDFの呼び出しを可能とします。

実行SQL
%flink.pyflink
from pyflink.table import DataTypes
from pyflink.table.udf import udf

# 摂氏から華氏に変換するUDFの定義
@udf(result_type=DataTypes.FLOAT())
def celsius_to_fahrenheit(celsius):
    # 変換式: °F = (°C × 9/5) + 32
    return (celsius * 9/5) + 32

# UDFをテーブル環境に登録
st_env.register_function("celsius_to_fahrenheit", celsius_to_fahrenheit)

コードが書けたので早速実行します!

CleanShot 2024-10-30 at 23.38.03@2x

上記のように特にエラーなど出ていなければ問題なく実行できています!

補足

st_env:StreamTableEnvironment for blink plannerの詳細については下記ドキュメントをご参照ください。

Output用のStreamへ出力処理の実装

Output用のStreamoutput_temperature_dataへ、作成したUDFcelsius_to_fahrenheitを使って摂氏から華氏に温度を変換してデータを出力します。

実行SQL
%flink.ssql
INSERT INTO output_temperature_data
SELECT 
    `time`,
    -- UDFを使って摂氏から華氏変換
    celsius_to_fahrenheit(`value`) as `value`
FROM temperature_data

こちらも実行してみます!

CleanShot 2024-10-30 at 23.41.22@2x
継続時間が表示されて正常に実行できていますね!

Input用のStreamへデータを送信

前回の記事でも使用した摂氏平均30度の温度を送信するスクリプトをCloudShellから実行します。
スクリプトの中身や実行の詳細については前回の記事をご参照ください。

スクリプト実行

実行コマンド
python3 send_temp.py	

CleanShot 2024-10-31 at 01.07.56@2x

これで一通り完了です!
実際にOutput用のStreamデータを確認してみます。

動作確認

出力先のOut-Flink-Studio-Streamを確認してみます。

CleanShot 2024-10-30 at 23.35.13@2x

問題なく摂氏から華氏へ変換されたデータが出力されていますね!!

おわりに

Pythonを使ってAmazon Managed Service for Apache Flink Studio上でユーザー定義関数を作成する方法はいかがだったでしょうか。
シンプルに実装可能で、SQLで複雑なロジックも使用できるため活用したいシーンでは使用していきたいと思いました。
ただ、パフォーマンスの影響もあるので処理スピードとのトレードオフを考慮する必要はありますが、使用する際は少しでも参考になりましたら幸いです。

最後までご覧いただきありがとうございました!

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.