Amazon AthenaからRDS for PostgreSQLとAmazon S3に対する横串クエリ(Federated Query)を実行してみた

2024.06.10

初めに

先日Amazon Athenaで(パススルーを使っていますが)Federated Queryを実行してみました。

上記記事のタイミングでは周辺の準備が大変でCloudWatch Logsに対するクエリを行っていましたが、なんとこちら僕の大好きなPostgreSQLに対するクエリも対応しております。

Federated Queryはでデータソースが単一である必要はなく複数種別のデータソースに対するクエリも可能なため、例えばPostgreSQLのDBのデータ+何らかの事情でS3バケットに部分的に退避したデータに対するクエリといったことが可能です。

今回は以前できなかったPostgreSQLへのFederated Queryを試しつつ折角横串クエリの本丸である複数サービスへの同時クエリを試してみます。

環境準備

データコネクタはServerless Application Repositoryから作成します。細かなパラメータは異なりますが大枠の部分は以前と同じなので上記記載の私の記事もご参照ください。

なおPostgreSQLのデータコネクタはVPC Lambdaが前提となっておますが、Serverless Application Repositoryで作成されるリソースはLambda関数+そのロールのみとなるため、所属するサブネットやセキュリティグループ等のそのほか必要となるリソースは別途自前で作成する必要があります。

周辺リソースの作成

こんな感じになります。

サブネット等周辺環境については普段の検証用のものがあるのでそこに追加する形で以下のリソースを作成します。RDS Proxyも作ろうかと思いましたが簡単な検証でなくても動きはしそうなので省略してます。

  • RDS本体
  • セキュリティグループ(Lambda取り付け用)
  • S3バケット(スピルバケット)
  • シークレット(Secrets Manager)
    • データコネクタDB認証情報用
  • VPCエンドポイント(S3、Secrets Manager)
    • Lambda関数がNAT Gateway等でインターネットに接続可能であれば不要

これを生成するCloudFormationテンプレートは以下のとおりです。

AWSTemplateFormatVersion: 2010-09-09
Parameters:
  Env:
    Type: String
  Prefix:
    Type: String
  VpcId:
    Type: String
  SubnetGroupId:
    Type: String
  EndpointSubnetId:
    Type: List<String>
Resources:
  DBInstance: 
    Type: AWS::RDS::DBInstance
    Properties:
      Engine: postgres
      MasterUsername: postgres
      AllocatedStorage: 20
      StorageType: gp3
      DBInstanceIdentifier: !Sub ${Env}-${Prefix}-db
      MasterUserPassword: !Sub "{{resolve:secretsmanager:${RDSMasterUserCredential}:SecretString:password::}}"
      AvailabilityZone: !Sub ${AWS::Region}a
      DBInstanceClass: db.t3.small
      AutoMinorVersionUpgrade: False
      EnablePerformanceInsights: True
      CACertificateIdentifier: "rds-ca-rsa4096-g1"
      DBSubnetGroupName: !Ref SubnetGroupId
      VPCSecurityGroups: 
        - !Ref RdsSg
  RDSMasterUserCredential:
    Type: AWS::SecretsManager::Secret
    Properties:
      Name: !Sub ${Env}-${Prefix}-master-credential
      GenerateSecretString:
        SecretStringTemplate: '{"username": "postgres"}'
        GenerateStringKey: "password"
        PasswordLength: 16
        ExcludeCharacters: '"@/\'
  RdsSg:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupName: data-connector-rds-sg
      GroupDescription: String
      SecurityGroupIngress: 
        - IpProtocol: tcp
          FromPort: 5432
          ToPort: 5432
          SourceSecurityGroupId: !Ref LambdaSg
      VpcId: !Ref VpcId
  LambdaSg:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupName: data-connector-lambda-sg
      GroupDescription: String
      VpcId: !Ref VpcId
  SecretsManagerEndpoint:
    Type: AWS::EC2::VPCEndpoint
    Properties:
      ServiceName: 'com.amazonaws.ap-northeast-1.secretsmanager'
      SubnetIds: !Ref EndpointSubnetId
      VpcEndpointType: Interface
      VpcId: !Ref VpcId
      PrivateDnsEnabled: True
      SecurityGroupIds:
        - !Ref EndpointSg
  S3Endpoint:
    Type: AWS::EC2::VPCEndpoint
    Properties:
      ServiceName: 'com.amazonaws.ap-northeast-1.s3'
      SubnetIds: !Ref EndpointSubnetId
      VpcEndpointType: Interface
      VpcId: !Ref VpcId
      # PrivateDnsOnlyForInboundResolverEndpointがデフォルトTrueっぽい為Falseにしないとここけるが
      # CFn上指定がサポートされていない?(要確認)っぽいので別途手動で有効化する
      #PrivateDnsEnabled: True
      SecurityGroupIds:
        - !Ref EndpointSg
  EndpointSg:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupName: data-connector-ep-sg
      GroupDescription: String
      SecurityGroupIngress: 
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          CidrIp: 0.0.0.0/0
      VpcId: !Ref VpcId

VPCエンドポイントはS3(スピルバケットへのアクセス用)、Secrets Manager(認証情報取得用)の2種が必要です。

データコネクタの作成

Serverless Application RepositoryのAthenaPostgreSQLConnectorを作成します。

作成時のパラメータは以下のとおりです(作成後一部直したので作成後のCloudFormationの入力画面)。

キー 入力値 備考
CompositeHandler PostGreSqlCompositeHandler
DefaultConnectionString postgres://jdbc:postgresql://dev-athena-poc-xxxxxx-1.rrds.amazonaws.com:5432/postgres?secret=${dev-athena-poc-master-credential}
DefaultScale 0 デフォルト値
DisableSpillEncryption false デフォルト値
LambdaFunctionName postgresq-data-connector
LambdaMemory 3008 デフォルト値
LambdaRoleARN (空) デフォルト値
LambdaTimeout 900 デフォルト値
PermissionsBoundaryARN (空) デフォルト値
SecretNamePrefix dev-athena-poc
SecurityGroupIds sg-08c4xxxxx
SpillBucket athena-postgresql-connector-spill-xxxxx
SpillPrefix athena-spill
Subnets subnet-080cxxxxx

接続文字列は以下のような形となります。
一般的なJDBC接続文字列ですが最近だとライブラリ等にラップされているせいで見ることも少ないので意外と忘れがちな部分ではあります。

postgres://jdbc:postgresql://{{RDSのエンドポイント}}:5432/{{接続先DB名}}?secret=${Secretsの名前}

認証情報はSecrets Manager側に以下の形式で保存されていれば利用する際に良い感じに展開してくれます。

{ "username": "{{接続ユーザ名}}", "password": "{{接続ユーザーパスワード}}"}

パラメータの情報に関する情報は作成上の説明文の記載に加え情報は以下にも記載がありますのでこちらをご参照ください。

ここまでで正常に設定できていればデータコネクタの部分で以下ように接続先のスキーマ情報が確認できます。

ここまでで各種設定ができていない場合は表示されないので各種設定を確認しましょう。

ただしS3へアクセスできない場合でもこの時点では問題なく表示されるため注意してください。

その場合は実際にAthenaでクエリをかける場合に失敗します。少し放置してからログを確認しないとS3のタイムアウトのエラーが出てこなくてめちゃくちゃハマりました。

テストデータ投入

RDSには以下のようにデータを投入します。

postgres=> CREATE TABLE history(id serial, text text, create_date timestamp DEFAULT now());
postgres=> INSERT INTO history(text) SELECT md5(generate_series::text) FROM generate_series(1,20);
INSERT 0 20
postgres=> DELETE FROM history WHERE id < 10;
DELETE 9
postgres=> SELECT * FROM history;
 id |               text               |        create_date
----+----------------------------------+----------------------------
 10 | d3d9446802a44259755d38e6d163e820 | 2024-06-06 09:16:42.664379
 11 | 6512bd43d9caa6e02c990b0a82652dca | 2024-06-06 09:16:42.664379
 12 | c20ad4d76fe97759aa27a0c99bff6710 | 2024-06-06 09:16:42.664379
 13 | c51ce410c124a10e0db5e4b97fc2af39 | 2024-06-06 09:16:42.664379
 14 | aab3238922bcc25a6f606eb525ffdc56 | 2024-06-06 09:16:42.664379
 15 | 9bf31c7ff062936a96d3c8bd1f8f2ff3 | 2024-06-06 09:16:42.664379
 16 | c74d97b01eae257e44aa9d5bade97baf | 2024-06-06 09:16:42.664379
 17 | 70efdf2ec9b086079795c442636b55fb | 2024-06-06 09:16:42.664379
 18 | 6f4922f45568161a8cdf4ad2299f6d23 | 2024-06-06 09:16:42.664379
 19 | 1f0e3dad99908345f7439f8ffabdffc4 | 2024-06-06 09:16:42.664379
 20 | 98f13708210194c475687be6106a3b84  | 2024-06-06 09:16:42.664379

S3には以下のようなTSVファイルを一つ置いておきます。日付がやけに古いのは↑と同じ方法で作成したテスト用のTSVがあってそれを流用している関係です。
(ついでに何かの間違いで消したつもりになって消してないPostgreSQL側ののデータが混入していたという事態を避ける意味もこめて)

1 c4ca4238a0b923820dcc509a6f75849b  2022-11-25 08:58:23.151356
2 c81e728d9d4c2f636f067f89cc14862c  2022-11-25 08:58:23.151356
3 eccbc87e4b5ce2fe28308fd9f2a7baf3  2022-11-25 08:58:23.151356
4 a87ff679a2f3e71d9181a67b7542122c  2022-11-25 08:58:23.151356
5 e4da3b7fbbce2345d7772b0674a318d5  2022-11-25 08:58:23.151356
6 1679091c5a880faf6fb5e6087eb1b2dc  2022-11-25 08:58:23.151356
7 8f14e45fceea167a5a36dedd4bea2543  2022-11-25 08:58:23.151356
8 c9f0f895fb98ab9159f51fd0297e236d  2022-11-25 08:58:23.151356
9 45c48cce2e2d7fbdea1afc51c7c6ad26  2022-11-25 08:58:23.151356

またこれを格納しているS3バケットに対してクエリをかけられるようにAthena側でテーブルを作成しております。

CREATE EXTERNAL TABLE IF NOT EXISTS PostgresUnionS3Table (
  id int,
  text String,
  create_date timestamp
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'field.delim'='\t'
)
LOCATION 's3://athena-postgresql-union-data-xxxxxxx/';

PostgreSQL側のAthena上でのテーブルはよしなに作成されているので別途作成は不要です。

実行

SELECT文を流して両方のクエリ結果を結合します。

今回S3とPostgreSQLを異なるデータカタログ上に作成しているため片側をデータカタログ名からフルパスで指定します。

SELECT *
FROM (
    SELECT id, text, create_date FROM AwsDataCatalog.default.postgresunions3table
    union
    SELECT id, text, create_date FROM history
)
ORDER BY id asc;

idが一桁番台のものはS3側のデータ、それ以降のものはPostgreSQLのものが取れ、それがAthena側で一元化され表示されていることが確認できます。

もちろんパススルークエリでも可能です。CloudWatch Logsのパススルーと比べるとクエリ内にデータソースの指定も含まれるのでシンプルで収まりが良いですね。

SELECT *
FROM (
    SELECT id, text, create_date FROM AwsDataCatalog.default.postgresunions3table
    union
    SELECT id, text, create_date FROM TABLE(system.query(query => 'SELECT * FROM history'))
)
ORDER BY id asc;

終わりに

横串クエリ(Federated Query)の名前の通りの複数ソースに対して一元的にクエリを流し情報を取得してみました。

Athenaを通じてなんらかで外部にアーカイブ化されたデータを含めた分析、さまざまにデータソースに散らばるデータを中間媒体を用意することなく一元的に分析といった非常に便利な機能ではあります。

Athena自体がサーバ維持費がかかるサービスでもなく、仲介するLambdaも動いた分だけの請求のため使わない場合に削除しなくとも維持にコストがかからないのも一つの魅力です(VPCエンドポイント等の周辺リソース自体は場合によってになる点は注意)。

ただ以前にも書いた通りLambdaが都度立ち上がり仲介するためその分の実行速度は落ちるも形にもなりますし、頻度によっては料金が気になることがあるためそこは注意しましょう。