【製造業 IoT の第一歩】三菱電機のシーケンサ MELSEC から取得した設備データを AWS に保存するまで

製造業の IoT 導入はとても奥が深い世界です。実際に試してみた内容や考慮したポイントの一端をご紹介します。
2022.12.23

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

概要

最近、製造業の設備データを収集して AWS で可視化・活用したいというお問い合わせを受けることが多くなりました。

特に、一定の規模以上の工場などでは、PLC (Programmable Logic Controller)という設備機器を制御する装置が利用されていることが多く、この PLC から直接データを取得したいというケースがあります。

そこで今回は、この PLC からデータを取得して、時系列データベースである Amazon Timestream に保存するまでの手順を紹介したいと思います。

余談

製造業においては、「シーケンサ」というと一般的に PLC 全般のことを指して表現されることが多いですが、シーケンサは 三菱電機の PLC 製品名です。
しかし、国内の PLC 製品のシェアは三菱電機の PLC が「60%」を占めると言われており、多くの現場に導入されているため「PLC = シーケンサ」と捉えられることがあります。

PLC からデータ取得するための課題

言葉で「PLC からデータ取得」と書くのは簡単ですが、実際に試してみるといくつか課題があることが分かりました。

多種多様な産業用通信プロトコル

製造業の現場における産業用ネットワークでは、IT とは違い非常に多くの通信プロトコルが使われており、メーカー独自のプロトコルも珍しくはありません。
そういった事情もあり、PLC においてもメーカーやデバイスのシリーズ等によって、利用できるプロトコルが異なります。

また、Ethernet で接続できる場合もあれば、RS485 といったシリアル通信が必要な場合もあります。これら全てをサポートするゲートウェイ機器を探したり作るのは現実的ではありません。産業用通信プロトコルの仕様に明るくなければ、ゼロから通信プログラムを書くことも時間がかかります。

なお、AWS のサービスとしては、AWS IoT SiteWise が OPC UA プロトコルをサポートしており、ゲートウェイアプリをデバイスにインストールすることで、設備データを AWS に転送することができます。
詳細は下記の記事をご覧頂ければと思います。

プロトコル変換

上記の通り、工場や倉庫などの機器間で使われているプロトコルでデータを取得した後、クラウド側が期待するプロトコルやフォーマットにデータを変換する必要があります。

たとえば、AWS IoT Core であれば JSON 形式でデータを受けることが期待された機能が多くあります。そのためゲートウェイ機器などで事前にプロトコルを変換して、必要に応じてデータフォーマットも変更する必要があります。

産業用通信プロトコルの理解

ゲートウェイを外部から調達するか自前で作成するか、いずれにしても産業用通信プロトコルの理解は欠かせません。
今回は、三菱電機のマニュアルを読んでプロコトルの仕様理解を進めながら PLC との接続設定を試行錯誤しました。(メーカーのサポートに質問の問い合わせも行いました。)

PLC に精通している方であれば大きな課題にはならないと思いますが、IT と製造業の両方に詳しい人材が少ないということが、製造業の IT 導入の課題となっていることを痛感しました。

クラウドへデータ送信するためのネットワーク

製造業の現場では、ネットワークが構内に閉じていることが多くあります。そのため現場の稼働状況などを知りたい場合は、現地まで行かないと分からないという課題があります。

今回のようなケースでも、PLC から収集したデータをクラウドまで送らないとあまり意味がありません。そのためにはモバイルルーターなどを用意する必要があります。

今回は、ゲートウェイが MVNO 各社の SIM による LTE 通信をサポートしていたので、「SORACOM」 を利用することにしました。

利用するゲートウェイ機器

今回は時間があまりなかったので、上記のような課題を速やかに解決するため、既製品のゲートウェイを調達して利用することにしました。
今回使ってみたのは、株式会社iND 様の「LMG-300」という PLC 対応 IoTゲートウェイです。

実物はコンパクトな筐体になっています。写真に写っているようにプラボックスなどに固定できるネジ穴もあるので、しっかり固定して設置できるようになっています。

lmg_300

このゲートウェイは次のような特徴を持っています。

  • 三菱のシーケンサ「MELSEC」に対応
    • MELSEC Qシリーズ
    • MELSEC iQ-Rシリーズ
  • MC プロトコルで PLC と接続してデータ収集可能
  • MQTT プロトコルに対応しており AWS IoT Core に Pub/Sub 通信が可能
  • SIM フリーで SORACOM Air などが利用可能

このような特徴から、PLC にゲートウェイを接続するだけで、MC プロトコル〜 MQTT のプロトコル変換も行ってくれるのでとても便利です。

ゲートウェイの設定については、詳細なマニュアルがホームページからダウンロードできるので、本記事では手順の紹介は割愛させていただきます。

注意点

データ取得上の制限

LMG 300 の仕様として、取得できるデータ種別は下記のとおりです。

  • ラッチリレー(L)
    • 登録可能数:200 点
  • データレジスタ(D)
    • 登録可能数:50 点

またそれぞれのデータ取得は、指定したデータ取得開始アドレスからの連続値となります。つまり、取得したいデータのアドレスだけを個々に指定してデータを取得するということができません。

そのため実際に設定する場合は、どのデータを取得したいのか事前に検討しておく必要があります。また、どうしても取りたいデータが取得可能なアドレス範囲になければ、PLC の設定を変更してアドレスを変更する必要もある点に注意が必要です。

このように LMG-300 は「小規模なデータを用いてクラウドへデータ収集する」用途に向いていると思いますが、ホームページの記載では追加開発も可能とのことですので、ご入用でしたらお問い合わせも検討いただくといいかと思います。

SORACOM で SoftBank 回線を利用したい場合の注意点

SORACOM 自体は NTT ドコモ、KDDI、SoftBank といったの国内主要キャリアをサポートしていますが、SoftBank 回線が使える SIM の条件として「デバイスがローミングをサポート」している必要があります。
LMG-300 はローミングをサポートしていないので、SoftBank 回線が利用できない点に注意が必要です。

AWS 側の考慮ポイント

デバイス側の検討は完了したので、次は AWS 側の構成を考えます。
今回は下記の3点がポイントになりました。

  • AWS 側に送られるデータ長がタイミングによって動的に変わる
  • Amazon Timestream にはマルチメジャーレコードで格納したい
  • Amazon Timestream の各種クォータに注意する

AWS 側に送られるデータ長がタイミングによって動的に変わる

AWS に送るデータ長がタイミングにより変わるというのは、ゲートウェイの仕様によるものです。例えば、インターネットとの通信断が発生したときにデータをローカルで保存しておき、復旧した際にまとめて一つのペイロードにして送る、といった仕様があります。

具体的には、「15:10:00」時点だけの情報となっている場合もあれば、「15:10:10」,「15:10:11」,「15:10:12」の3つの時点の情報が1つのデータとして送られることもある、というものです。

(なお、AWS IoT Core でサポートしているペイロードの最大値は 128 KB ですが、このゲートウェイはデータが 128 KB 以上の場合、データを分割して送信する仕様になっています。)

Amazon Timestream にはマルチメジャーレコードで格納したい

2022 年 12 月 22 日現在、AWS IoT Core にある Timestream のルールアクションは、シングルメジャーレコードのみサポートしています。上記の理由もあり今回は、Lambda 関数でマルチメジャーレコードにしてデータを格納します。

Amazon Timestream の各種クォータに注意する

Amazon Timestream では、WriteRecords API の制限として「一度に書き込みできるのは 最大 100 レコードまで」という制限があります。

今回利用するゲートウェイは、タイミング次第で一度に複数の時間分のデータを送ることがあるので 100 件以上のデータがあれば、100 レコードずつに分割して書き込むようにしています。

想定する構成

上記ポイントを踏まえて、今回は下記のような構成を作ることにします。

ゲートウェイから送られてくるペイロードのデータ長が可変ということもあり、Lambda でデータを処理してから Timestream へ書き込みを行います。

今回は単純な PoC という想定で Lambda のルールアクションを利用しています。要件やユースケースに応じて、Lambda の前段に Amazon SQS や Amazon Kinesis Data Streams を入れるなどしてもらえればと思います。
(図中にある Amazon SQS は Lambda の デッドレターキュー用のものです)

00-diagram

図の左側は、工場などの製造業の現場を表現していて、工場内にある PLC のデータをクラウドに送る環境を想定しています。
PLC の配置は参考例として記載したものですが、機種によってはお互いにデータの送受信ができるものもあり、複数の PLC のデータを上位の PLC で集約するといった構成を採ることがあります。
(構成図では便宜的に「集約 PLC」と記載しています)

この場合、上位の PLCで「どのデータ項目をどの下位 PLC のデータアドレスに割り付けるか」という設定をしておくことで、ゲートウェイは PLC 側の構成を意識せずにデータ取得することができます。

要するに、上位 PLC の設定において、

  • A01 という項目は XX という下位 PLC 上のデータ「xx
  • B02 という項目は YY という下位 PLC 上のデータ「yy

というように、それぞれの下位 PLC に対してどのデータを取得するか上位 PLC で設定しておくことで、ゲートウェイからは上位 PLC にアクセスするだけで下位 PLC のデータも取得することができるようになります。

このあたりの設定については、実際の現場の都合により様々だと思いますが、PLC が多段構成かどうかについて、AWS 側のアーキテクチャに影響ありません。
今回のような構成でもデータを収集できる例として、とらえて頂ければと思います。

サンプルデータの仕様

下記は ゲートウェイ「LMG-300」から AWS へ送られるサンプルデータです。これはゲートウェイが1回の MQTT Publish で AWS へ送る JSON データの例になります。

03-payload-format

冒頭で記載したようにデータ長が変わるという仕様で、次のような注意点があります。

    1. 機器データが格納される value のプロパティがゲートウェイの設定次第で可変
    • 設定が異なるゲートウェイだとプロパティの内容も異なる
      • 上記では D01D03L00L02 の内容が、D01D20L00L100 となる場合がある。
    1. ゲートウェイで取得したデータが格納される data000 にはタイミングによりプロパティの数が変わる
    • ネットワーク状況やゲートウェイの操作により1回の通信で送られるペイロードの内容が変わります。
      • 上記では 11:29:0011:30:00 の2つのデータだけだが、タイミングにより更に多くの時間のデータになることがある。

AWS SAM で環境構築

上記のようなデータの仕様を踏まえて AWS SAM で環境をサクッと作ります。

sam init

$ sam init \
    --runtime python3.9 \
    --name blog-iotcore-timestream \
    --app-template hello-world \
    --package-type Zip

SAM テンプレート

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  blog-iotcore-timestream-multimesure

  Sample SAM Template for blog-iotcore-timestream-multimesure

Globals:
  Function:
    Timeout: 20
    Environment:
      Variables:
        TIMESTREAM_DATABASE: !Ref IoTDataTimestreamDatabase
        TIMESTREAM_TABLE: !GetAtt IoTDataTimestreamTable.Name

Parameters:
  # 案件固有の接頭辞をリソース名に付けて一意にする
  StackPrefix:
    Type: String
  Topic:
    Type: String

Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: hello_world/
      Handler: app.lambda_handler
      Runtime: python3.9
      AutoPublishAlias: dev
      Timeout: 20
      Architectures:
        - x86_64
      Role: !GetAtt HelloWorldFunctionCustomRole.Arn
      Events:
        IoTRule:
          Type: IoTRule
          Properties:
            AwsIotSqlVersion: '2016-03-23'
            Sql: !Sub SELECT * FROM '${Topic}'
      DeadLetterQueue:
        Type: SQS
        TargetArn: !GetAtt LambdaDldSqs.Arn

  HelloWorldFunctionCustomRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Action: "sts:AssumeRole"
            Principal:
              Service: lambda.amazonaws.com
      Policies:
        - PolicyName: "hello-world-policy"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action:
                  - "timestream:WriteRecords"
                Resource: !Sub arn:aws:timestream:${AWS::Region}:${AWS::AccountId}:database/${IoTDataTimestreamDatabase}/table/${IoTDataTimestreamTable.Name}
              - Effect: "Allow"
                Action:
                  - "sqs:SendMessage"
                Resource: !Sub arn:aws:sqs:${AWS::Region}:${AWS::AccountId}:${LambdaDldSqs.QueueName}
              - Effect: "Allow"
                Action:
                  - "logs:CreateLogGroup"
                  - "logs:CreateLogStream"
                  - "logs:PutLogEvents"
                  - "timestream:DescribeEndpoints"
                Resource: "*"

# Timestream
  IoTDataTimestreamDatabase:
    Type: AWS::Timestream::Database
    Properties:
      DatabaseName: !Sub ${ProjectPrefix}-database

  IoTDataTimestreamTable:
    Type: AWS::Timestream::Table
    Properties:
      DatabaseName: !Ref IoTDataTimestreamDatabase
      RetentionProperties:
        MemoryStoreRetentionPeriodInHours: 24
        MagneticStoreRetentionPeriodInDays: 7
      MagneticStoreWriteProperties:
        EnableMagneticStoreWrites: true
      TableName: !Sub ${ProjectPrefix}-table

# SQS for Lambda DLQ
  LambdaDldSqs:
    Type: AWS::SQS::Queue
    Properties: 
      MessageRetentionPeriod: 345600 # Default 4日
      QueueName: !Sub ${ProjectPrefix}-iot-action-lambda-dlq
      ReceiveMessageWaitTimeSeconds: 20 # Long polling
      SqsManagedSseEnabled: true
      VisibilityTimeout: 30 # Default

  # CloudWatch Logs for Lambda
  # 明示的にCloudWatch Logsを作成してretentionを指定
  LambdaFuncLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub /aws/lambda/${HelloWorldFunction}
      RetentionInDays: 30

Lambda のコード

参考までに Lambda のコードを記載しておきます。(Python 3.9 を想定)
主に次のような処理を実施しています。

  • data000 に入るそれぞれの時間ごとのデータを一つずつバラして Timestream のメジャーを生成
    • 40 〜 83 行目
  • ディメンションと生成したメジャーからレコードを生成
    • 85 〜 94 行目
  • レコードが 100 件以上の場合は、一度の書き込みを100件ずつに分割
    • 103 〜 112 行目
  • 100 レコードずつ Timestream に書き込み
    • 114 〜 121 行目
import boto3
from botocore.config import Config
import json
import logging
import sys
import time
from datetime import datetime
import os

session = boto3.Session()
write_client = session.client('timestream-write', region_name='ap-northeast-1',
                            config=Config(read_timeout=20,    # リクエストタイムアウト(秒)
                            max_pool_connections=5000,        # 最大接続数
                            retries={'max_attempts': 10}))    # 最大試行回数

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

# Amazon Timestream データベースとテーブル
DatabaseName = os.environ['TIMESTREAM_DATABASE']
TableName = os.environ['TIMESTREAM_TABLE']

# ディメンションの作成
def gen_dimensions(event):
    device_name = event['name']
    device_location = event['place']
    dimensions = [
        {'Name': 'Location', 'Value': device_location},
        {'Name': 'DeviceName', 'Value': device_name}
    ]
    return dimensions


# 1レコードあたりに入れるアイテムをデバイスデータから取得
def gen_record(event):

    # 'data000'のvalueだけ抽出
    only_data000 = event['data000']
    # 'data000'にある要素数を抽出(LMG300がデータ取得した回数をデータから読み取り)
    length_data000 = len(only_data000) # 1つのJSONに含まれる LMG300が取得したデータの数。つまりTimestreamに書き込みたいレコードの数)
    print("length data000: " + str(length_data000))

    num = 0
    records_X = []

    while num < length_data000:
        raw_datatime = event['data000'][num]['datetime'] # 2022-09-14T16:41:00.006+09:00
        datetime_type_time = datetime.fromisoformat(raw_datatime) # datetime型に変換
        unixtime_data = int(datetime_type_time.timestamp() * 1000) # ミリ秒のunixtimeに変換 ex.1663141260006

        # valueの各要素の key:value を取得
        value_num = 0
        MultiMeasureValue = [] # 2個目以上のデータがある時、MultiMeasureValue に抽出済みのデータが入っているので初期化
        records = [] #同上
        for item_name, item_value in event['data000'][num]['value'].items():
            #print(f'key:{item_name}, value:{item_value}')

            #measure_name = key
            #各データ(st, Dxx, Lxx)を個々のアイテムとして作成
            if item_name == 'st':
                myitem = {
                    'Name': str(item_name),
                    'Value': str(item_value),
                    'Type': 'VARCHAR'
                },
            else:
                myitem = {
                    'Name': str(item_name),
                    'Value': str(item_value),
                    'Type': 'DOUBLE'
                },
            records.append(myitem)
            # recordsに追加された最新の要素だけを抽出 (ex. D01のキーと値)
            t = records[value_num]
            # 抽出した最新の要素だけを MultiMeasureValue に追加して、挿入するレコードの MeasureValues を生成
            MultiMeasureValue.append(t[0])

            value_num += 1

        #レコード生成
        dummy_measure = {
            'Dimensions': gen_dimensions(event),
            'MeasureName': 'plc_metrics',
            'MeasureValueType': 'MULTI',
            'MeasureValues': MultiMeasureValue,
            'Time': str(unixtime_data),
            'TimeUnit': 'MILLISECONDS'
        }
        records_X.append(dummy_measure)
        #print("num: " + str(num))
        #print("records_X: " + str(records_X))
        time.sleep(1/1000)

        num += 1

    return records_X

# 100件ずつのレコードに分割
def split_list(l, n):
    for idx in range(0, len(l), n):
        yield l[idx:idx + n]


def lambda_handler(event, context):
    generated_records = gen_record(event)
    records_groups = list(split_list(generated_records, 100))
    records_groups_length = len(records_groups)

    # 100レコードずつ書き込み
    for records_group_num in range(0,records_groups_length):
        writing_records = records_groups[records_group_num]
        print("writing records group number: " + str(records_group_num))
        result = write_client.write_records(DatabaseName=DatabaseName,
                                            TableName=TableName,
                                            Records=writing_records, CommonAttributes={})
    return

Timestream の基本的な概念や用語については、下記を参考にしてもらえればと思います。
(少し古いのでマルチメジャーレコードなどについては言及されていないものになります)

ビルド

$ sam build

デプロイ

今回は、以下の2つをテンプレートのパラメーターとして渡せるようにしています。

  • プレフィックス
    • 作成される AWS リソースを識別するための識別子として設定
      • プロジェクト名や環境名(staging, production など)の指定を想定
    • 作成される AWS リソースのリソース名などに付与
    • 今回は blogtest をセット
  • データ送信先のトピック名
    • 今回は blog/test をセット

そのため、SAM CLI を実行するシェルの環境変数に、上記のパラメーターを設定してからデプロイを行います。

$ PROJECT_PREFIX=blogtest
$ TOPIC=blog/test

デプロイ時に --parameter-overrides オプションで環境変数をパラメーターに渡します。

$ sam package \
    --output-template-file packaged.yaml \
    --s3-bucket [アーティファクト用の S3 バケット名]
    
$ sam deploy \
    --template-file packaged.yaml \
    --stack-name blog-iotcore-timestream-stack \
    --s3-bucket [アーティファクトがある S3 バケット名] \
    --capabilities CAPABILITY_NAMED_IAM \
    --no-fail-on-empty-changeset \
    --parameter-overrides \
      ProjectPrefix=${PROJECT_PREFIX} \
      Topic=${TOPIC}

動作確認

今回は PLC の実機が手元にないので、次のようなサンプルデータを IoT Core に送って動作確認を行います。
送信先のトピックは、SAM でデプロイ時に指定したものになります。

{
    "timestamp":"2022-12-22T11:30:01.106+09:00",
    "accid": "123456",
    "seq_num":21,
    "device":"LMG-300",
    "version":"1.00",
    "name":"MACHINE01",
    "place":"Tokyo-001",
    "command":"report",
    "data_num":1,
    "data000":[
        {
            "datetime":"2022-12-22T11:29:00.006+09:00",
            "value":{
                "st":"000001",
                "D00":-1234,
                "D01":5678,
                "D02":1234,
                "D03":5648,
                "L00":1,
                "L01":0,
                "L02":1
            }
        },
        {
            "datetime":"2022-12-22T11:30:00.006+09:00",
            "value":{
                "st":"000001",
                "D00":-1212,
                "D01":5656,
                "D02":2323,
                "D03":7878,
                "L00":1,
                "L01":1,
                "L02":0
            }
        }
    ]
}

データの送信は、AWS IoT Core のマネジメントコンソールにある「テストクライアント」や 利用されている MQTT クライアントから送って下さい。

04-mqtt-publish

Timestream 側でクエリを実行して、データが格納されていることを確認します。

SELECT * FROM "blogtest-database"."blogtest-table" ORDER BY time DESC LIMIT 10

05-select-timestream

サンプルで送った2件のデータが、2つのレコードとして格納されていることが分かりました。

ダミーデータを自動生成して継続的にデータ送信したい場合

動作確認の度にサンプルデータのタイムスタンプを書き換えるのは面倒なので、「MQTT X」のスクリプト機能を使ってダミーデータの自動生成と自動送信ができるようにしてみました。

下記はそのサンプルスクリプトです。

function random(min, max) {
  return Math.round(Math.random() * (max - min)) + min
}


const d = new Date();
const year  = d.getFullYear();
const month = (d.getMonth() + 1).toString().padStart(2,'0');
const day   = d.getDate().toString().padStart(2,'0');

const hour  = d.getHours().toString().padStart(2,'0');
const min   = d.getMinutes().toString().padStart(2,'0');
const sec   = d.getSeconds().toString().padStart(2,'0');
d.getMilliseconds().toString().padStart(3,'0');
d.getMilliseconds().toString().padStart(3,'0');
d.getMilliseconds().toString().padStart(3,'0');
d.getMilliseconds();
const ms   = d.getMilliseconds().toString().padStart(3,'0');

function handlePayload(value) {
    let _value = {};
    let _data000_value = {};

  _value.timestamp = year + '-' + month + '-' + day + 'T' + hour + ':' + min + ':' + sec + '.' + ms + '+09:00';
  _value.accid = '123456';
  _value.seq_num = 21;
  _value.device = 'LMG-300';
  _value.version = '1.00';
  _value.name = 'CM-SAMPLE001';
  _value.place = 'Tokyo-001';
  _value.command = 'report';
  _value.data_num = 1;

  _data000_value.st = '000001';
  _data000_value.D00 = random(0,10);
  _data000_value.D01 = random(0,10);
  _data000_value.D02 = random(0,10);
  _data000_value.D03 = random(0,10);
  
  _data000_value.L00 = random(0,2);
  _data000_value.L01 = random(0,2);
  _data000_value.L02 = random(0,2);
  _value.data000 = [{datetime: _value.timestamp, value: _data000_value}];
  return JSON.stringify(_value, null, 4);
}

execute(handlePayload);

実行すると次のような形式のデータが自動で生成されます。
現在時刻をtimestampdatetime に挿入して、DXX 及び LXX のデータをランダムに生成します。

{
    "timestamp": "2022-12-13T01:12:34.367+09:00",
    "accid": "123456",
    "seq_num": 21,
    "device": "LMG-300",
    "version": "1.00",
    "name": "CM-SAMPLE001",
    "place": "Tokyo-001",
    "command": "report",
    "data_num": 1,
    "data000": [
        {
            "datetime": "2022-12-13T01:12:34.367+09:00",
            "value": {
                "st": "000001",
                "D00": 8,
                "D01": 7,
                "D02": 7,
                "D03": 6,
                "L00": 1,
                "L01": 0,
                "L02": 1
            }
        }
    ]
}

MQTT X のスクリプト機能については、下記も参考にしていただければと思います。

最後に

今回は「LMG-300」というデバイスを試していますが、探せば他にも様々なプロトコルをサポートするゲートウェイデバイスが見つかります。
それぞれの製品で仕様が異なるので事前の確認が必要ですが、なるべく開発コストをかけずに小規模に PoC として試してみたいという用途であれば、既製品の利用がマッチするケースもあるかと思います。

さらに柔軟なカスタマイズを行いたい場合は、AWS IoT Greengrass を使って要件に合ったカスタムコンポーネントを開発するという選択肢もあります。
プロジェクトのフェーズや目的を踏まえて、適切な構成を考えるようにしましょう。

「製造業 IoT」を検討されている方にとって、この記事が少しでもお役に立てれば幸いです。