AWS IoT SiteWise に取り込むデータをゲートウェイでフィルタリングして利用料金を最適化する

2022.06.03

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

SiteWise に工場や倉庫の設備機器データを SiteWise ゲートウェイで送るとき、ゲートウェイの設定でデータのフィルタリングが可能です。
簡単な設定でできるので試してみました。

前提

本記事では、SiteWise ゲートウェイの構築方法や SiteWise の設定については記載しません。
手順の詳細については下記で紹介していますので、こちらを参考にしてください。パラメータなどが異なるだけで手順は同じです。

OPC UAサーバの作成

今回も Python でダミーの OPC UAサーバを作成します。
コードは下記です。(custom-server.pyとしてゲートウェイ上に保存します。)

custom-server.py

import logging
import asyncio
import sys
import time
import random
sys.path.insert(0, "..")

from asyncua import ua, Server
from asyncua.common.methods import uamethod

@uamethod
def func(parent, value):
    return value * 2

async def main():
    _logger = logging.getLogger('asyncua')
    # setup our server
    server = Server()
    await server.init()
    #server.set_endpoint('opc.tcp://localhost:4840')
    server.set_endpoint('opc.tcp://192.168.0.11:4840')

    # setup our own namespace, not really necessary but should as spec
    uri = 'http://examples.freeopcua.github.io'
    idx = await server.register_namespace(uri)

    # populating our address space
    # server.nodes, contains links to very common nodes like objects and root
    factory = await server.nodes.objects.add_object(idx, 'Factory')
    mycount11 = await factory.add_variable(idx, '1/turbin/1/Count', 0.0) # 数値は初期値
    mycount12 = await factory.add_variable(idx, '1/turbin/2/Count', 0.0) # 数値は初期値
    myrpm11 = await factory.add_variable(idx, '1/turbin/1/RPM', 0.0)
    myrpm12 = await factory.add_variable(idx, '1/turbin/2/RPM', 0.0)
    mytemperature11 = await factory.add_variable(idx, '1/turbin/1/Temperature', 0.0)
    mytemperature12 = await factory.add_variable(idx, '1/turbin/2/Temperature', 0.0)

    # Set MyVariable to be writable by clients
    await mycount11.set_writable()
    await mycount12.set_writable()
    await myrpm11.set_writable()
    await myrpm12.set_writable()
    await mytemperature11.set_writable()
    await mytemperature12.set_writable()
    await server.nodes.objects.add_method(ua.NodeId('ServerMethod', 2), ua.QualifiedName('ServerMethod', 2), func, [ua.VariantType.Int64], [ua.VariantType.Int64])
    _logger.info('Starting server!')
    async with server:
        while True:
            await asyncio.sleep(5)

           # データを送信(生成)した回数を0.1刻みでカウントして送る
            count11 = await mycount11.get_value() + random.uniform(1,5)
            count12 = await mycount12.get_value() + random.randint(1,10)
            print('count11: ' + str(count11))
            _logger.info('Set value of %s(COUNT11) to %.1f', mycount11, count11)
            print('count12: ' + str(count12))
            _logger.info('Set value of %s(COUNT12) to %.1f', mycount12, count12)
            await mycount11.write_value(count11)
            await mycount12.write_value(count12)

            # データが所定の値を超過したら0にする。デモとしてのグラフの見栄えを意識しただけの処理です。
            await asyncio.sleep(5)
            count11_init = await mycount11.get_value()
            count12_init = await mycount12.get_value()
            
            if count11_init > 10:
                await mycount11.set_value(0.0)
            if count12_init > 20:
                await mycount12.set_value(0.0)
            

            # タービンの回転数を取得する
            rpm11 = float(random.randint(600000000,1500000000))
            rpm12 = float(random.randint(600000000,1500000000))

            _logger.info('Set value of %s(RPM11) to %.1f', myrpm11, rpm11)
            print('rpm11: ' + str(rpm11))
            _logger.info('Set value of %s(RPM12) to %.1f', myrpm12, rpm12)
            print('rpm12: ' + str(rpm12))
            await myrpm11.write_value(rpm11)
            await myrpm12.write_value(rpm12)

            # タービンの温度を取得する
            temperature11 = random.uniform(30, 85)
            temperature12 = random.uniform(30, 85)
            _logger.info('Set value of %s(TEMPERATURE11) to %.1f', mytemperature11, temperature11)
            print('temperature11: ' + str(temperature11))
            _logger.info('Set value of %s(TEMPERATURE12) to %.1f', mytemperature12, temperature12)
            print('temperature12: ' + str(temperature12))
            await mytemperature11.write_value(temperature11)
            await mytemperature12.write_value(temperature12)

if __name__ == '__main__':

    logging.basicConfig(level=logging.DEBUG)

    asyncio.run(main(), debug=True)

SiteWise 側の設定

今回は、SiteWise Monitor 側のポータルやダッシュボードも作成済みとします。 ダッシュボードの作成方法は、下記で紹介していますので参考にしてください。

フィルタリングを試す

それでは、OPC UAサーバを実行します。

$ python3 custom-server.py

問題なく SiteWise 側にデータが取り込めていれば下記のようにデータが見れるようになります。
タービン1とタービン2のデータが見えています。

01-before-data

次に、取り込むデータを「タービン2」だけに制限してみます。
SiteWise のコンソール画面で「ゲートウェイ」を開いて「データソース」の項目を編集します。

02-edit-data-source

デフォルトではノード ID が「/」となっており、OPC UA サーバ上の全データを取得する設定になっています。

03-default-node

今回は「工場1のタービン2」だけに制限したいので、対象のノード ID をパス形式で次のように指定します。
タービン2のデータ全てを取得したいので、ワイルドカードも使っています。

/Factory/1/turbin/2/*

入力できたら「保存」をクリックして終了します。

04-edit-node

保存した直後はゲートウェイ側に設定が反映されていないので、「非同期」と表示されます。

05-not-sync

少し経てば「同期中」になります。

06-sync

再度、ダッシュボードをみると下記のように「タービン2」のデータ(オレンジのグラフ)だけになっていることが分かります。

07-only-turbin2

フィルタリングされる場所

ノードをフィルタリングする際、その設定は SiteWise の「データソース」の画面で行いました。つまり、これは「フィルタリングはゲートウェイ上で行われる」ということになります。
実際、OPC UA サーバ側でノードを増やしてもフィルタリングにより収集の対象外となっているものは、「データストリーム」には現れませんでした。

したがって、SiteWise へのデータ取り込み自体もフィルタリングされることになるので、うまく利用することで SiteWise の「データ取り込み」料金を削減することができると考えられます。

最後に

ノードのパスを指定することで、リアルタイムに可視化したいデータだけ取り込むようにフィルタリングできることが確認できました。
また、同様の操作で他のパスのノードを追加することもできます。

リアルタイムに可視化する必要の無いデータも SiteWise に取り込んでしまうと無駄なコストが発生します。 うまくフィルタリングしてコストを最適化できるようにしましょう。

以上です。