Apache Airflow QuickStartを実行してみた

Apache Airflow公式サイトのQuickStartをためしてみたブログです。
2022.01.02

データアナリティクス事業本部のnkhrです。タイトルの通りApache AirflowのQuickStartをやってみたブログです。

Quick Startでは、以下の2つの実行方法を試すことができます。

  1. Running Airflow locally
  2. Running Airflow in Docker

本ブログでは、1のQuickStartを手順に従ってEC2上で実行するCloudformationコードを作成し、動作を確認します。

1のQuick Startの構成では、DBとしてサーバ上のSqliteを利用し、タスク実行はSequential Executor(ユーザが1度に実行できるタスクが1つのみ)を利用しているため、Test利用を想定しています。Production環境で利用する場合は、Databaseを別に構成し(RDSなど)、ExecuterはLocal Executor(シングルマシンの場合)やCelery Executor、Kubernetes Executorの利用が推奨されています。Productionでのデプロイ方法は公式サイトをご確認ください。

Running Airflow locally (EC2上での起動)

EC2上でAirflowを起動するためのテンプレートを作りました。

「▶EC2の起動とAirflowのインストールCfnコード」の「▶」をクリックすると、コードを表示できます。AirflowをインストールするEC2は、Public Subnetに配置し、特定のIPアドレスから接続できるようにします。Airflowのインストールは、QuickStartではpipによるインストール方法が記載されているため、EC2にはpython(今回はVersion 3.8を利用)の設定も行います。

コードを実行する前提として下記のインフラリソースを作成済みとします。

  • VPC 1つ(Cfn実行時にVPC IDを指定)
  • Public Subnet 1つ(Cfn実行時にSubnet IDを指定)
  • Internet Gateway
  • Routing Table(Public Subnetのルーティングテーブル。0.0.0.0/0はInternet Gatewayにルーティング)
  • Key Pair
EC2の起動とAirflowのインストールCfnコード
AWSTemplateFormatVersion: 2010-09-09

Parameters:
  VpcId:
    Type: String
    Default: vpc-xxxx
  SubnetId:
    Type: String
    Default: subnet-xxxx
  AccessCidr:
    Type: String
    Default: x.x.x.x/32
  KeyName:
    Type: AWS::EC2::KeyPair::KeyName
    Default: test-key
  AirflowPort:
    Type: String
    Default: 8080
  AmazonLinuxImageId:
    Type: AWS::SSM::Parameter::Value<AWS::EC2::Image::Id>
    Default: /aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2
    Description: The AmazonLinux2 image id at least.
  InstanceType:
    Type: String
    Default: t3.micro
    AllowedValues:
      - t3.nano
      - t3.micro
      - t3.small    
  TimeZone:
    Type: String
    Default: Asia/Tokyo
  Locale:
    Type: String
    Default: ja_JP.utf8
  AirflowVersion:
    Type: String
    Default: 2.2.3

Metadata: 
  AWS::CloudFormation::Interface: 
    ParameterGroups: 
      - Label: 
          default: Airflow init parameters
        Parameters: 
          - VpcId
          - SubnetId
          - AccessCidr
          - KeyName

Resources:
  SecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties: 
      VpcId: !Ref VpcId
      GroupName: airflow-localaccess-sg
      GroupDescription: !Sub localaccess-sg-${AWS::StackName}
      Tags:
        - Key: Name
          Value: airflow-localaccess-sg
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: !Ref AirflowPort
          ToPort: !Ref AirflowPort
          CidrIp: !Ref AccessCidr
        - IpProtocol: tcp
          FromPort: 22
          ToPort: 22
          CidrIp: !Ref AccessCidr

  EC2Role:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Statement:
        - Action: 'sts:AssumeRole'
          Effect: Allow
          Principal:
            Service: ec2.amazonaws.com
      MaxSessionDuration: 36000
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore
      Tags:
        - Key: Name
          Value: role-ec2-airflow

  IAMInstanceProfile:
    Type: AWS::IAM::InstanceProfile
    Properties:
      Roles: 
        - !Ref EC2Role
      Path: "/"

  AirflowServer:
    Type: AWS::EC2::Instance
    CreationPolicy: 
      ResourceSignal:
        Timeout: PT20M
    Properties: 
      IamInstanceProfile: !Ref IAMInstanceProfile
      ImageId: !Ref AmazonLinuxImageId
      InstanceType: !Ref InstanceType
      KeyName: !Ref KeyName
      InstanceInitiatedShutdownBehavior: stop
      DisableApiTermination: false
      Monitoring: False
      BlockDeviceMappings:    
        - DeviceName: /dev/xvda
          Ebs:
            VolumeType: gp3
            DeleteOnTermination: true
            Encrypted: true
            VolumeSize: 10
      NetworkInterfaces:
        - AssociatePublicIpAddress: true
          DeviceIndex: 0
          SubnetId: !Ref SubnetId
          GroupSet: 
            - !Ref SecurityGroup
      Tags: 
        - Key: Name
          Value: airflow-server
      UserData: 
        Fn::Base64: !Sub |
          #!/bin/bash
          echo "--------- Change Timezone/Locale ------------------------"
          timedatectl set-timezone ${TimeZone}
          localectl set-locale LANG=${Locale}

          echo "--------- Install & Configure Python --------------------"
          amazon-linux-extras enable python3.8
          yum -y install python3.8
          update-alternatives --install /usr/bin/python python /usr/bin/python3.8 1
          /usr/bin/pip3.8 install --upgrade pip
          update-alternatives --install /usr/bin/pip pip /usr/local/bin/pip3 1  
          python -V
          pip -V

          echo "--------- Change yum configuration ----------------------"
          sed -i -e '1s/python/python2/g' /bin/yum
          sed -i -e '1s/python/python2/g' /usr/libexec/urlgrabber-ext-down
          yum update -y

          echo "--------- Update Sqlite for airflow requirement ---------"
          yum install -y gcc-c++ python-devel python-setuptools

          wget https://www.sqlite.org/2021/sqlite-autoconf-3370000.tar.gz
          tar xzvf sqlite-autoconf-3370000.tar.gz
          cd sqlite-autoconf-3370000
          ./configure --prefix=/opt/sqlite/sqlite3
          make
          make install

          echo "--------- Install Airflow -------------------------------"
          groupadd airflow
          useradd airflow -g airflow
          passwd -d airflow
          su airflow
          cd ~
          echo 'export PATH="$PATH:/home/airflow/.local/bin"' >> ~/.bashrc
          echo 'export AIRFLOW_HOME=~/airflow' >> ~/.bashrc
          echo 'export LD_LIBRARY_PATH="/opt/sqlite/sqlite3/lib"' >> ~/.bashrc
          echo 'export LD_RUN_PATH="/opt/sqlite/sqlite3/lib"' >> ~/.bashrc
          source ~/.bashrc

          sqlite3 --version
          python -c "import sqlite3; print(sqlite3.sqlite_version)"
          
          mkdir ~/airflow
          AIRFLOW_VERSION=${AirflowVersion}
          PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
          CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-$AIRFLOW_VERSION/constraints-$PYTHON_VERSION.txt"
          pip install "apache-airflow==$AIRFLOW_VERSION" --constraint "$CONSTRAINT_URL"

          echo "<EXECUTE> /opt/aws/bin/cfn-signal -e $? --stack ${AWS::StackName} --resource AirflowServer --region ${AWS::Region}"
          /opt/aws/bin/cfn-signal -e $? --stack ${AWS::StackName} --resource AirflowServer --region ${AWS::Region}

Outputs:
  AccessPublicIp:
    Value: !GetAtt AirflowServer.PublicIp
  AirflowURL:
    Value: !Sub "http://${AirflowServer.PublicIp}:${AirflowPort}"

Cfnでは以下のパラメータを実行環境に合わせて修正します。EC2サーバの起動処理は「/var/log/」配下の「cfn-init-xxx.log」または「cloud-init-xxx.log」に出力されます。

  • VpcId
  • SubnetId
  • AccessCidr :EC2に接続するInboundのIPアドレス範囲を指定します
  • KeyName:EC2にアタッチするKey Pairの名前

Standaloneでインストールした場合、DatabaseとしてEC2サーバ上のSQLLightを使用します。SQLLiteの利用はテスト環境でのみ推奨されており、Production環境でAirflowを利用する場合は、Standaloneモードを利用しないことが推奨されています。

Amazon Linux上でのAirflow Standalone構築Tips

  • pythonのパスをPython3系に変更した場合、yumはPython2系のみ対応しているためyumが利用できなくなります。今回は、yumがPython2を使うように設定ファイルのヘッダーを変更しています。その他の対応としては、python3コマンドのまま利用する(pythonとpython3を共存させる)方法があります。
  • AmazonLinux(2021/12時点)で利用できるSqliteのバージョンは、Airflowで必要とされるバージョンを満たしていません。そのため、Sqliteをソースコードからインストール・設定しています。

Airflow standaloneの起動・接続

Standaloneコマンドを実行した場合は、Adminユーザが自動で作成されます。

  1. EC2サーバに接続し、airflowユーザに切り替える sudo su - airflow
  2. Standaloneコマンド(All in One)を実行
        $ airflow standalone

  3. adminユーザが自動で作成され、パスワードが自動生成されるためメモする

  4. CfnのOutputに表示される「AirflowURL」にアクセスする(http://x.x.x.x:8080)

  5. 上記3でメモしたユーザ名、パスワードでアクセスできることを確認する

standaloneではなく、個別に各コンポーネントを設定・実行する場合は、以下のコマンドを実行する(standaloneの場合は、以下の処理が実行される)

## db initは初回実行時のみ必要(standaloneコマンドにはdb initも含まれる)
$ airflow db init
$ airflow users create \
--username admin \
--firstname Peter \
--lastname Parker \
--role Admin \
--email admin[@](mailto:spiderman@superhero.org)example.com

## バックグラウンド実行を行う場合は-Dを付ける
$ airflow webserver --port 8080 -D
$ airflow scheduler -D

Airflowで利用するファイル

Airflowをインストールすると以下のファイルが作成されます。

  • $AIRFLOW_HOME :デフォルトは「~/airflow」フォルダ
  • $AIRFLOW_HOME/airflow.cfg:設定ファイル
  • $AIRFLOW_HOME/airflow-webserver.pid:Webサーバ実行時のPIDファイル。systemdで起動した場合は「/run/airflow/webserver.pid」に作成される

Executorの変更方法

Airflowの設定ファイル「$AIRFLOW_HOME/airflow.cfg」の[core]セクションで利用するExecutorを指定する。Airflowが提供するExecutorタイプを設定するか、独自実装のカスタムExecutorを設定することもできる。

現在設定されているExecutorの確認には、以下のコマンドを実行する。

[airflow@ip-x-x-x-x ~]$ airflow config get-value core executor
SequentialExecutor

最後に

今回はApache Airflowの概要をつかむためにQuickStartを実行してみました。QuickStartの構成はProductionでは推奨されませんが、AirflowのPoCやテスト利用で活用する機会はあるかもしれません。AWSではManagedのAirflow(MWAA)が使えるためProductionでの利用を検討されてもよいかもしれません。

以上、nkhrでした。