rds_option_group_sgs - robjcook/sync GitHub Wiki

import boto3
import csv

def export_rds_option_group_sgs_to_csv(filename='rds_option_group_sgs.csv'):
    rds = boto3.client('rds')
    
    # 1. Map Option Groups to their VPC Security Groups
    # We create a lookup dictionary: { 'OptionGroupName': ['sg-123', 'sg-456'] }
    og_sg_map = {}
    og_paginator = rds.get_paginator('describe_option_groups')
    
    for page in og_paginator.paginate():
        for og in page['OptionGroupsList']:
            og_name = og['OptionGroupName']
            sgs_in_og = []
            for option in og.get('Options', []):
                vpc_members = option.get('VpcSecurityGroupMemberships', [])
                for member in vpc_members:
                    sgs_in_og.append(member['VpcSecurityGroupId'])
            
            if sgs_in_og:
                og_sg_map[og_name] = sgs_in_og

    # 2. Get RDS Instances and check their Option Group memberships
    db_paginator = rds.get_paginator('describe_db_instances')
    csv_data = []

    for page in db_paginator.paginate():
        for db in page['DBInstances']:
            instance_id = db['DBInstanceIdentifier']
            
            # An instance can have multiple option group memberships (though usually just one)
            for og_membership in db.get('OptionGroupMemberships', []):
                og_name = og_membership['OptionGroupName']
                
                # Check if this Option Group is in our "SGs found" map
                if og_name in og_sg_map:
                    for sg_id in og_sg_map[og_name]:
                        csv_data.append({
                            'RDSInstanceID': instance_id,
                            'OptionGroupName': og_name,
                            'VPCSecurityGroupID': sg_id
                        })
                else:
                    # Optional: Include instances even if their OG has no SGs
                    csv_data.append({
                        'RDSInstanceID': instance_id,
                        'OptionGroupName': og_name,
                        'VPCSecurityGroupID': 'None'
                    })

    # 3. Write to CSV
    keys = ['RDSInstanceID', 'OptionGroupName', 'VPCSecurityGroupID']
    with open(filename, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=keys)
        writer.writeheader()
        writer.writerows(csv_data)

    print(f"Success! Data exported to {filename}")

if __name__ == "__main__":
    export_rds_option_group_sgs_to_csv()