1
0
mirror of synced 2026-05-22 22:53:35 +00:00

Coding Agents (#1445)

* adding typescript

* adding coding agents

* adding coding agents/ fix lint
This commit is contained in:
Evandro Franco
2026-05-07 10:31:04 -04:00
committed by GitHub
parent 0d71f5b5ab
commit 310073382c
22 changed files with 3054 additions and 0 deletions
@@ -0,0 +1,136 @@
# Claude Code on AgentCore Runtime with S3 Files
Deploys Claude Code as an HTTP agent on AWS Bedrock AgentCore Runtime, with an S3 Files file system mounted at `/mnt/s3files` for persistent storage shared across sessions.
## Architecture
```
┌─────────────────────────┐ ┌─────────────────────────┐
│ AgentCore Runtime │ │ AgentCore Runtime │
│ Session A │ │ Session B │
│ (Claude Code) │ │ (Claude Code) │
│ │ │ │
│ /mnt/s3files ──────────┼────┐ │ /mnt/s3files ──────────┼────┐
└─────────────────────────┘ │ └─────────────────────────┘ │
│ │
▼ ▼
┌──────────────────────────────────────────────────┐
│ S3 Files File System │
│ │
│ ┌────────────────────────┐ │
│ │ S3 Files Access Point │ │
│ │ (uid/gid 1000) │ │
│ └───────────┬────────────┘ │
└──────────────┼───────────────────────────────────┘
┌──────────────────────────────┐
│ S3 Bucket │
│ (agentcore-<account-id>) │
│ │
│ agents/ │
│ ├── skills/ │
│ ├── results/ │
│ └── ... │
└──────────────────────────────┘
```
Multiple runtime sessions mount the same S3 Files file system, enabling agents to share skills, results, and data across independent invocations.
```
CloudFormation stack (cfn-vpc.yaml):
VPC, subnets, NAT Gateway, Security Group
S3 Files IAM role, file system, access point, mount targets
deploy.py creates:
IAM execution role
AgentCore Runtime (container from ECR, S3 Files mounted at /mnt/s3files)
```
## Prerequisites
### Python environment
```bash
uv venv --python 3.13 .venv
source .venv/bin/activate
uv pip install boto3 awscli --force-reinstall --no-cache-dir
```
### S3 Files IAM policies
The CloudFormation stack creates an IAM role (`S3FilesRole`) with the permissions required by S3 Files (S3, KMS, and EventBridge). For the full list of required policies, see the [S3 Files prerequisite policies](https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-files-prereq-policies.html) documentation.
## Step-by-step guide
### Step 1 — Infrastructure setup (CloudFormation)
Run the setup script to create the S3 bucket, deploy the CloudFormation stack (VPC, subnets, NAT Gateway, Security Group, S3 Files), build the arm64 Docker image, and push it to ECR.
```bash
./setup.sh us-west-2
```
All outputs are saved to `envvars.config` and used automatically by the next steps.
### Step 2 — Deploy the agent
Create the IAM execution role and the AgentCore Runtime:
```bash
python deploy.py
```
The script waits until the runtime status is `READY` and saves the runtime config to `runtime_config.json`.
If you need to update an existing runtime (e.g. after rebuilding the Docker image), run:
```bash
python update.py
```
### Step 3 — Invoke the agent
Send a prompt to the deployed agent. The first call creates a new session; subsequent calls can reuse the session ID for conversation continuity.
**Session A** — create a shared skill on the persistent filesystem:
```bash
python invoke.py "can u create a new skill, to review python code? This skill should be created into /mnt/s3files/skills/"
```
Continue the conversation within the same session:
```bash
python invoke.py --session <session-a-id> "now add unit tests for that skill"
```
**Session B** — a completely new session accesses the same filesystem and uses the skill created by Session A:
```bash
python invoke.py "list the skills available in /mnt/s3files/skills/ and use the python review skill to review this code: def add(a,b): return a+b"
```
Both sessions share `/mnt/s3files`, so anything written by one session is immediately available to others.
### Step 4 — Execute a command on the running session
Run a shell command directly on the container using the session ID from the previous step:
```bash
python exec_cmd.py --session 7fd93a80-8838-4721-abea-b1787dd0172c "ls -l /mnt/s3files"
```
### Step 5 — Cleanup
Delete all AgentCore resources (runtime, IAM role) and the CloudFormation stack. The S3 bucket is kept.
```bash
python cleanup.py
```
Or use the shell wrapper:
```bash
./cleanup.sh
```
@@ -0,0 +1,371 @@
AWSTemplateFormatVersion: "2010-09-09"
Description: VPC infrastructure for AgentCore Claude Code with S3 Files
Parameters:
VpcCidr:
Type: String
Default: "10.0.0.0/16"
BucketName:
Type: String
Description: S3 bucket name (agentcore-<account-id>)
S3FilesPrefix:
Type: String
Default: "agents/"
Mappings:
AgentCoreAZs:
us-east-1:
AZ1: use1-az1
AZ2: use1-az2
us-east-2:
AZ1: use2-az1
AZ2: use2-az2
us-west-2:
AZ1: usw2-az1
AZ2: usw2-az2
eu-west-1:
AZ1: euw1-az1
AZ2: euw1-az2
eu-central-1:
AZ1: euc1-az1
AZ2: euc1-az2
eu-north-1:
AZ1: eun1-az1
AZ2: eun1-az2
eu-west-2:
AZ1: euw2-az1
AZ2: euw2-az2
eu-west-3:
AZ1: euw3-az1
AZ2: euw3-az2
ap-southeast-1:
AZ1: apse1-az1
AZ2: apse1-az2
ap-southeast-2:
AZ1: apse2-az1
AZ2: apse2-az2
ap-northeast-1:
AZ1: apne1-az1
AZ2: apne1-az2
ap-northeast-2:
AZ1: apne2-az1
AZ2: apne2-az2
ap-south-1:
AZ1: aps1-az1
AZ2: aps1-az2
ca-central-1:
AZ1: cac1-az1
AZ2: cac1-az2
Resources:
# ── VPC ──────────────────────────────────────────────────────────────────────
VPC:
Type: AWS::EC2::VPC
Properties:
CidrBlock: !Ref VpcCidr
EnableDnsSupport: true
EnableDnsHostnames: true
Tags:
- Key: Name
Value: agentcore-vpc
InternetGateway:
Type: AWS::EC2::InternetGateway
Properties:
Tags:
- Key: Name
Value: agentcore-igw
VPCGatewayAttachment:
Type: AWS::EC2::VPCGatewayAttachment
Properties:
VpcId: !Ref VPC
InternetGatewayId: !Ref InternetGateway
# ── Public Subnets ──────────────────────────────────────────────────────────
PublicSubnet1:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
CidrBlock: "10.0.1.0/24"
AvailabilityZoneId: !FindInMap [AgentCoreAZs, !Ref "AWS::Region", AZ1]
MapPublicIpOnLaunch: true
Tags:
- Key: Name
Value: agentcore-public-1
PublicSubnet2:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
CidrBlock: "10.0.2.0/24"
AvailabilityZoneId: !FindInMap [AgentCoreAZs, !Ref "AWS::Region", AZ2]
MapPublicIpOnLaunch: true
Tags:
- Key: Name
Value: agentcore-public-2
# ── Private Subnets ─────────────────────────────────────────────────────────
PrivateSubnet1:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
CidrBlock: "10.0.11.0/24"
AvailabilityZoneId: !FindInMap [AgentCoreAZs, !Ref "AWS::Region", AZ1]
Tags:
- Key: Name
Value: agentcore-private-1
PrivateSubnet2:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
CidrBlock: "10.0.12.0/24"
AvailabilityZoneId: !FindInMap [AgentCoreAZs, !Ref "AWS::Region", AZ2]
Tags:
- Key: Name
Value: agentcore-private-2
# ── Public Route Table ──────────────────────────────────────────────────────
PublicRouteTable:
Type: AWS::EC2::RouteTable
Properties:
VpcId: !Ref VPC
Tags:
- Key: Name
Value: agentcore-public-rt
PublicRoute:
Type: AWS::EC2::Route
DependsOn: VPCGatewayAttachment
Properties:
RouteTableId: !Ref PublicRouteTable
DestinationCidrBlock: "0.0.0.0/0"
GatewayId: !Ref InternetGateway
PublicSubnet1RTAssoc:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
SubnetId: !Ref PublicSubnet1
RouteTableId: !Ref PublicRouteTable
PublicSubnet2RTAssoc:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
SubnetId: !Ref PublicSubnet2
RouteTableId: !Ref PublicRouteTable
# ── NAT Gateway ─────────────────────────────────────────────────────────────
NatEIP:
Type: AWS::EC2::EIP
Properties:
Domain: vpc
NatGateway:
Type: AWS::EC2::NatGateway
Properties:
AllocationId: !GetAtt NatEIP.AllocationId
SubnetId: !Ref PublicSubnet1
Tags:
- Key: Name
Value: agentcore-nat
# ── Private Route Table ─────────────────────────────────────────────────────
PrivateRouteTable:
Type: AWS::EC2::RouteTable
Properties:
VpcId: !Ref VPC
Tags:
- Key: Name
Value: agentcore-private-rt
PrivateRoute:
Type: AWS::EC2::Route
Properties:
RouteTableId: !Ref PrivateRouteTable
DestinationCidrBlock: "0.0.0.0/0"
NatGatewayId: !Ref NatGateway
PrivateSubnet1RTAssoc:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
SubnetId: !Ref PrivateSubnet1
RouteTableId: !Ref PrivateRouteTable
PrivateSubnet2RTAssoc:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
SubnetId: !Ref PrivateSubnet2
RouteTableId: !Ref PrivateRouteTable
# ── Security Group ──────────────────────────────────────────────────────────
AgentCoreSecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupDescription: Security group for AgentCore runtimes
VpcId: !Ref VPC
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 2049
ToPort: 2049
CidrIp: "10.0.0.0/16"
Description: Allow NFS (S3 Files mount) from VPC
SecurityGroupEgress:
- IpProtocol: "-1"
CidrIp: "0.0.0.0/0"
Description: Allow all outbound
Tags:
- Key: Name
Value: agentcore-sg
# ── S3 Files IAM Role ──────────────────────────────────────────────────────
S3FilesRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub "s3files-${BucketName}-role"
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Principal:
Service: elasticfilesystem.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: s3files-bucket-access
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- s3:GetObject
- s3:PutObject
- s3:DeleteObject
- s3:ListBucket
- s3:GetBucketLocation
- s3:AbortMultipartUpload
- s3:DeleteObject*
- s3:GetObject*
- s3:List*
- s3:PutObject*
- s3:ListBucketVersions
Resource:
- !Sub "arn:aws:s3:::${BucketName}"
- !Sub "arn:aws:s3:::${BucketName}/*"
- Sid: S3ObjectPermissions
Effect: Allow
Action:
- s3:AbortMultipartUpload
- s3:DeleteObject*
- s3:GetObject*
- s3:List*
- s3:PutObject*
Resource: !Sub "arn:aws:s3:::${BucketName}/*"
- Sid: UseKmsKeyWithS3Files
Effect: Allow
Action:
- kms:GenerateDataKey
- kms:Encrypt
- kms:Decrypt
- kms:ReEncryptFrom
- kms:ReEncryptTo
Condition:
StringLike:
kms:ViaService: !Sub "s3.${AWS::Region}.amazonaws.com"
kms:EncryptionContext:aws:s3:arn:
- !Sub "arn:aws:s3:::${BucketName}"
- !Sub "arn:aws:s3:::${BucketName}/*"
Resource: !Sub "arn:aws:kms:${AWS::Region}:${AWS::AccountId}:*"
- Sid: EventBridgeManage
Effect: Allow
Action:
- events:DeleteRule
- events:DisableRule
- events:EnableRule
- events:PutRule
- events:PutTargets
- events:RemoveTargets
Condition:
StringEquals:
events:ManagedBy: elasticfilesystem.amazonaws.com
Resource:
- "arn:aws:events:*:*:rule/DO-NOT-DELETE-S3-Files*"
- Sid: EventBridgeRead
Effect: Allow
Action:
- events:DescribeRule
- events:ListRuleNamesByTarget
- events:ListRules
- events:ListTargetsByRule
Resource:
- "arn:aws:events:*:*:rule/*"
# ── S3 Files File System ───────────────────────────────────────────────────
S3FilesFileSystem:
Type: AWS::S3Files::FileSystem
DependsOn: S3FilesRole
Properties:
Bucket: !Sub "arn:aws:s3:::${BucketName}"
Prefix: !Ref S3FilesPrefix
RoleArn: !GetAtt S3FilesRole.Arn
# ── S3 Files Access Point ──────────────────────────────────────────────────
S3FilesAccessPoint:
Type: AWS::S3Files::AccessPoint
Properties:
FileSystemId: !Ref S3FilesFileSystem
PosixUser:
Uid: 1000
Gid: 1000
RootDirectory:
Path: /mnt/s3files
CreationPermissions:
OwnerUid: 1000
OwnerGid: 1000
Permissions: "755"
# ── S3 Files Mount Target (one per private subnet) ────────────────────────
S3FilesMountTarget1:
Type: AWS::S3Files::MountTarget
Properties:
FileSystemId: !Ref S3FilesFileSystem
SubnetId: !Ref PrivateSubnet1
SecurityGroups:
- !Ref AgentCoreSecurityGroup
S3FilesMountTarget2:
Type: AWS::S3Files::MountTarget
Properties:
FileSystemId: !Ref S3FilesFileSystem
SubnetId: !Ref PrivateSubnet2
SecurityGroups:
- !Ref AgentCoreSecurityGroup
Outputs:
VpcId:
Value: !Ref VPC
PrivateSubnet1Id:
Value: !Ref PrivateSubnet1
PrivateSubnet2Id:
Value: !Ref PrivateSubnet2
SecurityGroupId:
Value: !Ref AgentCoreSecurityGroup
S3FilesFileSystemId:
Value: !Ref S3FilesFileSystem
S3FilesAccessPointId:
Value: !Ref S3FilesAccessPoint
S3FilesAccessPointArn:
Value: !GetAtt S3FilesAccessPoint.AccessPointArn
S3FilesRoleArn:
Value: !GetAtt S3FilesRole.Arn
@@ -0,0 +1,120 @@
"""
Clean up all resources created by setup.sh and deploy.py.
Deletes everything except the S3 bucket:
- AgentCore Runtime endpoint and runtime
- AgentCore IAM execution role
- CloudFormation stack (VPC, S3 Files, security group, NAT, etc.)
Usage:
python cleanup.py
"""
import json
import os
import sys
import time
import boto3
def load_config(filename):
path = os.path.join(os.path.dirname(__file__), filename)
if not os.path.exists(path):
return {}
with open(path) as f:
if filename.endswith(".json"):
return json.load(f)
cfg = {}
for line in f:
line = line.strip()
if line and "=" in line and not line.startswith("#"):
key, value = line.split("=", 1)
cfg[key] = value.strip('"').strip("'")
return cfg
def main():
runtime_cfg = load_config("runtime_config.json")
env_cfg = load_config("envvars.config")
if not runtime_cfg:
print("Error: runtime_config.json not found.")
sys.exit(1)
agent_name = runtime_cfg["agent_name"]
runtime_id = runtime_cfg["runtime_id"]
region = runtime_cfg["region"]
stack_name = env_cfg.get("AGENTCORE_STACK_NAME", "agentcore-claude-code")
session = boto3.Session(region_name=region)
account_id = session.client("sts").get_caller_identity()["Account"]
control = session.client("bedrock-agentcore-control", region_name=region)
iam = session.client("iam")
cfn = session.client("cloudformation")
print(f"Cleaning up resources for: {agent_name}\n")
# 1. Delete AgentCore endpoints
try:
endpoints = control.list_agent_runtime_endpoints(agentRuntimeId=runtime_id)
for ep in endpoints.get("runtimeEndpoints", []):
name = ep["name"]
if name == "DEFAULT":
continue
print(f" Deleting endpoint: {name}")
control.delete_agent_runtime_endpoint(
agentRuntimeId=runtime_id, endpointName=name
)
if endpoints.get("runtimeEndpoints"):
print(" Waiting for endpoint deletion...")
time.sleep(30)
except Exception as e:
print(f" Warning: {e}")
# 2. Delete AgentCore runtime
try:
print(f" Deleting runtime: {runtime_id}")
control.delete_agent_runtime(agentRuntimeId=runtime_id)
print(" Waiting for runtime deletion...")
time.sleep(30)
except Exception as e:
print(f" Warning: {e}")
# 3. Delete AgentCore IAM execution role
role_name = f"agentcore-{agent_name}-role"
try:
policies = iam.list_role_policies(RoleName=role_name)
for policy_name in policies.get("PolicyNames", []):
iam.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
iam.delete_role(RoleName=role_name)
print(f" Deleted IAM role: {role_name}")
except iam.exceptions.NoSuchEntityException:
print(f" IAM role not found: {role_name}")
except Exception as e:
print(f" Warning: {e}")
# 4. Delete CloudFormation stack (VPC, S3 Files, SG, NAT, etc.)
try:
print(f" Deleting CloudFormation stack: {stack_name}")
cfn.delete_stack(StackName=stack_name)
print(" Waiting for stack deletion (this may take a few minutes)...")
waiter = cfn.get_waiter("stack_delete_complete")
waiter.wait(StackName=stack_name, WaiterConfig={"Delay": 15, "MaxAttempts": 40})
print(f" Stack deleted: {stack_name}")
except Exception as e:
print(f" Warning: {e}")
# 5. Remove local config files
for f in ["runtime_config.json", "envvars.config"]:
path = os.path.join(os.path.dirname(__file__), f)
if os.path.exists(path):
os.remove(path)
bucket_name = f"agentcore-{account_id}"
print(f"\nCleanup complete for {agent_name}")
print(f" (bucket s3://{bucket_name} was kept)")
if __name__ == "__main__":
main()
@@ -0,0 +1,15 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
cd "$SCRIPT_DIR"
echo "============================================================"
echo " AgentCore Cleanup"
echo "============================================================"
echo ""
python3 cleanup.py
echo ""
echo "Done."
@@ -0,0 +1,359 @@
"""
Deploy Claude Code agent to AgentCore Runtime using a container image from ECR.
Run setup.sh first, then:
python deploy.py
Reads configuration from envvars.config (created by setup.sh).
"""
import json
import os
import sys
import time
import boto3
# ── Load config ──────────────────────────────────────────────────────────────
def load_dotconfig():
config_path = os.path.join(os.path.dirname(__file__), "envvars.config")
cfg = {}
if os.path.exists(config_path):
with open(config_path) as f:
for line in f:
line = line.strip()
if line and "=" in line and not line.startswith("#"):
key, value = line.split("=", 1)
cfg[key] = value
return cfg
file_cfg = load_dotconfig()
def cfg(key, default=None):
return file_cfg.get(key) or os.environ.get(key) or default
# ── Configuration ────────────────────────────────────────────────────────────
REGION = cfg("AGENTCORE_REGION", boto3.session.Session().region_name or "us-west-2")
session = boto3.Session(region_name=REGION)
ACCOUNT_ID = session.client("sts").get_caller_identity()["Account"]
AGENT_NAME = cfg("AGENTCORE_AGENT_NAME", f"claude_code_{int(time.time()) % 100000}")
ECR_URI = cfg("AGENTCORE_ECR_URI")
SUBNET_1 = cfg("AGENTCORE_SUBNET_1")
SUBNET_2 = cfg("AGENTCORE_SUBNET_2")
SECURITY_GROUP = cfg("AGENTCORE_SECURITY_GROUP")
S3FILES_AP_ARN = cfg("AGENTCORE_S3FILES_AP_ARN")
S3FILES_BUCKET = cfg("AGENTCORE_S3FILES_BUCKET", f"agentcore-{ACCOUNT_ID}")
if not ECR_URI:
print("Error: AGENTCORE_ECR_URI not found. Run setup.sh first.")
sys.exit(1)
if not all([SUBNET_1, SUBNET_2, SECURITY_GROUP]):
print("Error: VPC config (subnets, security group) not found. Run setup.sh first.")
sys.exit(1)
PROTOCOL = "HTTP"
S3FILES_MOUNT_PATH = "/mnt/s3files"
print(f"Region: {REGION}")
print(f"Account: {ACCOUNT_ID}")
print(f"Agent: {AGENT_NAME}")
print(f"Image: {ECR_URI}")
print(f"Subnets: {SUBNET_1}, {SUBNET_2}")
print(f"SG: {SECURITY_GROUP}")
if S3FILES_AP_ARN:
print(f"S3 Files: {S3FILES_AP_ARN}")
print(f"Mount: {S3FILES_MOUNT_PATH}")
# ── Step 1: Create IAM Execution Role ────────────────────────────────────────
def create_execution_role() -> str:
iam = session.client("iam")
role_name = f"agentcore-{AGENT_NAME}-role"
trust_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "bedrock-agentcore.amazonaws.com"},
"Action": "sts:AssumeRole",
"Condition": {"StringEquals": {"aws:SourceAccount": ACCOUNT_ID}},
},
{
"Sid": "AllowS3FilesAssumeRole",
"Effect": "Allow",
"Principal": {"Service": "elasticfilesystem.amazonaws.com"},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {"aws:SourceAccount": ACCOUNT_ID},
"ArnLike": {"aws:SourceArn": f"arn:aws:s3files:{REGION}:{ACCOUNT_ID}:file-system/*"},
},
},
],
}
inline_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["logs:DescribeLogStreams", "logs:CreateLogGroup"],
"Resource": [f"arn:aws:logs:{REGION}:{ACCOUNT_ID}:log-group:/aws/bedrock-agentcore/runtimes/*"],
},
{
"Effect": "Allow",
"Action": ["logs:DescribeLogGroups"],
"Resource": [f"arn:aws:logs:{REGION}:{ACCOUNT_ID}:log-group:*"],
},
{
"Effect": "Allow",
"Action": ["logs:CreateLogStream", "logs:PutLogEvents"],
"Resource": [f"arn:aws:logs:{REGION}:{ACCOUNT_ID}:log-group:/aws/bedrock-agentcore/runtimes/*:log-stream:*"],
},
{
"Effect": "Allow",
"Action": ["xray:PutTraceSegments", "xray:PutTelemetryRecords", "xray:GetSamplingRules", "xray:GetSamplingTargets"],
"Resource": ["*"],
},
{
"Effect": "Allow",
"Action": "cloudwatch:PutMetricData",
"Resource": "*",
"Condition": {"StringEquals": {"cloudwatch:namespace": "bedrock-agentcore"}},
},
{
"Sid": "BedrockModelInvocation",
"Effect": "Allow",
"Action": ["bedrock:InvokeModel", "bedrock:InvokeModelWithResponseStream"],
"Resource": ["arn:aws:bedrock:*::foundation-model/*", f"arn:aws:bedrock:{REGION}:{ACCOUNT_ID}:*"],
},
{
"Sid": "ECRPull",
"Effect": "Allow",
"Action": ["ecr:GetAuthorizationToken"],
"Resource": ["*"],
},
{
"Sid": "ECRImage",
"Effect": "Allow",
"Action": ["ecr:BatchGetImage", "ecr:GetDownloadUrlForLayer"],
"Resource": [f"arn:aws:ecr:{REGION}:{ACCOUNT_ID}:repository/agentcore-claude-code"],
},
{
"Sid": "S3Files",
"Effect": "Allow",
"Action": [
"s3files:GetAccessPoint",
"s3files:GetFileSystem",
"s3files:GetMountTarget",
"s3files:DescribeMountTargets",
"s3files:ListMountTargets",
"s3files:ClientMount",
"s3files:ClientWrite",
"s3files:ClientRootAccess",
],
"Resource": [
S3FILES_AP_ARN,
S3FILES_AP_ARN.rsplit("/access-point/", 1)[0],
],
},
{
"Sid": "EFSClientAccess",
"Effect": "Allow",
"Action": [
"elasticfilesystem:ClientMount",
"elasticfilesystem:ClientWrite",
],
"Resource": f"arn:aws:elasticfilesystem:{REGION}:{ACCOUNT_ID}:file-system/*",
"Condition": {
"ArnLike": {
"elasticfilesystem:AccessPointArn": f"arn:aws:elasticfilesystem:{REGION}:{ACCOUNT_ID}:access-point/*",
}
},
},
{
"Sid": "EFSDescribe",
"Effect": "Allow",
"Action": [
"elasticfilesystem:DescribeAccessPoints",
"elasticfilesystem:DescribeMountTargets",
],
"Resource": [
f"arn:aws:elasticfilesystem:{REGION}:{ACCOUNT_ID}:file-system/*",
f"arn:aws:elasticfilesystem:{REGION}:{ACCOUNT_ID}:access-point/*",
],
},
{
"Sid": "S3BucketPermissions",
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:ListBucketVersions",
],
"Resource": [f"arn:aws:s3:::{S3FILES_BUCKET}"],
"Condition": {"StringEquals": {"aws:ResourceAccount": ACCOUNT_ID}},
},
{
"Sid": "S3ObjectPermissions",
"Effect": "Allow",
"Action": [
"s3:AbortMultipartUpload",
"s3:DeleteObject*",
"s3:GetObject*",
"s3:List*",
"s3:PutObject*",
],
"Resource": [f"arn:aws:s3:::{S3FILES_BUCKET}/*"],
"Condition": {"StringEquals": {"aws:ResourceAccount": ACCOUNT_ID}},
},
{
"Sid": "EventBridgeManage",
"Effect": "Allow",
"Action": [
"events:DeleteRule",
"events:DisableRule",
"events:EnableRule",
"events:PutRule",
"events:PutTargets",
"events:RemoveTargets",
],
"Resource": ["arn:aws:events:*:*:rule/DO-NOT-DELETE-S3-Files*"],
"Condition": {"StringEquals": {"events:ManagedBy": "elasticfilesystem.amazonaws.com"}},
},
{
"Sid": "EventBridgeRead",
"Effect": "Allow",
"Action": [
"events:DescribeRule",
"events:ListRuleNamesByTarget",
"events:ListRules",
"events:ListTargetsByRule",
],
"Resource": ["arn:aws:events:*:*:rule/*"],
},
],
}
try:
resp = iam.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(trust_policy),
Description=f"Execution role for {AGENT_NAME}",
)
role_arn = resp["Role"]["Arn"]
print(f"\nCreated IAM role: {role_arn}")
except iam.exceptions.EntityAlreadyExistsException:
role_arn = f"arn:aws:iam::{ACCOUNT_ID}:role/{role_name}"
print(f"\nIAM role exists: {role_arn}")
iam.put_role_policy(
RoleName=role_name,
PolicyName=f"{AGENT_NAME}-policy",
PolicyDocument=json.dumps(inline_policy),
)
print("Waiting 10s for IAM propagation...")
time.sleep(10)
return role_arn
# ── Step 2: Create AgentCore Runtime (VPC + container + S3 Files) ────────────
def create_runtime(role_arn: str) -> dict:
control = session.client("bedrock-agentcore-control", region_name=REGION)
create_params = dict(
agentRuntimeName=AGENT_NAME,
agentRuntimeArtifact={
"containerConfiguration": {
"containerUri": ECR_URI,
}
},
roleArn=role_arn,
networkConfiguration={
"networkMode": "VPC",
"networkModeConfig": {
"subnets": [SUBNET_1, SUBNET_2],
"securityGroups": [SECURITY_GROUP],
},
},
protocolConfiguration={"serverProtocol": PROTOCOL},
description="Claude Code agent on AgentCore Runtime with S3 Files",
)
if S3FILES_AP_ARN:
create_params["filesystemConfigurations"] = [
{
"s3FilesAccessPoint": {
"accessPointArn": S3FILES_AP_ARN,
"mountPath": S3FILES_MOUNT_PATH,
}
}
]
print(f"\nCreating AgentCore Runtime '{AGENT_NAME}'...")
response = control.create_agent_runtime(**create_params)
runtime_id = response["agentRuntimeId"]
runtime_arn = response["agentRuntimeArn"]
print(f"Runtime created: {runtime_id}")
print("Waiting for runtime to be ready...")
while True:
status_resp = control.get_agent_runtime(agentRuntimeId=runtime_id)
status = status_resp["status"]
print(f" Status: {status}")
if status == "READY":
break
if status in ("CREATE_FAILED", "UPDATE_FAILED"):
print(f"Failed: {status_resp.get('failureReason', 'Unknown')}")
sys.exit(1)
time.sleep(15)
return {"runtime_id": runtime_id, "runtime_arn": runtime_arn}
# ── Main ─────────────────────────────────────────────────────────────────────
def main():
print("=" * 60)
print(f"Deploying {AGENT_NAME} to AgentCore Runtime")
print(" (VPC mode + container + S3 Files)")
print("=" * 60)
role_arn = create_execution_role()
runtime = create_runtime(role_arn)
config = {
"agent_name": AGENT_NAME,
"runtime_id": runtime["runtime_id"],
"runtime_arn": runtime["runtime_arn"],
"region": REGION,
"ecr_uri": ECR_URI,
}
if S3FILES_AP_ARN:
config["s3files_access_point_arn"] = S3FILES_AP_ARN
config["s3files_mount_path"] = S3FILES_MOUNT_PATH
config_path = os.path.join(os.path.dirname(__file__), "runtime_config.json")
with open(config_path, "w") as f:
json.dump(config, f, indent=2)
print("\n" + "=" * 60)
print("Deployment complete!")
print(f" Runtime ARN: {runtime['runtime_arn']}")
if S3FILES_AP_ARN:
print(f" S3 Files mounted at: {S3FILES_MOUNT_PATH}")
print(" Config saved to: runtime_config.json")
print("\n Test with: python invoke.py")
print("=" * 60)
if __name__ == "__main__":
main()
@@ -0,0 +1,112 @@
"""
Execute a shell command on a running AgentCore Runtime session and stream output.
Usage:
python exec_cmd.py --session <id> "ls -la /mnt/s3files"
"""
import json
import os
import sys
import boto3
from botocore.config import Config
# ── Load config ──────────────────────────────────────────────────────────────
def load_dotconfig():
config_path = os.path.join(os.path.dirname(__file__), "envvars.config")
cfg = {}
if os.path.exists(config_path):
with open(config_path) as f:
for line in f:
line = line.strip()
if line and "=" in line and not line.startswith("#"):
key, value = line.split("=", 1)
cfg[key] = value.strip('"').strip("'")
return cfg
file_cfg = load_dotconfig()
def cfg(key, default=None):
return file_cfg.get(key) or os.environ.get(key) or default
# ── Configuration ────────────────────────────────────────────────────────────
REGION = cfg("AGENTCORE_REGION", "us-west-2")
def load_config() -> dict:
config_path = os.path.join(os.path.dirname(__file__), "runtime_config.json")
try:
with open(config_path) as f:
return json.load(f)
except FileNotFoundError:
print("Error: runtime_config.json not found. Run deploy.py first.")
sys.exit(1)
def exec_command(runtime_arn: str, session_id: str, command: str):
client = boto3.client(
"bedrock-agentcore",
region_name=REGION,
config=Config(read_timeout=900),
)
body = {"command": command}
response = client.invoke_agent_runtime_command(
agentRuntimeArn=runtime_arn,
runtimeSessionId=session_id,
body=body,
)
request_id = response.get("ResponseMetadata", {}).get("RequestId", "N/A")
print(f"Session: {response.get('runtimeSessionId', 'N/A')}")
print(f"Request ID: {request_id}")
print(f"Status: {response.get('statusCode', 'N/A')}")
print()
for event in response["stream"]:
chunk = event.get("chunk", {})
if "contentDelta" in chunk:
delta = chunk["contentDelta"]
print(delta.get("stdout", ""), end="", flush=True)
print(delta.get("stderr", ""), end="", flush=True)
def main():
config = load_config()
runtime_arn = config["runtime_arn"]
args = sys.argv[1:]
session_id = None
if "--session" in args:
idx = args.index("--session")
session_id = args[idx + 1]
args = args[:idx] + args[idx + 2:]
if not session_id:
session_id = os.environ.get("SESSION_ID")
if not session_id:
print("Error: session ID required. Use --session <id> or set SESSION_ID env var.")
sys.exit(1)
if not args:
print("Usage: python exec_cmd.py --session <id> '<command>'")
sys.exit(1)
command = " ".join(args)
print(f"Runtime: {runtime_arn}")
print(f"Command: {command}")
print()
exec_command(runtime_arn, session_id, command)
if __name__ == "__main__":
main()
@@ -0,0 +1,130 @@
"""
Invoke the Claude Code agent deployed on AgentCore Runtime.
Usage:
python invoke.py
python invoke.py "Write a Python function that sorts a list"
python invoke.py --session <id> "Now add type hints to it"
"""
import json
import os
import sys
import uuid
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
# ── Load config ──────────────────────────────────────────────────────────────
def load_dotconfig():
config_path = os.path.join(os.path.dirname(__file__), "envvars.config")
cfg = {}
if os.path.exists(config_path):
with open(config_path) as f:
for line in f:
line = line.strip()
if line and "=" in line and not line.startswith("#"):
key, value = line.split("=", 1)
cfg[key] = value.strip('"').strip("'")
return cfg
file_cfg = load_dotconfig()
def cfg(key, default=None):
return file_cfg.get(key) or os.environ.get(key) or default
# ── Configuration ────────────────────────────────────────────────────────────
REGION = cfg("AGENTCORE_REGION", "us-west-2")
def load_config() -> dict:
config_path = os.path.join(os.path.dirname(__file__), "runtime_config.json")
try:
with open(config_path) as f:
return json.load(f)
except FileNotFoundError:
print("Error: runtime_config.json not found. Run deploy.py first.")
sys.exit(1)
def invoke(runtime_arn: str, prompt: str, region: str, session_id: str = None) -> dict:
client = boto3.client(
"bedrock-agentcore",
region_name=region,
config=Config(read_timeout=900),
)
if not session_id:
session_id = str(uuid.uuid4())
payload_data = {"prompt": prompt}
try:
response = client.invoke_agent_runtime(
agentRuntimeArn=runtime_arn,
runtimeSessionId=session_id,
payload=json.dumps(payload_data).encode("utf-8"),
)
except ClientError as exc:
request_id = exc.response.get("ResponseMetadata", {}).get("RequestId", "N/A")
print(f" Session ID: {session_id}")
print(f" Request ID: {request_id}")
print(f" Error: {exc}")
return {"_runtimeSessionId": session_id}
body = json.loads(response["response"].read().decode("utf-8"))
runtime_session = response.get("runtimeSessionId", session_id)
request_id = response.get("ResponseMetadata", {}).get("RequestId", "N/A")
print(f" Session ID: {runtime_session}")
print(f" Request ID: {request_id}")
print(f" Status: {response.get('statusCode', 'N/A')}")
body["_runtimeSessionId"] = runtime_session
return body
def main():
config = load_config()
runtime_arn = config["runtime_arn"]
region = config["region"]
args = sys.argv[1:]
session_id = None
if "--session" in args:
idx = args.index("--session")
session_id = args[idx + 1]
args = args[:idx] + args[idx + 2:]
if args:
prompts = [" ".join(args)]
else:
prompts = [
"What is 2 + 2?",
"Now multiply that result by 10.",
]
print(f"Invoking agent: {runtime_arn}")
if session_id:
print(f"Resuming session: {session_id}")
print()
for prompt in prompts:
print(f"--- Prompt: {prompt}")
result = invoke(runtime_arn, prompt, region, session_id)
print(f"--- Response:\n{result.get('response', result)}")
session_id = result.get("_runtimeSessionId", session_id)
print(f"--- Session ID: {session_id}")
print()
if session_id:
print("To continue this conversation:")
print(f" python invoke.py --session {session_id} \"your next prompt\"")
if __name__ == "__main__":
main()
@@ -0,0 +1,9 @@
{
"agent_name": "claude_code_89491",
"runtime_id": "claude_code_89491-F84ySyFzSd",
"runtime_arn": "arn:aws:bedrock-agentcore:us-east-1:623129719707:runtime/claude_code_89491-F84ySyFzSd",
"region": "us-east-1",
"ecr_uri": "623129719707.dkr.ecr.us-east-1.amazonaws.com/agentcore-claude-code:claude_code_89491",
"s3files_access_point_arn": "arn:aws:s3files:us-east-1:623129719707:file-system/fs-0c276db1ba6347ec1/access-point/fsap-0e1b5d94dab8da3db",
"s3files_mount_path": "/mnt/s3files"
}
@@ -0,0 +1,85 @@
const http = require("http");
const { spawn } = require("child_process");
const PORT = process.env.PORT || 8080;
function runClaude(prompt, sessionId) {
return new Promise((resolve, reject) => {
const args = ["-p", "--dangerously-skip-permissions", "--output-format", "json"];
if (sessionId) {
args.push("--resume", sessionId);
} else {
args.push("--continue");
}
args.push(prompt);
console.log(`[runClaude] sessionId=${sessionId || "(none, --continue)"} prompt="${prompt}"`);
console.log(`[runClaude] args: ${JSON.stringify(args)}`);
const proc = spawn("claude", args, {
env: { ...process.env, HOME: "/home/agent" },
cwd: "/home/agent",
timeout: 300_000,
});
let stdout = "";
let stderr = "";
proc.stdout.on("data", (d) => (stdout += d));
proc.stderr.on("data", (d) => (stderr += d));
proc.on("close", (code) => {
if (code !== 0) {
reject(new Error(`claude exited ${code}: ${stderr}`));
return;
}
try {
const parsed = JSON.parse(stdout);
resolve({
response: parsed.result || stdout.trim(),
sessionId: parsed.session_id || null,
});
} catch {
resolve({ response: stdout.trim(), sessionId: null });
}
});
proc.on("error", reject);
});
}
function readBody(req) {
return new Promise((resolve) => {
let data = "";
req.on("data", (chunk) => (data += chunk));
req.on("end", () => resolve(data));
});
}
const server = http.createServer(async (req, res) => {
if (req.method === "GET") {
res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify({ status: "healthy" }));
return;
}
if (req.method === "POST") {
try {
const body = await readBody(req);
const { prompt, sessionId } = JSON.parse(body);
const result = await runClaude(prompt, sessionId);
res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify(result));
} catch (err) {
res.writeHead(500, { "Content-Type": "application/json" });
res.end(JSON.stringify({ error: err.message }));
}
return;
}
res.writeHead(405);
res.end();
});
server.listen(PORT, () => {
console.log(`Claude Code agent listening on port ${PORT}`);
});
@@ -0,0 +1,136 @@
#!/usr/bin/env bash
set -euo pipefail
REGION="${1:-us-west-2}"
STACK_NAME="agentcore-claude-code-demo"
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
BUCKET_NAME="agentcore-${ACCOUNT_ID}"
AGENT_NAME="claude_code_$(date +%s | tail -c 6)"
ECR_REPO="agentcore-claude-code"
IMAGE_TAG="${AGENT_NAME}"
ECR_URI="${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com/${ECR_REPO}:${IMAGE_TAG}"
echo "Region: ${REGION}"
echo "Account: ${ACCOUNT_ID}"
echo "Bucket: ${BUCKET_NAME}"
echo "Agent: ${AGENT_NAME}"
echo "Stack: ${STACK_NAME}"
# ── Create S3 bucket (skip if it already exists) ─────────────────────────────
if aws s3api head-bucket --bucket "${BUCKET_NAME}" 2>/dev/null; then
echo "Bucket already exists: ${BUCKET_NAME}"
else
echo "Creating bucket: ${BUCKET_NAME}"
if [ "${REGION}" = "us-east-1" ]; then
aws s3api create-bucket --bucket "${BUCKET_NAME}" --region "${REGION}"
else
aws s3api create-bucket \
--bucket "${BUCKET_NAME}" \
--region "${REGION}" \
--create-bucket-configuration LocationConstraint="${REGION}"
fi
echo "Bucket created: ${BUCKET_NAME}"
fi
aws s3api put-bucket-versioning \
--bucket "${BUCKET_NAME}" \
--versioning-configuration Status=Enabled \
--region "${REGION}"
echo "Bucket versioning enabled"
# ── Deploy CloudFormation stack ──────────────────────────────────────────────
echo ""
echo "Deploying CloudFormation stack: ${STACK_NAME}..."
aws cloudformation deploy \
--template-file cfn-vpc.yaml \
--stack-name "${STACK_NAME}" \
--region "${REGION}" \
--parameter-overrides BucketName="${BUCKET_NAME}" \
--capabilities CAPABILITY_NAMED_IAM \
--no-fail-on-empty-changeset
echo "Reading stack outputs..."
CFN_OUTPUTS=$(aws cloudformation describe-stacks \
--stack-name "${STACK_NAME}" \
--region "${REGION}" \
--query "Stacks[0].Outputs" \
--output json)
get_output() {
echo "${CFN_OUTPUTS}" | python3 -c "
import json, sys
outputs = json.load(sys.stdin)
for o in outputs:
if o['OutputKey'] == '$1':
print(o['OutputValue'])
break
"
}
VPC_ID=$(get_output VpcId)
PRIVATE_SUBNET_1=$(get_output PrivateSubnet1Id)
PRIVATE_SUBNET_2=$(get_output PrivateSubnet2Id)
SECURITY_GROUP_ID=$(get_output SecurityGroupId)
S3FILES_FS_ID=$(get_output S3FilesFileSystemId)
S3FILES_AP_ID=$(get_output S3FilesAccessPointId)
S3FILES_AP_ARN=$(get_output S3FilesAccessPointArn)
echo " VPC: ${VPC_ID}"
echo " Private Subnet1: ${PRIVATE_SUBNET_1}"
echo " Private Subnet2: ${PRIVATE_SUBNET_2}"
echo " Security Group: ${SECURITY_GROUP_ID}"
echo " S3 Files FS: ${S3FILES_FS_ID}"
echo " S3 Files AP: ${S3FILES_AP_ID}"
# ── Create ECR repository (skip if it already exists) ────────────────────────
if aws ecr describe-repositories --repository-names "${ECR_REPO}" --region "${REGION}" >/dev/null 2>&1; then
echo "ECR repo already exists: ${ECR_REPO}"
else
echo "Creating ECR repo: ${ECR_REPO}"
aws ecr create-repository --repository-name "${ECR_REPO}" --region "${REGION}"
echo "ECR repo created"
fi
# ── Build arm64 Docker image and push to ECR ─────────────────────────────────
echo "Logging into ECR..."
aws ecr get-login-password --region "${REGION}" | \
docker login --username AWS --password-stdin "${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com"
echo "Building arm64 Docker image..."
docker buildx build \
--platform linux/arm64 \
-t "${ECR_URI}" \
-f Dockerfile \
--push \
.
echo "Image pushed: ${ECR_URI}"
# ── Save config ──────────────────────────────────────────────────────────────
cat > envvars.config <<CFGEOF
AGENTCORE_BUCKET=${BUCKET_NAME}
AGENTCORE_AGENT_NAME=${AGENT_NAME}
AGENTCORE_REGION=${REGION}
AGENTCORE_ECR_URI=${ECR_URI}
AGENTCORE_STACK_NAME=${STACK_NAME}
AGENTCORE_VPC_ID=${VPC_ID}
AGENTCORE_SUBNET_1=${PRIVATE_SUBNET_1}
AGENTCORE_SUBNET_2=${PRIVATE_SUBNET_2}
AGENTCORE_SECURITY_GROUP=${SECURITY_GROUP_ID}
AGENTCORE_S3FILES_FS_ID=${S3FILES_FS_ID}
AGENTCORE_S3FILES_AP_ID=${S3FILES_AP_ID}
AGENTCORE_S3FILES_AP_ARN=${S3FILES_AP_ARN}
CFGEOF
echo ""
echo "Config saved to envvars.config:"
cat envvars.config
echo ""
echo "Run the deploy script next:"
echo " python deploy.py"
@@ -0,0 +1,153 @@
"""
Update an existing AgentCore Runtime (e.g. after changing access point, image, or config).
Reads runtime_id from runtime_config.json and current settings from envvars.config.
Usage:
python update.py
"""
import json
import os
import sys
import time
import boto3
# ── Load config ──────────────────────────────────────────────────────────────
def load_dotconfig():
config_path = os.path.join(os.path.dirname(__file__), "envvars.config")
cfg = {}
if os.path.exists(config_path):
with open(config_path) as f:
for line in f:
line = line.strip()
if line and "=" in line and not line.startswith("#"):
key, value = line.split("=", 1)
cfg[key] = value.strip('"').strip("'")
return cfg
file_cfg = load_dotconfig()
def cfg(key, default=None):
return file_cfg.get(key) or os.environ.get(key) or default
# ── Configuration ────────────────────────────────────────────────────────────
REGION = cfg("AGENTCORE_REGION", boto3.session.Session().region_name or "us-west-2")
session = boto3.Session(region_name=REGION)
ACCOUNT_ID = session.client("sts").get_caller_identity()["Account"]
ECR_URI = cfg("AGENTCORE_ECR_URI")
SUBNET_1 = cfg("AGENTCORE_SUBNET_1")
SUBNET_2 = cfg("AGENTCORE_SUBNET_2")
SECURITY_GROUP = cfg("AGENTCORE_SECURITY_GROUP")
S3FILES_AP_ARN = cfg("AGENTCORE_S3FILES_AP_ARN")
PROTOCOL = "HTTP"
S3FILES_MOUNT_PATH = "/mnt/s3files"
def load_runtime_config() -> dict:
config_path = os.path.join(os.path.dirname(__file__), "runtime_config.json")
try:
with open(config_path) as f:
return json.load(f)
except FileNotFoundError:
print("Error: runtime_config.json not found. Run deploy.py first.")
sys.exit(1)
def update_runtime(runtime_id: str, role_arn: str) -> dict:
control = session.client("bedrock-agentcore-control", region_name=REGION)
update_params = dict(
agentRuntimeId=runtime_id,
agentRuntimeArtifact={
"containerConfiguration": {
"containerUri": ECR_URI,
}
},
roleArn=role_arn,
networkConfiguration={
"networkMode": "VPC",
"networkModeConfig": {
"subnets": [SUBNET_1, SUBNET_2],
"securityGroups": [SECURITY_GROUP],
},
},
protocolConfiguration={"serverProtocol": PROTOCOL},
description="Claude Code agent on AgentCore Runtime with S3 Files",
)
if S3FILES_AP_ARN:
update_params["filesystemConfigurations"] = [
{
"s3FilesAccessPoint": {
"accessPointArn": S3FILES_AP_ARN,
"mountPath": S3FILES_MOUNT_PATH,
}
}
]
print(f"\nUpdating AgentCore Runtime '{runtime_id}'...")
response = control.update_agent_runtime(**update_params)
runtime_arn = response["agentRuntimeArn"]
print(f"Update initiated: {runtime_id}")
print("Waiting for runtime to be ready...")
while True:
status_resp = control.get_agent_runtime(agentRuntimeId=runtime_id)
status = status_resp["status"]
print(f" Status: {status}")
if status == "READY":
break
if status in ("CREATE_FAILED", "UPDATE_FAILED"):
print(f"Failed: {status_resp.get('failureReason', 'Unknown')}")
sys.exit(1)
time.sleep(15)
return {"runtime_id": runtime_id, "runtime_arn": runtime_arn}
def main():
existing = load_runtime_config()
runtime_id = existing["runtime_id"]
role_arn = f"arn:aws:iam::{ACCOUNT_ID}:role/agentcore-{existing['agent_name']}-role"
print("=" * 60)
print(f"Updating runtime: {runtime_id}")
print(f" Image: {ECR_URI}")
print(f" Role: {role_arn}")
if S3FILES_AP_ARN:
print(f" S3 Files: {S3FILES_AP_ARN}")
print(f" Mount: {S3FILES_MOUNT_PATH}")
print("=" * 60)
runtime = update_runtime(runtime_id, role_arn)
existing.update({
"runtime_id": runtime["runtime_id"],
"runtime_arn": runtime["runtime_arn"],
"ecr_uri": ECR_URI,
})
if S3FILES_AP_ARN:
existing["s3files_access_point_arn"] = S3FILES_AP_ARN
existing["s3files_mount_path"] = S3FILES_MOUNT_PATH
config_path = os.path.join(os.path.dirname(__file__), "runtime_config.json")
with open(config_path, "w") as f:
json.dump(existing, f, indent=2)
print("\n" + "=" * 60)
print("Update complete!")
print(f" Runtime ARN: {runtime['runtime_arn']}")
print(" Config saved to: runtime_config.json")
print("=" * 60)
if __name__ == "__main__":
main()
@@ -0,0 +1,122 @@
# Claude Code on AgentCore Runtime with EFS
Deploys Claude Code as an HTTP agent on AWS Bedrock AgentCore Runtime, with an EFS file system mounted at `/mnt/efs` for persistent storage shared across sessions.
## Architecture
```
┌─────────────────────────┐ ┌─────────────────────────┐
│ AgentCore Runtime │ │ AgentCore Runtime │
│ Session A │ │ Session B │
│ (Claude Code) │ │ (Claude Code) │
│ │ │ │
│ /mnt/efs ─────-────────┼────┐ │ /mnt/efs ─────-────────┼────┐
└─────────────────────────┘ │ └─────────────────────────┘ │
│ │
▼ ▼
┌──────────────────────────────────────────────────┐
│ EFS File System (encrypted, generalPurpose) │
│ │
│ ┌────────────────────────┐ │
│ │ EFS Access Point │ │
│ │ (uid/gid 1000, │ │
│ │ root /shared) │ │
│ └────────────────────────┘ │
└──────────────────────────────────────────────────┘
```
Multiple runtime sessions mount the same EFS file system, enabling agents to share skills, results, and data across independent invocations.
```
CloudFormation stack (cfn-vpc.yaml):
VPC, subnets, NAT Gateway, Security Group
EFS file system, access point, mount targets
deploy.py creates:
IAM execution role
AgentCore Runtime (container from ECR, EFS mounted at /mnt/efs)
```
## Prerequisites
### Python environment
```bash
uv venv --python 3.13 .venv
source .venv/bin/activate
uv pip install boto3 awscli --force-reinstall --no-cache-dir
```
## Step-by-step guide
### Step 1 — Infrastructure setup (CloudFormation)
Run the setup script to deploy the CloudFormation stack (VPC, subnets, NAT Gateway, Security Group, EFS), build the arm64 Docker image, and push it to ECR.
```bash
./setup.sh us-west-2
```
All outputs are saved to `envvars.config` and used automatically by the next steps.
### Step 2 — Deploy the agent
Create the IAM execution role and the AgentCore Runtime:
```bash
python deploy.py
```
The script waits until the runtime status is `READY` and saves the runtime config to `runtime_config.json`.
If you need to update an existing runtime (e.g. after rebuilding the Docker image), run:
```bash
python update.py
```
### Step 3 — Invoke the agent
Send a prompt to the deployed agent. The first call creates a new session; subsequent calls can reuse the session ID for conversation continuity.
**Session A** — create a shared skill on the persistent filesystem:
```bash
python invoke.py "can u create a new skill, to review python code? This skill should be created into /mnt/efs/skills/"
```
Continue the conversation within the same session:
```bash
python invoke.py --session <session-a-id> "now add unit tests for that skill"
```
**Session B** — a completely new session accesses the same filesystem and uses the skill created by Session A:
```bash
python invoke.py "list the skills available in /mnt/efs/skills/ and use the python review skill to review this code: def add(a,b): return a+b"
```
Both sessions share `/mnt/efs`, so anything written by one session is immediately available to others.
### Step 4 — Execute a command on the running session
Run a shell command directly on the container using the session ID from the previous step:
```bash
python exec_cmd.py --session <session-id> "ls -l /mnt/efs"
```
### Step 5 — Cleanup
Delete all AgentCore resources (runtime, IAM role) and the CloudFormation stack.
```bash
python cleanup.py
```
Or use the shell wrapper:
```bash
./cleanup.sh
```
@@ -0,0 +1,284 @@
AWSTemplateFormatVersion: "2010-09-09"
Description: VPC infrastructure for AgentCore Claude Code with EFS
Parameters:
VpcCidr:
Type: String
Default: "10.0.0.0/16"
Mappings:
AgentCoreAZs:
us-east-1:
AZ1: use1-az1
AZ2: use1-az2
us-east-2:
AZ1: use2-az1
AZ2: use2-az2
us-west-2:
AZ1: usw2-az1
AZ2: usw2-az2
eu-west-1:
AZ1: euw1-az1
AZ2: euw1-az2
eu-central-1:
AZ1: euc1-az1
AZ2: euc1-az2
eu-north-1:
AZ1: eun1-az1
AZ2: eun1-az2
eu-west-2:
AZ1: euw2-az1
AZ2: euw2-az2
eu-west-3:
AZ1: euw3-az1
AZ2: euw3-az2
ap-southeast-1:
AZ1: apse1-az1
AZ2: apse1-az2
ap-southeast-2:
AZ1: apse2-az1
AZ2: apse2-az2
ap-northeast-1:
AZ1: apne1-az1
AZ2: apne1-az2
ap-northeast-2:
AZ1: apne2-az1
AZ2: apne2-az2
ap-south-1:
AZ1: aps1-az1
AZ2: aps1-az2
ca-central-1:
AZ1: cac1-az1
AZ2: cac1-az2
Resources:
# ── VPC ──────────────────────────────────────────────────────────────────────
VPC:
Type: AWS::EC2::VPC
Properties:
CidrBlock: !Ref VpcCidr
EnableDnsSupport: true
EnableDnsHostnames: true
Tags:
- Key: Name
Value: agentcore-vpc
InternetGateway:
Type: AWS::EC2::InternetGateway
Properties:
Tags:
- Key: Name
Value: agentcore-igw
VPCGatewayAttachment:
Type: AWS::EC2::VPCGatewayAttachment
Properties:
VpcId: !Ref VPC
InternetGatewayId: !Ref InternetGateway
# ── Public Subnets ──────────────────────────────────────────────────────────
PublicSubnet1:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
CidrBlock: "10.0.1.0/24"
AvailabilityZoneId: !FindInMap [AgentCoreAZs, !Ref "AWS::Region", AZ1]
MapPublicIpOnLaunch: true
Tags:
- Key: Name
Value: agentcore-public-1
PublicSubnet2:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
CidrBlock: "10.0.2.0/24"
AvailabilityZoneId: !FindInMap [AgentCoreAZs, !Ref "AWS::Region", AZ2]
MapPublicIpOnLaunch: true
Tags:
- Key: Name
Value: agentcore-public-2
# ── Private Subnets ─────────────────────────────────────────────────────────
PrivateSubnet1:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
CidrBlock: "10.0.11.0/24"
AvailabilityZoneId: !FindInMap [AgentCoreAZs, !Ref "AWS::Region", AZ1]
Tags:
- Key: Name
Value: agentcore-private-1
PrivateSubnet2:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
CidrBlock: "10.0.12.0/24"
AvailabilityZoneId: !FindInMap [AgentCoreAZs, !Ref "AWS::Region", AZ2]
Tags:
- Key: Name
Value: agentcore-private-2
# ── Public Route Table ──────────────────────────────────────────────────────
PublicRouteTable:
Type: AWS::EC2::RouteTable
Properties:
VpcId: !Ref VPC
Tags:
- Key: Name
Value: agentcore-public-rt
PublicRoute:
Type: AWS::EC2::Route
DependsOn: VPCGatewayAttachment
Properties:
RouteTableId: !Ref PublicRouteTable
DestinationCidrBlock: "0.0.0.0/0"
GatewayId: !Ref InternetGateway
PublicSubnet1RTAssoc:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
SubnetId: !Ref PublicSubnet1
RouteTableId: !Ref PublicRouteTable
PublicSubnet2RTAssoc:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
SubnetId: !Ref PublicSubnet2
RouteTableId: !Ref PublicRouteTable
# ── NAT Gateway ─────────────────────────────────────────────────────────────
NatEIP:
Type: AWS::EC2::EIP
Properties:
Domain: vpc
NatGateway:
Type: AWS::EC2::NatGateway
Properties:
AllocationId: !GetAtt NatEIP.AllocationId
SubnetId: !Ref PublicSubnet1
Tags:
- Key: Name
Value: agentcore-nat
# ── Private Route Table ─────────────────────────────────────────────────────
PrivateRouteTable:
Type: AWS::EC2::RouteTable
Properties:
VpcId: !Ref VPC
Tags:
- Key: Name
Value: agentcore-private-rt
PrivateRoute:
Type: AWS::EC2::Route
Properties:
RouteTableId: !Ref PrivateRouteTable
DestinationCidrBlock: "0.0.0.0/0"
NatGatewayId: !Ref NatGateway
PrivateSubnet1RTAssoc:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
SubnetId: !Ref PrivateSubnet1
RouteTableId: !Ref PrivateRouteTable
PrivateSubnet2RTAssoc:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
SubnetId: !Ref PrivateSubnet2
RouteTableId: !Ref PrivateRouteTable
# ── Security Group ──────────────────────────────────────────────────────────
AgentCoreSecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupDescription: Security group for AgentCore runtimes
VpcId: !Ref VPC
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 2049
ToPort: 2049
CidrIp: "10.0.0.0/16"
Description: Allow NFS (EFS mount) from VPC
SecurityGroupEgress:
- IpProtocol: "-1"
CidrIp: "0.0.0.0/0"
Description: Allow all outbound
Tags:
- Key: Name
Value: agentcore-sg
# ── EFS File System ─────────────────────────────────────────────────────────
EFSFileSystem:
Type: AWS::EFS::FileSystem
Properties:
Encrypted: true
PerformanceMode: generalPurpose
ThroughputMode: bursting
FileSystemTags:
- Key: Name
Value: agentcore-claude-code-efs
# ── EFS Mount Targets (one per private subnet) ──────────────────────────────
EFSMountTarget1:
Type: AWS::EFS::MountTarget
Properties:
FileSystemId: !Ref EFSFileSystem
SubnetId: !Ref PrivateSubnet1
SecurityGroups:
- !Ref AgentCoreSecurityGroup
EFSMountTarget2:
Type: AWS::EFS::MountTarget
Properties:
FileSystemId: !Ref EFSFileSystem
SubnetId: !Ref PrivateSubnet2
SecurityGroups:
- !Ref AgentCoreSecurityGroup
# ── EFS Access Point ────────────────────────────────────────────────────────
EFSAccessPoint:
Type: AWS::EFS::AccessPoint
Properties:
FileSystemId: !Ref EFSFileSystem
PosixUser:
Uid: "1000"
Gid: "1000"
RootDirectory:
Path: /shared
CreationInfo:
OwnerUid: "1000"
OwnerGid: "1000"
Permissions: "755"
AccessPointTags:
- Key: Name
Value: agentcore-claude-code-ap
Outputs:
VpcId:
Value: !Ref VPC
PrivateSubnet1Id:
Value: !Ref PrivateSubnet1
PrivateSubnet2Id:
Value: !Ref PrivateSubnet2
SecurityGroupId:
Value: !Ref AgentCoreSecurityGroup
EFSFileSystemId:
Value: !Ref EFSFileSystem
EFSAccessPointArn:
Value: !GetAtt EFSAccessPoint.Arn
@@ -0,0 +1,117 @@
"""
Clean up all resources created by setup.sh and deploy.py.
Deletes:
- AgentCore Runtime endpoint and runtime
- AgentCore IAM execution role
- CloudFormation stack (VPC, EFS, security group, NAT, etc.)
Usage:
python cleanup.py
"""
import json
import os
import sys
import time
import boto3
def load_config(filename):
path = os.path.join(os.path.dirname(__file__), filename)
if not os.path.exists(path):
return {}
with open(path) as f:
if filename.endswith(".json"):
return json.load(f)
cfg = {}
for line in f:
line = line.strip()
if line and "=" in line and not line.startswith("#"):
key, value = line.split("=", 1)
cfg[key] = value.strip('"').strip("'")
return cfg
def main():
runtime_cfg = load_config("runtime_config.json")
env_cfg = load_config("envvars.config")
if not runtime_cfg:
print("Error: runtime_config.json not found.")
sys.exit(1)
agent_name = runtime_cfg["agent_name"]
runtime_id = runtime_cfg["runtime_id"]
region = runtime_cfg["region"]
stack_name = env_cfg.get("AGENTCORE_STACK_NAME", "agentcore-claude-code")
session = boto3.Session(region_name=region)
control = session.client("bedrock-agentcore-control", region_name=region)
iam = session.client("iam")
cfn = session.client("cloudformation")
print(f"Cleaning up resources for: {agent_name}\n")
# 1. Delete AgentCore endpoints
try:
endpoints = control.list_agent_runtime_endpoints(agentRuntimeId=runtime_id)
for ep in endpoints.get("runtimeEndpoints", []):
name = ep["name"]
if name == "DEFAULT":
continue
print(f" Deleting endpoint: {name}")
control.delete_agent_runtime_endpoint(
agentRuntimeId=runtime_id, endpointName=name
)
if endpoints.get("runtimeEndpoints"):
print(" Waiting for endpoint deletion...")
time.sleep(30)
except Exception as e:
print(f" Warning: {e}")
# 2. Delete AgentCore runtime
try:
print(f" Deleting runtime: {runtime_id}")
control.delete_agent_runtime(agentRuntimeId=runtime_id)
print(" Waiting for runtime deletion...")
time.sleep(30)
except Exception as e:
print(f" Warning: {e}")
# 3. Delete AgentCore IAM execution role
role_name = f"agentcore-{agent_name}-role"
try:
policies = iam.list_role_policies(RoleName=role_name)
for policy_name in policies.get("PolicyNames", []):
iam.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
iam.delete_role(RoleName=role_name)
print(f" Deleted IAM role: {role_name}")
except iam.exceptions.NoSuchEntityException:
print(f" IAM role not found: {role_name}")
except Exception as e:
print(f" Warning: {e}")
# 4. Delete CloudFormation stack (VPC, EFS, SG, NAT, etc.)
try:
print(f" Deleting CloudFormation stack: {stack_name}")
cfn.delete_stack(StackName=stack_name)
print(" Waiting for stack deletion (this may take a few minutes)...")
waiter = cfn.get_waiter("stack_delete_complete")
waiter.wait(StackName=stack_name, WaiterConfig={"Delay": 15, "MaxAttempts": 40})
print(f" Stack deleted: {stack_name}")
except Exception as e:
print(f" Warning: {e}")
# 5. Remove local config files
for f in ["runtime_config.json", "envvars.config"]:
path = os.path.join(os.path.dirname(__file__), f)
if os.path.exists(path):
os.remove(path)
print(f"\nCleanup complete for {agent_name}")
if __name__ == "__main__":
main()
@@ -0,0 +1,15 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
cd "$SCRIPT_DIR"
echo "============================================================"
echo " AgentCore Cleanup"
echo "============================================================"
echo ""
python3 cleanup.py
echo ""
echo "Done."
@@ -0,0 +1,282 @@
"""
Deploy Claude Code agent to AgentCore Runtime using a container image from ECR.
Run setup.sh first, then:
python deploy.py
Reads configuration from envvars.config (created by setup.sh).
"""
import json
import os
import sys
import time
import boto3
# ── Load config ──────────────────────────────────────────────────────────────
def load_dotconfig():
config_path = os.path.join(os.path.dirname(__file__), "envvars.config")
cfg = {}
if os.path.exists(config_path):
with open(config_path) as f:
for line in f:
line = line.strip()
if line and "=" in line and not line.startswith("#"):
key, value = line.split("=", 1)
cfg[key] = value
return cfg
file_cfg = load_dotconfig()
def cfg(key, default=None):
return file_cfg.get(key) or os.environ.get(key) or default
# ── Configuration ────────────────────────────────────────────────────────────
REGION = cfg("AGENTCORE_REGION", boto3.session.Session().region_name or "us-west-2")
session = boto3.Session(region_name=REGION)
ACCOUNT_ID = session.client("sts").get_caller_identity()["Account"]
AGENT_NAME = cfg("AGENTCORE_AGENT_NAME", f"claude_code_{int(time.time()) % 100000}")
ECR_URI = cfg("AGENTCORE_ECR_URI")
SUBNET_1 = cfg("AGENTCORE_SUBNET_1")
SUBNET_2 = cfg("AGENTCORE_SUBNET_2")
SECURITY_GROUP = cfg("AGENTCORE_SECURITY_GROUP")
EFS_AP_ARN = cfg("AGENTCORE_EFS_AP_ARN")
if not ECR_URI:
print("Error: AGENTCORE_ECR_URI not found. Run setup.sh first.")
sys.exit(1)
if not all([SUBNET_1, SUBNET_2, SECURITY_GROUP]):
print("Error: VPC config (subnets, security group) not found. Run setup.sh first.")
sys.exit(1)
PROTOCOL = "HTTP"
EFS_MOUNT_PATH = "/mnt/efs"
print(f"Region: {REGION}")
print(f"Account: {ACCOUNT_ID}")
print(f"Agent: {AGENT_NAME}")
print(f"Image: {ECR_URI}")
print(f"Subnets: {SUBNET_1}, {SUBNET_2}")
print(f"SG: {SECURITY_GROUP}")
if EFS_AP_ARN:
print(f"EFS AP: {EFS_AP_ARN}")
print(f"Mount: {EFS_MOUNT_PATH}")
# ── Step 1: Create IAM Execution Role ────────────────────────────────────────
def create_execution_role() -> str:
iam = session.client("iam")
role_name = f"agentcore-{AGENT_NAME}-role"
trust_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "bedrock-agentcore.amazonaws.com"},
"Action": "sts:AssumeRole",
"Condition": {"StringEquals": {"aws:SourceAccount": ACCOUNT_ID}},
},
],
}
inline_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["logs:DescribeLogStreams", "logs:CreateLogGroup"],
"Resource": [f"arn:aws:logs:{REGION}:{ACCOUNT_ID}:log-group:/aws/bedrock-agentcore/runtimes/*"],
},
{
"Effect": "Allow",
"Action": ["logs:DescribeLogGroups"],
"Resource": [f"arn:aws:logs:{REGION}:{ACCOUNT_ID}:log-group:*"],
},
{
"Effect": "Allow",
"Action": ["logs:CreateLogStream", "logs:PutLogEvents"],
"Resource": [f"arn:aws:logs:{REGION}:{ACCOUNT_ID}:log-group:/aws/bedrock-agentcore/runtimes/*:log-stream:*"],
},
{
"Effect": "Allow",
"Action": ["xray:PutTraceSegments", "xray:PutTelemetryRecords", "xray:GetSamplingRules", "xray:GetSamplingTargets"],
"Resource": ["*"],
},
{
"Effect": "Allow",
"Action": "cloudwatch:PutMetricData",
"Resource": "*",
"Condition": {"StringEquals": {"cloudwatch:namespace": "bedrock-agentcore"}},
},
{
"Sid": "BedrockModelInvocation",
"Effect": "Allow",
"Action": ["bedrock:InvokeModel", "bedrock:InvokeModelWithResponseStream"],
"Resource": ["arn:aws:bedrock:*::foundation-model/*", f"arn:aws:bedrock:{REGION}:{ACCOUNT_ID}:*"],
},
{
"Sid": "ECRPull",
"Effect": "Allow",
"Action": ["ecr:GetAuthorizationToken"],
"Resource": ["*"],
},
{
"Sid": "ECRImage",
"Effect": "Allow",
"Action": ["ecr:BatchGetImage", "ecr:GetDownloadUrlForLayer"],
"Resource": [f"arn:aws:ecr:{REGION}:{ACCOUNT_ID}:repository/agentcore-claude-code"],
},
{
"Sid": "EFSClientAccess",
"Effect": "Allow",
"Action": [
"elasticfilesystem:ClientMount",
"elasticfilesystem:ClientWrite",
],
"Resource": f"arn:aws:elasticfilesystem:{REGION}:{ACCOUNT_ID}:file-system/*",
"Condition": {
"ArnLike": {
"elasticfilesystem:AccessPointArn": f"arn:aws:elasticfilesystem:{REGION}:{ACCOUNT_ID}:access-point/*",
}
},
},
{
"Sid": "EFSDescribe",
"Effect": "Allow",
"Action": [
"elasticfilesystem:DescribeAccessPoints",
"elasticfilesystem:DescribeMountTargets",
],
"Resource": [
f"arn:aws:elasticfilesystem:{REGION}:{ACCOUNT_ID}:file-system/*",
f"arn:aws:elasticfilesystem:{REGION}:{ACCOUNT_ID}:access-point/*",
],
},
],
}
try:
resp = iam.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(trust_policy),
Description=f"Execution role for {AGENT_NAME}",
)
role_arn = resp["Role"]["Arn"]
print(f"\nCreated IAM role: {role_arn}")
except iam.exceptions.EntityAlreadyExistsException:
role_arn = f"arn:aws:iam::{ACCOUNT_ID}:role/{role_name}"
print(f"\nIAM role exists: {role_arn}")
iam.put_role_policy(
RoleName=role_name,
PolicyName=f"{AGENT_NAME}-policy",
PolicyDocument=json.dumps(inline_policy),
)
print("Waiting 10s for IAM propagation...")
time.sleep(10)
return role_arn
# ── Step 2: Create AgentCore Runtime (VPC + container + EFS) ─────────────────
def create_runtime(role_arn: str) -> dict:
control = session.client("bedrock-agentcore-control", region_name=REGION)
create_params = dict(
agentRuntimeName=AGENT_NAME,
agentRuntimeArtifact={
"containerConfiguration": {
"containerUri": ECR_URI,
}
},
roleArn=role_arn,
networkConfiguration={
"networkMode": "VPC",
"networkModeConfig": {
"subnets": [SUBNET_1, SUBNET_2],
"securityGroups": [SECURITY_GROUP],
},
},
protocolConfiguration={"serverProtocol": PROTOCOL},
description="Claude Code agent on AgentCore Runtime with EFS",
)
if EFS_AP_ARN:
create_params["filesystemConfigurations"] = [
{
"efsAccessPoint": {
"accessPointArn": EFS_AP_ARN,
"mountPath": EFS_MOUNT_PATH,
}
}
]
print(f"\nCreating AgentCore Runtime '{AGENT_NAME}'...")
response = control.create_agent_runtime(**create_params)
runtime_id = response["agentRuntimeId"]
runtime_arn = response["agentRuntimeArn"]
print(f"Runtime created: {runtime_id}")
print("Waiting for runtime to be ready...")
while True:
status_resp = control.get_agent_runtime(agentRuntimeId=runtime_id)
status = status_resp["status"]
print(f" Status: {status}")
if status == "READY":
break
if status in ("CREATE_FAILED", "UPDATE_FAILED"):
print(f"Failed: {status_resp.get('failureReason', 'Unknown')}")
sys.exit(1)
time.sleep(15)
return {"runtime_id": runtime_id, "runtime_arn": runtime_arn}
# ── Main ─────────────────────────────────────────────────────────────────────
def main():
print("=" * 60)
print(f"Deploying {AGENT_NAME} to AgentCore Runtime")
print(" (VPC mode + container + EFS)")
print("=" * 60)
role_arn = create_execution_role()
runtime = create_runtime(role_arn)
config = {
"agent_name": AGENT_NAME,
"runtime_id": runtime["runtime_id"],
"runtime_arn": runtime["runtime_arn"],
"region": REGION,
"ecr_uri": ECR_URI,
}
if EFS_AP_ARN:
config["efs_access_point_arn"] = EFS_AP_ARN
config["efs_mount_path"] = EFS_MOUNT_PATH
config_path = os.path.join(os.path.dirname(__file__), "runtime_config.json")
with open(config_path, "w") as f:
json.dump(config, f, indent=2)
print("\n" + "=" * 60)
print("Deployment complete!")
print(f" Runtime ARN: {runtime['runtime_arn']}")
if EFS_AP_ARN:
print(f" EFS mounted at: {EFS_MOUNT_PATH}")
print(" Config saved to: runtime_config.json")
print("\n Test with: python invoke.py")
print("=" * 60)
if __name__ == "__main__":
main()
@@ -0,0 +1,112 @@
"""
Execute a shell command on a running AgentCore Runtime session and stream output.
Usage:
python exec_cmd.py --session <id> "ls -la /mnt/efs"
"""
import json
import os
import sys
import boto3
from botocore.config import Config
# ── Load config ──────────────────────────────────────────────────────────────
def load_dotconfig():
config_path = os.path.join(os.path.dirname(__file__), "envvars.config")
cfg = {}
if os.path.exists(config_path):
with open(config_path) as f:
for line in f:
line = line.strip()
if line and "=" in line and not line.startswith("#"):
key, value = line.split("=", 1)
cfg[key] = value.strip('"').strip("'")
return cfg
file_cfg = load_dotconfig()
def cfg(key, default=None):
return file_cfg.get(key) or os.environ.get(key) or default
# ── Configuration ────────────────────────────────────────────────────────────
REGION = cfg("AGENTCORE_REGION", "us-west-2")
def load_config() -> dict:
config_path = os.path.join(os.path.dirname(__file__), "runtime_config.json")
try:
with open(config_path) as f:
return json.load(f)
except FileNotFoundError:
print("Error: runtime_config.json not found. Run deploy.py first.")
sys.exit(1)
def exec_command(runtime_arn: str, session_id: str, command: str):
client = boto3.client(
"bedrock-agentcore",
region_name=REGION,
config=Config(read_timeout=900),
)
body = {"command": command}
response = client.invoke_agent_runtime_command(
agentRuntimeArn=runtime_arn,
runtimeSessionId=session_id,
body=body,
)
request_id = response.get("ResponseMetadata", {}).get("RequestId", "N/A")
print(f"Session: {response.get('runtimeSessionId', 'N/A')}")
print(f"Request ID: {request_id}")
print(f"Status: {response.get('statusCode', 'N/A')}")
print()
for event in response["stream"]:
chunk = event.get("chunk", {})
if "contentDelta" in chunk:
delta = chunk["contentDelta"]
print(delta.get("stdout", ""), end="", flush=True)
print(delta.get("stderr", ""), end="", flush=True)
def main():
config = load_config()
runtime_arn = config["runtime_arn"]
args = sys.argv[1:]
session_id = None
if "--session" in args:
idx = args.index("--session")
session_id = args[idx + 1]
args = args[:idx] + args[idx + 2:]
if not session_id:
session_id = os.environ.get("SESSION_ID")
if not session_id:
print("Error: session ID required. Use --session <id> or set SESSION_ID env var.")
sys.exit(1)
if not args:
print("Usage: python exec_cmd.py --session <id> '<command>'")
sys.exit(1)
command = " ".join(args)
print(f"Runtime: {runtime_arn}")
print(f"Command: {command}")
print()
exec_command(runtime_arn, session_id, command)
if __name__ == "__main__":
main()
@@ -0,0 +1,130 @@
"""
Invoke the Claude Code agent deployed on AgentCore Runtime.
Usage:
python invoke.py
python invoke.py "Write a Python function that sorts a list"
python invoke.py --session <id> "Now add type hints to it"
"""
import json
import os
import sys
import uuid
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
# ── Load config ──────────────────────────────────────────────────────────────
def load_dotconfig():
config_path = os.path.join(os.path.dirname(__file__), "envvars.config")
cfg = {}
if os.path.exists(config_path):
with open(config_path) as f:
for line in f:
line = line.strip()
if line and "=" in line and not line.startswith("#"):
key, value = line.split("=", 1)
cfg[key] = value.strip('"').strip("'")
return cfg
file_cfg = load_dotconfig()
def cfg(key, default=None):
return file_cfg.get(key) or os.environ.get(key) or default
# ── Configuration ────────────────────────────────────────────────────────────
REGION = cfg("AGENTCORE_REGION", "us-west-2")
def load_config() -> dict:
config_path = os.path.join(os.path.dirname(__file__), "runtime_config.json")
try:
with open(config_path) as f:
return json.load(f)
except FileNotFoundError:
print("Error: runtime_config.json not found. Run deploy.py first.")
sys.exit(1)
def invoke(runtime_arn: str, prompt: str, region: str, session_id: str = None) -> dict:
client = boto3.client(
"bedrock-agentcore",
region_name=region,
config=Config(read_timeout=900),
)
if not session_id:
session_id = str(uuid.uuid4())
payload_data = {"prompt": prompt}
try:
response = client.invoke_agent_runtime(
agentRuntimeArn=runtime_arn,
runtimeSessionId=session_id,
payload=json.dumps(payload_data).encode("utf-8"),
)
except ClientError as exc:
request_id = exc.response.get("ResponseMetadata", {}).get("RequestId", "N/A")
print(f" Session ID: {session_id}")
print(f" Request ID: {request_id}")
print(f" Error: {exc}")
return {"_runtimeSessionId": session_id}
body = json.loads(response["response"].read().decode("utf-8"))
runtime_session = response.get("runtimeSessionId", session_id)
request_id = response.get("ResponseMetadata", {}).get("RequestId", "N/A")
print(f" Session ID: {runtime_session}")
print(f" Request ID: {request_id}")
print(f" Status: {response.get('statusCode', 'N/A')}")
body["_runtimeSessionId"] = runtime_session
return body
def main():
config = load_config()
runtime_arn = config["runtime_arn"]
region = config["region"]
args = sys.argv[1:]
session_id = None
if "--session" in args:
idx = args.index("--session")
session_id = args[idx + 1]
args = args[:idx] + args[idx + 2:]
if args:
prompts = [" ".join(args)]
else:
prompts = [
"What is 2 + 2?",
"Now multiply that result by 10.",
]
print(f"Invoking agent: {runtime_arn}")
if session_id:
print(f"Resuming session: {session_id}")
print()
for prompt in prompts:
print(f"--- Prompt: {prompt}")
result = invoke(runtime_arn, prompt, region, session_id)
print(f"--- Response:\n{result.get('response', result)}")
session_id = result.get("_runtimeSessionId", session_id)
print(f"--- Session ID: {session_id}")
print()
if session_id:
print("To continue this conversation:")
print(f" python invoke.py --session {session_id} \"your next prompt\"")
if __name__ == "__main__":
main()
@@ -0,0 +1,9 @@
{
"agent_name": "claude_code_02701",
"runtime_id": "claude_code_02701-zr5JYsBhVr",
"runtime_arn": "arn:aws:bedrock-agentcore:us-east-1:623129719707:runtime/claude_code_02701-zr5JYsBhVr",
"region": "us-east-1",
"ecr_uri": "623129719707.dkr.ecr.us-east-1.amazonaws.com/agentcore-claude-code:claude_code_02701",
"efs_access_point_arn": "arn:aws:elasticfilesystem:us-east-1:623129719707:access-point/fsap-0121c98f6d9fae8f8",
"efs_mount_path": "/mnt/efs"
}
@@ -0,0 +1,96 @@
const http = require("http");
const { spawn } = require("child_process");
process.on("uncaughtException", (err) => {
console.error("[FATAL] uncaughtException:", err.message, err.stack);
});
process.on("unhandledRejection", (err) => {
console.error("[FATAL] unhandledRejection:", err);
});
const PORT = process.env.PORT || 8080;
function runClaude(prompt, sessionId) {
return new Promise((resolve, reject) => {
const args = ["-p", "--dangerously-skip-permissions", "--output-format", "json"];
if (sessionId) {
args.push("--resume", sessionId);
} else {
args.push("--continue");
}
args.push(prompt);
console.log(`[runClaude] sessionId=${sessionId || "(none, --continue)"} prompt="${prompt}"`);
console.log(`[runClaude] args: ${JSON.stringify(args)}`);
const proc = spawn("claude", args, {
env: { ...process.env, HOME: "/home/agent" },
cwd: "/home/agent",
timeout: 300_000,
});
let stdout = "";
let stderr = "";
proc.stdout.on("data", (d) => (stdout += d));
proc.stderr.on("data", (d) => (stderr += d));
proc.on("close", (code, signal) => {
console.log(`[runClaude] process closed. code=${code} signal=${signal}`);
if (stderr) console.log(`[runClaude] stderr: ${stderr}`);
if (stdout) console.log(`[runClaude] stdout (first 500): ${stdout.substring(0, 500)}`);
if (code !== 0) {
reject(new Error(`claude exited ${code}: ${stderr}`));
return;
}
try {
const parsed = JSON.parse(stdout);
resolve({
response: parsed.result || stdout.trim(),
sessionId: parsed.session_id || null,
});
} catch {
resolve({ response: stdout.trim(), sessionId: null });
}
});
proc.on("error", reject);
});
}
function readBody(req) {
return new Promise((resolve) => {
let data = "";
req.on("data", (chunk) => (data += chunk));
req.on("end", () => resolve(data));
});
}
const server = http.createServer(async (req, res) => {
if (req.method === "GET") {
res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify({ status: "healthy" }));
return;
}
if (req.method === "POST") {
try {
const body = await readBody(req);
console.log(`[POST] raw body: ${body}`);
const { prompt, sessionId } = JSON.parse(body);
const result = await runClaude(prompt, sessionId);
res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify(result));
} catch (err) {
res.writeHead(500, { "Content-Type": "application/json" });
res.end(JSON.stringify({ error: err.message }));
}
return;
}
res.writeHead(405);
res.end();
});
server.listen(PORT, () => {
console.log(`Claude Code agent listening on port ${PORT}`);
});
@@ -0,0 +1,108 @@
#!/usr/bin/env bash
set -euo pipefail
REGION="${1:-us-west-2}"
STACK_NAME="agentcore-claude-code-demo"
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
AGENT_NAME="claude_code_$(date +%s | tail -c 6)"
ECR_REPO="agentcore-claude-code"
IMAGE_TAG="${AGENT_NAME}"
ECR_URI="${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com/${ECR_REPO}:${IMAGE_TAG}"
echo "Region: ${REGION}"
echo "Account: ${ACCOUNT_ID}"
echo "Agent: ${AGENT_NAME}"
echo "Stack: ${STACK_NAME}"
# ── Deploy CloudFormation stack ──────────────────────────────────────────────
echo ""
echo "Deploying CloudFormation stack: ${STACK_NAME}..."
aws cloudformation deploy \
--template-file cfn-vpc.yaml \
--stack-name "${STACK_NAME}" \
--region "${REGION}" \
--capabilities CAPABILITY_NAMED_IAM \
--no-fail-on-empty-changeset
echo "Reading stack outputs..."
CFN_OUTPUTS=$(aws cloudformation describe-stacks \
--stack-name "${STACK_NAME}" \
--region "${REGION}" \
--query "Stacks[0].Outputs" \
--output json)
get_output() {
echo "${CFN_OUTPUTS}" | python3 -c "
import json, sys
outputs = json.load(sys.stdin)
for o in outputs:
if o['OutputKey'] == '$1':
print(o['OutputValue'])
break
"
}
VPC_ID=$(get_output VpcId)
PRIVATE_SUBNET_1=$(get_output PrivateSubnet1Id)
PRIVATE_SUBNET_2=$(get_output PrivateSubnet2Id)
SECURITY_GROUP_ID=$(get_output SecurityGroupId)
EFS_FS_ID=$(get_output EFSFileSystemId)
EFS_AP_ARN=$(get_output EFSAccessPointArn)
echo " VPC: ${VPC_ID}"
echo " Private Subnet1: ${PRIVATE_SUBNET_1}"
echo " Private Subnet2: ${PRIVATE_SUBNET_2}"
echo " Security Group: ${SECURITY_GROUP_ID}"
echo " EFS FS: ${EFS_FS_ID}"
echo " EFS AP ARN: ${EFS_AP_ARN}"
# ── Create ECR repository (skip if it already exists) ────────────────────────
if aws ecr describe-repositories --repository-names "${ECR_REPO}" --region "${REGION}" >/dev/null 2>&1; then
echo "ECR repo already exists: ${ECR_REPO}"
else
echo "Creating ECR repo: ${ECR_REPO}"
aws ecr create-repository --repository-name "${ECR_REPO}" --region "${REGION}"
echo "ECR repo created"
fi
# ── Build arm64 Docker image and push to ECR ─────────────────────────────────
echo "Logging into ECR..."
aws ecr get-login-password --region "${REGION}" | \
docker login --username AWS --password-stdin "${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com"
echo "Building arm64 Docker image..."
docker buildx build \
--platform linux/arm64 \
-t "${ECR_URI}" \
-f Dockerfile \
--push \
.
echo "Image pushed: ${ECR_URI}"
# ── Save config ──────────────────────────────────────────────────────────────
cat > envvars.config <<CFGEOF
AGENTCORE_AGENT_NAME=${AGENT_NAME}
AGENTCORE_REGION=${REGION}
AGENTCORE_ECR_URI=${ECR_URI}
AGENTCORE_STACK_NAME=${STACK_NAME}
AGENTCORE_VPC_ID=${VPC_ID}
AGENTCORE_SUBNET_1=${PRIVATE_SUBNET_1}
AGENTCORE_SUBNET_2=${PRIVATE_SUBNET_2}
AGENTCORE_SECURITY_GROUP=${SECURITY_GROUP_ID}
AGENTCORE_EFS_FS_ID=${EFS_FS_ID}
AGENTCORE_EFS_AP_ARN=${EFS_AP_ARN}
CFGEOF
echo ""
echo "Config saved to envvars.config:"
cat envvars.config
echo ""
echo "Run the deploy script next:"
echo " python deploy.py"
@@ -0,0 +1,153 @@
"""
Update an existing AgentCore Runtime (e.g. after changing access point, image, or config).
Reads runtime_id from runtime_config.json and current settings from envvars.config.
Usage:
python update.py
"""
import json
import os
import sys
import time
import boto3
# ── Load config ──────────────────────────────────────────────────────────────
def load_dotconfig():
config_path = os.path.join(os.path.dirname(__file__), "envvars.config")
cfg = {}
if os.path.exists(config_path):
with open(config_path) as f:
for line in f:
line = line.strip()
if line and "=" in line and not line.startswith("#"):
key, value = line.split("=", 1)
cfg[key] = value.strip('"').strip("'")
return cfg
file_cfg = load_dotconfig()
def cfg(key, default=None):
return file_cfg.get(key) or os.environ.get(key) or default
# ── Configuration ────────────────────────────────────────────────────────────
REGION = cfg("AGENTCORE_REGION", boto3.session.Session().region_name or "us-west-2")
session = boto3.Session(region_name=REGION)
ACCOUNT_ID = session.client("sts").get_caller_identity()["Account"]
ECR_URI = cfg("AGENTCORE_ECR_URI")
SUBNET_1 = cfg("AGENTCORE_SUBNET_1")
SUBNET_2 = cfg("AGENTCORE_SUBNET_2")
SECURITY_GROUP = cfg("AGENTCORE_SECURITY_GROUP")
EFS_AP_ARN = cfg("AGENTCORE_EFS_AP_ARN")
PROTOCOL = "HTTP"
EFS_MOUNT_PATH = "/mnt/efs"
def load_runtime_config() -> dict:
config_path = os.path.join(os.path.dirname(__file__), "runtime_config.json")
try:
with open(config_path) as f:
return json.load(f)
except FileNotFoundError:
print("Error: runtime_config.json not found. Run deploy.py first.")
sys.exit(1)
def update_runtime(runtime_id: str, role_arn: str) -> dict:
control = session.client("bedrock-agentcore-control", region_name=REGION)
update_params = dict(
agentRuntimeId=runtime_id,
agentRuntimeArtifact={
"containerConfiguration": {
"containerUri": ECR_URI,
}
},
roleArn=role_arn,
networkConfiguration={
"networkMode": "VPC",
"networkModeConfig": {
"subnets": [SUBNET_1, SUBNET_2],
"securityGroups": [SECURITY_GROUP],
},
},
protocolConfiguration={"serverProtocol": PROTOCOL},
description="Claude Code agent on AgentCore Runtime with EFS",
)
if EFS_AP_ARN:
update_params["filesystemConfigurations"] = [
{
"efsAccessPoint": {
"accessPointArn": EFS_AP_ARN,
"mountPath": EFS_MOUNT_PATH,
}
}
]
print(f"\nUpdating AgentCore Runtime '{runtime_id}'...")
response = control.update_agent_runtime(**update_params)
runtime_arn = response["agentRuntimeArn"]
print(f"Update initiated: {runtime_id}")
print("Waiting for runtime to be ready...")
while True:
status_resp = control.get_agent_runtime(agentRuntimeId=runtime_id)
status = status_resp["status"]
print(f" Status: {status}")
if status == "READY":
break
if status in ("CREATE_FAILED", "UPDATE_FAILED"):
print(f"Failed: {status_resp.get('failureReason', 'Unknown')}")
sys.exit(1)
time.sleep(15)
return {"runtime_id": runtime_id, "runtime_arn": runtime_arn}
def main():
existing = load_runtime_config()
runtime_id = existing["runtime_id"]
role_arn = f"arn:aws:iam::{ACCOUNT_ID}:role/agentcore-{existing['agent_name']}-role"
print("=" * 60)
print(f"Updating runtime: {runtime_id}")
print(f" Image: {ECR_URI}")
print(f" Role: {role_arn}")
if EFS_AP_ARN:
print(f" EFS AP: {EFS_AP_ARN}")
print(f" Mount: {EFS_MOUNT_PATH}")
print("=" * 60)
runtime = update_runtime(runtime_id, role_arn)
existing.update({
"runtime_id": runtime["runtime_id"],
"runtime_arn": runtime["runtime_arn"],
"ecr_uri": ECR_URI,
})
if EFS_AP_ARN:
existing["efs_access_point_arn"] = EFS_AP_ARN
existing["efs_mount_path"] = EFS_MOUNT_PATH
config_path = os.path.join(os.path.dirname(__file__), "runtime_config.json")
with open(config_path, "w") as f:
json.dump(existing, f, indent=2)
print("\n" + "=" * 60)
print("Update complete!")
print(f" Runtime ARN: {runtime['runtime_arn']}")
print(" Config saved to: runtime_config.json")
print("=" * 60)
if __name__ == "__main__":
main()