Build a Simple AI-Based Phishing Detector (Beginner Tutorial)
Train and test a lightweight phishing detector end to end with synthetic email data, clear validation, and safety guardrails.
Table of Contents
- Understanding Phishing Detection
- Setting Up the Project
- Generating a Labeled Dataset
- Training the Classifier
- Real-Time Scoring Loop
- Defense in Depth Guardrails
- What This Lesson Does NOT Cover
- Limitations and Trade-offs
- Cleanup
- Career Alignment
TL;DR
Build a basic AI phishing detector using Python and scikit-learn. Learn to transform text into numerical features with TF-IDF, train a Logistic Regression model, and implement defense-in-depth guardrails to handle false positives and adversarial evasions.
Learning Outcomes (You Will Be Able To)
By the end of this lesson, you will be able to:
- Explain why signature-based filters miss modern AI-generated phishing
- Convert raw email text into machine-learning features using TF-IDF
- Build a binary classifier to distinguish between benign and phishing emails
- Identify the “Intentional Failure” mode when datasets are too small or biased
- Map AI phishing risks to specific security controls (SPF/DKIM/DMARC)
What You’ll Build
- A small TF-IDF + Logistic Regression text classifier for phishing vs benign emails.
- Reproducible dataset generation to avoid leaking real PII.
- Validation after each step plus cleanup.
Prerequisites
- macOS or Linux with Python 3.12+.
- pip available; ~200 MB free disk.
- No email access needed; we generate synthetic samples.
Safety and Legal
- Never train on real mailbox data without explicit approval and PII scrubbing.
- Avoid storing raw emails; keep hashes or redacted text when possible.
- Keep humans in the loop for blocking decisions; start with “quarantine + review.”
Step 1) Create an isolated environment
python3 -m venv .venv-phish
source .venv-phish/bin/activate
pip install --upgrade pip
pip install pandas scikit-learn joblib
Common fix: if activation fails, make sure you are sourcing the script (source .venv-phish/bin/activate) rather than executing it directly, and confirm the virtual environment was created without errors.
Step 2) Generate a synthetic labeled dataset
cat > make_dataset.py <<'PY'
import pandas as pd
phish_samples = [
("Your account is locked. Verify immediately at http://fake-bank.com", 1),
("Urgent: update payroll info now or your pay is delayed", 1),
("Security alert: login from unknown device. Download the attached form", 1),
("Package held: pay customs fee via gift card", 1),
("Congrats, you won a prize! Click to claim", 1),
]
benign_samples = [
("Team meeting notes and next sprint goals", 0),
("Invoice attached for approved purchase order", 0),
("Reminder: security training scheduled next week", 0),
("Quarterly newsletter and product updates", 0),
("Welcome to the platform—getting started guide", 0),
]
df = pd.DataFrame(phish_samples + benign_samples, columns=["text", "label"])
df.to_csv("emails.csv", index=False)
print("Wrote emails.csv with", len(df), "rows")
PY
python make_dataset.py
Step 3) Train and evaluate the classifier
cat > train_and_eval.py <<'PY'
import json
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report, confusion_matrix
df = pd.read_csv("emails.csv")
X_train, X_test, y_train, y_test = train_test_split(df["text"], df["label"], test_size=0.3, random_state=42, stratify=df["label"])
pipeline = Pipeline([
("tfidf", TfidfVectorizer(ngram_range=(1, 2), min_df=1)),
("clf", LogisticRegression(max_iter=400, class_weight="balanced")),
])
pipeline.fit(X_train, y_train)
preds = pipeline.predict(X_test)
report = classification_report(y_test, preds, target_names=["benign", "phish"], digits=3, output_dict=True)
cm = confusion_matrix(y_test, preds, labels=[0, 1])
with open("model.json", "w") as f:
json.dump({"params": pipeline.get_params(deep=False)}, f, indent=2)
print("Confusion matrix [[TN, FP], [FN, TP]]:", cm.tolist())
print("Precision/Recall/F1:", json.dumps(report, indent=2))
PY
python train_and_eval.py
Intentional Failure Exercise (Important)
AI models are only as good as their training data. Try this:
- Modify make_dataset.py: add 5 "benign" samples that contain the word "Urgent" (e.g., "Urgent: Project deadline moved up"); a sketch is shown after this list.
- Retrain: run the script again and observe how the model starts flagging legitimate "Urgent" emails as phishing.
- Lesson: this is bias. If your phishing samples are the only ones containing "Urgent", the model learns a keyword shortcut rather than the underlying intent.
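One way to run the exercise is to append extra benign samples that reuse the "Urgent" keyword and include them when building the DataFrame in make_dataset.py. The sample texts below are illustrative, not part of the original dataset:
extra_benign = [
    ("Urgent: project deadline moved up to Friday", 0),
    ("Urgent: conference room change for today's standup", 0),
    ("Urgent: please submit your timesheet by end of day", 0),
    ("Urgent: build pipeline is red, fix before release", 0),
    ("Urgent: customer call rescheduled to 3pm", 0),
]
df = pd.DataFrame(phish_samples + benign_samples + extra_benign, columns=["text", "label"])
After retraining on the expanded data, "Urgent" alone is no longer a reliable phishing signal, and the model's behavior on those emails should shift accordingly.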
Common fixes:
- ValueError: empty vocabulary => ensure emails.csv is not empty and min_df is not larger than the number of samples.
- If class imbalance arises, keep class_weight="balanced" or add more phishing examples.
Step 4) Add a simple scoring script with safety checks
cat > score_email.py <<'PY'
import sys
import joblib
import pandas as pd
from sklearn.pipeline import Pipeline
MODEL_PATH = "model.pkl"
def load_model():
    return joblib.load(MODEL_PATH)

def main():
    if len(sys.argv) < 2:
        print("Usage: python score_email.py 'email text'")
        sys.exit(1)
    text = sys.argv[1]
    model: Pipeline = load_model()
    proba = model.predict_proba([text])[0][1]
    print(f"phish_probability={proba:.3f}")
    if proba > 0.7:
        print("Action: quarantine and send to human review")

if __name__ == "__main__":
    main()
PY
pip install joblib
python - <<'PY'
import joblib
from sklearn.pipeline import Pipeline
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
df = pd.read_csv("emails.csv")
pipe = Pipeline([
("tfidf", TfidfVectorizer(ngram_range=(1, 2), min_df=1)),
("clf", LogisticRegression(max_iter=400, class_weight="balanced")),
])
pipe.fit(df["text"], df["label"])
joblib.dump(pipe, "model.pkl")
print("Saved model.pkl")
PY
python score_email.py "Please reset your password at http://fake.com/reset"
Understanding Why AI Phishing Detection Works
Why Traditional Detection Fails
Signature-Based Limitations: Traditional email filters rely on known patterns and signatures. Modern phishing uses:
- AI-generated content that evades signatures
- Personalized attacks that look legitimate
- Zero-day techniques not yet in databases
Why ML Detection Works:
- Learns patterns from data, not just signatures
- Adapts to new phishing techniques
- Identifies subtle indicators humans miss
- Scales to handle millions of emails
How TF-IDF + Logistic Regression Works
TF-IDF (Term Frequency-Inverse Document Frequency):
- Identifies important words in emails
- Weights words by rarity (phishing terms are rare in legitimate emails)
- Creates numerical features from text
Logistic Regression:
- Learns which features indicate phishing
- Provides probability scores (not just binary)
- Interpretable (can see which words matter)
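To make the interpretability point concrete, here is a minimal sketch that refits the same TF-IDF + Logistic Regression pipeline from Step 3 on emails.csv and prints the n-grams with the largest positive weights (the ones that push a message toward the phishing class). It assumes emails.csv from Step 2 exists in the working directory:
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline

df = pd.read_csv("emails.csv")
pipe = Pipeline([
    ("tfidf", TfidfVectorizer(ngram_range=(1, 2), min_df=1)),
    ("clf", LogisticRegression(max_iter=400, class_weight="balanced")),
])
pipe.fit(df["text"], df["label"])

# Pair each n-gram with its learned weight; positive weights push toward "phish"
terms = pipe["tfidf"].get_feature_names_out()
weights = pipe["clf"].coef_[0]
top = sorted(zip(terms, weights), key=lambda tw: tw[1], reverse=True)[:10]
for term, weight in top:
    print(f"{term:30s} {weight:+.3f}")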
Step 5) Add non-ML controls (defense in depth)
Why Defense in Depth Matters
ML Limitations: ML models can be fooled by adversarial examples. Multiple security layers ensure that if one fails, others still protect.
AI Threat → Security Control Mapping
| AI Risk | Real-World Impact | Control Implemented |
|---|---|---|
| Model Evasion | Attackers use “good” words to lower score | Hybrid detection (ML + regex for keywords) |
| Phishing Drift | New lure themes (e.g., QR codes) missed | Weekly retraining + human-in-the-loop review |
| False Positives | Critical business emails blocked | "Quarantine + Review" policy (never auto-delete) |
| Identity Spoofing | "Legit"-looking sender evades text check | SPF/DKIM/DMARC hard enforcement |
Production-Ready Controls:
- Enforce SPF/DKIM/DMARC on inbound mail; reject or quarantine failures
- Strip or rewrite links; sandbox attachments separately
- Log decisions and top contributing features for analyst review (use pipeline["tfidf"].get_feature_names_out() and the model coefficients)
- Rate-limit the scoring API to prevent prompt flooding or model abuse
Enhanced Logging Example:
import hashlib
import json
from datetime import datetime
def log_decision(email_text: str, probability: float, model, top_features: int = 5):
    """Log decision with top contributing features"""
    # Hash email to protect privacy
    email_hash = hashlib.sha256(email_text.encode()).hexdigest()[:16]
    # Get feature names and coefficients
    feature_names = model.named_steps['tfidf'].get_feature_names_out()
    coefficients = model.named_steps['clf'].coef_[0]
    # Get top contributing features
    feature_importance = list(zip(feature_names, coefficients))
    top_contributors = sorted(feature_importance, key=lambda x: abs(x[1]), reverse=True)[:top_features]
    log_entry = {
        "timestamp": datetime.utcnow().isoformat(),
        "email_hash": email_hash,
        "phishing_probability": probability,
        "decision": "quarantine" if probability > 0.7 else "allow",
        "top_features": [{"feature": f, "coefficient": float(c)} for f, c in top_contributors]
    }
    print(json.dumps(log_entry))
    # In production: write to secure log storage
Advanced Scenarios
Scenario 1: Adversarial Phishing Emails
Challenge: Attackers craft emails to evade ML detection
Solution:
- Use ensemble models where multiple models vote (see the sketch after this list)
- Add adversarial training examples
- Implement feature engineering to detect evasion
- Combine ML with rule-based detection
- Human review for high-value targets
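A minimal sketch of the ensemble idea, assuming emails.csv from Step 2 and an illustrative mix of estimators (logistic regression, naive Bayes, random forest) combined with soft voting:
import pandas as pd
from sklearn.ensemble import RandomForestClassifier, VotingClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline

df = pd.read_csv("emails.csv")
ensemble = Pipeline([
    ("tfidf", TfidfVectorizer(ngram_range=(1, 2), min_df=1)),
    ("vote", VotingClassifier(
        estimators=[
            ("lr", LogisticRegression(max_iter=400, class_weight="balanced")),
            ("nb", MultinomialNB()),
            ("rf", RandomForestClassifier(n_estimators=100, random_state=42)),
        ],
        voting="soft",  # average predicted probabilities instead of hard votes
    )),
])
ensemble.fit(df["text"], df["label"])
print(ensemble.predict_proba(["Gift card needed urgently, reply ASAP"])[0][1])
An attacker now has to fool several different decision boundaries at once, which raises the cost of evasion compared to a single linear model.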
Scenario 2: Low False Positive Rate
Challenge: Too many false positives overwhelm analysts
Solution:
- Tune the classification threshold (see the sketch after this list)
- Improve feature engineering
- Add more training data
- Use confidence scoring
- Implement feedback loop
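A minimal threshold-tuning sketch, assuming the pipeline, X_test, and y_test from Step 3 are still in scope; the 0.95 precision target is an arbitrary example, not a recommendation:
from sklearn.metrics import precision_recall_curve

# Scores for the held-out test set
probas = pipeline.predict_proba(X_test)[:, 1]
precision, recall, thresholds = precision_recall_curve(y_test, probas)

# Pick the lowest threshold that keeps precision at or above the target,
# so analysts see as few false positives as the target allows
target_precision = 0.95
for p, t in zip(precision[:-1], thresholds):
    if p >= target_precision:
        print(f"threshold={t:.2f} gives precision={p:.2f}")
        break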
Scenario 3: Real-Time Detection
Challenge: Detecting phishing in real-time at scale
Solution:
- Optimize model for speed
- Use caching for common patterns (see the sketch after this list)
- Implement rate limiting
- Scale horizontally
- Monitor performance metrics
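A minimal caching sketch using functools.lru_cache, assuming model.pkl from Step 4; the cache size is an illustrative value, and in production you would key on a normalized or hashed form of the message rather than the raw text:
from functools import lru_cache

import joblib

model = joblib.load("model.pkl")

@lru_cache(maxsize=10_000)
def cached_score(text: str) -> float:
    """Memoize scores for identical (often templated) campaign messages."""
    return float(model.predict_proba([text])[0][1])

# Repeated lookups for the same campaign text hit the cache instead of the model
print(cached_score("Your account is locked. Verify immediately at http://fake-bank.com"))
print(cached_score.cache_info())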
Troubleshooting Guide
Problem: Model accuracy too low
Diagnosis:
# Check the confusion matrix (preds comes from train_and_eval.py)
print(confusion_matrix(y_test, preds))
# Review feature importance: which n-grams carry the most weight
feature_names = pipeline.named_steps['tfidf'].get_feature_names_out()
coefficients = pipeline.named_steps['clf'].coef_[0]
Solutions:
- Add more training data
- Improve feature engineering
- Tune hyperparameters
- Try different algorithms
- Check for data quality issues
Problem: High false positive rate
Diagnosis:
- Review confusion matrix
- Analyze false positive patterns
- Check feature distributions
Solutions:
- Adjust classification threshold
- Improve feature selection
- Add more negative examples
- Use ensemble methods
- Implement confidence scoring
Problem: Model not detecting new phishing techniques
Diagnosis:
- Check if new techniques have different features
- Compare feature distributions
- Review model performance over time
Solutions:
- Retrain with new examples
- Update feature engineering
- Use online learning (see the sketch after this list)
- Implement model versioning
- Regular model updates
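A minimal online-learning sketch using HashingVectorizer plus SGDClassifier.partial_fit, so newly labeled emails can be folded in without a full retrain. This swaps the lesson's TF-IDF pipeline for a hashing-based one (a hashed vocabulary needs no refitting), and the two sample messages are illustrative:
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.linear_model import SGDClassifier

# HashingVectorizer needs no fitted vocabulary, so new emails can be streamed in
vectorizer = HashingVectorizer(ngram_range=(1, 2), n_features=2**18)
clf = SGDClassifier(loss="log_loss")

def partial_update(texts, labels):
    """Fold newly labeled emails into the model without a full retrain."""
    X = vectorizer.transform(texts)
    clf.partial_fit(X, labels, classes=[0, 1])

partial_update(
    ["Scan this QR code to restore mailbox access", "Sprint retro notes attached"],
    [1, 0],
)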
Code Review Checklist for AI Phishing Detection
Model Security
- Training data integrity verified
- Model versioning and rollback
- Adversarial robustness tested
- Performance monitoring
Production Readiness
- Error handling in all code paths
- Rate limiting implemented
- Logging configured (privacy-preserving)
- Human-in-the-loop processes
Defense in Depth
- SPF/DKIM/DMARC enforcement
- Link rewriting/sandboxing
- Attachment analysis
- Multiple detection methods
AI Phishing Detection Architecture Diagram
Recommended Diagram: Phishing Detection Pipeline
Email Input
↓
Feature Extraction
(Headers, Content, Links)
↓
AI Model Analysis
(ML Classifier)
↓
┌────┴────┐
↓ ↓
Legitimate Phishing
↓ ↓
└────┬────┘
↓
Action (Block/Quarantine/Allow)
Detection Flow:
- Email features extracted (headers, content, links)
- AI model analyzes features
- Classification as legitimate or phishing
- Action taken based on result
Phishing Detection Methods Comparison
| Method | Accuracy | False Positives | Resource Usage | Best For |
|---|---|---|---|---|
| AI/ML Detection | High (90%+) | Low | Medium | Content analysis |
| URL Blacklists | Medium (70%) | Very Low | Low | Known malicious URLs |
| SPF/DKIM/DMARC | High (85%+) | Very Low | Low | Email authentication |
| Reputation Scoring | Medium (75%) | Medium | Low | Sender analysis |
| Hybrid Approach | Very High (95%+) | Very Low | Medium | Comprehensive defense |
Key Insight: Combining multiple methods provides best results. Use AI for content analysis, authentication for sender verification, and blacklists for known threats.
Real World Project: Build an AI System That Detects Phishing Emails Using Real Datasets
This comprehensive project demonstrates building a production-ready phishing detection system using real-world datasets (Enron and SpamAssassin), advanced feature engineering, and deployment-ready code.
Project Overview
Objective: Build a complete AI-powered phishing email detection system that:
- Processes real email datasets (Enron corpus, SpamAssassin)
- Extracts comprehensive features (headers, content, URLs, attachments)
- Trains and evaluates multiple ML models
- Provides real-time detection API
- Includes web dashboard for monitoring
- Implements production-ready security controls
Project Architecture
┌─────────────────┐
│ Email Dataset │
│ (Enron/SpamAss) │
└────────┬────────┘
│
┌────────▼────────┐
│ Data Preprocessing│
│ & Feature Extract│
└────────┬────────┘
│
┌────────▼────────┐
│ ML Training │
│ (Multiple Models)│
└────────┬────────┘
│
┌────────▼────────┐
│ Model Evaluation│
│ & Selection │
└────────┬────────┘
│
┌────────▼────────┐
│ Detection API │
│ (Flask/FastAPI)│
└────────┬────────┘
│
┌────────▼────────┐
│ Web Dashboard │
│ (Monitoring) │
└─────────────────┘
Step 1: Dataset Preparation
cat > prepare_datasets.py <<'PY'
#!/usr/bin/env python3
"""
Real-World Phishing Detection Dataset Preparation
Uses Enron and SpamAssassin datasets
"""
import os
import email
import re
import pandas as pd
import numpy as np
from pathlib import Path
import urllib.request
import tarfile
import zipfile
from typing import List, Dict, Tuple
import hashlib
class DatasetPreparer:
"""Prepare real-world email datasets for phishing detection"""
def __init__(self, data_dir="datasets"):
self.data_dir = Path(data_dir)
self.data_dir.mkdir(exist_ok=True)
self.enron_url = "https://www.cs.cmu.edu/~enron/enron_mail_20150507.tar.gz"
self.spamassassin_url = "https://spamassassin.apache.org/old/publiccorpus/"
def download_enron_dataset(self):
"""Download Enron email dataset (legitimate emails)"""
print("Downloading Enron dataset...")
enron_file = self.data_dir / "enron.tar.gz"
if not enron_file.exists():
print("Note: Enron dataset is large (~400MB). Download manually from:")
print("https://www.cs.cmu.edu/~enron/enron_mail_20150507.tar.gz")
print("Or use SpamAssassin dataset which is smaller.")
return False
# Extract if needed
enron_extracted = self.data_dir / "enron_mail"
if not enron_extracted.exists():
print("Extracting Enron dataset...")
with tarfile.open(enron_file, 'r:gz') as tar:
tar.extractall(self.data_dir)
return True
def download_spamassassin_dataset(self):
"""Download SpamAssassin dataset (spam/phishing emails)"""
print("Downloading SpamAssassin dataset...")
# SpamAssassin public corpus files
spam_files = [
"20021010_easy_ham.tar.bz2",
"20021010_hard_ham.tar.bz2",
"20021010_spam.tar.bz2"
]
base_url = "https://spamassassin.apache.org/old/publiccorpus/"
for filename in spam_files:
filepath = self.data_dir / filename
if not filepath.exists():
print(f"Downloading {filename}...")
try:
urllib.request.urlretrieve(
base_url + filename,
filepath
)
except Exception as e:
print(f"Could not download {filename}: {e}")
print("You can download manually from:")
print(f"{base_url}{filename}")
continue
# Extract
extracted_dir = self.data_dir / filename.replace('.tar.bz2', '')
if not extracted_dir.exists():
print(f"Extracting {filename}...")
with tarfile.open(filepath, 'r:bz2') as tar:
tar.extractall(self.data_dir)
return True
def parse_email_file(self, filepath: Path) -> Dict:
"""Parse a single email file"""
try:
with open(filepath, 'rb') as f:
msg = email.message_from_bytes(f.read())
# Extract headers
subject = msg.get('Subject', '')
from_addr = msg.get('From', '')
to_addr = msg.get('To', '')
date = msg.get('Date', '')
# Extract body
body = ""
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == "text/plain":
body += part.get_payload(decode=True).decode('utf-8', errors='ignore')
else:
body = msg.get_payload(decode=True).decode('utf-8', errors='ignore')
# Extract URLs
urls = re.findall(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', body)
# Extract attachments info
attachments = []
if msg.is_multipart():
for part in msg.walk():
if part.get_content_disposition() == 'attachment':
attachments.append(part.get_filename())
return {
'subject': subject,
'from': from_addr,
'to': to_addr,
'date': date,
'body': body,
'urls': urls,
'attachment_count': len(attachments),
'attachments': attachments,
'content_type': msg.get_content_type()
}
except Exception as e:
print(f"Error parsing {filepath}: {e}")
return None
def process_enron_emails(self, max_emails=5000) -> pd.DataFrame:
"""Process Enron emails (legitimate)"""
print("Processing Enron emails...")
enron_dir = self.data_dir / "enron_mail"
if not enron_dir.exists():
print("Enron dataset not found. Using synthetic data.")
return self._generate_synthetic_legitimate(max_emails)
emails = []
count = 0
for mail_file in enron_dir.rglob("*"):
if mail_file.is_file() and count < max_emails:
parsed = self.parse_email_file(mail_file)
if parsed:
parsed['label'] = 0 # Legitimate
parsed['source'] = 'enron'
emails.append(parsed)
count += 1
print(f"Processed {len(emails)} Enron emails")
return pd.DataFrame(emails)
def process_spamassassin_emails(self, max_emails=5000) -> pd.DataFrame:
"""Process SpamAssassin emails (spam/phishing)"""
print("Processing SpamAssassin emails...")
emails = []
count = 0
# Process spam emails
spam_dirs = [
self.data_dir / "20021010_spam" / "spam",
self.data_dir / "20021010_spam" / "spam_2"
]
for spam_dir in spam_dirs:
if spam_dir.exists():
for mail_file in spam_dir.glob("*"):
if mail_file.is_file() and count < max_emails:
parsed = self.parse_email_file(mail_file)
if parsed:
parsed['label'] = 1 # Phishing/Spam
parsed['source'] = 'spamassassin'
emails.append(parsed)
count += 1
if len(emails) == 0:
print("SpamAssassin dataset not found. Using synthetic data.")
return self._generate_synthetic_phishing(max_emails)
print(f"Processed {len(emails)} SpamAssassin emails")
return pd.DataFrame(emails)
def _generate_synthetic_legitimate(self, n: int) -> pd.DataFrame:
"""Generate synthetic legitimate emails if dataset not available"""
legitimate_templates = [
"Team meeting scheduled for {date}",
"Quarterly report attached for review",
"Project update: {project} status",
"Reminder: Security training next week",
"Invoice #{number} for services rendered"
]
emails = []
for i in range(n):
template = np.random.choice(legitimate_templates)
emails.append({
'subject': template.format(date="2025-12-15", project="Project Alpha", number=i),
'from': f"colleague{i}@company.com",
'to': "team@company.com",
'body': f"This is a legitimate business email. {template}",
'urls': [],
'attachment_count': 0,
'label': 0,
'source': 'synthetic'
})
return pd.DataFrame(emails)
def _generate_synthetic_phishing(self, n: int) -> pd.DataFrame:
"""Generate synthetic phishing emails if dataset not available"""
phishing_templates = [
"Urgent: Your account will be locked. Verify now: {url}",
"Security alert: Suspicious login detected. Click here: {url}",
"You've won a prize! Claim now: {url}",
"Payment required: Your subscription expired. Renew: {url}",
"Verify your identity: {url} or account will be closed"
]
emails = []
for i in range(n):
template = np.random.choice(phishing_templates)
fake_url = f"http://fake-bank-{i}.com/verify"
emails.append({
'subject': template.split(':')[0],
'from': f"noreply{i}@fake-bank.com",
'to': "victim@example.com",
'body': template.format(url=fake_url) + " This is a phishing attempt.",
'urls': [fake_url],
'attachment_count': 0,
'label': 1,
'source': 'synthetic'
})
return pd.DataFrame(emails)
def create_combined_dataset(self) -> pd.DataFrame:
"""Create combined dataset from all sources"""
print("Creating combined dataset...")
# Process legitimate emails
legitimate_df = self.process_enron_emails(max_emails=5000)
# Process phishing emails
phishing_df = self.process_spamassassin_emails(max_emails=5000)
# Combine
combined = pd.concat([legitimate_df, phishing_df], ignore_index=True)
# Shuffle
combined = combined.sample(frac=1, random_state=42).reset_index(drop=True)
# Save
output_file = self.data_dir / "combined_emails.csv"
combined.to_csv(output_file, index=False)
print(f"Created combined dataset with {len(combined)} emails")
print(f"Legitimate: {len(combined[combined['label']==0])}")
print(f"Phishing: {len(combined[combined['label']==1])}")
print(f"Saved to {output_file}")
return combined
def main():
preparer = DatasetPreparer()
# Try to download datasets (optional - will use synthetic if fails)
print("Attempting to download real datasets...")
print("(Will use synthetic data if download fails)")
# Create combined dataset
df = preparer.create_combined_dataset()
print("\nDataset preparation complete!")
print(f"Total emails: {len(df)}")
print(f"Features: {list(df.columns)}")
if __name__ == "__main__":
main()
PY
python prepare_datasets.py
Step 2: Advanced Feature Extraction
cat > advanced_features.py <<'PY'
#!/usr/bin/env python3
"""
Advanced Feature Extraction for Phishing Detection
Extracts comprehensive features from email headers, content, and URLs
"""
import re
import pandas as pd
import numpy as np
from urllib.parse import urlparse
from typing import Dict, List
import hashlib
from collections import Counter
class AdvancedFeatureExtractor:
"""Extract comprehensive features for phishing detection"""
def __init__(self):
self.suspicious_keywords = [
'urgent', 'verify', 'suspended', 'locked', 'expired',
'click here', 'act now', 'limited time', 'prize', 'winner',
'confirm', 'update', 'security', 'alert', 'warning'
]
self.legitimate_domains = [
'gmail.com', 'yahoo.com', 'outlook.com', 'company.com',
'microsoft.com', 'google.com', 'amazon.com'
]
def extract_header_features(self, email: Dict) -> Dict:
"""Extract features from email headers"""
features = {}
subject = str(email.get('subject', '')).lower()
from_addr = str(email.get('from', '')).lower()
to_addr = str(email.get('to', '')).lower()
# Subject features
features['subject_length'] = len(subject)
features['subject_has_urgent'] = any(kw in subject for kw in ['urgent', 'asap', 'immediate'])
features['subject_has_suspicious'] = any(kw in subject for kw in self.suspicious_keywords)
features['subject_has_numbers'] = bool(re.search(r'\d+', subject))
features['subject_has_special_chars'] = bool(re.search(r'[!@#$%^&*()]', subject))
features['subject_all_caps_ratio'] = sum(1 for c in subject if c.isupper()) / max(len(subject), 1)
# From address features
features['from_has_digits'] = bool(re.search(r'\d', from_addr))
features['from_domain_length'] = len(from_addr.split('@')[-1]) if '@' in from_addr else 0
features['from_is_legitimate'] = any(dom in from_addr for dom in self.legitimate_domains)
features['from_subdomain_count'] = from_addr.count('.') if '@' in from_addr else 0
# To address features
features['to_count'] = to_addr.count(',') + 1 if to_addr else 0
features['to_is_broadcast'] = features['to_count'] > 10
return features
def extract_content_features(self, email: Dict) -> Dict:
"""Extract features from email body content"""
features = {}
body = str(email.get('body', '')).lower()
# Basic content features
features['body_length'] = len(body)
features['body_word_count'] = len(body.split())
features['body_has_html'] = '<html' in body or '<body' in body
features['body_has_links'] = 'http' in body or 'www.' in body
# Suspicious content patterns
features['suspicious_keyword_count'] = sum(1 for kw in self.suspicious_keywords if kw in body)
features['suspicious_keyword_ratio'] = features['suspicious_keyword_count'] / max(features['body_word_count'], 1)
# Urgency indicators
features['urgency_indicators'] = sum(1 for word in ['urgent', 'asap', 'immediate', 'now', 'today'] if word in body)
# Grammar and spelling (simplified)
features['typo_indicators'] = len(re.findall(r'\b\w{15,}\b', body)) # Very long words
features['repeated_chars'] = len(re.findall(r'(.)\1{3,}', body)) # Repeated characters
# Financial/security terms
features['financial_terms'] = sum(1 for word in ['payment', 'invoice', 'account', 'bank', 'credit', 'card'] if word in body)
features['security_terms'] = sum(1 for word in ['verify', 'confirm', 'security', 'password', 'login'] if word in body)
# Punctuation patterns
features['exclamation_count'] = body.count('!')
features['question_count'] = body.count('?')
features['exclamation_ratio'] = features['exclamation_count'] / max(features['body_word_count'], 1)
return features
def extract_url_features(self, email: Dict) -> Dict:
"""Extract features from URLs in email"""
features = {}
urls = email.get('urls', [])
features['url_count'] = len(urls)
features['has_urls'] = len(urls) > 0
if urls:
url_features = []
for url in urls:
try:
parsed = urlparse(url)
url_feat = {
'is_https': parsed.scheme == 'https',
'domain_length': len(parsed.netloc),
'path_length': len(parsed.path),
'has_ip': bool(re.match(r'^\d+\.\d+\.\d+\.\d+', parsed.netloc)),
'has_port': ':' in parsed.netloc,
'subdomain_count': parsed.netloc.count('.') - 1,
'is_shortened': any(domain in parsed.netloc for domain in ['bit.ly', 'tinyurl', 'goo.gl', 't.co']),
'has_suspicious_tld': any(tld in parsed.netloc for tld in ['.tk', '.ml', '.ga', '.cf']),
'domain_has_digits': bool(re.search(r'\d', parsed.netloc)),
'path_has_suspicious': any(kw in parsed.path.lower() for kw in ['login', 'verify', 'confirm', 'secure'])
}
url_features.append(url_feat)
except:
continue
if url_features:
# Aggregate URL features
for key in url_features[0].keys():
features[f'url_{key}_mean'] = np.mean([uf[key] for uf in url_features])
features[f'url_{key}_max'] = np.max([uf[key] for uf in url_features])
else:
# No URLs
for key in ['is_https', 'domain_length', 'path_length', 'has_ip', 'has_port',
'subdomain_count', 'is_shortened', 'has_suspicious_tld',
'domain_has_digits', 'path_has_suspicious']:
features[f'url_{key}_mean'] = 0
features[f'url_{key}_max'] = 0
return features
def extract_attachment_features(self, email: Dict) -> Dict:
"""Extract features from email attachments"""
features = {}
attachments = email.get('attachments', [])
attachment_count = email.get('attachment_count', 0)
features['attachment_count'] = attachment_count
features['has_attachments'] = attachment_count > 0
if attachments:
# Check for suspicious file types
suspicious_extensions = ['.exe', '.bat', '.scr', '.vbs', '.js', '.jar', '.zip', '.rar']
features['suspicious_attachment'] = any(
any(ext in att.lower() for ext in suspicious_extensions)
for att in attachments
)
# Check for double extensions (common in malware)
features['double_extension'] = any('.' in att and att.count('.') > 1 for att in attachments)
else:
features['suspicious_attachment'] = False
features['double_extension'] = False
return features
def extract_all_features(self, email: Dict) -> Dict:
"""Extract all features from an email"""
all_features = {}
# Extract from different sources
all_features.update(self.extract_header_features(email))
all_features.update(self.extract_content_features(email))
all_features.update(self.extract_url_features(email))
all_features.update(self.extract_attachment_features(email))
return all_features
def extract_batch_features(self, emails: pd.DataFrame) -> pd.DataFrame:
"""Extract features for a batch of emails"""
print("Extracting features from emails...")
feature_list = []
for idx, email in emails.iterrows():
features = self.extract_all_features(email.to_dict())
features['email_id'] = idx
feature_list.append(features)
features_df = pd.DataFrame(feature_list)
# Merge with original labels
if 'label' in emails.columns:
features_df['label'] = emails['label'].values
print(f"Extracted {len(features_df.columns)} features from {len(emails)} emails")
return features_df
def main():
# Load dataset
df = pd.read_csv("datasets/combined_emails.csv")
# Initialize extractor
extractor = AdvancedFeatureExtractor()
# Extract features
features_df = extractor.extract_batch_features(df)
# Save features
features_df.to_csv("datasets/email_features.csv", index=False)
print(f"\nFeatures saved to datasets/email_features.csv")
print(f"Feature columns: {len(features_df.columns)}")
print(f"Sample features: {list(features_df.columns[:10])}")
if __name__ == "__main__":
main()
PY
python advanced_features.py
Step 3: Model Training with Multiple Algorithms
cat > train_production_model.py <<'PY'
#!/usr/bin/env python3
"""
Production-Ready Phishing Detection Model Training
Trains multiple models and selects the best one
"""
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.metrics import (
accuracy_score, precision_score, recall_score, f1_score,
roc_auc_score, confusion_matrix, classification_report
)
from sklearn.preprocessing import StandardScaler
import joblib
import json
from datetime import datetime
class PhishingModelTrainer:
"""Train and evaluate multiple phishing detection models"""
def __init__(self):
self.models = {}
self.scaler = StandardScaler()
self.best_model = None
self.best_score = 0
self.feature_names = []
def load_data(self, features_file: str):
"""Load feature data"""
print(f"Loading features from {features_file}...")
df = pd.read_csv(features_file)
# Separate features and labels
if 'label' in df.columns:
X = df.drop(['label', 'email_id'], axis=1, errors='ignore')
y = df['label']
else:
raise ValueError("Label column not found in dataset")
# Handle missing values
X = X.fillna(0)
# Store feature names
self.feature_names = X.columns.tolist()
return X, y
def train_models(self, X, y, test_size=0.2):
"""Train multiple models and compare"""
print("\n" + "="*60)
print("Training Multiple Models")
print("="*60)
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=42, stratify=y
)
# Scale features
X_train_scaled = self.scaler.fit_transform(X_train)
X_test_scaled = self.scaler.transform(X_test)
# Define models to train
models_to_train = {
'Logistic Regression': LogisticRegression(max_iter=1000, random_state=42),
'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1),
'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42),
'SVM': SVC(probability=True, random_state=42)
}
results = {}
for name, model in models_to_train.items():
print(f"\nTraining {name}...")
# Use scaled data for linear models, original for tree-based
if name in ['Logistic Regression', 'SVM']:
X_train_use = X_train_scaled
X_test_use = X_test_scaled
else:
X_train_use = X_train
X_test_use = X_test
# Train
model.fit(X_train_use, y_train)
# Predict
y_pred = model.predict(X_test_use)
y_pred_proba = model.predict_proba(X_test_use)[:, 1] if hasattr(model, 'predict_proba') else None
# Evaluate
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
roc_auc = roc_auc_score(y_test, y_pred_proba) if y_pred_proba is not None else 0
results[name] = {
'model': model,
'accuracy': accuracy,
'precision': precision,
'recall': recall,
'f1': f1,
'roc_auc': roc_auc,
'confusion_matrix': confusion_matrix(y_test, y_pred).tolist()
}
print(f" Accuracy: {accuracy:.4f}")
print(f" Precision: {precision:.4f}")
print(f" Recall: {recall:.4f}")
print(f" F1-Score: {f1:.4f}")
print(f" ROC-AUC: {roc_auc:.4f}")
# Select best model (based on F1 score)
best_model_name = max(results.keys(), key=lambda k: results[k]['f1'])
self.best_model = results[best_model_name]['model']
self.best_score = results[best_model_name]['f1']
print("\n" + "="*60)
print(f"Best Model: {best_model_name}")
print(f"Best F1-Score: {self.best_score:.4f}")
print("="*60)
# Save all results
self.models = results
return results, X_test, y_test
def hyperparameter_tuning(self, X_train, y_train, model_name='Random Forest'):
"""Perform hyperparameter tuning for best model"""
print(f"\nPerforming hyperparameter tuning for {model_name}...")
if model_name == 'Random Forest':
param_grid = {
'n_estimators': [100, 200, 300],
'max_depth': [10, 20, None],
'min_samples_split': [2, 5, 10]
}
base_model = RandomForestClassifier(random_state=42, n_jobs=-1)
elif model_name == 'Gradient Boosting':
param_grid = {
'n_estimators': [100, 200],
'learning_rate': [0.01, 0.1, 0.2],
'max_depth': [3, 5, 7]
}
base_model = GradientBoostingClassifier(random_state=42)
else:
print("Hyperparameter tuning not implemented for this model")
return self.best_model
grid_search = GridSearchCV(
base_model, param_grid, cv=5, scoring='f1', n_jobs=-1, verbose=1
)
grid_search.fit(X_train, y_train)
print(f"Best parameters: {grid_search.best_params_}")
print(f"Best CV score: {grid_search.best_score_:.4f}")
self.best_model = grid_search.best_estimator_
return self.best_model
def save_model(self, filepath: str):
"""Save the best model and metadata"""
model_data = {
'model': self.best_model,
'scaler': self.scaler,
'feature_names': self.feature_names,
'training_date': datetime.now().isoformat(),
'performance': {
'f1_score': self.best_score,
'model_type': type(self.best_model).__name__
}
}
joblib.dump(model_data, filepath)
print(f"\nModel saved to {filepath}")
# Save metadata as JSON
metadata = {
'training_date': model_data['training_date'],
'model_type': model_data['performance']['model_type'],
'f1_score': float(model_data['performance']['f1_score']),
'feature_count': len(self.feature_names),
'features': self.feature_names[:20] # First 20 features
}
metadata_file = filepath.replace('.pkl', '_metadata.json')
with open(metadata_file, 'w') as f:
json.dump(metadata, f, indent=2)
print(f"Metadata saved to {metadata_file}")
def main():
trainer = PhishingModelTrainer()
# Load data
X, y = trainer.load_data("datasets/email_features.csv")
print(f"\nDataset Info:")
print(f" Total samples: {len(X)}")
print(f" Features: {len(X.columns)}")
print(f" Legitimate: {sum(y == 0)}")
print(f" Phishing: {sum(y == 1)}")
# Train models
results, X_test, y_test = trainer.train_models(X, y)
# Save best model
trainer.save_model("models/phishing_detector_production.pkl")
print("\n" + "="*60)
print("Training Complete!")
print("="*60)
print("\nNext steps:")
print("1. Review model performance metrics")
print("2. Test model on new emails")
print("3. Deploy model via API (see api_server.py)")
if __name__ == "__main__":
import os
os.makedirs("models", exist_ok=True)
main()
PY
python train_production_model.py
Step 4: Real-Time Detection API
cat > api_server.py <<'PY'
#!/usr/bin/env python3
"""
Production-Ready Phishing Detection API Server
Flask/FastAPI server for real-time email analysis
"""
from flask import Flask, request, jsonify
from flask_cors import CORS
import joblib
import pandas as pd
import numpy as np
from advanced_features import AdvancedFeatureExtractor
import logging
from datetime import datetime
import os
app = Flask(__name__)
CORS(app)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Load model
MODEL_PATH = "models/phishing_detector_production.pkl"
model_data = None
feature_extractor = AdvancedFeatureExtractor()
def load_model():
"""Load the trained model"""
global model_data
try:
model_data = joblib.load(MODEL_PATH)
logger.info("Model loaded successfully")
return True
except Exception as e:
logger.error(f"Failed to load model: {e}")
return False
@app.route('/health', methods=['GET'])
def health_check():
"""Health check endpoint"""
return jsonify({
'status': 'healthy',
'model_loaded': model_data is not None,
'timestamp': datetime.now().isoformat()
})
@app.route('/predict', methods=['POST'])
def predict():
"""Predict if an email is phishing"""
try:
data = request.json
# Validate input
required_fields = ['subject', 'body', 'from']
for field in required_fields:
if field not in data:
return jsonify({
'error': f'Missing required field: {field}'
}), 400
# Extract features
email_dict = {
'subject': data.get('subject', ''),
'from': data.get('from', ''),
'to': data.get('to', ''),
'body': data.get('body', ''),
'urls': data.get('urls', []),
'attachments': data.get('attachments', []),
'attachment_count': len(data.get('attachments', []))
}
features = feature_extractor.extract_all_features(email_dict)
# Convert to DataFrame
features_df = pd.DataFrame([features])
# Ensure all feature columns exist
for col in model_data['feature_names']:
if col not in features_df.columns:
features_df[col] = 0
# Reorder columns
features_df = features_df[model_data['feature_names']]
# Scale features if needed
if hasattr(model_data['model'], 'predict_proba'):
# Check if model needs scaling (linear models)
if 'LogisticRegression' in str(type(model_data['model'])):
features_scaled = model_data['scaler'].transform(features_df)
prediction = model_data['model'].predict(features_scaled)[0]
probability = model_data['model'].predict_proba(features_scaled)[0]
else:
prediction = model_data['model'].predict(features_df)[0]
probability = model_data['model'].predict_proba(features_df)[0]
else:
prediction = model_data['model'].predict(features_df)[0]
probability = [0.5, 0.5] # Default if no probability
# Prepare response
result = {
'is_phishing': bool(prediction),
'confidence': float(max(probability)),
'phishing_probability': float(probability[1]) if len(probability) > 1 else float(probability[0]),
'legitimate_probability': float(probability[0]),
'timestamp': datetime.now().isoformat()
}
# Add risk level
if result['phishing_probability'] > 0.8:
result['risk_level'] = 'HIGH'
elif result['phishing_probability'] > 0.5:
result['risk_level'] = 'MEDIUM'
else:
result['risk_level'] = 'LOW'
logger.info(f"Prediction: {result['is_phishing']} (confidence: {result['confidence']:.2f})")
return jsonify(result)
except Exception as e:
logger.error(f"Prediction error: {e}")
return jsonify({
'error': 'Prediction failed',
'message': str(e)
}), 500
@app.route('/batch_predict', methods=['POST'])
def batch_predict():
"""Predict multiple emails at once"""
try:
data = request.json
if 'emails' not in data:
return jsonify({'error': 'Missing emails array'}), 400
results = []
for email in data['emails']:
# Create a temporary request-like object
with app.test_request_context(json=email):
result = predict()
results.append(result.get_json())
return jsonify({
'results': results,
'total': len(results),
'phishing_count': sum(1 for r in results if r.get('is_phishing', False))
})
except Exception as e:
logger.error(f"Batch prediction error: {e}")
return jsonify({
'error': 'Batch prediction failed',
'message': str(e)
}), 500
if __name__ == '__main__':
# Load model on startup
if not load_model():
logger.error("Failed to load model. Exiting.")
exit(1)
port = int(os.getenv('PORT', 5000))
app.run(host='0.0.0.0', port=port, debug=False)
PY
# Install Flask
pip install flask flask-cors
# Run API server
python api_server.py
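Once the server is running, you can exercise the /predict endpoint with Python's standard library; this assumes the API is listening on localhost:5000 as configured above, and the sample email is illustrative:
python - <<'PY'
import json
import urllib.request

sample = {
    "subject": "Urgent: verify your account",
    "from": "security@fake-bank.com",
    "body": "Your account is locked. Verify immediately at http://fake-bank.com/login",
}
req = urllib.request.Request(
    "http://localhost:5000/predict",
    data=json.dumps(sample).encode("utf-8"),
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req) as resp:
    print(json.loads(resp.read()))
PY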
Step 5: Web Dashboard
cat > dashboard.html <<'HTML'
<!DOCTYPE html>
<html>
<head>
<title>Phishing Detection Dashboard</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; background: #f5f5f5; }
.container { max-width: 1200px; margin: 0 auto; background: white; padding: 20px; border-radius: 8px; }
.input-section { margin: 20px 0; padding: 20px; background: #f9f9f9; border-radius: 4px; }
textarea, input { width: 100%; padding: 10px; margin: 5px 0; border: 1px solid #ddd; border-radius: 4px; }
button { padding: 10px 20px; background: #007bff; color: white; border: none; border-radius: 4px; cursor: pointer; }
button:hover { background: #0056b3; }
.result { margin: 20px 0; padding: 20px; border-radius: 4px; }
.result.phishing { background: #fee; border-left: 4px solid #f00; }
.result.legitimate { background: #efe; border-left: 4px solid #0a0; }
.risk-high { color: #d00; font-weight: bold; }
.risk-medium { color: #f80; font-weight: bold; }
.risk-low { color: #0a0; }
</style>
</head>
<body>
<div class="container">
<h1>🔐 Phishing Detection Dashboard</h1>
<div class="input-section">
<h2>Analyze Email</h2>
<label>From:</label>
<input type="text" id="from" placeholder="sender@example.com">
<label>Subject:</label>
<input type="text" id="subject" placeholder="Email subject">
<label>Body:</label>
<textarea id="body" rows="10" placeholder="Email body content"></textarea>
<button onclick="analyzeEmail()">Analyze Email</button>
</div>
<div id="result"></div>
</div>
<script>
async function analyzeEmail() {
const email = {
from: document.getElementById('from').value,
subject: document.getElementById('subject').value,
body: document.getElementById('body').value
};
const resultDiv = document.getElementById('result');
resultDiv.innerHTML = '<p>Analyzing...</p>';
try {
const response = await fetch('http://localhost:5000/predict', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(email)
});
const result = await response.json();
const riskClass = `risk-${result.risk_level.toLowerCase()}`;
const resultClass = result.is_phishing ? 'phishing' : 'legitimate';
resultDiv.innerHTML = `
<div class="result ${resultClass}">
<h2>Analysis Result</h2>
<p><strong>Status:</strong> ${result.is_phishing ? '⚠️ PHISHING DETECTED' : '✅ Legitimate Email'}</p>
<p><strong>Risk Level:</strong> <span class="${riskClass}">${result.risk_level}</span></p>
<p><strong>Confidence:</strong> ${(result.confidence * 100).toFixed(2)}%</p>
<p><strong>Phishing Probability:</strong> ${(result.phishing_probability * 100).toFixed(2)}%</p>
<p><strong>Legitimate Probability:</strong> ${(result.legitimate_probability * 100).toFixed(2)}%</p>
</div>
`;
} catch (error) {
resultDiv.innerHTML = `<div class="result"><p>Error: ${error.message}</p></div>`;
}
}
</script>
</body>
</html>
HTML
# Open dashboard.html in browser
Project Summary
This real-world project provides:
- ✅ Real dataset integration (Enron, SpamAssassin)
- ✅ Advanced feature extraction (50+ features)
- ✅ Multiple ML model comparison
- ✅ Production-ready API server
- ✅ Web dashboard for monitoring
- ✅ Complete error handling and logging
- ✅ Model versioning and metadata
Usage:
- Prepare datasets: python prepare_datasets.py
- Extract features: python advanced_features.py
- Train models: python train_production_model.py
- Start API: python api_server.py
- Open dashboard: open dashboard.html
What This Lesson Does NOT Cover (On Purpose)
This lesson intentionally does not cover:
- Image Analysis: Detecting phishing logos or QR codes.
- Header Forgery: Deep analysis of SMTP headers (covered in Email Security).
- Automated Takedowns: Logic for automatically reporting domains.
- Neural Networks: Transformers (BERT/GPT) for complex language understanding.
Limitations and Trade-offs
AI Phishing Detection Limitations
Zero-Day Attacks:
- AI models trained on known patterns
- May miss new attack techniques
- Requires continuous retraining
- Cannot detect completely novel attacks
- Combine with other methods
False Positives:
- May flag legitimate emails
- Business communication may be affected
- Requires tuning and refinement
- Context important for accuracy
- Regular model updates needed
Adversarial Emails:
- Attackers may craft emails to evade AI
- Obfuscation techniques can fool models
- Requires robust feature engineering
- Continuous monitoring needed
- Defense must evolve
Phishing Detection Trade-offs
Accuracy vs. Speed:
- More thorough analysis = better accuracy but slower
- Faster analysis = quicker decisions but may miss threats
- Balance based on requirements
- Real-time vs. batch processing
- Choose appropriate approach
Blocking vs. Quarantine:
- Blocking is safer but may block legitimate emails
- Quarantine allows review but delays delivery
- Balance based on confidence
- High confidence = block
- Low confidence = quarantine (see the sketch below)
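A minimal routing sketch for this trade-off; the 0.90 and 0.70 thresholds are illustrative placeholders that you would tune against your own false-positive tolerance:
def route_email(phish_probability: float) -> str:
    """Map a model score to an action; thresholds are illustrative, not tuned."""
    if phish_probability >= 0.90:   # high confidence: block outright
        return "block"
    if phish_probability >= 0.70:   # medium confidence: hold for analyst review
        return "quarantine"
    return "deliver"

for p in (0.95, 0.75, 0.10):
    print(p, "->", route_email(p))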
Automation vs. Human Review:
- Full automation is fast but risky
- Human review is accurate but slow
- Balance based on risk level
- Automate clear cases
- Human review for ambiguous
When AI Phishing Detection May Be Challenging
Highly Targeted Attacks:
- Sophisticated targeted attacks may evade detection
- Small volume makes training difficult
- Requires advanced techniques
- Combine with threat intelligence
- Human analysis important
Multilingual Content:
- Models trained on one language may miss others
- Requires multilingual training data
- Language-specific models needed
- Consider language coverage
- Expand training data
Encrypted Content:
- Cannot analyze encrypted email content
- Must rely on metadata and headers
- Limited detection capabilities
- Consider email gateway inspection
- Metadata analysis helps
Cleanup
deactivate || true
rm -rf .venv-phish emails.csv make_dataset.py train_and_eval.py score_email.py model.json model.pkl
# If you ran the real-world project, also remove its artifacts:
# rm -rf datasets models prepare_datasets.py advanced_features.py train_production_model.py api_server.py dashboard.html
Career Alignment
After completing this lesson, you are prepared for:
- Email Security Specialist (Entry Level)
- Junior Detection Engineer
- SOC Analyst (L1) with AI focus
- Security Researcher (Phishing/Malware)
Next recommended steps:
→ Advanced NLP for threat detection
→ Building automated phishing playbooks
→ Integrating AI scores into SIEM/SOAR
Quick Reference
- Use synthetic/redacted data; keep humans in the decision loop.
- Validate with precision/recall; watch false positives before blocking.
- Pair ML with email-auth controls and attachment/link sandboxing.
- Keep models versioned (model.pkl) and log every scored message.