SORACOMの通信量をAWS LambdaでElasticsearchに取り込む

Elasticsearch Service

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

IoT向けデータ通信サービスSORACOMを使っていると、SIM単位や昼間・夜間、アップロード・ダウンロードなど様々な角度から通信量が気になります。 SORACOM 管理画面ではSIM単位でこれら情報が可視化されています。

これだけでは物足りないというユーザー向けに、各種データは管理画面からダウンロードできたり、SORACOM API で取得できるため、利用者の用途に合わせて分析できるようになっています。

今回は AWS Lambda を使ってSORACOM に通信量を問い合わせ、Elasticsearch に取り込む方法を紹介します。

SORACOM API について

SORACOM API では、ユーザー認証後にAPIキーとトークンを取得し、以後のAPIリクエストではこれらの値を用いて呼び出します。

API リファレンス https://dev.soracom.io/jp/docs/api

APIサーバとのユーザー認証

API auth を利用して SORACOM API と認証します。 アカウント登録したメールアドレスとパスワードを渡します。

curl では次のコマンドに該当します。

$ export SORACOM_EMAIL=foo@example.com
$ export SORACOM_PASSWORD=YOUR_PASSWORD
$ curl -X POST -s \
  --header "Accept: application/json" \
  --header "Content-type: application/json" \
  -d "{ \"email\": \"$SORACOM_EMAIL\", \"password\": \"$SORACOM_PASSWORD\" }" \
  https://api.soracom.io/v1/auth

{
  "apiKey": "dummy",
  "operatorId": "dummy",
  "token": "dummy"
}

通信量の取得

API /stats/air/subscribers を利用して SORACOM Air の通信量を取得します。

リクエストヘッダーとして

  • X-Soracom-API-Key に API キー
  • X-Soracom-Token に トークン

を渡します。

また、URL やパラメーターとして、データ取得対象や期間等に関する以下の情報を渡します。

  • imsi : 通信量を取得したい SIM の IMSI 番号
  • from : 開始日時(UNIX タイム)
  • to : 終了日時(UNIX タイム)
  • period : 集計単位(month, day, minutes)

curl では次のコマンドに該当します。

$ curl -X GET \
  --header "Accept: application/json" \
  --header "X-Soracom-API-Key: dummy-api-key" \
  --header "X-Soracom-Token: dummy-token" \
  "https://api.soracom.io/v1/stats/air/subscribers/{IMSI_NUMBER}?from=1449976310&to=1449978310&period=minutes"

[
  {
    "date": "2015-12-13T03:11:50.792",
    "unixtime": 1449976310,
    "dataTrafficStatsMap": {
      "s1.standard": {
        "uploadByteSizeTotal": 114,
        "downloadByteSizeTotal": 88,
        "uploadPacketSizeTotal": 1,
        "downloadPacketSizeTotal": 1
      }
    }
  },
  ...
]

これらの処理を Python で書いたのが以下のコードです。

1時間に1度定期実行して1時間前の通信量を取得するようになっており、たとえば 13:30 に実行されれば 12:00-13:00の間の通信量を取得します。

# API https://dev.soracom.io/jp/docs/api/#!/Stats/get_stats_air_subscriber

import datetime
import json
import requests

def get_truncated_time():
    # truncate minutes, seconds
    # e.g. 2015/05/12 11:22:33 => 2015/05/12 11:00:00
    return datetime.datetime.now().replace(minute=0, second=0, microsecond=0)

def soracom_stats(email, password, imsi):
    # get SORACOM traffic stats
    headers = {
      "Accept": "application/json",
      "Content-type": "application/json",
    }
    payload = {
      "email": email, 
      "password": password,
    }
    result = requests.post(
      "https://api.soracom.io/v1/auth",
      headers = headers,
      data = json.dumps(payload)
    )

    print "result of auth: [%d]%s" % (result.status_code, result.reason)
    auth = json.loads(result.text)

    headers = {
      "Accept": "application/json",
      "X-Soracom-Api-Key": auth["apiKey"],
      "X-Soracom-Token": auth["token"],
    }

    payload = {
      "from" : (get_truncated_time() - datetime.timedelta(hours = 1)).strftime("%s"),
      "to" : get_truncated_time().strftime("%s"),
      "period" : "minutes",
    }
    result = requests.get(
      "https://api.soracom.io/v1/stats/air/subscribers/%s"%imsi,
      headers = headers,
      params = payload,
    )
    print "result of stats/air/subscribers: [%d]%s" % (result.status_code, result.reason)
    stats = json.loads(result.text)
    return stats

def test():
    stats = soracom_stats(
      "foo@example.com",
      "dummy",
      "123456789012345"
    )
    from pprint import pprint
    pprint(stats)

if __name__ == '__main__':
    test()

Elasticsearch について

最終的に Kibana で可視化したいため、データストアとして Elasticsearch のマネージドサービスである AWS Elasticsearch Service を利用します。管理画面から Elasticsearch を起動し、ステータスが Active になるまで待ちます。

Activeになった後は、soracom という名前のインデックス配下に通信量用のマッピング(スキーマ) stats を定義します。

$ curl -XPUT 'http://DUMMY.ap-northeast-1.es.amazonaws.com/soracom' -d '
{
  "mappings": {
    "stats": {
      "properties": {
        "date": {
          "type": "date",
          "format": "dateOptionalTime"
        },
        "unixtime": {
          "type": "long"
        },
        "imsi": {
          "type": "string"
        },
        "plan": {
          "type": "string"
        },
        "downloadByteSizeTotal": {
          "type": "long"
        },
        "downloadPacketSizeTotal": {
          "type": "long"
        },
        "uploadByteSizeTotal": {
          "type": "long"
        },
        "uploadPacketSizeTotal": {
          "type": "long"
        }
      }
    }
  }
}
'

後はこのマッピングに対してレコードを投入してきます。

データ投入は curl では次のコマンドに該当します。

$ curl -XPOST 'http://DUMMY.ap-northeast-1.es.amazonaws.com/soracom/stats' -d '
{
    "imsi": "asdf-123456789",
    "date": "2015-12-13T03:11:50.792",
    "unixtime": 1449976310,
    "uploadByteSizeTotal": 114,
    "downloadByteSizeTotal": 88,
    "uploadPacketSizeTotal": 1,
    "downloadPacketSizeTotal": 1,
    "plan": "s1.standard"
}'

Elasticsearch 側で ID 採番するため、PUT ではなく POST を使っています。

データ投入用リクエストを Python で書いたのが以下のコードです。

# POST traffic stats to Elasticsearch
for record in stats:
    for plan, stats_map in record['dataTrafficStatsMap'].items():
        payload = {
            "imsi": IMSI,
            "date": record['date'],
            "unixtime": record['unixtime'],
            "plan": plan,
        }
        payload.update(stats_map)

        result = requests.post(
          "http://DUMMY.ap-northeast-1.es.amazonaws.com/soracom/stats",
          data = json.dumps(payload)
        )

AWS Lambda スケジュール化

1 プログラムにまとめる

  • SORACOM API からのデータ取得
  • Elasticsearch へのデータ保存

をまとめて1ファイル(lambda.py)の AWS Lambda 関数化したのが以下です。

AWS Lambda 関数としてパッケージ化する

AWS Lambda から呼び出せるように ZIP ファイル化します。

# workaround for Homebrew Python
$ cat setup.cfg
[install]
prefix=

$ pip install requests -t  `pwd`
...
$ ls -1
lambda.py
requests
requests-2.8.1.dist-info
setup.cfg
$ zip -r lambda.zip .

Lambda 関数の登録

AWS Lambda 管理画面から作成した Zip ファイルを登録します。 イベントソースは "Scheduled Event" とし、"Schedule expression""rate(1 hour)" にして1時間に1度実行させます。

Kibana 管理画面

Kibana 管理画面から soracom のインデックスを追加します。"Time-field name""date" を選択します。

soracom-kibana-settings しばらく放置すると、データが溜まります。

kibana-graph

セキュリティー面の強化

今回紹介したシステムではセキュリティー面で2点ほど気をつけるべき点があります。

Elasticsearch Serviceへのアクセス制限

データ連携の説明を優先したため、今回は Elasticsearch Service への通信で認証をしていません。 認証させる方法は次の記事を参考にしてください。

[小ネタ] botocoreのAWS APIリクエストの署名プロセスのみを利用する | Developers.IO

AWS Lambda関数内の認証情報管理

AWS Lambda 関数内では SORACOM のアカウント情報をベタ書きしています。 AWS KMS を使って、アカウント情報を暗号化する方法は次の記事を参考にしてください。

KMSで認証情報を暗号化しLambda実行時に復号化する | Developers.IO

最後に

  • SORACOM の通信量 API
  • AWS Lambda のイベントスケジューラー
  • Elasticsearch/Kibana を使った可視化

を使ってカジュアルに SORACOM 通信量の可視化を紹介しました。

コード自体は短いですが、 SORACOM API で取得する情報を変えたり、データストアを RDB に変えたりと、ユースケース向けにカスタマイズする雛形にはなったのではないかと思います。