この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、みかみです。 データアナリティクス事業本部に所属しています。
GCP のデータ分析基盤関連でよく聞く Google Dataflow。 設定などの手間が必要なく、オートスケーリングな並列分散処理をしてくれるデータパイプラインが簡単に構築できるらしいとのことですが、実際に使ったことがなかったので、触ってみました。
やりたいこと
- Google Dataflowをさわってみたい
- Dataflowを使うにはどうすればよいのか知りたい
前提
本エントリでは、Dataflow を実際に動かしてみるために、Python を使用したクイックスタートを実行して、指定したテキストファイル内に出現する単語の数をカウントしてみます。
動作確認には Cloud Shell を使用します。 Cloud Shell には Google Cloud SDK などの基本的な環境はインストール済みのため、Python 実行環境の構築や Google サービスアカウントの設定手順は省略します。
また、Dataflow のジョブ実行に必要な各種 API は有効化済みです。
- プロダクトの概要 | Cloud Shell ドキュメント
- Cloud Shell の使用 | Cloud Shell ドキュメント
- Python を使用したクイックスタート > 始める前に | Dataflow ドキュメント
- Python を使用したクイックスタート > 環境の設定 | Dataflow ドキュメント
Dataflow の料金
Dataflow を利用する場合、ジョブの実行時間に従って秒単位で課金が発生します。 課金額はジョブが起動するワーカーの種類によって異なります。
また、Dataflow ジョブで GCS や BigQuery など他の GCP リソースを使用する場合には、各リソースの利用料金が発生します。
詳細は公式ドキュメントでご確認ください。
GCS バケットを作成
Dataflow ジョブの実行結果を格納用するための GCS バケットを作成します。
GCP 管理コンソールから、東京リージョンに、ストレージクラスが Standard の GCS バケットを作成しました。
Apache Beam SDK for Python を Cloud Shell にインストール
ドキュメントに従って、Cloud Shell に Apache Beam SDK をインストールします。
2020/10 現在、Cloud Shell の python および pip のデフォルトバージョンは 2系なので、python コマンドは python3
で実行する必要があります。
gcp_da_user@cloudshell:~$ python --version
********************************************************************************
Python 2 is deprecated. Upgrade to Python 3 as soon as possible.
See https://cloud.google.com/python/docs/python2-sunset
To suppress this warning, create an empty ~/.cloudshell/no-python-warning file.
The command will automatically proceed in seconds or on any key.
********************************************************************************
Python 2.7.16
gcp_da_user@cloudshell:~$ python -m pip --version
********************************************************************************
Python 2 is deprecated. Upgrade to Python 3 as soon as possible.
See https://cloud.google.com/python/docs/python2-sunset
To suppress this warning, create an empty ~/.cloudshell/no-python-warning file.
The command will automatically proceed in seconds or on any key.
********************************************************************************
pip 18.1 from /usr/lib/python2.7/dist-packages/pip (python 2.7)
gcp_da_user@cloudshell:~$ python3 --version
Python 3.7.3
gcp_da_user@cloudshell:~$ python3 -m pip --version
pip 18.1 from /usr/lib/python3/dist-packages/pip (python 3.7)
pip3 install
コマンドで、Apache Beam SDK を無事インストールできました。
gcp_da_user@cloudshell:~ (cm-da-mikami-yuki-258308)$ pip3 install apache-beam[gcp]
Collecting apache-beam[gcp]
Downloading https://files.pythonhosted.org/packages/d8/b4/16ed288b3df6700ed3aba059e9e9550054dc3b5e5f955c12a82a5bfcf79a/apache_beam-2.24.0-cp37-cp37m-manylinux1_x86_64.whl (8.5MB)
100% |████████████████████████████████| 8.6MB 139kB/s
Requirement already satisfied: numpy<2,>=1.14.3 in /usr/local/lib/python3.7/dist-packages (from apache-beam[gcp]) (1.19.2)
Requirement already satisfied: future<1.0.0,>=0.18.2 in /usr/local/lib/python3.7/dist-packages (from apache-beam[gcp]) (0.18.2)
(省略)
Successfully installed apache-beam-2.24.0 avro-python3-1.9.2.1 cachetools-3.1.1 dill-0.3.1.1 docopt-0.6.2 fastavro-0.23.6 fasteners-0.15 google-apitools-0.5.31 google-cloud-bigquery-
1.28.0 google-cloud-bigtable-1.5.1 google-cloud-dlp-1.0.0 google-cloud-pubsub-1.7.0 google-cloud-vision-1.0.0 grpcio-gcp-0.2.2 hdfs-2.5.8 httplib2-0.17.4 mock-2.0.0 monotonic-1.5 oau
th2client-3.0.0 pbr-5.5.0 pyarrow-0.17.1 pydot-1.4.1
Dataflow のジョブを作成して実行
まずはクイックスタートのドキュメントに記載の通り、サンプルテキストをインプットに指定して、テキストファイル内の単語と出現回数を出力してみます。
--input
オプションで指定する解析対象のサンプルファイルの中身を確認してみると、シェイクスピアの「リア王」の英文のようです。
gcp_da_user@cloudshell:~ (cm-da-mikami-yuki-258308)$ gsutil cat "gs://dataflow-samples/shakespeare/kinglear.txt"
KING LEAR
DRAMATIS PERSONAE
LEAR king of Britain (KING LEAR:)
KING OF FRANCE:
DUKE OF BURGUNDY (BURGUNDY:)
DUKE OF CORNWALL (CORNWALL:)
DUKE OF ALBANY (ALBANY:)
EARL OF KENT (KENT:)
EARL OF GLOUCESTER (GLOUCESTER:)
EDGAR son to Gloucester.
EDMUND bastard son to Gloucester.
CURAN a courtier.
(省略)
ALBANY Bear them from hence. Our present business
Is general woe.
[To KENT and EDGAR]
Friends of my soul, you twain
Rule in this realm, and the gored state sustain.
KENT I have a journey, sir, shortly to go;
My master calls me, I must not say no.
ALBANY The weight of this sad time we must obey;
Speak what we feel, not what we ought to say.
The oldest hath borne most: we that are young
Shall never see so much, nor live so long.
[Exeunt, with a dead march]
apache_beam
の wordcount
を実行して、Dataflow のジョブを作成&実行してみます。
gcp_da_user@cloudshell:~ (cm-da-mikami-yuki-258308)$ python3 -m apache_beam.examples.wordcount \
> --region asia-northeast1 \
> --input gs://dataflow-samples/shakespeare/kinglear.txt \
> --output gs://test-mikami-dataflow/wordcount/outputs \
> --runner DataflowRunner \
> --project cm-da-mikami-yuki-258308 \
> --temp_location gs://test-mikami-dataflow/tmp/
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:apache_beam.runners.portability.stager:Downloading source distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/bin/python3', '-m', 'pip', 'download', '--dest', '/tmp/tmp2gejmwow', 'apache-beam==2.24.0', '--no-deps', '--no-binary', ':all:']
INFO:apache_beam.runners.portability.stager:Staging SDK sources from PyPI: dataflow_python_sdk.tar
INFO:apache_beam.runners.portability.stager:Downloading binary distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/bin/python3', '-m', 'pip', 'download', '--dest', '/tmp/tmp2gejmwow', 'apache-beam==2.24.0', '--no-deps', '--only-binary', ':all:', '--python-version', '37', '--implementation', 'cp', '--abi', 'cp37m', '--platform', 'manylinux1_x86_64']
INFO:apache_beam.runners.portability.stager:Staging binary distribution of the SDK from PyPI: apache_beam-2.24.0-cp37-cp37m-manylinux1_x86_64.whl
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:root:Using Python SDK docker image: apache/beam_python3.7_sdk:2.24.0. If the image is not available at local, we will try to pull from hub.docker.com
INFO:apache_beam.runners.dataflow.internal.apiclient:Defaulting to the temp_location as staging_location: gs://test-mikami-dataflow/tmp/
(省略)
37-cp37m-manylinux1_x86_64.whl in 0 seconds.
INFO:apache_beam.runners.dataflow.internal.apiclient:Create job: <Job
createTime: '2020-10-19T09:49:36.814188Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: '2020-10-19_02_49_35-16393904807203415868'
location: 'asia-northeast1'
name: 'beamapp-gcpdauser-1019094932-454901'
projectId: 'cm-da-mikami-yuki-258308'
stageStates: []
startTime: '2020-10-19T09:49:36.814188Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>
INFO:apache_beam.runners.dataflow.internal.apiclient:Created job with id: [2020-10-19_02_49_35-16393904807203415868]
INFO:apache_beam.runners.dataflow.internal.apiclient:Submitted job: 2020-10-19_02_49_35-16393904807203415868
INFO:apache_beam.runners.dataflow.internal.apiclient:To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow/jobs/asia-northeast1/2020-10-19_02_49_35-16393904807203415868?project=cm-da-mikami-yuki-258308
INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2020-10-19_02_49_35-16393904807203415868 is in state JOB_STATE_PENDING
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T09:49:35.491Z: JOB_MESSAGE_DETAILED: Autoscaling was automatically enabled for job 2020-10-19_02_49_35-16393904807203415868.
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T09:49:35.491Z: JOB_MESSAGE_DETAILED: Autoscaling is enabled for job 2020-10-19_02_49_35-16393904807203415868. The number of workers will be between 1 and 1000.
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T09:49:41.404Z: JOB_MESSAGE_BASIC: Worker configuration: n1-standard-1 in asia-northeast1-a.
(省略)
INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2020-10-19_02_49_35-16393904807203415868 is in state JOB_STATE_RUNNING
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T09:50:15.971Z: JOB_MESSAGE_DETAILED: Autoscaling: Raised the number of workers to 1 based on the rate of progress in the currently running stage(s).
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T09:51:59.138Z: JOB_MESSAGE_DETAILED: Workers have started successfully.
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T09:51:59.150Z: JOB_MESSAGE_DETAILED: Workers have started successfully.
(省略)
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T09:53:15.879Z: JOB_MESSAGE_DEBUG: Executing success step success33
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T09:53:15.937Z: JOB_MESSAGE_DETAILED: Cleaning up.
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T09:53:16.004Z: JOB_MESSAGE_DEBUG: Starting worker pool teardown.
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T09:53:16.019Z: JOB_MESSAGE_BASIC: Stopping worker pool...
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T09:54:07.060Z: JOB_MESSAGE_DETAILED: Autoscaling: Resized worker pool from 1 to 0.
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T09:54:07.081Z: JOB_MESSAGE_BASIC: Worker pool stopped.
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T09:54:07.092Z: JOB_MESSAGE_DEBUG: Tearing down pending resources...
INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2020-10-19_02_49_35-16393904807203415868 is in state JOB_STATE_DONE
実行ログを確認すると、バッチタイプの Dataflow ジョブを作成後、オートスケーリング台数を 1 ~ 1000 に設定し、東京リージョンに n1-standard-1 の GCE ワーカーを 1 台起動してジョブを実行し、完了後にワーカーをシャットダウンしたことが確認できました。
実行結果を確認してみます。
gcp_da_user@cloudshell:~ (cm-da-mikami-yuki-258308)$ gsutil ls -lh "gs://test-mikami-dataflow/wordcount/outputs*"
15.86 KiB 2020-10-19T09:53:14Z gs://test-mikami-dataflow/wordcount/outputs-00000-of-00003
15.84 KiB 2020-10-19T09:53:14Z gs://test-mikami-dataflow/wordcount/outputs-00001-of-00003
16.09 KiB 2020-10-19T09:53:14Z gs://test-mikami-dataflow/wordcount/outputs-00002-of-00003
TOTAL: 3 objects, 48944 bytes (47.8 KiB)
gcp_da_user@cloudshell:~ (cm-da-mikami-yuki-258308)$ gsutil cat "gs://test-mikami-dataflow/wordcount/outputs*"
bow: 3
scurvy: 1
Importune: 1
forked: 1
embossed: 1
import: 1
day: 7
(省略)
Kent: 19
(省略)
Edgar: 10
(省略)
Regan: 19
(省略)
Goneril: 12
(省略)
Lear: 17
(省略)
Cordelia: 22
(省略)
Gloucester: 26
(省略)
lightnings: 1
disorders: 2
they: 49
break: 7
about: 11
実行時に --output
オプションで指定した GCS パスに、単語と出現数のカウントファイルが出力されていることが確認できました。
Dataflow ジョブの実行結果は GCP 管理コンソールからも確認できます。
ワークフローを視覚的に確認することができるので、エラー発生時の解析に便利そうです。
入力データを日本語にしてみる
--input
で日本語のテキストファイルを指定した場合、出力結果はどうなるのか試してみました。
弊社クラスメソッドのウェブサイト 経営理念 ページの記載内容をテキストに落として GCS にアップロードしました。
アップロードした日本語のテキストファイルを --input
オプションで指定して、再度 apache_beam.examples.wordcount
を実行してみます。
gcp_da_user@cloudshell:~ (cm-da-mikami-yuki-258308)$ python3 -m apache_beam.examples.wordcount \
> --region asia-northeast1 \
> --input gs://test-mikami-dataflow/cl_policy.txt \
> --output gs://test-mikami-dataflow/classmethod/outputs \
> --runner DataflowRunner \
> --project cm-da-mikami-yuki-258308 \
> --temp_location gs://test-mikami-dataflow/classmethod/tmp/
(省略)
INFO:apache_beam.runners.dataflow.internal.apiclient:Create job: <Job
createTime: '2020-10-19T10:41:00.317940Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: '2020-10-19_03_40_59-9806754790662253067'
location: 'asia-northeast1'
name: 'beamapp-gcpdauser-1019104056-585061'
projectId: 'cm-da-mikami-yuki-258308'
stageStates: []
startTime: '2020-10-19T10:41:00.317940Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>
INFO:apache_beam.runners.dataflow.internal.apiclient:Created job with id: [2020-10-19_03_40_59-9806754790662253067]
INFO:apache_beam.runners.dataflow.internal.apiclient:Submitted job: 2020-10-19_03_40_59-9806754790662253067
INFO:apache_beam.runners.dataflow.internal.apiclient:To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow/jobs/asia-northeast1/2020-10-19_03_40_59-9806754790662253067?project=cm-da-mikami-yuki-258308
INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2020-10-19_03_40_59-9806754790662253067 is in state JOB_STATE_PENDING
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T10:40:59.318Z: JOB_MESSAGE_DETAILED: Autoscaling was automatically enabled for job 2020-10-19_03_40_59-9806754790662253067.
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T10:40:59.318Z: JOB_MESSAGE_DETAILED: Autoscaling is enabled for job 2020-10-19_03_40_59-9806754790662253067. The number of workers will be between 1 and 1000.
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T10:41:05.566Z: JOB_MESSAGE_BASIC: Worker configuration: n1-standard-1 in asia-northeast1-a.
(省略)
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T10:41:07.429Z: JOB_MESSAGE_DEBUG: Starting worker pool setup.
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T10:41:07.439Z: JOB_MESSAGE_BASIC: Starting 1 workers in asia-northeast1-a...
(省略)
INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2020-10-19_03_40_59-9806754790662253067 is in state JOB_STATE_RUNNING
(省略)
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T10:44:26.825Z: JOB_MESSAGE_DEBUG: Executing success step success33
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T10:44:26.851Z: JOB_MESSAGE_DETAILED: Cleaning up.
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T10:44:26.981Z: JOB_MESSAGE_DEBUG: Starting worker pool teardown.
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T10:44:26.991Z: JOB_MESSAGE_BASIC: Stopping worker pool...
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T10:45:18.631Z: JOB_MESSAGE_DETAILED: Autoscaling: Resized worker pool from 1 to 0.
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T10:45:18.653Z: JOB_MESSAGE_BASIC: Worker pool stopped.
INFO:apache_beam.runners.dataflow.dataflow_runner:2020-10-19T10:45:18.664Z: JOB_MESSAGE_DEBUG: Tearing down pending resources...
INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2020-10-19_03_40_59-9806754790662253067 is in state JOB_STATE_DONE
ジョブの実行は正常終了したようです。
出力結果を確認してみます。
gcp_da_user@cloudshell:~ (cm-da-mikami-yuki-258308)$ gsutil ls -lh "gs://test-mikami-dataflow/classmethod/outputs*"
824 B 2020-10-19T10:44:25Z gs://test-mikami-dataflow/classmethod/outputs-00000-of-00003
765 B 2020-10-19T10:44:25Z gs://test-mikami-dataflow/classmethod/outputs-00001-of-00003
852 B 2020-10-19T10:44:25Z gs://test-mikami-dataflow/classmethod/outputs-00002-of-00003
TOTAL: 3 objects, 2441 bytes (2.38 KiB)
gcp_da_user@cloudshell:~ (cm-da-mikami-yuki-258308)$ gsutil cat "gs://test-mikami-dataflow/classmethod/outputs*"
パートナーシップ認定といった形で評価されました: 1
クラウド: 1
Enterprise: 1
クラスメソッドは: 1
より多くのお客様にサービスをご提供します: 1
クラスメソッドのビジネス: 1
改善し続け: 1
社員の1人ひとりが現場提案とスピード決裁をモットーとした課題解決を行っています: 1
私たちは常に興味と関心を持ち: 1
オープンな発想: 1
お客様の事業課題に寄り添った提案力と: 1
クラスメソッドは学びを欲するクリエイティブな人材を積極的に採用し: 1
ビッグデータ: 1
そのために市場へ耳を傾け: 1
新技術に取り組むチャレンジを続けています: 1
音声認識技術のコンサルティングやシステム開発: 1
や: 1
高い技術力: 1
という経営理念のもと: 1
高い評価を得る技術者集団として: 1
経営理念: 1
創業以来1000を超える事業支援実績を重ねました: 1
柔軟かつ行動的な姿勢で: 1
創造活動への貢献: 1
運用サービスを提供しています: 1
技術情報やノウハウを自ら創造: 1
お客様のビジネス支援にあたって心がけている基本方針です: 1
私たちはビジネス視点とコンシューマー視点を兼ね備えた柔軟な思考を心がけています: 1
国内初となる: 1
作り手視点のプロ意識と消費者視点のセールス意識を軸に: 1
それらを下支えするのはフラットな組織と個人裁量です: 1
オープンな発想と高い技術力により: 1
蓄積: 1
社会から必要とされる存在であり続けることを目指しています: 1
技術力でお客様の売上に貢献するために: 1
モバイル: 1
お客様や技術コミュニティなど幅広い分野への貢献に努めます: 1
クラスメソッドはお客様から常に必要とされる付加価値を提供し: 1
すべての人々: 1
形式化したノウハウをベースに継続的なビジネス支援を行っております: 1
広い視野を持ち: 1
何事にも積極的に取り組んでいくのがクラスメソッドのスタイルです: 1
発信: 1
AWSプレミアコンサルティングパートナー: 1
先進的かつ専門性の高い技術力はAWSクラウド活用において最上位企業のみが認定される: 1
すべての人々の創造活動に貢献し続ける: 1
GitHub: 1
日本語の場合、やはり助詞でうまく分割できないようですが、ジョブエラーになることはなく、処理結果がファイル出力されることが確認できました。
必要に応じて、apache_beam
の Python コードを修正してロジック変更も可能とのことです。
まとめ(所感)
Google Dataflow を初めて触ってみましたが、本当に面倒な設定などは全く必要なく、ものの 30 分でジョブ実行して結果を確認することができました。
Apache Beam SDK を使用すれば、ジョブの作成や実行を意識する必要もなく、すぐに Dataflow で処理を実行できます。
管理コンソールから Dataflow の実行ログや処理フローを視覚的に確認することもできるので、エラー発生時に原因を確認する際にも使いやすそうです。
引き続き、Dataflow の他の使い方も試してみようと思います。