Glue for RayでRedshiftにData APIで接続してみた

Glue for RayでRedshift Data APIを使って、パブリックに公開されていないプライベートなRedshiftクラスタに接続してみます。
2023.08.15

データアナリティクス事業本部の笠原です。

Glue for RayはGA時点でもVPC内への接続はできません。 そのため、VPC内のリソースである、RDSやRedshiftへはパブリック経路で接続する必要があります。

ただ、RDSやRedshiftへの接続には、Data APIという別の方法があります。 これが使えれば、Glue for RayでもVPC内のプライベートなRDSやRedshiftにアクセスできるはずです。

今回は、Glue for RayでRedshift Data APIを使って、パブリックに公開されていないプライベートなRedshiftクラスタに接続してみます。

概要図

RedshiftはVPC内で起動します。 VPCにはインターネットゲートウェイはアタッチしませんし、Redshiftはパブリックアクセスをしません。 また、GlueジョブにはGlue接続はアタッチしません。

Data APIを利用する際、今回はSecrets Managerを利用します。

準備

最初に、Cloudformationテンプレートで一通り作成します。 まずは、Redshiftと、Redshiftを配置するためのVPCを作成します。

Redshiftは、シングルノードの dc2.large を指定しています。 ノードタイプは、Redshift Data APIに対応しているノードタイプを選びましょう。

AWSTemplateFormatVersion: "2010-09-09"
Parameters:
  MasterUserName:
    Type: String
    Default: defaultuser
    AllowedPattern: "([a-z])([a-z]|[0-9])*"
  MasterUserPassword:
    Type: String
    NoEcho: true

Resources:
  ## Redshift
  RedshiftCluster:
    Type: AWS::Redshift::Cluster
    Properties:
      ClusterType: single-node
      NodeType: dc2.large
      DBName: glueraysampledb
      MasterUsername: !Ref MasterUserName
      MasterUserPassword: !Ref MasterUserPassword
      ClusterParameterGroupName: !Ref RedshiftClusterParameterGroup
      VpcSecurityGroupIds:
        - !Ref RedshiftClusterSecurityGroup
      ClusterSubnetGroupName: !Ref RedshiftClusterSubnetGroup
      PubliclyAccessible: false
      Port: 5439
  RedshiftClusterParameterGroup:
    Type: AWS::Redshift::ClusterParameterGroup
    Properties:
      Description: Cluster Parameter Group
      ParameterGroupFamily: redshift-1.0
      Parameters:
        - ParameterName: enable_user_activity_logging
          ParameterValue: true
  RedshiftClusterSubnetGroup:
    Type: AWS::Redshift::ClusterSubnetGroup
    Properties:
      Description: Cluster Subnet Group
      SubnetIds:
        - !Ref ClusterSubnet

  ## VPC
  VPC:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: 10.0.0.0/16
  ClusterSubnet:
    Type: AWS::EC2::Subnet
    Properties:
      CidrBlock: 10.0.1.0/24
      VpcId: !Ref VPC
      AvailabilityZone: ap-northeast-1a
  RedshiftClusterSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupName: RedshiftClusterSecurityGroup
      GroupDescription: Security Group
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - CidrIp: 10.0.0.0/16
          FromPort: 5439
          ToPort: 5439
          IpProtocol: tcp

次に、Glueジョブで必要なIAMロールも作成します。

AWSTemplateFormatVersion: "2010-09-09"

Resources:
  ## IAM
  GlueJobRoleWithRedshiftDataAPI:
    Type: AWS::IAM::Role
    Properties:
      RoleName: glue-ray-redshift-job-role
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - glue.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
        - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
        - arn:aws:iam::aws:policy/AmazonRedshiftDataFullAccess
        - arn:aws:iam::aws:policy/AmazonRedshiftFullAccess

データ読み込み時に出力先となるS3バケットも用意しておきましょう。

AWSTemplateFormatVersion: "2010-09-09"

Resources:
  ## S3
  GlueRaySampleBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: glue-ray-sample-kasahara
      PublicAccessBlockConfiguration:
        BlockPublicAcls: True
        BlockPublicPolicy: True
        IgnorePublicAcls: True
        RestrictPublicBuckets: True

続いて、Redshiftに今回のテスト用のデータを作成します。 クエリエディタv2に入って、以下のSQLを実行しました。

-- スキーマ & テーブル作成
CREATE SCHEMA rayschema;
CREATE TABLE rayschema.posts (
    id INTEGER NOT NULL,
    title VARCHAR(255) NOT NULL,
    description VARCHAR(255)
);
-- データを新規作成
INSERT INTO rayschema.posts (id, title, description) VALUES (1, 'タイトル1', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (2, 'タイトル2', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (3, 'タイトル3', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (4, 'タイトル4', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (5, 'タイトル5', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (6, 'タイトル6', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (7, 'タイトル7', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (8, 'タイトル8', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (9, 'タイトル9', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (10, 'タイトル10', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (11, 'タイトル11', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (12, 'タイトル12', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (13, 'タイトル13', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (14, 'タイトル14', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (15, 'タイトル15', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (16, 'タイトル16', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (17, 'タイトル17', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (18, 'タイトル18', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (19, 'タイトル19', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (20, 'タイトル20', '本文です。');

最後に、Secrets Managerでシークレット情報を作成します。 今回はテストなので、Redshift作成時に生成したスーパーユーザのユーザ名とパスワードのシークレット情報を作成します。

また、タグに RedshiftDataFullAccess を設定します。値は空でOKです。 これは、 マネージドポリシーである AmazonRedshiftDataFullAccess が Secrets Managerからシークレット情報を取得する際の条件として、リソースタグ名が RedshiftDataFullAccess のシークレットのみ取得可能と指定されているためです。

Glue for Rayジョブ作成

マネジメントコンソールから、Glue Studio上でRayジョブを作成しましょう。 GlueジョブのIAMロールは、先ほど作成したロールを設定してください。

データの読み込み

今回は簡単のため、AWS SDK for pandas (AWS Wrangler)を使います。 AWS SDK for pandasには、Redshift Data APIを使ってデータを読み込むことが簡単にできる機能が用意されています。

引数にて --pip-install でインストールするパッケージを指定します。 今回は、以下のパッケージをインストールします。 awswranglerのバージョンは、Rayがプレビューだった時に利用できたバージョンにしてます。

modin[ray], tqdm, awswrangler==3.1.1

コードは以下の通りにしました。

import ray
import awswrangler as wr

## AWS SDK for Pandasの実行エンジン(Ray)とメモリフォーマット(Modin)の確認
print(f"Execution Engine: {wr.engine.get()}")
print(f"Memory Format: {wr.memory_format.get()}")

## Redshift Data API 接続情報定義
con_redshift = wr.data_api.redshift.connect(
    cluster_id="<RedshiftクラスタID>",
    database="glueraysampledb",  ## Redshiftデータベース名
    secret_arn="<事前に作成したシークレットのARN>"
)

## SQLでデータ読み込み
df = wr.data_api.redshift.read_sql_query(
    sql="SELECT * FROM rayschema.posts",
    con=con_redshift,
)

## 内容確認
print(df.head(5))
print(df.info())

## S3へ書き込み
wr.s3.to_parquet(
    df=df,
    path="s3://glue-ray-sample-kasahara/ray/redshift-data-api/outputs/",
    index=False
)

print文の出力内容は Cloudwatch Logsのロググループ /aws-glue/ray/jobs/script-log に出力されています。 また、S3バケットにもParquetファイルが出力されています。

データの書き込み

AWS SDK for pandasで用意されているメソッドには、データ書き込みに関するものがありません。 そこで、データ書き込みについては、boto3を用いてInsertしていきたいと思います。

以下の記事を参考にしました。

以下のようなコードで500件追加してみました。

import ray
import boto3
import time
import json

ray.init()

## Redshift接続情報定義
CLUSTER_ID='<RedshiftクラスタID>'
DATABASE_NAME='glueraysampledb'  ## Redshiftデータベース名
SECRET_ARN='<事前に作成したシークレットのARN>'

## 実行するSQL
SQL = '''
    INSERT INTO rayschema.posts (id, title, description) VALUES (:id, :title, :description);
'''

## データ書き込み件数
max_count=500
offset=20

## 書き込みタスクの定義
@ray.remote
def insert_data(id):
    data_client = boto3.client('redshift-data')
    ## SQLクエリの実行
    result = data_client.execute_statement(
        ClusterIdentifier=CLUSTER_ID,
        Database=DATABASE_NAME,
        SecretArn=SECRET_ARN,
        Sql=SQL,
        Parameters=[
            {'name': 'id', 'value': f'{id}'},
            {'name': 'title', 'value': f'タイトル{id}'},
            {'name': 'description', 'value': '本文です。'},
        ]
    )
    ## SQL実行IDを確認
    statement_id = result['Id']
    ## SQLクエリ実行が終わるまで待つ
    statement=None
    status=''
    while status != 'FINISHED' and status != 'FAILED' and status != 'ABORTED':
        statement = data_client.describe_statement(Id=statement_id)
        status = statement['Status']
        time.sleep(1)
    ## 結果の応答
    if status == 'FINISHED':
        if int(statement['ResultSize']) > 0:
            statement = data_client.get_statement_result(Id=statement_id)
            print(json.dumps(statement['Records']))
            return 0
        else:
            print('QUERY FINISHED.')
            return 0
    elif status == 'FAILED':
        print(f'QUERY FAILED.\n{statement}')
        return 1
    elif status == 'ABORTED':
        print('QUERY ABORTED: The query run was stopped by the user.')
        return 1

## タスクの実行
futures = [insert_data.remote(id) for id in range(offset+1, max_count+offset+1)]
results = ray.get(futures)
if 1 in results:
    print('QUERY FAILED.')
else:
    print('QUERY COMPLETED.')

タスクの途中で、クエリ結果取得待ちで time.sleep(1) を入れているので、実行結果はそこまで早くなりません。 また、 boto3.client('redshift-data') の定義はRayタスクの中で行なっていますが、これは元々Rayタスクの外で定義してました。

## 書き込みタスクの定義
@ray.remote
def insert_data(id, data_client):
    ## SQLクエリの実行

    ## <省略>

## タスクの実行
client = ray.put(boto3.client('redshift-data'))
futures = [insert_data.remote(id, client) for id in range(offset+1, max_count+offset+1)]

ただし、実行時に以下のエラーが出たため、やむなくRayタスクの中で定義しています。

TypeError: Could not serialize the put value <botocore.client.RedshiftDataAPIService object at 0x7f6cc96ac0>

Redshiftクエリエディタv2にて、件数が増えていることが確認できます。 元々20件あって、追加で500件増えたため、トータル520件になってます。

まとめ

いかがでしたでしょうか。 Glue for RayはまだVPCへプライベート接続できないですが、Data APIを使うことでプライベートなRedshiftクラスタに接続できることが確認できました。 VPCへのプライベート接続ができるまでは、Data APIを使っていきましょう。