この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
いわさです。
先日 Amazon S3 バケットのデータを Azure Data Explorer へ取り込むために Azure Data Factory を使いました。
このように AWS に分析対象のワークロードがあって、Azure に機械学習や分析の基盤があるような場合に、Azure Data Factory を使うことで ETL パイプラインを構築出来ることがわかりました。
前回は単純にコピーアクティビティを一度実行するだけで、パイプラインとしての意識はなく、またデータの加工や抽出も行っていませんでした。
今回は AWS から Azure へ抽出と加工を使ったパイプラインを作成してみたいと思います。
コピーアクティビティとデータフローアクティビティ
Azure Data Factory ではアクティビティを組み合わせてパイプラインを作成します。
主なアクティビティとしてはデータ移動(コピー)アクティビティ、データフロー(変換)アクティビティ、カスタムアクティビティなどがあります。
コピーアクティビティではソースデータストアからシンクデータストア(ターゲット)にデータをコピーします。
シンプルにデータの移動だけを行うものですが、多様なデータストアに対応していて、Amazon RDS for SQL Server もソースデータストアの場合はサポートされています。
データフローアクティビティの場合はデータを抽出しつつ変換や様々な処理を行うことが出来ます。
こちらは Amazon RDS for SQL Server はサポートされていないようです。ソース・シンクデータストアとして Azure SQL Database はサポートされています。
カスタムアクティビティは少し特殊なのですが、上記2つの標準アクティビティでサポートされていないデータストアとの間でデータを移動する場合に独自ロジックを実行することが出来ます。
今回はパイプラインとふたつのアクティビティ(コピーとデータフロー)を組み合わせて、色々やってみたいと思います。
- Amazon RDS for SQL Server から Azure SQL Database へのコピーアクティビティ
- Azure SQL Database から Azure SQL Database へのデータフローアクティビティ
カスタムアクティビティに興味のある方は以下のドキュメントをご確認ください。
データストアの事前準備
詳細な手順は割愛しますが、AWS 上にはパブリックアクセス可能な Amazon RDS for SQL Server を用意します。
そして 1 つのテーブルと 5 つのレコードを作成します。
レコードごとに異なる日付値を設定しておきます。
CREATE TABLE [dbo].[hogeaws](
[HogeId] [tinyint] NOT NULL,
[HogeName] [nvarchar](25) NOT NULL,
[HogeDate] [date] NOT NULL)
INSERT INTO [dbo].[hogeaws](HogeId, HogeName, HogeDate) VALUES (1, 'aaa', DATEADD(day, 2, GETDATE()));
INSERT INTO [dbo].[hogeaws](HogeId, HogeName, HogeDate) VALUES (2, 'bbb', DATEADD(day, 1, GETDATE()));
INSERT INTO [dbo].[hogeaws](HogeId, HogeName, HogeDate) VALUES (3, 'ccc', DATEADD(day, 0, GETDATE()));
INSERT INTO [dbo].[hogeaws](HogeId, HogeName, HogeDate) VALUES (4, 'ddd', DATEADD(day, -1, GETDATE()));
INSERT INTO [dbo].[hogeaws](HogeId, HogeName, HogeDate) VALUES (5, 'eee', DATEADD(day, -2, GETDATE()));
Azure 上にはパブリックアクセス可能な Azure SQL Database を用意します。
先程と同じ構造のテーブルを 2 つ作成しておきます。
CREATE TABLE [dbo].[table1](
[HogeId] [tinyint] NOT NULL,
[HogeName] [nvarchar](25) NOT NULL,
[HogeDate] [date] NOT NULL);
CREATE TABLE [dbo].[table2](
[HogeId] [tinyint] NOT NULL,
[HogeName] [nvarchar](25) NOT NULL,
[HogeDate] [date] NOT NULL);
データセット作成
データソースが用意出来たら、データセットとして定義します。
アクティビティでデータストアを使用するためにはデータセットというデータストアに関する情報が定義されたオブジェクトを用意します。
データストアへの接続情報やスキーマ情報などはデータストアが保持しています。
ファクトリリソースのデータセットメニューから「新しいデータセット」を選択することで作成することが出来ます。
Amazon RDS
まずはデータストアで AMazon RDS for SQL Server を選択します。
新しいリンクサービスで接続情報に必要な情報を設定します。
この時点でテスト接続を行ってみましょう。
失敗しました。
リンクサービス設定箇所にて「統合ランタイム経由で接続」という項目がありました。
統合ランタイムによってデータストアへの接続が行われます。自分でネットワークを構築して統合ランタイムをホスティングすることも出来ますが、ここではデフォルトのAutoResolveIntegrationRuntime
を選択しています。
これは Azure マネージドな統合ランタイムです。マネージド統合ランタイムからのアクセスが Amazon RDS for SQL Server 側で許可されていないため接続に失敗しました。
マネージド統合ランタイムの IP アドレス範囲は公開されていて、以下からダウンロードすることが出来ます。
今回はセキュリティグループで IP アドレスを許可する方針としています。
:
{
"name": "DataFactory.JapanEast",
"id": "DataFactory.JapanEast",
"properties": {
"changeNumber": 5,
"region": "japaneast",
"regionId": 24,
"platform": "Azure",
"systemService": "DataFactory",
"addressPrefixes": [
"13.78.109.192/28",
"20.43.64.128/25",
"20.43.65.0/26",
"20.43.70.120/29",
"20.191.164.0/24",
"20.191.165.0/26",
"20.210.70.88/29",
"40.79.187.208/28",
"40.79.195.224/28",
"2603:1040:407:1::480/121",
"2603:1040:407:1::500/122",
"2603:1040:407:1::700/121",
"2603:1040:407:1::780/122",
"2603:1040:407:402::330/124",
"2603:1040:407:802::210/124",
"2603:1040:407:c02::210/124"
],
"networkFeatures": [
"API",
"NSG"
]
}
:
セキュリティグループ設定後にテスト接続してみると接続に成功しました。
接続が出来たら自動でスキーマが認識されているはずなので、データセットとして指定するテーブル名を選択します。
Azure SQL Database
Azure SQL Database も同じように選択します。
こちらも SQL Server のファイヤウォールエラーが発生しますのでアクセス出来るように許可します。
先程と同じように IP アドレスで指定も出来るはずですが、Azure SQL Database の SQL Server ファイヤウォールでは「Azure からのアクセスを許可する」という簡単設定も用意されていて、今回はこちらを使いました。
コピーアクティビティの作成
では、まずはコピーアクティビティから作成しましょう。
パイプラインのサイドメニューから「移動と変換」の「データのコピー」を選択します。
コピーアクティビティでもいくつか設定のカスタマイズが可能ですが、今回はシンプルにデータストアのみを指定することにします。
カスタムクエリが使えるのでここでも抽出や変換は出来たりしますが、今回はパイプラインで複数アクティビティを組み合わせることを試したいので単純にコピーだけしてみます。
ソースデータストアには Amazon RDS for SQL Server のデータセットを指定します。
シンクデータストアには Azure SQL Database の table1 のデータセットを指定します。
これで、AWS から Azure へシンプルなデータのコピーが行われるはずです。
データフローアクティビティの作成
続いてデータフローアクティビティを構成します。
ここでは AWS からコピーされたデータに対して、さらに本日以降の日付フィールドでフィルターし、特定フィールド文字列を小文字から大文字に変換してみようと思います。
まずはデータフローを新規作成します。
データフローではまずはソースデータストアに Azure SQL Database のデータセットを指定します。
データストアタスクの右下に「+」アイコンがあり、こちらから後続タスクを追加することが出来ます。
まずはフィルターを追加します。
フィルター条件としてはHogeDate
が本日以降のもののみを抽出対象としています。
このあたりを構成する際にクエリ式を記述することになります。詳細は以下をご確認ください。
続いて「派生列」タスクを使ってフィールド値の変換も行います。
以下のように様々なタスクが用意されているので色々と試してみましょう。
最後にシンクデータストアも指定します。
これで抽出し・加工したデータが出力される形になります。
パイプラインの作成
まだ少しやることがあります。
アクティビティはパイプラインで実行するオブジェクトなので、パイプラインの構成も必要です。
データコピーアクティビティは既にパイプライン上に配置されているはずなので、データフローアクティビティを配置します。
配置後にプロパティで先程作成したデータフローを指定します。
そしてコピーアクティビティの「+」アイコンから「成功」を選択し、追加されたラベルからデータフローアクティビティまでドラッグアンドドロップします。
アクティビティ同士が接続されました。
これで、コピーアクティビティが成功したのちにデータフローアクティビティが実行されます。
今回は接続しただけの非常にシンプルなパイプラインですが、パイプラインでは様々な制御を行うためのコンポーネントが用意されています。
実行
パイプラインが作成されたので、最後にパイプラインを実行しましょう。
実行するためにはトリガーが必要です。
その場で即時実行も出来るし、スケジューリング設定したりイベント駆動にも出来ます。
今回は「今すぐトリガー」をすることで即時実行しましょう。
実行中
実行中のパイプラインおよび実行履歴はモニターメニューから状況を確認することが出来ます。
アクティビティのアイコンをクリックすると処理の詳細な処理状況や結果を確認することも出来ます。
データフローアクティビティも確認してみましょう。
フィルターで3件処理されたんだなーというのがわかったりします。
実行後
さて、処理が終わったので結果を見てみましょう。
Amazon RDS for SQL Server に 5 件あったレコードのうち、HogeDate
が当日以降のもののみ抽出され、さらにHogeName
が小文字から大文字に変換されて、Azure SQL Database へロードされていますね。
成功です。
さいごに
本日は Azure Data Factory を使って Amazon RDS から Azure SQL Database へデータの抽出と変換をしてみました。
ブログだと少し長くなりましたが、簡単なパイプラインであれば実際はすぐに構築することが出来ました。
また、動的クエリを組み込もうとすると少し構文や関数を調べながら行う必要がありますが、それでもかなり実装しやすかったという所感です。
データストアについてはアクティビティの種類によって対応・非対応があるようなので、この点だけ少し注意しながら対象となっているか確認が必要ですね。