AWS GlueのDynamicFrameの動きを見てみる

こんにちは、小澤です。

AWS Glueでは、SparkのDataFrameではなく、DynamicFrameというものが使われているようです。 今回はこのDynamicFrameがどのような動きをするのかやGithubで公開されているライブラリからtransform処理が行われた際にどのような動きになっているのかなどをみていきたいと思います。

DynamicFrameとは

DynamicFrameについては、公式ドキュメンに記載があります。

DynamicFrameはAWS Glueで独自に定義されたデータ構造のようです。

  • DataFrameのようなテーブル形式でデータ
  • DataFrameと異なり列ごとのデータ型が異なるものにも柔軟な対応が可能

という特徴があるようです。

DynamicFrameが柔軟なスキーマに対応するために、内部ではDataFrameのRowに対応するようなDynamicRecordというものを使って論理レコードをあわらしているようです。 DynamicRecordに関しては、これ以上詳細な情報を見つけることができませんでしたが、以下のような対応関係を想定しておけば問題無いかと思います。

Spark AWS Glue
DataFrame DynamicFrame
Row DynamicRecord

また、詳細はこの後見ていくことになりますが、SparkのDataFrameに対して行う各種操作に相当する処理が同様にDynamicFrameにも定義されています。

複数の型を持つ列の扱い

AWS Glueでは、同一の列に複数の性質の異なるデータが入ってくる可能性が想定されいます。 これは、AWS GlueがETLなどの処理で利用されることを想定しているため、そのままでは従来のDataFrame形式に落とし込めないようなデータに対して適切な形式へと変換することも意識して作られているためと考えられます。

例えば、以下のようなデータを想定します。

5.1 3.5 1.4 0.2 Iris-setosa
4.9 3.0 1.4 0.2 Iris-setosa
...
...
Iris-setosa 5.0 3.4 1.5 0.2
Iris-setosa 4.4 2.9 1.4 0.2
...
...
6.2 3.4 5.4 2.3 Iris-virginica
5.9 3.0 5.1 1.8 Iris-virginica

データを記録する際に、誤って一部だけ列の順番を間違えてしまいました。てへぺろ(・ω<)

こういったデータを読み込むと以下のように最初と最後の列が「choice」というものになっているのが確認できます。

root
|-- col0: choice
| |-- double
| |-- string
|-- col1: double
|-- col2: double
|-- col3: double
|-- col4: choice
| |-- double
| |-- string

これで、複数の型のデータが混在している状況ということがわかるので後は実際のデータに基づいて適切な形式になるようにしてあげればOKです。

Crawlerによるテーブル定義とDynamicFrame

DynamicFrameでの複数の型を持つ列に対する扱いに関しては以下の公式ドキュメントにも記載されています。

-

Code Example: Data Preparation Using ResolveChoice, Lambda, and ApplyMapping - AWS Glue

上記のドキュメントでは、Crawlerがテーブルを作成する際はデータの先頭2MBを見て判断すると記載されています。 DynamicFrameでは、この先頭2MBには登場しなかったデータがそれまでと異なる型を持つ場合にも対応可能となっています。

ここで、先頭の2MBに以下のようなデータを考えみましょう。

フルマラソンは
42.195
kmです

さて、この時「42.195」はdouble、それ以外はstringになるでしょうか?

2017/10/17現在の型の決まり方は以下のようになっているようです。

「42.195」はstringとして扱うことも可能です。 Crawlerでは、先頭の2MBに含まれるデータをすべて表すことが可能な型としてテーブル定義を行うようです。 そのため、ここではすべてのデータがstringとなります。

ここでそのデータをDynamicFrameからstringのまま読み込むとすべてのデータがstringとして表現可能なため、choiceではなくすべてstringとして扱われました。

intに対してのdoubleや、int, doubleに対してのstringなど型は異なるが表現可能なデータ混在していて、テーブル定義として包括できる側の型になっている場合は、その点も確認する必要があります。

この点はデータ分析を行うという目的上、以下のような考え方をするといいかと思います。

  • x : 何でもかんでも適当にデータを入れれば後はAWS Glueがよしなにやってくれる
  • o : データには時々不正なものが含まれてしまう前提でそれらに対してどのような扱いをすべきかは人間が考える

残念ながらデータ分析は魔法はありません。 データ分析に適した適切なデータにするには人間が何をしたいかで判断する必要があります。

DataFrameなどが列ごとに単一の型となるほうが、「分析を行う」という観点では都合がいいです。 そういった意味でDynamicFrameの性質をうまく活用する場面としては、一部不正なデータが含まれてしまっているのを検知するなど、後続の処理がそのままでは対応できなかった場面において前処理として対処しておくのに非常に役立つでしょう。

DynamicFrameとDataFrame

DynamicFrameとSparkのDataFrameを相互に変換するためのメソッドが用意されているようです。

変換 メソッド
DynamicFrame -> DataFrame DynamicFrame.toDF()
DataFrame -> DynamicFrame DynamicFrame.fromDF()

なお、このように同一列に複数の型が共存している場合は、toDFを使ってSparkのDataFrameには変換できないようです。

transform処理の呼び出しの流れ

さて、ここでtransform処理の呼び出しの流れがソースコード上でどうなっているのかを見てみます。

AWS Glueで生成されるtransformのコードは

ApplyMapping.apply(frame = <dynamicframeの変数>, mappings = [<変換リスト>], transformation_ctx = "applymapping1")

のように特定のtransform処理をを行うクラスのapplyメソッドを呼び出しています。 このapplyメソッドは、すべてのtransform系の親クラスであるTransformで定義されています。

@classmethod
def apply(cls, *args, **kwargs):
    transform = cls()
    return transform(*args, **kwargs)

クラスメソッド(※ 弊社ではありません)として定義されており、呼び出しを行ったクラスのインスタンスを作成したのち、__call__メソッドを呼び出しています。

具体的に呼び出される処理は、ApplyMappingであれば以下のようになっています。

def __call__(self, frame, mappings, case_sensitive = False,
             transformation_ctx = "", info = "", stageThreshold = 0, totalThreshold = 0):
        return frame.apply_mapping(mappings, case_sensitive)

引数として渡された、DynamicFrameのapply_mappingメソッドを呼び出しています。

DynamicFrameのapply_mappingの処理は以下のようになっています。

def apply_mapping(self, mappings, case_sensitive = False, transformation_ctx = "", info = "", stageThreshold = 0, totalThreshold = 0):
    def _to_java_mapping(mapping_tup):
        source_path, source_type, target_path, target_type = mapping_tup
        return self.glue_ctx._jvm.MappingSpec.apply(
            source_path,
            source_type,
            target_path,
            target_type)

    if isinstance(mappings, tuple):
        mappings = [mappings]

    mappings_list = [ _to_java_mapping(m) for m in mappings ]

    new_jdf = self._jdf.pyApplyMapping(
        self.glue_ctx._jvm.PythonUtils.toSeq(mappings_list),
        case_sensitive,
        transformation_ctx,
        callsite(), info, long(stageThreshold), long(totalThreshold))

    return DynamicFrame(new_jdf, self.glue_ctx, self.name)

内容を見ていただくと、この先はほとんどGlueContextで持っているJVMで動いているコードへと処理を渡しているようなものになっています。

この先のglue_ctx._jvmから呼び出している処理の実装は公開されていないようなので、DynamicFrameに対しての各種処理の実装がどのようになっているかまでは追えませんがラピュタの技術並みにすごいことが行われていると想像しておくことにします。 (もし私が見落としているだけで実はどこかで確認できるよ!という場合は教えてください)

このように各種transform処理は、

  1. それぞれのクラスメソッドを呼び出し
  2. メソッド内でインスタンスを作成して__call__メソッドの呼び出し
  3. __call__メソッド内でDynamicFrameのメソッドを呼び出し

という流れになっているようです。

余談:DynamicFrameのメソッドを直接呼び出す

各種transform処理はDynamicFrameに定義されたメソッドを呼び出しています。 これらのメソッドはアクセスコントロールがされているわけではない(というかPythonではできない)ので実は直接呼び出すことも可能です。

また、公式のサンプルの以下の部分では実際にそれを行っています。

medicare_nest = medicare_tmp.apply_mapping([('drg definition', 'string', 'drg', 'string'), 
                             ('provider id', 'long', 'provider.id', 'long'),
                             ('provider name', 'string', 'provider.name', 'string'),
                             ('provider city', 'string', 'provider.city', 'string'),
                             ('provider state', 'string', 'provider.state', 'string'),
                             ('provider zip code', 'long', 'provider.zip', 'long'),
                             ('hospital referral region description', 'string','rr', 'string'),
                             ('ACC', 'string', 'charges.covered', 'double'),
                             ('ATP', 'string', 'charges.total_pay', 'double'),
                             ('AMP', 'string', 'charges.medicare_pay', 'double')])

このようなこと記述ができるということはメソッドチェインで処理を記述することも可能ということになります。

SparkやPandasでDataFrameに対する操作を行うときは割とこういう書き方もしますし、Rなんかだとdplyrを使ったります。 私のようなものぐさな人間の場合、途中途中の処理に変数名をつける手間が省けるなんて考えたりもしますw

ただし、このやり方には1つ注意点があります。 生成されるコードのやり方では__call__メソッドで何らかの処理を行うことも可能です。 2017/10/17現在はそういったものは無いようですが、今後そういったものが含まれる処理ができた際は動作が異なる可能性がありますのでご注意ください。

おわりに

今回はAWS GlueのコアとなるDynamicFrameがどういったものなのかと、transform処理が実行される際の処理の流れを見ていきました。

DynamicFrameはGlueを使う上で重要な概念となりますで、

  • 生成されるDAGやOptimizer
  • DataFrameと比較しての処理速度

なども今後見ていけるといいかなと思っています。