Amazon Athena JDBCプログラミングガイド

2017.03.31

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

今回は私が Amazon Athena で JDBCプログラミングを実際に書いて得られた知見についてまとめてみました。各種チューニングパラメタの使い方、実際のDDLや参照系クエリーを例に解説します。

JDBCドライバーの入手

公開されたS3バケットからダウンロードから取得する方法とAmazon Athena のユーザーガイド(英語マニュアル)のリンクからダウンロードする方法の2種類あります。 現在は、JDBC 4.1互換のドライバが提供されています。

公開されたS3バケットからダウンロード

誰でも公開されたS3バケットからダウンロードできます。例えば、AWS CLIでは以下のようにダウンロードできます。

aws s3 cp s3://athena-downloads/drivers/AthenaJDBC41-1.0.0.jar [ local_directory ]

Amazon Athena のユーザーガイド(英語マニュアル)からダウンロード

Amazon Athena のユーザーガイド(英語マニュアル)Downloading the Driver の リンクからダウンロードできます。

20170331-athena-jdbc-download

JDBCドライバで接続する

入手したJDBCドライバを使って、Redshiftに接続してみます。

JDBC接続文字列

現在(2017/3/31)、サービス提供しているリージョンは米国東部(バージニア北部)、米国東部 (オハイオ)、米国西部 (オレゴン)の3リージョンです。以下のようにJDBC接続文字列はリージョンのコードが含まれます。

jdbc:awsathena://athena.<REGION>.amazonaws.com:443

よって、JDBC接続文字列はリージョン毎に異なり、以下のようになります。

リージョン URL
米国東部(バージニア北部) jdbc:awsathena://athena.us-east-1.amazonaws.com:443
米国東部 (オハイオ) jdbc:awsathena://athena.us-east-2.amazonaws.com:443
米国西部 (オレゴン) jdbc:awsathena://athena.us-west-2.amazonaws.com:443

接続の例

一般的なJDBCプログラムと異なる点が2つあります。1つ目は S3アクセス可能なパーミッションが付与されている認証情報を設定します。下記の例では aws_access_key_idaws_secret_access_key をそれぞれ指定しています。2つ目はクエリを実行した結果を保存先である s3_staging_dir を指定する必要があります。つまり、認証情報(user と password)は、データソースであるS3ファイルを読み込み、クエリ結果を s3_staging_dir の下に保存するための情報です。

Connection、Statement、ResultSetともにAutoCloseableインタフェースが実装さてれてますので、以下の例では、try-with-resources 文を利用して、スコープの終わりで各リソースが確実に閉じられるようにします。従来必要であったオブジェクトのnull判定、closeが不要になり、コードがスッキリします。Java7以降のスタンダードなコーティングスタイルです。

	protected void runSQL() {
		String athenaUrl = "jdbc:awsathena://athena.us-east-1.amazonaws.com:443";
		
		String stagingDirKeyName = DigestUtils.md5Hex(String.valueOf(System.nanoTime()));

		String sql = "SELECT "
				+ " product_category,"
				+ " sum(sales) as sales,"
				+ " sum(profit) as profit"
				+ " FROM tempdb.orders_summary GROUP BY 1 ORDER BY 1 LIMIT 10;";

		Properties dbParam = new Properties();
		dbParam.put("user", "ABCDEFGHIJKLMNOPQRST");
		dbParam.put("password", "depTYF7bj4XVPp3RF6vgp3RFHp1jpPdML36vgjwU");
		dbParam.put("s3_staging_dir", "s3://cm-data/" + stagingDirKeyName + "/");

		try (Connection conn = DriverManager.getConnection(athenaUrl, dbParam);
				Statement st = conn.createStatement(); 
				ResultSet rs = st.executeQuery(sql)) {
			while (rs.next()) {
				System.out.println("---");
				System.out.println("product category: " + rs.getString("product_category"));
				System.out.println("sales: " + rs.getString("sales"));
				System.out.println("profit: " + rs.getString("profit"));
			}
		} catch (SQLException e) {
			e.printStackTrace();
		}
	}

設定オプション

設定オプションはRedshiftに接続するときに JDBC URL に付加します。

JDBC ドライバの設定オプション

JDBCドライバの次のオプションを構成できます。

S3 Staging Directory

Amazon Athenaは、s3_staging_dir にバケット もしくは バケットとフォルダ 指定しなければなりません。クエリを実行すると s3_staging_dir の下に実行結果に関するCSVファイルとメタデータファイルを出力します。クエリを実行するたびにこれらの新ファイルが作成されますので、実行後はファイルが不要であれば削除を忘れないでください。

プロパティ名 説明 デフォルト値 必須
s3_staging_dir クエリ出力が書き込まれるAmazon S3の場所。その後、JDBCドライバはAthenaのクエリ結果を読み取り、ユーザーにデータの行を戻すように要求します。

例. "s3://cm-bucket/staging_dir/"

認証情報(アクセスキー / AWS Credentials Provider)

Amazon Athenaは、認証情報は、Athena や Amazon S3バケットなどのAWSサービスやリソースにアクセスするための資格情報が必要です。認証情報に 「アクセスキー」 もしくは 「AWS Credentials Provider」 のどちらかを指定しなければなりません。

・アクセスキーを指定する場合

user に aws_access_key_id、password にaws_secret_access_keyを指定します。

・AWS Credentials Providerを指定する場合

aws_credentials_provider_class に認証情報プロバイダのクラスを指定します。指定した認証情報プロバイダのクラスに引数の指定が必要な場合は aws_credentials_provider_arguments に引数をカンマ区切りの値で指定します。

 

プロパティ名 説明 デフォルト値 必須
user aws_access_key_idを指定する。

例."ABCDEFGHIJKLMNOPQRST"

password aws_secret_access_key を指定する。

例."depTYF7bj4XVPp3RF6vgp3RFHp1jpPdML36vgjwU"

aws_credentials_provider_class AWSCredentialsProviderインタフェースを実装する認証情報プロバイダのクラス名。

例. "com.amazonaws.auth.profile.ProfileCredentialsProvider"

aws_credentials_provider_arguments Credentials Provider のコンストラクタの引数をカンマ区切りの値で指定します。引数が不要な場合はこのプロパティの指定は不要です。

例. "profile datalake"

リトライ・タイムアウト関連

デフォルトではconnection_timeout(接続タイムアウト)と socket_timeout(ソケットタイムアウト)は10,000msです。これらリクエストのエラーが発生すると100ms から 1000ms の範囲でディレイした後、最大10回リトライします。

Athenaのサービス上の制限はクエリータイムアウトが30分(デフォルト)なので、connection_timeout や socket_timeoutは、必然的にクエリタイムアウトよりも短く設定する事になります。

プロパティ名 説明 デフォルト値 必須
max_error_retries JDBCクライアントがAthenaに要求を試みる最大再試行回数。 10 X
retry_base_delay Athenaへの再試行の試行間の最小遅延量(ミリ秒単位)。 100 X
retry_max_backoff_time Athenaに接続しようとする再試行の最大遅延時間(ミリ秒単位)。 1000 X
connection_timeout 試行が終了する前にAthenaに正常に接続するための最大時間(ミリ秒単位)。 10000 X
socket_timeout Athenaにデータを送信するためにソケットを待機する最大時間(ミリ秒単位)。 10000 X

ログ

JDBCドライバの実行ログの出力について設定します。Amazon AthenaはRedshiftに比較するとあまり具体的なSQLエラーの理由を出力してくれないのですが、代わりにログファイルに詳細を出力しますので、パラメタが反映されているか、SQLのどこでエラーが発生しているのか具体的に把握することができます。

公式マニュアル(英語)では log_level デフォルト値が「ー」ですが、log_pathのみ指定した場合に DEBUG が出力されていましたので、以下の表ではDEBUGとしています。

プロパティ名 説明 デフォルト値 必須
log_path Athena JDBCドライバのログのローカルパス。ログのパスが指定されていない場合、ログファイルは作成されません。

例. "/var/log/athena.log"

X
log_level Athena JDBCドライバログのログレベルを指定します。

  • INFO
  • DEBUG
  • WARN
  • ERROR
  • ALL
  • OFF
  • FATAL
  • TRACE

例. "INFO"

"DEBUG" X

ユースケースとコード例

DDLや参照系 Queryを実行する方法について、実際のコードを例に解説します。

DDLの実行

公式マニュアルのサンプルコードでは、DDLをexecuteQuery()で実行するように記載されていますが、私の環境ではエラーになりましたので代わりに execute()でDDLを実行しています。

一時的にテーブル定義してクエリーの結果を得る場合や、S3ファイルの追加イベントでパーティションを自動追加するなどといったユースケースが考えられます。

テーブルの作成

DDL内のスラッシュのエスケープに注意が必要ですが、SQLを外部テンプレート化してしまえばこの煩わしさから開放されます。

	/** Athena URL */
	static final String athenaUrl = "jdbc:awsathena://athena.us-east-1.amazonaws.com:443";
	
	/** DDL */
	static final String sql = "CREATE EXTERNAL TABLE IF NOT EXISTS tempdb.orders_summary ("
			+ "  product_category string,"
			+ "  product_sub_category string,"
			+ "  product_name string,"
			+ "  sales bigint,"
			+ "  profit bigint"
			+ ")"
			+ " ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'"
			+ " WITH SERDEPROPERTIES ("
			+ "  'serialization.format' = '\\t',"
			+ "  'field.delim' = '\\t'"
			+ ") LOCATION 's3://cm-data/orders_temp/';";
	
	protected void runSQL(String sql) {
        /* DB Parameter */
		Properties dbParam = new Properties();
		dbParam.put("user", "ABCDEFGHIJKLMNOPQRST");
		dbParam.put("password", "depTYF7bj4XVPp3RF6vgp3RFHp1jpPdML36vgjwU");
		dbParam.put("s3_staging_dir", "s3://cm-data/souece/");
		
		try (Connection conn = DriverManager.getConnection(athenaUrl, dbParam);
				Statement st = conn.createStatement()) {
			st.execute(sql);
		} catch (SQLException e) {
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) {
		AthenaDDL query = new AthenaDDL();
		query.runSQL(sql);
	}

テーブルの削除

「テーブルの作成」のSQLを変更するのみです。

	/** DDL */
	static final String sql = "DROP TABLE tempdb.orders_summary;"

参照系 Query の実行

参照系クエリーは executeQuery() と execute() で実行できることを確認しています。プレースホルダを利用するpreparedStatementはサポートされていないないため、実行するとnot implimanted が出力されます。プレスフォルダがサポートされていないので、SQLインジェクションには注意が必要です。

結果セットからデータを取得

「接続の例」で紹介したコードとほぼ一緒なのですが、ここでは 認証情報にuser/passwordではなく、AWS Credentials Providerを指定しています。

	protected void runSQL() {
		String athenaUrl = "jdbc:awsathena://athena.us-east-1.amazonaws.com:443";
		
		String stagingDirKeyName = DigestUtils.md5Hex(String.valueOf(System.nanoTime()));

		String sql = "SELECT "
				+ " product_category,"
				+ " sum(sales) as sales,"
				+ " sum(profit) as profit"
				+ " FROM tempdb.orders_summary GROUP BY 1 ORDER BY 1 LIMIT 10;";

		Properties dbParam = new Properties();
		dbParam.put("aws_credentials_provider_class", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain");
		dbParam.put("s3_staging_dir", "s3://cm-data/" + stagingDirKeyName + "/");

		try (Connection conn = DriverManager.getConnection(athenaUrl, dbParam);
				Statement st = conn.createStatement(); 
				ResultSet rs = st.executeQuery(sql)) {
			while (rs.next()) {
				System.out.println("---");
				System.out.println("product category: " + rs.getString("product_category"));
				System.out.println("sales: " + rs.getString("sales"));
				System.out.println("profit: " + rs.getString("profit"));
			}
		} catch (SQLException e) {
			e.printStackTrace();
		}
	}

S3ファイルからデータを取得

Amazon Athenaのようにビッグデータを扱うサービスでは、クエリの結果が大きな場合が度々起こりえます。そのような場合は結果セットから実行を取得せず s3_staging_dir に指定したS3オブジェクトを取得する方法が有効です。以下の例では、実行して結果を取得せずに終了しています。実行結果は s3://cm-data/query-result/" の下に.csv ファイルとして保存されていますので、実行後にそのS3オブジェクトを取得することで結果を得ることができます。ビックデータを取り扱う実践的なテクニックです。

	protected void dumpSQL() {

		String athenaUrl = "jdbc:awsathena://athena.us-east-1.amazonaws.com:443";
		
		String stagingBucketName = "cm-data";
		String stagingDirKeyName = "query-result";

		String sql = "SELECT "
				+ " product_category,"
				+ " sum(sales) as sales,"
				+ " sum(profit) as profit"
				+ " FROM tempdb.orders_summary GROUP BY 1 ORDER BY 1 LIMIT 10;";

		Properties dbParam = new Properties();
		dbParam.put("user", "ABCDEFGHIJKLMNOPQRST");
		dbParam.put("password", "depTYF7bj4XVPp3RF6vgp3RFHp1jpPdML36vgjwU");
		dbParam.put("s3_staging_dir", "s3://" + stagingBucketName + "/" + stagingDirKeyName + "/");

		try (Connection conn = DriverManager.getConnection(athenaUrl, dbParam);
				Statement st = conn.createStatement()) {
			st.execute(sql);
			
			/* Get data from Staging directory */
		} catch (SQLException e) {
			e.printStackTrace();
		}
	}

更新系 Query の実行

更新系クエリーの実行はサポートされていませんので、トランザクションをサポートしていません。

最後に

Amazon Athena で JDBCプログラミングする時のポイントや制限事項をまとめますと、

  • 更新系クエリーはサポートされていない
  • DDL・参照系クエリーをともに実行するならexecute()を用いる
  • preparedStatementはサポートされていないので、SQLインジェクションには注意が必要
  • 大きな結果の取得は結果セットから取得する以外に s3_staging_dir に指定したS3オブジェクトを取得する方法が有効
  • s3_staging_dir に作成されたS3オブジェクトは自動的には削除されないので忘れずにクリーナップする

あと、JDBCのインタフェースの互換を保たれてはいますが、バックエンドはRDBではないという意識を持つと良いでしょう。

Amazon Athena のJDBCドライバを利用すれば、JavaとSQLの経験があるエンジニアであれば簡単にプログラミングできます。実際に私もETLを自動化するプログラムを作成しましたがS3上のファイルをテーブル定義してクエリを実行、更にクエリーを実行、結果をCSV出力して、Redshiftにロードしています。この一連の流れがスムーズに自動化されるのは非常に気持ちが良いものです。 Amazon Athena はこれまでS3にバックアップしていたデータがあたかもテーブルであるかのように参照できますので、これまで活用できなかったバックアップしている生データの分析や、これらデータ分析の自動化等にどうぞご活用ください。