Ibisを使ってみる | Hadoop Advent Calendar 2016 #21

こんにちは、小澤です。 この記事はHadoop Advent Calendar 21日目のものとなります。

前回はDeepleaning4J on Sparkを利用して、SparkでDeep Learningを行いました。
今回はIbisというものを使ってみたいと思います。

Ibisとは

ImpalaなどのSQL on Hadoopに対してPythonからDataFrame風に処理を実行できるライブラリのようです。 PythonのDataFrameのライブラリであるPandasと同じ作者が作っているようなので、そうあたりの親和性もいいのではないかと思い今回は使ってみたいと思います。

なぜこのようなライブラリが必要か

以下の図のような環境を考えてみます。

ibis

ここではDWHなどを挟まずにHadoop上にあるデータにSQLでアクセスしています。 また、データ抽出と分析の過程でそれぞれ別なものを利用しています。 この時、分析に使う環境では主にDataFrameの形式でデータを扱うことになります。DataFrameではSQLと似たような操作も可能なので、気になるのが「どこまでをSQLで記述して、どこからをDataFrameとして扱うか」ですね。 そこで、DataFrameの扱いに慣れている人のために、最初のSQLの段階からDataFrameに対する操作であるかのように扱うためのライブラリとしてあるのがIbisです。

使ってみる

分散処理環境を構築してImpalaなどに接続してみるのが一番いいかとは思うのですが、ここでは試しに動かしてみるのに最も手軽なSQLiteをデータのある環境として利用します。 内容としてはQuickstart on Crunchbase analysis using Ibis and SQLiteにあるものを動かしてみます。

インストールと接続

まずはインストールします

pip install ibis-framework

データベースへのアクセス

import ibis
ibis.options.interactive = True
con = ibis.sqlite.connect('../resources/crunchbase.db/crunchbase.db')

con.list_tables()
['acquisitions', 'companies', 'investments', 'rounds']

テーブルの一覧が確認できました。

簡単な処理の実装

次にテーブルに関する情報と集約処理を行ってみます

# テーブル名をしていして、特定のテーブルを取得して情報を見る
rounds = con.table('rounds')
rounds.info()
Table rows: 87161

Column                   Type    Non-null #
------                   ----    ----------
company_permalink        string  87161     
company_name             string  87161     
company_category_list    string  83464     
company_market           string  80523     
company_country_code     string  81291     
company_state_code       string  59296     
company_region           string  81016     
company_city             string  80208     
funding_round_permalink  string  87161     
funding_round_type       string  87161     
funding_round_code       string  23648     
funded_at                string  87161     
funded_month             string  87147     
funded_quarter           string  87147     
funded_year              int32   87147     
raised_amount_usd        float   73406  

# カラムを選択してcountを行う
# SQLだと、count関数とgroup byのようなもの
rounds.funding_round_type.value_counts()
       funding_round_type  count
0                   angel   4602
1        convertible_note    838
2          debt_financing   5869
3     equity_crowdfunding   2401
4                   grant   1523
5   non_equity_assistance     69
6           post_ipo_debt     92
7         post_ipo_equity    425
8          private_equity   1864
9    product_crowdfunding    241
10       secondary_market     61
11                   seed  22053
12            undisclosed   4128
13                venture  42995

# 別なテーブルに対して、nullものと、そうでないものそれぞれのカウントを行う
expr = acquisitions.price_amount.isnull().name('has_price').value_counts()
# executeでPandasのDataFrameとして取得
expr.execute()
   has_price  count
0          0   3816
1          1  51424

データの型

Ibis内部で扱われているデータの型を見ています。

# 内部がテーブルになっている場合はTableExpr
type(expr)
ibis.expr.types.TableExpr

# 1カラムだけの場合はArray
companies = con.table('companies')
type(companies.funding_total_usd)
ibis.expr.types.FloatArray

# 集約関数の結果など、1つの値になっている場合Scalar
expr = companies.funding_total_usd.mean()
type(expr)
ibis.expr.types.DoubleScalar

executeを行うとTableExprはPandasにおけるDataFrame、ArrayはSeries、Scalarはnumpyの変数になるようです。

クロス集計テーブルの作成

次にクロス集計を行う例を見ていきます。 まず、timestampが入っているデータに対する処理が行える例が出ています。

funded_at = rounds.funded_at.cast('timestamp')
funded_at.year().value_counts()[10:20]
   year  count
0  1960      3
1  1973      1
2  1974      2
3  1979      1
4  1982      3
5  1983      1
6  1984      4
7  1985      6
8  1986      4
9  1987      6

ここでは明示的にキャストして年だけを抽出しています。 この仕組みを利用して以下のカラムを利用してたクロス集計を行います。

rounds.funding_round_code.value_counts()
  funding_round_code  count
0               None  63513
1                  A  11382
2                  B   6548
3                  C   3329
4                  D   1530
5                  E    608
6                  F    201
7                  G     45
8                  H      5

まずは縦持ちの方で必要なデータを抽出します。

year = funded_at.year().name('year')
expr = rounds[(rounds.funding_round_type == 'venture') & year.between(2000, 2015) & rounds.funding_round_code.notnull()] \
    .group_by([year, 'funding_round_code']) \
    .size()
result = expr.execute()
result[:10]
   year funding_round_code  count
0  2000                  A     79
1  2000                  B     32
2  2000                  C     10
3  2000                  D      8
4  2000                  E      2
5  2001                  A     50
6  2001                  B     30
7  2001                  C     17
8  2001                  D      4
9  2001                  E      1

ここでちょっとしたDataFrame操作のテクニックが出てきます。 DataFrameに対する操作では、行数と同じサイズの真偽値の配列を渡してやることで配列中のTrueなっている行のみを抜き出すことができます。 その仕組みを利用して、expr= ... ではroundsにに対して3つの真偽値配列をandで渡すことで条件に一致する行のみを抜き出しています。

これを横持ちに変換することでクロス集計表を得ます。

pivoted = result.set_index(['year', 'funding_round_code']) \
    .unstack('funding_round_code') \
    .fillna(0)
pivoted
                     count                                             
funding_round_code       A      B      C      D      E     F     G    H
year                                                                   
2000                  79.0   32.0   10.0    8.0    2.0   0.0   0.0  0.0
2001                  50.0   30.0   17.0    4.0    1.0   0.0   0.0  0.0
2002                  35.0   39.0   25.0    5.0    2.0   2.0   0.0  0.0
2003                  68.0   45.0   14.0   13.0    1.0   0.0   1.0  0.0
2004                 146.0   76.0   32.0   15.0    3.0   1.0   0.0  0.0
2005                 513.0  334.0  176.0   67.0   26.0   6.0   0.0  0.0
2006                 717.0  465.0  226.0   91.0   35.0   7.0   1.0  0.0
2007                 956.0  583.0  281.0  110.0   49.0   7.0   1.0  0.0
2008                 979.0  653.0  308.0  120.0   54.0  17.0   1.0  0.0
2009                 753.0  531.0  290.0  147.0   55.0  28.0   0.0  0.0
2010                1013.0  598.0  369.0  149.0   52.0  18.0   2.0  0.0
2011                1250.0  700.0  334.0  175.0   60.0  18.0   5.0  0.0
2012                1242.0  610.0  345.0  184.0   69.0  16.0   7.0  0.0
2013                1606.0  796.0  377.0  185.0   81.0  38.0   6.0  0.0
2014                1757.0  952.0  471.0  223.0  108.0  36.0  18.0  5.0
2015                  88.0   71.0   34.0   28.0    8.0   5.0   3.0  0.0

set_indexでyear→funding_round_codeの階層を持つインデックスを作成て、unstackでそれを縦横に変換しています。

Bucketを使う

最後のBucketを使ってみます。

funding_buckets = [0, 1000000, 10000000, 50000000, 100000000, 500000000, 1000000000]
bucket = companies.funding_total_usd.bucket(funding_buckets, include_over=True)
bucket.value_counts()
   unnamed  count
0      NaN  12055
1      0.0  15965
2      1.0  15754
3      2.0   7839
4      3.0   1532
5      4.0   1022
6      5.0     88
7      6.0     37

bucketで指定した配列の値に応じて、companies.funding_total_usdが0から1000000の間であれば0、1000000から10000000の間であれば1のように数値が振られていきます。
次に各bucketごとの名前を設定していきます

bucket_names = ['0 to 1m', '1m to 10m', '10m to 50m', '50m to 100m', '100m to 500m', '500m to 1b', 'Over 1b']
counts = bucket.name('bucket').value_counts()
labeled = counts.bucket.label(bucket_names)
with_names = counts.mutate(bucket_names=labeled)
with_names
   bucket  count  bucket_names
0     NaN  12055          None
1     0.0  15965       0 to 1m
2     1.0  15754     1m to 10m
3     2.0   7839    10m to 50m
4     3.0   1532   50m to 100m
5     4.0   1022  100m to 500m
6     5.0     88    500m to 1b
7     6.0     37       Over 1b

mutateを使って項目の追加をしている以外は新しい要素はないかと思います。
次に集計を行います。aggregateを利用して複数の集計を行っています。

metrics = companies.group_by(bucket.name('bucket')) \
    .aggregate(count=companies.count(), total_funding=companies.funding_total_usd.sum()) \
    .mutate(bucket_names=lambda x: x.bucket.label(bucket_names))
metrics
   bucket  count  total_funding  bucket_names
0     NaN  12055            NaN          None
1     0.0  15965   4.505177e+09       0 to 1m
2     1.0  15754   5.712283e+10     1m to 10m
3     2.0   7839   1.724166e+11    10m to 50m
4     3.0   1532   1.054132e+11   50m to 100m
5     4.0   1022   1.826600e+11  100m to 500m
6     5.0     88   5.804196e+10    500m to 1b
7     6.0     37   1.040123e+11       Over 1b

最後にテーブル化します。少々内容が複雑ですが、個々の要素はこれまで登場したものの組み合わせなので難しい点はないかと思います

joined = companies.mutate(bucket=bucket, status=companies.status.fillna('Unknown')) \
    [(companies.founded_at > '2010-01-01') | companies.funding_total_usd.isnull()] \
    .group_by(['bucket', 'status']) \
    .size() \
    .mutate(bucket_name=lambda x: x.bucket.label(bucket_names).fillna('Unknown'))
table = joined.execute()
table.set_index(['status', 'bucket', 'bucket_name']).unstack('status')
                      count                          
status              Unknown acquired closed operating
bucket bucket_name                                   
NaN    Unknown       3284.0    511.0  458.0    7802.0
0.0    0 to 1m        209.0    145.0  477.0    8246.0
1.0    1m to 10m      109.0    254.0  119.0    5125.0
2.0    10m to 50m      20.0     65.0   13.0    1346.0
3.0    50m to 100m      1.0     10.0    1.0     152.0
4.0    100m to 500m     1.0      2.0    1.0     122.0
5.0    500m to 1b       NaN      NaN    NaN      12.0
6.0    Over 1b          NaN      NaN    NaN       5.0

生成されるSQL

ここまででIbisを使ったDataFrameによる操作のイメージはつかめたかと思います。
次に実際に裏側ではどのようなSQLが生成されているのかを見てみましょう。

print(ibis.impala.compile(joined))

を実行すると、下記のようなSQL文が表示されます。

SELECT *,
  isnull(CASE `bucket`
    WHEN 0 THEN '0 to 1m'
    WHEN 1 THEN '1m to 10m'
    WHEN 2 THEN '10m to 50m'
    WHEN 3 THEN '50m to 100m'
    WHEN 4 THEN '100m to 500m'
    WHEN 5 THEN '500m to 1b'
    WHEN 6 THEN 'Over 1b'
    ELSE NULL
  END, 'Unknown') AS `bucket_name`
FROM (
  SELECT `bucket`, `status`, count(*) AS `count`
  FROM (
    SELECT `permalink`, `name`, `homepage_url`, `category_list`, `market`,
           `funding_total_usd`, isnull(`status`, 'Unknown') AS `status`,
           `country_code`, `state_code`, `region`, `city`, `funding_rounds`,
           `founded_at`, `founded_month`, `founded_quarter`, `founded_year`,
           `first_funding_at`, `last_funding_at`,
      CASE
        WHEN (`funding_total_usd` >= 0) AND (`funding_total_usd` < 1000000) THEN 0
        WHEN (`funding_total_usd` >= 1000000) AND (`funding_total_usd` < 10000000) THEN 1
        WHEN (`funding_total_usd` >= 10000000) AND (`funding_total_usd` < 50000000) THEN 2
        WHEN (`funding_total_usd` >= 50000000) AND (`funding_total_usd` < 100000000) THEN 3
        WHEN (`funding_total_usd` >= 100000000) AND (`funding_total_usd` < 500000000) THEN 4
        WHEN (`funding_total_usd` >= 500000000) AND (`funding_total_usd` <= 1000000000) THEN 5
        WHEN `funding_total_usd` > 1000000000 THEN 6
        ELSE NULL
      END AS `bucket`
    FROM companies
    WHERE ((`founded_at` > '2010-01-01') OR `funding_total_usd` IS NULL)
  ) t1
  GROUP BY 1, 2
) t0

SQLを書くかDataFrameで操作するかについて

Ibisを使ってDataFrameを操作すると内部的にはSQLに変換されています。 これは利用する際はSQLを書かなくてもいいことを意味しています。
しかし、一方でSQLに慣れている人やプログラミングに詳しくないデータ分析者などの場合はSQLでかけたほうがいいと思うことも多かと思います。
これについてはSpark SQLとDataFrame APIなんかでも同じですが、どちらを利用するのが良いかというのは一概には決められないと思います。 必要に応じて場面ごとに使い分けれるようになるのが一番かと思います。

終わりに

今回はIbisというものを使ってみました。
DataFrameの操作に慣れている人にとっては非常に便利なライブラリとなっており、うまく活用していければ非常に強力なツールになってくれることと思います。

明日はHBaseについて書かせていただく予定です。
ぜひ、お楽しみに!