![[アップデート] Amazon Bedrock AgentCore Memory の長期記憶イベントを Kinesis Data Streams に配信してみた](https://images.ctfassets.net/ct0aopd36mqt/7M0d5bjsd0K4Et30cVFvB6/5b2095750cc8bf73f04f63ed0d4b3546/AgentCore2.png?w=3840&fm=webp)
[アップデート] Amazon Bedrock AgentCore Memory の長期記憶イベントを Kinesis Data Streams に配信してみた
はじめに
こんにちは、スーパーマーケットが大好きなコンサルティング部の神野(じんの)です。
最近は業務スーパーばかり行っています。
最近、Amazon Bedrock AgentCore Memory の API がアップデートされ、streamDeliveryResources というパラメータが新たに追加されました。
AgentCore Memory で生成される長期記憶の変更イベントを Amazon Kinesis Data Streams にリアルタイム配信できるようになりました。
今回はこの機能を実際に試してみたいと思います!
今回のアップデート内容
AgentCore Memory では、保存された短期記憶から設定した長期記憶戦略に基づいて自動的に長期記憶を抽出して保存します。
これまでは長期記憶レコードの変更を検知するには ListMemoryRecords API をポーリングして自前で差分管理するぐらいしかありませんでした。
公式ドキュメントにも下記のように記載があります。
Instead of polling APIs to detect changes, you receive push-based events to a Kinesis Data Stream in your account, enabling event-driven architectures that react to memory record lifecycle changes as they occur.
それが今回のアップデートで、streamDeliveryResources というパラメータが追加され、長期記憶レコードのライフサイクルイベントを Kinesis Data Streams にリアルタイムで配信できるようになりました。
構成としてはシンプルで、AgentCore Memory → Kinesis Data Streams という流れです。
どういった情報が流れ込むか確認するのに良いですね。
まず、ストリーミング有効化時には検証用の StreamingEnabled イベントが配信されます。
| イベント | 説明 |
|---|---|
| StreamingEnabled | ストリーミング有効化時の検証イベント |
加えて、長期記憶レコードのライフサイクルイベントとして下記3種類が配信されます。
| イベント | 説明 |
|---|---|
| MemoryRecordCreated | 長期記憶レコードが新規作成された時 |
| MemoryRecordUpdated | 長期記憶レコードが更新された時 |
| MemoryRecordDeleted | 長期記憶レコードが削除された時 |
作成・削除・更新と、長期記憶レコードに対する変更が網羅されていますね。
また、配信コンテンツのレベルは2種類から選択できます。
| レベル | 内容 |
|---|---|
| METADATA_ONLY | memoryId、memoryRecordId、タイムスタンプ等のメタデータのみ |
| FULL_CONTENT | メタデータに加えて長期記憶レコードの本文も含む |
抽出された長期記憶の内容を見たい時はFULL_CONTENTを選択する感じですね。
それでは実際にやってみましょう!
前提
今回の検証環境は下記のとおりです。
| 項目 | バージョン / 値 |
|---|---|
| OS | macOS 15.7 |
| Python | 3.12 |
| boto3 | 1.42.63 |
| bedrock-agentcore | 1.4.3 |
| AWS リージョン | us-east-1 |
今回は uv でプロジェクトを管理しています。pyproject.toml は下記のとおりです。
[project]
name = "agentcore-memory-stream"
version = "0.1.0"
description = "AgentCore Memory streamDeliveryResources demo"
requires-python = ">=3.12"
dependencies = [
"boto3>=1.42.63",
"bedrock-agentcore>=1.4.3",
]
ライブラリをインストールします。
uv sync
早速やってみましょう。今回はPythonのスクリプトで全部やってみます。
やってみる
Kinesis Data Stream の作成
まずは配信先となる Kinesis Data Stream を作成します。
import boto3
REGION = "us-east-1"
STREAM_NAME = "agentcore-memory-stream"
kinesis = boto3.client("kinesis", region_name=REGION)
print(f"Kinesis Data Stream '{STREAM_NAME}' を作成中...")
try:
kinesis.create_stream(StreamName=STREAM_NAME, ShardCount=1)
print("作成リクエスト送信完了。ACTIVE になるまで待機します...")
except kinesis.exceptions.ResourceInUseException:
print("既に存在します。スキップします。")
waiter = kinesis.get_waiter("stream_exists")
waiter.wait(StreamName=STREAM_NAME)
desc = kinesis.describe_stream(StreamName=STREAM_NAME)
stream_arn = desc["StreamDescription"]["StreamARN"]
print(f"Stream ARN: {stream_arn}")
シャード数は検証用なので1つで十分です。
$ uv run setup_kinesis.py
Kinesis Data Stream 'agentcore-memory-stream' を作成中...
作成リクエスト送信完了。ACTIVE になるまで待機します...
Stream ARN: arn:aws:kinesis:us-east-1:123456789012:stream/agentcore-memory-stream
IAM ロールの作成
AgentCore Memory が Kinesis にレコードを書き込むための IAM ロールを作成します。
<前のステップで取得した Stream ARN> には、先ほど取得した値を入れてください。
import json
import boto3
REGION = "us-east-1"
ROLE_NAME = "agentcore-memory-stream-role"
STREAM_ARN = "<前のステップで取得した Stream ARN>"
iam = boto3.client("iam", region_name=REGION)
trust_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "bedrock-agentcore.amazonaws.com"},
"Action": "sts:AssumeRole",
}
],
}
kinesis_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["kinesis:PutRecords", "kinesis:DescribeStream"],
"Resource": STREAM_ARN,
}
],
}
print(f"IAM ロール '{ROLE_NAME}' を作成中...")
try:
role_response = iam.create_role(
RoleName=ROLE_NAME,
AssumeRolePolicyDocument=json.dumps(trust_policy),
Description="AgentCore Memory -> Kinesis stream delivery role",
)
role_arn = role_response["Role"]["Arn"]
print(f"ロール作成完了: {role_arn}")
except iam.exceptions.EntityAlreadyExistsException:
role_arn = iam.get_role(RoleName=ROLE_NAME)["Role"]["Arn"]
print(f"既に存在します: {role_arn}")
iam.put_role_policy(
RoleName=ROLE_NAME,
PolicyName="kinesis-access",
PolicyDocument=json.dumps(kinesis_policy),
)
print("Kinesis アクセスポリシーをアタッチしました。")
信頼ポリシーの Principal は bedrock-agentcore.amazonaws.com を指定します。今回の構成では、権限は kinesis:PutRecords と kinesis:DescribeStream の2つで動作しました。
$ uv run setup_iam.py
IAM ロール 'agentcore-memory-stream-role' を作成中...
ロール作成完了: arn:aws:iam::123456789012:role/agentcore-memory-stream-role
Kinesis アクセスポリシーをアタッチしました。
streamDeliveryResources 付きで Memory を作成
今回のメインです。streamDeliveryResources を指定して Memory を作成します。
<Stream ARN> と <IAM ロール ARN> には、前のステップで取得した値を設定します。
import boto3
import time
REGION = "us-east-1"
KINESIS_STREAM_ARN = "<Stream ARN>"
ROLE_ARN = "<IAM ロール ARN>"
client = boto3.client("bedrock-agentcore-control", region_name=REGION)
response = client.create_memory(
name="stream_delivery_demo",
description="Memory with Kinesis stream delivery",
eventExpiryDuration=30,
memoryExecutionRoleArn=ROLE_ARN,
memoryStrategies=[
{
"semanticMemoryStrategy": {
"name": "facts",
"namespaces": ["/user/facts"],
}
}
],
streamDeliveryResources={
"resources": [
{
"kinesis": {
"dataStreamArn": KINESIS_STREAM_ARN,
"contentConfigurations": [
{
"type": "MEMORY_RECORDS",
"level": "FULL_CONTENT",
}
],
}
}
]
},
)
memory_id = response["memory"]["id"]
print(f"Memory ID: {memory_id}")
# ACTIVE になるまで待機
while True:
res = client.get_memory(memoryId=memory_id)
status = res["memory"]["status"]
print(f"Status: {status}")
if status == "ACTIVE":
break
time.sleep(5)
print(f"Memory is ready! ID: {memory_id}")
memoryStrategiesは事実などを抽出する戦略 semanticMemoryStrategy を設定しています。長期記憶レコードが生成されないとストリームイベントが発生しないため、今回はこの戦略を採用しました。
streamDeliveryResourcesは FULL_CONTENT にすることで、長期記憶レコードの本文も含めて配信するようにし、事前に作成したKinesis Data StreamのARNを指定します。
Memory の作成は非同期です。ステータスが ACTIVE になるまで少し待つ必要があります。完了したら下記のように表示されます。
$ uv run create_memory.py
Memory ID: stream_delivery_demo-xxxxxxxxxx
Status: CREATING
Status: ACTIVE
Memory is ready! ID: stream_delivery_demo-xxxxxxxxxx
短期記憶を作成して長期記憶を生成
Memory が ACTIVE になったら、会話イベントを投入して長期記憶を生成させます。
短期記憶に、好きなものや趣味や習慣を伝えて、それに対するAIによる返事を想定して投入します。
<Memory ID> には、Memory 作成時に表示された ID を入れます。
from bedrock_agentcore.memory import MemoryClient
REGION = "us-east-1"
MEMORY_ID = "<Memory ID>"
memory_client = MemoryClient(region_name=REGION)
memory_client.create_event(
memory_id=MEMORY_ID,
actor_id="user-1",
session_id="session-1",
messages=[
("私はPythonが好きです。趣味は登山で、毎週末に山に登っています。", "user"),
(
"承知しました!Pythonがお好きで、登山が趣味なんですね。毎週末に登山されているとのこと、とてもアクティブですね。",
"assistant",
),
],
)
print("イベント作成完了!")
print("Memory Record の生成には数十秒かかります。")
会話イベントを投入すると、semanticMemoryStrategy によって自動的に長期記憶が抽出・生成されます。この生成は非同期で行われるため、Kinesis からの読み取りは少し待ってから実行します。
$ uv run create_event.py
イベント作成完了!
Memory Record の生成には数十秒かかります。
動作確認
Kinesis Data Stream からレコードを取得して、ストリーム配信されたイベントを確認してみます!
import boto3
import json
import time
import sys
REGION = "us-east-1"
STREAM_NAME = "agentcore-memory-stream"
kinesis = boto3.client("kinesis", region_name=REGION)
# シャードイテレータを取得
stream = kinesis.describe_stream(StreamName=STREAM_NAME)
shard_id = stream["StreamDescription"]["Shards"][0]["ShardId"]
iterator = kinesis.get_shard_iterator(
StreamName=STREAM_NAME,
ShardId=shard_id,
ShardIteratorType="TRIM_HORIZON",
)["ShardIterator"]
print("Kinesis からレコードを取得中...\n")
all_records = []
for _ in range(3):
response = kinesis.get_records(ShardIterator=iterator, Limit=100)
all_records.extend(response["Records"])
iterator = response["NextShardIterator"]
if not response["Records"]:
time.sleep(2)
if not all_records:
print("まだレコードが届いていません。")
print("Memory Record の生成を待って再実行してください。")
else:
print(f"{len(all_records)} 件のレコードを取得しました!\n")
for i, record in enumerate(all_records):
data = json.loads(record["Data"])
print(f"--- Record {i + 1} ---")
print(json.dumps(data, indent=2, ensure_ascii=False))
print()
今回は新規作成したストリームをそのまま使っているので、TRIM_HORIZON で最初から読んでいます。既存ストリームを再利用する場合は過去のイベントも読み込むので、その場合は LATEST にするか、新しいストリームで試すのが分かりやすいです。
実行してみます。
$ uv run read_kinesis.py
Kinesis からレコードを取得中...
4 件のレコードを取得しました!
まず、Memory 作成時に ストリームを有効にしたことを通知するStreamingEnabled イベントが配信されていました。
{
"memoryStreamEvent": {
"eventType": "StreamingEnabled",
"eventTime": "2026-03-07T10:38:54.281457087Z",
"memoryId": "stream_delivery_demo-xxxxxxxxxx",
"message": "Streaming enabled for memory resource: stream_delivery_demo-xxxxxxxxxx"
}
}
続いて、長期記憶が生成されると MemoryRecordCreated イベントが配信されます。今回は3件の長期記憶が自動抽出されていました。
{
"memoryStreamEvent": {
"eventType": "MemoryRecordCreated",
"eventTime": "2026-03-07T10:42:46.953862273Z",
"memoryId": "stream_delivery_demo-xxxxxxxxxx",
"memoryRecordId": "mem-84b18edb-5441-4959-879e-a5911a009a8f",
"memoryRecordText": "ユーザーはPythonが好きです。",
"namespaces": ["/user/facts"],
"createdAt": 1772880140057,
"memoryStrategyId": "facts-Iw6sUBCHPP",
"memoryStrategyType": "SEMANTIC"
}
}
{
"memoryStreamEvent": {
"eventType": "MemoryRecordCreated",
"eventTime": "2026-03-07T10:42:47.986529571Z",
"memoryId": "stream_delivery_demo-xxxxxxxxxx",
"memoryRecordId": "mem-9e249035-2271-4057-936c-7c71eddca91a",
"memoryRecordText": "ユーザーの趣味は登山です。",
"namespaces": ["/user/facts"],
"createdAt": 1772880140057,
"memoryStrategyId": "facts-Iw6sUBCHPP",
"memoryStrategyType": "SEMANTIC"
}
}
{
"memoryStreamEvent": {
"eventType": "MemoryRecordCreated",
"eventTime": "2026-03-07T10:42:47.959662860Z",
"memoryId": "stream_delivery_demo-xxxxxxxxxx",
"memoryRecordId": "mem-d00f0061-a4f7-48da-a235-8d59e753866e",
"memoryRecordText": "ユーザーは毎週末に山に登っています。",
"namespaces": ["/user/facts"],
"createdAt": 1772880140057,
"memoryStrategyId": "facts-Iw6sUBCHPP",
"memoryStrategyType": "SEMANTIC"
}
}
おお、ちゃんと Kinesis に長期記憶のイベントが配信されていますね!!
FULL_CONTENT を指定しているので memoryRecordText にレコードの本文が含まれています。
1つの会話から「Pythonが好き」「趣味は登山」「毎週末に山に登っている」と3つのファクトが自動抽出されていますね。
各イベントには memoryStrategyId や memoryStrategyType、namespaces といったメタデータも含まれているので、配信先で戦略やネームスペースごとにフィルタリングすることもできそうです。
おわりに
会話イベントから自動抽出された長期記憶のレコードがリアルタイムで Kinesis に流れてくるので、監査ログとして記録したり、このイベントを活用してリアルタイムに連携したりなど可能かと思います。
必要であれば、Amazon Data Firehose などと組み合わせて S3 に保存する構成も取りやすそうです。
個人的にすごい活用するわけではないと思いますが、確かにログとして残すのに使うかも・・・?ぐらいに感じました。
本記事が少しでも参考になりましたら幸いです。最後までご覧いただきありがとうございました!









