Spark Cassandra Connectorを使って、Amazon Linux 2 から Amazon Keyspacesに接続してみた

2022.04.22

いわさです。

Amazon KeyspacesはCassandra互換のデータベースです。
先日、Spark Cassandra Connectorを使いApache Spark上からデータの読み込みや書き込みが出来るようになりました。

本日は Amazon Linux 2 のEC2に必要なモジュールをセットアップし、Amazon Keyspacesへ接続しデータ取得を行いました。

AWS側の準備

AWS側は通常どおりデータベースを作成しておきます。

クライアントからAmazon KeySpacesへの接続ですが、IAMを使います。
Cassandra接続用のユーザーID/パスワードかSigV4認証のどちらかを利用出来ます。後者の場合はアクセスキーやシークレットを設定した上でSigV4プラグインをSparkで利用するのですが、今回うまくいきませんでした。
この記事では前者の、専用のユーザーID/パスワードを発行して認証情報として使用しています。

通常どおりIAMユーザーを作成するのですが、認証情報タブに「Amazon Keyspaces (for Apache Cassandra) の認証情報」というエリアがあります。
CodeCommitのGit認証ユーザーと同じでIAMユーザーに紐づく形で専用のID/パスワードを発行する仕組みになっています。

操作権限についてはIAMユーザーへアタッチされるポリシーで制御が出来ます。
マネージドポリシーも用意されているのですが、フルアクセスと読み取り専用のふたつだけです。

フルアクセスのほうはcassandraの全アクションに加えて、オートスケールやCloudWatchアラームの作成・削除権限なども許可されています。
読み取り専用についてはcassandraのSelectのみです。

Cassandoraの全アクションは以下となっています。
必要に応じてカスタムポリシーの作成が可能です。

AWS側としてはKeySpacesリソースを作成し、アクセス権限のあるIAMユーザーを作成するところまでです。

Spark環境の準備

今回はAmazon Linux 2 EC2に準備をしますがインターネット経由でアクセスするただのクライアントなのでEC2じゃなくても問題ないです。
ただし、インストールモジュールはプラットフォームにあわせたものを用意する必要があります。

ドキュメントでは前提として以下が必要とされています。
最初Java11+Scala3で進めていたのですがSigV4周りでつまづき、今回の記事ではだいたい以下と同じ構成で用意しました。

  • Java 8+
  • Scala 2.11+
  • Spark 2.4+
  • Cassandra Connector 2.5+
  • Cassandra Driver 4.12

JDKとScalaについては先日以下の記事でセットアップ方法に触れていますので参考にしてください。

Sparkは以下の手順で導入しています。
Prerequisites for establishing connections to Amazon Keyspaces with the Spark Cassandra Connector - Amazon Keyspaces (for Apache Cassandra)

[root@ip-172-31-10-157 ec2-user]# curl -o spark-3.1.2-bin-hadoop3.tgz -k https://dlcdn.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  218M  100  218M    0     0  68.1M      0  0:00:03  0:00:03 --:--:-- 68.1M
[root@ip-172-31-10-157 ec2-user]# 
[root@ip-172-31-10-157 ec2-user]# # now to untar
[root@ip-172-31-10-157 ec2-user]# tar -zxvf spark-3.1.2-bin-hadoop3.tgz
spark-3.1.2-bin-hadoop3.2/
spark-3.1.2-bin-hadoop3.2/R/
spark-3.1.2-bin-hadoop3.2/R/lib/
spark-3.1.2-bin-hadoop3.2/R/lib/sparkr.zip

:

[root@ip-172-31-10-157 ec2-user]# export SPARK_HOME=$PWD/spark-3.1.2-bin-hadoop3.2

また、Keyspacesへクライアントから接続する際にはTLSが必要になるため、Starfieldデジタル証明書をダウンロードし、トラストストア用にJKS形式に変換します。
Spark起動時の構成ファイルで指定することでJVMのトラストストアに設定される仕組みのようです。

[ec2-user@ip-172-31-14-115 ~]$ curl https://certs.secureserver.net/repository/sf-class2-root.crt -O
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1468  100  1468    0     0   2499      0 --:--:-- --:--:-- --:--:--  2496
[ec2-user@ip-172-31-14-115 ~]$ openssl x509 -outform der -in sf-class2-root.crt -out temp_file.der
[ec2-user@ip-172-31-14-115 ~]$ keytool -import -alias cassandra -keystore cassandra_truststore.jks -file temp_file.der
キーストアのパスワードを入力してください:  
新規パスワードを再入力してください: 

:

この証明書を信頼しますか。 [いいえ]:  はい
証明書がキーストアに追加されました
[ec2-user@ip-172-31-14-115 ~]$

先程セットアップしたSparkからシェルを実行する際に、packagesを指定します。
また、ID/パスワードや接続先などを定義した構成ファイルを新規作成しています。(application.conf)
なお、SigV4の場合はさらにプラグインを追加する必要があります。

application.conf

datastax-java-driver {
        basic.contact-points = ["cassandra.ap-northeast-1.amazonaws.com:9142"]
        basic.load-balancing-policy {
            class = DefaultLoadBalancingPolicy
            local-datacenter = ap-northeast-1
        }
        basic.request {
              consistency = LOCAL_QUORUM
        }
        advanced {
            auth-provider = {
            class = PlainTextAuthProvider
                    username = "(IAMコンソールで作成したCassandra認証情報のID)"
                    password = "(IAMコンソールで作成したCassandra認証情報のパスワード)"
                    aws-region = "ap-northeast-1"
            }
            ssl-engine-factory {
                class = DefaultSslEngineFactory
                truststore-path = "(JKSファイル格納先)/cassandra_truststore.jks"
                truststore-password = "(JKS作成時のパスワード)"
                hostname-validation=false
            }
            metadata = {
                schema {
                     token-map.enabled = true
                }
            }
        }    
}
[ec2-user@ip-172-31-14-115 bin]$ ./spark-shell --files application.conf --conf spark.cassandra.connection.config.profile.path=application.conf --packages com.datastax.spark:spark-cassandra-connector_2.12:3.1.0 --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions

:

Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/

Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_322)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

シェルが立ち上がりました。

データ取得

Sparkからデータの取得を行ってみます。
注意点としてCassandraコネクタを使うようにシェル操作の最初にインポートを行う必要があります。

scala> import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.cassandra._

scala> var df = spark.read.cassandraFormat("hogetable", "iwasahogeconnectorkeyspace").load()
22/04/22 06:42:52 WARN CassandraConnectionFactory: Ignoring all programmatic configuration, only using configuration from application.conf
df: org.apache.spark.sql.DataFrame = [dataid: int, name: string]

scala> df.show()
+------+----+                                                                   
|dataid|name|
+------+----+
|     4| ddd|
|     1| aaa|
|     3| ccc|
|     2| aaa|
|     5| eee|
+------+----+

scala> val dfFilterd = df.filter(df("dataid") > 2)
dfFilterd: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [dataid: int, name: string]

scala> dfFilterd.show
+------+----+
|dataid|name|
+------+----+
|     5| eee|
|     4| ddd|
|     3| ccc|
+------+----+

ここではデータの読み込みとフィルタリングを行ってみました。
他にもデータをCSVからテーブルへローディングしたり色々行ったのですが、手順は公式ドキュメントに記載されているので以下を参考にしてください。

推奨: Apache Cassandra Sparkコネクターの設定

こちら今回は割愛したのですが実際のワークロードで利用する際は様々なパラメータのチューニングが推奨されています。

  • 8MB未満のSparkパーティションサイズを作成する
  • 再試行が多い場合に同時書き込み回数を少なくする
  • 複数のCassandraセッションに分散
  • ランダムアクセスパターンを改善するためバッチ処理をオフ
  • SPARK_LOCAL_DIRSを設定する
  • 1秒あたりのリクエストのレートを制限

詳細は以下をご確認ください。

さいごに

今回は東京リージョンのKeySpacesへインターネット経由での接続を行いましたが、確認したところ東京リージョンのCassandraのVPCエンドポイントがあるようなので、EC2からVPCエンドポイント経由でプライベートな通信も出来るかもしれません。これはちょっとやってみたいですね。

また、今回はうまくSigV4認証での接続がうまくいきませんでした。
同じ問題にあった方のためにエラーメッセージだけ残しておきますが、いずれリベンジしたいと思います。長いのでスタックトレースだけ一部割愛します。

scala> var df = spark.read.cassandraFormat("hogetable", "iwasahogeconnectorkeyspace").load()
22/04/22 00:43:28 WARN CassandraConnectionFactory: Ignoring all programmatic configuration, only using configuration from application.conf
22/04/22 00:43:29 WARN InternalDriverContext: Option METADATA_SCHEMA_CHANGE_LISTENER_CLASS has been deprecated and will be removed in a future release; please use option METADATA_SCHEMA_CHANGE_LISTENER_CLASSES instead.
22/04/22 00:43:29 WARN InternalDriverContext: Option METADATA_NODE_STATE_LISTENER_CLASS has been deprecated and will be removed in a future release; please use option METADATA_NODE_STATE_LISTENER_CLASSES instead.
22/04/22 00:43:29 WARN InternalDriverContext: Option REQUEST_TRACKER_CLASS has been deprecated and will be removed in a future release; please use option REQUEST_TRACKER_CLASSES instead.
java.io.IOException: Failed to open native connection to Cassandra at Profile based config at application.conf :: Error instantiating class DefaultSslEngineFactory (specified by advanced.ssl-engine-factory.class): Cannot initialize SSL Context
  at com.datastax.spark.connector.cql.CassandraConnector$.createSession(CassandraConnector.scala:173)
:
  ... 49 elided
Caused by: java.lang.IllegalArgumentException: Error instantiating class DefaultSslEngineFactory (specified by advanced.ssl-engine-factory.class): Cannot initialize SSL Context
  at com.datastax.oss.driver.internal.core.util.Reflection.resolveClass(Reflection.java:336)
:

Caused by: java.lang.IllegalStateException: Cannot initialize SSL Context
  at com.datastax.oss.driver.internal.core.ssl.DefaultSslEngineFactory.<init>(DefaultSslEngineFactory.java:74)
:
  ... 19 more
Caused by: java.nio.file.NoSuchFileException: /hoge/ec2-user/hoge/cassandra_truststore.jks
  at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
:
  ... 24 more