Amazon AthenaのFederated Queryにデータソース側のクエリ構文が直接指定できるパススルー機能が追加されていました

2024.05.29

初めに

先月のアップデートとなりますがAthenaで直接利用できないデータソースに対するクエリをサポートするFederated Queryの強化アップデートがありました。

Athenaでは以前より直接サポートしてないデータソースに対する検索に対してLambda関数を中間のコネクタと利用してクエリを行うフェデレーティッドクエリ(Federated Query)が存在していました。

Federated Queryは多数のデータソースに対して統一された構文)で横串検索ができるのが非常的に強力な一方、あくまでAthenaの構文で記載したものを変換するという関係上データソースのネイティブの機能を全て使えるとは限りません。そのため場合によっては必要以上に複雑なクエリを書く必要やクエリ自体のコストが高くなってしまう可能性があります。

今回追加されたパススルークエリはAthenaのクエリ上ににデータソース側のサービスのクエリを直接を組み込みそれをそのままデータソース側で実行することができます。

これにより個別のサービスのクエリの仕様の把握が必要なもののより柔軟にクエリを構築し分析を行うことが可能となります。

サポートしているデータソースやサービスごとの構文等については以下ページおよびその配下のサービスごとのページをご参照ください。

今回やること

今回はCloudWatch Logsをデータソースとしてパスクエリを実行してみます。
(厳密にはCloudWatch Logs Insightによるクエリ)

データソースをPostgreSQLとするパススルークエリもサポートされているためいつもなら嬉々としてこちらを利用するところですがVPC上のプライベートなリソースに繋ぎ込もうとすると付随リソースの準備量が多くなってしまうため予感がしたため別の機会に試してみます。

データソースコネクタの作成

以下のブログ記事を参考に作成しました。以下はDynamoDBの記事ですがCloudWatch Logsもほぼ同様の手順です(パススルークエリ特有の設定もなし)。

CloudWatch LogsとDynamoDBで入力パラメータは一部異なりますが必須パラメーター自体は同様で作成されるスピルバケットの名称とデータカタログの名前を入力してデプロイします。

順序が前後しますがデータカタログはアプリケーション作成時のテンプレートに含まれますがバケットは存在せず自分で作る必要があります。
作成されたLambda関数に指定したS3に対するアクセス権限は割り当てられているので同一アカウントであれば特別バケット側で許可は不要でバケット名以外デフォルト設定のままでも問題ありません。

なおAWS公式から提供される利用可能なコネクタの一覧およびそのServerless Aplication Repositoryへのリンクは以下のページにまとまっているようですが最新のものが一部乗っていなさそうなのでAthena側のデータソースの作成のリンクから飛んだ方が良さそうです(廃止予定のAthenaJdbcConnectorが乗っているが移行先のものが乗っていない)。

マネジメントコンソールを開いてみると先ほどアプリケーションをデプロイする際に指定したデータカタログとその中にロググループ=データベース、ログストリーム=テーブルが作成されています。

クエリをかけてみる

今回クエリの対象とするのは私がデイリーで流しているPythonによって作成されたLambda関数のログとなります。

この関数の処理の一部としてAWSの最新の情報のRSSからのデータ取得を行っておりその際一部の情報をロガーを使わずprint()で簡易的に出力しておりこのうちlinkの値を取り出します。

なお本当に適当に出しているだけなので、所定のパラメータ以外にもNew Relicのシステムログ等様々なログが含まれています。

パススルーを利用しない場合

注) 自分がAthena詳しくないだけで実はもっといい構文があって知識不足によって不必要に複雑となってしまっている可能性があります。Athena詳しい方でもっとシンプルにできそうな案があればお待ちしております。

このログの中からlinkの中の値を取り出そうとするとAthenaだと少し構文が面倒です。

というもののJSON_EXTRACT()等を利用して値の抽出(目的のキーなくともnullで正常実行可能)は可能なのですが、そもそもJSONフォーマットでないと転けてしまう問題があります。

テーブルを作成する際にWITH SERDEPROPERTIES ('ignore.malformed.json' = 'true')でクエリの際に非JSONデータ弾いたりはできますが別にテーブルを作成する必要がありやや面倒です。
(私がAthenaのクエリに疎く綺麗にできる方法があるかもしれませんが...)

何かないかと思って探してましたが今回の場合は雑な正規表現で取り出せそうなのでこちらで絞り込みます。
※ あくまで{で始まって}で終わるデータのみなので簡易的なチェックのみとなる点には注意

WITH filtered_logs AS (
  SELECT message
  FROM "2024/05/29/[$LATEST]507e9080e09a4967a1b0ea0b328a4bf2"
  WHERE REGEXP_LIKE(message, '^\{.*\}$')
)

さて、これで中からlinkの値を取り出せ...ませんね。

WITH filtered_logs AS (
  SELECT message
  FROM "2024/05/29/[$LATEST]507e9080e09a4967a1b0ea0b328a4bf2"
  WHERE REGEXP_LIKE(message, '^\{.*\}$')
)
SELECT json_extract(message, '$.link') AS link
FROM filtered_logs;

先ほどのログをもう一度よくみてください。一見JSONのように思える文字列ですが、囲み文字がシングルクォートとなっております。

JSONは仕様として囲み文字がダブルクォート(")である必要があり、シングルクォート(')はNGです。ツールによって許容してくれる場合もありますがこの場合はNGのようです。

一例として今回の場合はシングルクォートをダブルクォートに無理やり置き換えてクエリを実行することで抽出します。

WITH filtered_logs AS (
  SELECT replace(message, '''', '"') AS message
  FROM "2024/05/29/[$LATEST]507e9080e09a4967a1b0ea0b328a4bf2"
  WHERE REGEXP_LIKE(message, '^\{.*\}$')
)
SELECT json_extract(message, '$.link') AS link
FROM filtered_logs;

パススルークエリを利用する場合

CloudWatch Logs Insightに関しては比較的構造の判定等が緩く先ほどのシングルクォートのJSON(pythonの辞書型の形式)であってもそのまま値を出すことができます。
またその関係かクエリ先のデータに構造化されていないデータ仮に含まれていてもパース失敗のようにならず良くも悪くも値なしとなります。

先ほど同様linkの値を取り出したい場合は以下のようにシンプルにキーの指定および存在確認のみで実現可能です。

fields link
| filter ispresent(link)

こちらをパスクエリを通じてAthena経由で実行してみます。

構文は以下のページのパススルークエリの項目をベースに作成します。

検索先の指定がロググループのため同等となるようにInsight側のクエリに追加に追加で絞り込みのフィルタを追加しておきます。

SELECT * FROM TABLE(
        system.query(
            STARTTIME => '1716876000000',
            ENDTIME => '1716962400000',
            QUERYSTRING => 'fields link | filter ispresent(link) | filter @logStream = "2024/05/29/[$LATEST]507e9080e09a4967a1b0ea0b328a4bf2"',
            LOGGROUPNAMES => '/aws/lambda/aws-update-release-checke-xxxxxxx',
            LIMIT => '100'
            )) ;

検索対象の開始時刻(STARTTIME)、終了時刻(ENDTIME)、取得上限(LIMIT)は指定を削除するとクエリがエラーとなるためこれらも全て必須パラメータとなりそうです。
特に時刻はunixtime(ミリ秒)なのでさっと雑に検索をかけたい時にも指定する必要があり若干不便そうではあります。

実行してみると無事にlinkの値を抽出していることが確認できました。

終わりに

今回新たにFederated Queryの拡張として追加されたパススルーによるクエリを試してみました。

データの特性とデータソース側の機能によっては個々のクエリをシンプルにできる余地は魅力的な一方、多数のデータソースを横串的に検索する場合Athena側のクエリ内に異なるサービスのクエリが混在する形となってしまうため逆に複雑となってしまう可能性があります。
(全体を見通そうとすると各サービスのネイティブなクエリを理解する必要があるためAthena単体のクエリ知識だけで対応ができない)

今回の例でもパススルークエリを利用することでクエリの実態としては特殊なデータの加工なくシンプルなものとできましたが、構文上シンプルなSQL構文ではなく引数的に渡す関係ことによる視認性の低下、AthenaのみではなくCloudWatch Logs Insightのクエリ構文も混ざるため複雑性に対するリターンが十分か?と言われるとむしろマイナスなきもします。

ネイティブ機能を利用して書き込まないと複雑になり過ぎてしまう、ヒント句を書き込んだクエリでないとパフォーマンスが...といった複雑性に対するリターンが十分あるか?といった面も踏まえて利用を検討してみましょう。 (ある種インラインアセンブラに通ずるような部分がある気もします)