SESで受信したメールをLambdaで振り分けてみた

AmazonLambda

はじめに

オペレーションチームの高橋です。

今回はSESで受信したメールをS3に保存し、Lambdaでメールを振り分けてS3バケットの対象フォルダに移動(コピー)するというところまでの処理を先日サポートされた Python3.6でやってみました。

Announcement: AWS Lambda now supports Python 3.6

やりたいこと

SESで受信したEC2のリタイアメントやメンテナンスのメールをS3のフォルダに振り分ける。 その他のメールは削除。

  • EC2のリタイアメント通知の件名
    [Retirement Notification] Amazon EC2 Instance scheduled for retirement.
  • EC2のメンテナンス通知の件名
    Amazon EC2 Maintenance - Maintenance [AWS Account: 012345678910]

処理の流れ

  1. SESで受信したメールをS3バケットに保存する
  2. S3をトリガーにLambda Functionが起動する
  3. Lambdaによってメールを対象フォルダに振り分ける

S3バケットの用意

SESでメールを受信した際にメールを保存するS3バケットを用意します。 振り分けようのフォルダも作成しました。

S3 Bucket S3 Bucket

SESでメール受信設定

SESでのメール受信の方法は以下ブログを参考に設定します。

[新機能]Amazon SES でメール受信が出来るようになりました!

Rule SetsはS3 bucketの「ses/」にメールが入るようにしました。

Rule Sets

Lambdaの設定

トリガーはS3(put) → Lambda

トリガー

処理の内容

  • EC2のリタイアメント通知は「ec2_retirement」フォルダへコピーしコピー元のメールは削除
  • EC2のメンテナンス通知は「ec2_maintenance」フォルダへコピーしコピー元のメールは削除
  • その他のメールは削除

コード

# -*- coding: utf-8 -*-
from __future__ import print_function

import enum
from enum import Enum
import boto3
import json
import urllib.parse
import re
import email
from email.parser import FeedParser
from email.header import decode_header
import time

# S3 高レベルAPI
s3 = boto3.resource('s3')

# リトライ回数
RETRY_COUNT = 3

# 振り分けルールが増えた場合、追記していく
# 関数 check_email_subject 、execute_handler にも追加
class EMAIL_TYPE(Enum):
    RETIREMENT = 1
    MAINTENANCE = 2

# メールの件名を取得
def get_email_subject(email_object):
    (subject, subject_charaset) = decode_header(email_object['Subject'])[0]

    if subject_charaset == None:
        email_subject = subject
    else:
        email_subject = subject.decode(subject_charaset)

    print("Email Subject: %s" % email_subject)
    return email_subject

# メールオブジェクトをコピー(移動)
def copy_folder(s3_new_folder, s3_object_name, s3_file_key, bucket, key):
    # コピー先のファイル名
    new_file = '%s/%s' % (s3_new_folder, s3_object_name)
    print(new_file)
    for i in range(1, 1+RETRY_COUNT):
        obj_copy = s3.Object(bucket, new_file).copy_from(CopySource={'Bucket': bucket, 'Key': s3_file_key})
        if obj_copy['ResponseMetadata']['HTTPStatusCode'] == 200:
            print("copy %s to %s complete" % (s3_object_name, s3_new_folder))
            break
        else:
            print(obj_copy['ResponseMetadata']['HTTPStatusCode'])
            time.sleep(i)
    else:
        raise Exception('object copy failed')

# メールオブジェクトを削除
def delete_original_object(s3_object, s3_file_key):
    for i in range(1, 1+RETRY_COUNT):
        s3_obj_delete = s3_object.delete()
        print(s3_obj_delete)
        if s3_obj_delete['ResponseMetadata']['HTTPStatusCode'] == 204:
            print("delete %s complete" % s3_file_key)
            break
        else:
            print(s3_obj_delete['ResponseMetadata']['HTTPStatusCode'])
            time.sleep(i)
    else:
        raise Exception('object delete failed')

def check_email_subject(email_subject):
    # リタイアメント
    chk_ec2_retirement = r'\[Retirement Notification\] Amazon EC2 Instance scheduled for retirement.'
    # メンテナンス(リブート)
    chk_ec2_maintenance = r'Amazon EC2 Maintenance - Maintenance \[AWS Account: \d{12}\]'

    # サブジェクトの判定
    # 振り分けルールが増えた場合、追記していく
    if re.search(chk_ec2_retirement, email_subject):
        return EMAIL_TYPE.RETIREMENT
    elif re.search(chk_ec2_maintenance, email_subject):
        return EMAIL_TYPE.MAINTENANCE

def execute_handler(bucket, key):
    # オブジェクトを取得する
    s3_object = s3.Object(bucket, key)

    # オブジェクトのkey取得
    s3_file_key = s3_object.key

    # オブジェクトのkeyからファイル名を取得
    s3_object_name = s3_file_key.split("/")[-1]

    # オブジェクトの内容を取得
    s3_object_response = s3_object.get()

    # オブジェクトのBodyを取得
    email_body = s3_object_response['Body'].read().decode('utf-8')

    email_object = email.message_from_string(email_body)

    # メールの情報を取得する
    # サブジェクト
    email_subject = get_email_subject(email_object)

    # メールのサブジェクト判別
    email_type = check_email_subject(email_subject)

    # サブジェクト分別 対象メールが増えたら elif を追加する
    # リタイアメント
    if email_type == EMAIL_TYPE.RETIREMENT:
        copy_folder('ec2_retirement', s3_object_name, s3_file_key, bucket, key)
    # リブート(メンテナンス)
    elif email_type == EMAIL_TYPE.MAINTENANCE:
        copy_folder('ec2_maintenance', s3_object_name, s3_file_key, bucket, key)
    else:
        print('Non-target Email')

    #ファイルの削除
    delete_original_object(s3_object, s3_file_key)

def lambda_handler(event, context):
    # バケット名取得
    bucket = event['Records'][0]['s3']['bucket']['name']

    # オブジェクトのkey取得
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'])
    print(key)
    # メイン
    execute_handler(bucket, key)

テスト

以下、件名のメールを送信

  • [Retirement Notification] Amazon EC2 Instance scheduled for retirement.
  • Amazon EC2 Maintenance - Maintenance [AWS Account: 012345678910]
  • テスト

CloudWatch Logsで確認

CloudWatch Logs

期待通りの挙動になっていることを確認

S3で振り分けができたか確認

  • ec2_retirementフォルダ

S3

  • ec2_maintenanceフォルダ

S3

  • sesフォルダ

S3

受信したメールが対象のフォルダに振り分けられていて、元のメールは削除されていることが確認できます。
今回は受信したメールを削除しましたが、S3のライフサイクルで一定期間は保存してもいいと思います。

まとめ

今回はPythonの勉強も兼ねて、SESで受信したメールをLambdaで選別してS3のフォルダに分けるところまでやってみました。
S3のフォルダに対象のメールが入ったらS3 EventsでSNS通知したり、
Lambda 関数を呼び出したりできるのでメール毎にアクションを決めることができると思います。

では、また。