1
0
mirror of synced 2026-05-22 14:43:35 +00:00

feat(payments): add Pay for Data (Heurist) use case (#1545)

Co-authored-by: Joshua Smith <jsmithac@amazon.com>
This commit is contained in:
Joshua M. Smith
2026-05-21 12:28:30 -07:00
committed by GitHub
parent 1b1ebcff5c
commit 87c999ba43
11 changed files with 2589 additions and 0 deletions
@@ -0,0 +1,54 @@
# Heurist Finance Agent — environment configuration
# Copy to .env and fill in your values before running pay-for-data.ipynb.
# ── AWS region ─────────────────────────────────────────────────────────────
AWS_REGION=us-west-2
# ── IAM roles (required) ───────────────────────────────────────────────────
# ControlPlaneRole: creates CredentialProvider / Manager / Connector
CONTROL_PLANE_ROLE_ARN=arn:aws:iam::123456789012:role/AgentCoreControlPlaneRole
# ManagementRole: creates sessions, invokes the Runtime endpoint
MANAGEMENT_ROLE_ARN=arn:aws:iam::123456789012:role/AgentCoreManagementRole
# ProcessPaymentRole: the Runtime container execution role; can ProcessPayment
PROCESS_PAYMENT_ROLE_ARN=arn:aws:iam::123456789012:role/AgentCoreProcessPaymentRole
# ResourceRetrievalRole: attached to the Payment Manager as its authorizer role
RESOURCE_RETRIEVAL_ROLE_ARN=arn:aws:iam::123456789012:role/AgentCoreResourceRetrievalRole
# ── Coinbase CDP credentials (required for embedded wallet) ────────────────
# Get these from https://portal.cdp.coinbase.com → your project → API keys
CDP_API_KEY_NAME=organizations/your-org-id/apiKeys/your-key-id
CDP_API_KEY_PRIVATE_KEY=-----BEGIN EC PRIVATE KEY-----\n...
CDP_WALLET_SECRET=your-wallet-secret
# Optional: email to link the embedded wallet to a user identity (for WalletHub login)
WALLET_EMAIL=your@email.com
# ── Network / blockchain ───────────────────────────────────────────────────
# base-mainnet: eip155:8453 (default — settles real USDC against the live
# Heurist mesh x402 registry)
# base-sepolia: eip155:84532 (testnet — free faucet USDC at faucet.circle.com,
# but Heurist Sepolia endpoints currently fail
# EIP-712 simulation against Coinbase's signer)
NETWORK=base-mainnet
# ── Provisioned resource IDs (populated by Step 4 — skip creation on re-runs) ──
# Leave blank on first run; save values printed by the notebook after Step 4.
MANAGER_ARN=
PAYMENT_CONNECTOR_ID=
PAYMENT_INSTRUMENT_ID=
# ── Session config ─────────────────────────────────────────────────────────
USER_ID=heurist-demo-user
SESSION_MAX_SPEND=0.25
SESSION_EXPIRY_MINUTES=60
# ── Bedrock model ──────────────────────────────────────────────────────────
BEDROCK_MODEL_ID=us.anthropic.claude-sonnet-4-6
# ── Heurist catalog ────────────────────────────────────────────────────────
HEURIST_CATALOG_URL=https://mesh.heurist.xyz/x402/agents?details=true
HEURIST_AGENT_IDS=ExaSearchDigestAgent,YahooFinanceAgent,FredMacroAgent,SecEdgarAgent
# ── S3 artifacts ───────────────────────────────────────────────────────────
# Optional: override the auto-generated bucket name
# ARTIFACTS_BUCKET=my-custom-bucket-name
@@ -0,0 +1,22 @@
.env
.venv/
__pycache__/
*.pyc
*.py[cod]
.DS_Store
.ipynb_checkpoints/
.pytest_cache/
.ruff_cache/
# AgentCore CLI scaffold + deploy artifacts (regenerated by the notebook).
# The agent source of truth lives in agent/. The scaffold under payfordata/
# is reproducible by re-running the notebook.
payfordata/
# Catalog cache is regenerated by `python agent/sync_registry.py` before each
# deploy — keep the latest version out of git.
agent/catalog_live_cache.json
# Container .env may carry deployment-specific values (S3 bucket name etc.).
# Keep it out of git; the notebook regenerates it from the user's environment.
agent/.env
@@ -0,0 +1,234 @@
# Pay for Data — Heurist Finance Agent
## Overview
A finance research agent that pays for real-time market data using **Amazon Bedrock AgentCore payments**. The agent calls paid [Heurist](https://heurist.xyz) endpoints for live prices, SEC filings, and macro indicators, analyzes the data with AgentCore Code Interpreter, and returns charts and reports as S3 presigned URLs — all without any manual payment code in the tools.
The agent is deployed to **AgentCore Runtime**: a managed container endpoint with HTTPS invocation, SigV4 auth, and automatic observability via CloudWatch.
> **Mainnet sample.** This walkthrough targets Base mainnet and calls the live [Heurist mesh x402 registry](https://mesh.heurist.xyz/x402/agents?details=true). Every invocation settles real USDC on-chain. Typical per-call prices are $0.002$0.005, so $1 USDC covers ~200 calls. A Base Sepolia variant of the catalog exists at `/x402/base-sepolia/agents?details=true`, but the EIP-712 signing path on AgentCore's Coinbase connector follows the connector's network selection, so this sample uses Base mainnet.
Heurist endpoints use the [x402 protocol](https://x402.org) — they return HTTP 402 until a valid payment proof is attached. The `AgentCorePaymentsPlugin` handles payment end-to-end: it intercepts 402 responses, generates a USDC proof via the AgentCore payment manager, attaches it, and retries. Your tool code stays a plain `http_request` call.
![CloudWatch GenAI Observability — Heurist Finance Agent](images/obs-dashboard.png)
## Architecture
```
App Backend (ManagementRole) AgentCore Runtime
| +------------------------------+
| create_session(budget=$X) | agent/main.py |
| | BedrockAgentCoreApp |
|-- invoke(manager_arn, session_id, --> | + AgentCorePaymentsPlugin |
| instrument_id, prompt) | |
| | http_request -> 402 |
|<-- {response, artifacts: [{url}]} --- | -> ProcessPayment -> retry |
| | -> Code Interpreter |
| get_session(check spend) | -> export to S3 |
+------------------------------+
|
v
CloudWatch GenAI Observability
(automatic via OpenTelemetry)
```
## How It Works
`AgentCorePaymentsPlugin` handles the entire x402 payment lifecycle:
```python
from bedrock_agentcore.payments.integrations.strands import (
AgentCorePaymentsPlugin,
AgentCorePaymentsPluginConfig,
)
payment_plugin = AgentCorePaymentsPlugin(
config=AgentCorePaymentsPluginConfig(
payment_manager_arn=PAYMENT_MANAGER_ARN,
user_id=USER_ID,
payment_instrument_id=PAYMENT_INSTRUMENT_ID,
payment_session_id=PAYMENT_SESSION_ID,
region="us-west-2",
)
)
agent = Agent(
model=BedrockModel(model_id=MODEL_ID),
tools=[http_request, code_interpreter, export_artifact_to_s3, ...],
plugins=[payment_plugin],
)
```
See [`agent/main.py`](agent/main.py) for the full implementation.
## Sample Details
| | |
|---|---|
| AgentCore components | AgentCore payments, AgentCore Code Interpreter, AgentCore Runtime |
| Agent framework | [Strands Agents](https://strandsagents.com/) |
| Model | Claude Sonnet 4.6 on Amazon Bedrock (configurable) |
| Payment protocol | [x402](https://x402.org) |
| Payment network | Base (USDC) |
## Data Sources
Fetched at runtime from the [Heurist mesh registry](https://mesh.heurist.xyz/x402/agents?details=true). By default the sample loads tools from four agents:
| Agent | Representative tools | Typical price |
|-------|----------------------|---------------|
| `YahooFinanceAgent` | `price_history`, `quote_snapshot`, `futures_snapshot` | $0.002 |
| `FredMacroAgent` | `macro_series_snapshot`, `macro_regime_context` | $0.003 |
| `SecEdgarAgent` | `filing_timeline`, `filing_diff`, `xbrl_fact_trends` | $0.002 |
| `ExaSearchDigestAgent` | `exa_web_search`, `exa_scrape_url` | $0.005 |
Override with the `HEURIST_AGENT_IDS` environment variable.
## Prerequisites
- An **AgentCore payment manager** created in your AWS account
- A **payment instrument** created and funded — embedded crypto wallet with USDC on **Base mainnet** (default; the notebook walks through this in Step 4)
- Coinbase CDP project with **Delegated signing** enabled, and a per-wallet delegation grant approved via the WalletHub URL returned at instrument creation
- Python 3.11+
- AWS credentials with Bedrock and AgentCore access in `us-west-2`
- Node.js 20+ (for the `@aws/agentcore` CLI)
- Docker (running, for `agentcore deploy` container build)
- [AWS CDK](https://docs.aws.amazon.com/cdk/v2/guide/getting_started.html) installed globally
## Layout
```
pay-for-data/
├── README.md
├── .env.example
├── pay-for-data.ipynb # notebook: deploy and invoke via AgentCore Runtime
└── agent/ # everything below ships in the Runtime container
├── main.py # AgentCore Runtime entry point (BedrockAgentCoreApp)
├── catalog.py # fetches Heurist registry, formats for system prompt
├── catalog_live_cache.json # synced catalog (bundled in Runtime image)
├── config.py # loads .env / payment context
├── sync_registry.py # CLI: refreshes the catalog cache (run before deploy)
├── requirements.txt # container Python deps
└── Dockerfile # opentelemetry-instrument python -m main
```
## Quick Start
Open [`pay-for-data.ipynb`](pay-for-data.ipynb) and run the cells in order:
| Step | What happens |
|------|-------------|
| 1 | Configure credentials and confirm AWS identity |
| 2 | Sync the Heurist tool catalog (bundled in the container image) |
| 3 | Create the S3 artifacts bucket |
| 4 | Provision embedded wallet resources (credential provider, manager, connector, instrument) |
| 5 | Fund the wallet and grant signing delegation via WalletHub |
| 6 | Enable Payment Manager observability (CW Logs + X-Ray vended-log delivery) |
| 7 | Scaffold and deploy to AgentCore Runtime via the `agentcore` CLI |
| 8 | Grant execution-role permissions (payment, Code Interpreter, S3, Bedrock + inference profile) |
| 9 | Invoke the deployed agent and inspect results |
| 10 | View observability traces in CloudWatch |
| 11 | Cleanup |
## Payment Flow
When the agent calls a paid Heurist endpoint:
1. `http_request` sends a POST to the endpoint URL.
2. Heurist returns HTTP 402 with x402 payment terms (network, asset, amount, recipient).
3. `AgentCorePaymentsPlugin` intercepts the response.
4. The plugin asks the AgentCore payment manager to generate a payment proof.
5. The payment manager uses the payment instrument to sign a USDC transfer and returns a proof.
6. The plugin attaches the proof as `X-PAYMENT` and retries — Heurist validates and returns the data.
The plugin retries up to 3 times per tool call. Payment limits are enforced at the session scope — the agent cannot exceed `maxSpendAmount`.
## How the Runtime Agent Works
`agent/main.py` implements the AgentCore Runtime service contract with full feature parity:
**Stateless, payload-driven**
All payment config (manager ARN, session ID, instrument ID) comes from the invocation payload. The container holds no credentials. The app backend (ManagementRole) creates payment sessions with spending limits before each invocation. The Runtime execution role (ProcessPaymentRole) can only spend within those limits.
**AgentCore Code Interpreter**
Code Interpreter is a remote AWS API — it works identically from a Runtime container as from any other environment. The agent uses it for pandas/matplotlib analysis and chart generation.
**S3 artifact storage**
Artifacts produced by Code Interpreter are uploaded to S3 and returned as presigned download URLs. The response shape is:
```json
{
"response": "<markdown research summary>",
"artifacts": [
{"name": "chart.png", "url": "https://...", "expires_in": 3600}
]
}
```
If `CI_ARTIFACTS_BUCKET` is not configured, the agent degrades gracefully: charts become markdown tables, text returns inline.
**Observability**
The `agentcore deploy` CLI configures the container to run under `opentelemetry-instrument`. Combined with `aws-opentelemetry-distro` (included in `agent/requirements.txt`), this provides:
- Strands agent spans (LLM calls, tool calls, agent turns) → CloudWatch GenAI Observability
- Code Interpreter calls stitched as child spans via W3C `traceparent` botocore instrumentation
- Payment calls (`ProcessPayment`, `GetPaymentInstrument`) as boto3 child spans
No instrumentation code required in `agent/main.py`.
**Execution role permissions** (attached by the notebook, Step 8):
| Permission set | Actions | Resource scope |
|---|---|---|
| Payment data-plane | `ProcessPayment`, `GetPaymentInstrument`, `GetPaymentInstrumentBalance`, `GetPaymentSession`, `GetResourcePaymentToken` | `payment-manager/*`, `payment-manager/*/instrument/*`, `payment-manager/*/session/*` |
| Code Interpreter | `StartCodeInterpreterSession`, `InvokeCodeInterpreter`, `StopCodeInterpreterSession` | `code-interpreter/*` |
| S3 artifacts | `PutObject`, `GetObject` | `<bucket>/heurist-finance-artifacts/*` |
| Bedrock model | `InvokeModel`, `InvokeModelWithResponseStream` | `foundation-model/*`, `inference-profile/*`, `application-inference-profile/*` (the latter two are required for CRIS-fronted models like Claude Sonnet 4.6 in us-west-2) |
## Environment Variables
See [`.env.example`](.env.example). Required on the host (notebook):
| Variable | Description |
|----------|-------------|
| `PAYMENT_MANAGER_ARN` | ARN of the AgentCore payment manager |
| `PAYMENT_SESSION_ID` | ID of an active payment session |
| `PAYMENT_INSTRUMENT_ID` | ID of a funded payment instrument (embedded crypto wallet) |
| `USER_ID` | User identifier for payment tracking |
| `BEDROCK_MODEL_ID` | Bedrock model (default: Claude Sonnet 4.6) |
| `HEURIST_AGENT_IDS` | Comma-separated Heurist agents to load |
| `HEURIST_CATALOG_URL` | Catalog endpoint — `https://mesh.heurist.xyz/x402/agents?details=true` (mainnet) or the `/x402/base-sepolia/...` variant for testnet |
Bundled in the container `.env` (set by Step 7):
| Variable | Description |
|----------|-------------|
| `CI_ARTIFACTS_BUCKET` | S3 bucket used for artifact upload |
| `CI_ARTIFACTS_PREFIX` | S3 key prefix (default: `heurist-finance-artifacts`) |
| `CI_ARTIFACTS_TTL` | Presigned URL TTL in seconds (default: 3600) |
| `AWS_REGION` | Region for boto3 clients |
| `AGENT_NAME` | Reported in payment observability |
| `BYPASS_TOOL_CONSENT` | Set to `true` so `strands_tools.http_request` skips its TTY confirm prompt — required because the Runtime container has no TTY |
| `AGENT_MAX_TOKENS` | Max Bedrock output tokens per agent turn (default: `32000`). Lower this if you only need short Q&A — Bedrock charges per output token, so a 32k cap is a worst-case ~$0.48 per turn for Claude Sonnet 4.6. Most turns use far less. The SDK default (4k) is too low for workflows that fetch data, run Code Interpreter, and write a markdown report in one turn — it raises `MaxTokensReachedException` mid-run. |
Payment context (`PAYMENT_MANAGER_ARN`, `PAYMENT_SESSION_ID`, `PAYMENT_INSTRUMENT_ID`, `USER_ID`) is passed in the **invocation payload** at runtime, not via env vars in the container.
## Costs
A single agent invocation incurs charges across four categories. Approximate worst-case figures for the default config:
| Category | Driver | Approx. cost per turn | Notes |
|---|---|---|---|
| **Heurist x402 (USDC on Base mainnet)** | Each paid tool call | $0.002$0.005 per call | Settles real USDC on-chain. A typical research run uses 310 paid calls. The wallet must be funded. |
| **Bedrock model output** | `AGENT_MAX_TOKENS` × Claude Sonnet 4.6 output rate | up to ~$0.90 per turn at the 32k cap | Bedrock charges $0.015 per 1k output tokens for Claude Sonnet 4.6 in us-west-2 (input is cheaper at $0.003 per 1k). Most turns use far less than the cap; lower `AGENT_MAX_TOKENS` for short Q&A. |
| **Bedrock AgentCore Runtime** | Container vCPU × seconds + memory × seconds while invoked | a few cents per minute of active invocation | Idle minutes between invocations are not billed (`idleRuntimeSessionTimeout=600s`). |
| **Bedrock AgentCore Code Interpreter** | Sessions started + minutes active | a few cents per turn | Only billed when the agent actually invokes the Code Interpreter tool. |
| **S3 + CloudWatch** | Artifact storage + log/trace ingestion | rounding error | A small chart + report is well under 1 MB. Vended-log delivery to CW Logs and X-Ray is metered the same as your other CW usage. |
Tune `AGENT_MAX_TOKENS` and `SESSION_MAX_SPEND` in `.env` to match the workflow you actually run. The notebook uses a $0.25 per-session spend cap by default, which is plenty for a multi-call research workflow.
## Notes
- Payment sessions expire. Create a fresh session before each invocation in automated workflows.
- Each paid call settles USDC on Base. Ensure your payment instrument is funded.
- Sync the catalog cache before building the container image (`python agent/sync_registry.py`). The cache is bundled in the image — the container does not call the Heurist registry at startup.
- Presigned artifact URLs expire after `CI_ARTIFACTS_TTL` seconds (default: 1 hour). Download or forward the URL to the end user promptly.
@@ -0,0 +1,37 @@
FROM public.ecr.aws/docker/library/python:3.13-slim-bookworm
WORKDIR /app
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PIP_NO_CACHE_DIR=1 \
PIP_DISABLE_PIP_VERSION_CHECK=1
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
curl \
&& rm -rf /var/lib/apt/lists/*
RUN useradd -m -u 1000 bedrock_agentcore
COPY requirements.txt ./
RUN pip install -r requirements.txt
COPY --chown=bedrock_agentcore:bedrock_agentcore . .
USER bedrock_agentcore
EXPOSE 8080
# AgentCore Runtime monitors container health at the platform level via the
# service contract on :8080; this HEALTHCHECK exists for portability and to
# satisfy security scanners. BedrockAgentCoreApp serves /ping by default.
HEALTHCHECK --interval=30s --timeout=5s --start-period=15s --retries=3 \
CMD curl -fsS http://localhost:8080/ping || exit 1
# opentelemetry-instrument wraps botocore so that boto3 calls
# (InvokeCodeInterpreter, ProcessPayment) appear as child spans in the
# Runtime trace via W3C traceparent header propagation. Without this prefix,
# CI calls and payment calls are invisible in the AgentCore observability
# dashboard.
CMD ["opentelemetry-instrument", "python", "-m", "main"]
@@ -0,0 +1,217 @@
#!/usr/bin/env python3
"""Load the Heurist tool catalog and format it for the agent system prompt."""
from __future__ import annotations
import json
import math
import os
import re
import tempfile
from pathlib import Path
from typing import Any
import requests
from config import LIVE_CATALOG_CACHE_PATH, get_config
# --- Safety limits ---------------------------------------------------------
# These caps are intentionally generous for a sample but prevent accidental
# memory blow-ups from a misconfigured endpoint or disk corruption.
MAX_CATALOG_BYTES = 5 * 1024 * 1024 # 5 MiB on-disk cache
MAX_CATALOG_RESPONSE_BYTES = 10 * 1024 * 1024 # 10 MiB network payload
MAX_PROMPT_FIELD_LEN = 500 # per-field cap when rendered into the system prompt
_UNSAFE_FIELD_PLACEHOLDER = "(unavailable)"
_UNSAFE_PROMPT_CHARS = re.compile(r"[\x00-\x1f\x7f`|\[\]]")
def _sanitize_prompt_text(value: Any, max_len: int = MAX_PROMPT_FIELD_LEN) -> str:
"""Return a markdown-safe single-line string derived from ``value``.
External catalog data is interpolated into the agent's system prompt.
Without sanitization a malicious registry entry could inject links,
code fences, or table pipes that alter the prompt structure.
"""
if value is None:
return ""
text = str(value)
text = _UNSAFE_PROMPT_CHARS.sub(" ", text)
text = re.sub(r"\s+", " ", text).strip()
if len(text) > max_len:
text = text[:max_len].rstrip() + ""
return text
def _sanitize_url(value: Any) -> str:
"""Only accept http(s) URLs; otherwise return a placeholder."""
text = _sanitize_prompt_text(value, max_len=MAX_PROMPT_FIELD_LEN)
if not text:
return _UNSAFE_FIELD_PLACEHOLDER
if not re.match(r"^https?://[^\s]+$", text, re.IGNORECASE):
return _UNSAFE_FIELD_PLACEHOLDER
return text
def _coerce_price(raw: Any) -> float:
"""Convert a raw price value into a finite, non-negative float."""
try:
price = float(raw)
except (TypeError, ValueError) as exc:
raise ValueError(f"Invalid price value {raw!r}") from exc
if not math.isfinite(price) or price < 0:
raise ValueError(f"Invalid price value {raw!r}: must be a finite, non-negative number")
return price
def _atomic_write_text(path: Path, content: str) -> None:
"""Write ``content`` to ``path`` atomically via a same-directory temp file."""
path.parent.mkdir(parents=True, exist_ok=True)
fd, tmp_name = tempfile.mkstemp(prefix=f".{path.name}.", suffix=".tmp", dir=str(path.parent))
try:
with os.fdopen(fd, "w", encoding="utf-8") as fh:
fh.write(content)
fh.flush()
os.fsync(fh.fileno())
os.replace(tmp_name, path)
except Exception:
try:
os.unlink(tmp_name)
except OSError:
pass
raise
def fetch_live_catalog(session: requests.Session | None = None) -> dict[str, Any]:
"""Fetch the live Heurist mesh registry and cache it locally."""
cfg = get_config()
http = session or requests.Session()
response = http.get(cfg.heurist_catalog_url, timeout=30, stream=True)
response.raise_for_status()
chunks: list[bytes] = []
total = 0
for chunk in response.iter_content(chunk_size=64 * 1024):
if not chunk:
continue
total += len(chunk)
if total > MAX_CATALOG_RESPONSE_BYTES:
raise ValueError(f"Heurist catalog response exceeded {MAX_CATALOG_RESPONSE_BYTES} bytes")
chunks.append(chunk)
body = b"".join(chunks).decode(response.encoding or "utf-8")
payload = json.loads(body)
_atomic_write_text(LIVE_CATALOG_CACHE_PATH, json.dumps(payload, indent=2))
return payload
def load_live_catalog(path: Path | None = None) -> dict[str, Any]:
input_path = path or LIVE_CATALOG_CACHE_PATH
if not input_path.exists():
raise FileNotFoundError(f"Live catalog cache not found: {input_path}")
size = input_path.stat().st_size
if size > MAX_CATALOG_BYTES:
raise ValueError(
f"Catalog cache at {input_path} is {size} bytes which exceeds the "
f"{MAX_CATALOG_BYTES} byte limit. Delete or regenerate the file."
)
return json.loads(input_path.read_text(encoding="utf-8"))
def get_live_catalog(refresh: bool = False, session: requests.Session | None = None) -> dict[str, Any]:
if refresh or not LIVE_CATALOG_CACHE_PATH.exists():
return fetch_live_catalog(session=session)
return load_live_catalog()
def get_tools_for_agents(
agent_ids: tuple[str, ...] | list[str],
refresh: bool = False,
session: requests.Session | None = None,
) -> list[dict[str, Any]]:
"""Return normalized tool definitions for the selected Heurist agents."""
import logging
logger = logging.getLogger(__name__)
selected = set(agent_ids)
live_catalog = get_live_catalog(refresh=refresh, session=session)
tools: list[dict[str, Any]] = []
found_ids: set[str] = set()
for agent in live_catalog.get("agents", []):
agent_id = agent.get("agentId")
if not agent_id or agent_id not in selected:
continue
found_ids.add(agent_id)
for tool_def in agent.get("tools", []):
try:
price_usd = _coerce_price(tool_def["priceUsd"])
except (KeyError, ValueError):
continue
tools.append(
{
"agent_id": agent_id,
"tool_name": tool_def.get("name", ""),
"resource_url": tool_def.get("resourceUrl", ""),
"price_usd": price_usd,
"method": tool_def.get("method", "POST"),
"description": tool_def.get("description", ""),
"parameters": tool_def.get("parameters", {}) or {},
}
)
missing = selected - found_ids
if missing:
logger.warning(
"The following agent IDs were not found in the Heurist catalog and will be "
"skipped. They may have been renamed or removed: %s. "
"Run sync_registry to refresh the catalog, or update HEURIST_AGENT_IDS in .env.",
", ".join(sorted(missing)),
)
return tools
def format_catalog_for_prompt(tools: list[dict[str, Any]]) -> str:
"""Format the tool catalog as a reference table for the agent system prompt."""
lines = ["## Available Paid Endpoints (Heurist x402)", ""]
lines.append("| Agent | Tool | URL | Method | Price | Description |")
lines.append("|-------|------|-----|--------|-------|-------------|")
for t in tools:
agent_id = _sanitize_prompt_text(t.get("agent_id"), max_len=80)
tool_name = _sanitize_prompt_text(t.get("tool_name"), max_len=80)
url = _sanitize_url(t.get("resource_url"))
method = _sanitize_prompt_text(t.get("method"), max_len=10) or "POST"
desc = _sanitize_prompt_text(t.get("description"), max_len=80)
price = t.get("price_usd")
price_str = f"${price:.3f}" if isinstance(price, (int, float)) and math.isfinite(price) else "n/a"
lines.append(f"| {agent_id} | {tool_name} | {url} | {method} | {price_str} | {desc} |")
lines.append("")
lines.append("### Parameter Schemas")
lines.append("")
for t in tools:
params = t.get("parameters", {}) or {}
props = params.get("properties", {}) or {}
if not props:
continue
agent_id = _sanitize_prompt_text(t.get("agent_id"), max_len=80)
tool_name = _sanitize_prompt_text(t.get("tool_name"), max_len=80)
method = _sanitize_prompt_text(t.get("method"), max_len=10) or "POST"
url = _sanitize_url(t.get("resource_url"))
lines.append(f"**{agent_id}/{tool_name}** (`{method} {url}`)")
required_fields = params.get("required", []) or []
for name, schema in props.items():
if not isinstance(schema, dict):
schema = {}
safe_name = _sanitize_prompt_text(name, max_len=80)
required = safe_name in {_sanitize_prompt_text(r, max_len=80) for r in required_fields}
req_marker = " (required)" if required else ""
type_name = _sanitize_prompt_text(schema.get("type", "any"), max_len=40)
desc = _sanitize_prompt_text(schema.get("description", ""), max_len=120)
lines.append(f" - `{safe_name}`: {type_name}{req_marker}{desc}")
lines.append("")
return "\n".join(lines)
@@ -0,0 +1,110 @@
#!/usr/bin/env python3
"""Shared configuration for the Heurist finance agent."""
from __future__ import annotations
import os
from dataclasses import dataclass
from pathlib import Path
from dotenv import load_dotenv
AGENT_DIR = Path(__file__).resolve().parent
LIVE_CATALOG_CACHE_PATH = AGENT_DIR / "catalog_live_cache.json"
# Accept .env in either the agent dir (Runtime container layout) or the
# parent use-case dir (host machine layout for sync_registry).
ENV_CANDIDATE_PATHS: tuple[Path, ...] = (
AGENT_DIR / ".env",
AGENT_DIR.parent / ".env",
)
DEFAULT_HEURIST_AGENT_IDS = (
"ExaSearchDigestAgent",
"YahooFinanceAgent",
"FredMacroAgent",
"SecEdgarAgent",
)
# Required environment variables for the agent to run host-side scripts.
# The Runtime container does NOT need these — payment context comes from
# the invocation payload at runtime.
_REQUIRED_ENV_VARS: tuple[str, ...] = (
"PAYMENT_MANAGER_ARN",
"PAYMENT_SESSION_ID",
"PAYMENT_INSTRUMENT_ID",
)
def load_environment() -> None:
"""Load the local .env file from any of the supported locations."""
for candidate in ENV_CANDIDATE_PATHS:
if candidate.is_file():
load_dotenv(candidate, override=False)
@dataclass(frozen=True)
class AppConfig:
aws_region: str
aws_profile: str | None
bedrock_profile: str | None
bedrock_model_id: str
payment_manager_arn: str
payment_session_id: str
payment_instrument_id: str
user_id: str
heurist_catalog_url: str
heurist_tool_agent_ids: tuple[str, ...]
code_interpreter_session_name: str
agent_timeout_seconds: int
agent_max_tokens: int
def _parse_csv_tuple(raw_value: str | None, default: tuple[str, ...]) -> tuple[str, ...]:
if not raw_value:
return default
values = tuple(item.strip() for item in raw_value.split(",") if item.strip())
return values or default
def _require_env(name: str) -> str:
value = os.environ.get(name)
if value:
return value
searched = ", ".join(str(p) for p in ENV_CANDIDATE_PATHS)
raise RuntimeError(
f"Missing required environment variable {name!r}. "
f"Set it in your shell or in a .env file at one of: {searched}. "
f"See .env.example for the full list of required values."
)
def get_config() -> AppConfig:
load_environment()
missing = [name for name in _REQUIRED_ENV_VARS if not os.environ.get(name)]
if missing:
searched = ", ".join(str(p) for p in ENV_CANDIDATE_PATHS)
raise RuntimeError(
"Missing required environment variables: "
+ ", ".join(missing)
+ f". Set them in your shell or in a .env file at one of: {searched}. "
+ "See .env.example for the full list of required values."
)
return AppConfig(
aws_region=os.environ.get("AWS_REGION", "us-west-2"),
aws_profile=os.environ.get("AWS_PROFILE"),
bedrock_profile=os.environ.get("BEDROCK_PROFILE") or os.environ.get("AWS_PROFILE"),
bedrock_model_id=os.environ.get("BEDROCK_MODEL_ID", "us.anthropic.claude-sonnet-4-6"),
payment_manager_arn=_require_env("PAYMENT_MANAGER_ARN"),
payment_session_id=_require_env("PAYMENT_SESSION_ID"),
payment_instrument_id=_require_env("PAYMENT_INSTRUMENT_ID"),
user_id=os.environ.get("USER_ID", "demo-user"),
heurist_catalog_url=os.environ.get(
"HEURIST_CATALOG_URL",
"https://mesh.heurist.xyz/x402/base-sepolia/agents?details=true",
),
heurist_tool_agent_ids=_parse_csv_tuple(os.environ.get("HEURIST_AGENT_IDS"), DEFAULT_HEURIST_AGENT_IDS),
code_interpreter_session_name=os.environ.get("CODE_INTERPRETER_SESSION_NAME", "heurist-finance"),
agent_timeout_seconds=int(os.environ.get("AGENT_TIMEOUT_SECONDS", "300")),
agent_max_tokens=int(os.environ.get("AGENT_MAX_TOKENS", "64000")),
)
@@ -0,0 +1,601 @@
#!/usr/bin/env python3
"""
Heurist Finance Agent — AgentCore Runtime entry point.
Pay-for-data agent that:
- Calls paid Heurist endpoints via x402 (HTTP 402 → ProcessPayment → retry)
- Uses AgentCore Code Interpreter for sandboxed pandas/matplotlib analysis
- Uploads chart/CSV/report artifacts to S3 and returns presigned URLs
- Stateless — payment config (manager ARN, session, instrument) comes from
the invocation payload; the container holds no credentials
If `CI_ARTIFACTS_BUCKET` is not set, the agent degrades gracefully: charts
become markdown tables, text is returned inline.
Required IAM permissions for the execution role (see notebook Step 8):
Payments — ProcessPayment, GetPaymentInstrument, GetPaymentSession,
GetPaymentInstrumentBalance, GetResourcePaymentToken
on payment-manager/* and its instrument/* and session/*
Code Interpreter — Start/Stop/Invoke CodeInterpreterSession on code-interpreter/*
S3 — PutObject + GetObject on <bucket>/<prefix>/*
Bedrock — InvokeModel + InvokeModelWithResponseStream on the model
ARN AND on inference-profile/* (for CRIS-fronted models like
Claude Sonnet 4.6)
CloudWatch — added automatically by `agentcore deploy`
Environment variables (set via .env bundled in the container image):
CI_ARTIFACTS_BUCKET S3 bucket for artifact storage (optional but recommended)
CI_ARTIFACTS_PREFIX S3 key prefix (default: "heurist-finance-artifacts")
CI_ARTIFACTS_TTL Presigned URL TTL seconds (default: 3600)
HEURIST_AGENT_IDS Comma-separated Heurist agent IDs to load
BEDROCK_MODEL_ID Override the default Bedrock model ID
AGENT_NAME Name reported in payment observability
BYPASS_TOOL_CONSENT Set to "true" so http_request skips its TTY confirm prompt
(Runtime containers have no TTY)
Invocation payload:
prompt (str, required) — research request
payment_manager_arn (str, required)
user_id (str, required)
payment_session_id (str, required) — created by app backend with budget
payment_instrument_id (str, required)
bedrock_model_id (str, optional) — per-invocation model override
Response:
{
"response": "<markdown research summary>",
"artifacts": [ # empty list if no artifacts produced
{"name": "chart.png", "url": "https://...", "expires_in": 3600},
{"name": "report.md", "url": "https://...", "expires_in": 3600}
]
}
"""
from __future__ import annotations
# ---------------------------------------------------------------------------
# MINIMAL MODULE-LEVEL IMPORTS
#
# Only import what's needed for BedrockAgentCoreApp to start and respond to
# the /ping health check within the Runtime's 120s initialization timeout.
# All heavy imports (strands, bedrock_agentcore.payments, boto3 clients,
# catalog loading) are deferred to first request via _ensure_initialized().
#
# This is critical because `opentelemetry-instrument` (the CMD prefix in the
# Dockerfile) instruments every import at load time. With the full dependency
# tree (strands + bedrock_agentcore + boto3 + botocore), instrumentation
# alone can exceed 120s on cold start. Deferring keeps startup fast while
# preserving full OTel trace propagation for all request-time operations.
# ---------------------------------------------------------------------------
import json
import logging
import os
import threading
import uuid
from datetime import datetime
from typing import Any
from bedrock_agentcore.runtime import BedrockAgentCoreApp
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# App instance — must be at module level for the @app.entrypoint decorator.
# BedrockAgentCoreApp is lightweight; it just starts a uvicorn server with
# /ping and /invoke endpoints.
# ---------------------------------------------------------------------------
app = BedrockAgentCoreApp()
# ---------------------------------------------------------------------------
# Environment config (lightweight — just reads env vars)
# ---------------------------------------------------------------------------
REGION = os.environ.get("AWS_REGION", "us-west-2")
MODEL_ID = os.environ.get("BEDROCK_MODEL_ID", "us.anthropic.claude-sonnet-4-6")
CI_ARTIFACTS_BUCKET = os.environ.get("CI_ARTIFACTS_BUCKET", "")
CI_ARTIFACTS_PREFIX = os.environ.get("CI_ARTIFACTS_PREFIX", "heurist-finance-artifacts").rstrip("/")
CI_ARTIFACTS_TTL = int(os.environ.get("CI_ARTIFACTS_TTL", "3600"))
_raw_agent_ids = os.environ.get("HEURIST_AGENT_IDS", "")
DEFAULT_HEURIST_AGENT_IDS = (
"ExaSearchDigestAgent",
"YahooFinanceAgent",
"FredMacroAgent",
"SecEdgarAgent",
)
HEURIST_AGENT_IDS: tuple[str, ...] = (
tuple(a.strip() for a in _raw_agent_ids.split(",") if a.strip()) if _raw_agent_ids else DEFAULT_HEURIST_AGENT_IDS
)
# ---------------------------------------------------------------------------
# Lazy-initialized heavy dependencies.
#
# Deferred from module load to first request so the container can respond to
# the Runtime /ping health check within the 120s init timeout. The
# opentelemetry-instrument wrapper adds significant overhead to module
# imports; deferring keeps cold-start under the limit while preserving full
# OTel trace propagation at request time (all boto3 calls, LLM calls, and
# tool calls are still instrumented).
# ---------------------------------------------------------------------------
_init_lock = threading.Lock()
_initialized = False
_CI_CLIENT = None
_S3_CLIENT = None
_catalog_ref = ""
_http_request_tool = None
def _ensure_initialized() -> None:
"""Lazily import heavy deps and initialize service clients on first request."""
global _initialized, _CI_CLIENT, _S3_CLIENT, _catalog_ref, _http_request_tool
if _initialized:
return
with _init_lock:
if _initialized:
return
import boto3
from strands_tools import http_request
from strands_tools.code_interpreter import AgentCoreCodeInterpreter
from catalog import format_catalog_for_prompt, get_tools_for_agents
_http_request_tool = http_request
_CI_CLIENT = AgentCoreCodeInterpreter(region=REGION, session_name="runtime-init")
_S3_CLIENT = boto3.client("s3", region_name=REGION)
try:
_heurist_tools = get_tools_for_agents(HEURIST_AGENT_IDS, refresh=False)
_catalog_ref = format_catalog_for_prompt(_heurist_tools)
logger.info("Loaded %d Heurist tools from catalog cache.", len(_heurist_tools))
except Exception as e:
logger.warning("Could not load Heurist catalog: %s", e)
_catalog_ref = "(catalog unavailable — sync_registry was not run before image build)"
_initialized = True
logger.info("Agent dependencies initialized successfully.")
# ---------------------------------------------------------------------------
# Per-invocation state (thread-local for concurrent request isolation)
# ---------------------------------------------------------------------------
_invocation = threading.local()
def _artifacts() -> list[dict]:
if not hasattr(_invocation, "artifacts"):
_invocation.artifacts = []
return _invocation.artifacts
def _session_name() -> str:
if not hasattr(_invocation, "session_name"):
_invocation.session_name = f"heurist-{uuid.uuid4().hex[:12]}"
return _invocation.session_name
def _reset_invocation_state() -> None:
_invocation.artifacts = []
_invocation.session_name = f"heurist-{uuid.uuid4().hex[:12]}"
# ---------------------------------------------------------------------------
# CI result extraction helpers
# ---------------------------------------------------------------------------
def _extract_ci_text(tool_result: dict) -> str:
"""Extract the printed text output from a Code Interpreter tool result."""
import ast
content = tool_result.get("content", [])
if not content:
raise ValueError("Code Interpreter returned empty content")
text_blob = content[0].get("text", "")
if not text_blob:
raise ValueError("Code Interpreter returned no text")
try:
parsed = ast.literal_eval(text_blob)
return parsed[0]["text"]
except Exception:
return text_blob
# ---------------------------------------------------------------------------
# Artifact tools — defined as module-level functions with @tool decorator.
# They use the lazily-initialized _S3_CLIENT and _CI_CLIENT globals which
# are guaranteed to be set before any tool is called (handle_request calls
# _ensure_initialized() before constructing the Agent).
# ---------------------------------------------------------------------------
import re
from pathlib import Path
def _safe_s3_key_name(raw: str) -> str:
"""Return a safe S3 key filename component."""
name = Path(raw).name
name = re.sub(r"[^A-Za-z0-9._-]", "_", name).strip("._")
return name or "artifact"
# We need the @tool decorator from strands, but importing strands at module
# level is what causes the slow startup. Solution: define the tool functions
# as plain functions and wrap them with @tool inside _ensure_initialized().
# However, the Agent() constructor needs the tool references at request time.
#
# Simpler approach: import just the decorator (it's lightweight) and define
# tools normally. The heavy part is strands.Agent and strands.models, not
# the @tool decorator itself.
from strands import tool
@tool
def export_artifact_to_s3(remote_path: str, artifact_name: str | None = None) -> dict[str, Any]:
"""Export a file from the AgentCore Code Interpreter sandbox to S3.
Use this after creating a chart (PNG), CSV, or any file in the Code
Interpreter session. Returns a presigned URL the caller can download.
If S3 is not configured (CI_ARTIFACTS_BUCKET not set), returns an error
with a suggestion to represent the data as a markdown table instead.
Args:
remote_path: Path to the file inside the CI sandbox (e.g. "/tmp/chart.png")
artifact_name: Optional override for the output filename
"""
import base64
if not CI_ARTIFACTS_BUCKET:
return {
"error": "S3 artifact storage is not configured (CI_ARTIFACTS_BUCKET not set).",
"suggestion": (
"Represent charts as markdown tables using the underlying data. "
"Use save_report_to_s3 for text/CSV content, which returns it inline."
),
}
sn = _session_name()
export_code = f"""
import base64, json, mimetypes
from pathlib import Path
p = Path({remote_path!r})
if not p.exists():
raise FileNotFoundError(f"File not found in CI sandbox: {{str(p)}}")
print(json.dumps({{
"name": p.name,
"mime_type": mimetypes.guess_type(str(p))[0] or "application/octet-stream",
"b64": base64.b64encode(p.read_bytes()).decode(),
"size": p.stat().st_size,
}}))
"""
ci_result = _CI_CLIENT.code_interpreter(
{
"action": {
"type": "executeCode",
"session_name": sn,
"language": "python",
"code": export_code,
}
}
)
try:
payload = json.loads(_extract_ci_text(ci_result))
except Exception as exc:
return {"error": f"Could not parse CI export output: {exc}"}
if "b64" not in payload:
return {"error": f"Unexpected CI payload — missing b64 field: {payload}"}
file_bytes = base64.b64decode(payload["b64"])
safe_name = _safe_s3_key_name(artifact_name or payload.get("name", "artifact"))
s3_key = f"{CI_ARTIFACTS_PREFIX}/{sn}/{safe_name}"
_S3_CLIENT.put_object(
Bucket=CI_ARTIFACTS_BUCKET,
Key=s3_key,
Body=file_bytes,
ContentType=payload.get("mime_type", "application/octet-stream"),
)
url = _S3_CLIENT.generate_presigned_url(
"get_object",
Params={"Bucket": CI_ARTIFACTS_BUCKET, "Key": s3_key},
ExpiresIn=CI_ARTIFACTS_TTL,
)
artifact = {
"name": safe_name,
"url": url,
"s3_key": s3_key,
"size_bytes": len(file_bytes),
"mime_type": payload.get("mime_type", "application/octet-stream"),
"expires_in": CI_ARTIFACTS_TTL,
}
_artifacts().append(artifact)
logger.info("Exported artifact %s → s3://%s/%s", safe_name, CI_ARTIFACTS_BUCKET, s3_key)
return {
"status": "success",
"name": safe_name,
"url": url,
"expires_in": CI_ARTIFACTS_TTL,
}
@tool
def save_report_to_s3(content: str, filename: str) -> dict[str, Any]:
"""Save a text report (markdown, CSV, JSON) to S3 and return a presigned URL.
Use this for structured text output — financial summaries, data tables,
model outputs. For binary files produced in the Code Interpreter sandbox,
use export_artifact_to_s3 instead.
If S3 is not configured, the content is returned inline.
Args:
content: The text content to save
filename: Desired filename (e.g. "macro_summary.md", "prices.csv")
"""
if not CI_ARTIFACTS_BUCKET:
return {
"status": "inline",
"note": "S3 not configured — content returned inline.",
"filename": filename,
"content": content,
}
safe_name = _safe_s3_key_name(filename)
s3_key = f"{CI_ARTIFACTS_PREFIX}/{_session_name()}/{safe_name}"
content_type = "text/plain"
if safe_name.endswith(".md"):
content_type = "text/markdown"
elif safe_name.endswith(".csv"):
content_type = "text/csv"
elif safe_name.endswith(".json"):
content_type = "application/json"
elif safe_name.endswith(".html"):
content_type = "text/html"
encoded = content.encode("utf-8")
_S3_CLIENT.put_object(
Bucket=CI_ARTIFACTS_BUCKET,
Key=s3_key,
Body=encoded,
ContentType=content_type,
)
url = _S3_CLIENT.generate_presigned_url(
"get_object",
Params={"Bucket": CI_ARTIFACTS_BUCKET, "Key": s3_key},
ExpiresIn=CI_ARTIFACTS_TTL,
)
artifact = {
"name": safe_name,
"url": url,
"s3_key": s3_key,
"size_bytes": len(encoded),
"mime_type": content_type,
"expires_in": CI_ARTIFACTS_TTL,
}
_artifacts().append(artifact)
logger.info("Saved report %s → s3://%s/%s", safe_name, CI_ARTIFACTS_BUCKET, s3_key)
return {
"status": "success",
"name": safe_name,
"url": url,
"expires_in": CI_ARTIFACTS_TTL,
}
@tool
def list_invocation_artifacts() -> dict[str, Any]:
"""List all artifacts exported to S3 during this invocation.
Call this to verify what has been exported before composing the final response.
"""
arts = _artifacts()
return {
"count": len(arts),
"artifacts": [{"name": a["name"], "url": a["url"], "expires_in": a["expires_in"]} for a in arts],
}
# ---------------------------------------------------------------------------
# System prompt builder (invocation-specific: includes CI session name)
# ---------------------------------------------------------------------------
def _build_system_prompt(ci_session: str) -> str:
s3_instructions = (
(
f"- Charts/images: save to `/tmp/<name>` inside the CI session, then call "
f"`export_artifact_to_s3` with that path to upload to S3 and get a download URL.\n"
f"- Text reports/CSVs: call `save_report_to_s3` directly — no need to write to CI first.\n"
f"- Presigned URLs are valid for {CI_ARTIFACTS_TTL} seconds.\n"
f"- After exporting, include the URL in your response so the caller can access the file."
)
if CI_ARTIFACTS_BUCKET
else (
"- S3 artifact storage is not configured in this deployment.\n"
"- Represent all chart data as markdown tables using the underlying numbers.\n"
"- Use `save_report_to_s3` for text content — it will return the content inline."
)
)
return f"""You are a finance research and data visualization agent.
You have access to paid financial data endpoints via the Heurist network. Use the
`http_request` tool to call the endpoint URLs listed below. All endpoints accept POST
requests with JSON bodies.
**Payment is handled automatically.** When an endpoint returns HTTP 402, the system
settles USDC on-chain and retries the request. You do not need to handle payments.
{_catalog_ref}
## Working Rules
- Use http_request for all Heurist endpoint calls. Always method="POST", params as JSON body.
- Parallelize independent data fetches — issue multiple http_request calls in the same tool-use round when they don't depend on each other's results. Payment is handled per-call.
- Use AgentCore Code Interpreter for pandas/matplotlib analysis.
- Never fabricate data. Only use values returned by tools.
- If a tool call fails, report the error and stop.
## Code Interpreter — session: `{ci_session}`
**Session lifecycle**
- Start with `initSession` if the session is not initialized.
- Use `writeFiles` to pass datasets into the sandbox as JSON/CSV files.
- Use `executeCode` for analysis and charting.
- The session is private to this invocation and auto-expires.
**Artifact export**
{s3_instructions}
**CI action examples:**
- Init: `{{"action": {{"type": "initSession", "session_name": "{ci_session}", "description": "analysis"}}}}`
- Write: `{{"action": {{"type": "writeFiles", "session_name": "{ci_session}", "content": [{{"path": "data.json", "text": "{{...}}"}}]}}}}`
- Execute: `{{"action": {{"type": "executeCode", "session_name": "{ci_session}", "language": "python", "code": "import pandas as pd; ..." }}}}`
## Context
- Today: {datetime.now().strftime("%Y-%m-%d")}
- Region: {REGION}
- S3 artifacts: {"enabled (bucket: " + CI_ARTIFACTS_BUCKET + ")" if CI_ARTIFACTS_BUCKET else "not configured — text/table output only"}
"""
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
@app.entrypoint
def handle_request(payload: dict, context=None) -> dict:
"""Handle an invocation from the app backend.
The app backend creates a payment session with an appropriate budget before
invoking. Session ID and instrument ID are passed in the payload — the agent
cannot create or modify sessions (enforced at the IAM level).
Required payload fields:
prompt (str) — the research request
payment_manager_arn (str) — ARN of the Payment Manager
user_id (str) — user identity for payment isolation
payment_session_id (str) — active session with a spending limit
payment_instrument_id (str) — funded embedded wallet
Optional payload fields:
bedrock_model_id (str) — per-invocation model override
"""
# Lazy-init heavy deps on first request (keeps cold-start under 120s)
_ensure_initialized()
_reset_invocation_state()
ci_session = _session_name()
# Import heavy deps (already cached after _ensure_initialized)
import boto3
from bedrock_agentcore.payments.integrations.strands import (
AgentCorePaymentsPlugin,
AgentCorePaymentsPluginConfig,
)
from botocore.config import Config as BotoConfig
from strands import Agent
from strands.models import BedrockModel
# Unwrap the agentcore invoke double-wrapping:
# `agentcore invoke '{"key": "val"}'` → payload = {"prompt": '{"key":"val"}'}
raw_prompt = payload.get("prompt", "")
if isinstance(raw_prompt, str) and raw_prompt.strip().startswith("{"):
try:
inner = json.loads(raw_prompt)
if isinstance(inner, dict) and "payment_manager_arn" in inner:
payload = inner
except json.JSONDecodeError:
pass
prompt = payload.get("prompt", "").strip()
payment_manager_arn = payload.get("payment_manager_arn", "").strip()
user_id = payload.get("user_id", "").strip()
session_id = payload.get("payment_session_id", "").strip()
instrument_id = payload.get("payment_instrument_id", "").strip()
missing = [
name
for name, val in [
("prompt", prompt),
("payment_manager_arn", payment_manager_arn),
("user_id", user_id),
("payment_session_id", session_id),
("payment_instrument_id", instrument_id),
]
if not val
]
if missing:
return {"error": f"Missing required payload fields: {', '.join(missing)}"}
model_id = payload.get("bedrock_model_id", MODEL_ID)
payment_plugin = AgentCorePaymentsPlugin(
config=AgentCorePaymentsPluginConfig(
payment_manager_arn=payment_manager_arn,
user_id=user_id,
payment_instrument_id=instrument_id,
payment_session_id=session_id,
region=REGION,
agent_name=os.environ.get("AGENT_NAME", "HeuristFinanceAgent"),
)
)
# Claude Sonnet 4.6 supports up to 64k output tokens. Multi-step workflows
# (5+ paid tool calls + Code Interpreter + chart export + markdown
# report) routinely need more than the SDK's default 4k cap, which
# otherwise raises Strands' MaxTokensReachedException mid-run.
# The custom client config keeps long single-turn streamed responses
# from tripping the default 60s bedrock-runtime read timeout.
model = BedrockModel(
boto_session=boto3.Session(region_name=REGION),
boto_client_config=BotoConfig(
read_timeout=int(os.environ.get("AGENT_BEDROCK_READ_TIMEOUT", "1500")),
connect_timeout=15,
retries={"max_attempts": 1},
),
model_id=model_id,
streaming=True,
temperature=0,
max_tokens=int(os.environ.get("AGENT_MAX_TOKENS", "32000")),
)
agent = Agent(
system_prompt=_build_system_prompt(ci_session),
model=model,
tools=[
_http_request_tool,
_CI_CLIENT.code_interpreter,
export_artifact_to_s3,
save_report_to_s3,
list_invocation_artifacts,
],
plugins=[payment_plugin],
)
result = agent(prompt)
content = result.message.get("content", [])
text = next(
(block.get("text", "") for block in content if isinstance(block, dict) and "text" in block),
str(result),
)
return {
"response": text,
"artifacts": [{"name": a["name"], "url": a["url"], "expires_in": a["expires_in"]} for a in _artifacts()],
}
if __name__ == "__main__":
app.run()
@@ -0,0 +1,8 @@
bedrock-agentcore[strands-agents]>=1.9.0
boto3>=1.43.6
botocore>=1.43.6
strands-agents[otel]>=1.0.0
strands-agents-tools>=0.5.0
aws-opentelemetry-distro
python-dotenv>=1.0.0
requests>=2.32.0
@@ -0,0 +1,35 @@
#!/usr/bin/env python3
"""Fetch the live Heurist catalog and refresh the local cache.
Run this on the host machine (NOT inside the container) before each
`agentcore deploy` so the catalog cache bundled into the image is fresh.
Usage (from pay-for-data/):
python agent/sync_registry.py
"""
from __future__ import annotations
import sys
from pathlib import Path
# Make sibling modules importable when running as a script
sys.path.insert(0, str(Path(__file__).resolve().parent))
from catalog import fetch_live_catalog, get_tools_for_agents # noqa: E402
from config import LIVE_CATALOG_CACHE_PATH, get_config # noqa: E402
def main() -> None:
cfg = get_config()
catalog = fetch_live_catalog()
selected_tools = get_tools_for_agents(cfg.heurist_tool_agent_ids, refresh=False)
print(f"Saved live catalog cache to {LIVE_CATALOG_CACHE_PATH}")
print(f"Catalog url: {cfg.heurist_catalog_url}")
print(f"Catalog agents: {catalog.get('count', '?')}")
print(f"Selected agents: {', '.join(cfg.heurist_tool_agent_ids)}")
print(f"Selected tools: {len(selected_tools)}")
if __name__ == "__main__":
main()
Binary file not shown.

After

Width:  |  Height:  |  Size: 233 KiB

File diff suppressed because it is too large Load Diff