[データ分析] サーバーサイドからのデータインポートを試してみる #ThinkingData

2023.09.25

こんにちは、ゲームソリューション部の新屋です。 前回はクライアントサイドからのデータインポートをやってみたので、今回はサーバーサイドのデータインポートをやってみます。

前回記事はこちら


データインポートにおける、クライアントサイドとサーバーサイドの違いは以下の通りです。

クライアントSDK: サーバーと通信せず、デバイス情報やユーザー行動に関する情報を比較的簡単に収集します
サーバーSDK: サーバーと通信し、より正確なユーザー行動に関する情報を収集します

データ送信の準備

クライアントサイドもサーバーサイドもSDKを使う点は同じなのですが、サーバーサイドはLogBusの併用を推奨しています。

迅速かつ正確なデータを転送するために、SDK+LogBusの併用でサーバデータのデータレポートを推奨しております

サーバーSDK-Golang

LogBus

Logbus2はバックエンドのログデータをTE Systemにリアルタイムでインポートするために使用されます。その主な仕組みは、FlumeやLoggieと似ており、サーバーのログディレクトリ内のファイルストリームを監視し、任意のログファイルに新しいデータが生成されると、新しいデータを検証してTE Systemにリアルタイムで送信します。

以下の場合がLogBus2を使用してデータに送信することを推奨します。

1.サーバーSDK/Kafka/SLSを使用してデータをTE形式で保存し、LogBus2を介してデータをアップロードする場合
2.データの精度とディメンションに高い要件があり、クライアントSDKだけでは要件を満たせず、またクライアントSDKへのアクセスが不便な場合
3.バックエンドのデータプッシュフローを自社で開発しにくい場合
4.大量の過去のデータを転送する必要がある場合
5.メモリ使用量と転送効率に一定の要件がある場合

Logbus2 利用ガイド


概要をざっと把握したところで試していきましょう。

今回のLogbusの構成はサイドカーパターンにしていますが、Kafkaと組み合わせたProducer+Broker+Consumer構成にもできるようです。

1. サンプルゲーム(ローカルAPIのみ)

  • バックエンド
    • Go
    • OpenAPI
    • docker-compose
  • 10連ガチャ
    • 排出アイテムをユーザーIDと紐づけてトラッキング→データインポートする

OASはこのような感じになります。

openapi.yaml

/gacha/ten:
  put:
    tags:
      - gacha
    operationId: GachaTen
    parameters:
      - name: "X-Guest-Id"
        in: header
        required: true
        schema:
          type: string
    requestBody:
      content:
        application/json:
      type: object
      properties:
        userId:
          type: integer
          format: uint64
      responses:
        200:
          content:
            application/json:
          type: object
          properties:
            userId:
              type: integer
              format: int64
            result:
              type: array
            items:
              type: string

定義の通りに実装すると、APIの実行結果は以下のようになります。

curl -X PUT 'http://localhost:8080/gacha/ten' \
    -H 'Content-Type: application/json' \
    -H 'X-Guest-Id: 18a44564556ab0-0c9fff123b6956-19525634-2073600-18a445645571037' \
    -d '{"userId": 1}'

{"result":["Torch","Torch","Herb","Potion","Herb","Herb","Potion","Phoenix Feather","Potion","Torch"],"userId":1}

X-Guest-IdはクライアントサイドのSDKで発行したゲストIDです。詳細は前回の記事をご確認ください。

userIdはログイン後サーバーサイドで付与するユーザー識別子です。
ユーザーIDをそのままAPI経由で渡すことは本番環境では無いでしょうが、便宜上そのまま渡すようにしました。

2. LogBus+SDK実装

LogBus実装

今回は、LogBusはローカル上のコンテナで稼働させます。

まず、以下のようにディレクトリを作成します。

.
└── logbus
    ├── conf
    ├── log
    └── runtime

次に、docker-compose.yamlに以下のように記述します。

./compose.yaml

logbus:
    image: thinkingdata/ta-logbus-v2:latest
    volumes:
      - ./logbus/conf/:/ta/logbus/conf/
      - ./logbus/log/:/ta/logbus/log/
      - ./logbus/runtime/:/ta/logbus/runtime/
      - ./tracking-data-volume:/tracking-data # このディレクトリにトラッキングデータが入る

volumes:
  tracking-data-volume:

最後に、logbus/conf配下にdaemon.jsonを作成し、以下のよう設定を記述します。

./logbus/conf/daemon.json

{
  "datasource": [
    {
      "type": "file",
      "app_id": "...",
      "file_patterns": ["/tracking-data/*"]
    }
  ],
  "push_url": "..."
}

app_idとpush_urlはプロジェクト設定から確認して設定します。

これで設定完了です。

LogBusコンテナが起動できるか確認をします。

docker-compose up logbus

[+] Running 1/0
 ✔ Container backend-logbus-1  Created                                                                                                                            0.0s 
Attaching to backend-logbus-1
backend-logbus-1  | 3:35PM INF Current Version: 2.1.0.0

問題なさそうです。

SDK初期化・実装

SDK初期化とラッパー関数を実装していきます。

このとき、LogBusのディレクトリ名を指定して初期化をします。

package tracking

import (
	"context"
	"fmt"
	"os"

	"github.com/ThinkingDataAnalytics/go-sdk/thinkingdata"
)

var TA thinkingdata.TDAnalytics

func init() {
	consumer, err := thinkingdata.NewLogConsumerWithConfig(thinkingdata.LogConfig{
		Directory:      "/tracking-data/",
		FileNamePrefix: "ta-tracking",
	})
	if err != nil {
		_, _ = fmt.Fprintf(os.Stderr, "Error TA initialize\n: %s", err)
		os.Exit(1)
	}
	TA = thinkingdata.New(consumer)
}

func Track(ctx context.Context, properties map[string]any) error {
  // middlewareなどでリクエスト受付時にcontextにUserId, GuestIdが入っている前提
	accountId := ctx.Value("SystemUserId").(string)
	eventName := ctx.Value("OperationId").(string)
	guestId := ctx.Value("x_guest_id").(string)

	err := TA.Track(accountId, guestId, eventName, properties)
	if err != nil {
		return err
	}

	return nil
}

これでtracking.Track()を呼び出せばトラッキングを実行できます。

トラッキング実行

func (c *GachaController) GachaTen(
	ctx context.Context,
	req gen.GachaTenRequestObject,
) (gen.GachaTenResponseObject, error) {
	gacha, err := c.GachaUseCase.GachaTen(ctx, *req.Body)
	if err != nil {
		return nil, err
	}

  // ガチャロジックの中でトラッキング
	err = tracking.Track(ctx, nil, nil, map[string]any{
		"got_items": gacha.Result,
	})
	if err != nil {
		return nil, err
	}

	return gen.GachaTen200JSONResponse{
		Result: gacha.Result,
		UserId: gacha.UserId,
	}, nil
}

以上で実装完了です。

3. 確認

では、APIを呼び出して動作確認およびデータインポートできているか確認します。

curl -X PUT 'http://localhost:8080/gacha/ten' \
    -H 'Content-Type: application/json' \
    -H 'X-Guest-Id: 18a44564556ab0-0c9fff123b6956-19525634-2073600-18a445645571037' \
    -d '{"userId": 1}'

{"result":["Potion","Herb","Herb","Sword","Herb","Herb","Crystal Orb","Crystal Orb","Torch","Sword"],"userId":1}

余談ですが、排出確率1%で設定したCrystal Orbが2つ出ていますね。引きが良かったようです。ちなみにSwordの排出確率は10%の設定です。

logbusコンテナ内にデータが送られているか確認します。

# logbusコンテナ
cd /tracking-data/

# ls
TA-tracking.log.2023-08-15  TA-tracking.log.2023-08-30  TA-tracking.log.2023-09-01  TA-tracking.log.2023-09-16
TA-tracking.log.2023-08-16  TA-tracking.log.2023-08-31  TA-tracking.log.2023-09-06  TA-tracking.log.2023-09-17

# tail -n 1 TA-tracking.log.2023-09-17
{"#account_id":"system_user_id_1","#distinct_id":"x_guest_id_18a44564556ab0-0c9fff123b6956-19525634-2073600-18a445645571037","#type":"track","#time":"2023-09-17 07:46:40.702","#event_name":"GachaTen","#uuid":"56bb3126-552e-11ee-937b-0242ac180004","properties":{"#lib":"Golang","#lib_version":"1.7.0","got_items":["Potion","Herb","Herb","Sword","Herb","Herb","Crystal Orb","Crystal Orb","Torch","Sword"]}}

次にThinkingEngineにデータが送られているか確認します。

ユーザーID(#account_id)と獲得アイテムが送信されていることを確認できました。