Boto3(AWS SDK for Python)でSQSに送信したメッセージをLambdaでポーリングして受信してみた

2017.03.09

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、みかみです。

さくら缶だと中身もおいしく感じる不思議..(近頃スーパードライの消費率上がってます。。

はじめに

やりたいこと

  • boto3 で SQS を操作してみたい
  • スケジュールイベントで実行される Lambda関数 をつくってみたい
  • SQS に保持されたメッセージを Lambda でポーリングして受信したい

確認環境(local)

SQSの準備

キューの作成&メッセージ送信

Boto3(AWS SDK for Python)を使って、SQSにキューを作成&メッセージを送信します。

send_msg.py

import boto3

name = 'test-load-mikami'
sqs = boto3.resource('sqs')
try:
    # キューの名前を指定してインスタンスを取得
    queue = sqs.get_queue_by_name(QueueName=name)
except:
    # 指定したキューがない場合はexceptionが返るので、キューを作成
    queue = sqs.create_queue(QueueName=name)

# メッセージ×3をキューに送信
msg_num = 3
msg_list = [{'Id' : '{}'.format(i+1), 'MessageBody' : 'msg_{}'.format(i+1)} for i in range(msg_num)]
response = queue.send_messages(Entries=msg_list)
print(response)

コンソールから実行します。

C:\Users\mikami.yuki\work\test_sqs>py send_msg.py
{'Successful': [{'Id': '1', 'MessageId': '92011f4d-472c-40e5-b166-ef55738e0380', 'MD5OfMessageBody': '9980c8a9248139f14f4165e5d53088aa'}, {'Id': '2', 'MessageId': '8231a409-8a98-461f-bcfb-66ccb6f75cad', 'MD5OfMessageBody': '0fd4e86b2daa009cd9929641dbd7dab6'}, {'Id': '3', 'MessageId': '3ee20fce-5b98-491d-83f9-c7466ab1d018', 'MD5OfMessageBody': '8b30166735242c192258d4974f662a5f'}], 'ResponseMetadata': {'RequestId': 'b44ab343-85a0-5ebe-96a5-e27bed9e8c1d', 'HTTPStatusCode': 200, 'HTTPHeaders': {'server': 'Server', 'date': 'Tue, 07 Mar 2017 07:49:18 GMT', 'content-type': 'text/xml', 'content-length': '861', 'connection': 'keep-alive', 'x-amzn-requestid': 'b44ab343-85a0-5ebe-96a5-e27bed9e8c1d'}, 'RetryAttempts': 0}}

AWS管理コンソールから確認してみると、ちゃんとメッセージがエンキューされました。

send_queue

メッセージ受信確認

送信したメッセージがちゃんと取れるか確認してみます。

get_msg.py

import boto3

# キューの名前を指定して
name = 'test-load-mikami'
sqs = boto3.resource('sqs')
queue = sqs.get_queue_by_name(QueueName=name)
while True:
    # メッセージを取得
    msg_list = queue.receive_messages(MaxNumberOfMessages=10)
    if msg_list:
        for message in msg_list:
            print(message.body)
            message.delete()
    else:
        # メッセージがなくなったらbreak
        break

コンソールから実行してみると、ちゃんとメッセージのbodyがとれています。

C:\Users\mikami.yuki\work\test_sqs>py get_msg.py
msg_1
msg_2
msg_3

AWS管理コンソールから確認してみると、すべてのメッセージがデキューされました。

get_queue

boto3 receive_messages() の取得メッセージ数

SQS.Queue.receive_messages() のリファレンスに以下の記載がありました。

  • MaxNumberOfMessages で一度に受信するメッセージの最大数を指定可能
  • MaxNumberOfMessages には、1~10が指定できる
  • MaxNumberOfMessages を指定しなかった場合のデフォルト値は1

ふむふむ。

MaxNumberOfMessages=10 で receive_message() すれば、キューの中の3つのメッセージ全部取れるはず↓

for message in queue.receive_messages(MaxNumberOfMessages=10):
    print(message.body)
    message.delete()

と思ったのですが、なぜか、全部取れない。。(1個しか取れなかったリ、2個しか取れなかったり。。

よくよく読んでみると、、

MaxNumberOfMessages (integer) -- The maximum number of messages to return. Amazon SQS never returns more messages than this value (however, fewer messages might be returned). Valid values are 1 to 10. Default is 1.

however, fewer messages might be returned」(「ただし、返されるメッセージは少なくなります」by Google 翻訳)

なるほど、です。。(指定した数取れるとは限らないのですね。。

Lambdaの準備

Lambda関数の作成

AWS管理コンソールから、スケジュールイベントでSQSメッセージを受信するLambda関数を作成します。

「Lambda関数の作成」ボタンから、Python 2.7 の lambda-canaryを選択。

lambda_canary

コードは先ほどメッセージの受信確認に使用したものを、インライン編集で入力しました。

lambda_code

保存してテストしてみます。

START RequestId: 10ecf83a-0303-11e7-8cf7-05b21b9da4a5 Version: $LATEST
An error occurred (AWS.SimpleQueueService.NonExistentQueue) when calling the GetQueueUrl operation: The specified queue does not exist or you do not have access to it.: QueueDoesNotExist
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 14, in lambda_handler
    queue = sqs.get_queue_by_name(QueueName=name)
  File "/var/runtime/boto3/resources/factory.py", line 520, in do_action
    response = action(self, *args, **kwargs)
  File "/var/runtime/boto3/resources/action.py", line 83, in __call__
    response = getattr(parent.meta.client, operation_name)(**params)
  File "/var/runtime/botocore/client.py", line 253, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/var/runtime/botocore/client.py", line 543, in _make_api_call
    raise error_class(parsed_response, operation_name)
QueueDoesNotExist: An error occurred (AWS.SimpleQueueService.NonExistentQueue) when calling the GetQueueUrl operation: The specified queue does not exist or you do not have access to it.

END RequestId: 10ecf83a-0303-11e7-8cf7-05b21b9da4a5
REPORT RequestId: 10ecf83a-0303-11e7-8cf7-05b21b9da4a5	Duration: 213.61 ms	Billed Duration: 300 ms 	Memory Size: 128 MB	Max Memory Used: 25 MB

エラーだそうで。。

「キューがない」とか言われてるのですが、メッセージ取得できることはローカル環境から確認済みなので、ロールを見直してみます。

ロールの追加

Lambda関数作成時、「Lambda 関数ハンドラおよびロール」の項目では、

  • 「ロール」プルダウンで「テンプレートから新しいロールを作成」
  • 「ポリシーテンプレート」プルダウンで「SQSポーリングのアクセス権限」

を選択しました。(これだけではダメらしい。。

AWS管理コンソール「IAM」の「ロール」から、先ほど作成したロール名を選択し、「ポリシーのアタッチ」で「AmazonSQSFullAccess」を追加します。

role

再度Lambdaをテスト。

lambda_ok

OKです!

スケジュールイベントで実行したLambdaからSQSメッセージを取得

Lambda関数作成時、「トリガーの設定」ページ「スケジュール式」項目で、ポーリング時間を指定しました。

lambda_schedule_set

テスト実行で問題なかったので、スケジュールイベントトリガを有効化します。

lambda_triger

待つこと1分・・・

→CloudWatchのログを確認してみます。

lambda_log

ちゃんとメッセージ受信できました!

※CloudWatchのロググループ&ログストリームは、Lambda関数のパラメータ context からも参照できます。

まとめ(所感)

SQS も Lambda も、案外手軽にお試しできました。

これぞフルマネージドなAWSの利点ですねv

(でも最近、apache Kafka や RabbitMQ + Celery もちょっと気になる。。

参考