![[Update] I tried delivering Amazon Bedrock AgentCore Memory long-term memory events to Kinesis Data Streams](https://images.ctfassets.net/ct0aopd36mqt/7M0d5bjsd0K4Et30cVFvB6/5b2095750cc8bf73f04f63ed0d4b3546/AgentCore2.png?w=3840&fm=webp)
# [Update] I tried delivering Amazon Bedrock AgentCore Memory long-term memory events to Kinesis Data Streams
## Introduction
Hello, I'm Jinno from the consulting department, and I love supermarkets.
Lately I've been going exclusively to Gyomu Super.
Recently, the Amazon Bedrock AgentCore Memory API has been updated with a new parameter called streamDeliveryResources.
It's now possible to stream changes to long-term memory events generated by AgentCore Memory to Amazon Kinesis Data Streams in real-time.
In this article, I'll try out this feature!
## Details of this Update
In AgentCore Memory, long-term memories are automatically extracted and stored from saved short-term memories based on configured long-term memory strategies.
Until now, to detect changes in long-term memory records, you had to poll the ListMemoryRecords API and manage differences yourself.
The official documentation states:
> Instead of polling APIs to detect changes, you receive push-based events to a Kinesis Data Stream in your account, enabling event-driven architectures that react to memory record lifecycle changes as they occur.
With this update, the streamDeliveryResources parameter has been added, allowing long-term memory record lifecycle events to be delivered to Kinesis Data Streams in real-time.
The setup is simple: you just point AgentCore Memory at a Kinesis Data Stream (AgentCore Memory → Kinesis Data Streams).
Before trying it out, let's look at what kind of information is delivered.
First, when streaming is enabled, a StreamingEnabled validation event is delivered.
| Event | Description |
|---|---|
| StreamingEnabled | Validation event when streaming is enabled |
Additionally, the following three types of lifecycle events for long-term memory records are delivered:
| Event | Description |
|---|---|
| MemoryRecordCreated | When a long-term memory record is created |
| MemoryRecordUpdated | When a long-term memory record is updated |
| MemoryRecordDeleted | When a long-term memory record is deleted |
These cover all changes to long-term memory records: creation, update, and deletion.
Also, you can choose from two levels of delivery content:
| Level | Content |
|---|---|
| METADATA_ONLY | Only metadata such as memoryId, memoryRecordId, timestamps |
| FULL_CONTENT | Metadata plus the full text of the long-term memory record |
If you want to see the content of extracted long-term memories, you would select FULL_CONTENT.
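To see how a consumer might branch on these event types and delivery levels, here is a minimal dispatch sketch. The field names follow the sample payloads shown later in this article; `handle_event` itself is a hypothetical helper, not part of any AWS SDK.

```python
import json


def handle_event(raw_data: bytes) -> str:
    """Route an AgentCore Memory stream event by its eventType.

    Field names follow the sample payloads in this article. With
    METADATA_ONLY delivery, memoryRecordText is not present, so we
    fall back to a placeholder.
    """
    event = json.loads(raw_data)["memoryStreamEvent"]
    event_type = event["eventType"]
    if event_type == "StreamingEnabled":
        return f"streaming enabled for {event['memoryId']}"
    if event_type in ("MemoryRecordCreated", "MemoryRecordUpdated"):
        # FULL_CONTENT events carry the record text; METADATA_ONLY does not
        text = event.get("memoryRecordText", "<metadata only>")
        return f"{event_type}: {text}"
    if event_type == "MemoryRecordDeleted":
        return f"deleted record {event['memoryRecordId']}"
    return f"unknown event type: {event_type}"
```

Feeding it one of the MemoryRecordCreated payloads from later in this article would return the event type plus the extracted fact text.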
Let's try it out!
## Prerequisites
Here's my testing environment:
| Item | Version / Value |
|---|---|
| OS | macOS 15.7 |
| Python | 3.12 |
| boto3 | 1.42.63 |
| bedrock-agentcore | 1.4.3 |
| AWS Region | us-east-1 |
I'm managing this project with uv. Here's my pyproject.toml:
```toml
[project]
name = "agentcore-memory-stream"
version = "0.1.0"
description = "AgentCore Memory streamDeliveryResources demo"
requires-python = ">=3.12"
dependencies = [
    "boto3>=1.42.63",
    "bedrock-agentcore>=1.4.3",
]
```
Let's install the libraries:
```shell
uv sync
```
Now let's try it out. I'll do everything using Python scripts.
## Let's Try It
### Creating a Kinesis Data Stream
First, let's create a Kinesis Data Stream for delivery:
```python
import boto3

REGION = "us-east-1"
STREAM_NAME = "agentcore-memory-stream"

kinesis = boto3.client("kinesis", region_name=REGION)

print(f"Creating Kinesis Data Stream '{STREAM_NAME}'...")
try:
    kinesis.create_stream(StreamName=STREAM_NAME, ShardCount=1)
    print("Creation request sent. Waiting for it to become ACTIVE...")
except kinesis.exceptions.ResourceInUseException:
    print("Stream already exists. Skipping.")

waiter = kinesis.get_waiter("stream_exists")
waiter.wait(StreamName=STREAM_NAME)

desc = kinesis.describe_stream(StreamName=STREAM_NAME)
stream_arn = desc["StreamDescription"]["StreamARN"]
print(f"Stream ARN: {stream_arn}")
```
One shard is enough for testing purposes.
```
$ uv run setup_kinesis.py
Creating Kinesis Data Stream 'agentcore-memory-stream'...
Creation request sent. Waiting for it to become ACTIVE...
Stream ARN: arn:aws:kinesis:us-east-1:123456789012:stream/agentcore-memory-stream
```
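If you'd rather not think about shard counts at all, Kinesis also supports on-demand capacity mode via the `StreamModeDetails` parameter of `create_stream`. A small sketch of the arguments, built as a plain dict (the helper function name is my own):

```python
def on_demand_stream_kwargs(stream_name: str) -> dict:
    """Build create_stream kwargs for an on-demand Kinesis stream.

    In ON_DEMAND mode, ShardCount is omitted and capacity scales
    automatically; you pay per data volume rather than per shard-hour.
    """
    return {
        "StreamName": stream_name,
        "StreamModeDetails": {"StreamMode": "ON_DEMAND"},
    }


# Usage (assumes a boto3 kinesis client as in the script above):
# kinesis.create_stream(**on_demand_stream_kwargs("agentcore-memory-stream"))
```

For a short-lived experiment like this one, a single provisioned shard is cheaper and perfectly adequate.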
### Creating an IAM Role
Now, let's create an IAM role for AgentCore Memory to write records to Kinesis:
Replace <Stream ARN from previous step> with the value you obtained in the previous step.
```python
import json

import boto3

REGION = "us-east-1"
ROLE_NAME = "agentcore-memory-stream-role"
STREAM_ARN = "<Stream ARN from previous step>"

iam = boto3.client("iam", region_name=REGION)

# Allow the AgentCore service principal to assume this role
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "bedrock-agentcore.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

# Allow writing records to (and describing) the delivery stream
kinesis_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["kinesis:PutRecords", "kinesis:DescribeStream"],
            "Resource": STREAM_ARN,
        }
    ],
}

print(f"Creating IAM role '{ROLE_NAME}'...")
try:
    role_response = iam.create_role(
        RoleName=ROLE_NAME,
        AssumeRolePolicyDocument=json.dumps(trust_policy),
        Description="AgentCore Memory -> Kinesis stream delivery role",
    )
    role_arn = role_response["Role"]["Arn"]
    print(f"Role created: {role_arn}")
except iam.exceptions.EntityAlreadyExistsException:
    role_arn = iam.get_role(RoleName=ROLE_NAME)["Role"]["Arn"]
    print(f"Role already exists: {role_arn}")

iam.put_role_policy(
    RoleName=ROLE_NAME,
    PolicyName="kinesis-access",
    PolicyDocument=json.dumps(kinesis_policy),
)
print("Attached Kinesis access policy.")
```
The trust policy principal is specified as bedrock-agentcore.amazonaws.com. For this configuration, the permissions kinesis:PutRecords and kinesis:DescribeStream were sufficient to make it work.
```
$ uv run setup_iam.py
Creating IAM role 'agentcore-memory-stream-role'...
Role created: arn:aws:iam::123456789012:role/agentcore-memory-stream-role
Attached Kinesis access policy.
```
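As a side note, a common hardening pattern for service-principal trust policies is to add an `aws:SourceAccount` condition so the role can only be assumed on behalf of resources in your own account (the confused-deputy mitigation). I haven't verified which condition keys the bedrock-agentcore service principal actually populates, so treat this sketch as an assumption to check against the official docs:

```python
import json


def hardened_trust_policy(account_id: str) -> str:
    """Trust policy restricting sts:AssumeRole to calls made on behalf
    of resources in our own account (confused-deputy mitigation).

    NOTE: whether bedrock-agentcore.amazonaws.com sets aws:SourceAccount
    is an assumption here; verify before relying on it.
    """
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "bedrock-agentcore.amazonaws.com"},
                "Action": "sts:AssumeRole",
                "Condition": {
                    "StringEquals": {"aws:SourceAccount": account_id}
                },
            }
        ],
    }
    return json.dumps(policy)
```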
### Creating a Memory with streamDeliveryResources
This is the main part. Let's create a Memory with the streamDeliveryResources parameter:
Replace <Stream ARN> and <IAM Role ARN> with the values from previous steps.
```python
import time

import boto3

REGION = "us-east-1"
KINESIS_STREAM_ARN = "<Stream ARN>"
ROLE_ARN = "<IAM Role ARN>"

client = boto3.client("bedrock-agentcore-control", region_name=REGION)

response = client.create_memory(
    name="stream_delivery_demo",
    description="Memory with Kinesis stream delivery",
    eventExpiryDuration=30,
    memoryExecutionRoleArn=ROLE_ARN,
    memoryStrategies=[
        {
            "semanticMemoryStrategy": {
                "name": "facts",
                "namespaces": ["/user/facts"],
            }
        }
    ],
    streamDeliveryResources={
        "resources": [
            {
                "kinesis": {
                    "dataStreamArn": KINESIS_STREAM_ARN,
                    "contentConfigurations": [
                        {
                            "type": "MEMORY_RECORDS",
                            "level": "FULL_CONTENT",
                        }
                    ],
                }
            }
        ]
    },
)
memory_id = response["memory"]["id"]
print(f"Memory ID: {memory_id}")

# Wait until the Memory becomes ACTIVE (creation is asynchronous)
while True:
    status = client.get_memory(memoryId=memory_id)["memory"]["status"]
    print(f"Status: {status}")
    if status == "ACTIVE":
        break
    if status == "FAILED":
        raise RuntimeError(f"Memory creation failed: {memory_id}")
    time.sleep(5)

print(f"Memory is ready! ID: {memory_id}")
```
For memoryStrategies, I'm setting the semanticMemoryStrategy that extracts facts. Since stream events only occur when long-term memory records are generated, I chose this strategy.
For streamDeliveryResources, I'm setting FULL_CONTENT to include the full text of long-term memory records in the delivery, and specifying the ARN of the Kinesis Data Stream we created earlier.
Memory creation is asynchronous. You need to wait a bit until the status becomes ACTIVE. When completed, you'll see something like:
```
$ uv run create_memory.py
Memory ID: stream_delivery_demo-xxxxxxxxxx
Status: CREATING
Status: ACTIVE
Memory is ready! ID: stream_delivery_demo-xxxxxxxxxx
```
### Creating Short-Term Memory to Generate Long-Term Memory
Once the Memory is ACTIVE, let's input conversation events to generate long-term memories.
Let's add short-term memories about preferences, hobbies, and habits, along with an AI's expected response.
Replace <Memory ID> with the ID displayed when creating the Memory.
```python
from bedrock_agentcore.memory import MemoryClient

REGION = "us-east-1"
MEMORY_ID = "<Memory ID>"

memory_client = MemoryClient(region_name=REGION)

memory_client.create_event(
    memory_id=MEMORY_ID,
    actor_id="user-1",
    session_id="session-1",
    messages=[
        # "I like Python. My hobby is mountain climbing, and I climb every weekend."
        ("私はPythonが好きです。趣味は登山で、毎週末に山に登っています。", "user"),
        # "Got it! You like Python and mountain climbing is your hobby.
        #  Climbing every weekend — how active of you!"
        (
            "承知しました!Pythonがお好きで、登山が趣味なんですね。毎週末に登山されているとのこと、とてもアクティブですね。",
            "assistant",
        ),
    ],
)
print("Event creation completed!")
print("It will take several tens of seconds to generate Memory Records.")
```
When you add conversation events, long-term memories are automatically extracted and generated by the semanticMemoryStrategy. This generation is asynchronous, so you should wait a bit before reading from Kinesis.
```
$ uv run create_event.py
Event creation completed!
It will take several tens of seconds to generate Memory Records.
```
### Testing
Let's retrieve records from the Kinesis Data Stream to verify the streamed events!
```python
import json
import time

import boto3

REGION = "us-east-1"
STREAM_NAME = "agentcore-memory-stream"

kinesis = boto3.client("kinesis", region_name=REGION)

# Get a shard iterator starting from the oldest available record
stream = kinesis.describe_stream(StreamName=STREAM_NAME)
shard_id = stream["StreamDescription"]["Shards"][0]["ShardId"]
iterator = kinesis.get_shard_iterator(
    StreamName=STREAM_NAME,
    ShardId=shard_id,
    ShardIteratorType="TRIM_HORIZON",
)["ShardIterator"]

print("Retrieving records from Kinesis...\n")

all_records = []
for _ in range(3):
    response = kinesis.get_records(ShardIterator=iterator, Limit=100)
    all_records.extend(response["Records"])
    iterator = response["NextShardIterator"]
    if not response["Records"]:
        time.sleep(2)

if not all_records:
    print("No records have arrived yet.")
    print("Please wait for Memory Records to be generated and try again.")
else:
    print(f"Retrieved {len(all_records)} records!\n")
    for i, record in enumerate(all_records):
        data = json.loads(record["Data"])
        print(f"--- Record {i + 1} ---")
        print(json.dumps(data, indent=2, ensure_ascii=False))
        print()
```
Since we're using a newly created stream, we're reading from the beginning with TRIM_HORIZON. If you're reusing an existing stream, you might want to use LATEST to avoid reading past events, or try with a new stream for clarity.
Let's run it:
```
$ uv run read_kinesis.py
Retrieving records from Kinesis...

Retrieved 4 records!
```
First, there's a StreamingEnabled event that notifies when streaming was enabled during Memory creation:
```json
{
  "memoryStreamEvent": {
    "eventType": "StreamingEnabled",
    "eventTime": "2026-03-07T10:38:54.281457087Z",
    "memoryId": "stream_delivery_demo-xxxxxxxxxx",
    "message": "Streaming enabled for memory resource: stream_delivery_demo-xxxxxxxxxx"
  }
}
```
Next, when long-term memories are generated, MemoryRecordCreated events are delivered. In this case, three long-term memories were automatically extracted:
```json
{
  "memoryStreamEvent": {
    "eventType": "MemoryRecordCreated",
    "eventTime": "2026-03-07T10:42:46.953862273Z",
    "memoryId": "stream_delivery_demo-xxxxxxxxxx",
    "memoryRecordId": "mem-84b18edb-5441-4959-879e-a5911a009a8f",
    "memoryRecordText": "ユーザーはPythonが好きです。",
    "namespaces": ["/user/facts"],
    "createdAt": 1772880140057,
    "memoryStrategyId": "facts-Iw6sUBCHPP",
    "memoryStrategyType": "SEMANTIC"
  }
}
```

```json
{
  "memoryStreamEvent": {
    "eventType": "MemoryRecordCreated",
    "eventTime": "2026-03-07T10:42:47.986529571Z",
    "memoryId": "stream_delivery_demo-xxxxxxxxxx",
    "memoryRecordId": "mem-9e249035-2271-4057-936c-7c71eddca91a",
    "memoryRecordText": "ユーザーの趣味は登山です。",
    "namespaces": ["/user/facts"],
    "createdAt": 1772880140057,
    "memoryStrategyId": "facts-Iw6sUBCHPP",
    "memoryStrategyType": "SEMANTIC"
  }
}
```

```json
{
  "memoryStreamEvent": {
    "eventType": "MemoryRecordCreated",
    "eventTime": "2026-03-07T10:42:47.959662860Z",
    "memoryId": "stream_delivery_demo-xxxxxxxxxx",
    "memoryRecordId": "mem-d00f0061-a4f7-48da-a235-8d59e753866e",
    "memoryRecordText": "ユーザーは毎週末に山に登っています。",
    "namespaces": ["/user/facts"],
    "createdAt": 1772880140057,
    "memoryStrategyId": "facts-Iw6sUBCHPP",
    "memoryStrategyType": "SEMANTIC"
  }
}
```
Great! Long-term memory events are being delivered to Kinesis correctly!
Since I specified FULL_CONTENT, the memoryRecordText includes the full text of the record.
From a single conversation, three facts were automatically extracted: "likes Python," "hobby is mountain climbing," and "goes hiking every weekend."
Each event also contains metadata like memoryStrategyId, memoryStrategyType, and namespaces, so you could filter by strategy or namespace at the delivery destination.
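That filtering idea can be sketched as a pure function over the decoded records. The field names are taken from the sample payloads above; `filter_records` is a hypothetical helper, not an AWS API:

```python
import json


def filter_records(raw_records: list[bytes], namespace: str) -> list[dict]:
    """Keep only record-lifecycle events whose namespaces include the
    given namespace. StreamingEnabled events carry no namespaces field
    and are dropped automatically by the .get() fallback."""
    matched = []
    for raw in raw_records:
        event = json.loads(raw)["memoryStreamEvent"]
        if namespace in event.get("namespaces", []):
            matched.append(event)
    return matched
```

In a Lambda consumer, the same check could decide which downstream target (audit log, notification, search index) each event is routed to.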
## Conclusion
Long-term memory records automatically extracted from conversation events flow into Kinesis in real-time, making it possible to record them as audit logs or use these events for real-time integration.
If needed, you could also combine this with Amazon Data Firehose to save the data to S3.
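For the Firehose route, a hedged sketch of the CreateDeliveryStream arguments, built as a plain dict (the helper name is my own; the parameter names follow the Firehose API, and the role would additionally need read access on the Kinesis stream and `s3:PutObject` on the bucket):

```python
def firehose_to_s3_kwargs(
    name: str, kinesis_arn: str, bucket_arn: str, role_arn: str
) -> dict:
    """Build create_delivery_stream kwargs for a Firehose stream that
    reads from the Kinesis stream and writes batched objects to S3."""
    return {
        "DeliveryStreamName": name,
        "DeliveryStreamType": "KinesisStreamAsSource",
        "KinesisStreamSourceConfiguration": {
            "KinesisStreamARN": kinesis_arn,
            "RoleARN": role_arn,
        },
        "S3DestinationConfiguration": {
            "RoleARN": role_arn,
            "BucketARN": bucket_arn,
        },
    }


# Usage (assumes a boto3 firehose client and existing ARNs):
# firehose = boto3.client("firehose", region_name="us-east-1")
# firehose.create_delivery_stream(**firehose_to_s3_kwargs(
#     "agentcore-memory-archive", stream_arn, bucket_arn, role_arn))
```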
Personally, I might not use this extensively, but I could see using it for keeping logs.
I hope this article was helpful. Thank you for reading!


