Snowflakeで定期的にCOPYコマンドを実行するタスクを作ってみた

2021.07.01

こんにちは!エノカワです。

Snowflakeには、SQLステートメントを実行するタスクという機能があります。
スケジュールを定義して、テーブルへのデータ投入などの定期的な作業を実行することができます。

また、タスクAの実行後にタスクBを実行するといった依存関係を定義することもできます。

今回は、定期的にCOPYコマンドを実行するタスクを作ってみました。

前提条件

Citi Bikeというシェアサイクリングサービスの利用者に関するデータを使用します。
以下の記事を参考にロードするデータの準備を行います。

ロードするデータの準備が完了すると、以下が作成された状態になります。

  • データベース:CITIBIKE
  • テーブル:TRIPS
  • ステージ:CITIBIKE_TRIPS ※今回使用しません
  • ファイル形式:CSV

タスクの概要

以下のようなイメージで動くタスクを作成します。

  • タスクA
    • 毎時0分に起動する
    • COPYコマンドでTRIPSテーブルのデータをステージのファイルにアンロードする
    • アンロードするファイルは年ごとに分割する
  • タスクB
    • タスクAの実行完了後に起動する
    • COPYコマンドでステージにあるファイルのデータをテーブルにロードする
    • ロード先のテーブルは年ごとに分かれており、一致する年のファイルをロードする

使用するテーブル

TRIPSテーブルを使用します。

テーブルの構造は下記のようになっています。
STARTTIMESTOPTIMEは、シェアサイクリングサービスの利用開始時刻と終了時刻です。

アンロードするデータ

タスクAでは、下記の条件でアンロードするデータを絞り込みます。

  • タスクAが起動した時刻の時間(HOUR)とSTOPTIMEの時間(HOUR)が一致するデータ
    • 例)15:00に起動した場合、STOPTIMEが15時台のデータのみアンロードする

ロードするデータ

タスクBでは、下記の3つの項目をロードします。

  • ロードしたファイルの名前
  • STOPTIME(シェアサイクリングサービスの終了時刻)
  • タスクBの実行時刻

準備

実際にタスクを作成る前に、準備を行います。

ワークシート上で下記SQLを実行します。

コンテキスト設定

USEコマンドで今回の検証で使用するコンテキストを設定します。

USE ROLE SYSADMIN;
USE WAREHOUSE COMPUTE_WH;
USE CITIBIKE;
USE SCHEMA PUBLIC;

ロード先のテーブル作成

CRATE TABLEコマンドで2013年、2014年、2015年の3つのテーブルを作成します。

create or replace table TRIPS_2013_TABLE (
  filename varchar,
  stoptime timestamp,
  load_datetime timestamp
);
create or replace table TRIPS_2014_TABLE like TRIPS_2013_TABLE;
create or replace table TRIPS_2015_TABLE like TRIPS_2013_TABLE;

アンロード先のステージ作成

CREATE STAGEコマンドでアンロード先のステージを作成します。
今回は名前付きステージを使用します。

create or replace stage TRIPS_UNLOAD_STAGE;

タイムゾーン設定

検証を進めていく途中でタイムスタンプを確認しますが、
タイムゾーンがデフォルトのAmerica/Los_Angeles(太平洋標準時)のままだと分かりにくいので、
ALTERコマンドでタイムゾーンをAsia/Tokyoに変更しておきます。

alter session set TIMEZONE = 'Asia/Tokyo';

準備が整いました。

それでは、実際にタスクを作成していきましょう。

タスク作成

タスクは、CREATE TASKコマンドで作成することができます。

タスクA:ステージにアンロードするタスク

create or replace task TRIPS_UNLOAD_TASK
  WAREHOUSE = COMPUTE_WH
  SCHEDULE = 'USING CRON 0 */1 * * * Asia/Tokyo'
  TIMEZONE = 'Asia/Tokyo'
  COMMENT = ' TRIPSテーブルからデータをアンロード'
AS
  copy into @TRIPS_UNLOAD_STAGE
    from (select * from TRIPS
          where date_part(year, stoptime) in (2013, 2014, 2015)
            and date_part(hour, stoptime) = date_part(hour, current_timestamp))
          partition by (date_part(year, stoptime) || '/' || date_part(hour, stoptime))
          file_format = (format_name = CSV compression='GZIP')
;

2〜5行目では、タスクのパラメータを指定しています。

  • WAREHOUSE:タスクの実行に使用される仮想ウェアハウスを指定します。
  • SCHEDULE:タスクを定期的に実行するスケジュールを指定します。
    • cron式とタイムゾーンを指定します。今回は、毎時0分で指定します。
  • TIMEZONE:セッションのタイムゾーンを指定します。
    • クエリでcurrent_timestampを使用しているので、Asia/Tokyoを指定しておきます。
  • COMMENT:タスクのコメントを指定します。

7〜12行目では、タスクで実行されるSQLステートメントを指定しています。

COPYコマンドでTRIPS_UNLOAD_STAGEステージにアンロードするクエリです。

  • STOPTIMEでアンロードするデータを絞り込んでいます。
    • 年(YEAR):2013年 or 2014年 or 2015年
    • 時間(HOUR):現在時刻(タスク実行時刻)の時間
  • PARTITON BYを指定してアンロードするファイルを分割しています。
    • STOPTIMEYYYY/HH形式に変換した値で分割

PARTITON BYについては、以下の記事でも紹介しています。


タスクB:テーブルにロードするタスク

2013年、2014年、2015年でテーブルが分かれているので、別々にタスクを作成します。

2013年

create or replace task TRIPS_LOAD_2013_TASK
  WAREHOUSE = COMPUTE_WH
  TIMEZONE = 'Asia/Tokyo'
  COMMENT = '2013年のファイルをロード'
  after TRIPS_UNLOAD_TASK
AS
  copy into TRIPS_2013_TABLE
    from(select metadata$filename,  $3, current_timestamp
           from @TRIPS_UNLOAD_STAGE/2013
           (file_format => CSV))
;

2〜5行目では、タスクのパラメータを指定しています。

  • after:現在のタスクの先行タスクを指定します。
    • タスクAの後に実行させたいので、TRIPS_UNLOAD_TASKを指定します。

なお、afterを指定した場合は、SCHEDULEを指定することはできません。


7〜10行目では、タスクで実行されるSQLステートメントを指定しています。

COPYコマンドでTRIPS_2013_TABLEテーブルにロードするクエリです。

  • metadata$filenameは、ロードするファイルの名前です。
  • $3は、ロードするファイルの3番目のフィールドでSTOPTIMEです。
  • current_timestampは、現在のタイムスタンプを返す関数(=タスクの実行時刻)です。

ステージにあるファイルのクエリについては、以下の記事でも紹介しています。


2014年、2015年も同様に作成します。

2013年と異なるのは、COMMENTの内容(4行目)とロードするファイルパス(9行目)の2箇所です。

2014年

create or replace task TRIPS_LOAD_2014_TASK
  WAREHOUSE = COMPUTE_WH
  TIMEZONE = 'Asia/Tokyo'
  COMMENT = '2014年のファイルをロード'
  after TRIPS_UNLOAD_TASK
AS
  copy into TRIPS_2014_TABLE
    from(select metadata$filename,  $3, current_timestamp
           from @TRIPS_UNLOAD_STAGE/2014
           (file_format => CSV))
;

2015年

create or replace task TRIPS_LOAD_2015_TASK
  WAREHOUSE = COMPUTE_WH
  TIMEZONE = 'Asia/Tokyo'
  COMMENT = '2015年のファイルをロード'
  after TRIPS_UNLOAD_TASK
AS
  copy into TRIPS_2015_TABLE
    from(select metadata$filename,  $3, current_timestamp
           from @TRIPS_UNLOAD_STAGE/2013
           (file_format => CSV))
;

タスク起動

作成したタスクを確認してみましょう。
SHOW TASKSコマンドでタスクをリストを取得できます。

stateは、タスクの現在の状態です。

  • started:開始
  • suspend:一時停止

この段階では、タスクは一時停止状態になっているので開始してあげる必要があります。


タスクを起動するには「グローバル権限」のEXECUTE TASKが必要となるので、
ACCOUNTADMINSYSADMINに権限を付与します。

USE ROLE ACCOUNTADMIN;
GRANT EXECUTE TASK ON ACCOUNT TO ROLE SYSADMIN;
USE ROLE sysadmin;

ALTER TASKコマンドでタスクをプロパティを変更することができます。
resumeを指定して開始済み状態に変更します。

依存関係を持つタスクの場合、子タスク(タスクB)の方から先に開始済みにする必要があります。

alter task TRIPS_LOAD_2013_TASK resume;
alter task TRIPS_LOAD_2014_TASK resume;
alter task TRIPS_LOAD_2015_TASK resume;
alter task TRIPS_UNLOAD_TASK resume;

SHOW TASKSコマンドでタスクの状態を確認してみましょう。

タスクが開始状態になりましたね。

この状態で3時間ほど放置します。

タスク履歴確認

3時間経過しました。

タスクが実行されているか確認してみましょう。

TASK_HISTORYテーブル関数でタスクの使用履歴を取得することができます。

select *
  from table(information_schema.task_history());

タスクの使用履歴が取得できました。

13時(17行目)から1時間ごとに4つのタスクが実行されていますね。

4つのタスクの中身を見てみると、
タスクA(赤枠囲み)のあとに3つのタスクB(青枠囲み)が実行されていることが分かります。

1行目のタスク(緑枠囲み)は、17時開始でスケジューリングされたタスクAです。
クエリ実行前なのでQUERY IDがNULLになっています。

なお、今回は実行時刻を確認したいので表示する列を絞り込んでいます。

テーブル確認

ロードしたテーブルの中身も確認しておきましょう。

下記SQLを実行して、
テーブルごとにロードされたファイルの名前、STOPTIME、タスク実行時刻をクエリします。

2013年

select filename, max(stoptime), max(load_datetime)
  from TRIPS_2013_TABLE group by 1 order by 1;

2014年

select filename, max(stoptime), max(load_datetime)
  from TRIPS_2014_TABLE group by 1 order by 1;

2015年

select filename, max(stoptime), max(load_datetime)
  from TRIPS_2015_TABLE group by 1 order by 1;

各テーブルに該当する年のファイルがロードされています。

タスクの実行時刻も1時間ごとになっていますね。

タスク停止

タスクはしばらく使用しないので、停止しておきましょう。

ALTER TASKコマンドでsuspendを指定して停止状態に変更します。

起動時とは逆に、親タスク(タスクA)の方から先に停止する必要があります。

alter task TRIPS_UNLOAD_TASK suspend;
alter task TRIPS_LOAD_2013_TASK suspend;
alter task TRIPS_LOAD_2014_TASK suspend;
alter task TRIPS_LOAD_2015_TASK suspend;

まとめ

以上、定期的にCOPYコマンドを実行するタスクを作ってみました。

繰り返し実行するSQLステートメントにスケジュールなどのパラメータを指定するだけで
定期的に実行するタスクを作成することができました。

今回は、SQLステートメントを実行するタスクを作成しましたが、
ストアドプロシージャを実行するタスクを作成することも可能です。

参考