I Built an Agent That Automatically Generates and Executes SQL from Natural Language Using Amazon Athena × Bedrock Converse API

This article walks through building an architecture in which an agent running on Amazon Bedrock, equipped with an Athena SQL query execution tool, answers natural-language questions about structured data that involve aggregation, comparison, and ranking. The AWS-side setup uses only AWS Console operations.
2026.06.05

Introduction

Amazon Bedrock's Knowledge Base (RAG) is powerful for searching unstructured documents, but it cannot fully handle questions involving aggregation, comparison, or ranking, such as "What were the total sales for Q1 FY2025?"

To address this challenge, we used Bedrock's Converse API (tool_use) to build an agent loop on the application side, enabling Claude to generate SQL, execute it on Athena, and generate answers based on the results.

Bedrock Agents (the managed service) can also do this, but building your own agent loop gives you full control over streaming and lets you build a custom UI. This article introduces the implementation of this approach.

Architecture

User's question
  ↓
Application server (FastAPI, etc.)
  ↓
bedrock_runtime.converse()  ← SQL tool defined via tool_use
  ↓
Claude generates SQL (stop_reason: "tool_use")
  ↓
App receives SQL and executes directly on Athena (no Lambda needed)
  ↓
Results added to messages as toolResult
  ↓
bedrock_runtime.converse()  ← Generate answer based on results
  ↓
(Generate additional SQL if needed → re-execute → loop)
  ↓
stop_reason: "end_turn" → answer complete

Key point: Instead of Bedrock Agents (managed service), we use the toolConfig of the Converse API to control the loop on the application side. No Lambda or Action Groups are needed — the app calls Athena directly.

Why Converse API Instead of Bedrock Agents?

| Aspect | Bedrock Agents | Converse API + Custom Loop |
|--------|----------------|----------------------------|
| Tool execution | Via Lambda (Action Group) | Direct call from app |
| Streaming | Controlled by Agent | Freely customizable |
| UX | Fixed | Can control each step: SQL display, result display, etc. |
| Loop control | Left to Agent | Fine-grained control such as max iteration count |
| Debugging | Check from trace logs | Verify directly from app logs |
| Deployment | Agent + Lambda + Action Group | App only |

If you already have an application server, the Converse API approach is simpler to implement.

Prerequisites & Environment

  • An AWS account must be available
  • Access to Claude models on Amazon Bedrock must be enabled
  • Structured data (CSV/Excel, etc.) must be converted to Parquet format
  • Python 3.12 + boto3
  • Region: ap-northeast-1 (Tokyo)

Why Parquet?

Athena can also read CSV, but Parquet has the following advantages:

  • Columnar format: Reduces scan volume by reading only the necessary columns, lowering costs
  • Retains type information: Types such as INT and STRING are automatically recognized
  • High compression efficiency: Storage costs are also reduced with Snappy compression

To convert using Python, pandas + pyarrow is the easiest approach:

import pandas as pd

df = pd.read_excel("data.xlsx", sheet_name="Sheet1")
df.to_parquet("output/data.parquet", engine="pyarrow", compression="snappy")

Steps

Step 1: Place Data in S3

Upload Parquet files to S3. The key is to separate by prefix (folder) for each table.

s3://your-bucket/sql/
  ├── sales/
  │   └── sales.parquet
  ├── employees/
  │   └── employees.parquet
  └── products/
      └── products.parquet

Open the target bucket in the S3 console, create a folder for each table name under the sql/ prefix, and upload the corresponding Parquet file to each folder.
Also create the folder for Athena query result output (athena-results/), which will be described later.

Note: Athena Query Result Output Location (athena-results/)

Athena writes the results of every query it executes to S3. This is how Athena is designed, and the step cannot be skipped.

s3://your-bucket/
  ├── sql/                  ← Source data (Parquet)
  │   ├── sales/
  │   └── employees/
  └── athena-results/       ← Auto-generated by Athena (query results)
      ├── <query-id-1>.csv
      ├── <query-id-1>.csv.metadata
      ├── <query-id-2>.csv
      ├── <query-id-2>.csv.metadata
      └── ...(increases with each query execution)

Why write to S3:

Athena is a serverless query engine that has no persistent storage of its own. Workers are launched to process each query execution and terminate immediately after returning results. Therefore, writing to S3 is mandatory as the place to retain query results.

Output file structure:

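| File | Contents |
|------|----------|
| <query-id>.csv | The query results themselves (CSV with column headers) |
| <query-id>.csv.metadata | Query metadata that Athena uses when reading the results back through GetQueryResults (stored in a binary format, not human-readable) |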

Processing flow:

  1. Specify the output destination in ResultConfiguration.OutputLocation via start_query_execution()
  2. Athena executes the query and writes the results as a pair of .csv + .csv.metadata
  3. get_query_results() reads that CSV and returns the results
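
As a rough illustration of where the two files from step 2 end up, the helper below derives their S3 locations from the output location and a query execution ID. The function name `result_object_uris` is hypothetical, and the sketch assumes results are written directly under the configured output location, as in the listing above.

```python
def result_object_uris(output_location: str, query_execution_id: str) -> dict[str, str]:
    """Return the S3 URIs of the result CSV and its metadata file."""
    base = output_location.rstrip("/")  # tolerate a missing or extra trailing slash
    csv_uri = f"{base}/{query_execution_id}.csv"
    return {"results": csv_uri, "metadata": f"{csv_uri}.metadata"}


uris = result_object_uris("s3://your-bucket/athena-results/", "query-id-1")
print(uris["results"])
print(uris["metadata"])
```

In practice you rarely need to build these paths yourself, because get_query_results() reads the file for you.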

Files accumulate with each query execution at this output location. Repeated development and testing can quickly result in hundreds to thousands of files, so it is recommended to set up automatic deletion using S3 lifecycle rules.

Note that the IAM policy requires both s3:PutObject (for Athena to write) and s3:GetObject (to read results) for this output location.

Step 2: Clean Up athena-results/

As mentioned above, Athena writes result files to S3 every time a query is executed. Since leaving this unchecked causes the object count to balloon, set up automatic deletion using S3 lifecycle rules.

Configuration in the S3 console:

  1. Open the target bucket in the S3 console
  2. Go to the "Management" tab → Click "Create lifecycle rule"

  3. Lifecycle rule settings:
    • Lifecycle rule name: delete-athena-results-after-7-days
    • Rule scope: Select "Limit the scope of this rule using one or more filters"
    • Prefix: Enter athena-results/
    • Object tags and object size can be left blank as-is

  4. Lifecycle rule actions:
    • Check "Expire current versions of objects"
    • A field for entering the number of days will appear in the "Review transition and expiration actions" section at the bottom
    • Days: Enter 7 (shorter is fine for development environments; adjust according to requirements for production)

  5. Click "Create rule"

This will automatically delete query results after 7 days, keeping unnecessary storage costs and object count growth under control.
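
If you would rather script this rule than click through the console, the same settings can be expressed as a lifecycle configuration for boto3. This is a minimal sketch: `build_expiration_rule` is a hypothetical helper name, and the actual API call is left commented out because it needs AWS credentials and replaces any lifecycle configuration already on the bucket.

```python
def build_expiration_rule(prefix: str, days: int) -> dict:
    """Build an S3 lifecycle configuration that expires objects under a prefix."""
    if days < 1:
        raise ValueError("days must be at least 1")
    return {
        "Rules": [
            {
                "ID": f"delete-athena-results-after-{days}-days",
                "Filter": {"Prefix": prefix},   # same scope as the console rule
                "Status": "Enabled",
                "Expiration": {"Days": days},   # expire current versions
            }
        ]
    }


config = build_expiration_rule("athena-results/", 7)

# Applying it requires credentials. Note that this call overwrites the
# bucket's entire existing lifecycle configuration.
# import boto3
# boto3.client("s3").put_bucket_lifecycle_configuration(
#     Bucket="your-bucket", LifecycleConfiguration=config
# )
```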

Step 3: Define Database and Tables in Athena

Athena uses Glue Data Catalog metadata to access data on S3. The easiest approach is to run DDL directly from the query editor in the Athena console.

Initial setup — Set query result output location:

If you're using Athena for the first time, you need to configure the result output location before running queries.

  1. Athena console → Query editor → "Settings" tab → "Manage" button
  2. Enter s3://your-bucket/athena-results/ in "Location of query result"
  3. Other fields (Expected bucket owner, encryption, etc.) can be left blank
  4. Click "Save"

Running DDL:

Paste the following DDL one at a time into the query editor and execute. Since Athena only accepts one statement per query execution, running multiple CREATE TABLE statements together will result in an error.

-- Query 1: Create database
CREATE DATABASE IF NOT EXISTS your_database
COMMENT 'Structured data for SQL agent';

-- Query 2: Example table: sales data
CREATE EXTERNAL TABLE your_database.sales (
  `region`         STRING   COMMENT 'Region name',
  `fiscal_year`    INT      COMMENT 'Fiscal year',
  `quarter`        STRING   COMMENT 'Quarter (1Q, 2Q, 3Q, 4Q)',
  `revenue_plan`   BIGINT   COMMENT 'Planned revenue',
  `revenue_actual`  BIGINT  COMMENT 'Actual revenue'
)
STORED AS PARQUET
LOCATION 's3://your-bucket/sql/sales/'
TBLPROPERTIES ('parquet.compression'='SNAPPY');

Key points:

  • CREATE EXTERNAL TABLE only references the data on S3 as-is — no data copying occurs
  • Specify a folder path rather than an individual file for LOCATION (don't forget the trailing /)
  • When using Japanese column names, wrap them in backticks in DDL and double quotes in SELECT statements
  • To change the schema, redefine it with DROP TABLE → CREATE TABLE (data on S3 is not deleted)

After creation, verify (also run one at a time):

SELECT * FROM your_database.sales LIMIT 10;
SELECT COUNT(*), MIN(fiscal_year), MAX(fiscal_year) FROM your_database.sales;

Step 4: Set Application Environment Variables

Manage the settings for the application to connect to Athena using environment variables. Configure them according to your deployment method — ECS Fargate task definition, Docker env file, .env file, etc.

| Environment variable | Value | Description |
|----------------------|-------|-------------|
| ATHENA_DATABASE | your_database | Database name created in Step 3 |
| ATHENA_WORKGROUP | primary | Athena workgroup (default is primary) |
| ATHENA_OUTPUT_S3 | s3://your-bucket/athena-results/ | Query result output location |
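
One way to read these variables in the application is a small settings object. The sketch below is an assumption about how you might structure it (the class name `AthenaSettings` and its validation rules are not from the original code); it treats ATHENA_DATABASE as required, defaults the workgroup to primary, and normalizes the trailing slash on the output location.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class AthenaSettings:
    database: str
    workgroup: str
    output_s3: str

    @classmethod
    def from_env(cls, env=os.environ) -> "AthenaSettings":
        """Load settings from a mapping (os.environ by default)."""
        output_s3 = env.get("ATHENA_OUTPUT_S3", "")
        if not output_s3.startswith("s3://"):
            raise ValueError("ATHENA_OUTPUT_S3 must be an s3:// URI")
        return cls(
            database=env["ATHENA_DATABASE"],  # required: KeyError if missing
            workgroup=env.get("ATHENA_WORKGROUP", "primary"),
            output_s3=output_s3 if output_s3.endswith("/") else output_s3 + "/",
        )


settings = AthenaSettings.from_env({
    "ATHENA_DATABASE": "your_database",
    "ATHENA_OUTPUT_S3": "s3://your-bucket/athena-results",
})
print(settings)
```

Passing a plain dict, as in the usage lines, makes the loader easy to test without touching the real environment.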

Step 5: Configure IAM Policy

Grant the application's execution role (ECS task role, etc.) access permissions for Athena, S3, and Glue.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "athena:StartQueryExecution",
                "athena:GetQueryExecution",
                "athena:GetQueryResults",
                "athena:StopQueryExecution",
                "athena:GetWorkGroup"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:GetTable",
                "glue:GetTables",
                "glue:GetPartition",
                "glue:GetPartitions"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:ListBucket"
            ],
            "Resource": "arn:aws:s3:::your-bucket"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject"
            ],
            "Resource": "arn:aws:s3:::your-bucket/*"
        }
    ]
}

Key points:

  • s3:PutObject is required for Athena to write query results to athena-results/
  • s3:GetObject is required for both reading data (Parquet in sql/) and retrieving results
  • glue:GetPartition(s) is required when using partitioned tables (include it in preparation for future expansion)
  • For production, restricting Resource to arn:aws:s3:::your-bucket/sql/* and arn:aws:s3:::your-bucket/athena-results/* is more secure
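
The tighter scoping suggested in the last point could look like the following. This is a sketch that only re-scopes the S3 actions already listed in the policy above; the helper name `scoped_s3_statements` and the split into read-only data access and read/write results access are assumptions, not part of the original policy.

```python
import json


def scoped_s3_statements(bucket: str, data_prefix: str = "sql/",
                         results_prefix: str = "athena-results/") -> list[dict]:
    """Build S3 policy statements limited to the data and results prefixes."""
    bucket_arn = f"arn:aws:s3:::{bucket}"
    return [
        {   # Bucket-level actions apply to the bucket ARN itself
            "Effect": "Allow",
            "Action": ["s3:GetBucketLocation", "s3:ListBucket"],
            "Resource": bucket_arn,
        },
        {   # Source Parquet data: read-only
            "Effect": "Allow",
            "Action": ["s3:GetObject"],
            "Resource": f"{bucket_arn}/{data_prefix}*",
        },
        {   # Query results: Athena writes them, the app reads them back
            "Effect": "Allow",
            "Action": ["s3:PutObject", "s3:GetObject"],
            "Resource": f"{bucket_arn}/{results_prefix}*",
        },
    ]


statements = scoped_s3_statements("your-bucket")
print(json.dumps(statements, indent=4))
```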

Step 6: Implement the Athena Query Execution Module

Instead of Lambda, create a module within the application that calls Athena directly.

"""athena.py — Athena query execution module"""

import asyncio
import re
import time
import boto3

MAX_ROWS = 200
POLL_INTERVAL = 0.5
MAX_WAIT = 30

ATHENA_DATABASE = "your_database"
ATHENA_WORKGROUP = "primary"
ATHENA_OUTPUT_S3 = "s3://your-bucket/athena-results/"

_SELECT_ONLY = re.compile(r"^\s*SELECT\b", re.IGNORECASE)

def _validate_sql(sql: str) -> None:
    """Reject anything other than SELECT statements."""
    if not _SELECT_ONLY.match(sql):
        raise ValueError(
            f"Only SELECT statements are allowed. Received: {sql[:80]!r}"
        )

def _run_query_sync(sql: str) -> dict:
    """Execute a query on Athena and return the results (synchronous)."""
    athena = boto3.client("athena")

    response = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": ATHENA_DATABASE},
        WorkGroup=ATHENA_WORKGROUP,
        ResultConfiguration={"OutputLocation": ATHENA_OUTPUT_S3},
    )
    execution_id = response["QueryExecutionId"]

    # Poll for completion
    deadline = time.time() + MAX_WAIT
    while time.time() < deadline:
        status = athena.get_query_execution(QueryExecutionId=execution_id)
        state = status["QueryExecution"]["Status"]["State"]
        if state == "SUCCEEDED":
            break
        if state in ("FAILED", "CANCELLED"):
            reason = status["QueryExecution"]["Status"].get(
                "StateChangeReason", "unknown"
            )
            raise RuntimeError(f"Athena query {state}: {reason}")
        time.sleep(POLL_INTERVAL)
    else:
        raise TimeoutError(
            f"Athena query did not complete within {MAX_WAIT}s"
        )

    # Retrieve results
    paginator = athena.get_paginator("get_query_results")
    pages = paginator.paginate(QueryExecutionId=execution_id)

    rows: list[list] = []
    columns: list[str] = []
    for page in pages:
        result = page["ResultSet"]
        if not columns:
            columns = [
                c["Label"]
                for c in result["ResultSetMetadata"]["ColumnInfo"]
            ]
        for row in result["Rows"][1 if not rows else 0:]:
            values = [d.get("VarCharValue") for d in row["Data"]]
            rows.append(values)
            if len(rows) >= MAX_ROWS:
                return {"columns": columns, "rows": rows, "row_count": len(rows)}

    return {"columns": columns, "rows": rows, "row_count": len(rows)}

async def execute_query(sql: str) -> dict:
    """Async wrapper. Executes synchronous processing in a thread pool."""
    _validate_sql(sql)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _run_query_sync, sql)

Design points:

| Point | Details |
|-------|---------|
| Only SELECT allowed | Checked via regex. Even if Claude mistakenly generates DML, it won't be executed |
| Polling method | Athena uses async execution, so a 3-step process: start_query_execution → status check → get_query_results |
| 200-row result limit | To avoid overflowing Claude's context window |
| Async wrapper | Wrapped with run_in_executor for use in async frameworks such as FastAPI |
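
The SELECT-only check above also rejects queries that begin with a WITH clause, and it does not normalize a trailing semicolon. If you need those, one possible variation is sketched below. The function name `validate_read_only_sql` is hypothetical, and this is a prefix check rather than a SQL parser: it rejects any statement containing a semicolon, including one inside a string literal. The read-only Glue permissions from Step 5 remain the more reliable guard.

```python
import re

# Accept statements that start with SELECT or with a WITH (CTE) clause.
_ALLOWED_START = re.compile(r"^\s*(SELECT|WITH)\b", re.IGNORECASE)


def validate_read_only_sql(sql: str) -> str:
    """Return the statement without a trailing semicolon, or raise ValueError."""
    statement = sql.strip().rstrip(";").strip()
    if ";" in statement:
        # Conservative: also rejects semicolons inside string literals.
        raise ValueError("Multiple statements are not allowed")
    if not _ALLOWED_START.match(statement):
        raise ValueError(
            f"Only SELECT/WITH queries are allowed. Received: {statement[:80]!r}"
        )
    return statement


print(validate_read_only_sql("SELECT region, SUM(revenue_actual) FROM sales GROUP BY region;"))
```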

Step 7: Tool Definition (Converse API toolConfig)

Create the tool definition to pass to the Converse API. This is the interface that tells Claude "a SQL execution tool is available."

EXECUTE_SQL_TOOL = {
    "toolSpec": {
        "name": "execute_sql_query",
        "description": (
            "Executes a Presto SQL SELECT query on Athena and returns the results as JSON."
        ),
        "inputSchema": {
            "json": {
                "type": "object",
                "properties": {
                    "sql": {
                        "type": "string",
                        "description": "The Presto SQL SELECT statement to execute",
                    }
                },
                "required": ["sql"],
            }
        },
    }
}

Compared to Bedrock Agents' Action Group + OpenAPI schema, this is extremely simple. All you need to do is define the tool's name, description, and parameters in a Python dictionary.
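
To make the other side of this interface concrete, the sketch below shows the shape of a Converse API response when Claude decides to call the tool, and how the SQL can be pulled out of it. The response is hand-written for illustration (the ID and the SQL are made up), and `requested_sql` is a hypothetical helper name.

```python
# Hand-written sample of a Converse API response; the ID and SQL are made up.
sample_response = {
    "stopReason": "tool_use",
    "output": {
        "message": {
            "role": "assistant",
            "content": [
                {"text": "I will query the sales table."},
                {
                    "toolUse": {
                        "toolUseId": "tooluse_example_123",
                        "name": "execute_sql_query",
                        "input": {"sql": "SELECT COUNT(*) FROM sales"},
                    }
                },
            ],
        }
    },
}


def requested_sql(response: dict) -> list[tuple[str, str]]:
    """Return (toolUseId, sql) pairs for every execute_sql_query call."""
    if response["stopReason"] != "tool_use":
        return []
    calls = []
    for block in response["output"]["message"]["content"]:
        tool_use = block.get("toolUse")
        if tool_use and tool_use["name"] == "execute_sql_query":
            calls.append((tool_use["toolUseId"], tool_use["input"]["sql"]))
    return calls


print(requested_sql(sample_response))
```

The "input" object follows the inputSchema from the tool definition, which is why the SQL arrives under the "sql" key.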

Step 8: Implementing the Agent Loop

This is the core of this article. Using the Converse API, we build our own loop to "have Claude generate SQL → execute it → return results → have it generate an answer."

"""agent_loop.py — SQL agent loop using the Converse API"""

import json
import boto3
from athena import execute_query

MODEL_ID = "ap-northeast-1.anthropic.claude-sonnet-4-6"  # Example value; use the model ID or inference profile ID enabled in your account
MAX_ITERATIONS = 4  # Maximum number of SQL execution iterations

bedrock = boto3.client("bedrock-runtime", region_name="ap-northeast-1")

# Tool definition (EXECUTE_SQL_TOOL, created in Step 7; define or import it above this line)
TOOL_CONFIG = {"tools": [EXECUTE_SQL_TOOL]}

def load_system_prompt() -> str:
    with open("system_prompt.md", encoding="utf-8") as f:
        return f.read()

async def run_agent(user_message: str, history: list[dict]) -> str:
    """
    Runs the agent loop for a user's question and returns the final answer.

    Flow:
    1. Send messages + tool definition to Converse API
    2. If stop_reason is "tool_use" → extract SQL → execute on Athena → add results to messages → re-send
    3. If stop_reason is "end_turn" → return text answer
    4. Force termination if max iteration count is reached
    """
    system_prompt = load_system_prompt()
    messages = _build_messages(history, user_message)

    for iteration in range(MAX_ITERATIONS):
        # ── Converse API call ──
        response = bedrock.converse(
            modelId=MODEL_ID,
            system=[{"text": system_prompt}],
            messages=messages,
            toolConfig=TOOL_CONFIG,
            inferenceConfig={"maxTokens": 4096, "temperature": 0},
        )

        assistant_message = response["output"]["message"]
        stop_reason = response["stopReason"]

        # ── If Claude returns a text answer → complete ──
        if stop_reason != "tool_use":
            for block in assistant_message.get("content", []):
                if "text" in block:
                    return block["text"]
            return ""

        # ── If Claude returns a tool call ──
        tool_use_block = _extract_tool_use(assistant_message)
        if not tool_use_block:
            return "Failed to generate SQL."

        sql = tool_use_block["input"]["sql"]
        tool_use_id = tool_use_block["toolUseId"]

        print(f"[iteration {iteration + 1}] SQL: {sql}")

        # ── Execute SQL on Athena ──
        try:
            result = await execute_query(sql)
        except (ValueError, RuntimeError, TimeoutError) as exc:
            result = {"error": str(exc)}

        # ── Add results to the conversation as toolResult ──
        messages = messages + [
            assistant_message,  # Claude's assistant message (containing tool_use)
            {
                "role": "user",
                "content": [
                    {
                        "toolResult": {
                            "toolUseId": tool_use_id,
                            "content": [
                                {"text": json.dumps(result, ensure_ascii=False)}
                            ],
                        }
                    }
                ],
            },
        ]
        # Call converse() again in the next loop iteration

    return "Maximum iteration count reached."

def _extract_tool_use(message: dict) -> dict | None:
    """Extract the execute_sql_query toolUse block from an assistant message."""
    for block in message.get("content", []):
        if "toolUse" in block and block["toolUse"]["name"] == "execute_sql_query":
            return block["toolUse"]
    return None

def _build_messages(history: list[dict], current: str) -> list[dict]:
    """Convert conversation history to the message format for the Converse API."""
    messages = []
    for turn in history:
        role = turn.get("role")
        content = turn.get("content", "")
        if role in ("user", "assistant") and content:
            messages.append({"role": role, "content": [{"text": content}]})
    messages.append({"role": "user", "content": [{"text": current}]})
    return messages

Agent loop flow diagram:

messages = [user_message]

┌─── Loop start (max 4 times) ───────────────────────┐
│                                                    │
│  response = converse(messages, toolConfig)          │
│                                                    │
│  stop_reason == "end_turn"?  ──→ Return text answer │
│        │ No                                        │
│        ↓                                           │
│  stop_reason == "tool_use"                         │
│        │                                           │
│        ↓                                           │
│  Extract SQL → Execute on Athena                    │
│        │                                           │
│        ↓                                           │
│  messages += [assistant_msg, toolResult_msg]        │
│        │                                           │
│        └──→ Return to loop start                   │
│                                                    │
└────────────────────────────────────────────────────┘

Implementation key points:

| Point | Details |
|-------|---------|
| Branch on stop_reason | If "tool_use", execute SQL and continue loop; if "end_turn", answer is complete |
| toolResult format | Linked to the corresponding tool call via toolUseId. This is the Converse API specification |
| Adding messages | Add the assistant message (including tool call) and user message (toolResult) as a pair |
| Limiting iteration count | Claude may query multiple tables sequentially, so set a limit to prevent infinite loops |
| Return errors as results too | Returning Athena errors as JSON allows Claude to correct the SQL and retry |
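
The loop above handles one tool call per assistant message. If Claude returns several toolUse blocks in a single message, each one needs its own toolResult in the reply. The sketch below shows one way to build that reply; `build_tool_result_message` is a hypothetical helper, and `run_sql` stands in for a synchronous query function. It also sets the optional status field of toolResult so that failures are explicitly marked as errors.

```python
import json
from typing import Callable


def build_tool_result_message(assistant_message: dict,
                              run_sql: Callable[[str], dict]) -> dict:
    """Answer every toolUse block in assistant_message with one toolResult."""
    results = []
    for block in assistant_message.get("content", []):
        tool_use = block.get("toolUse")
        if tool_use is None:
            continue  # plain text blocks need no reply
        try:
            payload, status = run_sql(tool_use["input"]["sql"]), "success"
        except Exception as exc:
            # Report the failure to the model instead of raising,
            # so it can correct the SQL on the next iteration.
            payload, status = {"error": str(exc)}, "error"
        results.append({
            "toolResult": {
                "toolUseId": tool_use["toolUseId"],
                "content": [{"text": json.dumps(payload, ensure_ascii=False)}],
                "status": status,
            }
        })
    return {"role": "user", "content": results}
```

The returned message goes into messages right after the assistant message, in the same position as the single toolResult in the loop above.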

Step 9: Embed Schema Information in the System Prompt

This is the most important step. For Claude to generate SQL correctly, you need to explicitly describe table definitions and data characteristics in the system prompt.

You are a data analyst for [domain].

## Answer Rules
1. Cite exact figures from search results in your answers
2. Always include units with numerical values
3. If data is not found, respond with "No matching data was found"
4. Do not make assumptions or estimates

## SQL Query Writing Rules
- Syntax: Presto SQL (Athena compatible)
- Wrap Japanese column names in double quotes (e.g., "売上高")
- Use single quotes for string values (e.g., '1Q')

## Table Definitions

### Table name: sales
| Column | Type | Description | Example values |
|--------|-----|------|--------|
| `region` | STRING | Region name | Tokyo, Osaka |
| `fiscal_year` | INT | Fiscal year | 2021–2026 |
| `quarter` | STRING | Quarter | 1Q, 2Q, 3Q, 4Q, Full year |
| `revenue_plan` | BIGINT | Planned revenue (millions of yen) | — |
| `revenue_actual` | BIGINT | Actual revenue (millions of yen) | — |

## Data Notes
- [Filter conditions to avoid double-counting during aggregation]
- [Constraints on specific column value combinations]
- [Fiscal years/periods with missing data]
- [Rules for determining ambiguous terminology]

Prompt design tips:

| Point | Details |
|-------|---------|
| Table definitions are essential | Claude doesn't know the DB schema. List the name, type, description, and possible values for every column |
| Document aggregation rules clearly | For data containing summary rows, specifically describe filter conditions to prevent double-counting |
| Show JOIN patterns | If joins across multiple tables are needed, include specific SQL examples |
| Define ambiguous terms | If the same word has multiple meanings (e.g., whether "sales" means amount or number of units), establish decision rules |
| Write example values | Claude doesn't know whether the value in the quarter column is '1Q' or 'Q1' |

Lessons from experience: In our experience, the quality of the system prompt accounts for roughly 80% of SQL generation accuracy. Carefully writing not just the table definitions but also notes specific to the data significantly reduces Claude's mistakes. In production, accuracy stabilized once the prompt contained more than 14 rules.
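
Because the table definitions in the prompt must stay in sync with the DDL, it can help to generate that section from a single column list instead of editing it by hand. The following is a minimal sketch of that idea; `render_table_section` and the dictionary keys are hypothetical, and the output format mirrors the prompt template above.

```python
def render_table_section(table: str, columns: list[dict]) -> str:
    """Render one '### Table name' section of the system prompt as Markdown."""
    lines = [
        f"### Table name: {table}",
        "| Column | Type | Description | Example values |",
        "|--------|------|-------------|----------------|",
    ]
    for col in columns:
        # Columns without example values get a placeholder dash.
        examples = ", ".join(col.get("examples", [])) or "-"
        lines.append(
            f"| `{col['name']}` | {col['type']} | {col['description']} | {examples} |"
        )
    return "\n".join(lines)


sales_columns = [
    {"name": "region", "type": "STRING", "description": "Region name",
     "examples": ["Tokyo", "Osaka"]},
    {"name": "quarter", "type": "STRING", "description": "Quarter",
     "examples": ["1Q", "2Q", "3Q", "4Q", "Full year"]},
    {"name": "revenue_actual", "type": "BIGINT",
     "description": "Actual revenue (millions of yen)"},
]
section = render_table_section("sales", sales_columns)
print(section)
```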

Step 10: Verify Operation

Verify operation with questions like the following:

Please tell me the total actual sales across all regions for Q1 FY2025.

When operating correctly, you will see the following flow in the logs:

[iteration 1] SQL: SELECT SUM("revenue_actual") AS total FROM sales WHERE "fiscal_year" = 2025 AND "quarter" = '1Q'

Operation is successful if Claude behaves as follows:

  1. Analyzes the question and decides to use the execute_sql_query tool (stop_reason: "tool_use")
  2. Generates an appropriate SELECT statement
  3. The app executes the query directly on Athena
  4. Returns the results as toolResult
  5. Claude interprets the results and answers in natural language (stop_reason: "end_turn")

Cases where multiple SQL executions occur: For questions like "Compare sales across all business segments," Claude may run a separate SQL query for each table. In that case the loop iterates several times, and Claude combines all the results before answering.
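
Before testing against real AWS resources, you can check the loop logic offline by replacing Bedrock and Athena with scripted stand-ins. The sketch below is a simplified, synchronous version of the loop written for that purpose. `run_loop`, `fake_converse`, and `fake_run_sql` are hypothetical names, and the scripted responses and figures are made up.

```python
import json


def run_loop(converse, run_sql, question: str, max_iterations: int = 4) -> str:
    """Synchronous agent loop with the model and query functions injected."""
    messages = [{"role": "user", "content": [{"text": question}]}]
    for _ in range(max_iterations):
        response = converse(messages)
        message = response["output"]["message"]
        if response["stopReason"] != "tool_use":
            return "".join(b.get("text", "") for b in message["content"])
        tool_results = []
        for block in message["content"]:
            if "toolUse" in block:
                call = block["toolUse"]
                result = run_sql(call["input"]["sql"])
                tool_results.append({"toolResult": {
                    "toolUseId": call["toolUseId"],
                    "content": [{"text": json.dumps(result)}],
                }})
        messages += [message, {"role": "user", "content": tool_results}]
    return "Maximum iteration count reached."


# Scripted stand-ins: the first call requests SQL, the second gives the answer.
scripted = iter([
    {
        "stopReason": "tool_use",
        "output": {"message": {"role": "assistant", "content": [
            {"toolUse": {
                "toolUseId": "t1",
                "name": "execute_sql_query",
                "input": {"sql": "SELECT SUM(revenue_actual) AS total FROM sales"},
            }},
        ]}},
    },
    {
        "stopReason": "end_turn",
        "output": {"message": {"role": "assistant", "content": [
            {"text": "Total actual revenue is 1,234 million yen."},
        ]}},
    },
])
executed = []


def fake_converse(messages):
    return next(scripted)


def fake_run_sql(sql):
    executed.append(sql)  # record what the loop asked to run
    return {"columns": ["total"], "rows": [["1234"]], "row_count": 1}


answer = run_loop(fake_converse, fake_run_sql, "What is the total actual revenue?")
print(answer)
```

The same stand-ins make it easy to script an error result or a run of tool calls that never ends, to confirm that the iteration limit is enforced.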

Common Pitfalls

Insufficient IAM Permissions

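The application's execution role (ECS task role, etc.) needs Athena permissions, S3 permissions (for both the source data and the query results location), and Glue read permissions. The policy in Step 5 lists the specific actions. Broad grants such as athena:* or s3:* also work but are wider than necessary.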

Athena Query Result Output Location

Athena writes query results to S3. Verify write permissions for the path specified in ATHENA_OUTPUT_S3.

Handling Japanese Column Names

  • DDL (CREATE TABLE): Enclose with backticks `
  • DML (SELECT): Enclose with double quotes "
  • Explicitly state the rule to use double quotes in the system prompt. Claude tends to use backticks by default
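
If your application ever builds SELECT statements itself (for example, for the verification queries in Step 3), a small helper keeps the quoting rule consistent. This is a sketch with a hypothetical function name; it follows the standard SQL convention of doubling any embedded double quote.

```python
def quote_identifier(name: str) -> str:
    """Quote a column name for use in a SELECT statement (double quotes)."""
    return '"' + name.replace('"', '""') + '"'


query = f"SELECT SUM({quote_identifier('売上高')}) FROM sales"
print(query)
```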

toolResult Format Errors

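In the Converse API, the toolUseId in each toolResult must match the ID of a toolUse block in the immediately preceding assistant message, and every toolUse block in that message needs a corresponding toolResult. A mismatch or a missing result will cause the API to return an error.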

Claude Not Using SQL

Specifying "Please use the execute_sql_query tool for all questions" in the system prompt improves this behavior.

Cost Estimates

| Service | Billing Unit | Estimate |
|---------|--------------|----------|
| Athena | Amount of data scanned | $5/TB (minimal with Parquet) |
| S3 | Storage + requests | Negligible with Parquet at a few MB |
| Bedrock Converse API | Number of input/output tokens | Model-dependent |

The Parquet + Athena combination scans minimal data, and at internal tool usage levels, costs often stay within a few dollars per month. With no Lambda required, there's also less to manage.
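
To get a feel for the Athena portion, the per-query cost can be estimated from the DataScannedInBytes statistic that get_query_execution() returns. The sketch below uses the $5/TB rate from the table and Athena's billing rule of rounding up to the nearest megabyte with a 10 MB minimum per query. Treating MB and TB as decimal units is an assumption, so treat the output as a rough estimate.

```python
import math

PRICE_PER_TB_USD = 5.0              # rate from the table above
BYTES_PER_MB = 1_000_000            # assumption: decimal megabytes
BYTES_PER_TB = 1_000_000_000_000    # assumption: decimal terabytes
MIN_BILLED_MB = 10                  # Athena bills at least 10 MB per query


def estimate_query_cost_usd(data_scanned_bytes: int) -> float:
    """Estimate the Athena charge for one query from its scanned bytes."""
    billed_mb = max(math.ceil(data_scanned_bytes / BYTES_PER_MB), MIN_BILLED_MB)
    return billed_mb * BYTES_PER_MB / BYTES_PER_TB * PRICE_PER_TB_USD


# A query scanning 3 MB is billed as 10 MB.
print(f"{estimate_query_cost_usd(3_000_000):.6f} USD")
```

With Parquet files of a few MB, most queries hit the 10 MB minimum, so the Athena cost scales with the number of queries rather than with data size.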

Summary

By leveraging tool_use in the Bedrock Converse API and implementing an agent loop on the application side, we achieved natural language SQL queries against structured data.

What we learned:

  • You can build your own agent simply by detecting stop_reason: "tool_use" in the Converse API and looping. The setup is simpler than Bedrock Agents (managed service)
  • SQL generation accuracy is directly tied to the quality of the system prompt. Write not just table definitions, but also data characteristics, aggregation rules, rules for handling ambiguous terms, and concrete SQL examples
  • By calling Athena directly from the application, Lambda cold starts and Action Group configuration become unnecessary, resulting in faster responses
  • Returning errors as toolResult allows Claude to independently fix the SQL and retry

If you're hitting the limits of RAG for structured data searches, or if you're concerned about Bedrock Agents constraints (streaming, UI control), give this Converse API + custom loop approach a try.
