データアナリティクス事業本部の笠原です。
Glue for RayはGA時点でもVPC内への接続はできません。 そのため、VPC内のリソースである、RDSやRedshiftへはパブリック経路で接続する必要があります。
ただ、RDSやRedshiftへの接続には、Data APIという別の方法があります。 これが使えれば、Glue for RayでもVPC内のプライベートなRDSやRedshiftにアクセスできるはずです。
今回は、Glue for RayでRedshift Data APIを使って、パブリックに公開されていないプライベートなRedshiftクラスタに接続してみます。
概要図
RedshiftはVPC内で起動します。 VPCにはインターネットゲートウェイはアタッチしませんし、Redshiftはパブリックアクセスをしません。 また、GlueジョブにはGlue接続はアタッチしません。
Data APIを利用する際、今回はSecrets Managerを利用します。
準備
最初に、Cloudformationテンプレートで一通り作成します。 まずは、Redshiftと、Redshiftを配置するためのVPCを作成します。
Redshiftは、シングルノードの dc2.large
を指定しています。
ノードタイプは、Redshift Data APIに対応しているノードタイプを選びましょう。
AWSTemplateFormatVersion: "2010-09-09"
Parameters:
MasterUserName:
Type: String
Default: defaultuser
AllowedPattern: "([a-z])([a-z]|[0-9])*"
MasterUserPassword:
Type: String
NoEcho: true
Resources:
## Redshift
RedshiftCluster:
Type: AWS::Redshift::Cluster
Properties:
ClusterType: single-node
NodeType: dc2.large
DBName: glueraysampledb
MasterUsername: !Ref MasterUserName
MasterUserPassword: !Ref MasterUserPassword
ClusterParameterGroupName: !Ref RedshiftClusterParameterGroup
VpcSecurityGroupIds:
- !Ref RedshiftClusterSecurityGroup
ClusterSubnetGroupName: !Ref RedshiftClusterSubnetGroup
PubliclyAccessible: false
Port: 5439
RedshiftClusterParameterGroup:
Type: AWS::Redshift::ClusterParameterGroup
Properties:
Description: Cluster Parameter Group
ParameterGroupFamily: redshift-1.0
Parameters:
- ParameterName: enable_user_activity_logging
ParameterValue: true
RedshiftClusterSubnetGroup:
Type: AWS::Redshift::ClusterSubnetGroup
Properties:
Description: Cluster Subnet Group
SubnetIds:
- !Ref ClusterSubnet
## VPC
VPC:
Type: AWS::EC2::VPC
Properties:
CidrBlock: 10.0.0.0/16
ClusterSubnet:
Type: AWS::EC2::Subnet
Properties:
CidrBlock: 10.0.1.0/24
VpcId: !Ref VPC
AvailabilityZone: ap-northeast-1a
RedshiftClusterSecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupName: RedshiftClusterSecurityGroup
GroupDescription: Security Group
VpcId: !Ref VPC
SecurityGroupIngress:
- CidrIp: 10.0.0.0/16
FromPort: 5439
ToPort: 5439
IpProtocol: tcp
次に、Glueジョブで必要なIAMロールも作成します。
AWSTemplateFormatVersion: "2010-09-09"
Resources:
## IAM
GlueJobRoleWithRedshiftDataAPI:
Type: AWS::IAM::Role
Properties:
RoleName: glue-ray-redshift-job-role
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- glue.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/AmazonS3FullAccess
- arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
- arn:aws:iam::aws:policy/AmazonRedshiftDataFullAccess
- arn:aws:iam::aws:policy/AmazonRedshiftFullAccess
データ読み込み時に出力先となるS3バケットも用意しておきましょう。
AWSTemplateFormatVersion: "2010-09-09"
Resources:
## S3
GlueRaySampleBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: glue-ray-sample-kasahara
PublicAccessBlockConfiguration:
BlockPublicAcls: True
BlockPublicPolicy: True
IgnorePublicAcls: True
RestrictPublicBuckets: True
続いて、Redshiftに今回のテスト用のデータを作成します。 クエリエディタv2に入って、以下のSQLを実行しました。
-- スキーマ & テーブル作成
CREATE SCHEMA rayschema;
CREATE TABLE rayschema.posts (
id INTEGER NOT NULL,
title VARCHAR(255) NOT NULL,
description VARCHAR(255)
);
-- データを新規作成
INSERT INTO rayschema.posts (id, title, description) VALUES (1, 'タイトル1', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (2, 'タイトル2', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (3, 'タイトル3', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (4, 'タイトル4', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (5, 'タイトル5', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (6, 'タイトル6', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (7, 'タイトル7', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (8, 'タイトル8', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (9, 'タイトル9', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (10, 'タイトル10', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (11, 'タイトル11', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (12, 'タイトル12', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (13, 'タイトル13', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (14, 'タイトル14', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (15, 'タイトル15', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (16, 'タイトル16', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (17, 'タイトル17', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (18, 'タイトル18', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (19, 'タイトル19', '本文です。');
INSERT INTO rayschema.posts (id, title, description) VALUES (20, 'タイトル20', '本文です。');
最後に、Secrets Managerでシークレット情報を作成します。 今回はテストなので、Redshift作成時に生成したスーパーユーザのユーザ名とパスワードのシークレット情報を作成します。
また、タグに RedshiftDataFullAccess
を設定します。値は空でOKです。
これは、 マネージドポリシーである AmazonRedshiftDataFullAccess
が Secrets Managerからシークレット情報を取得する際の条件として、リソースタグ名が RedshiftDataFullAccess
のシークレットのみ取得可能と指定されているためです。
Glue for Rayジョブ作成
マネジメントコンソールから、Glue Studio上でRayジョブを作成しましょう。 GlueジョブのIAMロールは、先ほど作成したロールを設定してください。
データの読み込み
今回は簡単のため、AWS SDK for pandas (AWS Wrangler)を使います。 AWS SDK for pandasには、Redshift Data APIを使ってデータを読み込むことが簡単にできる機能が用意されています。
引数にて --pip-install
でインストールするパッケージを指定します。
今回は、以下のパッケージをインストールします。
awswrangler
のバージョンは、Rayがプレビューだった時に利用できたバージョンにしてます。
modin[ray], tqdm, awswrangler==3.1.1
コードは以下の通りにしました。
import ray
import awswrangler as wr
## AWS SDK for Pandasの実行エンジン(Ray)とメモリフォーマット(Modin)の確認
print(f"Execution Engine: {wr.engine.get()}")
print(f"Memory Format: {wr.memory_format.get()}")
## Redshift Data API 接続情報定義
con_redshift = wr.data_api.redshift.connect(
cluster_id="<RedshiftクラスタID>",
database="glueraysampledb", ## Redshiftデータベース名
secret_arn="<事前に作成したシークレットのARN>"
)
## SQLでデータ読み込み
df = wr.data_api.redshift.read_sql_query(
sql="SELECT * FROM rayschema.posts",
con=con_redshift,
)
## 内容確認
print(df.head(5))
print(df.info())
## S3へ書き込み
wr.s3.to_parquet(
df=df,
path="s3://glue-ray-sample-kasahara/ray/redshift-data-api/outputs/",
index=False
)
print文の出力内容は Cloudwatch Logsのロググループ /aws-glue/ray/jobs/script-log
に出力されています。
また、S3バケットにもParquetファイルが出力されています。
データの書き込み
AWS SDK for pandasで用意されているメソッドには、データ書き込みに関するものがありません。 そこで、データ書き込みについては、boto3を用いてInsertしていきたいと思います。
以下の記事を参考にしました。
以下のようなコードで500件追加してみました。
import ray
import boto3
import time
import json
ray.init()
## Redshift接続情報定義
CLUSTER_ID='<RedshiftクラスタID>'
DATABASE_NAME='glueraysampledb' ## Redshiftデータベース名
SECRET_ARN='<事前に作成したシークレットのARN>'
## 実行するSQL
SQL = '''
INSERT INTO rayschema.posts (id, title, description) VALUES (:id, :title, :description);
'''
## データ書き込み件数
max_count=500
offset=20
## 書き込みタスクの定義
@ray.remote
def insert_data(id):
data_client = boto3.client('redshift-data')
## SQLクエリの実行
result = data_client.execute_statement(
ClusterIdentifier=CLUSTER_ID,
Database=DATABASE_NAME,
SecretArn=SECRET_ARN,
Sql=SQL,
Parameters=[
{'name': 'id', 'value': f'{id}'},
{'name': 'title', 'value': f'タイトル{id}'},
{'name': 'description', 'value': '本文です。'},
]
)
## SQL実行IDを確認
statement_id = result['Id']
## SQLクエリ実行が終わるまで待つ
statement=None
status=''
while status != 'FINISHED' and status != 'FAILED' and status != 'ABORTED':
statement = data_client.describe_statement(Id=statement_id)
status = statement['Status']
time.sleep(1)
## 結果の応答
if status == 'FINISHED':
if int(statement['ResultSize']) > 0:
statement = data_client.get_statement_result(Id=statement_id)
print(json.dumps(statement['Records']))
return 0
else:
print('QUERY FINISHED.')
return 0
elif status == 'FAILED':
print(f'QUERY FAILED.\n{statement}')
return 1
elif status == 'ABORTED':
print('QUERY ABORTED: The query run was stopped by the user.')
return 1
## タスクの実行
futures = [insert_data.remote(id) for id in range(offset+1, max_count+offset+1)]
results = ray.get(futures)
if 1 in results:
print('QUERY FAILED.')
else:
print('QUERY COMPLETED.')
タスクの途中で、クエリ結果取得待ちで time.sleep(1)
を入れているので、実行結果はそこまで早くなりません。
また、 boto3.client('redshift-data')
の定義はRayタスクの中で行なっていますが、これは元々Rayタスクの外で定義してました。
## 書き込みタスクの定義
@ray.remote
def insert_data(id, data_client):
## SQLクエリの実行
## <省略>
## タスクの実行
client = ray.put(boto3.client('redshift-data'))
futures = [insert_data.remote(id, client) for id in range(offset+1, max_count+offset+1)]
ただし、実行時に以下のエラーが出たため、やむなくRayタスクの中で定義しています。
TypeError: Could not serialize the put value <botocore.client.RedshiftDataAPIService object at 0x7f6cc96ac0>
Redshiftクエリエディタv2にて、件数が増えていることが確認できます。 元々20件あって、追加で500件増えたため、トータル520件になってます。
まとめ
いかがでしたでしょうか。 Glue for RayはまだVPCへプライベート接続できないですが、Data APIを使うことでプライベートなRedshiftクラスタに接続できることが確認できました。 VPCへのプライベート接続ができるまでは、Data APIを使っていきましょう。