
Build a Simple AI-Based Phishing Detector (Beginner Tutorial)

Train a lightweight phishing classifier with text features, evaluate accuracy, and add anti-spoofing safeguards.

Tags: phishing detection, ML, AI security, text classification, email security

Train and test a lightweight phishing detector end to end with synthetic email data, clear validation, and safety guardrails.

Table of Contents

  1. Understanding Phishing Detection
  2. Setting Up the Project
  3. Generating a Labeled Dataset
  4. Training the Classifier
  5. Real-Time Scoring Loop
  6. Defense in Depth Guardrails
  7. What This Lesson Does NOT Cover
  8. Limitations and Trade-offs
  9. Cleanup
  10. Career Alignment

TL;DR

Build a basic AI phishing detector using Python and scikit-learn. Learn to transform text into numerical features with TF-IDF, train a Logistic Regression model, and implement defense-in-depth guardrails to handle false positives and adversarial evasions.

Learning Outcomes (You Will Be Able To)

By the end of this lesson, you will be able to:

  • Explain why signature-based filters miss modern AI-generated phishing
  • Convert raw email text into machine-learning features using TF-IDF
  • Build a binary classifier to distinguish between benign and phishing emails
  • Identify the “Intentional Failure” mode when datasets are too small or biased
  • Map AI phishing risks to specific security controls (SPF/DKIM/DMARC)

What You’ll Build

  • A small TF-IDF + Logistic Regression text classifier for phishing vs benign emails.
  • Reproducible dataset generation to avoid leaking real PII.
  • Validation after each step plus cleanup.

Prerequisites

  • macOS or Linux with Python 3.12+.
  • pip available; ~200 MB free disk.
  • No email access needed; we generate synthetic samples.
  • Never train on real mailbox data without explicit approval and PII scrubbing.
  • Avoid storing raw emails; keep hashes or redacted text when possible (a small redaction sketch follows this list).
  • Keep humans in the loop for blocking decisions; start with “quarantine + review.”
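A minimal redaction sketch for that point about hashes and redacted text, assuming simple regex-based scrubbing (this is illustrative, not a complete PII solution):

import hashlib
import re

def redact(text: str) -> dict:
    """Hash the raw email for audit purposes and strip obvious PII before storage."""
    digest = hashlib.sha256(text.encode()).hexdigest()
    redacted = re.sub(r"[\w.+-]+@[\w-]+\.[\w.-]+", "<EMAIL>", text)
    redacted = re.sub(r"\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b", "<PHONE>", redacted)
    return {"sha256": digest, "text": redacted}

print(redact("Contact jane.doe@example.com or 555-123-4567 about the invoice"))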

Step 1) Create an isolated environment

python3 -m venv .venv-phish
source .venv-phish/bin/activate
pip install --upgrade pip
pip install pandas scikit-learn joblib
Validation: `pip show scikit-learn | grep Version` should be 1.5.x or newer.

Common fix: If activation fails, make sure you are sourcing the script (`source .venv-phish/bin/activate`) rather than executing it, and that your shell matches the script (fish and PowerShell use their own activation scripts).

Step 2) Generate a synthetic labeled dataset

cat > make_dataset.py <<'PY'
import pandas as pd

phish_samples = [
    ("Your account is locked. Verify immediately at http://fake-bank.com", 1),
    ("Urgent: update payroll info now or your pay is delayed", 1),
    ("Security alert: login from unknown device. Download the attached form", 1),
    ("Package held: pay customs fee via gift card", 1),
    ("Congrats, you won a prize! Click to claim", 1),
]

benign_samples = [
    ("Team meeting notes and next sprint goals", 0),
    ("Invoice attached for approved purchase order", 0),
    ("Reminder: security training scheduled next week", 0),
    ("Quarterly newsletter and product updates", 0),
    ("Welcome to the platform—getting started guide", 0),
]

df = pd.DataFrame(phish_samples + benign_samples, columns=["text", "label"])
df.to_csv("emails.csv", index=False)
print("Wrote emails.csv with", len(df), "rows")
PY

python make_dataset.py
Validation: `cat emails.csv` should show a `text,label` header followed by 10 data rows.

Step 3) Train and evaluate the classifier

cat > train_and_eval.py <<'PY'
import json
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report, confusion_matrix

df = pd.read_csv("emails.csv")
X_train, X_test, y_train, y_test = train_test_split(df["text"], df["label"], test_size=0.3, random_state=42, stratify=df["label"])

pipeline = Pipeline([
    ("tfidf", TfidfVectorizer(ngram_range=(1, 2), min_df=1)),
    ("clf", LogisticRegression(max_iter=400, class_weight="balanced")),
])

pipeline.fit(X_train, y_train)
preds = pipeline.predict(X_test)

report = classification_report(y_test, preds, target_names=["benign", "phish"], digits=3, output_dict=True)
cm = confusion_matrix(y_test, preds, labels=[0, 1])

with open("model.json", "w") as f:
    json.dump({"params": pipeline.get_params(deep=False)}, f, indent=2)

print("Confusion matrix [[TN, FP], [FN, TP]]:", cm.tolist())
print("Precision/Recall/F1:", json.dumps(report, indent=2))
PY

python train_and_eval.py
Validation: Expect high precision/recall on this toy set. Sample confusion matrix could be `[[2,0],[0,1]]`. If metrics are poor, increase samples or adjust `ngram_range`.

Intentional Failure Exercise (Important)

AI models are only as good as their training data. Try this:

  1. Modify make_dataset.py: Add 5 “benign” samples that contain the word “Urgent” (e.g., “Urgent: Project deadline moved up”). A sketch of such additions appears after this list.
  2. Retrain: Run the script and observe how the model starts flagging legitimate “Urgent” emails as phishing.
  3. Lesson: This is “Bias.” If your phishing samples only contain the word “Urgent” and your benign ones don’t, the model learns a shortcut rather than understanding the intent.
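For step 1, the additions might look like the following (the phrasing is illustrative; any benign text containing “Urgent” works). Append it to make_dataset.py before the DataFrame is built:

# Illustrative benign samples that reuse the word "Urgent"
biased_benign = [
    ("Urgent: project deadline moved up to Friday", 0),
    ("Urgent: conference room changed for today's standup", 0),
    ("Urgent review requested on the Q3 budget draft", 0),
    ("Urgent: please submit timesheets by 5 pm", 0),
    ("Urgent maintenance window tonight, save your work", 0),
]
benign_samples.extend(biased_benign)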

Common fixes:

  • ValueError: empty vocabulary => ensure emails.csv is not empty and min_df ≤ sample size.
  • If class imbalance arises, keep class_weight="balanced" or add more phishing examples.

Step 4) Add a simple scoring script with safety checks

cat > score_email.py <<'PY'
import sys
import joblib
import pandas as pd
from sklearn.pipeline import Pipeline

MODEL_PATH = "model.pkl"

def load_model():
    return joblib.load(MODEL_PATH)

def main():
    if len(sys.argv) < 2:
        print("Usage: python score_email.py 'email text'")
        sys.exit(1)
    text = sys.argv[1]
    model: Pipeline = load_model()
    proba = model.predict_proba([text])[0][1]
    print(f"phish_probability={proba:.3f}")
    if proba > 0.7:
        print("Action: quarantine and send to human review")

if __name__ == "__main__":
    main()
PY
Save the trained model:
pip install joblib
python - <<'PY'
import joblib
from sklearn.pipeline import Pipeline
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression

df = pd.read_csv("emails.csv")
pipe = Pipeline([
    ("tfidf", TfidfVectorizer(ngram_range=(1, 2), min_df=1)),
    ("clf", LogisticRegression(max_iter=400, class_weight="balanced")),
])
pipe.fit(df["text"], df["label"])
joblib.dump(pipe, "model.pkl")
print("Saved model.pkl")
PY

python score_email.py "Please reset your password at http://fake.com/reset"
Validation: Output should include `phish_probability` near 0.7–0.9 for the phishing-like sample.

Understanding Why AI Phishing Detection Works

Why Traditional Detection Fails

Signature-Based Limitations: Traditional email filters rely on known patterns and signatures. Modern phishing uses:

  • AI-generated content that evades signatures
  • Personalized attacks that look legitimate
  • Zero-day techniques not yet in databases

Why ML Detection Works:

  • Learns patterns from data, not just signatures
  • Adapts to new phishing techniques
  • Identifies subtle indicators humans miss
  • Scales to handle millions of emails

How TF-IDF + Logistic Regression Works

TF-IDF (Term Frequency-Inverse Document Frequency):

  • Identifies important words in emails
  • Weights words by rarity (phishing terms are rare in legitimate emails)
  • Creates numerical features from text

Logistic Regression:

  • Learns which features indicate phishing
  • Provides probability scores (not just binary)
  • Interpretable (can see which words matter)
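To make that interpretability concrete, here is a minimal sketch (run it after the `pipeline.fit(...)` call in train_and_eval.py, so the fitted `pipeline` is in scope) that prints the terms the classifier weights most strongly toward the phishing class:

import numpy as np

# Largest positive coefficients = terms that push the score toward "phish"
feature_names = pipeline.named_steps["tfidf"].get_feature_names_out()
coefs = pipeline.named_steps["clf"].coef_[0]
top = np.argsort(coefs)[-10:][::-1]
for idx in top:
    print(f"{feature_names[idx]:<25} {coefs[idx]:+.3f}")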

Step 5) Add non-ML controls (defense in depth)

Why Defense in Depth Matters

ML Limitations: ML models can be fooled by adversarial examples. Multiple security layers ensure that if one fails, others still protect.

AI Threat → Security Control Mapping

AI Risk | Real-World Impact | Control Implemented
Model Evasion | Attackers use “good” words to lower the score | Hybrid detection (ML + regex for keywords)
Phishing Drift | New lure themes (e.g., QR codes) are missed | Weekly retraining + human-in-the-loop review
False Positives | Critical business emails blocked | “Quarantine + Review” policy (never auto-delete)
Identity Spoofing | “Legit”-looking sender evades text checks | SPF/DKIM/DMARC hard enforcement

Production-Ready Controls:

  • Enforce SPF/DKIM/DMARC on inbound mail; reject or quarantine failures
  • Strip or rewrite links; sandbox attachments separately
  • Log decisions and top contributing features for analyst review (use pipeline["tfidf"].get_feature_names_out() and model coefficients)
  • Rate-limit scoring API to prevent prompt flooding or model abuse (see the token-bucket sketch after this list)
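One way to sketch the rate-limiting control is a simple token bucket in front of the scoring function; the rate and capacity below are illustrative and would be tuned per deployment:

import time

class TokenBucket:
    """Allow up to `capacity` bursts, refilled at `rate_per_sec` tokens per second."""
    def __init__(self, rate_per_sec: float, capacity: int):
        self.rate = rate_per_sec
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = TokenBucket(rate_per_sec=5, capacity=10)  # ~5 scoring requests per second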

Enhanced Logging Example:

import hashlib
import json
from datetime import datetime

def log_decision(email_text: str, probability: float, model, top_features: int = 5):
    """Log decision with top contributing features"""
    # Hash email to protect privacy
    email_hash = hashlib.sha256(email_text.encode()).hexdigest()[:16]
    
    # Get feature names and coefficients
    feature_names = model.named_steps['tfidf'].get_feature_names_out()
    coefficients = model.named_steps['clf'].coef_[0]
    
    # Get the globally most influential features (model-level coefficients, not per-email contributions)
    feature_importance = list(zip(feature_names, coefficients))
    top_contributors = sorted(feature_importance, key=lambda x: abs(x[1]), reverse=True)[:top_features]
    
    log_entry = {
        "timestamp": datetime.utcnow().isoformat(),
        "email_hash": email_hash,
        "phishing_probability": probability,
        "decision": "quarantine" if probability > 0.7 else "allow",
        "top_features": [{"feature": f, "coefficient": float(c)} for f, c in top_contributors]
    }
    
    print(json.dumps(log_entry))
    # In production: write to secure log storage

Advanced Scenarios

Scenario 1: Adversarial Phishing Emails

Challenge: Attackers craft emails to evade ML detection

Solution:

  • Use ensemble models (multiple models vote)
  • Add adversarial training examples
  • Implement feature engineering to detect evasion
  • Combine ML with rule-based detection (see the hybrid-scoring sketch after this list)
  • Human review for high-value targets
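A minimal sketch of the hybrid ML-plus-rules idea, assuming the model.pkl pipeline saved in Step 4; the regex patterns and boost values are illustrative and should be tuned for your environment:

import re
import joblib

_model = joblib.load("model.pkl")  # pipeline saved in Step 4

# Illustrative rules: patterns that raise the score regardless of the ML output
RULES = [
    (re.compile(r"gift card", re.I), 0.20),
    (re.compile(r"verify (your )?(account|identity)", re.I), 0.15),
    (re.compile(r"https?://\d{1,3}(\.\d{1,3}){3}"), 0.25),  # raw-IP links
]

def hybrid_score(text: str) -> float:
    score = float(_model.predict_proba([text])[0][1])
    for pattern, boost in RULES:
        if pattern.search(text):
            score = min(1.0, score + boost)
    return score

print(hybrid_score("Please verify your account and pay the fee with a gift card"))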

Scenario 2: Reducing False Positives

Challenge: Too many false positives overwhelm analysts

Solution:

  • Tune classification threshold (sketch after this list)
  • Improve feature engineering
  • Add more training data
  • Use confidence scoring
  • Implement feedback loop
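A sketch of threshold tuning with scikit-learn's precision_recall_curve, assuming the fitted `pipeline` and the `X_test`/`y_test` split from Step 3 are in scope; the 0.95 precision target is illustrative, and the toy dataset is too small for the curve to be meaningful:

from sklearn.metrics import precision_recall_curve

probs = pipeline.predict_proba(X_test)[:, 1]
precision, recall, thresholds = precision_recall_curve(y_test, probs)

# Pick the lowest threshold that still meets the precision target
target_precision = 0.95
for p, r, t in zip(precision[:-1], recall[:-1], thresholds):
    if p >= target_precision:
        print(f"threshold={t:.2f} precision={p:.2f} recall={r:.2f}")
        break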

Scenario 3: Real-Time Detection

Challenge: Detecting phishing in real-time at scale

Solution:

  • Optimize model for speed
  • Use caching for common patterns (sketch after this list)
  • Implement rate limiting
  • Scale horizontally
  • Monitor performance metrics
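One way to sketch the caching idea: memoize scores by a hash of the email text so repeated campaign messages are only scored once. This assumes model.pkl from Step 4; the eviction policy is deployment-specific and omitted here:

import hashlib
import joblib

_model = joblib.load("model.pkl")
_cache: dict[str, float] = {}

def score_cached(text: str) -> float:
    digest = hashlib.sha256(text.encode()).hexdigest()
    if digest not in _cache:  # only run the model for unseen texts
        _cache[digest] = float(_model.predict_proba([text])[0][1])
    return _cache[digest]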

Troubleshooting Guide

Problem: Model accuracy too low

Diagnosis:

# Check confusion matrix
print(confusion_matrix(y_test, y_pred))

# Review feature importance
feature_names = pipeline.named_steps['tfidf'].get_feature_names_out()
coefficients = pipeline.named_steps['clf'].coef_[0]

Solutions:

  • Add more training data
  • Improve feature engineering
  • Tune hyperparameters (sketch after this list)
  • Try different algorithms
  • Check for data quality issues
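A hyperparameter-tuning sketch for the Step 3 pipeline, assuming `pipeline`, `X_train`, and `y_train` are in scope; the grid values are illustrative, and you need enough samples per class for the cross-validation folds:

from sklearn.model_selection import GridSearchCV

param_grid = {
    "tfidf__ngram_range": [(1, 1), (1, 2)],
    "clf__C": [0.1, 1.0, 10.0],
}
search = GridSearchCV(pipeline, param_grid, scoring="f1", cv=3)
search.fit(X_train, y_train)
print("Best params:", search.best_params_)
print("Best CV F1:", round(search.best_score_, 3))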

Problem: High false positive rate

Diagnosis:

  • Review confusion matrix
  • Analyze false positive patterns
  • Check feature distributions

Solutions:

  • Adjust classification threshold
  • Improve feature selection
  • Add more negative examples
  • Use ensemble methods
  • Implement confidence scoring

Problem: Model not detecting new phishing techniques

Diagnosis:

  • Check if new techniques have different features
  • Compare feature distributions
  • Review model performance over time

Solutions:

  • Retrain with new examples (see the retraining sketch after this list)
  • Update feature engineering
  • Use online learning
  • Implement model versioning
  • Regular model updates
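A minimal versioned-retraining sketch, reusing the Step 3 pipeline definition; the dated file name is illustrative, and emails.csv is assumed to have been refreshed with newly labeled examples:

from datetime import date

import joblib
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline

df = pd.read_csv("emails.csv")  # refreshed with newly labeled examples
pipe = Pipeline([
    ("tfidf", TfidfVectorizer(ngram_range=(1, 2), min_df=1)),
    ("clf", LogisticRegression(max_iter=400, class_weight="balanced")),
])
pipe.fit(df["text"], df["label"])
joblib.dump(pipe, f"model-{date.today().isoformat()}.pkl")  # keep prior versions for rollback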

Code Review Checklist for AI Phishing Detection

Model Security

  • Training data integrity verified
  • Model versioning and rollback
  • Adversarial robustness tested
  • Performance monitoring

Production Readiness

  • Error handling in all code paths
  • Rate limiting implemented
  • Logging configured (privacy-preserving)
  • Human-in-the-loop processes

Defense in Depth

  • SPF/DKIM/DMARC enforcement
  • Link rewriting/sandboxing
  • Attachment analysis
  • Multiple detection methods

AI Phishing Detection Architecture Diagram

Recommended Diagram: Phishing Detection Pipeline

    Email Input

    Feature Extraction
    (Headers, Content, Links)

    AI Model Analysis
    (ML Classifier)

    ┌────┴────┐
    ↓         ↓
 Legitimate  Phishing
    ↓         ↓
    └────┬────┘

    Action (Block/Quarantine/Allow)

Detection Flow:

  • Email features extracted (headers, content, links)
  • AI model analyzes features
  • Classification as legitimate or phishing
  • Action taken based on result

Phishing Detection Methods Comparison

Method | Accuracy | False Positives | Resource Usage | Best For
AI/ML Detection | High (90%+) | Low | Medium | Content analysis
URL Blacklists | Medium (70%) | Very Low | Low | Known malicious URLs
SPF/DKIM/DMARC | High (85%+) | Very Low | Low | Email authentication
Reputation Scoring | Medium (75%) | Medium | Low | Sender analysis
Hybrid Approach | Very High (95%+) | Very Low | Medium | Comprehensive defense

Key Insight: Combining multiple methods provides best results. Use AI for content analysis, authentication for sender verification, and blacklists for known threats.


Real World Project: Build an AI System That Detects Phishing Emails Using Real Datasets

This comprehensive project demonstrates building a production-ready phishing detection system using real-world datasets (Enron and SpamAssassin), advanced feature engineering, and deployment-ready code.

Project Overview

Objective: Build a complete AI-powered phishing email detection system that:

  • Processes real email datasets (Enron corpus, SpamAssassin)
  • Extracts comprehensive features (headers, content, URLs, attachments)
  • Trains and evaluates multiple ML models
  • Provides real-time detection API
  • Includes web dashboard for monitoring
  • Implements production-ready security controls

Project Architecture

┌─────────────────┐
│  Email Dataset  │
│ (Enron/SpamAss) │
└────────┬────────┘

┌────────▼────────┐
│ Data Preprocessing│
│ & Feature Extract│
└────────┬────────┘

┌────────▼────────┐
│  ML Training    │
│ (Multiple Models)│
└────────┬────────┘

┌────────▼────────┐
│ Model Evaluation│
│ & Selection     │
└────────┬────────┘

┌────────▼────────┐
│  Detection API  │
│  (Flask/FastAPI)│
└────────┬────────┘

┌────────▼────────┐
│  Web Dashboard  │
│  (Monitoring)   │
└─────────────────┘

Step 1: Dataset Preparation

cat > prepare_datasets.py <<'PY'
#!/usr/bin/env python3
"""
Real-World Phishing Detection Dataset Preparation
Uses Enron and SpamAssassin datasets
"""

import os
import email
import re
import pandas as pd
import numpy as np
from pathlib import Path
import urllib.request
import tarfile
import zipfile
from typing import List, Dict, Tuple
import hashlib

class DatasetPreparer:
    """Prepare real-world email datasets for phishing detection"""
    
    def __init__(self, data_dir="datasets"):
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(exist_ok=True)
        self.enron_url = "https://www.cs.cmu.edu/~enron/enron_mail_20150507.tar.gz"
        self.spamassassin_url = "https://spamassassin.apache.org/old/publiccorpus/"
    
    def download_enron_dataset(self):
        """Download Enron email dataset (legitimate emails)"""
        print("Downloading Enron dataset...")
        enron_file = self.data_dir / "enron.tar.gz"
        
        if not enron_file.exists():
            print("Note: Enron dataset is large (~400MB). Download manually from:")
            print("https://www.cs.cmu.edu/~enron/enron_mail_20150507.tar.gz")
            print("Or use SpamAssassin dataset which is smaller.")
            return False
        
        # Extract if needed
        enron_extracted = self.data_dir / "enron_mail"
        if not enron_extracted.exists():
            print("Extracting Enron dataset...")
            with tarfile.open(enron_file, 'r:gz') as tar:
                tar.extractall(self.data_dir)
        
        return True
    
    def download_spamassassin_dataset(self):
        """Download SpamAssassin dataset (spam/phishing emails)"""
        print("Downloading SpamAssassin dataset...")
        
        # SpamAssassin public corpus files
        spam_files = [
            "20021010_easy_ham.tar.bz2",
            "20021010_hard_ham.tar.bz2",
            "20021010_spam.tar.bz2"
        ]
        
        base_url = "https://spamassassin.apache.org/old/publiccorpus/"
        
        for filename in spam_files:
            filepath = self.data_dir / filename
            if not filepath.exists():
                print(f"Downloading {filename}...")
                try:
                    urllib.request.urlretrieve(
                        base_url + filename,
                        filepath
                    )
                except Exception as e:
                    print(f"Could not download {filename}: {e}")
                    print("You can download manually from:")
                    print(f"{base_url}{filename}")
                    continue
            
            # Extract
            extracted_dir = self.data_dir / filename.replace('.tar.bz2', '')
            if not extracted_dir.exists():
                print(f"Extracting {filename}...")
                with tarfile.open(filepath, 'r:bz2') as tar:
                    tar.extractall(self.data_dir)
        
        return True
    
    def parse_email_file(self, filepath: Path) -> Dict:
        """Parse a single email file"""
        try:
            with open(filepath, 'rb') as f:
                msg = email.message_from_bytes(f.read())
            
            # Extract headers
            subject = msg.get('Subject', '')
            from_addr = msg.get('From', '')
            to_addr = msg.get('To', '')
            date = msg.get('Date', '')
            
            # Extract body
            body = ""
            if msg.is_multipart():
                for part in msg.walk():
                    if part.get_content_type() == "text/plain":
                        body += part.get_payload(decode=True).decode('utf-8', errors='ignore')
            else:
                body = msg.get_payload(decode=True).decode('utf-8', errors='ignore')
            
            # Extract URLs
            urls = re.findall(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', body)
            
            # Extract attachments info
            attachments = []
            if msg.is_multipart():
                for part in msg.walk():
                    if part.get_content_disposition() == 'attachment':
                        attachments.append(part.get_filename())
            
            return {
                'subject': subject,
                'from': from_addr,
                'to': to_addr,
                'date': date,
                'body': body,
                'urls': urls,
                'attachment_count': len(attachments),
                'attachments': attachments,
                'content_type': msg.get_content_type()
            }
        except Exception as e:
            print(f"Error parsing {filepath}: {e}")
            return None
    
    def process_enron_emails(self, max_emails=5000) -> pd.DataFrame:
        """Process Enron emails (legitimate)"""
        print("Processing Enron emails...")
        enron_dir = self.data_dir / "enron_mail"
        
        if not enron_dir.exists():
            print("Enron dataset not found. Using synthetic data.")
            return self._generate_synthetic_legitimate(max_emails)
        
        emails = []
        count = 0
        
        for mail_file in enron_dir.rglob("*"):
            if mail_file.is_file() and count < max_emails:
                parsed = self.parse_email_file(mail_file)
                if parsed:
                    parsed['label'] = 0  # Legitimate
                    parsed['source'] = 'enron'
                    emails.append(parsed)
                    count += 1
        
        print(f"Processed {len(emails)} Enron emails")
        return pd.DataFrame(emails)
    
    def process_spamassassin_emails(self, max_emails=5000) -> pd.DataFrame:
        """Process SpamAssassin emails (spam/phishing)"""
        print("Processing SpamAssassin emails...")
        
        emails = []
        count = 0
        
        # Process spam emails
        spam_dirs = [
            self.data_dir / "20021010_spam" / "spam",
            self.data_dir / "20021010_spam" / "spam_2"
        ]
        
        for spam_dir in spam_dirs:
            if spam_dir.exists():
                for mail_file in spam_dir.glob("*"):
                    if mail_file.is_file() and count < max_emails:
                        parsed = self.parse_email_file(mail_file)
                        if parsed:
                            parsed['label'] = 1  # Phishing/Spam
                            parsed['source'] = 'spamassassin'
                            emails.append(parsed)
                            count += 1
        
        if len(emails) == 0:
            print("SpamAssassin dataset not found. Using synthetic data.")
            return self._generate_synthetic_phishing(max_emails)
        
        print(f"Processed {len(emails)} SpamAssassin emails")
        return pd.DataFrame(emails)
    
    def _generate_synthetic_legitimate(self, n: int) -> pd.DataFrame:
        """Generate synthetic legitimate emails if dataset not available"""
        legitimate_templates = [
            "Team meeting scheduled for {date}",
            "Quarterly report attached for review",
            "Project update: {project} status",
            "Reminder: Security training next week",
            "Invoice #{number} for services rendered"
        ]
        
        emails = []
        for i in range(n):
            template = np.random.choice(legitimate_templates)
            emails.append({
                'subject': template.format(date="2025-12-15", project="Project Alpha", number=i),
                'from': f"colleague{i}@company.com",
                'to': "team@company.com",
                'body': f"This is a legitimate business email. {template}",
                'urls': [],
                'attachment_count': 0,
                'label': 0,
                'source': 'synthetic'
            })
        
        return pd.DataFrame(emails)
    
    def _generate_synthetic_phishing(self, n: int) -> pd.DataFrame:
        """Generate synthetic phishing emails if dataset not available"""
        phishing_templates = [
            "Urgent: Your account will be locked. Verify now: {url}",
            "Security alert: Suspicious login detected. Click here: {url}",
            "You've won a prize! Claim now: {url}",
            "Payment required: Your subscription expired. Renew: {url}",
            "Verify your identity: {url} or account will be closed"
        ]
        
        emails = []
        for i in range(n):
            template = np.random.choice(phishing_templates)
            fake_url = f"http://fake-bank-{i}.com/verify"
            emails.append({
                'subject': template.split(':')[0],
                'from': f"noreply{i}@fake-bank.com",
                'to': "victim@example.com",
                'body': template.format(url=fake_url) + " This is a phishing attempt.",
                'urls': [fake_url],
                'attachment_count': 0,
                'label': 1,
                'source': 'synthetic'
            })
        
        return pd.DataFrame(emails)
    
    def create_combined_dataset(self) -> pd.DataFrame:
        """Create combined dataset from all sources"""
        print("Creating combined dataset...")
        
        # Process legitimate emails
        legitimate_df = self.process_enron_emails(max_emails=5000)
        
        # Process phishing emails
        phishing_df = self.process_spamassassin_emails(max_emails=5000)
        
        # Combine
        combined = pd.concat([legitimate_df, phishing_df], ignore_index=True)
        
        # Shuffle
        combined = combined.sample(frac=1, random_state=42).reset_index(drop=True)
        
        # Save
        output_file = self.data_dir / "combined_emails.csv"
        combined.to_csv(output_file, index=False)
        
        print(f"Created combined dataset with {len(combined)} emails")
        print(f"Legitimate: {len(combined[combined['label']==0])}")
        print(f"Phishing: {len(combined[combined['label']==1])}")
        print(f"Saved to {output_file}")
        
        return combined

def main():
    preparer = DatasetPreparer()
    
    # Try to download datasets (optional - will use synthetic if fails)
    print("Attempting to download real datasets...")
    print("(Will use synthetic data if download fails)")
    
    # Create combined dataset
    df = preparer.create_combined_dataset()
    
    print("\nDataset preparation complete!")
    print(f"Total emails: {len(df)}")
    print(f"Features: {list(df.columns)}")

if __name__ == "__main__":
    main()
PY

python prepare_datasets.py

Step 2: Advanced Feature Extraction

cat > advanced_features.py <<'PY'
#!/usr/bin/env python3
"""
Advanced Feature Extraction for Phishing Detection
Extracts comprehensive features from email headers, content, and URLs
"""

import ast
import re
import pandas as pd
import numpy as np
from urllib.parse import urlparse
from typing import Dict

class AdvancedFeatureExtractor:
    """Extract comprehensive features for phishing detection"""
    
    def __init__(self):
        self.suspicious_keywords = [
            'urgent', 'verify', 'suspended', 'locked', 'expired',
            'click here', 'act now', 'limited time', 'prize', 'winner',
            'confirm', 'update', 'security', 'alert', 'warning'
        ]
        
        self.legitimate_domains = [
            'gmail.com', 'yahoo.com', 'outlook.com', 'company.com',
            'microsoft.com', 'google.com', 'amazon.com'
        ]
    
    def extract_header_features(self, email: Dict) -> Dict:
        """Extract features from email headers"""
        features = {}
        
        subject = str(email.get('subject', '')).lower()
        subject_raw = str(email.get('subject', ''))  # keep original casing for the caps-ratio feature
        from_addr = str(email.get('from', '')).lower()
        to_addr = str(email.get('to', '')).lower()
        
        # Subject features
        features['subject_length'] = len(subject)
        features['subject_has_urgent'] = any(kw in subject for kw in ['urgent', 'asap', 'immediate'])
        features['subject_has_suspicious'] = any(kw in subject for kw in self.suspicious_keywords)
        features['subject_has_numbers'] = bool(re.search(r'\d+', subject))
        features['subject_has_special_chars'] = bool(re.search(r'[!@#$%^&*()]', subject))
        features['subject_all_caps_ratio'] = sum(1 for c in subject_raw if c.isupper()) / max(len(subject_raw), 1)
        
        # From address features
        features['from_has_digits'] = bool(re.search(r'\d', from_addr))
        features['from_domain_length'] = len(from_addr.split('@')[-1]) if '@' in from_addr else 0
        features['from_is_legitimate'] = any(dom in from_addr for dom in self.legitimate_domains)
        features['from_subdomain_count'] = from_addr.count('.') if '@' in from_addr else 0
        
        # To address features
        features['to_count'] = to_addr.count(',') + 1 if to_addr else 0
        features['to_is_broadcast'] = features['to_count'] > 10
        
        return features
    
    def extract_content_features(self, email: Dict) -> Dict:
        """Extract features from email body content"""
        features = {}
        
        body = str(email.get('body', '')).lower()
        
        # Basic content features
        features['body_length'] = len(body)
        features['body_word_count'] = len(body.split())
        features['body_has_html'] = '<html' in body or '<body' in body
        features['body_has_links'] = 'http' in body or 'www.' in body
        
        # Suspicious content patterns
        features['suspicious_keyword_count'] = sum(1 for kw in self.suspicious_keywords if kw in body)
        features['suspicious_keyword_ratio'] = features['suspicious_keyword_count'] / max(features['body_word_count'], 1)
        
        # Urgency indicators
        features['urgency_indicators'] = sum(1 for word in ['urgent', 'asap', 'immediate', 'now', 'today'] if word in body)
        
        # Grammar and spelling (simplified)
        features['typo_indicators'] = len(re.findall(r'\b\w{15,}\b', body))  # Very long words
        features['repeated_chars'] = len(re.findall(r'(.)\1{3,}', body))  # Repeated characters
        
        # Financial/security terms
        features['financial_terms'] = sum(1 for word in ['payment', 'invoice', 'account', 'bank', 'credit', 'card'] if word in body)
        features['security_terms'] = sum(1 for word in ['verify', 'confirm', 'security', 'password', 'login'] if word in body)
        
        # Punctuation patterns
        features['exclamation_count'] = body.count('!')
        features['question_count'] = body.count('?')
        features['exclamation_ratio'] = features['exclamation_count'] / max(features['body_word_count'], 1)
        
        return features
    
    def extract_url_features(self, email: Dict) -> Dict:
        """Extract features from URLs in email"""
        features = {}
        
        urls = email.get('urls', [])
        # URLs read back from CSV arrive as a stringified list (or NaN); normalize to a real list
        if isinstance(urls, str):
            try:
                urls = ast.literal_eval(urls)
            except (ValueError, SyntaxError):
                urls = []
        elif not isinstance(urls, list):
            urls = []
        
        features['url_count'] = len(urls)
        features['has_urls'] = len(urls) > 0
        
        if urls:
            url_features = []
            for url in urls:
                try:
                    parsed = urlparse(url)
                    url_feat = {
                        'is_https': parsed.scheme == 'https',
                        'domain_length': len(parsed.netloc),
                        'path_length': len(parsed.path),
                        'has_ip': bool(re.match(r'^\d+\.\d+\.\d+\.\d+', parsed.netloc)),
                        'has_port': ':' in parsed.netloc,
                        'subdomain_count': parsed.netloc.count('.') - 1,
                        'is_shortened': any(domain in parsed.netloc for domain in ['bit.ly', 'tinyurl', 'goo.gl', 't.co']),
                        'has_suspicious_tld': any(tld in parsed.netloc for tld in ['.tk', '.ml', '.ga', '.cf']),
                        'domain_has_digits': bool(re.search(r'\d', parsed.netloc)),
                        'path_has_suspicious': any(kw in parsed.path.lower() for kw in ['login', 'verify', 'confirm', 'secure'])
                    }
                    url_features.append(url_feat)
                except:
                    continue
            
            if url_features:
                # Aggregate URL features
                for key in url_features[0].keys():
                    features[f'url_{key}_mean'] = np.mean([uf[key] for uf in url_features])
                    features[f'url_{key}_max'] = np.max([uf[key] for uf in url_features])
        else:
            # No URLs
            for key in ['is_https', 'domain_length', 'path_length', 'has_ip', 'has_port', 
                       'subdomain_count', 'is_shortened', 'has_suspicious_tld', 
                       'domain_has_digits', 'path_has_suspicious']:
                features[f'url_{key}_mean'] = 0
                features[f'url_{key}_max'] = 0
        
        return features
    
    def extract_attachment_features(self, email: Dict) -> Dict:
        """Extract features from email attachments"""
        features = {}
        
        attachments = email.get('attachments', [])
        # Attachment lists read back from CSV arrive as strings (or NaN); normalize to a real list
        if isinstance(attachments, str):
            try:
                attachments = ast.literal_eval(attachments)
            except (ValueError, SyntaxError):
                attachments = []
        elif not isinstance(attachments, list):
            attachments = []
        attachment_count = email.get('attachment_count', 0)
        
        features['attachment_count'] = attachment_count
        features['has_attachments'] = attachment_count > 0
        
        if attachments:
            # Check for suspicious file types
            suspicious_extensions = ['.exe', '.bat', '.scr', '.vbs', '.js', '.jar', '.zip', '.rar']
            features['suspicious_attachment'] = any(
                any(ext in att.lower() for ext in suspicious_extensions) 
                for att in attachments
            )
            
            # Check for double extensions (common in malware)
            features['double_extension'] = any('.' in att and att.count('.') > 1 for att in attachments)
        else:
            features['suspicious_attachment'] = False
            features['double_extension'] = False
        
        return features
    
    def extract_all_features(self, email: Dict) -> Dict:
        """Extract all features from an email"""
        all_features = {}
        
        # Extract from different sources
        all_features.update(self.extract_header_features(email))
        all_features.update(self.extract_content_features(email))
        all_features.update(self.extract_url_features(email))
        all_features.update(self.extract_attachment_features(email))
        
        return all_features
    
    def extract_batch_features(self, emails: pd.DataFrame) -> pd.DataFrame:
        """Extract features for a batch of emails"""
        print("Extracting features from emails...")
        
        feature_list = []
        for idx, email in emails.iterrows():
            features = self.extract_all_features(email.to_dict())
            features['email_id'] = idx
            feature_list.append(features)
        
        features_df = pd.DataFrame(feature_list)
        
        # Merge with original labels
        if 'label' in emails.columns:
            features_df['label'] = emails['label'].values
        
        print(f"Extracted {len(features_df.columns)} features from {len(emails)} emails")
        
        return features_df

def main():
    # Load dataset
    df = pd.read_csv("datasets/combined_emails.csv")
    
    # Initialize extractor
    extractor = AdvancedFeatureExtractor()
    
    # Extract features
    features_df = extractor.extract_batch_features(df)
    
    # Save features
    features_df.to_csv("datasets/email_features.csv", index=False)
    print(f"\nFeatures saved to datasets/email_features.csv")
    print(f"Feature columns: {len(features_df.columns)}")
    print(f"Sample features: {list(features_df.columns[:10])}")

if __name__ == "__main__":
    main()
PY

python advanced_features.py

Step 3: Model Training with Multiple Algorithms

cat > train_production_model.py <<'PY'
#!/usr/bin/env python3
"""
Production-Ready Phishing Detection Model Training
Trains multiple models and selects the best one
"""

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, confusion_matrix, classification_report
)
from sklearn.preprocessing import StandardScaler
import joblib
import json
from datetime import datetime

class PhishingModelTrainer:
    """Train and evaluate multiple phishing detection models"""
    
    def __init__(self):
        self.models = {}
        self.scaler = StandardScaler()
        self.best_model = None
        self.best_score = 0
        self.feature_names = []
    
    def load_data(self, features_file: str):
        """Load feature data"""
        print(f"Loading features from {features_file}...")
        df = pd.read_csv(features_file)
        
        # Separate features and labels
        if 'label' in df.columns:
            X = df.drop(['label', 'email_id'], axis=1, errors='ignore')
            y = df['label']
        else:
            raise ValueError("Label column not found in dataset")
        
        # Handle missing values
        X = X.fillna(0)
        
        # Store feature names
        self.feature_names = X.columns.tolist()
        
        return X, y
    
    def train_models(self, X, y, test_size=0.2):
        """Train multiple models and compare"""
        print("\n" + "="*60)
        print("Training Multiple Models")
        print("="*60)
        
        # Split data
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=42, stratify=y
        )
        
        # Scale features
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)
        
        # Define models to train
        models_to_train = {
            'Logistic Regression': LogisticRegression(max_iter=1000, random_state=42),
            'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1),
            'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42),
            'SVM': SVC(probability=True, random_state=42)
        }
        
        results = {}
        
        for name, model in models_to_train.items():
            print(f"\nTraining {name}...")
            
            # Use scaled data for linear models, original for tree-based
            if name in ['Logistic Regression', 'SVM']:
                X_train_use = X_train_scaled
                X_test_use = X_test_scaled
            else:
                X_train_use = X_train
                X_test_use = X_test
            
            # Train
            model.fit(X_train_use, y_train)
            
            # Predict
            y_pred = model.predict(X_test_use)
            y_pred_proba = model.predict_proba(X_test_use)[:, 1] if hasattr(model, 'predict_proba') else None
            
            # Evaluate
            accuracy = accuracy_score(y_test, y_pred)
            precision = precision_score(y_test, y_pred)
            recall = recall_score(y_test, y_pred)
            f1 = f1_score(y_test, y_pred)
            roc_auc = roc_auc_score(y_test, y_pred_proba) if y_pred_proba is not None else 0
            
            results[name] = {
                'model': model,
                'accuracy': accuracy,
                'precision': precision,
                'recall': recall,
                'f1': f1,
                'roc_auc': roc_auc,
                'confusion_matrix': confusion_matrix(y_test, y_pred).tolist()
            }
            
            print(f"  Accuracy: {accuracy:.4f}")
            print(f"  Precision: {precision:.4f}")
            print(f"  Recall: {recall:.4f}")
            print(f"  F1-Score: {f1:.4f}")
            print(f"  ROC-AUC: {roc_auc:.4f}")
        
        # Select best model (based on F1 score)
        best_model_name = max(results.keys(), key=lambda k: results[k]['f1'])
        self.best_model = results[best_model_name]['model']
        self.best_score = results[best_model_name]['f1']
        
        print("\n" + "="*60)
        print(f"Best Model: {best_model_name}")
        print(f"Best F1-Score: {self.best_score:.4f}")
        print("="*60)
        
        # Save all results
        self.models = results
        
        return results, X_test, y_test
    
    def hyperparameter_tuning(self, X_train, y_train, model_name='Random Forest'):
        """Perform hyperparameter tuning for best model"""
        print(f"\nPerforming hyperparameter tuning for {model_name}...")
        
        if model_name == 'Random Forest':
            param_grid = {
                'n_estimators': [100, 200, 300],
                'max_depth': [10, 20, None],
                'min_samples_split': [2, 5, 10]
            }
            base_model = RandomForestClassifier(random_state=42, n_jobs=-1)
        elif model_name == 'Gradient Boosting':
            param_grid = {
                'n_estimators': [100, 200],
                'learning_rate': [0.01, 0.1, 0.2],
                'max_depth': [3, 5, 7]
            }
            base_model = GradientBoostingClassifier(random_state=42)
        else:
            print("Hyperparameter tuning not implemented for this model")
            return self.best_model
        
        grid_search = GridSearchCV(
            base_model, param_grid, cv=5, scoring='f1', n_jobs=-1, verbose=1
        )
        
        grid_search.fit(X_train, y_train)
        
        print(f"Best parameters: {grid_search.best_params_}")
        print(f"Best CV score: {grid_search.best_score_:.4f}")
        
        self.best_model = grid_search.best_estimator_
        
        return self.best_model
    
    def save_model(self, filepath: str):
        """Save the best model and metadata"""
        model_data = {
            'model': self.best_model,
            'scaler': self.scaler,
            'feature_names': self.feature_names,
            'training_date': datetime.now().isoformat(),
            'performance': {
                'f1_score': self.best_score,
                'model_type': type(self.best_model).__name__
            }
        }
        
        joblib.dump(model_data, filepath)
        print(f"\nModel saved to {filepath}")
        
        # Save metadata as JSON
        metadata = {
            'training_date': model_data['training_date'],
            'model_type': model_data['performance']['model_type'],
            'f1_score': float(model_data['performance']['f1_score']),
            'feature_count': len(self.feature_names),
            'features': self.feature_names[:20]  # First 20 features
        }
        
        metadata_file = filepath.replace('.pkl', '_metadata.json')
        with open(metadata_file, 'w') as f:
            json.dump(metadata, f, indent=2)
        
        print(f"Metadata saved to {metadata_file}")

def main():
    trainer = PhishingModelTrainer()
    
    # Load data
    X, y = trainer.load_data("datasets/email_features.csv")
    
    print(f"\nDataset Info:")
    print(f"  Total samples: {len(X)}")
    print(f"  Features: {len(X.columns)}")
    print(f"  Legitimate: {sum(y == 0)}")
    print(f"  Phishing: {sum(y == 1)}")
    
    # Train models
    results, X_test, y_test = trainer.train_models(X, y)
    
    # Save best model
    trainer.save_model("models/phishing_detector_production.pkl")
    
    print("\n" + "="*60)
    print("Training Complete!")
    print("="*60)
    print("\nNext steps:")
    print("1. Review model performance metrics")
    print("2. Test model on new emails")
    print("3. Deploy model via API (see api_server.py)")

if __name__ == "__main__":
    import os
    os.makedirs("models", exist_ok=True)
    main()
PY

python train_production_model.py

Step 4: Real-Time Detection API

cat > api_server.py <<'PY'
#!/usr/bin/env python3
"""
Production-Ready Phishing Detection API Server
Flask/FastAPI server for real-time email analysis
"""

from flask import Flask, request, jsonify
from flask_cors import CORS
import joblib
import pandas as pd
import numpy as np
from advanced_features import AdvancedFeatureExtractor
import logging
from datetime import datetime
import os

app = Flask(__name__)
CORS(app)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Load model
MODEL_PATH = "models/phishing_detector_production.pkl"
model_data = None
feature_extractor = AdvancedFeatureExtractor()

def load_model():
    """Load the trained model"""
    global model_data
    try:
        model_data = joblib.load(MODEL_PATH)
        logger.info("Model loaded successfully")
        return True
    except Exception as e:
        logger.error(f"Failed to load model: {e}")
        return False

@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint"""
    return jsonify({
        'status': 'healthy',
        'model_loaded': model_data is not None,
        'timestamp': datetime.now().isoformat()
    })

@app.route('/predict', methods=['POST'])
def predict():
    """Predict if an email is phishing"""
    try:
        data = request.json
        
        # Validate input
        required_fields = ['subject', 'body', 'from']
        for field in required_fields:
            if field not in data:
                return jsonify({
                    'error': f'Missing required field: {field}'
                }), 400
        
        # Extract features
        email_dict = {
            'subject': data.get('subject', ''),
            'from': data.get('from', ''),
            'to': data.get('to', ''),
            'body': data.get('body', ''),
            'urls': data.get('urls', []),
            'attachments': data.get('attachments', []),
            'attachment_count': len(data.get('attachments', []))
        }
        
        features = feature_extractor.extract_all_features(email_dict)
        
        # Convert to DataFrame
        features_df = pd.DataFrame([features])
        
        # Ensure all feature columns exist
        for col in model_data['feature_names']:
            if col not in features_df.columns:
                features_df[col] = 0
        
        # Reorder columns
        features_df = features_df[model_data['feature_names']]
        
        # Scale features if needed
        if hasattr(model_data['model'], 'predict_proba'):
            # Check if model needs scaling (linear models)
            if 'LogisticRegression' in str(type(model_data['model'])):
                features_scaled = model_data['scaler'].transform(features_df)
                prediction = model_data['model'].predict(features_scaled)[0]
                probability = model_data['model'].predict_proba(features_scaled)[0]
            else:
                prediction = model_data['model'].predict(features_df)[0]
                probability = model_data['model'].predict_proba(features_df)[0]
        else:
            prediction = model_data['model'].predict(features_df)[0]
            probability = [0.5, 0.5]  # Default if no probability
        
        # Prepare response
        result = {
            'is_phishing': bool(prediction),
            'confidence': float(max(probability)),
            'phishing_probability': float(probability[1]) if len(probability) > 1 else float(probability[0]),
            'legitimate_probability': float(probability[0]),
            'timestamp': datetime.now().isoformat()
        }
        
        # Add risk level
        if result['phishing_probability'] > 0.8:
            result['risk_level'] = 'HIGH'
        elif result['phishing_probability'] > 0.5:
            result['risk_level'] = 'MEDIUM'
        else:
            result['risk_level'] = 'LOW'
        
        logger.info(f"Prediction: {result['is_phishing']} (confidence: {result['confidence']:.2f})")
        
        return jsonify(result)
    
    except Exception as e:
        logger.error(f"Prediction error: {e}")
        return jsonify({
            'error': 'Prediction failed',
            'message': str(e)
        }), 500

@app.route('/batch_predict', methods=['POST'])
def batch_predict():
    """Predict multiple emails at once"""
    try:
        data = request.json
        
        if 'emails' not in data:
            return jsonify({'error': 'Missing emails array'}), 400
        
        results = []
        for email in data['emails']:
            # Create a temporary request-like object
            with app.test_request_context(json=email):
                result = predict()
                results.append(result.get_json())
        
        return jsonify({
            'results': results,
            'total': len(results),
            'phishing_count': sum(1 for r in results if r.get('is_phishing', False))
        })
    
    except Exception as e:
        logger.error(f"Batch prediction error: {e}")
        return jsonify({
            'error': 'Batch prediction failed',
            'message': str(e)
        }), 500

if __name__ == '__main__':
    # Load model on startup
    if not load_model():
        logger.error("Failed to load model. Exiting.")
        exit(1)
    
    port = int(os.getenv('PORT', 5000))
    app.run(host='0.0.0.0', port=port, debug=False)
PY

# Install Flask
pip install flask flask-cors

# Run API server
python api_server.py
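With the server running, a quick smoke test of the /predict endpoint might look like this (standard library only; the payload values are illustrative):

# test_predict.py — assumes api_server.py is running on localhost:5000
import json
import urllib.request

payload = {
    "from": "noreply@fake-bank.com",
    "subject": "Urgent: verify your account",
    "body": "Your account is locked. Verify at http://fake-bank.com/login",
}
req = urllib.request.Request(
    "http://localhost:5000/predict",
    data=json.dumps(payload).encode(),
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req) as resp:
    print(json.loads(resp.read()))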

Step 5: Web Dashboard

cat > dashboard.html <<'HTML'
<!DOCTYPE html>
<html>
<head>
    <title>Phishing Detection Dashboard</title>
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; background: #f5f5f5; }
        .container { max-width: 1200px; margin: 0 auto; background: white; padding: 20px; border-radius: 8px; }
        .input-section { margin: 20px 0; padding: 20px; background: #f9f9f9; border-radius: 4px; }
        textarea, input { width: 100%; padding: 10px; margin: 5px 0; border: 1px solid #ddd; border-radius: 4px; }
        button { padding: 10px 20px; background: #007bff; color: white; border: none; border-radius: 4px; cursor: pointer; }
        button:hover { background: #0056b3; }
        .result { margin: 20px 0; padding: 20px; border-radius: 4px; }
        .result.phishing { background: #fee; border-left: 4px solid #f00; }
        .result.legitimate { background: #efe; border-left: 4px solid #0a0; }
        .risk-high { color: #d00; font-weight: bold; }
        .risk-medium { color: #f80; font-weight: bold; }
        .risk-low { color: #0a0; }
    </style>
</head>
<body>
    <div class="container">
        <h1>🔐 Phishing Detection Dashboard</h1>
        
        <div class="input-section">
            <h2>Analyze Email</h2>
            <label>From:</label>
            <input type="text" id="from" placeholder="sender@example.com">
            
            <label>Subject:</label>
            <input type="text" id="subject" placeholder="Email subject">
            
            <label>Body:</label>
            <textarea id="body" rows="10" placeholder="Email body content"></textarea>
            
            <button onclick="analyzeEmail()">Analyze Email</button>
        </div>
        
        <div id="result"></div>
    </div>
    
    <script>
        async function analyzeEmail() {
            const email = {
                from: document.getElementById('from').value,
                subject: document.getElementById('subject').value,
                body: document.getElementById('body').value
            };
            
            const resultDiv = document.getElementById('result');
            resultDiv.innerHTML = '<p>Analyzing...</p>';
            
            try {
                const response = await fetch('http://localhost:5000/predict', {
                    method: 'POST',
                    headers: { 'Content-Type': 'application/json' },
                    body: JSON.stringify(email)
                });
                
                const result = await response.json();
                
                const riskClass = `risk-${result.risk_level.toLowerCase()}`;
                const resultClass = result.is_phishing ? 'phishing' : 'legitimate';
                
                resultDiv.innerHTML = `
                    <div class="result ${resultClass}">
                        <h2>Analysis Result</h2>
                        <p><strong>Status:</strong> ${result.is_phishing ? '⚠️ PHISHING DETECTED' : '✅ Legitimate Email'}</p>
                        <p><strong>Risk Level:</strong> <span class="${riskClass}">${result.risk_level}</span></p>
                        <p><strong>Confidence:</strong> ${(result.confidence * 100).toFixed(2)}%</p>
                        <p><strong>Phishing Probability:</strong> ${(result.phishing_probability * 100).toFixed(2)}%</p>
                        <p><strong>Legitimate Probability:</strong> ${(result.legitimate_probability * 100).toFixed(2)}%</p>
                    </div>
                `;
            } catch (error) {
                resultDiv.innerHTML = `<div class="result"><p>Error: ${error.message}</p></div>`;
            }
        }
    </script>
</body>
</html>
HTML

# Open dashboard.html in browser

Project Summary

This real-world project provides:

  • ✅ Real dataset integration (Enron, SpamAssassin)
  • ✅ Advanced feature extraction (50+ features)
  • ✅ Multiple ML model comparison
  • ✅ Production-ready API server
  • ✅ Web dashboard for monitoring
  • ✅ Complete error handling and logging
  • ✅ Model versioning and metadata

Usage:

  1. Prepare datasets: python prepare_datasets.py
  2. Extract features: python advanced_features.py
  3. Train models: python train_production_model.py
  4. Start API: python api_server.py
  5. Open dashboard: open dashboard.html

What This Lesson Does NOT Cover (On Purpose)

This lesson intentionally does not cover:

  • Image Analysis: Detecting phishing logos or QR codes.
  • Header Forgery: Deep analysis of SMTP headers (covered in Email Security).
  • Automated Takedowns: Logic for automatically reporting domains.
  • Neural Networks: Transformers (BERT/GPT) for complex language understanding.

Limitations and Trade-offs

AI Phishing Detection Limitations

Zero-Day Attacks:

  • AI models trained on known patterns
  • May miss new attack techniques
  • Requires continuous retraining
  • Cannot detect completely novel attacks
  • Combine with other methods

False Positives:

  • May flag legitimate emails
  • Business communication may be affected
  • Requires tuning and refinement
  • Context important for accuracy
  • Regular model updates needed

Adversarial Emails:

  • Attackers may craft emails to evade AI
  • Obfuscation techniques can fool models
  • Requires robust feature engineering
  • Continuous monitoring needed
  • Defense must evolve

Phishing Detection Trade-offs

Accuracy vs. Speed:

  • More thorough analysis = better accuracy but slower
  • Faster analysis = quicker decisions but may miss threats
  • Balance based on requirements
  • Real-time vs. batch processing
  • Choose appropriate approach

Blocking vs. Quarantine:

  • Blocking is safer but may block legitimate emails
  • Quarantine allows review but delays delivery
  • Balance based on confidence
  • High confidence = block
  • Low confidence = quarantine

Automation vs. Human Review:

  • Full automation is fast but risky
  • Human review is accurate but slow
  • Balance based on risk level
  • Automate clear cases
  • Human review for ambiguous

When AI Phishing Detection May Be Challenging

Highly Targeted Attacks:

  • Sophisticated targeted attacks may evade detection
  • Small volume makes training difficult
  • Requires advanced techniques
  • Combine with threat intelligence
  • Human analysis important

Multilingual Content:

  • Models trained on one language may miss others
  • Requires multilingual training data
  • Language-specific models needed
  • Consider language coverage
  • Expand training data

Encrypted Content:

  • Cannot analyze encrypted email content
  • Must rely on metadata and headers
  • Limited detection capabilities
  • Consider email gateway inspection
  • Metadata analysis helps

Cleanup

deactivate || true
rm -rf .venv-phish emails.csv make_dataset.py train_and_eval.py score_email.py model.json model.pkl
# If you also built the real-world project, remove its artifacts:
rm -rf datasets models prepare_datasets.py advanced_features.py train_production_model.py api_server.py dashboard.html
Validation: `ls .venv-phish` should fail with “No such file or directory”.

Career Alignment

After completing this lesson, you are prepared for:

  • Email Security Specialist (Entry Level)
  • Junior Detection Engineer
  • SOC Analyst (L1) with AI focus
  • Security Researcher (Phishing/Malware)

Next recommended steps:

  • Advanced NLP for threat detection
  • Building automated phishing playbooks
  • Integrating AI scores into SIEM/SOAR

Quick Reference

  • Use synthetic/redacted data; keep humans in the decision loop.
  • Validate with precision/recall; watch false positives before blocking.
  • Pair ML with email-auth controls and attachment/link sandboxing.
  • Keep models versioned (model.pkl) and log every scored message.


FAQs

Can I use these labs in production?

No—treat them as educational. Adapt, review, and security-test before any production use.

How should I follow the lessons?

Start from the Learn page order or use Previous/Next on each lesson; both flow consistently.

What if I lack test data or infra?

Use synthetic data and local/lab environments. Never target networks or data you don't own or have written permission to test.

Can I share these materials?

Yes, with attribution and respecting any licensing for referenced tools or datasets.