EMRのステップ実行でPySparkを使ってRedshiftに出力する

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、小澤です。

Sparkにはspark-redshiftというライブラリがあります。

今回は、このライブラリを使って、EMRのステップ実行からPySparkを呼び出して、Redshiftに結果を出力する方法を解説します。

SparkのDataFrameとデータベース

SparkのDataFrameは入出力としてHDFSなどに置かれているファイルやHiveのメタストアを利用するの他にデータベースの利用も可能です。 これは適切にドライバの設定を行えば、入出力先としてRedshiftも利用可能であるということになります。

それを補助してくれるライブラリがspark-redshiftになります。

PySparkからの利用方法

必要なライブラリとEMRのバージョン

Github上にあるREADMEを確認するとScalaやPython, R, SQLなどの各種言語からの利用方法や動作原理についての記載があります。 ただし、ここではmavenやsbtを使ったライブラリの利用方法は記載してありますが、PySparkからの利用に関してはソースコードのみとなっています。

そのため、まずは必要なライブラリと利用方法について解説します。 EMRで利用する際に必要となるライブラリは以下の3つになります。

また、利用しているEMRの構成は以下のようになっています。

  • emr-5.4.0
  • Hadoop 2.7.3
  • Spark 2.1.0

こちらは2017/10/02現在の情報となりますので、EMR及びライブラリのバージョンが異なる場合、他の必要となるライブラリのバージョンも異なる可能性がありますのでご注意ください。

Pythonのバージョンに関しては、EMRには2.7と3.4がインストールされていますが、今回は3.4を利用しています。 Python 3.4を利用するためにクラスタ作成時に以下の設定をしています。

[
  {
    "Classification" : "spark-env",
    "Configurations" : [
      {
        "Classification":"export",
        "Properties" : {
          "PYSPARK_PYTHON" : "/usr/bin/python3"
        }
      }
    ]
  }
]

ライブラリの指定方法

続いて、これらのライブラリをPySpark実行時に指定する方法を紹介します。

と言ってもこれは通常通りPySparkで依存するJarファイルを指定するのと同じ方法になります。

EMRにsshでログインしてPySparkの対話シェルから利用する場合は、pysparkコマンドの--jarsオプションでjarファイルをカンマ区切りで指定します。

$ pyspark --jars /path/to/spark-redshift.jar,/path/to/spark-avro.jar,/path/to/redshift-odbc.jar

他のライブラリも利用したい場合は同様にカンマ区切りで指定します。 スクリプトを作成して実行する場合はshark-submitで同様のオプションを指定します。

ステップ実行時のオプション指定

EMRでステップ実行をする際のオプション指定の方法も同様になります。 「spark-submitのオプション」に--jarオプションを指定し、「アプリケーションの場所」で実行するpythonのファイルを選択するのみとなります。 その他のオプションを指定する際は同様に並べていけば問題ありません。

awscliやboto3などからステップの追加を行う際も同様です。

処理の記述方法

最初に取り上げたREADMEにも記載がありますが、実際にSparkのDataFrameをRedshiftに出力する方法を見ていきましょう。 使い方としては、DataFrameのwriteでDataFrameWriterを取得してオプションなどを指定していきます。

df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://redshifthost_endpoint:5439/database_name?user=username&password=password") \
  .option("dbtable", "table_name") \
  .option("tempdir", "s3a://path/for/temp/data") \
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role") \
  .mode("overwrite") \
  .save()

formatでは、spark-redshiftで定義されているものを指定しています。

spark-redshiftでは、データを出力する際にRedshiftのcopyコマンドを利用しますので、

  • 対象となるRedshiftのエンドポイント
  • 対象となるテーブル
  • copyコマンド実行の際に利用するS3のパス
  • copyコマンドのクレデンシャルで利用するIAMのロール

を指定しています。 その他のオプションで指定しできるものはREADMEのParametersに記載してありますので参考にしてください。

modeは通常のDataFrameWriterのmodeと同じものが選択可能です。 いずれのmodeでも、テーブルが存在していない場合は新規で作成されるので、あらかじめテーブルを作成していないことによるエラーは発生しません(既存のテーブルに対してerrorやignoreを選択した場合はデータの挿入が実行されません)。 その際の各列の型はDataFrameのスキーマに応じて作成されます。

また、デフォルトではdiststyleはeven, sort keyは無しとなりますので必要に応じてそれらを適切な値に設定しておくといいでしょう。 例外としては既存のテーブルに対してmodeをappendにした場合のみテーブルの設定が反映されたものとなります。

最後にsaveを呼び出すことでRedshiftへのcopyが実行されます。

おわりに

今回はSparkのDataFrameを使った処理の結果を直接Redshiftに出力できるspark-redshiftを紹介しました。

EMRのステップ実行を活用すれば、Hadoopで前処理やETLを行った後にRedshiftにデータをロードするといったシーンでもそれらの処理フローを制御するためのプログラムや実行環境も必要なくなるため役立つ場面も多いのではないかと思います。