[AWS IoT Core]容量が大きいファイルを取り扱う際は認証情報プロバイダーを使用してS3に直接アップロードする
はじめに
コンサルティング部の神野です。
デバイスからファイルなどの画像を送信する際にMQTTのペイロード上限は128KBのため大きいファイルの取り扱いは難しいことがあります。
困ったなーと思っていたらAWS IoT Coreに備わっている機能である認証情報プロバイダーを活用してデバイスから直接S3にファイルをアップロードできることに気づきました。
今回は認証情報プロバイダーを使って、TerraformでS3にデバイスから直接アップロードする方法を実装していきます。
デバイスが認証情報プロバイダーを使ったAWSリソースのアクセス方法
公式が認証情報プロバイダーについて解説していて、以下引用です。
引用:AWS IoT Core 認証情報プロバイダーを使用した AWS サービスへの直接呼び出しの許可
- AWS IoT Core デバイスは、認証情報プロバイダーにセキュリティトークンをHTTPSリクエストします。このリクエストには、認証のためのデバイスの X.509 証明書が含まれています。
- 認証情報プロバイダーは、証明書を検証し、デバイスにセキュリティトークンをリクエストするアクセス許可があることを確認するために、リクエストを AWS IoT Core 認証および認可モジュールに転送します。
- 証明書が有効で、セキュリティトークンをリクエストするアクセス許可がある場合、 AWS IoT Core 認証および認可モジュールは成功を返します。それ以外の場合は、デバイスに例外が送信されます。
- 証明書が正常に検証されると、認証情報プロバイダーは AWS Security Token Service (AWS STS) を呼び出して、作成したIAMロールを引き受けます。
- AWS STS は、権限が制限された一時的なセキュリティトークンを認証情報プロバイダーに返します。
- 認証情報プロバイダーは、デバイスにセキュリティトークンを返します。
- デバイスはセキュリティトークンを使用して、署名バージョン 4 で AWS リクエストに署名します。
- リクエストされたサービスは IAMを呼び出して署名を検証し、認証情報プロバイダー用に作成したIAMロールにアタッチされたアクセスポリシーに対してリクエストを承認します。
- IAMが署名を正常に検証し、リクエストを承認すると、リクエストは成功します。それ以外の場合は、IAM は例外を送信します。
かなりざっとまとめると、
- デバイスがAWS IoT Coreの認証情報プロバイダーにリクエストを送信し、デバイス証明書の権限が適切か検証
- 検証の結果問題なければSecurity Tokenを返却
- デバイスはそのSecurity Tokenを使ってAWSのリソースにアクセス可能になる
という流れとなります。
実装イメージ
ペイロードが小さい場合
- MQTTでファイルを直接送信する
ファイルをBase64にエンコードしてMQTTで直接連携し、Lambdaなどの後段のサービスで処理するイメージです。
ペイロードが大きい場合
- AWS IoT Coreの認証情報プロバイダーにアクセスして、S3へのアクセス権限を取得する
- S3にファイルをアップロードする
- アップロードしたファイルパスをMQTTで連携する
ファイルのペイロードが大きい場合は、MQTTのペイロード上限を超過しないように事前にファイルをアップロードして、そのURLを連携する方式となります。
認証情報プロバイダーを経由して、一時的なアクセストークンを取得するのが今回のポイントとなります。
構築
準備
前回の記事に従ってTerraformを使ってデバイスと見立てたEC2を作成します。
この構築用のコードをベースに今回は差分でコードを追記していきます。
下記に前回の記事で紹介したレポジトリと今回の記事で使用したコードのレポジトリを貼っていますので、参考までに活用いただけますと幸いです。
前回紹介記事のコード
今回実施のコード
作成
今回は認証情報プロバイダーを使ってデバイスがS3にファイルをアップロードできるように構築を進めていきます。
iot.tf
下記リソースを追加で実装します。
- IoTポリシーの追加
file/upload
トピックへのPublish権限- 作成するIoT ロール エイリアスが権限を引き受けることを可能にするポリシー
- アップロードファイル用のS3バケット
- S3ファイルアップロード用のIAMロール
- S3にアップロード可能なポリシーを保持
- デバイスが使用するロール
- IoT ロール エイリアス
- S3バケットへアップロード可能な権限を持ったIAM ロールと紐付け
- デバイスがIAMロールを間接的に引き受けるために、このIoT ロール エイリアスを仲介に挟む必要があります。
- デバイス証明書 ↔︎ IoT ロール エイリアス ↔︎ IAMロールのイメージです。
- デバイスがIAMロールを間接的に引き受けるために、このIoT ロール エイリアスを仲介に挟む必要があります。
- S3バケットへアップロード可能な権限を持ったIAM ロールと紐付け
- 認証情報プロバイダー用のIoTエンドポイント
- このエンドポイントにアクセスして、一時的なリソースのアクセス権限を取得
# IoTポリシーを作成
# このポリシーは特定のトピックに対する操作を許可
resource "aws_iot_policy" "pubsub" {
name = "PubSubToSpecificTopic"
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
# 特定のクライアントの接続を許可
Effect = "Allow"
Action = ["iot:Connect"]
Resource = ["arn:aws:iot:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:client/${aws_iot_thing.example.name}"]
},
{
# 特定のトピックへの発行と受信を許可
Effect = "Allow"
Action = ["iot:Publish", "iot:Receive"]
+ Resource = ["arn:aws:iot:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:topic/my/test/topic", "arn:aws:iot:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:topic/file/upload"]
},
{
# 特定のトピックフィルターへのサブスクリプションを許可
Effect = "Allow"
Action = ["iot:Subscribe"]
+ Resource = ["arn:aws:iot:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:topicfilter/my/test/topic"]
},
{
# 認証情報プロパイダーの許可
Effect = "Allow"
Action = ["iot:AssumeRoleWithCertificate"]
+ Resource = "${aws_iot_role_alias.this.arn}"
}
]
})
}
+ # IAMロールの作成(認証情報プロバイダー用)
+ resource "aws_iam_role" "iot_device" {
+ name = "iot_device_role"
+ assume_role_policy = jsonencode({
+ Version = "2012-10-17"
+ Statement = [{
+ Action = "sts:AssumeRole"
+ Effect = "Allow"
+ Principal = {
+ Service = "credentials.iot.amazonaws.com"
+ }
+ }]
+ })
+ }
+ # S3バケットの作成
+ resource "aws_s3_bucket" "this" {
+ bucket = "<your-account-bucket>"
+ }
+ resource "aws_s3_bucket_public_access_block" "this" {
+ bucket = aws_s3_bucket.this.bucket
+ block_public_acls = true
+ block_public_policy = true
+ ignore_public_acls = true
+ restrict_public_buckets = true
+ }
+ # S3へのアクセス許可ポリシー
+ resource "aws_iam_policy" "s3_iot_device_policy" {
+ name = "s3_put_policy"
+ description = "putObject Policy from Device"
+
+ policy = jsonencode({
+ Version = "2012-10-17"
+ Statement = [{
+ Effect = "Allow"
+ Action = [
+ "s3:PutObject"
+ ]
+ Resource = "arn:aws:s3:::${aws_s3_bucket.this.bucket}/*"
+ }]
+ })
+ }
+ # ポリシーとロールの紐付け
+ resource "aws_iam_role_policy_attachment" "s3_put_policy_attachment" {
+ role = aws_iam_role.iot_device.name
+ policy_arn = aws_iam_policy.s3_iot_device_policy.arn
+ }
+ # 認証情報プロバイダー用のロールエイリアス
+ resource "aws_iot_role_alias" "this" {
+ alias = "device-iot-role-alias"
+ role_arn = aws_iam_role.iot_device.arn
+ }
+ # 認証情報プロバイダーのエンドポイントを取得する
+ data "aws_iot_endpoint" "credential" {
+ endpoint_type = "iot:CredentialProvider"
+ }
ec2.tf
ファイルアップロードを実行するPythonファイルupload_file.py
を後で追加するので、下記項目の置換処理を追記します。
- IoT エンドポイント(
iot_endpoint
) - IoT 認証情報エンドポイント(
iot_cred_endpoint
) - IoT ロール エイリアス(
role_alias
) - デバイスがファイルをアップロードするS3バケット名(
target_bucket_name
)
# テンプレートファイルの読み込みとローカル変数の設定
locals {
iot_pubsub_script = templatefile("${path.module}/scripts/iot_pubsub.py", {
iot_endpoint = data.aws_iot_endpoint.data.endpoint_address
})
+ upload_file_script = templatefile("${path.module}/scripts/upload_file.py", {
+ iot_endpoint = data.aws_iot_endpoint.data.endpoint_address
+ iot_cred_endpoint = data.aws_iot_endpoint.credential.endpoint_address
+ role_alias = aws_iot_role_alias.this.alias
+ target_bucket_name = aws_s3_bucket.this.bucket
+ })
setup_script = templatefile("${path.module}/scripts/setup.sh", {
iot_pubsub_script = local.iot_pubsub_script
+ upload_file_script = local.upload_file_script
aws_region = data.aws_region.current.name
})
}
setup.sh
ファイルアップロード用のスクリプトupload_file.py
作成処理を追記します。
#!/bin/bash
yum update -y
yum install -y python3 python3-pip jq
pip3 install AWSIoTPythonSDK boto3
# Retrieve secrets from Secrets Manager
SECRET=$(aws secretsmanager get-secret-value --secret-id iot_certificate_test --region ${aws_region} --query SecretString --output text)
# Extract certificate and private key
echo $SECRET | jq -r '.certificate_pem' > /home/ec2-user/certificate.pem
echo $SECRET | jq -r '.private_key' > /home/ec2-user/private.key
# Download root CA
curl https://www.amazontrust.com/repository/AmazonRootCA1.pem -o /home/ec2-user/root-ca.pem
# Copy the Python script to the EC2 instance
cat <<EOT > /home/ec2-user/iot_pubsub.py
${iot_pubsub_script}
EOT
+ # Copy the Python script to the EC2 instance
+ cat <<EOT > /home/ec2-user/upload_file.py
+ ${upload_file_script}
+ EOT
# Set appropriate permissions
chown ec2-user:ec2-user /home/ec2-user/*.pem /home/ec2-user/*.key /home/ec2-user/*.py
chmod 600 /home/ec2-user/*.pem /home/ec2-user/*.key
chmod 644 /home/ec2-user/*.py
upload_file.py
該当ファイルサイズを取得して、閾値の50KBを超えているかチェックします。チェック結果に応じて処理を分岐させます。
- 閾値を超えている場合は直接S3にアップロードして、そのファイルURLをMQTTで送信
- 認証情報プロバイダーにリクエストを送信し、アクセストークンを取得
- 取得したアクセストークンを使ってS3にファイルをアップロード
- 閾値を超えていない場合はBase64でファイルをエンコードして、MQTTで直接送信
import time
import json
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import boto3
import json
import http.client
import ssl
import sys
import os
import base64
# エンドポイントの定義
iot_endpoint = "${iot_endpoint}"
iot_cred_endpoint = "${iot_cred_endpoint}"
# アップロード先のバケット
target_bucket_name = "${target_bucket_name}"
# MQTTクライアントの作成
myMQTTClient = AWSIoTMQTTClient("example-thing")
myMQTTClient.configureEndpoint(iot_endpoint, 8883)
# 認証情報の設定
myMQTTClient.configureCredentials(
"/home/ec2-user/root-ca.pem",
"/home/ec2-user/private.key",
"/home/ec2-user/certificate.pem"
)
# IoT Coreへ接続
myMQTTClient.connect()
# ファイルサイズの閾値(50KB)
SIZE_THRESHOLD = 51200
def get_file_size(file_path):
return os.path.getsize(file_path)
def upload_s3(file_path):
file_size = get_file_size(file_path)
if file_size > SIZE_THRESHOLD:
# ファイルサイズが閾値を超える場合、S3にアップロード
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.load_cert_chain("/home/ec2-user/certificate.pem", "/home/ec2-user/private.key")
context.load_verify_locations("/home/ec2-user/root-ca.pem")
context.verify_mode = ssl.CERT_REQUIRED
context.check_hostname = True
headers = {
"x-amzn-iot-thingname": "example-thing"
}
conn = http.client.HTTPSConnection(iot_cred_endpoint, port=443, context=context)
# 認証情報プロバイダーとIoTエイリアスに紐づくURLからトークンを取得
conn.request("GET", "/role-aliases/${role_alias}/credentials", headers=headers)
response = conn.getresponse()
credential_data = json.loads(response.read())
# 取得した認証情報からS3クライアントを作成
s3 = boto3.client("s3",aws_access_key_id=credential_data['credentials']['accessKeyId'],
aws_secret_access_key=credential_data['credentials']['secretAccessKey'],
aws_session_token=credential_data['credentials']['sessionToken'])
file_name = os.path.basename(file_path)
s3.upload_file(file_path, target_bucket_name, file_name)
# S3からURLを取得
s3_url = f"https://{target_bucket_name}.s3.amazonaws.com/{file_name}"
message = {"type": "s3_url", "url": s3_url}
myMQTTClient.publish("file/upload", json.dumps(message), 1)
print("Successfully sent file directly to S3")
else:
# ファイルサイズが閾値以下の場合、MQTTで直接送信
with open(file_path, "rb") as file:
file_content = file.read()
file_content_base64 = base64.b64encode(file_content).decode('ascii')
message = {
"type": "file_content",
"content": file_content_base64
}
myMQTTClient.publish("file/upload", json.dumps(message), 1)
print("Successfully sent file directly via MQTT")
# スクリプトファイル実行時のメイン関数
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: python script.py <file_name>")
sys.exit(1)
file_name = sys.argv[1]
file_path = os.path.join(os.getcwd(), file_name)
if not os.path.exists(file_path):
print(f"Error: File '{file_name}' does not exist in the current directory.")
sys.exit(1)
upload_s3(file_path)
動作確認
Session Manager
Session Managerから該当のEC2インスタンスへログインして操作します。
ec2-user
でログイン- アップロード用のダミーファイルを作成
- 5KB
- 1MB
upload_file.py
ファイルを実行- 5KBのファイルを引数
- 1MBのファイルを引数
# ec2-userでログイン
sudo su --login ec2-user
# 5KBのファイルを作成
fallocate -l 5K light_file.txt
# 1MBのファイルを作成
fallocate -l 1M large_file.txt
# 5KBのファイルを引数にして、Pythonのスクリプトを実行
python3 upload_file.py light_file.txt
# 実行ログ
Successfully sent file directly via MQTT
# 1MBのファイルを引数にして、Pythonのスクリプトを実行
python3 upload_file.py large_file.txt
# 実行ログ
Successfully sent file directly to S3
S3
S3バケットのスクリーンショット
1MBのファイルも適切にアップロードされていますね!
AWS IoT Core
AWS IoT Coreの機能で備わっている「MQTTテストクライアント」で、トピックfile/upload
にファイルが適切に送信されているか確認します。
AWS IoT Core MQTTテストクライアントのスクリーンショット
下のメッセージが閾値未満のファイル、上のメッセージが閾値を超過しているファイルで、どちらも適切な方法でそれぞれ送信されていますね!
今回はここで終了ですが、実際にはこのメッセージをトリガーにするIoT RuleでLambda関数をInvokeしてtype
に応じて処理を分岐させるようなイメージで活用します。
おわりに
MQTTのペイロード上限を超過するようなファイルをアップロードする際に、認証情報プロバイダーを使用する方法はいかがだったでしょうか?他にもS3の署名付きURLを使用してアップロードする方法もあるので今後ご紹介させていただければと思います!
本記事が少しでも参考になりましたら幸いです。
参考
記事作成にあたり下記記事を参考にさせていただきました。