1
0
mirror of synced 2026-05-22 14:43:35 +00:00

Add harness sample: JWT inbound auth + OAuth-protected gateway (Issue #1397) (#1398)

* End-to-end notebook demonstrating AgentCore harness with JWT inbound
auth and OAuth-protected gateway outbound auth.

Architecture:
- User Auth Pool (Cognito, USER_PASSWORD_AUTH) for harness inbound auth
- M2M Pool (Cognito, client_credentials) for gateway outbound auth
- OAuth2 credential provider in AgentCore Identity
- Lambda target behind AgentCore Gateway with GATEWAY_IAM_ROLE
- harness with CUSTOM_JWT inbound + outboundAuth.oauth to gateway

Files:
- harness_oauth_gateway.ipynb — main notebook (25 cells)
- utils/setup_helpers.py — idempotent infra setup and cleanup functions
- utils/lambda_function_code.py — order management Lambda handler
- images/architecture.jpg — architecture diagram

All setup is idempotent and re-runnable. Cleanup discovers resources
by name and skips gracefully if not found.

* added name to contributors

* addressed ruff feedback on python files
This commit is contained in:
dhegde-aws
2026-04-24 07:09:53 -07:00
committed by GitHub
parent e17320cdfc
commit d8465434b3
8 changed files with 1542 additions and 0 deletions
@@ -0,0 +1,54 @@
# harness with OAuth Inbound Auth and OAuth-Protected Gateway
This sample demonstrates end-to-end OAuth integration with AgentCore harness:
- **Inbound auth**: User authenticates to the harness via Cognito JWT (USER_PASSWORD_AUTH)
- **Outbound auth**: harness authenticates to AgentCore Gateway via Cognito M2M (client credentials flow)
- **Gateway target**: A Lambda function exposed through the Gateway
For full harness documentation, see the [AgentCore harness Developer Guide](https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/harness.html).
## Architecture
<img src="images/architecture.jpg" alt="Architecture" width="800"/>
## What you'll learn
- Configuring `CUSTOM_JWT` inbound auth on a harness (any OIDC provider works)
- Configuring `outboundAuth.oauth` on a gateway tool (client credentials grant)
- Setting up an OAuth2 credential provider in AgentCore Identity
- Creating a Gateway with JWT inbound auth and a Lambda target
- Invoking the harness with a bearer token — secrets never leave the Token Vault
## Project structure
```
├── harness_oauth_gateway.ipynb ← main notebook
├── utils/
│ ├── setup_helpers.py ← idempotent infra setup & cleanup
│ └── lambda_function_code.py ← order management Lambda handler
├── images/
│ └── architecture.jpg ← architecture diagram
├── requirements.txt
└── README.md
```
## Prerequisites
- AWS account with Bedrock AgentCore access
- AWS credentials configured (`aws configure` or env vars)
- Python 3.10+, `boto3 >= 1.42.80`, `requests`, `jupyter`
- Bedrock model access enabled
## How to run
```bash
pip install -r requirements.txt
jupyter notebook harness_oauth_gateway.ipynb
```
Run cells top-to-bottom. User credentials are prompted via `getpass` — never visible in the notebook. All cells are idempotent — safe to re-run.
## Cleanup
The last cell in the notebook deletes all resources. It discovers resources by name and skips gracefully if they don't exist — works even after a kernel restart.
@@ -0,0 +1,536 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "16bbec59",
"metadata": {},
"source": [
"# harness with JWT Inbound Auth & OAuth-Protected Gateway\n",
"\n",
"## Why this matters\n",
"\n",
"Production agents don't run in isolation. They serve real users who need to be\n",
"authenticated, and they call downstream APIs that require their own credentials.\n",
"Getting this right means:\n",
"\n",
"- **Only authorized users can invoke your agent** — not anyone with the endpoint URL\n",
"- **The agent calls tools with the right credentials** — each request carries\n",
" proper tokens for the backend, without secrets in your code\n",
"- **Secrets stay managed** — no client secrets in notebooks, no tokens in env vars\n",
"\n",
"AgentCore harness solves this with two config-level primitives: **inbound auth**\n",
"(who can call the harness) and **outbound auth** (how the harness calls tools).\n",
"This notebook walks through both.\n",
"\n",
"## What is AgentCore harness?\n",
"\n",
"A managed compute environment that turns agent configuration into a running agent.\n",
"You declare the model, system prompt, tools, and auth. AgentCore handles the\n",
"orchestration loop, compute (isolated Firecracker microVMs), tool invocation,\n",
"memory, and observability. No framework code, no containers, no deployment pipeline.\n",
"\n",
"For full documentation, see the \n",
"[AgentCore harness Developer Guide](https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/harness.html).\n",
"\n",
"## What you'll learn\n",
"\n",
"This notebook focuses on two harness features:\n",
"\n",
"**Inbound auth (`CUSTOM_JWT`)** — require callers to present a valid JWT before\n",
"the harness accepts the request. We use a Cognito user pool, but any OIDC provider\n",
"works. This is how you protect your agent endpoint.\n",
"\n",
"**Outbound auth (`outboundAuth.oauth`)** — the harness automatically fetches an\n",
"OAuth token (client credentials grant) to authenticate to an AgentCore Gateway.\n",
"The credential provider is registered once; the harness handles token exchange\n",
"on every tool call. No secrets in the invoke request.\n",
"\n",
"## Architecture\n",
"\n",
"<img src=\"images/architecture.jpg\" alt=\"Architecture\" width=\"800\"/>\n",
"\n",
"## Prerequisites\n",
"\n",
"- AWS account with Bedrock AgentCore access\n",
"- AWS credentials configured (`aws configure` or env vars)\n",
"- Python 3.10+, `boto3 >= 1.42.80`, `requests`, `jupyter`\n",
"- Bedrock model access enabled\n",
"\n",
"## Project structure\n",
"\n",
"```\n",
"\n",
"├── harness_oauth_gateway.ipynb ← this notebook\n",
"├── utils/\n",
"│ ├── setup_helpers.py ← infra setup & cleanup\n",
"│ └── lambda_function_code.py ← Lambda handler\n",
"├── requirements.txt\n",
"└── README.md\n",
"```\n",
"\n",
"Infrastructure setup is in `utils/setup_helpers.py`. This notebook focuses on\n",
"the harness. All cells are idempotent — safe to re-run."
]
},
{
"cell_type": "markdown",
"id": "7560680c",
"metadata": {},
"source": [
"## Step 0: Imports & Configuration"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "09e773c4",
"metadata": {},
"outputs": [],
"source": [
"import boto3, json, time, uuid, getpass, requests as http_requests, urllib.parse\n",
"from utils.setup_helpers import (\n",
" create_user_auth_pool,\n",
" create_m2m_pool,\n",
" create_credential_provider,\n",
" deploy_lambda,\n",
" create_gateway_with_lambda_target,\n",
" create_harness_execution_role,\n",
" cleanup_all,\n",
")\n",
"\n",
"REGION = boto3.session.Session().region_name or 'us-east-1'\n",
"ACCOUNT_ID = boto3.client('sts', region_name=REGION).get_caller_identity()['Account']\n",
"ac_control = boto3.client('bedrock-agentcore-control', region_name=REGION)\n",
"cognito = boto3.client('cognito-idp', region_name=REGION)\n",
"\n",
"PREFIX = 'harness-oauth-demo'\n",
"print(f'Region: {REGION} Account: {ACCOUNT_ID}')"
]
},
{
"cell_type": "markdown",
"id": "472df969",
"metadata": {},
"source": [
"## Step 1: Provision Infrastructure\n",
"\n",
"Each helper creates one resource group and returns IDs/ARNs.\n",
"All helpers check for existing resources first — re-running is safe.\n",
"See `utils/setup_helpers.py` for full implementation.\n",
"\n",
"| Helper | What it creates |\n",
"|--------|----------------|\n",
"| `create_user_auth_pool` | Cognito User Auth Pool — user pool, app client (USER_PASSWORD_AUTH), test user |\n",
"| `create_m2m_pool` | Cognito M2M Pool — user pool, resource server + scope, domain, app client (client_credentials) |\n",
"| `create_credential_provider` | OAuth2 credential provider in AgentCore Identity pointing to M2M Pool |\n",
"| `deploy_lambda` | IAM role + Lambda function from `utils/lambda_function_code.py` |\n",
"| `create_gateway_with_lambda_target` | Gateway (CUSTOM_JWT → M2M Pool) + Lambda target (GATEWAY_IAM_ROLE) |\n",
"| `create_harness_execution_role` | IAM role with Bedrock, Gateway, Token Vault, CloudWatch, X-Ray permissions |"
]
},
{
"cell_type": "markdown",
"id": "ed10536a",
"metadata": {},
"source": [
"### 1a. Cognito User Auth Pool — User Auth\n",
"Creates the pool that authenticates end users who invoke the harness."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "62d04fc8",
"metadata": {},
"outputs": [],
"source": [
"print('Enter credentials for the test user in User Auth Pool:')\n",
"USER1_NAME = getpass.getpass('Username: ')\n",
"USER1_PASS = getpass.getpass('Password (min 8 chars): ')\n",
"\n",
"pool1 = create_user_auth_pool(REGION, PREFIX, USER1_NAME, USER1_PASS)\n",
"print(f'\\nDiscovery URL: {pool1[\"discovery_url\"]}')"
]
},
{
"cell_type": "markdown",
"id": "804fba82",
"metadata": {},
"source": [
"### 1b. Cognito M2M Pool — M2M\n",
"Creates the pool for machine-to-machine auth (client credentials grant with custom scopes)."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0c6580fa",
"metadata": {},
"outputs": [],
"source": [
"pool2 = create_m2m_pool(REGION, PREFIX)\n",
"print(f'\\nScope: {pool2[\"scope\"]}')\n",
"print(f'Discovery URL: {pool2[\"discovery_url\"]}')"
]
},
{
"cell_type": "markdown",
"id": "290ba17d",
"metadata": {},
"source": [
"### 1c. OAuth2 Credential Provider\n",
"Registers M2M Pool's client credentials with AgentCore Identity so the harness\n",
"can obtain M2M tokens for outbound gateway auth."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5629ea97",
"metadata": {},
"outputs": [],
"source": [
"cred = create_credential_provider(\n",
" REGION, PREFIX,\n",
" discovery_url=pool2['discovery_url'],\n",
" client_id=pool2['client_id'],\n",
" client_secret=pool2['client_secret'],\n",
")"
]
},
{
"cell_type": "markdown",
"id": "b298ab09",
"metadata": {},
"source": [
"### 1d. Lambda Function\n",
"Deploys the order management Lambda from `utils/lambda_function_code.py`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "aceb071c",
"metadata": {},
"outputs": [],
"source": [
"lam = deploy_lambda(REGION, PREFIX)"
]
},
{
"cell_type": "markdown",
"id": "bbdb245b",
"metadata": {},
"source": [
"### 1e. Gateway + Lambda Target\n",
"Creates the gateway with CUSTOM_JWT inbound auth (M2M Pool) and adds the\n",
"Lambda as a target with GATEWAY_IAM_ROLE outbound auth."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "37193da2",
"metadata": {},
"outputs": [],
"source": [
"gw = create_gateway_with_lambda_target(\n",
" REGION, PREFIX, ACCOUNT_ID,\n",
" discovery_url=pool2['discovery_url'],\n",
" allowed_client=pool2['client_id'],\n",
" allowed_scope=pool2['scope'],\n",
" lambda_arn=lam['function_arn'],\n",
" lambda_function_name=lam['function_name'],\n",
")\n",
"print(f'\\nGateway ARN: {gw[\"gateway_arn\"]}')"
]
},
{
"cell_type": "markdown",
"id": "8121efc6",
"metadata": {},
"source": [
"### 1f. harness execution role\n",
"Creates the IAM role the harness assumes at runtime, with permissions for\n",
"Bedrock, Gateway, OAuth2 Token Vault, Secrets Manager, and observability."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "564a8fe2",
"metadata": {},
"outputs": [],
"source": [
"harness_role = create_harness_execution_role(REGION, PREFIX, ACCOUNT_ID)"
]
},
{
"cell_type": "markdown",
"id": "34dec5e8",
"metadata": {},
"source": [
"## Step 2: Create the harness with CUSTOM_JWT Inbound Auth\n",
"\n",
"This is the core of the notebook. We create a harness that:\n",
"\n",
"- **Inbound auth**: `CUSTOM_JWT` authorizer pointing to **User Auth Pool**. Callers must\n",
" present a valid JWT from User Auth Pool to invoke the harness.\n",
"- **Model**: Claude Sonnet 4 via Amazon Bedrock.\n",
"- **Tool**: The AgentCore Gateway, with `outboundAuth.oauth` using the credential\n",
" provider (M2M Pool client credentials). The harness automatically obtains an M2M\n",
" token to authenticate to the gateway.\n",
"- **System prompt**: Order management assistant.\n",
"\n",
"If the harness already exists from a previous run, we reuse it."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "20a06140",
"metadata": {},
"outputs": [],
"source": [
"HARNESS_NAME = f'{PREFIX}-harness'.replace('-', '_')\n",
"\n",
"# Try to create; if it already exists, look it up\n",
"try:\n",
" harness_resp = ac_control.create_harness(\n",
" harnessName=HARNESS_NAME,\n",
" executionRoleArn=harness_role['role_arn'],\n",
" authorizerConfiguration={\n",
" 'customJWTAuthorizer': {\n",
" 'discoveryUrl': pool1['discovery_url'],\n",
" 'allowedClients': [pool1['client_id']],\n",
" }\n",
" },\n",
" model={\n",
" 'bedrockModelConfig': {\n",
" 'modelId': 'us.anthropic.claude-haiku-4-5-20251001-v1:0',\n",
" }\n",
" },\n",
" systemPrompt=[{\n",
" 'text': (\n",
" 'You are an order management assistant. '\n",
" 'Use the gateway tools to look up and update orders. '\n",
" 'Always confirm the order details before making changes.'\n",
" )\n",
" }],\n",
" tools=[{\n",
" 'type': 'agentcore_gateway',\n",
" 'name': 'order-gateway',\n",
" 'config': {\n",
" 'agentCoreGateway': {\n",
" 'gatewayArn': gw['gateway_arn'],\n",
" 'outboundAuth': {\n",
" 'oauth': {\n",
" 'providerArn': cred['arn'],\n",
" 'scopes': [pool2['scope']],\n",
" 'grantType': 'CLIENT_CREDENTIALS',\n",
" }\n",
" },\n",
" }\n",
" },\n",
" }],\n",
" )\n",
" HARNESS_ID = harness_resp['harness']['harnessId']\n",
" HARNESS_ARN = harness_resp['harness']['arn']\n",
" print(f'Harness created: {HARNESS_ID}')\n",
"except ac_control.exceptions.ConflictException:\n",
" HARNESS_ID = None\n",
" for h in ac_control.list_harnesses().get('harnesses', []):\n",
" if h.get('harnessName') == HARNESS_NAME:\n",
" HARNESS_ID = h['harnessId']\n",
" HARNESS_ARN = h['arn']\n",
" break\n",
" if not HARNESS_ID:\n",
" raise RuntimeError(f'Harness {HARNESS_NAME} conflict but not found')\n",
" print(f'Harness already exists: {HARNESS_ID}')\n",
"\n",
"print(f'Harness ID: {HARNESS_ID}')\n",
"print(f'Harness ARN: {HARNESS_ARN}')\n",
"\n",
"print('Waiting for harness to become READY...')\n",
"for _ in range(30):\n",
" h_status = ac_control.get_harness(harnessId=HARNESS_ID)['harness']['status']\n",
" if h_status == 'READY':\n",
" break\n",
" time.sleep(10)\n",
"print(f'Harness status: {h_status}')"
]
},
{
"cell_type": "markdown",
"id": "c929d909",
"metadata": {},
"source": [
"## Step 3: Get a Bearer Token from User Auth Pool\n",
"\n",
"Authenticate the test user via `USER_PASSWORD_AUTH` to get a JWT access token.\n",
"This token is what we pass as `Authorization: Bearer <token>` when invoking\n",
"the harness."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c0de1eb1",
"metadata": {},
"outputs": [],
"source": [
"auth_result = cognito.initiate_auth(\n",
" ClientId=pool1['client_id'],\n",
" AuthFlow='USER_PASSWORD_AUTH',\n",
" AuthParameters={'USERNAME': USER1_NAME, 'PASSWORD': USER1_PASS},\n",
")\n",
"BEARER_TOKEN = auth_result['AuthenticationResult']['AccessToken']\n",
"print(f'Got bearer token (first 20 chars): {BEARER_TOKEN[:20]}...')"
]
},
{
"cell_type": "markdown",
"id": "dda6dc32",
"metadata": {},
"source": [
"## Step 4: Invoke the harness with Bearer Token\n",
"\n",
"Since boto3 doesn't support bearer-token invocation, we call the HTTPS\n",
"endpoint directly. The flow:\n",
"\n",
"1. Harness validates the JWT against User Auth Pool (inbound auth)\n",
"2. Agent reasons about the user message and decides to call a gateway tool\n",
"3. Harness uses the OAuth2 credential provider to get an M2M token from M2M Pool\n",
"4. Harness calls the gateway with that M2M token (outbound auth)\n",
"5. Gateway validates the M2M token against M2M Pool and invokes the Lambda\n",
"6. Result flows back through the agent to the user"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "016cc972",
"metadata": {},
"outputs": [],
"source": [
"escaped_arn = urllib.parse.quote(HARNESS_ARN, safe='')\n",
"url = f'https://bedrock-agentcore.{REGION}.amazonaws.com/harnesses/invoke?harnessArn={escaped_arn}'\n",
"SESSION_ID = f'notebook-session-{uuid.uuid4().hex}'\n",
"\n",
"headers = {\n",
" 'Authorization': f'Bearer {BEARER_TOKEN}',\n",
" 'Content-Type': 'application/json',\n",
" 'X-Amzn-Bedrock-AgentCore-Runtime-Session-Id': SESSION_ID,\n",
"}\n",
"\n",
"payload = {\n",
" 'messages': [{\n",
" 'role': 'user',\n",
" 'content': [{'text': 'Look up order ORD-001 and tell me its status.'}],\n",
" }],\n",
"}\n",
"\n",
"print(f'Session: {SESSION_ID}')\n",
"print(f'URL: {url[:80]}...')\n",
"print()\n",
"\n",
"# The harness returns an AWS event-stream (binary framed protocol), not plain JSON.\n",
"# We stream the response and extract text deltas from the event payloads.\n",
"resp = http_requests.post(url, headers=headers, json=payload, timeout=120, stream=True)\n",
"print(f'Status: {resp.status_code}')\n",
"\n",
"if resp.status_code == 200:\n",
" # Parse event-stream: each event has a JSON payload embedded in binary frames.\n",
" # We extract JSON objects from the raw byte stream.\n",
" full_text = []\n",
" raw = resp.content\n",
" # Find all JSON objects in the binary stream\n",
" import re\n",
" json_objects = re.findall(rb'\\{[^{}]*(?:\\{[^{}]*\\}[^{}]*)*\\}', raw)\n",
" for obj_bytes in json_objects:\n",
" try:\n",
" obj = json.loads(obj_bytes.decode('utf-8', errors='ignore'))\n",
" # contentBlockDelta events contain the text\n",
" delta = obj.get('delta', {})\n",
" if 'text' in delta:\n",
" full_text.append(delta['text'])\n",
" print(delta['text'], end='', flush=True)\n",
" except (json.JSONDecodeError, UnicodeDecodeError):\n",
" continue\n",
" print() # newline after streaming\n",
" if full_text:\n",
" print(f'\\n--- Full response ({len(\"\".join(full_text))} chars) ---')\n",
" else:\n",
" print('\\n(No text deltas found in stream. Raw first 500 bytes below)')\n",
" print(repr(raw[:500]))\n",
"else:\n",
" print(f'Error {resp.status_code}: {resp.text[:1000]}')"
]
},
{
"cell_type": "markdown",
"id": "60fef30d",
"metadata": {},
"source": [
"### What just happened?\n",
"\n",
"The full auth chain executed in one request:\n",
"\n",
"1. You sent a **User Auth Pool JWT** → harness validated it (inbound auth)\n",
"2. Agent decided to call `get_order` → harness fetched an **M2M token from M2M Pool**\n",
" via the credential provider (outbound auth)\n",
"3. Harness called the **gateway** with the M2M token → gateway validated it,\n",
" invoked **Lambda** with its own IAM role\n",
"4. Order details flowed back through the agent to you\n",
"\n",
"Three auth mechanisms, zero secrets in the invoke call. The harness handled\n",
"the token exchange automatically from the `outboundAuth.oauth` config.\n",
"\n",
"For more on harness security, see the \n",
"[AgentCore harness Developer Guide](https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/harness.html)."
]
},
{
"cell_type": "markdown",
"id": "7f6a4850",
"metadata": {},
"source": [
"## Step 5: Cleanup\n",
"\n",
"Deletes all resources in reverse order. Discovers by name so it works\n",
"even after a kernel restart. Skips gracefully if a resource doesn't exist."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "622c691b",
"metadata": {},
"outputs": [],
"source": [
"cleanup_all(REGION, PREFIX)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.2"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Binary file not shown.

After

Width:  |  Height:  |  Size: 300 KiB

@@ -0,0 +1,3 @@
boto3>=1.38.0
jupyter
requests
@@ -0,0 +1 @@
# utils package
@@ -0,0 +1,92 @@
"""
Simple order management Lambda function for AgentCore Gateway target.
Exposes get_order and update_order tools.
"""
import json
# Mock order database
ORDERS = {
"ORD-001": {
"orderId": "ORD-001",
"item": "Mechanical Keyboard",
"status": "shipped",
"amount": 149.99,
},
"ORD-002": {
"orderId": "ORD-002",
"item": "USB-C Hub",
"status": "processing",
"amount": 59.99,
},
"ORD-003": {
"orderId": "ORD-003",
"item": "Monitor Stand",
"status": "delivered",
"amount": 89.99,
},
}
def lambda_handler(event, context):
"""Handle tool calls from AgentCore Gateway.
The gateway passes the tool name in context.client_context.custom['bedrockAgentCoreToolName'].
The event body contains the tool arguments directly.
"""
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Log the raw event and context for debugging
logger.info(f"Event: {json.dumps(event, default=str)}")
client_context = {}
if context and hasattr(context, "client_context") and context.client_context:
client_context = {
"custom": getattr(context.client_context, "custom", None),
"env": getattr(context.client_context, "env", None),
}
logger.info(f"ClientContext: {json.dumps(client_context, default=str)}")
# Get tool name from client context (set by AgentCore Gateway)
tool_name = ""
if context and hasattr(context, "client_context") and context.client_context:
custom = getattr(context.client_context, "custom", None) or {}
tool_name = custom.get("bedrockAgentCoreToolName", "")
# Fallback: check event body for 'name' key (direct invocation / testing)
if not tool_name:
tool_name = event.get("name", "")
logger.info(f"Resolved tool_name: {tool_name}")
# The gateway prefixes tool names as "{targetName}___{toolName}".
# Strip the prefix to get the bare tool name.
if "___" in tool_name:
tool_name = tool_name.split("___", 1)[1]
logger.info(f"Stripped prefix, bare tool_name: {tool_name}")
arguments = event.get("arguments", event)
if tool_name == "get_order":
order_id = arguments.get("orderId", "")
order = ORDERS.get(order_id)
if order:
return {"status": "success", "result": json.dumps(order)}
return {"status": "error", "result": f"Order {order_id} not found"}
elif tool_name == "update_order_status":
order_id = arguments.get("orderId", "")
new_status = arguments.get("status", "")
order = ORDERS.get(order_id)
if order:
order["status"] = new_status
return {
"status": "success",
"result": json.dumps({"orderId": order_id, "newStatus": new_status}),
}
return {"status": "error", "result": f"Order {order_id} not found"}
return {"status": "error", "result": f"Unknown tool: {tool_name}"}
@@ -0,0 +1,855 @@
"""
Helper functions for provisioning infrastructure used by the
harness-oauth-gateway notebook.
Each function is self-contained: it creates one logical resource group,
prints progress, and returns a dict with the values the notebook needs.
All functions are idempotent — safe to re-run if a resource already exists.
"""
import boto3
import io
import json
import time
import uuid
import zipfile
# ─────────────────────────────────────────────────────────────────────
# Internal helpers
# ─────────────────────────────────────────────────────────────────────
def _find_pool_by_name(cog, pool_name):
"""Return pool ID if a Cognito user pool with this name exists, else None."""
for p in cog.list_user_pools(MaxResults=60).get("UserPools", []):
if p["Name"] == pool_name:
return p["Id"]
return None
def _find_client_by_name(cog, pool_id, client_name):
"""Return (client_id, client_secret|None) if an app client exists."""
for c in cog.list_user_pool_clients(UserPoolId=pool_id, MaxResults=60)[
"UserPoolClients"
]:
if c["ClientName"] == client_name:
full = cog.describe_user_pool_client(
UserPoolId=pool_id,
ClientId=c["ClientId"],
)["UserPoolClient"]
return full["ClientId"], full.get("ClientSecret")
return None, None
def _ensure_role(iam_c, role_name, trust_doc):
"""Create an IAM role if it doesn't exist. Returns the role ARN."""
try:
return iam_c.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=trust_doc,
)["Role"]["Arn"]
except iam_c.exceptions.EntityAlreadyExistsException:
return iam_c.get_role(RoleName=role_name)["Role"]["Arn"]
def _ensure_policy(iam_c, account_id, policy_name, policy_doc):
"""Create a managed policy if it doesn't exist. Returns the ARN."""
arn = f"arn:aws:iam::{account_id}:policy/{policy_name}"
try:
iam_c.create_policy(PolicyName=policy_name, PolicyDocument=policy_doc)
except iam_c.exceptions.EntityAlreadyExistsException:
pass
return arn
# ─────────────────────────────────────────────────────────────────────
# Cognito Pool #1 — User Auth (USER_PASSWORD_AUTH)
# ─────────────────────────────────────────────────────────────────────
def create_user_auth_pool(
region: str, prefix: str, username: str, password: str
) -> dict:
"""Create (or reuse) a Cognito user pool for end-user authentication.
Creates:
- User pool with USER_PASSWORD_AUTH
- App client (no secret)
- A test user with the supplied credentials
Returns dict with keys: pool_id, client_id, discovery_url
"""
cog = boto3.client("cognito-idp", region_name=region)
pool_name = f"{prefix}-user-pool"
client_name = f"{prefix}-user-client"
# ── Pool ──
pool_id = _find_pool_by_name(cog, pool_name)
if pool_id:
print(f" Pool #1 already exists: {pool_id}")
else:
pool_id = cog.create_user_pool(
PoolName=pool_name,
Policies={"PasswordPolicy": {"MinimumLength": 8}},
AutoVerifiedAttributes=["email"],
)["UserPool"]["Id"]
print(f" Pool #1 created: {pool_id}")
# ── App client ──
client_id, _ = _find_client_by_name(cog, pool_id, client_name)
if client_id:
print(f" Pool #1 client already exists: {client_id}")
else:
client_id = cog.create_user_pool_client(
UserPoolId=pool_id,
ClientName=client_name,
ExplicitAuthFlows=["ALLOW_USER_PASSWORD_AUTH", "ALLOW_REFRESH_TOKEN_AUTH"],
GenerateSecret=False,
)["UserPoolClient"]["ClientId"]
print(f" Pool #1 client created: {client_id}")
# ── Test user (create or reset password) ──
try:
cog.admin_create_user(
UserPoolId=pool_id,
Username=username,
MessageAction="SUPPRESS",
)
print(f' Test user "{username}" created')
except cog.exceptions.UsernameExistsException:
print(f' Test user "{username}" already exists')
cog.admin_set_user_password(
UserPoolId=pool_id,
Username=username,
Password=password,
Permanent=True,
)
discovery_url = (
f"https://cognito-idp.{region}.amazonaws.com/{pool_id}"
f"/.well-known/openid-configuration"
)
return dict(pool_id=pool_id, client_id=client_id, discovery_url=discovery_url)
# ─────────────────────────────────────────────────────────────────────
# Cognito Pool #2 — M2M (client_credentials + resource server)
# ─────────────────────────────────────────────────────────────────────
def create_m2m_pool(region: str, prefix: str) -> dict:
"""Create (or reuse) a Cognito user pool for machine-to-machine auth.
Creates:
- User pool
- Resource server with an 'invoke' scope
- Cognito domain (for the token endpoint)
- App client with client_credentials grant + secret
Returns dict with keys:
pool_id, client_id, client_secret, discovery_url,
scope, domain_prefix, token_endpoint
"""
cog = boto3.client("cognito-idp", region_name=region)
pool_name = f"{prefix}-m2m-pool"
client_name = f"{prefix}-m2m-client"
rs_id = f"{prefix}-gateway"
scope = f"{rs_id}/invoke"
# ── Pool ──
pool_id = _find_pool_by_name(cog, pool_name)
if pool_id:
print(f" Pool #2 already exists: {pool_id}")
else:
pool_id = cog.create_user_pool(
PoolName=pool_name,
Policies={"PasswordPolicy": {"MinimumLength": 8}},
)["UserPool"]["Id"]
print(f" Pool #2 created: {pool_id}")
# ── Resource server ──
try:
cog.create_resource_server(
UserPoolId=pool_id,
Identifier=rs_id,
Name="Gateway Resource Server",
Scopes=[
{"ScopeName": "invoke", "ScopeDescription": "Invoke gateway tools"}
],
)
print(f" Resource server created, scope: {scope}")
except Exception as e:
if "already exists" in str(e).lower():
print(f" Resource server already exists, scope: {scope}")
else:
raise
# ── Domain ──
desc = cog.describe_user_pool(UserPoolId=pool_id)["UserPool"]
domain_prefix = desc.get("Domain")
if domain_prefix:
print(f" Domain already exists: {domain_prefix}")
else:
domain_prefix = f"{prefix}-{uuid.uuid4().hex[:8]}"
cog.create_user_pool_domain(Domain=domain_prefix, UserPoolId=pool_id)
print(f" Domain created: {domain_prefix}")
token_endpoint = (
f"https://{domain_prefix}.auth.{region}.amazoncognito.com/oauth2/token"
)
# ── App client ──
client_id, client_secret = _find_client_by_name(cog, pool_id, client_name)
if client_id and client_secret:
print(f" Pool #2 client already exists: {client_id}")
else:
resp = cog.create_user_pool_client(
UserPoolId=pool_id,
ClientName=client_name,
GenerateSecret=True,
AllowedOAuthFlows=["client_credentials"],
AllowedOAuthScopes=[scope],
AllowedOAuthFlowsUserPoolClient=True,
SupportedIdentityProviders=["COGNITO"],
)
client_id = resp["UserPoolClient"]["ClientId"]
client_secret = resp["UserPoolClient"]["ClientSecret"]
print(f" Pool #2 client created: {client_id}")
discovery_url = (
f"https://cognito-idp.{region}.amazonaws.com/{pool_id}"
f"/.well-known/openid-configuration"
)
return dict(
pool_id=pool_id,
client_id=client_id,
client_secret=client_secret,
discovery_url=discovery_url,
scope=scope,
domain_prefix=domain_prefix,
token_endpoint=token_endpoint,
)
# ─────────────────────────────────────────────────────────────────────
# OAuth2 Credential Provider (AgentCore Identity)
# ─────────────────────────────────────────────────────────────────────
def create_credential_provider(
region: str,
prefix: str,
discovery_url: str,
client_id: str,
client_secret: str,
) -> dict:
"""Create (or reuse) an OAuth2 credential provider in AgentCore Identity.
Returns dict with keys: name, arn
"""
ac = boto3.client("bedrock-agentcore-control", region_name=region)
name = f"{prefix}-m2m-provider"
# Check if it already exists
try:
existing = ac.get_oauth2_credential_provider(name=name)
arn = existing["credentialProviderArn"]
print(f" Credential provider already exists: {arn}")
return dict(name=name, arn=arn)
except Exception:
pass # doesn't exist yet
resp = ac.create_oauth2_credential_provider(
name=name,
credentialProviderVendor="CustomOauth2",
oauth2ProviderConfigInput={
"customOauth2ProviderConfig": {
"oauthDiscovery": {"discoveryUrl": discovery_url},
"clientId": client_id,
"clientSecret": client_secret,
}
},
)
arn = resp["credentialProviderArn"]
print(f" Credential provider created: {arn}")
return dict(name=name, arn=arn)
# ─────────────────────────────────────────────────────────────────────
# Lambda Function
# ─────────────────────────────────────────────────────────────────────
def deploy_lambda(region: str, prefix: str) -> dict:
"""Deploy (or reuse) the order-management Lambda function.
Returns dict with keys: function_name, function_arn, role_name
"""
iam_c = boto3.client("iam")
lam = boto3.client("lambda", region_name=region)
role_name = f"{prefix}-lambda-role"
trust = json.dumps(
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "lambda.amazonaws.com"},
"Action": "sts:AssumeRole",
}
],
}
)
role_arn = _ensure_role(iam_c, role_name, trust)
try:
iam_c.attach_role_policy(
RoleName=role_name,
PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
)
except Exception:
pass
print(f" Role: {role_name}")
fn_name = f"{prefix}-order-mgmt"
# Check if Lambda already exists
try:
fn_arn = lam.get_function(FunctionName=fn_name)["Configuration"]["FunctionArn"]
print(f" Lambda already exists: {fn_name}")
except lam.exceptions.ResourceNotFoundException:
# Need IAM propagation only for new roles
print(" Waiting 10 s for IAM propagation…")
time.sleep(10)
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
zf.write("utils/lambda_function_code.py", "lambda_function.py")
buf.seek(0)
fn_arn = lam.create_function(
FunctionName=fn_name,
Runtime="python3.12",
Role=role_arn,
Handler="lambda_function.lambda_handler",
Code={"ZipFile": buf.read()},
Description="Order management for AgentCore Gateway",
Timeout=30,
)["FunctionArn"]
print(f" Lambda created: {fn_name}")
lam.get_waiter("function_active_v2").wait(FunctionName=fn_name)
print(" Lambda is Active")
return dict(function_name=fn_name, function_arn=fn_arn, role_name=role_name)
# ─────────────────────────────────────────────────────────────────────
# Gateway + Lambda target
# ─────────────────────────────────────────────────────────────────────
def create_gateway_with_lambda_target(
region: str,
prefix: str,
account_id: str,
discovery_url: str,
allowed_client: str,
allowed_scope: str,
lambda_arn: str,
lambda_function_name: str,
) -> dict:
"""Create (or reuse) an AgentCore Gateway with CUSTOM_JWT inbound auth
and a Lambda target using GATEWAY_IAM_ROLE outbound auth.
Returns dict with keys:
gateway_id, gateway_arn, gateway_url, target_id,
role_name, policy_name
"""
iam_c = boto3.client("iam")
ac = boto3.client("bedrock-agentcore-control", region_name=region)
# ── Gateway execution role ──
gw_role_name = f"{prefix}-gateway-role"
trust = json.dumps(
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "bedrock-agentcore.amazonaws.com"},
"Action": "sts:AssumeRole",
}
],
}
)
gw_role_arn = _ensure_role(iam_c, gw_role_name, trust)
print(f" Gateway role: {gw_role_name}")
gw_policy_name = f"{prefix}-gw-lambda-policy"
policy_doc = json.dumps(
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "lambda:InvokeFunction",
"Resource": f"arn:aws:lambda:{region}:{account_id}:function:{lambda_function_name}",
}
],
}
)
gw_policy_arn = _ensure_policy(iam_c, account_id, gw_policy_name, policy_doc)
iam_c.attach_role_policy(RoleName=gw_role_name, PolicyArn=gw_policy_arn)
# ── Gateway (try create, handle conflict if already exists) ──
gateway_name = f"{prefix}-gateway"
try:
print(" Waiting 10 s for IAM propagation…")
time.sleep(10)
gw = ac.create_gateway(
name=gateway_name,
protocolType="MCP",
roleArn=gw_role_arn,
authorizerType="CUSTOM_JWT",
authorizerConfiguration={
"customJWTAuthorizer": {
"discoveryUrl": discovery_url,
"allowedClients": [allowed_client],
"allowedScopes": [allowed_scope],
}
},
)
gw_id = gw["gatewayId"]
gw_arn = gw["gatewayArn"]
gw_url = gw.get("gatewayUrl", "")
print(f" Gateway created: {gw_id}")
except ac.exceptions.ConflictException:
# Already exists — find it by iterating the list
gw_id = None
for g in ac.list_gateways().get("items", []):
info = ac.get_gateway(gatewayIdentifier=g["gatewayId"])
if info.get("name") == gateway_name:
gw_id = info["gatewayId"]
gw_arn = info.get("gatewayArn", "")
gw_url = info.get("gatewayUrl", "")
break
if not gw_id:
raise RuntimeError(
f"Gateway {gateway_name} exists but could not be found via list"
)
print(f" Gateway already exists: {gw_id}")
# Wait for READY
print(" Waiting for gateway READY…")
for _ in range(30):
status = ac.get_gateway(gatewayIdentifier=gw_id)["status"]
if status == "READY":
break
time.sleep(10)
print(f" Gateway status: {status}")
# ── Lambda target (try create, handle conflict) ──
target_name = f"{prefix}-lambda-target"
tool_schemas = [
{
"name": "get_order",
"description": "Look up an order by ID. Returns item, status, amount.",
"inputSchema": {
"type": "object",
"properties": {
"orderId": {"type": "string", "description": "e.g. ORD-001"}
},
"required": ["orderId"],
},
},
{
"name": "update_order_status",
"description": "Update the status of an existing order.",
"inputSchema": {
"type": "object",
"properties": {
"orderId": {"type": "string", "description": "Order ID to update"},
"status": {"type": "string", "description": "New status value"},
},
"required": ["orderId", "status"],
},
},
]
try:
tgt = ac.create_gateway_target(
gatewayIdentifier=gw_id,
name=target_name,
targetConfiguration={
"mcp": {
"lambda": {
"lambdaArn": lambda_arn,
"toolSchema": {"inlinePayload": tool_schemas},
}
}
},
credentialProviderConfigurations=[
{"credentialProviderType": "GATEWAY_IAM_ROLE"}
],
)
tgt_id = tgt["targetId"]
print(f" Target created: {tgt_id}")
except ac.exceptions.ConflictException:
# Already exists — find it
tgt_id = None
for t in ac.list_gateway_targets(gatewayIdentifier=gw_id).get("items", []):
tgt_id = t["targetId"]
break
if not tgt_id:
raise RuntimeError(
f"Target on {gw_id} exists but could not be found via list"
)
print(f" Target already exists: {tgt_id}")
# Wait for target READY
print(" Waiting for target READY…")
for _ in range(30):
status = ac.get_gateway_target(gatewayIdentifier=gw_id, targetId=tgt_id)[
"status"
]
if status == "READY":
break
time.sleep(10)
print(f" Target status: {status}")
# Re-fetch gateway to get full ARN/URL if we reused
gw_full = ac.get_gateway(gatewayIdentifier=gw_id)
return dict(
gateway_id=gw_id,
gateway_arn=gw_full.get("gatewayArn", gw_arn),
gateway_url=gw_full.get("gatewayUrl", gw_url),
target_id=tgt_id,
role_name=gw_role_name,
policy_name=gw_policy_name,
)
# ─────────────────────────────────────────────────────────────────────
# Harness execution role
# ─────────────────────────────────────────────────────────────────────
def create_harness_execution_role(region: str, prefix: str, account_id: str) -> dict:
"""Create (or reuse) the IAM execution role the harness assumes at runtime.
Returns dict with keys: role_arn, role_name, policy_name
"""
iam_c = boto3.client("iam")
role_name = f"{prefix}-harness-role"
trust = json.dumps(
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "bedrock-agentcore.amazonaws.com"},
"Action": "sts:AssumeRole",
}
],
}
)
role_arn = _ensure_role(iam_c, role_name, trust)
print(f" Role: {role_name} (already exists or created)")
policy_name = f"{prefix}-harness-policy"
policy_doc = json.dumps(
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Bedrock",
"Effect": "Allow",
"Action": [
"bedrock:InvokeModel",
"bedrock:InvokeModelWithResponseStream",
],
"Resource": [
"arn:aws:bedrock:*::foundation-model/*",
f"arn:aws:bedrock:{region}:{account_id}:*",
],
},
{
"Sid": "Gateway",
"Effect": "Allow",
"Action": "bedrock-agentcore:InvokeGateway",
"Resource": f"arn:aws:bedrock-agentcore:{region}:{account_id}:gateway/*",
},
{
"Sid": "OAuth2TokenVault",
"Effect": "Allow",
"Action": "bedrock-agentcore:GetResourceOauth2Token",
"Resource": [
f"arn:aws:bedrock-agentcore:{region}:{account_id}:token-vault/default",
f"arn:aws:bedrock-agentcore:{region}:{account_id}:token-vault/default/oauth2credentialprovider/*",
f"arn:aws:bedrock-agentcore:{region}:{account_id}:workload-identity-directory/default",
f"arn:aws:bedrock-agentcore:{region}:{account_id}:workload-identity-directory/default/workload-identity/*",
],
},
{
"Sid": "OAuth2Secret",
"Effect": "Allow",
"Action": "secretsmanager:GetSecretValue",
"Resource": f"arn:aws:secretsmanager:{region}:{account_id}:secret:bedrock-agentcore-identity!default/oauth2/*",
},
{
"Sid": "WorkloadIdentity",
"Effect": "Allow",
"Action": [
"bedrock-agentcore:GetWorkloadAccessToken",
"bedrock-agentcore:GetWorkloadAccessTokenForJWT",
],
"Resource": ["*"],
},
{
"Sid": "EcrPublic",
"Effect": "Allow",
"Action": [
"ecr-public:GetAuthorizationToken",
"sts:GetServiceBearerToken",
],
"Resource": "*",
},
{
"Sid": "CloudWatch",
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
"logs:DescribeLogGroups",
"logs:DescribeLogStreams",
],
"Resource": f"arn:aws:logs:{region}:{account_id}:log-group:/aws/bedrock-agentcore/*",
},
{
"Sid": "XRay",
"Effect": "Allow",
"Action": [
"xray:PutTraceSegments",
"xray:PutTelemetryRecords",
"xray:GetSamplingRules",
"xray:GetSamplingTargets",
],
"Resource": "*",
},
{
"Sid": "CWMetrics",
"Effect": "Allow",
"Action": "cloudwatch:PutMetricData",
"Resource": "*",
"Condition": {
"StringEquals": {"cloudwatch:namespace": "bedrock-agentcore"}
},
},
],
}
)
policy_arn = _ensure_policy(iam_c, account_id, policy_name, policy_doc)
iam_c.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
print(f" Policy: {policy_name}")
print(" Waiting 15 s for IAM propagation…")
time.sleep(15)
return dict(role_arn=role_arn, role_name=role_name, policy_name=policy_name)
# ─────────────────────────────────────────────────────────────────────
# Cleanup — discover-and-delete all resources by name
# ─────────────────────────────────────────────────────────────────────
def cleanup_all(region: str, prefix: str):
"""Delete every resource created by this notebook, in reverse order.
Discovers resources by name so it works even after a kernel restart.
Skips gracefully if a resource was never created.
"""
account_id = boto3.client("sts", region_name=region).get_caller_identity()[
"Account"
]
cog = boto3.client("cognito-idp", region_name=region)
lam = boto3.client("lambda", region_name=region)
iam_c = boto3.client("iam")
ac = boto3.client("bedrock-agentcore-control", region_name=region)
harness_name = f"{prefix}-harness".replace("-", "_")
gateway_name = f"{prefix}-gateway"
fn_name = f"{prefix}-order-mgmt"
cred_name = f"{prefix}-m2m-provider"
lambda_role = f"{prefix}-lambda-role"
gw_role = f"{prefix}-gateway-role"
harness_role = f"{prefix}-harness-role"
gw_policy = f"{prefix}-gw-lambda-policy"
harness_policy = f"{prefix}-harness-policy"
pool1_name = f"{prefix}-user-pool"
pool2_name = f"{prefix}-m2m-pool"
deleted, skipped = [], []
def ok(m):
deleted.append(m)
print(f"{m}")
def skip(m):
skipped.append(m)
print(f" {m}")
def _wait_gone(check_fn, retries=24, delay=5, **kwargs):
for _ in range(retries):
try:
check_fn(**kwargs)
time.sleep(delay)
except Exception:
return
# 1. Harness
print("\n[1/7] Harness")
try:
matches = [
h
for h in ac.list_harnesses().get("harnesses", [])
if h.get("harnessName") == harness_name
]
if matches:
hid = matches[0]["harnessId"]
ac.delete_harness(harnessId=hid)
ok(f"Harness {hid} delete initiated")
_wait_gone(ac.get_harness, harnessId=hid)
ok("Harness deleted")
else:
skip("Harness not found")
except Exception as e:
skip(f"Harness: {e}")
# 2. Gateway + targets
print("\n[2/7] Gateway + targets")
try:
gws = [
g
for g in ac.list_gateways().get("items", [])
if g.get("name") == gateway_name
]
if gws:
gid = gws[0]["gatewayId"]
for t in ac.list_gateway_targets(gatewayIdentifier=gid).get("items", []):
tid = t["targetId"]
ac.delete_gateway_target(gatewayIdentifier=gid, targetId=tid)
ok(f"Target {tid} delete initiated")
_wait_gone(ac.get_gateway_target, gatewayIdentifier=gid, targetId=tid)
ok(f"Target {tid} deleted")
ac.delete_gateway(gatewayIdentifier=gid)
ok(f"Gateway {gid} delete initiated")
_wait_gone(ac.get_gateway, gatewayIdentifier=gid)
ok("Gateway deleted")
else:
skip("Gateway not found")
except Exception as e:
skip(f"Gateway: {e}")
# 3. Credential provider
print("\n[3/7] Credential provider")
try:
ac.get_oauth2_credential_provider(name=cred_name)
ac.delete_oauth2_credential_provider(name=cred_name)
ok(f"{cred_name} deleted")
except Exception as e:
skip(
"Credential provider not found"
if "not found" in str(e).lower()
or "ResourceNotFound" in str(type(e).__name__)
else f"{e}"
)
# 4. Lambda
print("\n[4/7] Lambda")
try:
lam.get_function(FunctionName=fn_name)
lam.delete_function(FunctionName=fn_name)
ok(f"{fn_name} deleted")
except lam.exceptions.ResourceNotFoundException:
skip("Lambda not found")
except Exception as e:
skip(f"Lambda: {e}")
# 5. IAM roles & policies
print("\n[5/7] IAM roles & policies")
def _del_role(rname, policy_names=None):
try:
iam_c.get_role(RoleName=rname)
except iam_c.exceptions.NoSuchEntityException:
skip(f"Role {rname} not found")
return
except Exception as e:
skip(f"{rname}: {e}")
return
try:
for p in iam_c.list_attached_role_policies(RoleName=rname)[
"AttachedPolicies"
]:
iam_c.detach_role_policy(RoleName=rname, PolicyArn=p["PolicyArn"])
except Exception:
pass
for pn in policy_names or []:
try:
iam_c.delete_policy(PolicyArn=f"arn:aws:iam::{account_id}:policy/{pn}")
ok(f"Policy {pn} deleted")
except Exception:
pass
try:
iam_c.delete_role(RoleName=rname)
ok(f"Role {rname} deleted")
except Exception as e:
skip(f"Role {rname}: {e}")
_del_role(lambda_role)
_del_role(gw_role, [gw_policy])
_del_role(harness_role, [harness_policy])
# 6. Cognito Pool #2
print("\n[6/7] Cognito Pool #2")
try:
m = [
p
for p in cog.list_user_pools(MaxResults=60)["UserPools"]
if p["Name"] == pool2_name
]
if m:
pid = m[0]["Id"]
try:
d = cog.describe_user_pool(UserPoolId=pid)["UserPool"].get("Domain")
if d:
cog.delete_user_pool_domain(Domain=d, UserPoolId=pid)
ok(f"Domain {d} deleted")
except Exception:
pass
cog.delete_user_pool(UserPoolId=pid)
ok(f"Pool #2 {pid} deleted")
else:
skip("Pool #2 not found")
except Exception as e:
skip(f"Pool #2: {e}")
# 7. Cognito Pool #1
print("\n[7/7] Cognito Pool #1")
try:
m = [
p
for p in cog.list_user_pools(MaxResults=60)["UserPools"]
if p["Name"] == pool1_name
]
if m:
pid = m[0]["Id"]
cog.delete_user_pool(UserPoolId=pid)
ok(f"Pool #1 {pid} deleted")
else:
skip("Pool #1 not found")
except Exception as e:
skip(f"Pool #1: {e}")
print(f"\n{'=' * 50}")
print(f"Deleted {len(deleted)} resources, skipped {len(skipped)}")
print("✅ Cleanup complete!")
@@ -8,3 +8,4 @@
- Ed Fraga
- Ray Wang
- Jay Viera
- Dheeraj Hegde