Data API for Redshiftでデータのロード/アンロードを試してみた

2020.10.04

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

先日Data API for Redshiftが利用可能になりました。
遅ればせながら実際に自分自身で触ってみたかったので、データのロード/アンロード操作を試してみました。
Data API for Redshiftの詳細や使い方等については、下記のアップデートブログがあるので、そちらをご参照ください。

環境

ローカルからAWS CLIを実行しました。バージョンは下記の通りです。
※今回は事前にAWS CLIの(2020/10/04時点での)最新版が実行環境にインストール済みである前提で進めます。

  • Mac OS : 10.15.7
  • AWS CLI : 1.18.152
  • Redshiftクラスター : 1.0.19097

テーブル作成

任意のRedshiftクラスタを作成し、テーブル作成をします。
せっかくなのでテーブルもData APIを実行して作成しました。

テーブル情報、サンプルデータはこちらの一部を拝借しました。

$ aws --profile nagamasa redshift-data execute-statement \
--secret-arn arn:aws:secretsmanager:us-east-1:XXXXXXXXXXXX:secret:XXXXXXXXXXXXXXXXXX \
--cluster-identifier nagamasa-cls-01 \
--database dev \
--sql "create table part (
       p_partkey     INTEGER NOT NULL,
       p_name        VARCHAR(22) NOT NULL,
       p_mfgr        VARCHAR(6) NOT NULL,
       p_category    VARCHAR(7) NOT NULL,
       p_brand1      VARCHAR(9) NOT NULL,
       p_color       VARCHAR(11) NOT NULL,
       p_type        VARCHAR(25) NOT NULL,
       p_size        INTEGER NOT NULL,
       p_container   VARCHAR(10) NOT NULL);"
{
    "ClusterIdentifier": "nagamasa-cls-01",
    "CreatedAt": 1601765440.339,
    "Database": "dev",
    "Id": "11860fc2-bc83-4fe6-a48e-d8dd28cabaf0",
    "SecretArn": "arn:aws:secretsmanager:us-east-1:XXXXXXXXXXXX:secret:XXXXXXXXXXXXXXXXXX"
}
$

ロード

早速データをロードしていきます。 execute-statementAPIで--sqlパラメータにCOPYコマンドを指定して実行します。

$ aws --profile nagamasa redshift-data execute-statement \
--secret-arn arn:aws:secretsmanager:us-east-1:XXXXXXXXXXXX:secret:XXXXXXXXXXXXXXXXXX \
--cluster-identifier nagamasa-cls-01 \
--database dev \
--sql "copy part from 's3://awssampledbuswest2/ssbgz/part'
    credentials 'aws_iam_role=arn:aws:iam::XXXXXXXXXXXX:role/myRedshiftRole'
    gzip region 'us-west-2';"
{
    "ClusterIdentifier": "nagamasa-cls-01",
    "CreatedAt": 1601765979.623,
    "Database": "dev",
    "Id": "7e84ad4b-f20a-47f1-812b-efc4798e5b1d",
    "SecretArn": "arn:aws:secretsmanager:us-east-1:XXXXXXXXXXXX:secret:XXXXXXXXXXXXXXXXXX"
}
$

返却されたレスポンスのIdの値(SQLステートメントのID)を describe-statementAPIの--idパラメータに指定して実行すると、SQLステートメントの詳細が取得できました。StatusがFINISHEDとなっていたので非同期でのCOPYコマンドの完了が確認できました。
実際に実装する際は、StatusがFINISHEDになっていることが確認できたら後続処理に進むなどの実装になりそうですね!

$ aws --profile nagamasa redshift-data describe-statement --id 7e84ad4b-f20a-47f1-812b-efc4798e5b1d
{
    "ClusterIdentifier": "nagamasa-cls-01",
    "CreatedAt": 1601765979.623,
    "Duration": 10725667637,
    "Id": "7e84ad4b-f20a-47f1-812b-efc4798e5b1d",
    "QueryString": "copy part from 's3://awssampledbuswest2/ssbgz/part'\ncredentials ''\ngzip region 'us-west-2';",
    "RedshiftPid": 25465,
    "RedshiftQueryId": -1,
    "ResultRows": 0,
    "ResultSize": 0,
    "SecretArn": "arn:aws:secretsmanager:us-east-1:XXXXXXXXXXXX:secret:XXXXXXXXXXXXXXXXXX",
    "Status": "FINISHED",
    "UpdatedAt": 1601765991.011
}
$

ちゃんとデータもロードされていました〜。

dev=# select count(*) from part;
  count
---------
 1400000
(1 row)

dev=#

アンロード

次にデータをS3にアンロードしていきます。COPYコマンド同様に、execute-statementAPIで--sqlパラメータにUNLOADコマンドを指定して実行します。

$ aws --profile nagamasa redshift-data execute-statement \
--secret-arn arn:aws:secretsmanager:us-east-1:XXXXXXXXXXXX:secret:XXXXXXXXXXXXXXXXXX \
--cluster-identifier nagamasa-cls-01 \
--database dev \
--sql "unload ('select p_partkey, p_name, p_type, p_container from part order by p_partkey')
    to 's3://nagamasa-test/output/part'
    credentials 'aws_iam_role=arn:aws:iam::XXXXXXXXXXXX:role/myRedshiftRole'
    header parallel off csv;"
{
    "ClusterIdentifier": "nagamasa-cls-01",
    "CreatedAt": 1601766883.444,
    "Database": "dev",
    "Id": "51524ab2-3ea8-47e4-9b28-0fbda3be8466",
    "SecretArn": "arn:aws:secretsmanager:us-east-1:XXXXXXXXXXXX:secret:XXXXXXXXXXXXXXXXXX"
}
$

アンロードも問題なくでき、データも確認できました〜

$ aws --profile nagamasa s3 cp s3://nagamasa-test/output/part000 - 2>/dev/null | head
p_partkey,p_name,p_type,p_container
1,lace spring,PROMO BURNISHED COPPER,JUMBO PKG
2,rosy metallic,LARGE BRUSHED BRASS,LG CASE
3,green antique,STANDARD POLISHED BRASS,WRAP CASE
4,metallic smoke,SMALL PLATED BRASS,MED DRUM
5,blush chiffon,STANDARD POLISHED TIN,SM PKG
6,ivory azure,PROMO PLATED STEEL,MED BAG
7,blanched tan,SMALL PLATED COPPER,SM BAG
8,khaki cream,PROMO BURNISHED TIN,LG DRUM
9,rose moccasin,SMALL BURNISHED STEEL,WRAP CASE
$

ちなみに①

Data API for Redshiftでは非同期でSQLクエリが実行されるということで、Redshiftクラスタの状態が[変更中]でも実行できたりするのかな?と思い試してみましたが、下記のようにData APIでエラーレスポンスが返却されました。(まあ、そうですよね) 

An error occurred (ValidationException) when calling the ExecuteStatement operation: Redshift cluster status 'MODIFY_CLUSTER_IAM_ROLES' is not allowed

ちなみに②

Lambdaでの実行も試してみましたが、Lambda内部で呼ばれているSDKが更新されてないのか現時点(2020/10/04)でもまだ正常実行が確認できませんでした。
※下記の条件で試しましたが、いずれも同じ実行結果(エラー)でした。

  • Python3.7, Python3.6, Python2.7
  • バージニア北部(us-east-1)リージョン
  • IAM ロールに AmazonRedshiftDataFullAccess ポリシーをアタッチ

lambda_function.py

import json
import boto3 

def lambda_handler(event, context):
    
    rs_client = boto3.client('redshift-data')
    
    res = rs_client.execute_statement(
          ClusterIdentifier = 'nagamasa-cls-01',
          secretArn = 'arn:aws:secretsmanager:us-east-1:XXXXXXXXXXXX:secret:XXXXXXXXXXXXXXXXXX',
          Database = 'dev',
          Sql = 'select count(*) from part;'
        )
    
    print (res)

実行結果

{
  "errorMessage": "Unknown service: 'redshift-data'. Valid service names are: accessanalyzer, acm, acm-pca, alexaforbusiness, amplify, apigateway, apigatewaymanagementapi, apigatewayv2, appconfig, application-autoscaling, application-insights, appmesh, appstream, appsync, athena, autoscaling, autoscaling-plans, backup, batch, braket, budgets, ce, chime, cloud9, clouddirectory, cloudformation, cloudfront, cloudhsm, cloudhsmv2, cloudsearch, cloudsearchdomain, cloudtrail, cloudwatch, codeartifact, codebuild, codecommit, codedeploy, codeguru-reviewer, codeguruprofiler, codepipeline, codestar, codestar-connections, codestar-notifications, cognito-identity, cognito-idp, cognito-sync, comprehend, comprehendmedical, compute-optimizer, config, connect, connectparticipant, cur, dataexchange, datapipeline, datasync, dax, detective, devicefarm, directconnect, discovery, dlm, dms, docdb, ds, dynamodb, dynamodbstreams, ebs, ec2, ec2-instance-connect, ecr, ecs, efs, eks, elastic-inference, elasticache, elasticbeanstalk, elastictranscoder, elb, elbv2, emr, es, events, firehose, fms, forecast, forecastquery, frauddetector, fsx, gamelift, glacier, globalaccelerator, glue, greengrass, groundstation, guardduty, health, honeycode, iam, identitystore, imagebuilder, importexport, inspector, iot, iot-data, iot-jobs-data, iot1click-devices, iot1click-projects, iotanalytics, iotevents, iotevents-data, iotsecuretunneling, iotsitewise, iotthingsgraph, ivs, kafka, kendra, kinesis, kinesis-video-archived-media, kinesis-video-media, kinesis-video-signaling, kinesisanalytics, kinesisanalyticsv2, kinesisvideo, kms, lakeformation, lambda, lex-models, lex-runtime, license-manager, lightsail, logs, machinelearning, macie, macie2, managedblockchain, marketplace-catalog, marketplace-entitlement, marketplacecommerceanalytics, mediaconnect, mediaconvert, medialive, mediapackage, mediapackage-vod, mediastore, mediastore-data, mediatailor, meteringmarketplace, mgh, migrationhub-config, mobile, mq, mturk, neptune, networkmanager, opsworks, opsworkscm, organizations, outposts, personalize, personalize-events, personalize-runtime, pi, pinpoint, pinpoint-email, pinpoint-sms-voice, polly, pricing, qldb, qldb-session, quicksight, ram, rds, rds-data, redshift, rekognition, resource-groups, resourcegroupstaggingapi, robomaker, route53, route53domains, route53resolver, s3, s3control, sagemaker, sagemaker-a2i-runtime, sagemaker-runtime, savingsplans, schemas, sdb, secretsmanager, securityhub, serverlessrepo, service-quotas, servicecatalog, servicediscovery, ses, sesv2, shield, signer, sms, sms-voice, snowball, sns, sqs, ssm, sso, sso-oidc, stepfunctions, storagegateway, sts, support, swf, synthetics, textract, transcribe, transfer, translate, waf, waf-regional, wafv2, workdocs, worklink, workmail, workmailmessageflow, workspaces, xray",
  "errorType": "UnknownServiceError",
(後略)
}

おわりに

アップデートから少し時間も経っていたので、本当はLambdaでの実行確認をメインにした記事内容にしたくて試してみたのですが、まだLambdaでの正常実行は確認できませんでした。
Data API for Redshiftに触れることが目的の簡単な記事でしたが、自分自身で触ってみて使いやすさを実感できたことは良かったです。
Lambdaでも実行できるようになったら、どんどん活用できそうですね!
以上、データアナリティクス事業本部のナガマサでした〜

参考記事