[Update] I tried delivering Amazon Bedrock AgentCore Memory long-term memory events to Kinesis Data Streams

2026.03.07

Introduction

Hello, I'm Jinno from the consulting department, and I love supermarkets.
Lately I've been going only to Gyomu Super.

The Amazon Bedrock AgentCore Memory API was recently updated with a new parameter, streamDeliveryResources.

https://awsapichanges.com/archive/changes/053383-bedrock-agentcore-control.html

Changes to the long-term memory records generated by AgentCore Memory can now be streamed to Amazon Kinesis Data Streams in real time.

In this article, I'll try out this feature!

Details of this Update

In AgentCore Memory, long-term memories are automatically extracted from saved short-term memories and stored, according to the configured long-term memory strategies.
Until now, detecting changes in long-term memory records meant polling the ListMemoryRecords API and tracking the differences yourself.
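
To illustrate what "managing differences yourself" involves, here is a minimal sketch of the comparison step. It assumes each poll result has already been reduced to a dictionary of record ID to record text; that snapshot shape and the function name are hypothetical, not part of the AgentCore API.

```python
from typing import Dict, List, Tuple

# Hypothetical snapshot shape: {memoryRecordId: record text}.
Snapshot = Dict[str, str]


def diff_snapshots(
    previous: Snapshot, current: Snapshot
) -> Tuple[List[str], List[str], List[str]]:
    """Return (created, updated, deleted) record IDs between two polls."""
    created = sorted(rid for rid in current if rid not in previous)
    deleted = sorted(rid for rid in previous if rid not in current)
    updated = sorted(
        rid for rid in current
        if rid in previous and current[rid] != previous[rid]
    )
    return created, updated, deleted


before = {"mem-1": "likes Python", "mem-2": "hobby is hiking"}
after = {"mem-2": "hobby is mountain climbing", "mem-3": "climbs every weekend"}
print(diff_snapshots(before, after))
```

With streaming, this bookkeeping (plus storing the previous snapshot somewhere) is no longer needed, because each change arrives as its own event.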

The official documentation states:

https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/memory-record-streaming.html

Instead of polling APIs to detect changes, you receive push-based events to a Kinesis Data Stream in your account, enabling event-driven architectures that react to memory record lifecycle changes as they occur.

With this update, the new streamDeliveryResources parameter lets long-term memory record lifecycle events be delivered to Kinesis Data Streams in real time.

The setup in this article is simple: AgentCore Memory → Kinesis Data Streams.
That is enough to see what kind of information flows into the stream.

First, when streaming is enabled, a validation StreamingEnabled event is delivered.

| Event | Description |
| --- | --- |
| StreamingEnabled | Validation event delivered when streaming is enabled |

Additionally, the following three types of lifecycle events for long-term memory records are delivered:

| Event | Description |
| --- | --- |
| MemoryRecordCreated | When a long-term memory record is created |
| MemoryRecordUpdated | When a long-term memory record is updated |
| MemoryRecordDeleted | When a long-term memory record is deleted |

These cover all changes to long-term memory records: creation, deletion, and updates.
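
A consumer would typically branch on the event type. The sketch below assumes the payload shape shown in the results later in this article (a top-level memoryStreamEvent object with an eventType field); the handler functions and their return strings are hypothetical placeholders.

```python
import json
from typing import Callable, Dict


def on_created(event: dict) -> str:
    return f"created {event.get('memoryRecordId')}"


def on_updated(event: dict) -> str:
    return f"updated {event.get('memoryRecordId')}"


def on_deleted(event: dict) -> str:
    return f"deleted {event.get('memoryRecordId')}"


def on_streaming_enabled(event: dict) -> str:
    return f"streaming enabled for {event.get('memoryId')}"


# Map each documented event type to a handler.
HANDLERS: Dict[str, Callable[[dict], str]] = {
    "StreamingEnabled": on_streaming_enabled,
    "MemoryRecordCreated": on_created,
    "MemoryRecordUpdated": on_updated,
    "MemoryRecordDeleted": on_deleted,
}


def dispatch(raw: bytes) -> str:
    """Decode one Kinesis record payload and route it by eventType."""
    event = json.loads(raw)["memoryStreamEvent"]
    handler = HANDLERS.get(event["eventType"])
    if handler is None:
        # Tolerate event types that may be added in the future.
        return f"ignored {event['eventType']}"
    return handler(event)
```

Returning a value for unknown types instead of raising keeps the consumer running if new event types appear.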

Also, you can choose from two levels of delivery content:

| Level | Content |
| --- | --- |
| METADATA_ONLY | Only metadata such as memoryId, memoryRecordId, timestamps |
| FULL_CONTENT | Metadata plus the full text of the long-term memory record |

If you want to see the content of extracted long-term memories, you would select FULL_CONTENT.
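
As a small sketch, the level can be treated as a parameter when building the streamDeliveryResources value. The dictionary shape mirrors the create_memory call used later in this article; the helper name and the default level are my own assumptions.

```python
VALID_LEVELS = {"METADATA_ONLY", "FULL_CONTENT"}


def build_stream_delivery_resources(stream_arn: str, level: str = "METADATA_ONLY") -> dict:
    """Build the streamDeliveryResources value for a single Kinesis stream."""
    if level not in VALID_LEVELS:
        raise ValueError(f"level must be one of {sorted(VALID_LEVELS)}, got {level!r}")
    return {
        "resources": [
            {
                "kinesis": {
                    "dataStreamArn": stream_arn,
                    "contentConfigurations": [
                        {"type": "MEMORY_RECORDS", "level": level}
                    ],
                }
            }
        ]
    }
```

Validating the level locally catches a typo before the API call is made.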

Let's try it out!

Prerequisites

Here's my testing environment:

| Item | Version / Value |
| --- | --- |
| OS | macOS 15.7 |
| Python | 3.12 |
| boto3 | 1.42.63 |
| bedrock-agentcore | 1.4.3 |
| AWS Region | us-east-1 |

I'm managing this project with uv. Here's my pyproject.toml:

pyproject.toml
[project]
name = "agentcore-memory-stream"
version = "0.1.0"
description = "AgentCore Memory streamDeliveryResources demo"
requires-python = ">=3.12"
dependencies = [
    "boto3>=1.42.63",
    "bedrock-agentcore>=1.4.3",
]

Let's install the libraries:

Command
uv sync

I'll do everything with Python scripts.

Let's Try It

Creating a Kinesis Data Stream

First, let's create a Kinesis Data Stream for delivery:

setup_kinesis.py
import boto3

REGION = "us-east-1"
STREAM_NAME = "agentcore-memory-stream"

kinesis = boto3.client("kinesis", region_name=REGION)

print(f"Creating Kinesis Data Stream '{STREAM_NAME}'...")

try:
    kinesis.create_stream(StreamName=STREAM_NAME, ShardCount=1)
    print("Creation request sent. Waiting for it to become ACTIVE...")
except kinesis.exceptions.ResourceInUseException:
    print("Stream already exists. Skipping.")

waiter = kinesis.get_waiter("stream_exists")
waiter.wait(StreamName=STREAM_NAME)

desc = kinesis.describe_stream(StreamName=STREAM_NAME)
stream_arn = desc["StreamDescription"]["StreamARN"]
print(f"Stream ARN: {stream_arn}")

One shard is enough for testing purposes.

Execution result
$ uv run setup_kinesis.py
Creating Kinesis Data Stream 'agentcore-memory-stream'...
Creation request sent. Waiting for it to become ACTIVE...
Stream ARN: arn:aws:kinesis:us-east-1:123456789012:stream/agentcore-memory-stream

Creating IAM Role

Now, let's create an IAM role for AgentCore Memory to write records to Kinesis:

Replace <Stream ARN from previous step> with the value you obtained in the previous step.

setup_iam.py
import json
import boto3

REGION = "us-east-1"
ROLE_NAME = "agentcore-memory-stream-role"
STREAM_ARN = "<Stream ARN from previous step>"

iam = boto3.client("iam", region_name=REGION)

trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "bedrock-agentcore.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

kinesis_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["kinesis:PutRecords", "kinesis:DescribeStream"],
            "Resource": STREAM_ARN,
        }
    ],
}

print(f"Creating IAM role '{ROLE_NAME}'...")

try:
    role_response = iam.create_role(
        RoleName=ROLE_NAME,
        AssumeRolePolicyDocument=json.dumps(trust_policy),
        Description="AgentCore Memory -> Kinesis stream delivery role",
    )
    role_arn = role_response["Role"]["Arn"]
    print(f"Role created: {role_arn}")
except iam.exceptions.EntityAlreadyExistsException:
    role_arn = iam.get_role(RoleName=ROLE_NAME)["Role"]["Arn"]
    print(f"Role already exists: {role_arn}")

iam.put_role_policy(
    RoleName=ROLE_NAME,
    PolicyName="kinesis-access",
    PolicyDocument=json.dumps(kinesis_policy),
)
print("Attached Kinesis access policy.")

The trust policy names bedrock-agentcore.amazonaws.com as the principal. In my test, kinesis:PutRecords and kinesis:DescribeStream were the only permissions needed for delivery to work.

Execution result
$ uv run setup_iam.py
Creating IAM role 'agentcore-memory-stream-role'...
Role created: arn:aws:iam::123456789012:role/agentcore-memory-stream-role
Attached Kinesis access policy.
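
Because the stream ARN is pasted in by hand, it is easy to run the script with the placeholder still in place. The following sketch builds the same permissions policy but fails fast on a malformed ARN. The regular expression is an assumption that covers only the standard aws partition, and the function name is hypothetical.

```python
import json
import re

# Assumed ARN shape for a Kinesis data stream in the standard "aws" partition.
_STREAM_ARN_RE = re.compile(
    r"^arn:aws:kinesis:[a-z0-9-]+:\d{12}:stream/[A-Za-z0-9_.-]+$"
)


def build_kinesis_policy(stream_arn: str) -> str:
    """Return the permissions policy JSON, rejecting placeholder or malformed ARNs."""
    if not _STREAM_ARN_RE.match(stream_arn):
        raise ValueError(f"Not a Kinesis stream ARN: {stream_arn!r}")
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                # The two actions that were sufficient in this article's test.
                "Action": ["kinesis:PutRecords", "kinesis:DescribeStream"],
                "Resource": stream_arn,
            }
        ],
    }
    return json.dumps(policy)
```

The returned string can be passed as PolicyDocument to iam.put_role_policy in place of json.dumps(kinesis_policy).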

Creating Memory with streamDeliveryResources

This is the main part. Let's create a Memory with the streamDeliveryResources parameter:

Replace <Stream ARN> and <IAM Role ARN> with the values from previous steps.

create_memory.py
import boto3
import time

REGION = "us-east-1"
KINESIS_STREAM_ARN = "<Stream ARN>"
ROLE_ARN = "<IAM Role ARN>"

client = boto3.client("bedrock-agentcore-control", region_name=REGION)

response = client.create_memory(
    name="stream_delivery_demo",
    description="Memory with Kinesis stream delivery",
    eventExpiryDuration=30,
    memoryExecutionRoleArn=ROLE_ARN,
    memoryStrategies=[
        {
            "semanticMemoryStrategy": {
                "name": "facts",
                "namespaces": ["/user/facts"],
            }
        }
    ],
    streamDeliveryResources={
        "resources": [
            {
                "kinesis": {
                    "dataStreamArn": KINESIS_STREAM_ARN,
                    "contentConfigurations": [
                        {
                            "type": "MEMORY_RECORDS",
                            "level": "FULL_CONTENT",
                        }
                    ],
                }
            }
        ]
    },
)

memory_id = response["memory"]["id"]
print(f"Memory ID: {memory_id}")

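# Wait until ACTIVE; stop early if creation fails
while True:
    res = client.get_memory(memoryId=memory_id)
    status = res["memory"]["status"]
    print(f"Status: {status}")
    if status == "ACTIVE":
        break
    if status == "FAILED":
        raise RuntimeError(f"Memory creation failed: {memory_id}")
    time.sleep(5)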

print(f"Memory is ready! ID: {memory_id}")

Stream events are emitted only when long-term memory records are generated, so for memoryStrategies I configured a semanticMemoryStrategy, which extracts facts from conversations.
For streamDeliveryResources, I specified the ARN of the Kinesis Data Stream created earlier and set the level to FULL_CONTENT so that the full text of each long-term memory record is included in the delivery.

Memory creation is asynchronous. You need to wait a bit until the status becomes ACTIVE. When completed, you'll see something like:

Execution result
$ uv run create_memory.py
Memory ID: stream_delivery_demo-xxxxxxxxxx
Status: CREATING
Status: ACTIVE
Memory is ready! ID: stream_delivery_demo-xxxxxxxxxx
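
A reusable variant of the wait loop could give up after a timeout and stop on a failed status. This is a sketch only: the status source is passed in as a callable so the helper can be exercised without AWS, and the "FAILED" status name, the function name, and the default timings are assumptions on my part.

```python
import time
from typing import Callable


def wait_until_active(
    get_status: Callable[[], str],
    timeout_s: float = 300.0,
    interval_s: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Poll get_status() until it returns ACTIVE, a failure status, or the timeout passes."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status()
        if status == "ACTIVE":
            return
        if status == "FAILED":  # assumed failure status name
            raise RuntimeError("Memory creation failed")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Still {status} after {timeout_s} seconds")
        sleep(interval_s)


# Usage with the client from create_memory.py:
# wait_until_active(lambda: client.get_memory(memoryId=memory_id)["memory"]["status"])
```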

Creating Short-Term Memory to Generate Long-Term Memory

Once the Memory is ACTIVE, let's send conversation events so that long-term memories are generated.
I'll add a short-term memory in which the user mentions preferences, hobbies, and habits, together with a plausible assistant reply.

Replace <Memory ID> with the ID displayed when creating the Memory.

create_event.py
from bedrock_agentcore.memory import MemoryClient

REGION = "us-east-1"
MEMORY_ID = "<Memory ID>"

memory_client = MemoryClient(region_name=REGION)

memory_client.create_event(
    memory_id=MEMORY_ID,
    actor_id="user-1",
    session_id="session-1",
    messages=[
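        # "I like Python. My hobby is mountain climbing, and I climb mountains every weekend."
        ("私はPythonが好きです。趣味は登山で、毎週末に山に登っています。", "user"),
        (
            # "Understood! So you like Python and your hobby is mountain climbing. Climbing every weekend sounds very active."
            "承知しました!Pythonがお好きで、登山が趣味なんですね。毎週末に登山されているとのこと、とてもアクティブですね。",
            "assistant",
        ),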
    ],
)

print("Event creation completed!")
print("It will take several tens of seconds to generate Memory Records.")

When you add conversation events, long-term memories are automatically extracted and generated by the semanticMemoryStrategy. This generation is asynchronous, so you should wait a bit before reading from Kinesis.

Execution result
$ uv run create_event.py
Event creation completed!
It will take several tens of seconds to generate Memory Records.

Testing

Let's retrieve records from the Kinesis Data Stream to verify the streamed events!

read_kinesis.py
import boto3
import json
import time

REGION = "us-east-1"
STREAM_NAME = "agentcore-memory-stream"

kinesis = boto3.client("kinesis", region_name=REGION)

# Get shard iterator
stream = kinesis.describe_stream(StreamName=STREAM_NAME)
shard_id = stream["StreamDescription"]["Shards"][0]["ShardId"]

iterator = kinesis.get_shard_iterator(
    StreamName=STREAM_NAME,
    ShardId=shard_id,
    ShardIteratorType="TRIM_HORIZON",
)["ShardIterator"]

print("Retrieving records from Kinesis...\n")

all_records = []
for _ in range(3):
    response = kinesis.get_records(ShardIterator=iterator, Limit=100)
    all_records.extend(response["Records"])
    iterator = response["NextShardIterator"]
    if not response["Records"]:
        time.sleep(2)

if not all_records:
    print("No records have arrived yet.")
    print("Please wait for Memory Records to be generated and try again.")
else:
    print(f"Retrieved {len(all_records)} records!\n")
    for i, record in enumerate(all_records):
        data = json.loads(record["Data"])
        print(f"--- Record {i + 1} ---")
        print(json.dumps(data, indent=2, ensure_ascii=False))
        print()

Since we're using a newly created stream, we're reading from the beginning with TRIM_HORIZON. If you're reusing an existing stream, you might want to use LATEST to avoid reading past events, or try with a new stream for clarity.
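
read_kinesis.py reads only the first shard and makes three get_records calls. As a sketch of how this might be generalized, the function below walks every shard and keeps reading until several consecutive polls come back empty. The client is passed in as a parameter; the function name and the max_empty_polls and pause_s parameters are hypothetical, and pagination of list_shards is ignored for brevity.

```python
import json
import time
from typing import Any, Dict, List


def read_all_shards(
    kinesis: Any,
    stream_name: str,
    iterator_type: str = "TRIM_HORIZON",
    max_empty_polls: int = 3,
    pause_s: float = 1.0,
) -> List[Dict]:
    """Read and JSON-decode records from every shard of a stream."""
    events: List[Dict] = []
    shards = kinesis.list_shards(StreamName=stream_name)["Shards"]
    for shard in shards:
        iterator = kinesis.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard["ShardId"],
            ShardIteratorType=iterator_type,
        )["ShardIterator"]
        empty_polls = 0
        # Stop when the shard is closed (no next iterator) or stays quiet.
        while iterator and empty_polls < max_empty_polls:
            response = kinesis.get_records(ShardIterator=iterator, Limit=100)
            records = response["Records"]
            if records:
                empty_polls = 0
                events.extend(json.loads(r["Data"]) for r in records)
            else:
                empty_polls += 1
                time.sleep(pause_s)
            iterator = response.get("NextShardIterator")
    return events


# Usage: read_all_shards(boto3.client("kinesis", region_name="us-east-1"), "agentcore-memory-stream")
```

For the single-shard test stream in this article, read_kinesis.py as written is sufficient.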

Let's run it:

Execution result
$ uv run read_kinesis.py
Retrieving records from Kinesis...

Retrieved 4 records!

The first record is a StreamingEnabled event, delivered when streaming was enabled at Memory creation:

StreamingEnabled event
{
  "memoryStreamEvent": {
    "eventType": "StreamingEnabled",
    "eventTime": "2026-03-07T10:38:54.281457087Z",
    "memoryId": "stream_delivery_demo-xxxxxxxxxx",
    "message": "Streaming enabled for memory resource: stream_delivery_demo-xxxxxxxxxx"
  }
}

Next, when long-term memories are generated, MemoryRecordCreated events are delivered. In this case, three long-term memories were automatically extracted:

MemoryRecordCreated event (first)
{
  "memoryStreamEvent": {
    "eventType": "MemoryRecordCreated",
    "eventTime": "2026-03-07T10:42:46.953862273Z",
    "memoryId": "stream_delivery_demo-xxxxxxxxxx",
    "memoryRecordId": "mem-84b18edb-5441-4959-879e-a5911a009a8f",
    "memoryRecordText": "ユーザーはPythonが好きです。",
    "namespaces": ["/user/facts"],
    "createdAt": 1772880140057,
    "memoryStrategyId": "facts-Iw6sUBCHPP",
    "memoryStrategyType": "SEMANTIC"
  }
}

MemoryRecordCreated event (second)
{
  "memoryStreamEvent": {
    "eventType": "MemoryRecordCreated",
    "eventTime": "2026-03-07T10:42:47.986529571Z",
    "memoryId": "stream_delivery_demo-xxxxxxxxxx",
    "memoryRecordId": "mem-9e249035-2271-4057-936c-7c71eddca91a",
    "memoryRecordText": "ユーザーの趣味は登山です。",
    "namespaces": ["/user/facts"],
    "createdAt": 1772880140057,
    "memoryStrategyId": "facts-Iw6sUBCHPP",
    "memoryStrategyType": "SEMANTIC"
  }
}

MemoryRecordCreated event (third)
{
  "memoryStreamEvent": {
    "eventType": "MemoryRecordCreated",
    "eventTime": "2026-03-07T10:42:47.959662860Z",
    "memoryId": "stream_delivery_demo-xxxxxxxxxx",
    "memoryRecordId": "mem-d00f0061-a4f7-48da-a235-8d59e753866e",
    "memoryRecordText": "ユーザーは毎週末に山に登っています。",
    "namespaces": ["/user/facts"],
    "createdAt": 1772880140057,
    "memoryStrategyId": "facts-Iw6sUBCHPP",
    "memoryStrategyType": "SEMANTIC"
  }
}

Great! Long-term memory events are being delivered to Kinesis correctly!

Since I specified FULL_CONTENT, the memoryRecordText includes the full text of the record.
From a single conversation, three facts were automatically extracted: "likes Python," "hobby is mountain climbing," and "climbs mountains every weekend."

Each event also contains metadata like memoryStrategyId, memoryStrategyType, and namespaces, so you could filter by strategy or namespace at the delivery destination.
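
A minimal sketch of such filtering, assuming the events have already been JSON-decoded into dictionaries with the shape shown above. The function name and its parameters are hypothetical.

```python
from typing import Iterable, List, Optional


def filter_events(
    events: Iterable[dict],
    strategy_type: Optional[str] = None,
    namespace_prefix: Optional[str] = None,
) -> List[dict]:
    """Keep events matching the given strategy type and/or namespace prefix."""
    selected = []
    for wrapper in events:
        event = wrapper.get("memoryStreamEvent", {})
        if strategy_type is not None and event.get("memoryStrategyType") != strategy_type:
            continue
        if namespace_prefix is not None and not any(
            ns.startswith(namespace_prefix) for ns in event.get("namespaces", [])
        ):
            continue
        selected.append(event)
    return selected
```

Note that the StreamingEnabled event carries neither field, so it is dropped as soon as either filter is set.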

Conclusion

Long-term memory records extracted automatically from conversation events now flow into Kinesis in real time, so they can be kept as audit logs or drive real-time integrations.
If needed, you could also pair this with Amazon Data Firehose to save the data to S3.
Personally, I may not use this heavily, but I can see it being useful for keeping logs.
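
For the logging use case, one possible shape is a JSON Lines entry per event. The sketch below normalizes the two timestamp formats seen in the sample output: eventTime, an ISO 8601 string with nanosecond precision, and createdAt, which appears to be epoch milliseconds (an assumption based on the sample values). The function names and output field names are hypothetical.

```python
import json
from datetime import datetime, timezone


def parse_event_time(value: str) -> datetime:
    """Parse an eventTime such as 2026-03-07T10:42:46.953862273Z, truncating to microseconds."""
    base, _, fraction = value.rstrip("Z").partition(".")
    micros = (fraction + "000000")[:6]  # datetime keeps at most 6 fractional digits
    parsed = datetime.strptime(f"{base}.{micros}", "%Y-%m-%dT%H:%M:%S.%f")
    return parsed.replace(tzinfo=timezone.utc)


def to_audit_line(wrapper: dict) -> str:
    """Flatten one stream event into a single JSON Lines entry."""
    event = wrapper["memoryStreamEvent"]
    entry = {
        "event_type": event["eventType"],
        "event_time": parse_event_time(event["eventTime"]).isoformat(),
        "memory_id": event["memoryId"],
        "memory_record_id": event.get("memoryRecordId"),
    }
    if "createdAt" in event:
        # Assumed to be epoch milliseconds.
        created = datetime.fromtimestamp(event["createdAt"] / 1000, tz=timezone.utc)
        entry["created_at"] = created.isoformat()
    return json.dumps(entry, ensure_ascii=False)
```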

I hope this article was helpful. Thank you for reading!
