Amazon KinesisとAWS WAFを利用して、サーバレスでリアルタイムな侵入防止システムを作ってみた

はじめに

AWSチームのすずきです。

ストリーミングデータの処理サービスのAmazon Kinesisを利用して、Webシステムのアクセスログ解析を実施し、 不正アクセス元と判定されたソースIP情報を、Lambdaを利用して、AWS WAFに反映する事で、 サーバレスでリアリタイムな侵入防止システム(IDS)を実現する機会がありました。

今回、その概要について紹介させて頂きます。

構成概要図

waf-kinesis-analytics-12

AWS WAF

  • ELB は WAFに対応した ALB (Application Load Balancer) を利用します。
  • AWS WAF は、指定したIPアドレス(Conditions)のアクセス制限を実現するルール、ACLを設定します。

ACLs

waf-kinesis-analytics-10

Rule

waf-kinesis-analytics-09

Conditions

waf-kinesis-analytics-08

EC2

nginx

  • EC2上で稼働するWebサーバとして、Nginxを利用しました
  • アクセスログは、パース等の利便性に優れたLTSV形式で出力する設定を行いました

  • 「nginx.conf」 設定抜粋

  http {
    log_format  ltsv "time:$time_iso8601\t"
    "remote_addr:$remote_addr\t"
    "request_method:$request_method\t"
    "request_length:$request_length\t"
    "request_uri:$request_uri\t"
    "uri:$uri\t"
    "query_string:$query_string\t"
    "status:$status\t"
    "bytes_sent:$bytes_sent\t"
    "body_bytes_sent:$body_bytes_sent\t"
    "http_referer:$http_referer\t"
    "http_user_agent:$http_user_agent\t"
    "http_x_forwarded_for:$http_x_forwarded_for\t"
    "http_x_forwarded_proto:$http_x_forwarded_proto\t"
    "request_time:$request_time\t"
    "upstream_response_time:$upstream_response_time\t"
    "upstream_cache_status:$upstream_cache_status\t"
  }

  server {
    access_log  /var/log/nginx/access_ltsv.log  ltsv;
  }

Fleutnd

  • ログコレクタとしてFluentd(td-agent)、AWSへのログ転送は、Kinesis Plugin(kinesis_firehose)を利用しました。

  • 「td-agent.conf」 抜粋

<source>
  type tail
  format ltsv
  tag nginx.access
  path /var/log/nginx/access_ltsv.log
  pos_file /var/log/td-agent/buffer/access.log.pos
</source>
<match nginx.access>
  type copy
  <store>
    @type kinesis_firehose
    region us-west-2
    delivery_stream_name firehose_name
    buffer_type file
    buffer_path /tmp/fluent.*.buffer
    buffer_chunk_limit 8m
    buffer_queue_limit 64
    retry_wait 30s
    flush_interval 10s
    retry_limit 5
    flush_at_shutdown true
  </store>
</match>
  • 「delivery_stream_name」にFirehoseで作成済みのストリーム名を指定します。 - Firehoseへのアクセス権はEC2、IAMロールで付与しています
  • バッファ、ログ送付間隔指定を行う事で、ログ転送のタイムラグを縮小しています。
  • 参考:fluent-plugin-kinesisでKinesis Streamsにログを送信する

Kinesis

  • 2016年12月時点で、Kinesis Firehose、Analyticsの利用可能なオレゴンリージョン(us-west-2)を利用しました。

waf-kinesis-analytics-05

Kinesis Firehose

  • 受信したデータ、一定時間バッファリングした後、S3へ自動保管するFirehoseを利用しました

waf-kinesis-analytics-06

  • Firehoseのデフォルトの上限 (2,000 トランザクション / 秒、5,000レコード / 秒、5 MB/ 秒)を超える利用が予想される場合、AWSサポートに対しFirehoseの上限緩和申請を実施します。
  • より多くのログ流量が見込まれる場合や、他のリアルタイム処理を併用する場合、KinesisStreamの利用も検討します。

  • AWS サービス制限(Amazon Kinesis Firehose の制限)

Kinesis Analytics

Source

  • Fluentdの出力先としたFirehoseを指定します
  • KinesisyAnalyticsは、Firehoseに到着したデータをほぼリアリタイム(数秒程度の遅延)で参照する事が可能です。
  • Kinesisに到着したアクセスログは、FluentdによりLTSV→JSON変換されているため、カラム定義などデフォルトのまま処理が可能です。

waf-kinesis-analytics-02

Real-time analytics

  • サンプルSQLとして提供されている「Aggregate function in a tumbling time window」を元に、30秒間に一定条件を満たした接続元IPアドレスを抽出する指定を行いました。
設定画面

waf-kinesis-analytics-03

SQL
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" 
  (http_x_forwarded_for VARCHAR(64), uri VARCHAR(64), http_user_agent VARCHAR(64), request_count INTEGER, request_total_time INTEGER, row_time_iso9601 varchar(32));

CREATE OR REPLACE  PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT 
  STREAM "http_x_forwarded_for", "uri" ,"http_user_agent"
  , COUNT(*) AS request_count
  , SUM("request_time") AS request_total_time
  , TIMESTAMP_TO_CHAR('yyyy-MM-dd',MIN("SOURCE_SQL_STREAM_001"."ROWTIME")) || 'T' || TIMESTAMP_TO_CHAR('HH:mm:ss', MIN("SOURCE_SQL_STREAM_001"."ROWTIME")) || 'Z' as row_time_iso9601
FROM "SOURCE_SQL_STREAM_001"
WHERE "http_user_agent" not like 'Amazon Route 53 Health Check Service%'
and "uri" not like '/wp-admin/%'
and "status" <> 304
GROUP BY "http_x_forwarded_for", "uri" ,"http_user_agent" , FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 30 TO SECOND)
HAVING COUNT(*) > 20
;
  • 30秒間に20回以上、同一URIへのリクエストが行われたソースIPアドレス(http_x_forwarded_for)を抽出しています。
  • 標準SQL準拠した記述により、UA、URI、HTTPレスポンスコードなどによるレコードの絞込、除外を実施しています。
  • 後段のLambdaでの処理やログ記録用の情報として、ブロック対象とするIPアドレス以外に、URI、UA、リクエスト数、総所要時間、ISO9601書式の時刻情報を出力します。

  • 参考リンク: AWS Kinesis Analytics » SQL Reference » Standard SQL Operators

Destination

  • 先の「analytics」で抽出されたレコードの出力先を指定します。
  • Lambda連携の為、出力先は 事前に作成した Kinesis Stream、出力形式は「JSON」としました。

waf-kinesis-analytics-04

Kinesis Stream

  • Lambda連携用のKinesis Stream、流量は多くない事が見込まれるため、最小構成(シャード:1)で設置します

waf-kinesis-analytics-07

Lambda

  • KinesisAnalyticsにより、不正アクセス元とされたIPアドレス、WAFのアクセス制限情報(Condtion)として反映します
  • KinesisStreamのイベントトリガとして設定し、Lambdaの環境変数「IPSetId」で指定したIPリストを更新を行います。

コード

import os
import base64
import json
import boto3
client = boto3.client('waf-regional', region_name='ap-northeast-1')

def lambda_handler(event, context):
    for record in event['Records']:
        payload = base64.b64decode(record['kinesis']['data'])
        data = json.loads(payload)

        x_forwarded_for = str(data["HTTP_X_FORWARDED_FOR"])

        blockip = str(x_forwarded_for.split(",")[0]) + "/32"
        print(blockip)

        response = client.update_ip_set(
        IPSetId = os.environ['IPSetId'],
        ChangeToken = client.get_change_token()['ChangeToken'],
        Updates=[
            {
                'Action': 'INSERT',
                'IPSetDescriptor': {
                'Type': 'IPV4',
                'Value': blockip
                }
            },
        ]
    )

更新結果

  • Lambdaのイベントを有効化する事で、WAFで利用されるIPリストが更新されるようになりました。

waf-kinesis-analytics-11

まとめ

Kinesis(Firehose、Analytcs、Stream)と、AWS WAFをLambdaで連携させる事で、 不正なアクセス発生をニアリアルタイムに解析し、発生から数分でアクセス遮断を実現する仕組みを サーバレスに実現する事ができました。

EC2を利用した不正アクセス対策を導入済みの環境でも、攻撃によりEC2のリソースがボトルネックとなり、 DDoS攻撃が成立してしまう場合がありましたが、AWSのマネジメントサービスのWAFとKinesis(Firehose、Analytics)は、 必要な性能はAWS側で担保された状態で利用する事が期待できます。

また、EC2のみで攻撃に耐えるインフラをオートスケールなどで実現した場合、 初期の構築コストや、増強中のリソース費用が負担となる事がありますが、 AWS WAFにより不正アクセスの一部を食い止める事で、AWS利用の最適化にも繋がると思われます。

今回紹介させて頂いたのは、簡単な条件を用いた不正アクセスのブロックの実現でしたが、 保護ルールの管理や、システムに合わせた最適化に向けたログの可視化などについては、別の機会に紹介させて頂ければと思います。

参考リンク

AWSブログ

How to Configure Rate-Based Blacklisting with AWS WAF and AWS Lambda

  • CloudFrontのアクセスログをLambdaで解析し、不正アクセス元IPをAWS WAFでブロックする仕組みが紹介されています。
  • 今回の保護対象は、CloudFront(CDN)経由での公開が難しいシステムだった事。また、不正アクセス元の遮断、短いタイムラグで実現するために、AWS側で取得するログではなく、Nginxで取得したログをKinesisで処理する仕組みを導入しました。

Norikra

Norikra+FluentdでDoS攻撃をブロックする仕組みを作ってみた

  • 今回紹介させて頂いた内容と処理概要は同一です
  • 不正アクセス判定に利用するCEPエンジン、KinesisAnalyticsの替わりにNorikraが利用されています。
  • 不正アクセスの遮断、WAFの替わりにVPCのNACLを利用しています。