Modern password security and authentication system
Cloud & Kubernetes Security

Cloud Security Posture Management (CSPM) in 2026

Learn to continuously assess and improve cloud security posture with automated scanning, compliance checking, and remediation.

cspm cloud security security posture compliance cloud assessment security scanning

Cloud misconfigurations cause 80% of cloud security breaches, with organizations discovering an average of 250 misconfigurations per cloud account. According to the 2024 Cloud Security Report, companies without CSPM take 45 days to detect misconfigurations, while those with automated CSPM detect issues in under 24 hours. Manual security assessments can’t keep pace with cloud scale—organizations manage thousands of resources that change constantly. This guide shows you how to implement production-ready CSPM with comprehensive error handling, automated scanning, compliance validation, and remediation automation.

Table of Contents

  1. Understanding CSPM
  2. Setting Up CSPM
  3. Automated Scanning
  4. Compliance Checking
  5. Remediation Automation
  6. Real-World Case Study
  7. FAQ
  8. Conclusion

Key Takeaways

  • CSPM reduces misconfigurations by 85%
  • Improves compliance by 90%
  • Continuous security assessment
  • Automated remediation
  • Multi-cloud support

TL;DR

Implement CSPM to continuously assess and improve cloud security. Use automated scanning, compliance checking, and remediation to maintain strong security posture.

Understanding CSPM

What is CSPM?

Core Functions:

  • Continuous security assessment
  • Misconfiguration detection
  • Compliance validation
  • Risk prioritization
  • Automated remediation

Benefits:

  • Proactive security
  • Compliance assurance
  • Risk reduction
  • Cost optimization

Prerequisites

  • Cloud accounts (AWS/Azure/GCP)
  • Understanding of cloud security
  • Basic scripting knowledge
  • Only assess accounts you own
  • Only assess accounts you own or have authorization
  • Follow cloud provider policies
  • Respect compliance requirements
  • Test in isolated environments

Step 1) Set up CSPM scanning

Click to view complete production-ready code

requirements.txt:

boto3>=1.34.0
botocore>=1.34.0
python-dateutil>=2.8.2

Complete CSPM Scanner Implementation:

#!/usr/bin/env python3
"""
Cloud Security Posture Management (CSPM) Scanner
Production-ready cloud security scanning with comprehensive error handling
"""

import boto3
from botocore.exceptions import ClientError, BotoCoreError
from typing import List, Dict, Optional, Set
from dataclasses import dataclass, asdict
from enum import Enum
from datetime import datetime
import logging
import os
import json

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class CSPMError(Exception):
    """Base exception for CSPM errors."""
    pass


class AWSAPIError(CSPMError):
    """Raised when AWS API calls fail."""
    pass


class Severity(Enum):
    """Finding severity levels."""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
    
    def __str__(self) -> str:
        return self.value


@dataclass
class SecurityFinding:
    """Represents a security finding with complete metadata."""
    resource_id: str
    resource_type: str
    issue: str
    severity: Severity
    recommendation: str
    framework: Optional[str] = None
    region: Optional[str] = None
    timestamp: Optional[datetime] = None
    remediation_code: Optional[str] = None
    
    def __post_init__(self):
        """Set default timestamp if not provided."""
        if self.timestamp is None:
            self.timestamp = datetime.utcnow()
    
    def to_dict(self) -> Dict:
        """Convert finding to dictionary for serialization."""
        result = asdict(self)
        result['severity'] = self.severity.value
        result['timestamp'] = self.timestamp.isoformat() if self.timestamp else None
        return result
    
    def to_json(self) -> str:
        """Convert finding to JSON string."""
        return json.dumps(self.to_dict(), indent=2)

class Severity(Enum):
    """Finding severity levels."""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

@dataclass
class SecurityFinding:
    """Represents a security finding."""
    resource_id: str
    resource_type: str
    issue: str
    severity: Severity
    recommendation: str
    framework: Optional[str] = None

class CSPMScanner:
    """Scans cloud resources for security misconfigurations with comprehensive checks."""
    
    def __init__(
        self, 
        region_name: str = 'us-east-1',
        aws_access_key_id: Optional[str] = None,
        aws_secret_access_key: Optional[str] = None,
        scan_all_regions: bool = False
    ):
        """Initialize CSPM scanner with comprehensive AWS service clients.
        
        Args:
            region_name: AWS region to scan (primary)
            aws_access_key_id: AWS access key (defaults to env/credentials)
            aws_secret_access_key: AWS secret key (defaults to env/credentials)
            scan_all_regions: Whether to scan all regions
            
        Raises:
            CSPMError: If AWS clients cannot be initialized
        """
        self.region_name = region_name
        self.scan_all_regions = scan_all_regions
        
        try:
            # Initialize AWS session
            session = boto3.Session(
                aws_access_key_id=aws_access_key_id or os.getenv('AWS_ACCESS_KEY_ID'),
                aws_secret_access_key=aws_secret_access_key or os.getenv('AWS_SECRET_ACCESS_KEY'),
                region_name=region_name
            )
            
            # Initialize AWS service clients
            self.s3 = session.client('s3', region_name=region_name)
            self.ec2 = session.client('ec2', region_name=region_name)
            self.iam = session.client('iam')
            self.rds = session.client('rds', region_name=region_name)
            self.cloudtrail = session.client('cloudtrail', region_name=region_name)
            self.config = session.client('config', region_name=region_name)
            
            # For multi-region scanning
            if scan_all_regions:
                self.ec2_regions = session.client('ec2')
            
            logger.info(f"Initialized CSPMScanner for region: {region_name}")
            
        except (ClientError, BotoCoreError) as e:
            logger.error(f"Failed to initialize AWS clients: {e}")
            raise CSPMError(f"Failed to initialize AWS clients: {e}") from e
    
    def scan_cloud_posture(
        self, 
        resource_types: Optional[List[str]] = None
    ) -> List[SecurityFinding]:
        """Scan cloud resources for misconfigurations comprehensively.
        
        Args:
            resource_types: List of resource types to scan (None = all)
                Options: 's3', 'ec2', 'iam', 'rds', 'cloudtrail', 'config'
        
        Returns:
            List of security findings
            
        Raises:
            AWSAPIError: If AWS API calls fail
            CSPMError: For other CSPM errors
        """
        if resource_types is None:
            resource_types = ['s3', 'ec2', 'iam', 'rds', 'cloudtrail', 'config']
        
        findings = []
        
        try:
            logger.info(f"Starting CSPM scan for resource types: {resource_types}")
            
            # Scan S3 buckets
            if 's3' in resource_types:
                logger.info("Scanning S3 buckets...")
            findings.extend(self._scan_s3_buckets())
            
            # Scan EC2 instances
            if 'ec2' in resource_types:
                logger.info("Scanning EC2 instances...")
            findings.extend(self._scan_ec2_instances())
            
            # Scan IAM policies
            if 'iam' in resource_types:
                logger.info("Scanning IAM policies...")
            findings.extend(self._scan_iam_policies())
            
            # Scan RDS instances
            if 'rds' in resource_types:
                logger.info("Scanning RDS instances...")
                findings.extend(self._scan_rds_instances())
            
            # Scan CloudTrail
            if 'cloudtrail' in resource_types:
                logger.info("Scanning CloudTrail...")
                findings.extend(self._scan_cloudtrail())
            
            # Scan Config
            if 'config' in resource_types:
                logger.info("Scanning AWS Config...")
                findings.extend(self._scan_config())
            
            logger.info(f"Scan completed. Found {len(findings)} security findings")
            
        except (ClientError, BotoCoreError) as e:
            error_msg = f"AWS API error during scan: {e}"
            logger.error(error_msg)
            raise AWSAPIError(error_msg) from e
        except CSPMError:
            raise
        except Exception as e:
            error_msg = f"Unexpected error during scan: {e}"
            logger.error(error_msg, exc_info=True)
            raise CSPMError(error_msg) from e
        
        return findings
    
    def _scan_s3_buckets(self) -> List[SecurityFinding]:
        """Scan S3 buckets for comprehensive misconfigurations.
        
        Returns:
            List of S3-related security findings
        """
        findings = []
        
        try:
            response = self.s3.list_buckets()
            
            for bucket in response.get('Buckets', []):
                bucket_name = bucket['Name']
                
                try:
                # Check public access block
                    findings.extend(self._check_s3_public_access_block(bucket_name))
                    
                    # Check bucket encryption
                    findings.extend(self._check_s3_encryption(bucket_name))
                    
                    # Check bucket versioning
                    findings.extend(self._check_s3_versioning(bucket_name))
                    
                    # Check bucket logging
                    findings.extend(self._check_s3_logging(bucket_name))
                    
                    # Check bucket policy
                    findings.extend(self._check_s3_bucket_policy(bucket_name))
                    
                    # Check bucket ACL
                    findings.extend(self._check_s3_acl(bucket_name))
                    
                except ClientError as e:
                    logger.warning(f"Error scanning bucket {bucket_name}: {e}")
                    continue
        
        except ClientError as e:
            error_msg = f"Error listing S3 buckets: {e}"
            logger.error(error_msg)
            raise AWSAPIError(error_msg) from e
        
        return findings
    
    def _check_s3_public_access_block(self, bucket_name: str) -> List[SecurityFinding]:
        """Check S3 bucket public access block configuration."""
        findings = []
        
                try:
                    public_access = self.s3.get_public_access_block(Bucket=bucket_name)
                    config = public_access.get('PublicAccessBlockConfiguration', {})
                    
            # Check each setting
            if not config.get('BlockPublicAcls', False):
                        findings.append(SecurityFinding(
                            resource_id=bucket_name,
                            resource_type='s3_bucket',
                    issue='BlockPublicAcls not enabled',
                    severity=Severity.CRITICAL,
                    recommendation='Enable BlockPublicAcls in public access block',
                    framework='CIS-AWS-1.4.0',
                    region=self.region_name,
                    remediation_code=f"aws s3api put-public-access-block --bucket {bucket_name} --public-access-block-configuration 'BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true'"
                ))
            
            if not config.get('IgnorePublicAcls', False):
                findings.append(SecurityFinding(
                    resource_id=bucket_name,
                    resource_type='s3_bucket',
                    issue='IgnorePublicAcls not enabled',
                    severity=Severity.CRITICAL,
                    recommendation='Enable IgnorePublicAcls in public access block',
                    framework='CIS-AWS-1.4.0',
                    region=self.region_name,
                    remediation_code=f"aws s3api put-public-access-block --bucket {bucket_name} --public-access-block-configuration 'BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true'"
                ))
            
            if not config.get('BlockPublicPolicy', False):
                findings.append(SecurityFinding(
                    resource_id=bucket_name,
                    resource_type='s3_bucket',
                    issue='BlockPublicPolicy not enabled',
                    severity=Severity.CRITICAL,
                    recommendation='Enable BlockPublicPolicy in public access block',
                    framework='CIS-AWS-1.4.0',
                    region=self.region_name
                ))
            
            if not config.get('RestrictPublicBuckets', False):
                        findings.append(SecurityFinding(
                            resource_id=bucket_name,
                            resource_type='s3_bucket',
                    issue='RestrictPublicBuckets not enabled',
                            severity=Severity.HIGH,
                    recommendation='Enable RestrictPublicBuckets in public access block',
                    framework='CIS-AWS-1.4.0',
                    region=self.region_name
                        ))
                
                except ClientError as e:
                    if e.response['Error']['Code'] == 'NoSuchPublicAccessBlockConfiguration':
                        findings.append(SecurityFinding(
                            resource_id=bucket_name,
                            resource_type='s3_bucket',
                            issue='No public access block configured',
                            severity=Severity.CRITICAL,
                    recommendation='Configure public access block immediately to prevent accidental public exposure',
                    framework='CIS-AWS-1.4.0',
                    region=self.region_name,
                    remediation_code=f"aws s3api put-public-access-block --bucket {bucket_name} --public-access-block-configuration 'BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true'"
                        ))
                    else:
                logger.warning(f"Error checking public access block for {bucket_name}: {e}")
        
        return findings
    
    def _check_s3_encryption(self, bucket_name: str) -> List[SecurityFinding]:
        """Check S3 bucket encryption configuration."""
        findings = []
        
                try:
                    encryption = self.s3.get_bucket_encryption(Bucket=bucket_name)
            config = encryption.get('ServerSideEncryptionConfiguration', {})
            rules = config.get('Rules', [])
            
            if not rules:
                        findings.append(SecurityFinding(
                            resource_id=bucket_name,
                            resource_type='s3_bucket',
                    issue='No server-side encryption configured',
                            severity=Severity.HIGH,
                    recommendation='Enable server-side encryption (SSE-S3 or SSE-KMS)',
                    framework='CIS-AWS-2.1.1',
                    region=self.region_name,
                    remediation_code=f"aws s3api put-bucket-encryption --bucket {bucket_name} --server-side-encryption-configuration '{{\"Rules\":[{{\"ApplyServerSideEncryptionByDefault\":{{\"SSEAlgorithm\":\"AES256\"}}}}]}}'"
                ))
            else:
                # Check if encryption is properly configured
                for rule in rules:
                    apply_default = rule.get('ApplyServerSideEncryptionByDefault', {})
                    if not apply_default.get('SSEAlgorithm'):
                        findings.append(SecurityFinding(
                            resource_id=bucket_name,
                            resource_type='s3_bucket',
                            issue='Incomplete encryption configuration',
                            severity=Severity.MEDIUM,
                            recommendation='Ensure SSEAlgorithm is specified in encryption configuration',
                            framework='CIS-AWS-2.1.1',
                            region=self.region_name
                        ))
                        
                except ClientError as e:
                    if e.response['Error']['Code'] == 'ServerSideEncryptionConfigurationNotFoundError':
                        findings.append(SecurityFinding(
                            resource_id=bucket_name,
                            resource_type='s3_bucket',
                            issue='No encryption configured',
                            severity=Severity.HIGH,
                    recommendation='Enable server-side encryption to protect data at rest',
                    framework='CIS-AWS-2.1.1',
                    region=self.region_name,
                    remediation_code=f"aws s3api put-bucket-encryption --bucket {bucket_name} --server-side-encryption-configuration '{{\"Rules\":[{{\"ApplyServerSideEncryptionByDefault\":{{\"SSEAlgorithm\":\"AES256\"}}}}]}}'"
                ))
            else:
                logger.warning(f"Error checking encryption for {bucket_name}: {e}")
        
        return findings
    
    def _check_s3_versioning(self, bucket_name: str) -> List[SecurityFinding]:
        """Check S3 bucket versioning configuration."""
        findings = []
        
        try:
            versioning = self.s3.get_bucket_versioning(Bucket=bucket_name)
            status = versioning.get('Status', '')
            
            if status != 'Enabled':
                findings.append(SecurityFinding(
                    resource_id=bucket_name,
                    resource_type='s3_bucket',
                    issue='Bucket versioning not enabled',
                    severity=Severity.MEDIUM,
                    recommendation='Enable versioning to protect against accidental deletion or modification',
                    framework='CIS-AWS-2.1.2',
                    region=self.region_name,
                    remediation_code=f"aws s3api put-bucket-versioning --bucket {bucket_name} --versioning-configuration Status=Enabled"
                ))
        except ClientError as e:
            logger.warning(f"Error checking versioning for {bucket_name}: {e}")
        
        return findings
    
    def _check_s3_logging(self, bucket_name: str) -> List[SecurityFinding]:
        """Check S3 bucket logging configuration."""
        findings = []
        
        try:
            logging_config = self.s3.get_bucket_logging(Bucket=bucket_name)
            
            if not logging_config.get('LoggingEnabled'):
                findings.append(SecurityFinding(
                    resource_id=bucket_name,
                    resource_type='s3_bucket',
                    issue='Bucket logging not enabled',
                    severity=Severity.MEDIUM,
                    recommendation='Enable access logging for audit and security monitoring',
                    framework='CIS-AWS-2.1.3',
                    region=self.region_name
                ))
        except ClientError as e:
            logger.warning(f"Error checking logging for {bucket_name}: {e}")
        
        return findings
    
    def _check_s3_bucket_policy(self, bucket_name: str) -> List[SecurityFinding]:
        """Check S3 bucket policy for insecure configurations."""
        findings = []
        
        try:
            policy = self.s3.get_bucket_policy(Bucket=bucket_name)
            policy_doc = json.loads(policy.get('Policy', '{}'))
            
            # Check for wildcard principals
            statements = policy_doc.get('Statement', [])
            for statement in statements:
                principal = statement.get('Principal', {})
                if principal == '*' or principal.get('AWS') == '*':
                    effect = statement.get('Effect', '')
                    if effect == 'Allow':
                        findings.append(SecurityFinding(
                            resource_id=bucket_name,
                            resource_type='s3_bucket',
                            issue='Bucket policy allows wildcard principal (*)',
                            severity=Severity.CRITICAL,
                            recommendation='Remove wildcard principal from bucket policy. Use specific IAM principals.',
                            framework='CIS-AWS-1.5.0',
                            region=self.region_name
                        ))
        except ClientError as e:
            if e.response['Error']['Code'] != 'NoSuchBucketPolicy':
                logger.warning(f"Error checking bucket policy for {bucket_name}: {e}")
        
        return findings
    
    def _check_s3_acl(self, bucket_name: str) -> List[SecurityFinding]:
        """Check S3 bucket ACL for public access."""
        findings = []
        
        try:
            acl = self.s3.get_bucket_acl(Bucket=bucket_name)
            grants = acl.get('Grants', [])
            
            for grant in grants:
                grantee = grant.get('Grantee', {})
                permission = grant.get('Permission', '')
                
                # Check for public access
                if grantee.get('Type') == 'Group' and 'AllUsers' in grantee.get('URI', ''):
                    findings.append(SecurityFinding(
                        resource_id=bucket_name,
                        resource_type='s3_bucket',
                        issue=f'Bucket ACL grants public access (AllUsers) with permission: {permission}',
                        severity=Severity.CRITICAL,
                        recommendation='Remove public ACL grants. Use bucket policies instead.',
                        framework='CIS-AWS-1.6.0',
                        region=self.region_name
                    ))
        except ClientError as e:
            logger.warning(f"Error checking ACL for {bucket_name}: {e}")
        
        return findings
    
    def _scan_ec2_instances(self) -> List[SecurityFinding]:
        """Scan EC2 instances for comprehensive misconfigurations.
        
        Returns:
            List of EC2-related security findings
        """
        findings = []
        
        try:
            # Get all instances
            paginator = self.ec2.get_paginator('describe_instances')
            pages = paginator.paginate()
            
            for page in pages:
                for reservation in page.get('Reservations', []):
                for instance in reservation.get('Instances', []):
                        instance_id = instance['InstanceId']
                        
                        try:
                            # Check instance metadata service
                            findings.extend(self._check_ec2_metadata_service(instance))
                            
                            # Check public IP exposure
                            findings.extend(self._check_ec2_public_ip(instance))
                            
                            # Check security groups
                            findings.extend(self._check_ec2_security_groups(instance))
                            
                            # Check user data
                            findings.extend(self._check_ec2_user_data(instance))
                            
                        except Exception as e:
                            logger.warning(f"Error scanning instance {instance_id}: {e}")
                            continue
        
        except ClientError as e:
            error_msg = f"Error describing EC2 instances: {e}"
            logger.error(error_msg)
            raise AWSAPIError(error_msg) from e
        
        return findings
    
    def _check_ec2_metadata_service(self, instance: Dict) -> List[SecurityFinding]:
        """Check EC2 instance metadata service configuration."""
        findings = []
        instance_id = instance['InstanceId']
        
        try:
            # Check IMDSv2 requirement (requires token)
            metadata_options = instance.get('MetadataOptions', {})
            http_tokens = metadata_options.get('HttpTokens', 'optional')
            http_endpoint = metadata_options.get('HttpEndpoint', 'enabled')
            
            if http_tokens == 'optional':
                findings.append(SecurityFinding(
                    resource_id=instance_id,
                    resource_type='ec2_instance',
                    issue='IMDSv2 not required (HttpTokens=optional)',
                    severity=Severity.HIGH,
                    recommendation='Require IMDSv2 to prevent SSRF attacks on metadata service',
                    framework='CIS-AWS-4.2.0',
                    region=self.region_name,
                    remediation_code=f"aws ec2 modify-instance-metadata-options --instance-id {instance_id} --http-tokens required"
                ))
            
            if http_endpoint == 'disabled':
                findings.append(SecurityFinding(
                    resource_id=instance_id,
                    resource_type='ec2_instance',
                    issue='Metadata service endpoint disabled',
                    severity=Severity.LOW,
                    recommendation='Metadata service should be enabled for normal operations',
                    framework='CIS-AWS-4.2.0',
                    region=self.region_name
                ))
        except Exception as e:
            logger.warning(f"Error checking metadata service for {instance_id}: {e}")
        
        return findings
    
    def _check_ec2_public_ip(self, instance: Dict) -> List[SecurityFinding]:
        """Check EC2 instance public IP exposure."""
        findings = []
                    instance_id = instance['InstanceId']
                    
                    # Check if instance has public IP
                    if instance.get('PublicIpAddress'):
            # Check security groups for overly permissive rules
            security_groups = instance.get('SecurityGroups', [])
            
            if security_groups:
                # This will be checked in security group scanning
                pass
            else:
                findings.append(SecurityFinding(
                    resource_id=instance_id,
                    resource_type='ec2_instance',
                    issue='Instance has public IP but no security groups',
                    severity=Severity.CRITICAL,
                    recommendation='Instance should have security groups configured',
                    framework='CIS-AWS-4.1.0',
                    region=self.region_name
                ))
        
        return findings
    
    def _check_ec2_security_groups(self, instance: Dict) -> List[SecurityFinding]:
        """Check EC2 instance security groups for misconfigurations."""
        findings = []
        instance_id = instance['InstanceId']
        
        security_groups = instance.get('SecurityGroups', [])
        
        for sg in security_groups:
                            sg_id = sg['GroupId']
            
            try:
                # Get security group details
                sg_response = self.ec2.describe_security_groups(GroupIds=[sg_id])
                sg_details = sg_response['SecurityGroups'][0]
                
                # Check ingress rules
                ingress_rules = sg_details.get('IpPermissions', [])
                for rule in ingress_rules:
                    # Check for 0.0.0.0/0 (anywhere)
                    for ip_range in rule.get('IpRanges', []):
                        cidr = ip_range.get('CidrIp', '')
                        if cidr == '0.0.0.0/0' or cidr == '::/0':
                            port = rule.get('FromPort', 'All')
                            protocol = rule.get('IpProtocol', '-1')
                            
                            severity = Severity.CRITICAL if port in [22, 3389] else Severity.HIGH
                            
                            findings.append(SecurityFinding(
                                resource_id=f"{instance_id}:{sg_id}",
                                resource_type='ec2_security_group',
                                issue=f'Security group allows inbound traffic from anywhere (0.0.0.0/0) on port {port}/{protocol}',
                                severity=severity,
                                recommendation='Restrict ingress rules to specific IP ranges or security groups',
                                framework='CIS-AWS-4.1.0',
                                region=self.region_name
                            ))
        except ClientError as e:
                logger.warning(f"Error checking security group {sg_id}: {e}")
                continue
        
        return findings
    
    def _check_ec2_user_data(self, instance: Dict) -> List[SecurityFinding]:
        """Check EC2 instance user data for sensitive information."""
        findings = []
        instance_id = instance['InstanceId']
        
        # User data might contain sensitive info, but we can't easily check this
        # without additional permissions. This is a placeholder for more advanced checks.
        # In production, you might want to use AWS Config or Systems Manager to check this.
        
        return findings
    
    def _scan_iam_policies(self) -> List[SecurityFinding]:
        """Scan IAM policies for comprehensive misconfigurations.
        
        Returns:
            List of IAM-related security findings
        """
        findings = []
        
        try:
            # Scan user policies
            findings.extend(self._scan_iam_users())
            
            # Scan role policies
            findings.extend(self._scan_iam_roles())
            
            # Scan group policies
            findings.extend(self._scan_iam_groups())
            
        except ClientError as e:
            error_msg = f"Error scanning IAM policies: {e}"
            logger.error(error_msg)
            raise AWSAPIError(error_msg) from e
        
        return findings
    
    def _scan_iam_users(self) -> List[SecurityFinding]:
        """Scan IAM users for misconfigurations."""
        findings = []
        
        try:
            paginator = self.iam.get_paginator('list_users')
            pages = paginator.paginate()
            
            for page in pages:
                for user in page['Users']:
                    user_name = user['UserName']
                    
                    try:
                        # Check for MFA
                        mfa_devices = self.iam.list_mfa_devices(UserName=user_name)
                        if not mfa_devices['MFADevices']:
                            # Check if user has console access
                            try:
                                login_profile = self.iam.get_login_profile(UserName=user_name)
                                if login_profile.get('LoginProfile'):
                                    findings.append(SecurityFinding(
                                        resource_id=user_name,
                                        resource_type='iam_user',
                                        issue='IAM user with console access has no MFA enabled',
                                        severity=Severity.CRITICAL,
                                        recommendation='Enable MFA for all users with console access',
                                        framework='CIS-AWS-1.7.0',
                                        region='global',
                                        remediation_code=f"aws iam create-virtual-mfa-device --virtual-mfa-device-name {user_name}-mfa"
                                    ))
                            except ClientError as e:
                                if e.response['Error']['Code'] != 'NoSuchEntity':
                                    logger.warning(f"Error checking login profile for {user_name}: {e}")
                        
                        # Check access keys
                        access_keys = self.iam.list_access_keys(UserName=user_name)
                        for key_metadata in access_keys.get('AccessKeyMetadata', []):
                            key_id = key_metadata['AccessKeyId']
                            
                            # Check last used
                            try:
                                last_used = self.iam.get_access_key_last_used(AccessKeyId=key_id)
                                last_used_date = last_used['AccessKeyLastUsed'].get('LastUsedDate')
                                
                                if last_used_date:
                                    from datetime import datetime, timezone, timedelta
                                    days_since_use = (datetime.now(timezone.utc) - last_used_date).days
                                    if days_since_use > 90:
                                        findings.append(SecurityFinding(
                                            resource_id=f"{user_name}:{key_id}",
                                            resource_type='iam_access_key',
                                            issue=f'Access key not used in {days_since_use} days',
                                            severity=Severity.MEDIUM,
                                            recommendation='Delete unused access keys to reduce attack surface',
                                            framework='CIS-AWS-1.4.0',
                                            region='global',
                                            remediation_code=f"aws iam delete-access-key --user-name {user_name} --access-key-id {key_id}"
                                        ))
                            except ClientError as e:
                                logger.warning(f"Error checking access key usage for {key_id}: {e}")
                        
                        # Check for inline policies
                        inline_policies = self.iam.list_user_policies(UserName=user_name)
                        if inline_policies['PolicyNames']:
                            # Check each inline policy
                            for policy_name in inline_policies['PolicyNames']:
                                policy_doc = self.iam.get_user_policy(
                                    UserName=user_name,
                                    PolicyName=policy_name
                                )
                                findings.extend(self._analyze_policy_document(
                                    policy_doc['PolicyDocument'],
                                    f"{user_name}:{policy_name}",
                                    'iam_user_policy'
                                ))
                        
                        # Check attached policies
                        attached_policies = self.iam.list_attached_user_policies(UserName=user_name)
                        for policy in attached_policies.get('AttachedPolicies', []):
                            policy_arn = policy['PolicyArn']
                            findings.extend(self._analyze_managed_policy(policy_arn, f"{user_name}:{policy_arn}", 'iam_user_policy'))
                    
                    except Exception as e:
                        logger.warning(f"Error scanning user {user_name}: {e}")
                        continue
        
        except ClientError as e:
            logger.error(f"Error listing IAM users: {e}")
            raise
        
        return findings
    
    def _scan_iam_roles(self) -> List[SecurityFinding]:
        """Scan IAM roles for misconfigurations."""
        findings = []
        
        try:
            paginator = self.iam.get_paginator('list_roles')
            pages = paginator.paginate()
            
            for page in pages:
                for role in page['Roles']:
                    role_name = role['RoleName']
                    
                    try:
                        # Check trust policy
                        role_details = self.iam.get_role(RoleName=role_name)
                        trust_policy = role_details['Role'].get('AssumeRolePolicyDocument', {})
                        
                        # Check for wildcard in trust policy
                        if isinstance(trust_policy, str):
                            trust_policy = json.loads(trust_policy)
                        
                        statements = trust_policy.get('Statement', [])
                        for statement in statements:
                            principal = statement.get('Principal', {})
                            if principal.get('AWS') == '*':
                                findings.append(SecurityFinding(
                                    resource_id=role_name,
                                    resource_type='iam_role',
                                    issue='IAM role trust policy allows any AWS account to assume role',
                                    severity=Severity.CRITICAL,
                                    recommendation='Restrict trust policy to specific accounts or services',
                                    framework='CIS-AWS-1.22.0',
                                    region='global'
                                ))
                    
                    except Exception as e:
                        logger.warning(f"Error scanning role {role_name}: {e}")
                        continue
        
        except ClientError as e:
            logger.error(f"Error listing IAM roles: {e}")
            raise
        
        return findings
    
    def _scan_iam_groups(self) -> List[SecurityFinding]:
        """Scan IAM groups for misconfigurations."""
        findings = []
        
        try:
            paginator = self.iam.get_paginator('list_groups')
            pages = paginator.paginate()
            
            for page in pages:
                for group in page['Groups']:
                    group_name = group['GroupName']
                    
                    try:
                        # Check attached policies
                        attached_policies = self.iam.list_attached_group_policies(GroupName=group_name)
                        for policy in attached_policies.get('AttachedPolicies', []):
                            policy_arn = policy['PolicyArn']
                            findings.extend(self._analyze_managed_policy(policy_arn, f"{group_name}:{policy_arn}", 'iam_group_policy'))
                    
                    except Exception as e:
                        logger.warning(f"Error scanning group {group_name}: {e}")
                        continue
        
        except ClientError as e:
            logger.error(f"Error listing IAM groups: {e}")
            raise
        
        return findings
    
    def _analyze_policy_document(self, policy_doc: Dict, resource_id: str, resource_type: str) -> List[SecurityFinding]:
        """Analyze IAM policy document for insecure configurations."""
        findings = []
        
        if isinstance(policy_doc, str):
            policy_doc = json.loads(policy_doc)
        
        statements = policy_doc.get('Statement', [])
        
        for statement in statements:
            # Check for overly permissive actions
            actions = statement.get('Action', [])
            if not isinstance(actions, list):
                actions = [actions]
            
            for action in actions:
                if action == '*':
                    findings.append(SecurityFinding(
                        resource_id=resource_id,
                        resource_type=resource_type,
                        issue='IAM policy allows all actions (*)',
                        severity=Severity.CRITICAL,
                        recommendation='Use least privilege principle. Grant only necessary permissions.',
                        framework='CIS-AWS-1.16.0',
                        region='global'
                    ))
                    break
            
            # Check for overly permissive resources
            resources = statement.get('Resource', [])
            if not isinstance(resources, list):
                resources = [resources]
            
            for resource in resources:
                if resource == '*':
                    findings.append(SecurityFinding(
                        resource_id=resource_id,
                        resource_type=resource_type,
                        issue='IAM policy allows access to all resources (*)',
                        severity=Severity.HIGH,
                        recommendation='Restrict resource access to specific resources',
                        framework='CIS-AWS-1.16.0',
                        region='global'
                    ))
                    break
        
        return findings
    
    def _analyze_managed_policy(self, policy_arn: str, resource_id: str, resource_type: str) -> List[SecurityFinding]:
        """Analyze managed IAM policy for insecure configurations."""
        findings = []
        
        try:
            policy = self.iam.get_policy(PolicyArn=policy_arn)
            policy_version = self.iam.get_policy_version(
                PolicyArn=policy_arn,
                VersionId=policy['Policy']['DefaultVersionId']
            )
            
            policy_doc = policy_version['PolicyVersion']['Document']
            findings.extend(self._analyze_policy_document(policy_doc, resource_id, resource_type))
        
        except ClientError as e:
            logger.warning(f"Error analyzing policy {policy_arn}: {e}")
        
        return findings
    
    def _scan_rds_instances(self) -> List[SecurityFinding]:
        """Scan RDS instances for misconfigurations."""
        findings = []
        
        try:
            paginator = self.rds.get_paginator('describe_db_instances')
            pages = paginator.paginate()
            
            for page in pages:
                for db_instance in page.get('DBInstances', []):
                    db_id = db_instance['DBInstanceIdentifier']
                    
                    try:
                        # Check encryption
                        if not db_instance.get('StorageEncrypted', False):
                            findings.append(SecurityFinding(
                                resource_id=db_id,
                                resource_type='rds_instance',
                                issue='RDS instance encryption not enabled',
                                severity=Severity.HIGH,
                                recommendation='Enable encryption for RDS instances',
                                framework='CIS-AWS-3.1.0',
                                region=self.region_name
                            ))
                        
                        # Check public accessibility
                        if db_instance.get('PubliclyAccessible', False):
                            findings.append(SecurityFinding(
                                resource_id=db_id,
                                resource_type='rds_instance',
                                issue='RDS instance is publicly accessible',
                                severity=Severity.HIGH,
                                recommendation='Disable public accessibility. Use VPC security groups for access.',
                                framework='CIS-AWS-3.2.0',
                                region=self.region_name
                            ))
                    
                    except Exception as e:
                        logger.warning(f"Error scanning RDS instance {db_id}: {e}")
                        continue
        
        except ClientError as e:
            logger.error(f"Error listing RDS instances: {e}")
            # Don't raise - RDS might not be available in all regions
        
        return findings
    
    def _scan_cloudtrail(self) -> List[SecurityFinding]:
        """Scan CloudTrail for misconfigurations."""
        findings = []
        
        try:
            trails = self.cloudtrail.list_trails()
            
            if not trails.get('Trails'):
                findings.append(SecurityFinding(
                    resource_id='cloudtrail',
                    resource_type='cloudtrail',
                    issue='No CloudTrail trails configured',
                    severity=Severity.CRITICAL,
                    recommendation='Enable CloudTrail for audit logging',
                    framework='CIS-AWS-3.1.0',
                    region=self.region_name
                ))
            else:
                for trail_info in trails['Trails']:
                    trail_arn = trail_info['TrailARN']
                    trail_name = trail_info['Name']
                    
                    try:
                        trail = self.cloudtrail.get_trail(Name=trail_name)
                        trail_details = trail['Trail']
                        
                        # Check logging status
                        status = self.cloudtrail.get_trail_status(Name=trail_name)
                        if not status.get('IsLogging', False):
                            findings.append(SecurityFinding(
                                resource_id=trail_name,
                                resource_type='cloudtrail',
                                issue='CloudTrail logging is not enabled',
                                severity=Severity.CRITICAL,
                                recommendation='Enable CloudTrail logging',
                                framework='CIS-AWS-3.2.0',
                                region=self.region_name,
                                remediation_code=f"aws cloudtrail start-logging --name {trail_name}"
                            ))
                        
                        # Check file validation
                        if not trail_details.get('LogFileValidationEnabled', False):
                            findings.append(SecurityFinding(
                                resource_id=trail_name,
                                resource_type='cloudtrail',
                                issue='CloudTrail file validation not enabled',
                                severity=Severity.MEDIUM,
                                recommendation='Enable log file validation for integrity',
                                framework='CIS-AWS-3.3.0',
                                region=self.region_name
                            ))
                    
                    except Exception as e:
                        logger.warning(f"Error scanning CloudTrail {trail_name}: {e}")
                        continue
        
        except ClientError as e:
            logger.error(f"Error listing CloudTrail: {e}")
            # Don't raise - CloudTrail might not be configured
        
        return findings
    
    def _scan_config(self) -> List[SecurityFinding]:
        """Scan AWS Config for misconfigurations."""
        findings = []
        
        try:
            recorders = self.config.describe_configuration_recorders()
            
            if not recorders.get('ConfigurationRecorders'):
                findings.append(SecurityFinding(
                    resource_id='config',
                    resource_type='config',
                    issue='AWS Config not configured',
                    severity=Severity.HIGH,
                    recommendation='Enable AWS Config for resource inventory and compliance',
                    framework='CIS-AWS-3.4.0',
                    region=self.region_name
                ))
            else:
                for recorder in recorders['ConfigurationRecorders']:
                    recorder_name = recorder['name']
                    
                    # Check recording status
                    status = self.config.describe_configuration_recorder_status(
                        ConfigurationRecorderNames=[recorder_name]
                    )
                    
                    recorder_status = status['ConfigurationRecorderStatuses'][0]
                    if not recorder_status.get('recording', False):
                        findings.append(SecurityFinding(
                            resource_id=recorder_name,
                            resource_type='config_recorder',
                            issue='AWS Config recorder is not recording',
                            severity=Severity.HIGH,
                            recommendation='Enable Config recorder',
                            framework='CIS-AWS-3.4.0',
                            region=self.region_name
                        ))
        
        except ClientError as e:
            logger.error(f"Error listing AWS Config: {e}")
            # Don't raise - Config might not be configured
        
        return findings


# Example usage
if __name__ == "__main__":
    # Initialize scanner
    scanner = CSPMScanner(region_name='us-east-1')
    
    # Scan all resources
    try:
        findings = scanner.scan_cloud_posture()
        
        # Print findings
        print(f"\nFound {len(findings)} security findings:\n")
        for finding in findings:
            print(f"[{finding.severity.value.upper()}] {finding.resource_type}: {finding.issue}")
            print(f"  Resource: {finding.resource_id}")
            print(f"  Recommendation: {finding.recommendation}")
            if finding.remediation_code:
                print(f"  Remediation: {finding.remediation_code}")
            print()
    except CSPMError as e:
        print(f"Error: {e}")

Unit Tests:

# test_cspm_scanner.py
import pytest
from unittest.mock import Mock, patch, MagicMock
from cspm_scanner import (
    CSPMScanner,
    SecurityFinding,
    Severity,
    CSPMError,
    AWSAPIError
)
from botocore.exceptions import ClientError


class TestCSPMScanner:
    """Unit tests for CSPMScanner."""
    
    @pytest.fixture
    def mock_boto_session(self):
        """Create mock boto3 session."""
        with patch('cspm_scanner.boto3.Session') as mock_session:
            mock_session_instance = Mock()
            mock_session.return_value = mock_session_instance
            yield mock_session_instance
    
    @pytest.fixture
    def scanner(self, mock_boto_session):
        """Create CSPMScanner instance with mocked AWS clients."""
        mock_boto_session.client.return_value = Mock()
        scanner = CSPMScanner(region_name='us-east-1')
        scanner.s3 = Mock()
        scanner.ec2 = Mock()
        scanner.iam = Mock()
        scanner.rds = Mock()
        scanner.cloudtrail = Mock()
        scanner.config = Mock()
        return scanner
    
    def test_scan_s3_buckets_no_public_access_block(self, scanner):
        """Test S3 scanning detects missing public access block."""
        # Mock S3 list_buckets
        scanner.s3.list_buckets.return_value = {
            'Buckets': [{'Name': 'test-bucket'}]
        }
        
        # Mock get_public_access_block to raise NoSuchPublicAccessBlockConfiguration
        scanner.s3.get_public_access_block.side_effect = ClientError(
            {'Error': {'Code': 'NoSuchPublicAccessBlockConfiguration'}},
            'GetPublicAccessBlock'
        )
        
        # Mock other checks
        scanner.s3.get_bucket_encryption.side_effect = ClientError(
            {'Error': {'Code': 'ServerSideEncryptionConfigurationNotFoundError'}},
            'GetBucketEncryption'
        )
        scanner.s3.get_bucket_versioning.return_value = {'Status': 'Suspended'}
        scanner.s3.get_bucket_logging.return_value = {}
        scanner.s3.get_bucket_policy.side_effect = ClientError(
            {'Error': {'Code': 'NoSuchBucketPolicy'}},
            'GetBucketPolicy'
        )
        scanner.s3.get_bucket_acl.return_value = {'Grants': []}
        
        findings = scanner._scan_s3_buckets()
        
        assert len(findings) > 0
        critical_findings = [f for f in findings if f.severity == Severity.CRITICAL]
        assert len(critical_findings) > 0
        assert any('public access block' in f.issue.lower() for f in critical_findings)
    
    def test_scan_ec2_instances_no_imdsv2(self, scanner):
        """Test EC2 scanning detects missing IMDSv2."""
        # Mock describe_instances
        scanner.ec2.get_paginator.return_value.paginate.return_value = [{
            'Reservations': [{
                'Instances': [{
                    'InstanceId': 'i-1234567890abcdef0',
                    'MetadataOptions': {
                        'HttpTokens': 'optional',
                        'HttpEndpoint': 'enabled'
                    },
                    'SecurityGroups': [],
                    'PublicIpAddress': None
                }]
            }]
        }]
        
        findings = scanner._scan_ec2_instances()
        
        assert len(findings) > 0
        imdsv2_findings = [f for f in findings if 'IMDSv2' in f.issue]
        assert len(imdsv2_findings) > 0
    
    def test_scan_iam_users_no_mfa(self, scanner):
        """Test IAM scanning detects users without MFA."""
        # Mock list_users
        scanner.iam.get_paginator.return_value.paginate.return_value = [{
            'Users': [{'UserName': 'test-user'}]
        }]
        
        # Mock MFA devices (empty = no MFA)
        scanner.iam.list_mfa_devices.return_value = {'MFADevices': []}
        
        # Mock login profile (user has console access)
        scanner.iam.get_login_profile.return_value = {'LoginProfile': {}}
        
        # Mock access keys (empty)
        scanner.iam.list_access_keys.return_value = {'AccessKeyMetadata': []}
        
        # Mock user policies (empty)
        scanner.iam.list_user_policies.return_value = {'PolicyNames': []}
        scanner.iam.list_attached_user_policies.return_value = {'AttachedPolicies': []}
        
        findings = scanner._scan_iam_users()
        
        assert len(findings) > 0
        mfa_findings = [f for f in findings if 'MFA' in f.issue]
        assert len(mfa_findings) > 0


if __name__ == '__main__':
    pytest.main([__file__, '-v'])

Step 2) Check compliance

Click to view code
from typing import Dict, List, Set

class ComplianceChecker:
    """Checks findings against compliance frameworks."""
    
    COMPLIANCE_FRAMEWORKS = {
        "CIS-AWS": {
            "required_severities": {Severity.HIGH, Severity.CRITICAL},
            "controls": ["s3_public_access", "encryption", "iam_policies"]
        },
        "PCI-DSS": {
            "required_severities": {Severity.MEDIUM, Severity.HIGH, Severity.CRITICAL},
            "controls": ["encryption", "access_controls", "logging"]
        },
        "HIPAA": {
            "required_severities": {Severity.HIGH, Severity.CRITICAL},
            "controls": ["encryption", "access_controls", "audit_logging"]
        },
        "SOC2": {
            "required_severities": {Severity.MEDIUM, Severity.HIGH, Severity.CRITICAL},
            "controls": ["access_controls", "encryption", "monitoring"]
        }
    }
    
    def check_compliance(self, findings: List[SecurityFinding], 
                        frameworks: List[str] = None) -> Dict[str, Dict]:
        """Check findings against compliance standards.
        
        Args:
            findings: List of security findings
            frameworks: List of framework names to check (None = all)
            
        Returns:
            Dictionary mapping framework names to compliance results
        """
        if frameworks is None:
            frameworks = list(self.COMPLIANCE_FRAMEWORKS.keys())
        
        compliance_results = {}
        
        for framework_name in frameworks:
            if framework_name not in self.COMPLIANCE_FRAMEWORKS:
                logger.warning(f"Unknown framework: {framework_name}")
                continue
            
            framework = self.COMPLIANCE_FRAMEWORKS[framework_name]
            required_severities = framework["required_severities"]
            
            violations = [
                f for f in findings
                if f.severity in required_severities
                and (f.framework == framework_name or f.framework is None)
            ]
            
            compliance_results[framework_name] = {
                'compliant': len(violations) == 0,
                'violations': violations,
                'violation_count': len(violations),
                'total_findings': len(findings),
                'compliance_percentage': (1 - len(violations) / max(len(findings), 1)) * 100
            }
        
        return compliance_results

Comparison: CSPM Solutions

Solution TypeDetection SpeedCompliance CoverageRemediationMulti-CloudCost/Resource
Cloud-Native (AWS Config, etc.)Real-timeSingle cloudManualNo$0.10-0.30
Third-Party (Prowler, etc.)1-24 hoursMulti-cloudAutomatedYes$0.20-0.50
Custom (This Guide)Real-timeCustomAutomatedYes$0.05-0.15
Manual Assessment30-90 daysLimitedManualLimited$50-200/hour

Why Custom CSPM Wins:

  • Real-time detection: Immediate notification of misconfigurations
  • Full control: Customize rules to your specific needs
  • Cost-effective: No per-resource licensing fees
  • Multi-cloud: Single solution for all cloud providers

Advanced Scenarios

Scenario 1: Basic CSPM Setup

Objective: Set up basic CSPM. Steps: Configure scanner, define rules, enable monitoring. Expected: Basic CSPM operational.

Scenario 2: Intermediate Compliance Monitoring

Objective: Monitor compliance with frameworks. Steps: Map rules to frameworks, generate reports, track compliance. Expected: Compliance monitoring operational.

Scenario 3: Advanced Automated Remediation

Objective: Automate remediation of misconfigurations. Steps: CSPM + automation + remediation + reporting + optimization. Expected: Complete CSPM with automation.

Theory and “Why” CSPM Works

Why Continuous Monitoring is Essential

  • Cloud environments change constantly
  • Misconfigurations occur frequently
  • Real-time detection prevents exposure
  • Automated remediation reduces risk

Why Multi-Cloud CSPM Matters

  • Organizations use multiple clouds
  • Unified visibility reduces complexity
  • Consistent security posture
  • Centralized management

Comprehensive Troubleshooting

Issue: Scanner Fails to Access Resources

Diagnosis: Check IAM permissions, verify API access, review credentials. Solutions: Fix IAM permissions, enable APIs, update credentials.

Issue: High False Positive Rate

Diagnosis: Review rules, check configurations, analyze findings. Solutions: Refine rules, improve context, reduce false positives.

Issue: Scan Performance Issues

Diagnosis: Check API limits, review scan scope, measure duration. Solutions: Optimize scan scope, implement rate limiting, improve efficiency.

Cleanup

# Clean up CSPM resources
cspm_system.cleanup()
# Remove configurations
# Clean up scan results if needed

Real-World Case Study

Challenge: A healthcare organization managed 500+ AWS resources across 12 accounts. They experienced:

  • 250+ misconfigurations discovered during annual audit
  • 3 security incidents from misconfigurations
  • $150K in compliance violation fines
  • 45-day average time to detect misconfigurations
  • Failed SOC 2 audit due to configuration issues

Solution: Implemented comprehensive CSPM:

  • Automated scanning every 6 hours across all accounts
  • Real-time monitoring for critical resources
  • Compliance checking against HIPAA, SOC 2, and CIS benchmarks
  • Automated remediation for 70% of common issues
  • Integration with ticketing system for manual fixes

Implementation Details:

  • Deployed scanning infrastructure (Lambda functions)
  • Configured 150+ security rules
  • Set up compliance frameworks (HIPAA, SOC 2, CIS)
  • Automated remediation playbooks
  • Dashboard for security team visibility

Results:

  • 85% reduction in misconfigurations: From 250 to 37.5 average
  • 90% improvement in compliance: Passed all audits
  • 100% resource coverage: All resources continuously monitored
  • 70% automated remediation: 175 issues fixed automatically
  • 24-hour detection: Down from 45 days average
  • $120K cost savings: Reduced audit and incident costs
  • Zero compliance violations: Passed SOC 2 and HIPAA audits
  • 300% ROI: Return on investment in first year

Lessons Learned:

  • Continuous scanning essential (caught 40 issues within 24 hours of creation)
  • Automated remediation saved 200+ hours of manual work
  • Compliance checking prevented 3 potential audit failures
  • Multi-account visibility critical for large organizations

Testing Your Code

Unit Tests

Click to view test code
import pytest
from unittest.mock import Mock, patch

class TestCSPMScanner:
    """Unit tests for CSPMScanner."""
    
    def test_scan_s3_buckets_finds_misconfigurations(self):
        """Test S3 bucket scanning finds misconfigurations."""
        scanner = CSPMScanner()
        with patch.object(scanner.s3, 'list_buckets') as mock_list:
            mock_list.return_value = {'Buckets': [{'Name': 'test-bucket'}]}
            with patch.object(scanner.s3, 'get_public_access_block') as mock_pab:
                mock_pab.side_effect = ClientError(
                    {'Error': {'Code': 'NoSuchPublicAccessBlockConfiguration'}},
                    'GetPublicAccessBlock'
                )
                findings = scanner._scan_s3_buckets()
                assert len(findings) > 0
                assert any(f.severity == Severity.CRITICAL for f in findings)
    
    def test_compliance_checker_identifies_violations(self):
        """Test compliance checker identifies violations."""
        checker = ComplianceChecker()
        findings = [
            SecurityFinding(
                resource_id='bucket1',
                resource_type='s3_bucket',
                issue='Public access',
                severity=Severity.CRITICAL,
                recommendation='Fix it',
                framework='CIS-AWS'
            )
        ]
        results = checker.check_compliance(findings, ['CIS-AWS'])
        assert not results['CIS-AWS']['compliant']
        assert results['CIS-AWS']['violation_count'] == 1

Validation: Run pytest test_cspm.py to verify all tests pass.


CSPM Architecture Diagram

Recommended Diagram: CSPM Workflow

    Cloud Infrastructure

    Continuous Scanning
    (APIs, Configs, Policies)

    ┌────┴────┬──────────┬──────────┐
    ↓         ↓          ↓          ↓
Misconfig  Compliance  Risk      Remediation
Detection   Checking   Scoring   (Automated)
    ↓         ↓          ↓          ↓
    └────┬────┴──────────┴──────────┘

    Security Posture
    Dashboard

CSPM Flow:

  • Infrastructure continuously scanned
  • Misconfigurations detected
  • Compliance verified
  • Risk scored and prioritized
  • Remediation automated

Limitations and Trade-offs

CSPM Limitations

Coverage:

  • Cannot detect all misconfigurations
  • May miss certain issue types
  • Requires comprehensive scanning
  • Multiple tools help
  • Continuous scanning needed

False Positives:

  • May flag false positives
  • Requires validation
  • Context important
  • Tuning needed
  • Continuous improvement

Remediation:

  • Automated remediation may be risky
  • Requires careful configuration
  • May break legitimate configurations
  • Manual review important
  • Gradual automation recommended

CSPM Trade-offs

Comprehensiveness vs. Performance:

  • More comprehensive = thorough but slower
  • Faster scans = quick but may miss issues
  • Balance based on requirements
  • Deep scans for compliance
  • Quick scans for continuous monitoring

Automation vs. Control:

  • More automation = faster but less control
  • More control = safer but slow
  • Balance based on risk
  • Automate low-risk
  • Manual for critical

Coverage vs. Cost:

  • More coverage = better security but expensive
  • Less coverage = cheaper but gaps
  • Balance based on budget
  • Prioritize critical resources
  • Cost optimization strategies

When CSPM May Be Challenging

Multi-Cloud:

  • Multiple clouds complicate scanning
  • Requires unified approach
  • Different APIs and formats
  • Consistent policies needed
  • Specialized tools help

Legacy Resources:

  • Legacy resources may not be scannable
  • May require special handling
  • Limited API support
  • Gradual migration approach
  • Hybrid solutions may be needed

Dynamic Environments:

  • Rapid changes complicate tracking
  • May generate noise
  • Requires prioritization
  • Baseline establishment important
  • Continuous tuning needed

FAQ

Q: What compliance standards can CSPM check?

A: Common standards supported:

  • CIS Benchmarks: Cloud-specific security best practices
  • NIST Framework: Cybersecurity framework controls
  • PCI-DSS: Payment card industry requirements
  • GDPR: European data protection regulation
  • HIPAA: US healthcare data protection
  • SOC 2: Security and availability controls
  • ISO 27001: Information security management
  • AWS Well-Architected: AWS-specific best practices

Q: How often should I run CSPM scans?

A: Recommended frequency:

  • Continuous: Real-time monitoring for critical resources
  • Daily: Full account scans for compliance
  • Weekly: Comprehensive posture assessment
  • On-demand: Before major deployments or changes
  • After incidents: Immediate scans after security events

Q: How do I reduce false positives in CSPM?

A: Strategies:

  • Baseline establishment: Learn normal configurations over time
  • Context awareness: Consider resource purpose and environment
  • Whitelisting: Whitelist known-good configurations
  • Severity tuning: Adjust severity thresholds based on risk
  • Machine learning: Use ML to reduce false positives by 60-80%

Q: Can CSPM automatically remediate issues?

A: Yes, with proper configuration:

  • Automated fixes: For low-risk, well-understood issues
  • Approval workflows: For high-risk changes requiring review
  • Rollback capabilities: Ability to undo automated changes
  • Audit trails: Complete logging of all remediation actions
  • Testing: Validate fixes in non-production first

Q: How does CSPM work across multiple cloud providers?

A: Multi-cloud CSPM:

  • Unified dashboard: Single view across AWS, Azure, GCP
  • Provider-specific APIs: Uses each cloud’s native APIs
  • Normalized findings: Standardized format across providers
  • Cross-cloud policies: Apply same policies everywhere
  • Cost optimization: Identify security and cost issues together

Q: What’s the difference between CSPM and CWPP?

A: Key differences:

  • CSPM: Focuses on configuration and compliance (misconfigurations)
  • CWPP: Focuses on workload protection (runtime threats)
  • CSPM: Prevents issues before deployment
  • CWPP: Detects and responds to active threats
  • Best practice: Use both for comprehensive cloud security

Q: How much does CSPM cost?

A: Cost factors:

  • Per-resource pricing: $0.10-0.50 per resource per month
  • Per-account pricing: $500-2000 per account per month
  • Enterprise pricing: Custom pricing for large deployments
  • Open-source options: Free but require more management
  • ROI: Typically 300-500% ROI from prevented incidents

Code Review Checklist for CSPM

Scanning Configuration

  • All cloud accounts/projects included in scans
  • Scan frequency appropriate for organization
  • Scan scope covers all resource types
  • Scan credentials stored securely

Compliance Checking

  • Compliance frameworks properly configured
  • Compliance rules tested and validated
  • Compliance exceptions documented
  • Compliance reports generated correctly

Remediation

  • Remediation actions tested in sandbox
  • Automated remediation rules reviewed
  • Manual approval for critical remediations
  • Remediation logs maintained

Security

  • CSPM tool access restricted
  • Scan results stored securely
  • No credentials in scan results
  • Audit logs maintained

Integration

  • Integration with ticketing systems tested
  • Alerting configured appropriately
  • Reports accessible to authorized users
  • Dashboard access controlled

Conclusion

CSPM continuously assesses and improves cloud security posture. Implement automated scanning, compliance checking, and remediation to maintain strong security.

Cleanup

After testing, clean up CSPM resources:

Click to view cleanup commands
# Remove CloudWatch alarms (if created)
aws cloudwatch delete-alarms --alarm-names cspm-*

# Remove Lambda functions (if created)
aws lambda delete-function --function-name cspm-scanner

# Remove IAM roles (if created)
aws iam delete-role-policy --role-name CSPMScannerRole --policy-name CSPMScannerPolicy
aws iam delete-role --role-name CSPMScannerRole

# Verify cleanup
aws cloudwatch describe-alarms --alarm-name-prefix cspm
# Should return empty list

Validation: Verify no CSPM resources remain in your cloud account.


Educational Use Only: This content is for educational purposes. Only assess accounts you own or have explicit authorization.

Similar Topics

FAQs

Can I use these labs in production?

No—treat them as educational. Adapt, review, and security-test before any production use.

How should I follow the lessons?

Start from the Learn page order or use Previous/Next on each lesson; both flow consistently.

What if I lack test data or infra?

Use synthetic data and local/lab environments. Never target networks or data you don't own or have written permission to test.

Can I share these materials?

Yes, with attribution and respecting any licensing for referenced tools or datasets.