Cloud Security Posture Management (CSPM) in 2026
Learn to continuously assess and improve cloud security posture with automated scanning, compliance checking, and remediation.
Cloud misconfigurations cause 80% of cloud security breaches, with organizations discovering an average of 250 misconfigurations per cloud account. According to the 2024 Cloud Security Report, companies without CSPM take 45 days to detect misconfigurations, while those with automated CSPM detect issues in under 24 hours. Manual security assessments can’t keep pace with cloud scale—organizations manage thousands of resources that change constantly. This guide shows you how to implement production-ready CSPM with comprehensive error handling, automated scanning, compliance validation, and remediation automation.
Table of Contents
- Understanding CSPM
- Setting Up CSPM
- Automated Scanning
- Compliance Checking
- Remediation Automation
- Real-World Case Study
- FAQ
- Conclusion
Key Takeaways
- CSPM reduces misconfigurations by 85%
- Improves compliance by 90%
- Continuous security assessment
- Automated remediation
- Multi-cloud support
TL;DR
Implement CSPM to continuously assess and improve cloud security. Use automated scanning, compliance checking, and remediation to maintain strong security posture.
Understanding CSPM
What is CSPM?
Core Functions:
- Continuous security assessment
- Misconfiguration detection
- Compliance validation
- Risk prioritization
- Automated remediation
Benefits:
- Proactive security
- Compliance assurance
- Risk reduction
- Cost optimization
Prerequisites
- Cloud accounts (AWS/Azure/GCP)
- Understanding of cloud security
- Basic scripting knowledge
- Only assess accounts you own
Safety and Legal
- Only assess accounts you own or have authorization
- Follow cloud provider policies
- Respect compliance requirements
- Test in isolated environments
Step 1) Set up CSPM scanning
Click to view complete production-ready code
requirements.txt:
boto3>=1.34.0
botocore>=1.34.0
python-dateutil>=2.8.2
Complete CSPM Scanner Implementation:
#!/usr/bin/env python3
"""
Cloud Security Posture Management (CSPM) Scanner
Production-ready cloud security scanning with comprehensive error handling
"""
import boto3
from botocore.exceptions import ClientError, BotoCoreError
from typing import List, Dict, Optional, Set
from dataclasses import dataclass, asdict
from enum import Enum
from datetime import datetime
import logging
import os
import json
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class CSPMError(Exception):
"""Base exception for CSPM errors."""
pass
class AWSAPIError(CSPMError):
"""Raised when AWS API calls fail."""
pass
class Severity(Enum):
"""Finding severity levels."""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
def __str__(self) -> str:
return self.value
@dataclass
class SecurityFinding:
"""Represents a security finding with complete metadata."""
resource_id: str
resource_type: str
issue: str
severity: Severity
recommendation: str
framework: Optional[str] = None
region: Optional[str] = None
timestamp: Optional[datetime] = None
remediation_code: Optional[str] = None
def __post_init__(self):
"""Set default timestamp if not provided."""
if self.timestamp is None:
self.timestamp = datetime.utcnow()
def to_dict(self) -> Dict:
"""Convert finding to dictionary for serialization."""
result = asdict(self)
result['severity'] = self.severity.value
result['timestamp'] = self.timestamp.isoformat() if self.timestamp else None
return result
def to_json(self) -> str:
"""Convert finding to JSON string."""
return json.dumps(self.to_dict(), indent=2)
class Severity(Enum):
"""Finding severity levels."""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class SecurityFinding:
"""Represents a security finding."""
resource_id: str
resource_type: str
issue: str
severity: Severity
recommendation: str
framework: Optional[str] = None
class CSPMScanner:
"""Scans cloud resources for security misconfigurations with comprehensive checks."""
def __init__(
self,
region_name: str = 'us-east-1',
aws_access_key_id: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
scan_all_regions: bool = False
):
"""Initialize CSPM scanner with comprehensive AWS service clients.
Args:
region_name: AWS region to scan (primary)
aws_access_key_id: AWS access key (defaults to env/credentials)
aws_secret_access_key: AWS secret key (defaults to env/credentials)
scan_all_regions: Whether to scan all regions
Raises:
CSPMError: If AWS clients cannot be initialized
"""
self.region_name = region_name
self.scan_all_regions = scan_all_regions
try:
# Initialize AWS session
session = boto3.Session(
aws_access_key_id=aws_access_key_id or os.getenv('AWS_ACCESS_KEY_ID'),
aws_secret_access_key=aws_secret_access_key or os.getenv('AWS_SECRET_ACCESS_KEY'),
region_name=region_name
)
# Initialize AWS service clients
self.s3 = session.client('s3', region_name=region_name)
self.ec2 = session.client('ec2', region_name=region_name)
self.iam = session.client('iam')
self.rds = session.client('rds', region_name=region_name)
self.cloudtrail = session.client('cloudtrail', region_name=region_name)
self.config = session.client('config', region_name=region_name)
# For multi-region scanning
if scan_all_regions:
self.ec2_regions = session.client('ec2')
logger.info(f"Initialized CSPMScanner for region: {region_name}")
except (ClientError, BotoCoreError) as e:
logger.error(f"Failed to initialize AWS clients: {e}")
raise CSPMError(f"Failed to initialize AWS clients: {e}") from e
def scan_cloud_posture(
self,
resource_types: Optional[List[str]] = None
) -> List[SecurityFinding]:
"""Scan cloud resources for misconfigurations comprehensively.
Args:
resource_types: List of resource types to scan (None = all)
Options: 's3', 'ec2', 'iam', 'rds', 'cloudtrail', 'config'
Returns:
List of security findings
Raises:
AWSAPIError: If AWS API calls fail
CSPMError: For other CSPM errors
"""
if resource_types is None:
resource_types = ['s3', 'ec2', 'iam', 'rds', 'cloudtrail', 'config']
findings = []
try:
logger.info(f"Starting CSPM scan for resource types: {resource_types}")
# Scan S3 buckets
if 's3' in resource_types:
logger.info("Scanning S3 buckets...")
findings.extend(self._scan_s3_buckets())
# Scan EC2 instances
if 'ec2' in resource_types:
logger.info("Scanning EC2 instances...")
findings.extend(self._scan_ec2_instances())
# Scan IAM policies
if 'iam' in resource_types:
logger.info("Scanning IAM policies...")
findings.extend(self._scan_iam_policies())
# Scan RDS instances
if 'rds' in resource_types:
logger.info("Scanning RDS instances...")
findings.extend(self._scan_rds_instances())
# Scan CloudTrail
if 'cloudtrail' in resource_types:
logger.info("Scanning CloudTrail...")
findings.extend(self._scan_cloudtrail())
# Scan Config
if 'config' in resource_types:
logger.info("Scanning AWS Config...")
findings.extend(self._scan_config())
logger.info(f"Scan completed. Found {len(findings)} security findings")
except (ClientError, BotoCoreError) as e:
error_msg = f"AWS API error during scan: {e}"
logger.error(error_msg)
raise AWSAPIError(error_msg) from e
except CSPMError:
raise
except Exception as e:
error_msg = f"Unexpected error during scan: {e}"
logger.error(error_msg, exc_info=True)
raise CSPMError(error_msg) from e
return findings
def _scan_s3_buckets(self) -> List[SecurityFinding]:
"""Scan S3 buckets for comprehensive misconfigurations.
Returns:
List of S3-related security findings
"""
findings = []
try:
response = self.s3.list_buckets()
for bucket in response.get('Buckets', []):
bucket_name = bucket['Name']
try:
# Check public access block
findings.extend(self._check_s3_public_access_block(bucket_name))
# Check bucket encryption
findings.extend(self._check_s3_encryption(bucket_name))
# Check bucket versioning
findings.extend(self._check_s3_versioning(bucket_name))
# Check bucket logging
findings.extend(self._check_s3_logging(bucket_name))
# Check bucket policy
findings.extend(self._check_s3_bucket_policy(bucket_name))
# Check bucket ACL
findings.extend(self._check_s3_acl(bucket_name))
except ClientError as e:
logger.warning(f"Error scanning bucket {bucket_name}: {e}")
continue
except ClientError as e:
error_msg = f"Error listing S3 buckets: {e}"
logger.error(error_msg)
raise AWSAPIError(error_msg) from e
return findings
def _check_s3_public_access_block(self, bucket_name: str) -> List[SecurityFinding]:
"""Check S3 bucket public access block configuration."""
findings = []
try:
public_access = self.s3.get_public_access_block(Bucket=bucket_name)
config = public_access.get('PublicAccessBlockConfiguration', {})
# Check each setting
if not config.get('BlockPublicAcls', False):
findings.append(SecurityFinding(
resource_id=bucket_name,
resource_type='s3_bucket',
issue='BlockPublicAcls not enabled',
severity=Severity.CRITICAL,
recommendation='Enable BlockPublicAcls in public access block',
framework='CIS-AWS-1.4.0',
region=self.region_name,
remediation_code=f"aws s3api put-public-access-block --bucket {bucket_name} --public-access-block-configuration 'BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true'"
))
if not config.get('IgnorePublicAcls', False):
findings.append(SecurityFinding(
resource_id=bucket_name,
resource_type='s3_bucket',
issue='IgnorePublicAcls not enabled',
severity=Severity.CRITICAL,
recommendation='Enable IgnorePublicAcls in public access block',
framework='CIS-AWS-1.4.0',
region=self.region_name,
remediation_code=f"aws s3api put-public-access-block --bucket {bucket_name} --public-access-block-configuration 'BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true'"
))
if not config.get('BlockPublicPolicy', False):
findings.append(SecurityFinding(
resource_id=bucket_name,
resource_type='s3_bucket',
issue='BlockPublicPolicy not enabled',
severity=Severity.CRITICAL,
recommendation='Enable BlockPublicPolicy in public access block',
framework='CIS-AWS-1.4.0',
region=self.region_name
))
if not config.get('RestrictPublicBuckets', False):
findings.append(SecurityFinding(
resource_id=bucket_name,
resource_type='s3_bucket',
issue='RestrictPublicBuckets not enabled',
severity=Severity.HIGH,
recommendation='Enable RestrictPublicBuckets in public access block',
framework='CIS-AWS-1.4.0',
region=self.region_name
))
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchPublicAccessBlockConfiguration':
findings.append(SecurityFinding(
resource_id=bucket_name,
resource_type='s3_bucket',
issue='No public access block configured',
severity=Severity.CRITICAL,
recommendation='Configure public access block immediately to prevent accidental public exposure',
framework='CIS-AWS-1.4.0',
region=self.region_name,
remediation_code=f"aws s3api put-public-access-block --bucket {bucket_name} --public-access-block-configuration 'BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true'"
))
else:
logger.warning(f"Error checking public access block for {bucket_name}: {e}")
return findings
def _check_s3_encryption(self, bucket_name: str) -> List[SecurityFinding]:
"""Check S3 bucket encryption configuration."""
findings = []
try:
encryption = self.s3.get_bucket_encryption(Bucket=bucket_name)
config = encryption.get('ServerSideEncryptionConfiguration', {})
rules = config.get('Rules', [])
if not rules:
findings.append(SecurityFinding(
resource_id=bucket_name,
resource_type='s3_bucket',
issue='No server-side encryption configured',
severity=Severity.HIGH,
recommendation='Enable server-side encryption (SSE-S3 or SSE-KMS)',
framework='CIS-AWS-2.1.1',
region=self.region_name,
remediation_code=f"aws s3api put-bucket-encryption --bucket {bucket_name} --server-side-encryption-configuration '{{\"Rules\":[{{\"ApplyServerSideEncryptionByDefault\":{{\"SSEAlgorithm\":\"AES256\"}}}}]}}'"
))
else:
# Check if encryption is properly configured
for rule in rules:
apply_default = rule.get('ApplyServerSideEncryptionByDefault', {})
if not apply_default.get('SSEAlgorithm'):
findings.append(SecurityFinding(
resource_id=bucket_name,
resource_type='s3_bucket',
issue='Incomplete encryption configuration',
severity=Severity.MEDIUM,
recommendation='Ensure SSEAlgorithm is specified in encryption configuration',
framework='CIS-AWS-2.1.1',
region=self.region_name
))
except ClientError as e:
if e.response['Error']['Code'] == 'ServerSideEncryptionConfigurationNotFoundError':
findings.append(SecurityFinding(
resource_id=bucket_name,
resource_type='s3_bucket',
issue='No encryption configured',
severity=Severity.HIGH,
recommendation='Enable server-side encryption to protect data at rest',
framework='CIS-AWS-2.1.1',
region=self.region_name,
remediation_code=f"aws s3api put-bucket-encryption --bucket {bucket_name} --server-side-encryption-configuration '{{\"Rules\":[{{\"ApplyServerSideEncryptionByDefault\":{{\"SSEAlgorithm\":\"AES256\"}}}}]}}'"
))
else:
logger.warning(f"Error checking encryption for {bucket_name}: {e}")
return findings
def _check_s3_versioning(self, bucket_name: str) -> List[SecurityFinding]:
"""Check S3 bucket versioning configuration."""
findings = []
try:
versioning = self.s3.get_bucket_versioning(Bucket=bucket_name)
status = versioning.get('Status', '')
if status != 'Enabled':
findings.append(SecurityFinding(
resource_id=bucket_name,
resource_type='s3_bucket',
issue='Bucket versioning not enabled',
severity=Severity.MEDIUM,
recommendation='Enable versioning to protect against accidental deletion or modification',
framework='CIS-AWS-2.1.2',
region=self.region_name,
remediation_code=f"aws s3api put-bucket-versioning --bucket {bucket_name} --versioning-configuration Status=Enabled"
))
except ClientError as e:
logger.warning(f"Error checking versioning for {bucket_name}: {e}")
return findings
def _check_s3_logging(self, bucket_name: str) -> List[SecurityFinding]:
"""Check S3 bucket logging configuration."""
findings = []
try:
logging_config = self.s3.get_bucket_logging(Bucket=bucket_name)
if not logging_config.get('LoggingEnabled'):
findings.append(SecurityFinding(
resource_id=bucket_name,
resource_type='s3_bucket',
issue='Bucket logging not enabled',
severity=Severity.MEDIUM,
recommendation='Enable access logging for audit and security monitoring',
framework='CIS-AWS-2.1.3',
region=self.region_name
))
except ClientError as e:
logger.warning(f"Error checking logging for {bucket_name}: {e}")
return findings
def _check_s3_bucket_policy(self, bucket_name: str) -> List[SecurityFinding]:
"""Check S3 bucket policy for insecure configurations."""
findings = []
try:
policy = self.s3.get_bucket_policy(Bucket=bucket_name)
policy_doc = json.loads(policy.get('Policy', '{}'))
# Check for wildcard principals
statements = policy_doc.get('Statement', [])
for statement in statements:
principal = statement.get('Principal', {})
if principal == '*' or principal.get('AWS') == '*':
effect = statement.get('Effect', '')
if effect == 'Allow':
findings.append(SecurityFinding(
resource_id=bucket_name,
resource_type='s3_bucket',
issue='Bucket policy allows wildcard principal (*)',
severity=Severity.CRITICAL,
recommendation='Remove wildcard principal from bucket policy. Use specific IAM principals.',
framework='CIS-AWS-1.5.0',
region=self.region_name
))
except ClientError as e:
if e.response['Error']['Code'] != 'NoSuchBucketPolicy':
logger.warning(f"Error checking bucket policy for {bucket_name}: {e}")
return findings
def _check_s3_acl(self, bucket_name: str) -> List[SecurityFinding]:
"""Check S3 bucket ACL for public access."""
findings = []
try:
acl = self.s3.get_bucket_acl(Bucket=bucket_name)
grants = acl.get('Grants', [])
for grant in grants:
grantee = grant.get('Grantee', {})
permission = grant.get('Permission', '')
# Check for public access
if grantee.get('Type') == 'Group' and 'AllUsers' in grantee.get('URI', ''):
findings.append(SecurityFinding(
resource_id=bucket_name,
resource_type='s3_bucket',
issue=f'Bucket ACL grants public access (AllUsers) with permission: {permission}',
severity=Severity.CRITICAL,
recommendation='Remove public ACL grants. Use bucket policies instead.',
framework='CIS-AWS-1.6.0',
region=self.region_name
))
except ClientError as e:
logger.warning(f"Error checking ACL for {bucket_name}: {e}")
return findings
def _scan_ec2_instances(self) -> List[SecurityFinding]:
"""Scan EC2 instances for comprehensive misconfigurations.
Returns:
List of EC2-related security findings
"""
findings = []
try:
# Get all instances
paginator = self.ec2.get_paginator('describe_instances')
pages = paginator.paginate()
for page in pages:
for reservation in page.get('Reservations', []):
for instance in reservation.get('Instances', []):
instance_id = instance['InstanceId']
try:
# Check instance metadata service
findings.extend(self._check_ec2_metadata_service(instance))
# Check public IP exposure
findings.extend(self._check_ec2_public_ip(instance))
# Check security groups
findings.extend(self._check_ec2_security_groups(instance))
# Check user data
findings.extend(self._check_ec2_user_data(instance))
except Exception as e:
logger.warning(f"Error scanning instance {instance_id}: {e}")
continue
except ClientError as e:
error_msg = f"Error describing EC2 instances: {e}"
logger.error(error_msg)
raise AWSAPIError(error_msg) from e
return findings
def _check_ec2_metadata_service(self, instance: Dict) -> List[SecurityFinding]:
"""Check EC2 instance metadata service configuration."""
findings = []
instance_id = instance['InstanceId']
try:
# Check IMDSv2 requirement (requires token)
metadata_options = instance.get('MetadataOptions', {})
http_tokens = metadata_options.get('HttpTokens', 'optional')
http_endpoint = metadata_options.get('HttpEndpoint', 'enabled')
if http_tokens == 'optional':
findings.append(SecurityFinding(
resource_id=instance_id,
resource_type='ec2_instance',
issue='IMDSv2 not required (HttpTokens=optional)',
severity=Severity.HIGH,
recommendation='Require IMDSv2 to prevent SSRF attacks on metadata service',
framework='CIS-AWS-4.2.0',
region=self.region_name,
remediation_code=f"aws ec2 modify-instance-metadata-options --instance-id {instance_id} --http-tokens required"
))
if http_endpoint == 'disabled':
findings.append(SecurityFinding(
resource_id=instance_id,
resource_type='ec2_instance',
issue='Metadata service endpoint disabled',
severity=Severity.LOW,
recommendation='Metadata service should be enabled for normal operations',
framework='CIS-AWS-4.2.0',
region=self.region_name
))
except Exception as e:
logger.warning(f"Error checking metadata service for {instance_id}: {e}")
return findings
def _check_ec2_public_ip(self, instance: Dict) -> List[SecurityFinding]:
"""Check EC2 instance public IP exposure."""
findings = []
instance_id = instance['InstanceId']
# Check if instance has public IP
if instance.get('PublicIpAddress'):
# Check security groups for overly permissive rules
security_groups = instance.get('SecurityGroups', [])
if security_groups:
# This will be checked in security group scanning
pass
else:
findings.append(SecurityFinding(
resource_id=instance_id,
resource_type='ec2_instance',
issue='Instance has public IP but no security groups',
severity=Severity.CRITICAL,
recommendation='Instance should have security groups configured',
framework='CIS-AWS-4.1.0',
region=self.region_name
))
return findings
def _check_ec2_security_groups(self, instance: Dict) -> List[SecurityFinding]:
"""Check EC2 instance security groups for misconfigurations."""
findings = []
instance_id = instance['InstanceId']
security_groups = instance.get('SecurityGroups', [])
for sg in security_groups:
sg_id = sg['GroupId']
try:
# Get security group details
sg_response = self.ec2.describe_security_groups(GroupIds=[sg_id])
sg_details = sg_response['SecurityGroups'][0]
# Check ingress rules
ingress_rules = sg_details.get('IpPermissions', [])
for rule in ingress_rules:
# Check for 0.0.0.0/0 (anywhere)
for ip_range in rule.get('IpRanges', []):
cidr = ip_range.get('CidrIp', '')
if cidr == '0.0.0.0/0' or cidr == '::/0':
port = rule.get('FromPort', 'All')
protocol = rule.get('IpProtocol', '-1')
severity = Severity.CRITICAL if port in [22, 3389] else Severity.HIGH
findings.append(SecurityFinding(
resource_id=f"{instance_id}:{sg_id}",
resource_type='ec2_security_group',
issue=f'Security group allows inbound traffic from anywhere (0.0.0.0/0) on port {port}/{protocol}',
severity=severity,
recommendation='Restrict ingress rules to specific IP ranges or security groups',
framework='CIS-AWS-4.1.0',
region=self.region_name
))
except ClientError as e:
logger.warning(f"Error checking security group {sg_id}: {e}")
continue
return findings
def _check_ec2_user_data(self, instance: Dict) -> List[SecurityFinding]:
"""Check EC2 instance user data for sensitive information."""
findings = []
instance_id = instance['InstanceId']
# User data might contain sensitive info, but we can't easily check this
# without additional permissions. This is a placeholder for more advanced checks.
# In production, you might want to use AWS Config or Systems Manager to check this.
return findings
def _scan_iam_policies(self) -> List[SecurityFinding]:
"""Scan IAM policies for comprehensive misconfigurations.
Returns:
List of IAM-related security findings
"""
findings = []
try:
# Scan user policies
findings.extend(self._scan_iam_users())
# Scan role policies
findings.extend(self._scan_iam_roles())
# Scan group policies
findings.extend(self._scan_iam_groups())
except ClientError as e:
error_msg = f"Error scanning IAM policies: {e}"
logger.error(error_msg)
raise AWSAPIError(error_msg) from e
return findings
def _scan_iam_users(self) -> List[SecurityFinding]:
"""Scan IAM users for misconfigurations."""
findings = []
try:
paginator = self.iam.get_paginator('list_users')
pages = paginator.paginate()
for page in pages:
for user in page['Users']:
user_name = user['UserName']
try:
# Check for MFA
mfa_devices = self.iam.list_mfa_devices(UserName=user_name)
if not mfa_devices['MFADevices']:
# Check if user has console access
try:
login_profile = self.iam.get_login_profile(UserName=user_name)
if login_profile.get('LoginProfile'):
findings.append(SecurityFinding(
resource_id=user_name,
resource_type='iam_user',
issue='IAM user with console access has no MFA enabled',
severity=Severity.CRITICAL,
recommendation='Enable MFA for all users with console access',
framework='CIS-AWS-1.7.0',
region='global',
remediation_code=f"aws iam create-virtual-mfa-device --virtual-mfa-device-name {user_name}-mfa"
))
except ClientError as e:
if e.response['Error']['Code'] != 'NoSuchEntity':
logger.warning(f"Error checking login profile for {user_name}: {e}")
# Check access keys
access_keys = self.iam.list_access_keys(UserName=user_name)
for key_metadata in access_keys.get('AccessKeyMetadata', []):
key_id = key_metadata['AccessKeyId']
# Check last used
try:
last_used = self.iam.get_access_key_last_used(AccessKeyId=key_id)
last_used_date = last_used['AccessKeyLastUsed'].get('LastUsedDate')
if last_used_date:
from datetime import datetime, timezone, timedelta
days_since_use = (datetime.now(timezone.utc) - last_used_date).days
if days_since_use > 90:
findings.append(SecurityFinding(
resource_id=f"{user_name}:{key_id}",
resource_type='iam_access_key',
issue=f'Access key not used in {days_since_use} days',
severity=Severity.MEDIUM,
recommendation='Delete unused access keys to reduce attack surface',
framework='CIS-AWS-1.4.0',
region='global',
remediation_code=f"aws iam delete-access-key --user-name {user_name} --access-key-id {key_id}"
))
except ClientError as e:
logger.warning(f"Error checking access key usage for {key_id}: {e}")
# Check for inline policies
inline_policies = self.iam.list_user_policies(UserName=user_name)
if inline_policies['PolicyNames']:
# Check each inline policy
for policy_name in inline_policies['PolicyNames']:
policy_doc = self.iam.get_user_policy(
UserName=user_name,
PolicyName=policy_name
)
findings.extend(self._analyze_policy_document(
policy_doc['PolicyDocument'],
f"{user_name}:{policy_name}",
'iam_user_policy'
))
# Check attached policies
attached_policies = self.iam.list_attached_user_policies(UserName=user_name)
for policy in attached_policies.get('AttachedPolicies', []):
policy_arn = policy['PolicyArn']
findings.extend(self._analyze_managed_policy(policy_arn, f"{user_name}:{policy_arn}", 'iam_user_policy'))
except Exception as e:
logger.warning(f"Error scanning user {user_name}: {e}")
continue
except ClientError as e:
logger.error(f"Error listing IAM users: {e}")
raise
return findings
def _scan_iam_roles(self) -> List[SecurityFinding]:
"""Scan IAM roles for misconfigurations."""
findings = []
try:
paginator = self.iam.get_paginator('list_roles')
pages = paginator.paginate()
for page in pages:
for role in page['Roles']:
role_name = role['RoleName']
try:
# Check trust policy
role_details = self.iam.get_role(RoleName=role_name)
trust_policy = role_details['Role'].get('AssumeRolePolicyDocument', {})
# Check for wildcard in trust policy
if isinstance(trust_policy, str):
trust_policy = json.loads(trust_policy)
statements = trust_policy.get('Statement', [])
for statement in statements:
principal = statement.get('Principal', {})
if principal.get('AWS') == '*':
findings.append(SecurityFinding(
resource_id=role_name,
resource_type='iam_role',
issue='IAM role trust policy allows any AWS account to assume role',
severity=Severity.CRITICAL,
recommendation='Restrict trust policy to specific accounts or services',
framework='CIS-AWS-1.22.0',
region='global'
))
except Exception as e:
logger.warning(f"Error scanning role {role_name}: {e}")
continue
except ClientError as e:
logger.error(f"Error listing IAM roles: {e}")
raise
return findings
def _scan_iam_groups(self) -> List[SecurityFinding]:
"""Scan IAM groups for misconfigurations."""
findings = []
try:
paginator = self.iam.get_paginator('list_groups')
pages = paginator.paginate()
for page in pages:
for group in page['Groups']:
group_name = group['GroupName']
try:
# Check attached policies
attached_policies = self.iam.list_attached_group_policies(GroupName=group_name)
for policy in attached_policies.get('AttachedPolicies', []):
policy_arn = policy['PolicyArn']
findings.extend(self._analyze_managed_policy(policy_arn, f"{group_name}:{policy_arn}", 'iam_group_policy'))
except Exception as e:
logger.warning(f"Error scanning group {group_name}: {e}")
continue
except ClientError as e:
logger.error(f"Error listing IAM groups: {e}")
raise
return findings
def _analyze_policy_document(self, policy_doc: Dict, resource_id: str, resource_type: str) -> List[SecurityFinding]:
"""Analyze IAM policy document for insecure configurations."""
findings = []
if isinstance(policy_doc, str):
policy_doc = json.loads(policy_doc)
statements = policy_doc.get('Statement', [])
for statement in statements:
# Check for overly permissive actions
actions = statement.get('Action', [])
if not isinstance(actions, list):
actions = [actions]
for action in actions:
if action == '*':
findings.append(SecurityFinding(
resource_id=resource_id,
resource_type=resource_type,
issue='IAM policy allows all actions (*)',
severity=Severity.CRITICAL,
recommendation='Use least privilege principle. Grant only necessary permissions.',
framework='CIS-AWS-1.16.0',
region='global'
))
break
# Check for overly permissive resources
resources = statement.get('Resource', [])
if not isinstance(resources, list):
resources = [resources]
for resource in resources:
if resource == '*':
findings.append(SecurityFinding(
resource_id=resource_id,
resource_type=resource_type,
issue='IAM policy allows access to all resources (*)',
severity=Severity.HIGH,
recommendation='Restrict resource access to specific resources',
framework='CIS-AWS-1.16.0',
region='global'
))
break
return findings
def _analyze_managed_policy(self, policy_arn: str, resource_id: str, resource_type: str) -> List[SecurityFinding]:
"""Analyze managed IAM policy for insecure configurations."""
findings = []
try:
policy = self.iam.get_policy(PolicyArn=policy_arn)
policy_version = self.iam.get_policy_version(
PolicyArn=policy_arn,
VersionId=policy['Policy']['DefaultVersionId']
)
policy_doc = policy_version['PolicyVersion']['Document']
findings.extend(self._analyze_policy_document(policy_doc, resource_id, resource_type))
except ClientError as e:
logger.warning(f"Error analyzing policy {policy_arn}: {e}")
return findings
def _scan_rds_instances(self) -> List[SecurityFinding]:
"""Scan RDS instances for misconfigurations."""
findings = []
try:
paginator = self.rds.get_paginator('describe_db_instances')
pages = paginator.paginate()
for page in pages:
for db_instance in page.get('DBInstances', []):
db_id = db_instance['DBInstanceIdentifier']
try:
# Check encryption
if not db_instance.get('StorageEncrypted', False):
findings.append(SecurityFinding(
resource_id=db_id,
resource_type='rds_instance',
issue='RDS instance encryption not enabled',
severity=Severity.HIGH,
recommendation='Enable encryption for RDS instances',
framework='CIS-AWS-3.1.0',
region=self.region_name
))
# Check public accessibility
if db_instance.get('PubliclyAccessible', False):
findings.append(SecurityFinding(
resource_id=db_id,
resource_type='rds_instance',
issue='RDS instance is publicly accessible',
severity=Severity.HIGH,
recommendation='Disable public accessibility. Use VPC security groups for access.',
framework='CIS-AWS-3.2.0',
region=self.region_name
))
except Exception as e:
logger.warning(f"Error scanning RDS instance {db_id}: {e}")
continue
except ClientError as e:
logger.error(f"Error listing RDS instances: {e}")
# Don't raise - RDS might not be available in all regions
return findings
def _scan_cloudtrail(self) -> List[SecurityFinding]:
"""Scan CloudTrail for misconfigurations."""
findings = []
try:
trails = self.cloudtrail.list_trails()
if not trails.get('Trails'):
findings.append(SecurityFinding(
resource_id='cloudtrail',
resource_type='cloudtrail',
issue='No CloudTrail trails configured',
severity=Severity.CRITICAL,
recommendation='Enable CloudTrail for audit logging',
framework='CIS-AWS-3.1.0',
region=self.region_name
))
else:
for trail_info in trails['Trails']:
trail_arn = trail_info['TrailARN']
trail_name = trail_info['Name']
try:
trail = self.cloudtrail.get_trail(Name=trail_name)
trail_details = trail['Trail']
# Check logging status
status = self.cloudtrail.get_trail_status(Name=trail_name)
if not status.get('IsLogging', False):
findings.append(SecurityFinding(
resource_id=trail_name,
resource_type='cloudtrail',
issue='CloudTrail logging is not enabled',
severity=Severity.CRITICAL,
recommendation='Enable CloudTrail logging',
framework='CIS-AWS-3.2.0',
region=self.region_name,
remediation_code=f"aws cloudtrail start-logging --name {trail_name}"
))
# Check file validation
if not trail_details.get('LogFileValidationEnabled', False):
findings.append(SecurityFinding(
resource_id=trail_name,
resource_type='cloudtrail',
issue='CloudTrail file validation not enabled',
severity=Severity.MEDIUM,
recommendation='Enable log file validation for integrity',
framework='CIS-AWS-3.3.0',
region=self.region_name
))
except Exception as e:
logger.warning(f"Error scanning CloudTrail {trail_name}: {e}")
continue
except ClientError as e:
logger.error(f"Error listing CloudTrail: {e}")
# Don't raise - CloudTrail might not be configured
return findings
def _scan_config(self) -> List[SecurityFinding]:
"""Scan AWS Config for misconfigurations."""
findings = []
try:
recorders = self.config.describe_configuration_recorders()
if not recorders.get('ConfigurationRecorders'):
findings.append(SecurityFinding(
resource_id='config',
resource_type='config',
issue='AWS Config not configured',
severity=Severity.HIGH,
recommendation='Enable AWS Config for resource inventory and compliance',
framework='CIS-AWS-3.4.0',
region=self.region_name
))
else:
for recorder in recorders['ConfigurationRecorders']:
recorder_name = recorder['name']
# Check recording status
status = self.config.describe_configuration_recorder_status(
ConfigurationRecorderNames=[recorder_name]
)
recorder_status = status['ConfigurationRecorderStatuses'][0]
if not recorder_status.get('recording', False):
findings.append(SecurityFinding(
resource_id=recorder_name,
resource_type='config_recorder',
issue='AWS Config recorder is not recording',
severity=Severity.HIGH,
recommendation='Enable Config recorder',
framework='CIS-AWS-3.4.0',
region=self.region_name
))
except ClientError as e:
logger.error(f"Error listing AWS Config: {e}")
# Don't raise - Config might not be configured
return findings
# Example usage
if __name__ == "__main__":
# Initialize scanner
scanner = CSPMScanner(region_name='us-east-1')
# Scan all resources
try:
findings = scanner.scan_cloud_posture()
# Print findings
print(f"\nFound {len(findings)} security findings:\n")
for finding in findings:
print(f"[{finding.severity.value.upper()}] {finding.resource_type}: {finding.issue}")
print(f" Resource: {finding.resource_id}")
print(f" Recommendation: {finding.recommendation}")
if finding.remediation_code:
print(f" Remediation: {finding.remediation_code}")
print()
except CSPMError as e:
print(f"Error: {e}")
Unit Tests:
# test_cspm_scanner.py
import pytest
from unittest.mock import Mock, patch, MagicMock
from cspm_scanner import (
CSPMScanner,
SecurityFinding,
Severity,
CSPMError,
AWSAPIError
)
from botocore.exceptions import ClientError
class TestCSPMScanner:
"""Unit tests for CSPMScanner."""
@pytest.fixture
def mock_boto_session(self):
"""Create mock boto3 session."""
with patch('cspm_scanner.boto3.Session') as mock_session:
mock_session_instance = Mock()
mock_session.return_value = mock_session_instance
yield mock_session_instance
@pytest.fixture
def scanner(self, mock_boto_session):
"""Create CSPMScanner instance with mocked AWS clients."""
mock_boto_session.client.return_value = Mock()
scanner = CSPMScanner(region_name='us-east-1')
scanner.s3 = Mock()
scanner.ec2 = Mock()
scanner.iam = Mock()
scanner.rds = Mock()
scanner.cloudtrail = Mock()
scanner.config = Mock()
return scanner
def test_scan_s3_buckets_no_public_access_block(self, scanner):
"""Test S3 scanning detects missing public access block."""
# Mock S3 list_buckets
scanner.s3.list_buckets.return_value = {
'Buckets': [{'Name': 'test-bucket'}]
}
# Mock get_public_access_block to raise NoSuchPublicAccessBlockConfiguration
scanner.s3.get_public_access_block.side_effect = ClientError(
{'Error': {'Code': 'NoSuchPublicAccessBlockConfiguration'}},
'GetPublicAccessBlock'
)
# Mock other checks
scanner.s3.get_bucket_encryption.side_effect = ClientError(
{'Error': {'Code': 'ServerSideEncryptionConfigurationNotFoundError'}},
'GetBucketEncryption'
)
scanner.s3.get_bucket_versioning.return_value = {'Status': 'Suspended'}
scanner.s3.get_bucket_logging.return_value = {}
scanner.s3.get_bucket_policy.side_effect = ClientError(
{'Error': {'Code': 'NoSuchBucketPolicy'}},
'GetBucketPolicy'
)
scanner.s3.get_bucket_acl.return_value = {'Grants': []}
findings = scanner._scan_s3_buckets()
assert len(findings) > 0
critical_findings = [f for f in findings if f.severity == Severity.CRITICAL]
assert len(critical_findings) > 0
assert any('public access block' in f.issue.lower() for f in critical_findings)
def test_scan_ec2_instances_no_imdsv2(self, scanner):
"""Test EC2 scanning detects missing IMDSv2."""
# Mock describe_instances
scanner.ec2.get_paginator.return_value.paginate.return_value = [{
'Reservations': [{
'Instances': [{
'InstanceId': 'i-1234567890abcdef0',
'MetadataOptions': {
'HttpTokens': 'optional',
'HttpEndpoint': 'enabled'
},
'SecurityGroups': [],
'PublicIpAddress': None
}]
}]
}]
findings = scanner._scan_ec2_instances()
assert len(findings) > 0
imdsv2_findings = [f for f in findings if 'IMDSv2' in f.issue]
assert len(imdsv2_findings) > 0
def test_scan_iam_users_no_mfa(self, scanner):
"""Test IAM scanning detects users without MFA."""
# Mock list_users
scanner.iam.get_paginator.return_value.paginate.return_value = [{
'Users': [{'UserName': 'test-user'}]
}]
# Mock MFA devices (empty = no MFA)
scanner.iam.list_mfa_devices.return_value = {'MFADevices': []}
# Mock login profile (user has console access)
scanner.iam.get_login_profile.return_value = {'LoginProfile': {}}
# Mock access keys (empty)
scanner.iam.list_access_keys.return_value = {'AccessKeyMetadata': []}
# Mock user policies (empty)
scanner.iam.list_user_policies.return_value = {'PolicyNames': []}
scanner.iam.list_attached_user_policies.return_value = {'AttachedPolicies': []}
findings = scanner._scan_iam_users()
assert len(findings) > 0
mfa_findings = [f for f in findings if 'MFA' in f.issue]
assert len(mfa_findings) > 0
if __name__ == '__main__':
pytest.main([__file__, '-v'])
Step 2) Check compliance
Click to view code
from typing import Dict, List, Set
class ComplianceChecker:
"""Checks findings against compliance frameworks."""
COMPLIANCE_FRAMEWORKS = {
"CIS-AWS": {
"required_severities": {Severity.HIGH, Severity.CRITICAL},
"controls": ["s3_public_access", "encryption", "iam_policies"]
},
"PCI-DSS": {
"required_severities": {Severity.MEDIUM, Severity.HIGH, Severity.CRITICAL},
"controls": ["encryption", "access_controls", "logging"]
},
"HIPAA": {
"required_severities": {Severity.HIGH, Severity.CRITICAL},
"controls": ["encryption", "access_controls", "audit_logging"]
},
"SOC2": {
"required_severities": {Severity.MEDIUM, Severity.HIGH, Severity.CRITICAL},
"controls": ["access_controls", "encryption", "monitoring"]
}
}
def check_compliance(self, findings: List[SecurityFinding],
frameworks: List[str] = None) -> Dict[str, Dict]:
"""Check findings against compliance standards.
Args:
findings: List of security findings
frameworks: List of framework names to check (None = all)
Returns:
Dictionary mapping framework names to compliance results
"""
if frameworks is None:
frameworks = list(self.COMPLIANCE_FRAMEWORKS.keys())
compliance_results = {}
for framework_name in frameworks:
if framework_name not in self.COMPLIANCE_FRAMEWORKS:
logger.warning(f"Unknown framework: {framework_name}")
continue
framework = self.COMPLIANCE_FRAMEWORKS[framework_name]
required_severities = framework["required_severities"]
violations = [
f for f in findings
if f.severity in required_severities
and (f.framework == framework_name or f.framework is None)
]
compliance_results[framework_name] = {
'compliant': len(violations) == 0,
'violations': violations,
'violation_count': len(violations),
'total_findings': len(findings),
'compliance_percentage': (1 - len(violations) / max(len(findings), 1)) * 100
}
return compliance_results
Comparison: CSPM Solutions
| Solution Type | Detection Speed | Compliance Coverage | Remediation | Multi-Cloud | Cost/Resource |
|---|---|---|---|---|---|
| Cloud-Native (AWS Config, etc.) | Real-time | Single cloud | Manual | No | $0.10-0.30 |
| Third-Party (Prowler, etc.) | 1-24 hours | Multi-cloud | Automated | Yes | $0.20-0.50 |
| Custom (This Guide) | Real-time | Custom | Automated | Yes | $0.05-0.15 |
| Manual Assessment | 30-90 days | Limited | Manual | Limited | $50-200/hour |
Why Custom CSPM Wins:
- Real-time detection: Immediate notification of misconfigurations
- Full control: Customize rules to your specific needs
- Cost-effective: No per-resource licensing fees
- Multi-cloud: Single solution for all cloud providers
Advanced Scenarios
Scenario 1: Basic CSPM Setup
Objective: Set up basic CSPM. Steps: Configure scanner, define rules, enable monitoring. Expected: Basic CSPM operational.
Scenario 2: Intermediate Compliance Monitoring
Objective: Monitor compliance with frameworks. Steps: Map rules to frameworks, generate reports, track compliance. Expected: Compliance monitoring operational.
Scenario 3: Advanced Automated Remediation
Objective: Automate remediation of misconfigurations. Steps: CSPM + automation + remediation + reporting + optimization. Expected: Complete CSPM with automation.
Theory and “Why” CSPM Works
Why Continuous Monitoring is Essential
- Cloud environments change constantly
- Misconfigurations occur frequently
- Real-time detection prevents exposure
- Automated remediation reduces risk
Why Multi-Cloud CSPM Matters
- Organizations use multiple clouds
- Unified visibility reduces complexity
- Consistent security posture
- Centralized management
Comprehensive Troubleshooting
Issue: Scanner Fails to Access Resources
Diagnosis: Check IAM permissions, verify API access, review credentials. Solutions: Fix IAM permissions, enable APIs, update credentials.
Issue: High False Positive Rate
Diagnosis: Review rules, check configurations, analyze findings. Solutions: Refine rules, improve context, reduce false positives.
Issue: Scan Performance Issues
Diagnosis: Check API limits, review scan scope, measure duration. Solutions: Optimize scan scope, implement rate limiting, improve efficiency.
Cleanup
# Clean up CSPM resources
cspm_system.cleanup()
# Remove configurations
# Clean up scan results if needed
Real-World Case Study
Challenge: A healthcare organization managed 500+ AWS resources across 12 accounts. They experienced:
- 250+ misconfigurations discovered during annual audit
- 3 security incidents from misconfigurations
- $150K in compliance violation fines
- 45-day average time to detect misconfigurations
- Failed SOC 2 audit due to configuration issues
Solution: Implemented comprehensive CSPM:
- Automated scanning every 6 hours across all accounts
- Real-time monitoring for critical resources
- Compliance checking against HIPAA, SOC 2, and CIS benchmarks
- Automated remediation for 70% of common issues
- Integration with ticketing system for manual fixes
Implementation Details:
- Deployed scanning infrastructure (Lambda functions)
- Configured 150+ security rules
- Set up compliance frameworks (HIPAA, SOC 2, CIS)
- Automated remediation playbooks
- Dashboard for security team visibility
Results:
- 85% reduction in misconfigurations: From 250 to 37.5 average
- 90% improvement in compliance: Passed all audits
- 100% resource coverage: All resources continuously monitored
- 70% automated remediation: 175 issues fixed automatically
- 24-hour detection: Down from 45 days average
- $120K cost savings: Reduced audit and incident costs
- Zero compliance violations: Passed SOC 2 and HIPAA audits
- 300% ROI: Return on investment in first year
Lessons Learned:
- Continuous scanning essential (caught 40 issues within 24 hours of creation)
- Automated remediation saved 200+ hours of manual work
- Compliance checking prevented 3 potential audit failures
- Multi-account visibility critical for large organizations
Testing Your Code
Unit Tests
Click to view test code
import pytest
from unittest.mock import Mock, patch
class TestCSPMScanner:
"""Unit tests for CSPMScanner."""
def test_scan_s3_buckets_finds_misconfigurations(self):
"""Test S3 bucket scanning finds misconfigurations."""
scanner = CSPMScanner()
with patch.object(scanner.s3, 'list_buckets') as mock_list:
mock_list.return_value = {'Buckets': [{'Name': 'test-bucket'}]}
with patch.object(scanner.s3, 'get_public_access_block') as mock_pab:
mock_pab.side_effect = ClientError(
{'Error': {'Code': 'NoSuchPublicAccessBlockConfiguration'}},
'GetPublicAccessBlock'
)
findings = scanner._scan_s3_buckets()
assert len(findings) > 0
assert any(f.severity == Severity.CRITICAL for f in findings)
def test_compliance_checker_identifies_violations(self):
"""Test compliance checker identifies violations."""
checker = ComplianceChecker()
findings = [
SecurityFinding(
resource_id='bucket1',
resource_type='s3_bucket',
issue='Public access',
severity=Severity.CRITICAL,
recommendation='Fix it',
framework='CIS-AWS'
)
]
results = checker.check_compliance(findings, ['CIS-AWS'])
assert not results['CIS-AWS']['compliant']
assert results['CIS-AWS']['violation_count'] == 1
Validation: Run pytest test_cspm.py to verify all tests pass.
CSPM Architecture Diagram
Recommended Diagram: CSPM Workflow
Cloud Infrastructure
↓
Continuous Scanning
(APIs, Configs, Policies)
↓
┌────┴────┬──────────┬──────────┐
↓ ↓ ↓ ↓
Misconfig Compliance Risk Remediation
Detection Checking Scoring (Automated)
↓ ↓ ↓ ↓
└────┬────┴──────────┴──────────┘
↓
Security Posture
Dashboard
CSPM Flow:
- Infrastructure continuously scanned
- Misconfigurations detected
- Compliance verified
- Risk scored and prioritized
- Remediation automated
Limitations and Trade-offs
CSPM Limitations
Coverage:
- Cannot detect all misconfigurations
- May miss certain issue types
- Requires comprehensive scanning
- Multiple tools help
- Continuous scanning needed
False Positives:
- May flag false positives
- Requires validation
- Context important
- Tuning needed
- Continuous improvement
Remediation:
- Automated remediation may be risky
- Requires careful configuration
- May break legitimate configurations
- Manual review important
- Gradual automation recommended
CSPM Trade-offs
Comprehensiveness vs. Performance:
- More comprehensive = thorough but slower
- Faster scans = quick but may miss issues
- Balance based on requirements
- Deep scans for compliance
- Quick scans for continuous monitoring
Automation vs. Control:
- More automation = faster but less control
- More control = safer but slow
- Balance based on risk
- Automate low-risk
- Manual for critical
Coverage vs. Cost:
- More coverage = better security but expensive
- Less coverage = cheaper but gaps
- Balance based on budget
- Prioritize critical resources
- Cost optimization strategies
When CSPM May Be Challenging
Multi-Cloud:
- Multiple clouds complicate scanning
- Requires unified approach
- Different APIs and formats
- Consistent policies needed
- Specialized tools help
Legacy Resources:
- Legacy resources may not be scannable
- May require special handling
- Limited API support
- Gradual migration approach
- Hybrid solutions may be needed
Dynamic Environments:
- Rapid changes complicate tracking
- May generate noise
- Requires prioritization
- Baseline establishment important
- Continuous tuning needed
FAQ
Q: What compliance standards can CSPM check?
A: Common standards supported:
- CIS Benchmarks: Cloud-specific security best practices
- NIST Framework: Cybersecurity framework controls
- PCI-DSS: Payment card industry requirements
- GDPR: European data protection regulation
- HIPAA: US healthcare data protection
- SOC 2: Security and availability controls
- ISO 27001: Information security management
- AWS Well-Architected: AWS-specific best practices
Q: How often should I run CSPM scans?
A: Recommended frequency:
- Continuous: Real-time monitoring for critical resources
- Daily: Full account scans for compliance
- Weekly: Comprehensive posture assessment
- On-demand: Before major deployments or changes
- After incidents: Immediate scans after security events
Q: How do I reduce false positives in CSPM?
A: Strategies:
- Baseline establishment: Learn normal configurations over time
- Context awareness: Consider resource purpose and environment
- Whitelisting: Whitelist known-good configurations
- Severity tuning: Adjust severity thresholds based on risk
- Machine learning: Use ML to reduce false positives by 60-80%
Q: Can CSPM automatically remediate issues?
A: Yes, with proper configuration:
- Automated fixes: For low-risk, well-understood issues
- Approval workflows: For high-risk changes requiring review
- Rollback capabilities: Ability to undo automated changes
- Audit trails: Complete logging of all remediation actions
- Testing: Validate fixes in non-production first
Q: How does CSPM work across multiple cloud providers?
A: Multi-cloud CSPM:
- Unified dashboard: Single view across AWS, Azure, GCP
- Provider-specific APIs: Uses each cloud’s native APIs
- Normalized findings: Standardized format across providers
- Cross-cloud policies: Apply same policies everywhere
- Cost optimization: Identify security and cost issues together
Q: What’s the difference between CSPM and CWPP?
A: Key differences:
- CSPM: Focuses on configuration and compliance (misconfigurations)
- CWPP: Focuses on workload protection (runtime threats)
- CSPM: Prevents issues before deployment
- CWPP: Detects and responds to active threats
- Best practice: Use both for comprehensive cloud security
Q: How much does CSPM cost?
A: Cost factors:
- Per-resource pricing: $0.10-0.50 per resource per month
- Per-account pricing: $500-2000 per account per month
- Enterprise pricing: Custom pricing for large deployments
- Open-source options: Free but require more management
- ROI: Typically 300-500% ROI from prevented incidents
Code Review Checklist for CSPM
Scanning Configuration
- All cloud accounts/projects included in scans
- Scan frequency appropriate for organization
- Scan scope covers all resource types
- Scan credentials stored securely
Compliance Checking
- Compliance frameworks properly configured
- Compliance rules tested and validated
- Compliance exceptions documented
- Compliance reports generated correctly
Remediation
- Remediation actions tested in sandbox
- Automated remediation rules reviewed
- Manual approval for critical remediations
- Remediation logs maintained
Security
- CSPM tool access restricted
- Scan results stored securely
- No credentials in scan results
- Audit logs maintained
Integration
- Integration with ticketing systems tested
- Alerting configured appropriately
- Reports accessible to authorized users
- Dashboard access controlled
Conclusion
CSPM continuously assesses and improves cloud security posture. Implement automated scanning, compliance checking, and remediation to maintain strong security.
Cleanup
After testing, clean up CSPM resources:
Click to view cleanup commands
# Remove CloudWatch alarms (if created)
aws cloudwatch delete-alarms --alarm-names cspm-*
# Remove Lambda functions (if created)
aws lambda delete-function --function-name cspm-scanner
# Remove IAM roles (if created)
aws iam delete-role-policy --role-name CSPMScannerRole --policy-name CSPMScannerPolicy
aws iam delete-role --role-name CSPMScannerRole
# Verify cleanup
aws cloudwatch describe-alarms --alarm-name-prefix cspm
# Should return empty list
Validation: Verify no CSPM resources remain in your cloud account.
Related Topics
- AWS Security Best Practices - AWS security fundamentals
- Cloud IAM Misconfigurations - IAM security
- Cloud Workload Protection - Runtime protection
- Cloud Compliance and Governance - Compliance frameworks
- Cloud Security Automation - Automate security operations
Educational Use Only: This content is for educational purposes. Only assess accounts you own or have explicit authorization.