
Amazon MWAAに依存ライブラリを追加してみた
この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは。サービスグループの武田です。
本日発表されたAWSのマネージドAirflowであるAmazon MWAA(Managed Workflow for Apache Airflow)で遊んでいます。
今回は前回作ったAirflowの環境にライブラリを追加する手順を確認してみました。
ライブラリの前提
次のライブラリは追加の必要なく使用できます。
- Airflowが依存しているライブラリ
- AWSが追加しているライブラリ
そもそもAirflowが依存しているライブラリはインストールされているので、自分で追加する必要がありません。たとえばPandasやNumPy、Pendulumなどがあげられます。次に、AWSが追加しているライブラリが2つあり、psycopg2とbotoはセットアップ済みとなっています。MWAAのメタデータストアとしておそらくPostgreSQLを使っているのと、botoはまぁAWS環境ですしね。
というわけで、それ以外のライブラリが別途必要な場合には、自分で追加する必要があります。
今回のゴール
今回はスクレイピングライブラリとして人気のあるBeautifulSoupを追加して、Developers.IOのトップページからタイトルを抜き出してみます。
依存ライブラリがない状態を確認
まずはDAGの中で使用しているライブラリが環境にない場合を確認してみます。Airflowに追加するDAGとして次のスクリプトを用意しました。
import urllib.request
import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from bs4 import BeautifulSoup
args = {
    "owner": "airflow",
    "start_date": airflow.utils.dates.days_ago(2),
    "provide_context": True,
}
def get_devio_top_page(url, **context):
    return urllib.request.urlopen(url).read().decode("utf-8")
def print_title(html, **context):
    soup = BeautifulSoup(html, "html.parser")
    print(soup.find("title"))
with DAG(
    dag_id="scraping_test",
    default_args=args,
    schedule_interval=None,
) as dag:
    t1 = PythonOperator(
        task_id="task1",
        python_callable=get_devio_top_page,
        op_kwargs={"url": "https://dev.classmethod.jp/"},
    )
    t2 = PythonOperator(
        task_id="task2",
        python_callable=print_title,
        op_kwargs={"html": '{{ ti.xcom_pull(task_ids="task1") }}'},
    )
    t1 >> t2
task1はDevIOのトップページを取得しているタスクです。標準ライブラリのurllibを使っているごく普通のプログラムです。task2がBeautifulSoupを使ってページのタイトルを抜き出してログに出す処理をしています。ページの中身は前のタスクで取得しているので、xcom経由で受け渡しています。
このファイルをS3にアップロードします。
しばらく待ってからAirflowの管理画面を開くと次のようにエラーメッセージが出ていました。
bs4のモジュールがないよってことでエラーになっています。それではrequirements.txtを用意して環境にライブラリを追加していきましょう。
依存ライブラリを追加してみた
MWAAの環境に依存ライブラリを追加するには、まずそれが書かれているrequirements.txtを用意します。今回必要となったのはBeautifulSoupだけですので、それを記述します。
beautifulsoup4==4.9.3
現時点ではPEP 508に準拠はしておらず、pip freezeでの生成は非推奨とのことです。
用意したファイルはS3バケットにアップロードします。MWAAがアクセスできれば任意の場所で問題ないはずです。今回はdagsフォルダーと同じ階層に置きました。
マネジメントコンソールでMWAAにアクセスし、更新する環境にチェックを入れて[編集]をクリックします。
要件ファイルの項目で、先ほどアップロードしたrequirements.txtを選択します。ほかは変更せずに[保存]をクリックします。
環境の更新が始まります。
約20分ほどで更新が完了しました。環境クラスやWorkerインスタンスの台数によっては、この時間が前後するかもしれません。
先ほどエラーが出ていたAirflowの管理画面をもう一度開き、ページをリロードしてみます。今度は問題なくDAGが追加されていました!
DAGを有効にして手動で実行してみると、タスクが実行されました。
task2のログを確認してみると、DevIOのタイトルが出力されています!
まとめ
Pythonには豊富なライブラリがそろっていますが、インストールしなければ使えません。MWAAの環境でも問題なくライブラリを追加できますので、安心してDAGを構築できそうですね。


















