AWS IoTのジョブ機能とモノの動的グループを利用してデバイスに配布するソフトウェアを管理する

AWS IoTのジョブ機能とモノの動的グループを利用してデバイスに配布するソフトウェアを管理する手法についてご紹介します
2019.02.07

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

サーバーレス開発部@大阪の岩田です。

AWS IoTのジョブ機能とモノの動的グループを利用してデバイスに配布するソフトウェアを管理する手法について調べてみました。

やること

ざっくり下記のような環境を作ります。

モノの動的グループを利用して、導入済みソフトウェアのバージョンがXX以下のデバイスに対してソフトウェアアップデートのジョブを作成。 AWS IoT側ではジョブの正常終了検知後にLambdaを起動して対象デバイスの管理情報を更新することで常にデバイスの管理情報を最新に保ち、次回のジョブ作成に備えます。

事前準備

事前にAWS IoTの設定をいくつか更新しておきます。

フリートインデックス作成の設定

モノの動的グループにはフリートインデックスが必要です。 今回はモノの属性にソフトウェアのバージョンを保持して管理する構成を取るので、「グループ名、説明、、、、」をオンにします。

イベントベースのメッセージ

ジョブの成功を検知してLambdaを起動するために、「ジョブの実行、成功、失敗、、、、」を有効にします。

テスト用のモノや証明書の準備

テストに使うモノや証明書、S3バケットなどを作成しておきます。詳細な手順は割愛します。

モノの属性についてですが、動的グループを作る際に利用するのでapp_versionという属性を追加して1.0を設定してください。

やってみる

ここから実際に試していきます。

最新版ソフトウェアのアップロード

まずデバイスに配布する最新版のソフトウェアを静的Webサイトホスティングを有効化したS3バケットにアップロードしておきます。今回は擬似的に下記のシェルスクリプトを最新版ソフトウェアとして扱います。

#!/bin/bash
echo "ver 2.0"

ジョブドキュメントの作成

次にジョブドキュメントを作成して適当なS3バケットにアップします。

{
    "app_url": "https://s3-ap-northeast-1.amazonaws.com/<ソフトウェアをアップしたS3バケット名>/app.sh",
    "app_version": "2.0"
}

app_urlには最新版のソフトウェアをDLするためのURLを、app_versionには対象ソフトウェアのバージョンを設定します。

モノの動的グループを作成

バージョン2.0以下のソフトウェアを実行しているデバイスを抽出するためにモノの動的グループを作成します。

aws iot create-dynamic-thing-group --thing-group-name appver_lt_20 --query-string 'attributes.app_version < 2.0'

クエリにattributes.app_version < 2.0を指定して対象のデバイスを抽出します。 作成が完了するとテスト用のモノがグループ内に表示されているのが確認できます。

モノの動的グループをターゲットにしたジョブの作成

作成したモノの動的グループをターゲットにしたジョブを作成します

aws iot create-job --job-id 1 --target-selection SNAPSHOT --document-source https://s3-ap-northeast-1.amazonaws.com/<ジョブドキュメントをアップしたS3バケット>/job.json --targets arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:thinggroup/appver_lt_20

デバイス側のプログラムを準備

デバイス側でジョブを処理しAWS IoTに完了通知を行うためのプログラムを用意します。 今回はAWS IoT Device SDK for PythonのサンプルプログラムjobsSample.pyを加工してデバイス側の処理をシミュレーションします。

executeJobを修正

ジョブドキュメントから取得したURLを使って最新版ソフトウェアをDLする処理を追加します。

    def executeJob(self, execution):
        print('Executing job ID, version, number: {}, {}, {}'.format(execution['jobId'], execution['versionNumber'], execution['executionNumber']))
        print('With jobDocument: ' + json.dumps(execution['jobDocument']))
        app_url = execution['jobDocument']['app_url']
        self.app_version = execution['jobDocument']['app_version']
        urllib.request.urlretrieve(app_url, 'app.sh')
startNextJobSuccessfullyInProgressを修正

デバイス側の処理が正常終了した後、適用済みのソフトウェアバージョンをAWS IoTに通知するように処理を追加します。

...略
            statusDetails = {
                'HandledBy': 'ClientToken: {}'.format(self.clientToken),
                'appVersion': str(self.app_version)
                }

この作りだと、デバイス側が申告してきたソフトウェアバージョンを鵜呑みにすることになるので、本来はLambdaがジョブIDをキーに対象のジョブドキュメントをパースして適用済みのソフトウェアバージョンを判断する方が良いでしょう。

jobsSample.pyの最終形です。

'''
/*
 * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
 '''

from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTThingJobsClient
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicType
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicReplyType
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionStatus

import threading
import logging
import time
import datetime
import argparse
import json
import urllib.request

class JobsMessageProcessor(object):
    def __init__(self, awsIoTMQTTThingJobsClient, clientToken):
        #keep track of this to correlate request/responses
        self.clientToken = clientToken
        self.awsIoTMQTTThingJobsClient = awsIoTMQTTThingJobsClient
        self.done = False
        self.jobsStarted = 0
        self.jobsSucceeded = 0
        self.jobsRejected = 0
        self.app_version = ''
        self._setupCallbacks(self.awsIoTMQTTThingJobsClient)

    def _setupCallbacks(self, awsIoTMQTTThingJobsClient):
        self.awsIoTMQTTThingJobsClient.createJobSubscription(self.newJobReceived, jobExecutionTopicType.JOB_NOTIFY_NEXT_TOPIC)
        self.awsIoTMQTTThingJobsClient.createJobSubscription(self.startNextJobSuccessfullyInProgress, jobExecutionTopicType.JOB_START_NEXT_TOPIC, jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE)
        self.awsIoTMQTTThingJobsClient.createJobSubscription(self.startNextRejected, jobExecutionTopicType.JOB_START_NEXT_TOPIC, jobExecutionTopicReplyType.JOB_REJECTED_REPLY_TYPE)

        # '+' indicates a wildcard for jobId in the following subscriptions
        self.awsIoTMQTTThingJobsClient.createJobSubscription(self.updateJobSuccessful, jobExecutionTopicType.JOB_UPDATE_TOPIC, jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE, '+')
        self.awsIoTMQTTThingJobsClient.createJobSubscription(self.updateJobRejected, jobExecutionTopicType.JOB_UPDATE_TOPIC, jobExecutionTopicReplyType.JOB_REJECTED_REPLY_TYPE, '+')

    #call back on successful job updates
    def startNextJobSuccessfullyInProgress(self, client, userdata, message):
        payload = json.loads(message.payload.decode('utf-8'))
        if 'execution' in payload:
            self.jobsStarted += 1
            execution = payload['execution']
            self.executeJob(execution)
            statusDetails = {
                'HandledBy': 'ClientToken: {}'.format(self.clientToken),
                'appVersion': str(self.app_version)
                }
            threading.Thread(
              target = self.awsIoTMQTTThingJobsClient.sendJobsUpdate,
              kwargs = {
                'jobId': execution['jobId'],
                'status': jobExecutionStatus.JOB_EXECUTION_SUCCEEDED,
                'statusDetails': statusDetails,
                'expectedVersion': execution['versionNumber'],
                'executionNumber': execution['executionNumber']}
              ).start()
        else:
            print('Start next saw no execution: ' + message.payload.decode('utf-8'))
            self.done = True

    def executeJob(self, execution):
        print('Executing job ID, version, number: {}, {}, {}'.format(execution['jobId'], execution['versionNumber'], execution['executionNumber']))
        print('With jobDocument: ' + json.dumps(execution['jobDocument']))
        app_url = execution['jobDocument']['app_url']
        self.app_version = execution['jobDocument']['app_version']
        urllib.request.urlretrieve(app_url, 'app.sh')

    def newJobReceived(self, client, userdata, message):
        payload = json.loads(message.payload.decode('utf-8'))
        if 'execution' in payload:
            self._attemptStartNextJob()
        else:
            print('Notify next saw no execution')
            self.done = True

    def processJobs(self):
        self.done = False
        self._attemptStartNextJob()

    def startNextRejected(self, client, userdata, message):
        printf('Start next rejected:' + message.payload.decode('utf-8'))
        self.jobsRejected += 1

    def updateJobSuccessful(self, client, userdata, message):
        self.jobsSucceeded += 1

    def updateJobRejected(self, client, userdata, message):
        self.jobsRejected += 1

    def _attemptStartNextJob(self):
        statusDetails = {'StartedBy': 'ClientToken: {} on {}'.format(self.clientToken, datetime.datetime.now().isoformat())}
        threading.Thread(target=self.awsIoTMQTTThingJobsClient.sendJobsStartNext, kwargs = {'statusDetails': statusDetails}).start()

    def isDone(self):
        return self.done

    def getStats(self):
        stats = {}
        stats['jobsStarted'] = self.jobsStarted
        stats['jobsSucceeded'] = self.jobsSucceeded
        stats['jobsRejected'] = self.jobsRejected
        return stats

# Read in command-line parameters
parser = argparse.ArgumentParser()
parser.add_argument("-n", "--thingName", action="store", dest="thingName", help="Your AWS IoT ThingName to process jobs for")
parser.add_argument("-e", "--endpoint", action="store", required=True, dest="host", help="Your AWS IoT custom endpoint")
parser.add_argument("-r", "--rootCA", action="store", required=True, dest="rootCAPath", help="Root CA file path")
parser.add_argument("-c", "--cert", action="store", dest="certificatePath", help="Certificate file path")
parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", help="Private key file path")
parser.add_argument("-p", "--port", action="store", dest="port", type=int, help="Port number override")
parser.add_argument("-w", "--websocket", action="store_true", dest="useWebsocket", default=False,
                    help="Use MQTT over WebSocket")
parser.add_argument("-id", "--clientId", action="store", dest="clientId", default="basicJobsSampleClient",
                    help="Targeted client id")

args = parser.parse_args()
host = args.host
rootCAPath = args.rootCAPath
certificatePath = args.certificatePath
privateKeyPath = args.privateKeyPath
port = args.port
useWebsocket = args.useWebsocket
clientId = args.clientId
thingName = args.thingName

if args.useWebsocket and args.certificatePath and args.privateKeyPath:
    parser.error("X.509 cert authentication and WebSocket are mutual exclusive. Please pick one.")
    exit(2)

if not args.useWebsocket and (not args.certificatePath or not args.privateKeyPath):
    parser.error("Missing credentials for authentication.")
    exit(2)

# Port defaults
if args.useWebsocket and not args.port:  # When no port override for WebSocket, default to 443
    port = 443
if not args.useWebsocket and not args.port:  # When no port override for non-WebSocket, default to 8883
    port = 8883

# Configure logging
logger = logging.getLogger("AWSIoTPythonSDK.core")
logger.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

# Init AWSIoTMQTTClient
myAWSIoTMQTTClient = None
if useWebsocket:
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True)
    myAWSIoTMQTTClient.configureEndpoint(host, port)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath)
else:
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId)
    myAWSIoTMQTTClient.configureEndpoint(host, port)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath)

# AWSIoTMQTTClient connection configuration
myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20)
myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10)  # 10 sec
myAWSIoTMQTTClient.configureMQTTOperationTimeout(10)  # 5 sec

jobsClient = AWSIoTMQTTThingJobsClient(clientId, thingName, QoS=1, awsIoTMQTTClient=myAWSIoTMQTTClient)

print('Connecting to MQTT server and setting up callbacks...')
jobsClient.connect()
jobsMsgProc = JobsMessageProcessor(jobsClient, clientId)
print('Starting to process jobs...')
jobsMsgProc.processJobs()
while not jobsMsgProc.isDone():
    time.sleep(2)

print('Done processing jobs')
print('Stats: ' + json.dumps(jobsMsgProc.getStats()))

jobsClient.disconnect()

Lambdaの作成

ジョブの実行完了をトリガーに起動するLambdaを準備します。 Lambda実行ロールにはAWS IoT周りの権限が付与されていることとします。

import boto3

iot_client = boto3.client('iot')

def handler(event, context):
  if event['status'] != 'SUCCEEDED':
    return

  thing_arn = event['thingArn']
  thing_name = thing_arn.split(':')[5].split('/')[1]

  iot_client.update_thing(
    thingName=thing_name,
    attributePayload={
        'attributes': {
            'app_version': event['statusDetails']['appVersion']
        }
    })

ルールの作成

ジョブの完了通知をトリガーにLambdaを起動するためにAWS IoTのルールを作成します。

ジョブの完了通知は予約済みのMQTTトピック$aws/events/jobExecution/jobID/succeededに流れてくるので、ルールクエリステートメントにはSELECT topic(4) as job_id, * FROM '$aws/events/jobExecution/+/succeeded'を指定し、アクションには先ほど作成したLambdaを指定します。

デバイス側でジョブを実行

準備が整ったので先ほど用意したプログラムを実行してみます。

python jobsSample.py -e <AWS IoTのエンドポイント> -r <ルートCA証明書> -k <対象デバイスの秘密鍵> -c <対象デバイスの証明書> -n <モノの名前> -id <クライアントID モノの名前と揃える>

実行完了後にapp.shがDLされているので実行してみます。

$ sh app.sh
ver2.0

ちゃんと最新版のソフトウェアが取得できています。

マネジメントコンソール上でもジョブが成功していることを確認してみましょう。

OKそうです。 対象となったモノの設定を確認するとapp_versionが2.0に更新されていることが分かります。

対象のモノがモノの動的グループから外れていることが分かります。

確認のため先ほどと同じ定義でジョブID:2のジョブを作成してみます。

aws iot create-job --job-id 2 --target-selection SNAPSHOT --document-source https://s3-ap-northeast-1.amazonaws.com/<ジョブドキュメントをアップしたS3バケット>/job.json --targets arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:thinggroup/appver_lt_20

マネジメントコンソールからジョブの進捗状況を確認します。

モノの動的グループに所属する=app_versionが2.0以下のデバイスが存在しないため、ALL:0となっています。

まとめ

AWS IoTのジョブ機能とモノの動的グループを利用して、デバイスに配布するソフトウェアを管理する手法についてご紹介しました。 今回はモノの動的グループを作成するためにモノの属性を利用しましたが、クエリの条件にはデバイスシャドウを利用することも可能です。

デバイスシャドウはモノの属性では扱えないJSON形式が扱えるため、たとえば

{
  "apps": [
    {
      "name": "app1",
      "version": 1.0
    },
    {
      "name": "app2",
      "version": 1.1
    },
    {
      "name": "app3",
      "version": 2.1
    }
  ]
}

といった情報を保持させて複数アプリのバージョンを管理するような使い方も出来そうです。このあたりのユースケースについて今後もさらに深掘りしていきたいと思います。

誰かの参考になれば幸いです。