Amazon AppFlow カスタムコネクタ入門として amazon-appflow-custom-jdbc-connector をデプロイして MySQL から S3 にデータ転送してみた

自前の Lambda 関数を指定することで AppFlow のカスタムコネクタとして使用できます。

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

コンバンハ、千葉(幸)です。

Amazon AppFlow はコーディング不要で SaaS アプリケーションや AWS サービス間のデータ転送を実現するマネージドなインテグレーションサービスです。

データの送信元/送信先として数多くの SaaS アプリケーションや AWS サービスが用意されていますが、それとは別にカスタマー独自のカスタムコネクタを準備し用いることもできます。

JDBC ドライバが含まれたカスタムコネクタを使用して、MySQL サーバーを AppFlow フローの送信元/送信先に設定する方法が AWS ブログに載っていたのでそれを試してみます。

今回作成する構成

全体のイメージは以下です。

  • 最終的に実現するのは AppFlow フローによる Aurora MySQL から S3 へのデータ転送
  • MySQL への接続は標準のコネクタでは提供されていないため、JDBC ドライバを含むカスタムコネクタにより実現する
    • SDK、JDBC ドライバを含む Lambda 関数の PoC 用 SAM テンプレートが公式で提供されている
  • カスタムコネクタは Lambda 関数と紐付けて作成するものである
    • 今回の構成では以下へアクセス可能とする必要がある
      • Aurora MySQL(今回はパブリックアクセス可能とすることで実現する)
      • AWS Secrets Manager シークレット
  • 各種セットアップを今回は AWS Cloud9 環境から行う

1. Cloud9 環境のセットアップ

Cloud9 環境が既に存在するところからスタートします。以下で作成しています。

  • Amazon Linux2
  • パブリックアクセス可能
  • t2.micro

今回デプロイしたい Lambda 関数は以下のリポジトリにあります。

README を確認すると、以下のツールが必要とされています。

  • AWS CLI
  • SAM CLI
  • Java11
  • Maven

上 3 つは元々インストールされていたので、Maven を追加してあげればよさそうです。

# AWS CLI
~/environment $ aws --version
aws-cli/1.19.112 Python/2.7.18 Linux/4.14.294-220.533.amzn2.x86_64 botocore/1.20.112

# SAM CLI
~/environment $ sam --version
SAM CLI, version 1.57.0

# Java11
~/environment $ java --version
openjdk 11.0.17 2022-10-18 LTS
OpenJDK Runtime Environment Corretto-11.0.17.8.1 (build 11.0.17+8-LTS)
OpenJDK 64-Bit Server VM Corretto-11.0.17.8.1 (build 11.0.17+8-LTS, mixed mode)

まずはパッケージをダウンロードします。

~/environment $ curl -OL https://dlcdn.apache.org/maven/maven-3/3.8.6/binaries/apache-maven-3.8.6-bin.tar.gz
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 8472k  100 8472k    0     0  9404k      0 --:--:-- --:--:-- --:--:-- 9403k

/opt配下に解凍します。

~/environment $ sudo tar xzvf apache-maven-3.8.6-bin.tar.gz -C /opt/                                                                          
apache-maven-3.8.6/README.txt
apache-maven-3.8.6/LICENSE
apache-maven-3.8.6/NOTICE
apache-maven-3.8.6/lib/
apache-maven-3.8.6/lib/commons-cli.license
apache-maven-3.8.6/lib/commons-io.license
...略...

パスを追加し、プロファイルを読み込み直します。

~/environment $ echo "PATH=$PATH:/opt/apache-maven-3.8.6/bin/" >> ~/.bash_profile
~/environment $ source ~/.bash_profile

mvn -vでバージョンが確認できたので問題なく準備できました。

~/environment $ mvn -v
Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
Maven home: /opt/apache-maven-3.8.6
Java version: 11.0.17, vendor: Amazon.com Inc., runtime: /usr/lib/jvm/java-11-amazon-corretto.x86_64
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "4.14.294-220.533.amzn2.x86_64", arch: "amd64", family: "unix"

冒頭のリポジトリをクローンします。

~/environment $ git clone https://github.com/aws-samples/amazon-appflow-custom-jdbc-connector.git
Cloning into 'amazon-appflow-custom-jdbc-connector'...
remote: Enumerating objects: 100, done.
remote: Counting objects: 100% (100/100), done.
remote: Compressing objects: 100% (59/59), done.
remote: Total 100 (delta 30), reused 85 (delta 19), pack-reused 0
Receiving objects: 100% (100/100), 201.52 KiB | 5.17 MiB/s, done.
Resolving deltas: 100% (30/30), done.

ディレクトリを移動し、mvn packageによるビルドを行います。

~/environment $ cd amazon-appflow-custom-jdbc-connector/
~/environment/amazon-appflow-custom-jdbc-connector (main) $ mvn package
...略...
 [INFO] Copying byte-buddy-1.10.20.jar to /home/ec2-user/environment/amazon-appflow-custom-jdbc-connector/target/dependency/byte-buddy-1.10.20.jar
[INFO] Copying byte-buddy-agent-1.10.20.jar to /home/ec2-user/environment/amazon-appflow-custom-jdbc-connector/target/dependency/byte-buddy-agent-1.10.20.jar
[INFO] Copying objenesis-3.2.jar to /home/ec2-user/environment/amazon-appflow-custom-jdbc-connector/target/dependency/objenesis-3.2.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  02:32 min
[INFO] Finished at: 2022-11-13T10:52:49Z
[INFO] ------------------------------------------------------------------------

ここまでで SAM デプロイを行う準備が整いました。

2. SAM によるカスタムコネクタ用 Lambda 関数のデプロイ

SAM デプロイを行います。SAM により生成される CloudFormation スタックの名称や、いくつかの確認事項に対話式で答えることでデプロイが開始されます。

~/environment/amazon-appflow-custom-jdbc-connector (main) $ sam deploy --guided

Configuring SAM deploy
======================

        Looking for config file [samconfig.toml] :  Not found

        Setting default arguments for 'sam deploy'
        =========================================
        Stack Name [sam-app]: sam-app
        AWS Region [ap-northeast-1]: 
        #Shows you resources changes to be deployed and require a 'Y' to initiate deploy
        Confirm changes before deploy [y/N]: y
        #SAM needs permission to be able to create roles to connect to the resources in your template
        Allow SAM CLI IAM role creation [Y/n]: y
        #Preserves the state of previously provisioned resources when an operation fails
        Disable rollback [y/N]: y
        Save arguments to configuration file [Y/n]: y
        SAM configuration file [samconfig.toml]: 
        SAM configuration environment [default]: 

        Looking for resources needed for deployment:
        Creating the required resources...
        Successfully created!
         Managed S3 bucket: aws-sam-cli-managed-default-samclisourcebucket-58h089x7vwj
         A different default S3 bucket can be set in samconfig.toml

        Saved arguments to config file
        Running 'sam deploy' for future deployments will use the parameters saved above.
        The above parameters can be changed by modifying samconfig.toml
        Learn more about samconfig.toml syntax at 
        https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-config.html

Uploading to sam-app/ebe56aca3da88af67ab2396bb72205a8  24616216 / 24616216  (100.00%)

        Deploying with following values
        ===============================
        Stack name                   : sam-app
        Region                       : ap-northeast-1
        Confirm changeset            : True
        Disable rollback             : True
        Deployment s3 bucket         : aws-sam-cli-managed-default-samclisourcebucket-58h089x7vwj
        Capabilities                 : ["CAPABILITY_IAM"]
        Parameter overrides          : {}
        Signing Profiles             : {}

Initiating deployment
=====================
Uploading to sam-app/b7d02a0293bff5ad86550c66681e85b0.template  1297 / 1297  (100.00%)

Waiting for changeset to be created..
CloudFormation stack changeset
-------------------------------------------------------------------------------------------------------------------------------------------------
Operation                            LogicalResourceId                    ResourceType                         Replacement                        
-------------------------------------------------------------------------------------------------------------------------------------------------
+ Add                                ConnectorFunctionRole                AWS::IAM::Role                       N/A                                
+ Add                                ConnectorFunction                    AWS::Lambda::Function                N/A                                
+ Add                                PolicyPermission                     AWS::Lambda::Permission              N/A                                
-------------------------------------------------------------------------------------------------------------------------------------------------

Changeset created successfully. arn:aws:cloudformation:ap-northeast-1:012345678910:changeSet/samcli-deploy1668337006/570c940f-1dcf-4e70-a8b0-0ad263081461


Previewing CloudFormation changeset before deployment
======================================================
Deploy this changeset? [y/N]: y

2022-11-13 10:57:08 - Waiting for stack create/update to complete

CloudFormation events from stack operations (refresh every 0.5 seconds)
-------------------------------------------------------------------------------------------------------------------------------------------------
ResourceStatus                       ResourceType                         LogicalResourceId                    ResourceStatusReason               
-------------------------------------------------------------------------------------------------------------------------------------------------
CREATE_IN_PROGRESS                   AWS::IAM::Role                       ConnectorFunctionRole                -                                  
CREATE_IN_PROGRESS                   AWS::IAM::Role                       ConnectorFunctionRole                Resource creation Initiated        
CREATE_COMPLETE                      AWS::IAM::Role                       ConnectorFunctionRole                -                                  
CREATE_IN_PROGRESS                   AWS::Lambda::Function                ConnectorFunction                    -                                  
CREATE_IN_PROGRESS                   AWS::Lambda::Function                ConnectorFunction                    Resource creation Initiated        
CREATE_COMPLETE                      AWS::Lambda::Function                ConnectorFunction                    -                                  
CREATE_IN_PROGRESS                   AWS::Lambda::Permission              PolicyPermission                     -                                  
CREATE_IN_PROGRESS                   AWS::Lambda::Permission              PolicyPermission                     Resource creation Initiated        
CREATE_COMPLETE                      AWS::Lambda::Permission              PolicyPermission                     -                                  
CREATE_COMPLETE                      AWS::CloudFormation::Stack           sam-app                              -                                  
-------------------------------------------------------------------------------------------------------------------------------------------------

Successfully created/updated stack - sam-app in ap-northeast-1


SAM CLI update available (1.62.0); (1.57.0 installed)
To download: https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html

↑正常にデプロイが完了しました。

SAM によりデプロイされた内容を確認してみる

今回使用した SAM テンプレートは以下です。

AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'
Description: Template to deploy the lambda connector in your account.
Resources:
  ConnectorFunction:
    Type: 'AWS::Serverless::Function'
    Properties:
      Handler: "org.custom.connector.jdbc.handler.JDBCConnectorLambdaHandler::handleRequest"
      CodeUri: "./target/appflow-custom-jdbc-connector-1.0.jar"
      Description: "AppFlow custom JDBC connector example"
      Runtime: java11
      Timeout: 30
      MemorySize: 1024
      Policies:
        Version: '2012-10-17'
        Statement:
          Effect: Allow
          Action: 'secretsmanager:GetSecretValue'
          Resource: !Sub 'arn:aws:secretsmanager:${AWS::Region}:${AWS::AccountId}:secret:appflow!${AWS::AccountId}-*'

  PolicyPermission:
    Type: 'AWS::Lambda::Permission'
    Properties:
      FunctionName: !GetAtt ConnectorFunction.Arn
      Action: lambda:InvokeFunction
      Principal: 'appflow.amazonaws.com'
      SourceAccount: !Ref 'AWS::AccountId'
      SourceArn: !Sub 'arn:aws:appflow:${AWS::Region}:${AWS::AccountId}:*'

以下のリソースが作成されています。

  • Lambda 関数
  • Lambda 関数用 IAM ロールおよびポリシー
  • (Lambda 関数リソースベースポリシー)

リソースベースポリシーは以下の通り。AppFlow からの Invoke が許可されています。

{
  "Version": "2012-10-17",
  "Id": "default",
  "Statement": [
    {
      "Sid": "sam-app-PolicyPermission-1LD210PAD1FD8",
      "Effect": "Allow",
      "Principal": {
        "Service": "appflow.amazonaws.com"
      },
      "Action": "lambda:InvokeFunction",
      "Resource": "arn:aws:lambda:ap-northeast-1:012345678910:function:sam-app-ConnectorFunction-Xnp6q3lSAbwO",
      "Condition": {
        "StringEquals": {
          "AWS:SourceAccount": "012345678910"
        },
        "ArnLike": {
          "AWS:SourceArn": "arn:aws:appflow:ap-northeast-1:012345678910:*"
        }
      }
    }
  ]
}

Lambda 関数にアタッチされた IAM ロールは以下の通り。AWSLambdaBasicExecutionRoleに加え、インラインポリシーが設定されています。

AppFlow_IAM_Management_Console

インラインポリシーの内訳は以下の通りで、AppFlow により作成された Secrets を取得する権限が与えられています。

{
    "Version": "2012-10-17",
    "Statement": {
        "Action": "secretsmanager:GetSecretValue",
        "Resource": "arn:aws:secretsmanager:ap-northeast-1:012345678910:secret:appflow!012345678910-*",
        "Effect": "Allow"
    }
}

3. Aurora MySQL のセットアップ

以下の設定で Aurora DB クラスターを作成しました。

  • エンジンバージョン:8.0.mysql_aurora.3.02.1
  • パブリックアクセス:有効
  • SecuriryGroup:0.0.0.0/0から 3306 ポートへのインバウンドを許可
    • 今回は検証なので広めに開けましたが真似しないでください!

AppFlow による転送を行うためのデータを追加します。具体的にはデータベース、テーブル、レコードを追加します。何らかサンプルとなるものが必要だったので、以下のページの内容を踏襲しました。

Cloud9 環境から DB に接続して作業していきます。

$ mysql --version
mysql  Ver 15.1 Distrib 10.2.38-MariaDB, for Linux (x86_64) using  EditLine wrapper
$ mysql -h xxxxxx.cluster-xxxxxx0und.ap-northeast-1.rds.amazonaws.com -u Admin -p
Enter password:(パスワードを入力し接続)

データベースtestdbの作成。

MySQL [(none)]> create database testdb;
Query OK, 1 row affected (0.00 sec)

MySQL [(none)]> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| mysql              |
| performance_schema |
| sys                |
| testdb             |
+--------------------+
5 rows in set (0.01 sec)

テーブルusersの作成。

MySQL [(none)]> use testdb;
Database changed
MySQL [testdb]> show tables;
Empty set (0.01 sec)

MySQL [testdb]> CREATE TABLE users (id INT AUTO_INCREMENT, name TEXT, PRIMARY KEY (id));
Query OK, 0 rows affected (0.02 sec)

MySQL [testdb]> show tables;
+------------------+
| Tables_in_testdb |
+------------------+
| users            |
+------------------+
1 row in set (0.00 sec)

MySQL [testdb]> describe users;
+-------+------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra          |
+-------+------+------+-----+---------+----------------+
| id    | int  | NO   | PRI | NULL    | auto_increment |
| name  | text | YES  |     | NULL    |                |
+-------+------+------+-----+---------+----------------+
2 rows in set (0.00 sec)

レコードの追加。

MySQL [testdb]> SELECT * FROM users;
Empty set (0.00 sec)

MySQL [testdb]> INSERT INTO users(name) VALUES ('testuser');
Query OK, 1 row affected (0.00 sec)

MySQL [testdb]> SELECT * FROM users;
+----+----------+
| id | name     |
+----+----------+
|  1 | testuser |
+----+----------+
1 row in set (0.00 sec)

準備ができました。

4. AppFlow フローの作成・実行

ここから先は AppFlow 関連のリソースを設定していきます。

カスタムコネクタの登録

まずはカスタムコネクタを登録していきます。「コネクタ」の画面から、「新しいコネクタを登録」を押下します。

app_flow_Amazon_AppFlow

先ほど作成した Lambda 関数を選択し、ラベル(名称)を指定して「登録」を押下します。

AppFlow_Custom_Connect_Amazon_AppFlow

今回はjdbcという名称で登録しました。

コネクタプロパティの作成

登録したコネクタに「接続(コネクタプロパティ)」を作成していきます。

Appflow_Connector_Amazon_AppFlow

各種情報を入力します。

項目 説明
Driver 今回使用するコネクタでは mysql のみ
Hostname DB クラスターの接続エンドポイント
Port ポート番号
Username DB ユーザー名
Password DB ユーザーのパスワード
Database Name データベース名

Appflow_connector_Amazon_AppFlow-8345953

↑わたしはこの段階で「接続する」を押下して「何も反応ないな……?」としばらくハマっていたのですが、スクロールするとまだ入力項目があります。

接続(コネクタプロパティ)名を指定し、「接続する」を押下します。

Ammflow_connector_name_Amazon_AppFlow

コネクタに紐づく接続が正常に作成されました。

AppFlow_custom_Connector_Amazon_AppFlow-8340162


ちなみに

接続の作成時に登録した内容はシークレットに保存されています。

AppFlow_secrets_Secrets_Manager

シークレットの値はこのような感じ。接続作成時に入力した内容に加え以下内容が登録されていました。

  • authenticationType:CUSTOM
  • customAuthenticationType:JDBC

appflow_Secrets_Manager


フローの作成

フローを作成していきます。手順が 5 ステップに分かれているので、順番に見ていきます。

手順 1 ではフロー名を入力して次へ。

appflow_jdbc_flow

手順 2 では送信元、送信先を指定します。

送信元では先ほど作成したコネクタ・接続を選択します。

  • API バージョンを選択:今回選択できるのは v1 のみ
  • jdbc オブジェクトを選択:接続先のテーブル名を候補に表示してくれるので選択
    • 今回は先ほど作成したusersテーブルを選択

appflow_custom-flow

↑送信先には S3 バケットを指定しました。

手順 3 ではデータのマッピングを実施します。今回の例で言えば、フィールドはテーブルのカラムが該当します。

「送信元フィールド名」から「すべてのフィールドを直接マッピングする」を選択すると、自動でマッピングを設定してくれます。

appflow_custom_connect_jdbc

手順 4 ではフィルターを指定できますが、今回は特に指定せず次に進みます。

AppFlow_Custom_flow_creeate_Amazon_AppFlow

手順 5 で最終確認を行い、フローの作成を行います。

appflow_custom_flow_create

これでフローの作成が完了です。

フローの実行

作成されたフローを選択し、「フローを実行」を押下します。

appflow_create_run_Amazon_AppFlow

しばらく待つと画面上部に結果が表示されます。

appflow_custom_flow_success_Amazon_AppFlow

今回は以下の名称でオブジェクトが作成されていました。

s3://chibayuki-hoge-hoge/jdbc-flow/ef028c20-a5de-4728-ac1e-b5dee0e7b397/-2423294-2022-11-13T13:37:04

オブジェクトをダウンロードし中身を確認するとこのようになっていました。

{"id":"1","name":"testuser"}

おさらいをするとtestdbデータベースのusersテーブルの中身はこのようになっています。

MySQL [testdb]> SELECT * FROM users;
+----+----------+
| id | name     |
+----+----------+
|  1 | testuser |
+----+----------+
1 row in set (0.00 sec)

AppFlow カスタムコネクタを利用した MySQL から S3 へのデータ転送が確認できました。

終わりに

Amazon AppFlow のカスタムコネクタのサンプルである amazon-appflow-custom-jdbc-connector を使ってみました。

このサンプルはあくまで PoC 用でありそのまま本番環境には使用しないように、という注意書きがありますのでご留意ください。

カスタムコネクタを使ってみた情報が少なくてイメージが湧いていませんでしたが、実際に手を動かして「カスタムコネクタは Lambda 関数を紐付けて作成すること」「Lambda 関数から接続先やシークレットにアクセス可能であること」を学びました。

AppFlow カスタムコネクタの使用を検討している方の参考になれば幸いです。

以上、 チバユキ (@batchicchi) がお送りしました。

参考