Extending AWSault - baeziy/AWSault GitHub Wiki
AWSault is designed to be easily extensible. Each component has a clear registration pattern — write a function, add it to a dict.
File: src/awsault/services.py
Add an entry to the relevant category dict. The scanner picks it up automatically.
# Example: adding AWS AppMesh
"appmesh": {
"client": "appmesh",
"calls": [
{"method": "list_meshes", "key": "meshes", "paginate": True},
{"method": "list_virtual_services", "key": "virtualServices"},
],
},| Field | Required | Description |
|---|---|---|
client |
Yes | The boto3 client name (what you'd pass to boto3.client()) |
global |
No | Set to True for region-independent services (IAM, S3, etc.) |
calls |
Yes | List of API call definitions |
| Field | Required | Description |
|---|---|---|
method |
Yes | The boto3 method name in snake_case |
key |
Yes | Response dict key containing the data, or None to capture the full response |
paginate |
No | Set to True if a boto3 paginator exists for this method |
params |
No | Dict of fixed parameters to pass with every call |
- Only add read-only API calls (List, Get, Describe)
- Check if the method supports pagination:
boto3.client('service').can_paginate('method_name') - The
keyfield should match the response structure. Check the boto3 docs for the exact response shape - Test with
awsault --services your_service --verboseto verify
File: src/awsault/recon/deep.py
def chain_appmesh(session, quick_results):
"""Pull detailed mesh configurations."""
results = {}
# Get data from the surface scan
meshes = _get_quick_data(quick_results, "appmesh", "list_meshes")
if not meshes:
return results
client = session.client("appmesh")
for mesh in meshes:
mesh_name = mesh.get("meshName")
if not mesh_name:
continue
detail = _try(lambda: client.describe_mesh(meshName=mesh_name))
if detail:
results[mesh_name] = detail
return resultsAdd the function to the CHAINS dict at the bottom of deep.py:
CHAINS = {
"s3": chain_s3,
"iam_users": chain_iam_users,
# ... existing chains ...
"appmesh": chain_appmesh, # new
}| Function | Description |
|---|---|
_try(func, default=None) |
Run a callable and swallow exceptions. Returns default on failure. |
_get_quick_data(quick, service, method) |
Extract data from surface scan results for a given service and method |
- Chain functions receive
(session, quick_results)and return a dict - Use
_try()for optional API calls that may be denied - Chains run in parallel — don't share mutable state between chains
- Keep chains focused on one service or resource type
File: src/awsault/recon/audit.py
def _rule_appmesh_encryption(quick, deep, findings):
"""Check AppMesh virtual services for missing TLS."""
mesh_data = deep.get("appmesh", {})
for mesh_name, detail in mesh_data.items():
spec = detail.get("mesh", {}).get("spec", {})
egress = spec.get("egressFilter", {}).get("type", "")
if egress == "ALLOW_ALL":
findings.append(Finding(
severity="MEDIUM",
service="appmesh",
resource=mesh_name,
title="Mesh allows all egress traffic",
detail=f"Mesh '{mesh_name}' egress filter is ALLOW_ALL",
recommendation="Set egress filter to DROP_ALL and explicitly allow required traffic",
))Add the function to the _ALL_RULES list:
_ALL_RULES = [
_rule_s3_public,
_rule_sg_open,
# ... existing rules ...
_rule_appmesh_encryption, # new
]def _rule_name(quick_results, deep_results, findings_list):| Parameter | Type | Description |
|---|---|---|
quick |
dict | Surface scan results (service → ServiceResult) |
deep |
dict | Deep enumeration results (chain_name → data) |
findings |
list | Mutable list — append Finding objects to it |
| Field | Description |
|---|---|
severity |
CRITICAL, HIGH, MEDIUM, LOW, or INFO |
service |
AWS service name |
resource |
Specific resource identifier |
title |
Short description of the issue |
detail |
Longer explanation |
recommendation |
How to fix it |
File: src/awsault/recon/loot.py
def _loot_amplify(session):
"""Extract environment variables from Amplify apps."""
results = []
try:
amp = session.client("amplify")
apps = amp.list_apps().get("apps", [])
for app in apps:
branches = amp.list_branches(appId=app["appId"]).get("branches", [])
for branch in branches:
env = branch.get("environmentVariables", {})
if env:
results.append({
"Source": "Amplify",
"AppName": app.get("name"),
"Branch": branch.get("branchName"),
"Variables": env,
})
except Exception:
pass
return resultsAdd the function to the LOOT_SOURCES dict:
LOOT_SOURCES = {
"SecretsManager": _loot_secrets_manager,
"SSM Parameters": _loot_ssm,
# ... existing sources ...
"Amplify Env": _loot_amplify, # new
}- Extractor functions receive a boto3
sessionand return a list of dicts - Each dict should have a
"Source"field matching the registry key - Wrap everything in try/except — if you don't have access, return an empty list
- Functions run in parallel with other extractors
- Include a
"Readable"boolean field if the source involves attempting to read protected values
File: src/awsault/recon/suggestions.py
Add entries to the _SUGGESTIONS dict. Each service maps to a list of (method, description, command_template) tuples.
"appmesh": [
("list_meshes", "List App Mesh meshes",
"aws appmesh list-meshes {p}"),
("list_meshes", "Describe a mesh",
"aws appmesh describe-mesh --mesh-name <mesh-name> {p}"),
],| Placeholder | Expands to |
|---|---|
{profile} |
--profile <name> or empty |
{region} |
--region <name> or empty |
{p} |
{profile} {region} combined |
{account_id} |
The AWS account ID |
- The
methodfield must match a method name fromservices.py— the suggestion only fires if that method returned OK - Focus on post-exploitation value: what would a pentester want to try next?
- High-value services (IAM, EC2, S3, Lambda, RDS, etc.) should have more detailed suggestions
- Commands should be read-only by default
# test a new service
awsault --services your_service --verbose
# test deep enumeration
awsault --services your_service --godeep --verbose
# test with specific credentials
awsault --profile test --services your_service --godeep
# verify output formats
awsault --godeep --output test.html
awsault --godeep --output test.json- Use
_try()for API calls that may fail due to permissions - Keep functions focused — one chain per service, one rule per check category
- Use
_safe()or_serial()for datetime/bytes serialization - Follow the existing naming patterns (
chain_*,_rule_*,_loot_*) - All API calls should be read-only