
Pub/SubのAI Inference SMTでメッセージにVertex AI推論を組み込んでみる
はじめに
データアナリティクス事業本部のkobayashiです。
2026年4月6日にPub/SubのAI Inference Single Message Transform(SMT) がGA(一般提供)になりました。Pub/Subのメッセージフローの中でVertex AIモデルによる推論をインラインで実行し、結果をメッセージに付加できる機能です。
従来、Pub/Subのメッセージに対してAI推論を行うにはCloud FunctionsやCloud Runなどの仲介サービスが必要でしたが、AI Inference SMTを使うことでPub/Sub単体でメッセージのAIエンリッチメントが可能になります。
- AI Inference SMT | Pub/Sub | Google Cloud
- Single Message Transforms (SMTs) overview | Pub/Sub | Google Cloud
Single Message Transforms(SMT)とは
まずAI Inference SMTの前に、SMTの概要を説明します。
SMT(Single Message Transforms)はPub/Subのメッセージに対して軽量な変換をサービス内で直接行う機能です。メッセージのデータや属性を変換し、別途変換用のサービスを用意する必要なくPub/Sub内でETL的な処理が行えます。
SMTの種類
| SMTタイプ | 説明 |
|---|---|
| AI Inference SMT | Vertex AIモデルでメッセージに対する推論を実行 |
| User-Defined Function (UDF) SMT | JavaScriptの関数でカスタム変換を実行 |
SMTの適用レベル
SMTはトピックとサブスクリプションの両方に設定可能です。
- トピックレベル: メッセージがパブリッシュされた時点で変換(スキーマバリデーション前)
- サブスクリプションレベル: フィルタリング後、サブスクライバーへの配信前に変換
1つのトピックまたはサブスクリプションに対して最大5つのSMTを設定でき、定義した順序で実行されます。
メッセージフロー
AI Inference SMTとは
AI Inference SMTはPub/Subメッセージに対してVertex AIモデルの推論をインラインで実行するSMTです。推論結果はメッセージに付加され、後続の処理で利用できます。
主な特徴
| 項目 | 内容 |
|---|---|
| 対応モデル | Vertex AIの自前デプロイモデル、Model Garden(Gemini、Claude、Llama等50以上) |
| 推論タイムアウト | 60秒 |
| バッチ処理 | 非対応(1メッセージにつき1推論リクエスト) |
| プライベートエンドポイント | 非対応(パブリックエンドポイントのみ) |
| 上限 | 1トピックまたはサブスクリプションにつきAI Inference SMTは1つ |
対応モデル
Google モデル:
- Gemini(2.0-flash、2.5-flash、2.5-pro、3.xなど)
- Embedding モデル
サードパーティモデル(Model Garden経由):
- Anthropic Claude
- Meta Llama
- Mistral
- DeepSeek
- Qwen
エンドポイントの指定形式は以下のとおりです。
# 自前デプロイモデル
projects/{プロジェクトID}/locations/{ロケーション}/endpoints/{エンドポイント}
# Model Garden(MaaS)
projects/{プロジェクトID}/locations/{ロケーション}/publishers/{パブリッシャー}/models/{モデル名}
メッセージの変換イメージ
変換前(パブリッシュされたメッセージ):
{
"model": "google/gemini-2.5-flash",
"messages": [
{
"role": "user",
"content": "この商品レビューの感情を分析してください: とても使いやすくて満足しています"
}
]
}
変換後(サブスクライバーが受け取るメッセージ):
{
"original_message": {
"model": "google/gemini-2.5-flash",
"messages": [
{
"role": "user",
"content": "この商品レビューの感情を分析してください: とても使いやすくて満足しています"
}
]
},
"model_output": {
"choices": [
{
"message": {
"content": "この商品レビューの感情はポジティブです。..."
}
}
]
}
}
元のメッセージが original_message に保持され、推論結果が model_output に追加されます。
AI Inference SMTの設定方法
IAMの設定
AI Inference SMT が Vertex AI を呼び出す際のサービスアカウントは、デフォルトで Pub/Sub サービスエージェント(service-{プロジェクト番号}@gcp-sa-pubsub.iam.gserviceaccount.com)が使用されます。本記事ではシンプルに、この Pub/Sub サービスエージェントに直接 Vertex AI を呼び出す権限を付与する構成で進めます。
1. Pub/Sub サービスエージェントの作成(未作成の場合のみ)
Pub/Sub API を有効化すると通常は自動で作成されますが、まだ存在しない場合は以下のコマンドで明示的に作成できます。
$ gcloud beta services identity create \
--service=pubsub.googleapis.com \
--project={プロジェクトID}
2. Pub/Sub サービスエージェントへ Vertex AI の権限を付与
Pub/Sub サービスエージェントが Vertex AI モデルを呼び出せるよう、Vertex AI User ロール(roles/aiplatform.user)を付与します。
$ gcloud projects add-iam-policy-binding kobayashi-masahiro \
--member="serviceAccount:service-233151396088@gcp-sa-pubsub.iam.gserviceaccount.com" \
--role="roles/aiplatform.user" \
--condition=None
必要な権限まとめ:
| 対象 | ロール | 用途 |
|---|---|---|
| Pub/Sub サービスエージェント | roles/aiplatform.user |
Vertex AI モデルへの推論呼び出し |
SMT定義ファイルの作成
AI Inference SMTの設定をYAMLファイルで定義します。
- aiInference:
endpoint: projects/{プロジェクトID}/locations/global/publishers/google/models/gemini-2.5-flash
unstructuredInference:
parameters:
"max_tokens": 25000
エンドポイントのリージョンについて
エンドポイントの locations にリージョン(例: us-central1)を指定すると、SMT実行時のリージョンと一致しない場合に以下のようなエラーが発生します。
FAILED_PRECONDITION: The request contains an AI Inference SMT with an endpoint in a different region than where the request is being processed.
Pub/Sub側の処理リージョンとエンドポイントのリージョンが一致する必要があるため、特別な要件がなければ global を指定するのが確実です。
SMTのバリデーションとテスト
サブスクリプションに適用する前に、SMTの定義をバリデーション・テストできます。
# バリデーション
$ gcloud pubsub message-transforms validate \
--message-transform-file=ai-smt.yaml
Message transform is valid.
# テスト(サンプルメッセージで実行)
$ gcloud pubsub message-transforms test \
--message-transforms-file=ai-smt.yaml \
--message='{"model":"google/gemini-2.5-flash","messages":[{"role":"user","content":"Hello"}]}' \
--format=json
テストコマンドは以下のように推論結果を含む変換後のメッセージを返します。
[
{
"data": "{\"model_output\":{\"choices\":[{\"finish_reason\":\"stop\",\"index\":0,\"logprobs\":null,\"message\":{\"content\":\"Hello! How can I help you today?\",\"role\":\"assistant\"}}],\"created\":1776639286,\"id\":\"Nl3lafjzApaR3NMP0tHQ-Ag\",\"model\":\"google/gemini-2.5-flash\",\"object\":\"chat.completion\",\"system_fingerprint\":\"\",\"usage\":{\"completion_tokens\":9,\"completion_tokens_details\":{\"reasoning_tokens\":317},\"extra_properties\":{\"google\":{\"traffic_type\":\"ON_DEMAND\"}},\"prompt_tokens\":1,\"total_tokens\":327}},\"original_message\":{\"messages\":[{\"content\":\"Hello\",\"role\":\"user\"}],\"model\":\"google/gemini-2.5-flash\"}}"
}
]
テスト時にもVertex AIモデルが呼び出されるため、Vertex AIの利用料金が発生する点に注意してください。
IAM権限が不足している場合
Pub/SubサービスエージェントにVertex AIの権限が付与されていない場合、validateやtestの時点で以下のエラーが返ります。前述の「IAMの設定」が完了しているか確認してください。
FAILED_PRECONDITION: Cloud Pub/Sub does not have the necessary permissions to perform this operation: Permission 'aiplatform.endpoints.get' denied on resource '//aiplatform.googleapis.com/projects/{プロジェクト番号}/locations/{ロケーション}'
トピックとサブスクリプションの作成
# トピックの作成
$ gcloud pubsub topics create my-ai-topic
Created topic [projects/{プロジェクトID}/topics/my-ai-topic].
# SMTを設定したサブスクリプションの作成
$ gcloud pubsub subscriptions create my-ai-sub \
--topic=projects/{プロジェクトID}/topics/my-ai-topic \
--ack-deadline=600 \
--message-transforms-file=ai-smt.yaml
Created subscription [projects/{プロジェクトID}/subscriptions/my-ai-sub].
--ack-deadline=600 はサブスクライバーがメッセージを受信してから ack を返すまでの猶予時間(秒)です。AI推論の所要時間に加えてサブスクライバー側の処理時間も加味する必要があるため、推論タイムアウト60秒に十分な余裕を持たせて Pub/Sub の最大値である10分(600秒)に設定しています。
メッセージのパブリッシュとpull
Chat Completions API形式でメッセージをパブリッシュします。
$ gcloud pubsub topics publish my-ai-topic --message=$'{
"model": "google/gemini-2.5-flash",
"messages": [
{
"role": "user",
"content": "pub/subってどんなサービス?"
}
]
}'
messageIds:
- '18462074268845036'
サブスクライバーが受け取るメッセージには、元のメッセージに加えてGeminiの推論結果が含まれます。サブスクリプションからメッセージをpullすると以下のようなデータが取得できます(data フィールドはBase64エンコードされているためデコードした結果を示します)。
$ gcloud pubsub subscriptions pull my-ai-sub --auto-ack --limit=1 --format=json
{
"model_output": {
"choices": [
{
"finish_reason": "stop",
"index": 0,
"logprobs": null,
"message": {
"content": "Pub/Sub(パブサブ)は、「**Publisher/Subscriber(発行者/購読者)**」の略で、非同期メッセージングシステムの一種です。\n\n簡単に言うと、情報の「送り手」と「受け手」が直接やり取りすることなく、(以降省略)",
"role": "assistant"
}
}
],
"created": 1776551224,
"id": "OAXkaaycDP-a3NMPh7e46A4",
"model": "google/gemini-2.5-flash",
"object": "chat.completion",
"system_fingerprint": "",
"usage": {
"completion_tokens": 1070,
"completion_tokens_details": {
"reasoning_tokens": 1051
},
"extra_properties": {
"google": {
"traffic_type": "ON_DEMAND"
}
},
"prompt_tokens": 7,
"total_tokens": 2128
}
},
"original_message": {
"messages": [
{
"content": "pub/subってどんなサービス?",
"role": "user"
}
],
"model": "google/gemini-2.5-flash"
}
}
original_message に元のパブリッシュ内容、model_output にGemini 2.5 Flashによる推論結果が格納されていることが確認できます。
エラーハンドリング
タイムアウト
推論が60秒を超えるとデリバリーがタイムアウトし、メッセージの保持期間内でリトライされます。
デッドレターキュー
失敗したメッセージはデッドレタートピック(設定されている場合)に転送されます。AI推論の失敗に備えてデッドレタートピックの設定をお勧めします。
トピックSMTの場合
トピックレベルのSMTで推論が失敗した場合、パブリッシュリクエスト自体がエラーとなり、パブリッシャーにエラー詳細が返されます。
ユースケース
AI Inference SMTは以下のようなリアルタイム処理に活用できます。
- 感情分析: 商品レビューやカスタマーフィードバックのリアルタイム感情分析
- テキスト分類: サポートチケットの自動分類・ルーティング
- コンテンツモデレーション: ユーザー投稿のリアルタイム審査
- エンベディング生成: テキストデータのベクトル化(検索・推薦システム用)
- データエンリッチメント: イベントデータへのコンテキスト情報の付加
従来これらの処理にはCloud FunctionsやCloud Runなどの仲介サービスが必要でしたが、AI Inference SMTによりPub/Subの設定だけで実現できるようになりました。
まとめ
Pub/SubのAI Inference SMTを紹介しました。これまでCloud FunctionsやCloud Runを挟まないと実現できなかったメッセージに対するAI推論が、Pub/SubとVertex AIの設定だけで完結するようになりました。GeminiやClaude、LlamaなどModel Gardenの50以上のモデルに対応しているので、感情分析やコンテンツ分類、エンベディング生成といったリアルタイムAIパイプラインを手軽に組み立てたいときに活用していくと良いと思います。
最後まで読んで頂いてありがとうございました。










