初めに
先日Amazon Athenaで(パススルーを使っていますが)Federated Queryを実行してみました。
上記記事のタイミングでは周辺の準備が大変でCloudWatch Logsに対するクエリを行っていましたが、なんとこちら僕の大好きなPostgreSQLに対するクエリも対応しております。
Federated Queryはでデータソースが単一である必要はなく複数種別のデータソースに対するクエリも可能なため、例えばPostgreSQLのDBのデータ+何らかの事情でS3バケットに部分的に退避したデータに対するクエリといったことが可能です。
今回は以前できなかったPostgreSQLへのFederated Queryを試しつつ折角横串クエリの本丸である複数サービスへの同時クエリを試してみます。
環境準備
データコネクタはServerless Application Repositoryから作成します。細かなパラメータは異なりますが大枠の部分は以前と同じなので上記記載の私の記事もご参照ください。
なおPostgreSQLのデータコネクタはVPC Lambdaが前提となっておますが、Serverless Application Repositoryで作成されるリソースはLambda関数+そのロールのみとなるため、所属するサブネットやセキュリティグループ等のそのほか必要となるリソースは別途自前で作成する必要があります。
周辺リソースの作成
こんな感じになります。
サブネット等周辺環境については普段の検証用のものがあるのでそこに追加する形で以下のリソースを作成します。RDS Proxyも作ろうかと思いましたが簡単な検証でなくても動きはしそうなので省略してます。
- RDS本体
- セキュリティグループ(Lambda取り付け用)
- S3バケット(スピルバケット)
- シークレット(Secrets Manager)
- データコネクタDB認証情報用
- VPCエンドポイント(S3、Secrets Manager)
- Lambda関数がNAT Gateway等でインターネットに接続可能であれば不要
これを生成するCloudFormationテンプレートは以下のとおりです。
AWSTemplateFormatVersion: 2010-09-09
Parameters:
Env:
Type: String
Prefix:
Type: String
VpcId:
Type: String
SubnetGroupId:
Type: String
EndpointSubnetId:
Type: List<String>
Resources:
DBInstance:
Type: AWS::RDS::DBInstance
Properties:
Engine: postgres
MasterUsername: postgres
AllocatedStorage: 20
StorageType: gp3
DBInstanceIdentifier: !Sub ${Env}-${Prefix}-db
MasterUserPassword: !Sub "{{resolve:secretsmanager:${RDSMasterUserCredential}:SecretString:password::}}"
AvailabilityZone: !Sub ${AWS::Region}a
DBInstanceClass: db.t3.small
AutoMinorVersionUpgrade: False
EnablePerformanceInsights: True
CACertificateIdentifier: "rds-ca-rsa4096-g1"
DBSubnetGroupName: !Ref SubnetGroupId
VPCSecurityGroups:
- !Ref RdsSg
RDSMasterUserCredential:
Type: AWS::SecretsManager::Secret
Properties:
Name: !Sub ${Env}-${Prefix}-master-credential
GenerateSecretString:
SecretStringTemplate: '{"username": "postgres"}'
GenerateStringKey: "password"
PasswordLength: 16
ExcludeCharacters: '"@/\'
RdsSg:
Type: AWS::EC2::SecurityGroup
Properties:
GroupName: data-connector-rds-sg
GroupDescription: String
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 5432
ToPort: 5432
SourceSecurityGroupId: !Ref LambdaSg
VpcId: !Ref VpcId
LambdaSg:
Type: AWS::EC2::SecurityGroup
Properties:
GroupName: data-connector-lambda-sg
GroupDescription: String
VpcId: !Ref VpcId
SecretsManagerEndpoint:
Type: AWS::EC2::VPCEndpoint
Properties:
ServiceName: 'com.amazonaws.ap-northeast-1.secretsmanager'
SubnetIds: !Ref EndpointSubnetId
VpcEndpointType: Interface
VpcId: !Ref VpcId
PrivateDnsEnabled: True
SecurityGroupIds:
- !Ref EndpointSg
S3Endpoint:
Type: AWS::EC2::VPCEndpoint
Properties:
ServiceName: 'com.amazonaws.ap-northeast-1.s3'
SubnetIds: !Ref EndpointSubnetId
VpcEndpointType: Interface
VpcId: !Ref VpcId
# PrivateDnsOnlyForInboundResolverEndpointがデフォルトTrueっぽい為Falseにしないとここけるが
# CFn上指定がサポートされていない?(要確認)っぽいので別途手動で有効化する
#PrivateDnsEnabled: True
SecurityGroupIds:
- !Ref EndpointSg
EndpointSg:
Type: AWS::EC2::SecurityGroup
Properties:
GroupName: data-connector-ep-sg
GroupDescription: String
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 443
ToPort: 443
CidrIp: 0.0.0.0/0
VpcId: !Ref VpcId
VPCエンドポイントはS3(スピルバケットへのアクセス用)、Secrets Manager(認証情報取得用)の2種が必要です。
データコネクタの作成
Serverless Application RepositoryのAthenaPostgreSQLConnector
を作成します。
作成時のパラメータは以下のとおりです(作成後一部直したので作成後のCloudFormationの入力画面)。
キー | 入力値 | 備考 |
---|---|---|
CompositeHandler | PostGreSqlCompositeHandler | |
DefaultConnectionString | postgres://jdbc:postgresql://dev-athena-poc-xxxxxx-1.rrds.amazonaws.com:5432/postgres?secret=${dev-athena-poc-master-credential} | |
DefaultScale | 0 | デフォルト値 |
DisableSpillEncryption | false | デフォルト値 |
LambdaFunctionName | postgresq-data-connector | |
LambdaMemory | 3008 | デフォルト値 |
LambdaRoleARN | (空) | デフォルト値 |
LambdaTimeout | 900 | デフォルト値 |
PermissionsBoundaryARN | (空) | デフォルト値 |
SecretNamePrefix | dev-athena-poc | |
SecurityGroupIds | sg-08c4xxxxx | |
SpillBucket | athena-postgresql-connector-spill-xxxxx | |
SpillPrefix | athena-spill | |
Subnets | subnet-080cxxxxx |
接続文字列は以下のような形となります。
一般的なJDBC接続文字列ですが最近だとライブラリ等にラップされているせいで見ることも少ないので意外と忘れがちな部分ではあります。
postgres://jdbc:postgresql://{{RDSのエンドポイント}}:5432/{{接続先DB名}}?secret=${Secretsの名前}
認証情報はSecrets Manager側に以下の形式で保存されていれば利用する際に良い感じに展開してくれます。
{ "username": "{{接続ユーザ名}}", "password": "{{接続ユーザーパスワード}}"}
パラメータの情報に関する情報は作成上の説明文の記載に加え情報は以下にも記載がありますのでこちらをご参照ください。
ここまでで正常に設定できていればデータコネクタの部分で以下ように接続先のスキーマ情報が確認できます。
ここまでで各種設定ができていない場合は表示されないので各種設定を確認しましょう。
ただしS3へアクセスできない場合でもこの時点では問題なく表示されるため注意してください。
その場合は実際にAthenaでクエリをかける場合に失敗します。少し放置してからログを確認しないとS3のタイムアウトのエラーが出てこなくてめちゃくちゃハマりました。
テストデータ投入
RDSには以下のようにデータを投入します。
postgres=> CREATE TABLE history(id serial, text text, create_date timestamp DEFAULT now());
postgres=> INSERT INTO history(text) SELECT md5(generate_series::text) FROM generate_series(1,20);
INSERT 0 20
postgres=> DELETE FROM history WHERE id < 10;
DELETE 9
postgres=> SELECT * FROM history;
id | text | create_date
----+----------------------------------+----------------------------
10 | d3d9446802a44259755d38e6d163e820 | 2024-06-06 09:16:42.664379
11 | 6512bd43d9caa6e02c990b0a82652dca | 2024-06-06 09:16:42.664379
12 | c20ad4d76fe97759aa27a0c99bff6710 | 2024-06-06 09:16:42.664379
13 | c51ce410c124a10e0db5e4b97fc2af39 | 2024-06-06 09:16:42.664379
14 | aab3238922bcc25a6f606eb525ffdc56 | 2024-06-06 09:16:42.664379
15 | 9bf31c7ff062936a96d3c8bd1f8f2ff3 | 2024-06-06 09:16:42.664379
16 | c74d97b01eae257e44aa9d5bade97baf | 2024-06-06 09:16:42.664379
17 | 70efdf2ec9b086079795c442636b55fb | 2024-06-06 09:16:42.664379
18 | 6f4922f45568161a8cdf4ad2299f6d23 | 2024-06-06 09:16:42.664379
19 | 1f0e3dad99908345f7439f8ffabdffc4 | 2024-06-06 09:16:42.664379
20 | 98f13708210194c475687be6106a3b84 | 2024-06-06 09:16:42.664379
S3には以下のようなTSVファイルを一つ置いておきます。日付がやけに古いのは↑と同じ方法で作成したテスト用のTSVがあってそれを流用している関係です。
(ついでに何かの間違いで消したつもりになって消してないPostgreSQL側ののデータが混入していたという事態を避ける意味もこめて)
1 c4ca4238a0b923820dcc509a6f75849b 2022-11-25 08:58:23.151356
2 c81e728d9d4c2f636f067f89cc14862c 2022-11-25 08:58:23.151356
3 eccbc87e4b5ce2fe28308fd9f2a7baf3 2022-11-25 08:58:23.151356
4 a87ff679a2f3e71d9181a67b7542122c 2022-11-25 08:58:23.151356
5 e4da3b7fbbce2345d7772b0674a318d5 2022-11-25 08:58:23.151356
6 1679091c5a880faf6fb5e6087eb1b2dc 2022-11-25 08:58:23.151356
7 8f14e45fceea167a5a36dedd4bea2543 2022-11-25 08:58:23.151356
8 c9f0f895fb98ab9159f51fd0297e236d 2022-11-25 08:58:23.151356
9 45c48cce2e2d7fbdea1afc51c7c6ad26 2022-11-25 08:58:23.151356
またこれを格納しているS3バケットに対してクエリをかけられるようにAthena側でテーブルを作成しております。
CREATE EXTERNAL TABLE IF NOT EXISTS PostgresUnionS3Table (
id int,
text String,
create_date timestamp
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='\t'
)
LOCATION 's3://athena-postgresql-union-data-xxxxxxx/';
PostgreSQL側のAthena上でのテーブルはよしなに作成されているので別途作成は不要です。
実行
SELECT文を流して両方のクエリ結果を結合します。
今回S3とPostgreSQLを異なるデータカタログ上に作成しているため片側をデータカタログ名からフルパスで指定します。
SELECT *
FROM (
SELECT id, text, create_date FROM AwsDataCatalog.default.postgresunions3table
union
SELECT id, text, create_date FROM history
)
ORDER BY id asc;
idが一桁番台のものはS3側のデータ、それ以降のものはPostgreSQLのものが取れ、それがAthena側で一元化され表示されていることが確認できます。
もちろんパススルークエリでも可能です。CloudWatch Logsのパススルーと比べるとクエリ内にデータソースの指定も含まれるのでシンプルで収まりが良いですね。
SELECT *
FROM (
SELECT id, text, create_date FROM AwsDataCatalog.default.postgresunions3table
union
SELECT id, text, create_date FROM TABLE(system.query(query => 'SELECT * FROM history'))
)
ORDER BY id asc;
終わりに
横串クエリ(Federated Query)の名前の通りの複数ソースに対して一元的にクエリを流し情報を取得してみました。
Athenaを通じてなんらかで外部にアーカイブ化されたデータを含めた分析、さまざまにデータソースに散らばるデータを中間媒体を用意することなく一元的に分析といった非常に便利な機能ではあります。
Athena自体がサーバ維持費がかかるサービスでもなく、仲介するLambdaも動いた分だけの請求のため使わない場合に削除しなくとも維持にコストがかからないのも一つの魅力です(VPCエンドポイント等の周辺リソース自体は場合によってになる点は注意)。
ただ以前にも書いた通りLambdaが都度立ち上がり仲介するためその分の実行速度は落ちるも形にもなりますし、頻度によっては料金が気になることがあるためそこは注意しましょう。