Files

427 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# CSDF - Practical-1 (Email Header Analysis)
"""
THIS CODE HAS BEEN TESTED AND IS FULLY OPERATIONAL.
Problem Statement: Email Header Analysis - Write a program for Tracking Emails and Investigating Email Crimes. i.e. Write a program to analyze email header.
Code from CyberSecurityAndDigitalForensics (SPPU - Final Year - Computer Engineering - Content) repository on KSKA Git: https://git.kska.io/sppu-be-comp-content/CyberSecurityAndDigitalForensics
"""
# BEGINNING OF CODE
import ipaddress
import json
import re
import socket
from datetime import datetime
from email import message_from_bytes, message_from_file, message_from_string
from email.parser import Parser
from email.utils import parseaddr
class EmailHeaderAnalyzer:
    """
    A comprehensive email header analyzer for forensic investigation

    Each analysis method returns its result and also stores it in
    ``self.analysis_results`` under a fixed key, so the methods can be called
    individually or all together via ``generate_forensic_report()``.
    """

    # Placeholder stored in results when a header or field is absent.
    _NOT_FOUND = 'Not Found'

    # Headers reported verbatim in the "basic headers" section, in print order.
    _BASIC_HEADER_NAMES = (
        'From', 'To', 'Subject', 'Date', 'Message-ID',
        'Return-Path', 'Reply-To', 'MIME-Version', 'Content-Type',
    )

    # Subject-line phrases treated as phishing indicators (matched lowercase).
    _SUSPICIOUS_KEYWORDS = (
        'urgent', 'verify account', 'suspended', 'confirm', 'prize', 'winner',
    )

    # Loose address pattern; used to validate parseaddr() output and as a
    # fallback when parseaddr() cannot make sense of the header.
    _EMAIL_RE = re.compile(r'[\w.+%-]+@[\w.-]+\.\w+')

    # Address literal as written by MTAs: "[192.0.2.1]" or "[IPv6:2001:db8::1]".
    _BRACKETED_IP_RE = re.compile(
        r'\[\s*(?:IPv6:)?([0-9A-Fa-f:.]+)\s*\]', re.IGNORECASE
    )

    # Dotted quad that is not part of a longer dotted/numeric token.
    _BARE_IPV4_RE = re.compile(r'(?<![\d.])\d{1,3}(?:\.\d{1,3}){3}(?![\d.])')

    def __init__(self, email_content):
        """
        Initialize the analyzer with email content

        Args:
            email_content: Raw email as ``str`` or ``bytes``, or a text file
                object positioned at the start of the message.
        """
        if isinstance(email_content, str):
            self.email = message_from_string(email_content)
        elif isinstance(email_content, (bytes, bytearray)):
            self.email = message_from_bytes(bytes(email_content))
        else:
            self.email = message_from_file(email_content)
        self.analysis_results = {}

    def extract_basic_headers(self):
        """Extract basic email header information

        Returns:
            dict mapping each basic header name to its value, or
            'Not Found' when the header is absent.
        """
        headers = {name: self._header(name) for name in self._BASIC_HEADER_NAMES}
        self.analysis_results['basic_headers'] = headers
        return headers

    def extract_received_headers(self):
        """
        Extract and parse all 'Received' headers to trace email path
        Critical for tracking email transmission route

        Hops are numbered in the order the headers appear in the message, so
        hop 1 is the topmost 'Received' header.

        Returns:
            list of dicts with keys 'hop_number', 'raw_header', 'timestamp',
            'from_server', 'by_server' and 'ip_address'.
        """
        parsed_received = []
        for hop_number, received in enumerate(self.email.get_all('Received', []), start=1):
            received = str(received)  # may be a Header object for non-ASCII input
            parsed_received.append({
                'hop_number': hop_number,
                'raw_header': received,
                'timestamp': self._extract_timestamp(received),
                'from_server': self._extract_from_server(received),
                'by_server': self._extract_by_server(received),
                'ip_address': self._extract_ip_from_received(received),
            })
        self.analysis_results['received_headers'] = parsed_received
        return parsed_received

    def extract_originating_ip(self):
        """
        Extract the originating IP address (X-Originating-IP)
        This is crucial for tracing the actual sender location

        Falls back to the bracketed address in the last 'Received' header of
        the message when X-Originating-IP is missing or holds no valid address.

        Returns:
            The IP address as a string, or None when none could be found.
        """
        originating_ip = None
        header_value = self.email.get('X-Originating-IP')
        if header_value:
            originating_ip = self._find_ip(str(header_value), allow_bare=True)
        if not originating_ip:
            received_headers = self.email.get_all('Received', [])
            if received_headers:
                # _find_ip yields None (not the 'Not Found' placeholder) so a
                # missing address is never mistaken for a real one downstream.
                originating_ip = self._find_ip(self._unfold(received_headers[-1]))
        self.analysis_results['originating_ip'] = originating_ip
        return originating_ip

    def analyze_authentication(self):
        """
        Analyze email authentication headers (SPF, DKIM, DMARC)
        Helps detect spoofing and verify email authenticity

        Only inspects what the headers claim; no signature or DNS verification
        is performed.

        Returns:
            dict with the raw header values plus the booleans 'spf_passed',
            'dkim_present' and 'likely_spoofed'.
        """
        auth_results = {
            'SPF': self._header('Received-SPF'),
            'DKIM-Signature': self._header('DKIM-Signature'),
            'Authentication-Results': self._header('Authentication-Results'),
            'ARC-Authentication-Results': self._header('ARC-Authentication-Results'),
        }
        # The SPF result is the leading token of Received-SPF; a substring
        # test would treat "fail (... does not pass ...)" as a pass.
        spf_pass = bool(
            re.match(r'\s*pass\b', auth_results['SPF'], re.IGNORECASE)
            or re.search(r'\bspf\s*=\s*pass\b',
                         auth_results['Authentication-Results'], re.IGNORECASE)
        )
        # Presence of the header, not a search for its own name in its value.
        dkim_present = self.email.get('DKIM-Signature') is not None
        auth_results['spf_passed'] = spf_pass
        auth_results['dkim_present'] = dkim_present
        auth_results['likely_spoofed'] = not (spf_pass or dkim_present)
        self.analysis_results['authentication'] = auth_results
        return auth_results

    def extract_message_id(self):
        """
        Extract and analyze Message-ID
        Useful for tracking email threads and identifying patterns

        Returns:
            dict with 'message_id' ('Not Found' if absent) and 'domain' (the
            part after '@', or None).
        """
        message_id = self._header('Message-ID')
        domain = None
        if message_id != self._NOT_FOUND:
            domain_match = re.search(r'@([a-zA-Z0-9.-]+)', message_id)
            if domain_match:
                domain = domain_match.group(1)
        message_id_info = {
            'message_id': message_id,
            'domain': domain,
        }
        self.analysis_results['message_id_info'] = message_id_info
        return message_id_info

    def analyze_sender_info(self):
        """
        Detailed analysis of sender information
        Extracts email addresses and identifies potential spoofing

        Returns:
            dict with the raw 'From' and 'Return-Path' values, the addresses
            extracted from them, and the flags 'address_mismatch' and
            'potential_spoofing' (both True only when both addresses exist
            and differ).
        """
        from_header = self._header('From', '')
        return_path = self._header('Return-Path', '')
        from_email = self._extract_email_address(from_header)
        return_email = self._extract_email_address(return_path)
        # Compare case-insensitively so "User@Example.com" and
        # "user@example.com" are not reported as a mismatch.
        mismatch = bool(
            from_email and return_email
            and from_email.lower() != return_email.lower()
        )
        sender_info = {
            'from_header': from_header,
            'from_email': from_email,
            'return_path': return_path,
            'return_email': return_email,
            'address_mismatch': mismatch,
            'potential_spoofing': mismatch,
        }
        self.analysis_results['sender_analysis'] = sender_info
        return sender_info

    def get_ip_geolocation_info(self, ip_address):
        """
        Get basic information about an IP address
        Note: For production, integrate with geolocation APIs

        Performs a reverse DNS lookup, which may block while the resolver
        answers.

        Args:
            ip_address: IPv4 or IPv6 address as a string.

        Returns:
            dict with 'ip', 'is_private', 'is_global', 'is_loopback',
            'version' and 'hostname', or {'error': 'Invalid IP address'} when
            the string is not a valid address.
        """
        try:
            ip_obj = ipaddress.ip_address(ip_address)
        except ValueError:
            return {'error': 'Invalid IP address'}
        ip_info = {
            'ip': ip_address,
            'is_private': ip_obj.is_private,
            'is_global': ip_obj.is_global,
            'is_loopback': ip_obj.is_loopback,
            'version': ip_obj.version,
        }
        try:
            ip_info['hostname'] = socket.gethostbyaddr(ip_address)[0]
        except OSError:
            # socket.herror / socket.gaierror are OSError subclasses; a failed
            # lookup is expected for many addresses and is not fatal.
            ip_info['hostname'] = 'Reverse DNS lookup failed'
        return ip_info

    def detect_suspicious_patterns(self):
        """
        Detect common patterns associated with email crimes

        Reads the results already stored by analyze_authentication(),
        analyze_sender_info() and extract_message_id(); checks whose results
        are not present yet are skipped. The subject keyword check always runs.

        Returns:
            list of human-readable indicator strings (possibly empty).
        """
        results = self.analysis_results
        suspicious_indicators = []
        if results.get('authentication', {}).get('likely_spoofed'):
            suspicious_indicators.append('Email failed authentication checks (SPF/DKIM)')
        if results.get('sender_analysis', {}).get('potential_spoofing'):
            suspicious_indicators.append('Mismatch between From and Return-Path addresses')
        if results.get('message_id_info', {}).get('message_id') == self._NOT_FOUND:
            suspicious_indicators.append('Missing Message-ID (unusual for legitimate emails)')
        subject = self._header('Subject', '').lower()
        found_keywords = [kw for kw in self._SUSPICIOUS_KEYWORDS if kw in subject]
        if found_keywords:
            suspicious_indicators.append(
                f'Suspicious keywords in subject: {", ".join(found_keywords)}'
            )
        results['suspicious_indicators'] = suspicious_indicators
        return suspicious_indicators

    def generate_forensic_report(self):
        """
        Generate a comprehensive forensic analysis report

        Runs every analysis method and, when an originating IP was found,
        get_ip_geolocation_info() on it (stored under 'ip_analysis').

        Returns:
            The populated ``self.analysis_results`` dict.
        """
        self.extract_basic_headers()
        self.extract_received_headers()
        self.extract_originating_ip()
        self.analyze_authentication()
        self.extract_message_id()
        self.analyze_sender_info()
        # Must run last: it reads the results stored by the calls above.
        self.detect_suspicious_patterns()
        orig_ip = self.analysis_results.get('originating_ip')
        if orig_ip:
            self.analysis_results['ip_analysis'] = self.get_ip_geolocation_info(orig_ip)
        return self.analysis_results

    def print_report(self):
        """Print a formatted forensic report

        Re-runs the full analysis via generate_forensic_report() and writes
        the seven report sections to standard output.
        """
        report = self.generate_forensic_report()
        rule = "=" * 80

        def section(title):
            # Shared banner printed above every numbered section.
            print("\n" + rule)
            print(title)
            print(rule)

        print(rule)
        print(" EMAIL FORENSIC ANALYSIS REPORT ".center(80, "="))
        print(rule)
        print(f"\nGenerated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")

        section("1. BASIC HEADER INFORMATION")
        for key, value in report['basic_headers'].items():
            print(f"{key:20s}: {value}")

        section("2. SENDER ANALYSIS")
        sender = report.get('sender_analysis', {})
        print(f"From Email : {sender.get('from_email', 'N/A')}")
        print(f"Return Email : {sender.get('return_email', 'N/A')}")
        print(f"Address Mismatch : {sender.get('address_mismatch', False)}")
        print(f"Potential Spoofing : {sender.get('potential_spoofing', False)}")

        section("3. AUTHENTICATION ANALYSIS")
        auth = report.get('authentication', {})
        print(f"SPF Passed : {auth.get('spf_passed', False)}")
        print(f"DKIM Present : {auth.get('dkim_present', False)}")
        print(f"Likely Spoofed : {auth.get('likely_spoofed', True)}")

        section("4. MESSAGE ID ANALYSIS")
        msg_id = report.get('message_id_info', {})
        print(f"Message ID : {msg_id.get('message_id', 'N/A')}")
        print(f"Domain : {msg_id.get('domain', 'N/A')}")

        section("5. ORIGINATING IP INFORMATION")
        # The key is always present (possibly None), so a .get() default alone
        # would print "None" instead of the placeholder.
        print(f"Originating IP : {report.get('originating_ip') or 'Not Found'}")
        if 'ip_analysis' in report:
            ip_info = report['ip_analysis']
            print(f"IP Version : IPv{ip_info.get('version', 'N/A')}")
            print(f"Is Private : {ip_info.get('is_private', 'N/A')}")
            print(f"Is Global : {ip_info.get('is_global', 'N/A')}")
            print(f"Hostname : {ip_info.get('hostname', 'N/A')}")

        section("6. EMAIL TRANSMISSION PATH")
        received = report.get('received_headers', [])
        if received:
            for hop in received:
                print(f"\nHop {hop['hop_number']}:")
                print(f" From Server : {hop.get('from_server', 'N/A')}")
                print(f" By Server : {hop.get('by_server', 'N/A')}")
                print(f" IP Address : {hop.get('ip_address', 'N/A')}")
                print(f" Timestamp : {hop.get('timestamp', 'N/A')}")
        else:
            print("No Received headers found")

        section("7. SUSPICIOUS INDICATORS")
        indicators = report.get('suspicious_indicators', [])
        if indicators:
            for idx, indicator in enumerate(indicators, 1):
                print(f"{idx}. {indicator}")
        else:
            print("No suspicious indicators detected")

        print("\n" + rule)
        print(" END OF REPORT ".center(80, "="))
        print(rule)

    def export_json(self, filename='email_analysis.json'):
        """Export analysis results to JSON file

        Re-runs the full analysis and writes it, UTF-8 encoded, to *filename*
        (overwriting any existing file).

        Args:
            filename: Destination path for the JSON report.
        """
        report = self.generate_forensic_report()
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=4)
        print(f"\nAnalysis exported to {filename}")

    # Helper methods
    def _header(self, name, default='Not Found'):
        """Return header *name* as a plain string, or *default* if absent."""
        value = self.email.get(name)
        # Non-ASCII headers can come back as Header objects; str() keeps the
        # results usable with regexes, str methods and json.dump.
        return default if value is None else str(value)

    @staticmethod
    def _unfold(header_value):
        """Collapse folded header whitespace (newlines, tabs) to single spaces."""
        return ' '.join(str(header_value).split())

    @classmethod
    def _find_ip(cls, text, allow_bare=False):
        """Return the first valid IP address found in *text*, or None.

        Bracketed literals ("[192.0.2.1]", "[IPv6:2001:db8::1]") are always
        considered. With allow_bare=True the whole stripped text and any bare
        dotted quads are tried as well. Every candidate is validated with
        ipaddress.ip_address().
        """
        candidates = [m.group(1) for m in cls._BRACKETED_IP_RE.finditer(text)]
        if allow_bare:
            candidates.append(text.strip())
            candidates.extend(m.group(0) for m in cls._BARE_IPV4_RE.finditer(text))
        for candidate in candidates:
            try:
                ipaddress.ip_address(candidate)
            except ValueError:
                continue
            return candidate
        return None

    def _extract_timestamp(self, received_header):
        """Extract timestamp from Received header

        The date-time is the text after the final ';'. The header is unfolded
        first so a date wrapped across lines is still captured whole.
        """
        _, separator, tail = self._unfold(received_header).rpartition(';')
        tail = tail.strip()
        return tail if separator and tail else self._NOT_FOUND

    def _extract_from_server(self, received_header):
        """Extract 'from' server information"""
        # Require start-of-string or whitespace before the keyword so tokens
        # such as "envelope-from" are not mistaken for the "from" clause.
        from_match = re.search(
            r'(?:^|\s)from\s+(\S+)', self._unfold(received_header), re.IGNORECASE
        )
        return from_match.group(1) if from_match else self._NOT_FOUND

    def _extract_by_server(self, received_header):
        """Extract 'by' server information"""
        # Anchored like the "from" clause so host names ending in "by"
        # (e.g. "ruby") do not match.
        by_match = re.search(
            r'(?:^|\s)by\s+(\S+)', self._unfold(received_header), re.IGNORECASE
        )
        return by_match.group(1) if by_match else self._NOT_FOUND

    def _extract_ip_from_received(self, received_header):
        """Extract IP address from Received header

        Returns the first valid bracketed IPv4/IPv6 literal, or 'Not Found'.
        """
        return self._find_ip(self._unfold(received_header)) or self._NOT_FOUND

    def _extract_email_address(self, header_value):
        """Extract email address from header value

        Uses email.utils.parseaddr so the address inside the angle brackets
        wins over any address-looking text in the display name; falls back to
        a regex search when parseaddr yields nothing usable.

        Returns:
            The address string, or None when none is found.
        """
        text = str(header_value or '')
        _, address = parseaddr(text)
        if address and self._EMAIL_RE.fullmatch(address):
            return address
        email_match = self._EMAIL_RE.search(text)
        return email_match.group(0) if email_match else None
def _prompt(message):
    """Read one stripped line from the user; treat end-of-input as empty."""
    try:
        return input(message).strip()
    except EOFError:
        # stdin may already be exhausted (e.g. after the paste option has
        # read it to EOF, or when input is piped).
        return ''


def main():
    """
    Main function to demonstrate email header analysis

    Asks for an input method (pasted text, file, or a built-in sample),
    prints the forensic report and optionally exports it to JSON.
    """
    import sys

    print("Email Header Analysis Tool for Digital Forensics\n")
    print("Choose input method:")
    print("1. Paste raw email content")
    print("2. Load from file")
    print("3. Use sample email")
    choice = _prompt("\nEnter choice (1-3): ")
    if choice == '1':
        print("\nPaste the raw email (including headers). Press Ctrl+D (Linux/MacOS) or Ctrl+Z (Windows) when done: ")
        email_content = sys.stdin.read()
    elif choice == '2':
        filename = _prompt("Enter email file path: ")
        try:
            # Raw mail can contain bytes that are not valid UTF-8; replace
            # them rather than abort, since only the headers are analysed.
            with open(filename, 'r', encoding='utf-8', errors='replace') as f:
                email_content = f.read()
        except FileNotFoundError:
            print(f"Error: File '{filename}' not found")
            return
        except OSError as exc:
            # Permission denied, path is a directory, etc.
            print(f"Error: Could not read '{filename}': {exc}")
            return
    else:
        # Sample email for demonstration (any other choice falls back here).
        # The blank line separates the headers from the body.
        email_content = """From: sender@example.com
To: recipient@example.com
Subject: Urgent Account Verification Required
Date: Mon, 9 Oct 2025 10:30:00 +0530
Message-ID: <12345.67890@mail.example.com>
Return-Path: different@suspicious.com
Received: from mail.example.com ([192.168.1.100]) by server.example.com with SMTP; Mon, 9 Oct 2025 10:30:00 +0530
Received: from client.suspicious.com ([203.0.113.45]) by mail.example.com with ESMTP; Mon, 9 Oct 2025 10:29:55 +0530
X-Originating-IP: [203.0.113.45]
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8

This is a sample email body for forensic analysis.
"""
    # Create analyzer instance and generate report
    analyzer = EmailHeaderAnalyzer(email_content)
    analyzer.print_report()
    # Ask if user wants to export to JSON
    export = _prompt("\nExport analysis to JSON? (y/n): ").lower()
    if export == 'y':
        filename = _prompt("Enter filename (default: email_analysis.json): ")
        if not filename:
            filename = 'email_analysis.json'
        analyzer.export_json(filename)


if __name__ == "__main__":
    main()
# END OF CODE