#!/usr/bin/env python3
# CSDF - Practical-1 (Email Header Analysis)

"""
THIS CODE HAS BEEN TESTED AND IS FULLY OPERATIONAL.

Problem Statement: Email Header Analysis - Write a program for Tracking Emails
and Investigating Email Crimes. i.e. Write a program to analyze e-mail header.

Code from CyberSecurityAndDigitalForensics (SPPU - Final Year - Computer
Engineering - Content) repository on KSKA Git:
https://git.kska.io/sppu-be-comp-content/CyberSecurityAndDigitalForensics
"""

# BEGINNING OF CODE
import re
import json
import sys
from email import message_from_bytes, message_from_string, message_from_file
from email.parser import Parser
from datetime import datetime
import socket
import ipaddress

# Sentinel string used throughout the report for absent header values.
_NOT_FOUND = 'Not Found'

# Patterns are compiled once at import time because they are applied per hop.
_IPV4_RE = re.compile(r'(\d+\.\d+\.\d+\.\d+)')
# Address literal inside square brackets, with the optional "IPv6:" tag.
_BRACKETED_IP_RE = re.compile(r'\[(?:IPv6:)?([0-9A-Fa-f:.]+)\]', re.IGNORECASE)
_EMAIL_RE = re.compile(r'[\w\.-]+@[\w\.-]+\.\w+')
_MESSAGE_ID_DOMAIN_RE = re.compile(r'@([a-zA-Z0-9.-]+)')
# The keywords must start the header or follow whitespace so that tokens such
# as "envelope-from" or a host name ending in "by" are not mistaken for them.
_FROM_CLAUSE_RE = re.compile(r'(?:^|\s)from\s+(\S+)', re.IGNORECASE)
_BY_CLAUSE_RE = re.compile(r'(?:^|\s)by\s+(\S+)', re.IGNORECASE)
_AUTH_RESULTS_SPF_PASS_RE = re.compile(r'\bspf\s*=\s*pass\b', re.IGNORECASE)


class EmailHeaderAnalyzer:
    """
    A comprehensive email header analyzer for forensic investigation.

    Every ``extract_*`` / ``analyze_*`` method returns its result and also
    stores it in ``self.analysis_results`` under a fixed key, so the methods
    can be used individually or all together via ``generate_forensic_report``.
    """

    def __init__(self, email_content):
        """
        Initialize the analyzer with email content.

        Args:
            email_content: Raw email as ``str`` or ``bytes``, or an open text
                file object positioned at the start of the message.
        """
        if isinstance(email_content, str):
            self.email = message_from_string(email_content)
        elif isinstance(email_content, (bytes, bytearray)):
            self.email = message_from_bytes(bytes(email_content))
        else:
            self.email = message_from_file(email_content)
        self.analysis_results = {}

    def extract_basic_headers(self):
        """Extract basic email header information.

        Returns:
            dict mapping each header name to its value, or 'Not Found' when
            the header is absent.
        """
        names = (
            'From', 'To', 'Subject', 'Date', 'Message-ID',
            'Return-Path', 'Reply-To', 'MIME-Version', 'Content-Type',
        )
        headers = {name: self._header(name) for name in names}
        self.analysis_results['basic_headers'] = headers
        return headers

    def extract_received_headers(self):
        """
        Extract and parse all 'Received' headers to trace the email path.

        Hops are numbered in the order the headers appear in the message.

        Returns:
            list of dicts, one per 'Received' header, with the keys
            'hop_number', 'raw_header', 'timestamp', 'from_server',
            'by_server' and 'ip_address'.
        """
        parsed_received = []
        for idx, received in enumerate(self.email.get_all('Received', []), 1):
            received = str(received)
            parsed_received.append({
                'hop_number': idx,
                'raw_header': received,
                'timestamp': self._extract_timestamp(received),
                'from_server': self._extract_from_server(received),
                'by_server': self._extract_by_server(received),
                'ip_address': self._extract_ip_from_received(received),
            })
        self.analysis_results['received_headers'] = parsed_received
        return parsed_received

    def extract_originating_ip(self):
        """
        Extract the originating IP address.

        'X-Originating-IP' is preferred.  When it is absent or holds no valid
        address, the last 'Received' header in the message is consulted.

        Returns:
            The IP address as a string, or None when none could be found.
        """
        originating_ip = None
        header_value = self._header('X-Originating-IP', None)
        if header_value:
            originating_ip = self._ip_from_text(header_value)

        if not originating_ip:
            received_headers = self.email.get_all('Received', [])
            if received_headers:
                candidate = self._extract_ip_from_received(
                    str(received_headers[-1]))
                # The helper reports a miss with a (truthy) sentinel string;
                # it must not be passed on as if it were an address.
                if candidate != _NOT_FOUND:
                    originating_ip = candidate

        self.analysis_results['originating_ip'] = originating_ip
        return originating_ip

    def analyze_authentication(self):
        """
        Analyze email authentication headers (SPF, DKIM, DMARC).

        Returns:
            dict with the raw 'SPF', 'DKIM-Signature',
            'Authentication-Results' and 'ARC-Authentication-Results' values
            plus the booleans 'spf_passed', 'dkim_present' and
            'likely_spoofed'.
        """
        auth_results = {
            'SPF': self._header('Received-SPF'),
            'DKIM-Signature': self._header('DKIM-Signature'),
            'Authentication-Results': self._header('Authentication-Results'),
            'ARC-Authentication-Results':
                self._header('ARC-Authentication-Results'),
        }

        # Only the leading result token of Received-SPF counts; a substring
        # test would also hit words such as "passport" inside the comment.
        spf_pass = (
            auth_results['SPF'].strip().lower().startswith('pass')
            or bool(_AUTH_RESULTS_SPF_PASS_RE.search(
                auth_results['Authentication-Results']))
        )
        # Presence of the signature header, not a verification of it.
        dkim_present = auth_results['DKIM-Signature'] != _NOT_FOUND

        auth_results['spf_passed'] = spf_pass
        auth_results['dkim_present'] = dkim_present
        auth_results['likely_spoofed'] = not (spf_pass or dkim_present)

        self.analysis_results['authentication'] = auth_results
        return auth_results

    def extract_message_id(self):
        """
        Extract the Message-ID and the domain that follows its '@'.

        Returns:
            dict with 'message_id' ('Not Found' when absent) and 'domain'
            (None when absent or not parseable).
        """
        message_id = self._header('Message-ID')

        domain = None
        if message_id != _NOT_FOUND:
            domain_match = _MESSAGE_ID_DOMAIN_RE.search(message_id)
            if domain_match:
                domain = domain_match.group(1)

        message_id_info = {'message_id': message_id, 'domain': domain}
        self.analysis_results['message_id_info'] = message_id_info
        return message_id_info

    def analyze_sender_info(self):
        """
        Compare the 'From' address with the 'Return-Path' address.

        The comparison ignores case so that differently capitalised spellings
        of one address are not reported as a mismatch.

        Returns:
            dict with 'from_header', 'from_email', 'return_path',
            'return_email', 'address_mismatch' and 'potential_spoofing'.
        """
        from_header = self._header('From', '')
        return_path = self._header('Return-Path', '')

        from_email = self._extract_email_address(from_header)
        return_email = self._extract_email_address(return_path)

        # A mismatch is only meaningful when both addresses are known.
        mismatch = (
            bool(from_email and return_email)
            and from_email.casefold() != return_email.casefold()
        )

        sender_info = {
            'from_header': from_header,
            'from_email': from_email,
            'return_path': return_path,
            'return_email': return_email,
            'address_mismatch': mismatch,
            'potential_spoofing': mismatch,
        }
        self.analysis_results['sender_analysis'] = sender_info
        return sender_info

    def get_ip_geolocation_info(self, ip_address):
        """
        Get basic information about an IP address.

        Performs a reverse DNS lookup, which may need network access.
        Note: For production, integrate with geolocation APIs.

        Args:
            ip_address: IPv4 or IPv6 address as a string.

        Returns:
            dict with 'ip', 'is_private', 'is_global', 'is_loopback',
            'version' and 'hostname', or ``{'error': 'Invalid IP address'}``
            when the input is not a valid address.
        """
        try:
            ip_obj = ipaddress.ip_address(ip_address)
        except ValueError:
            return {'error': 'Invalid IP address'}

        ip_info = {
            'ip': ip_address,
            'is_private': ip_obj.is_private,
            'is_global': ip_obj.is_global,
            'is_loopback': ip_obj.is_loopback,
            'version': ip_obj.version,
        }

        try:
            ip_info['hostname'] = socket.gethostbyaddr(ip_address)[0]
        except OSError:
            # socket.herror / socket.gaierror are OSError subclasses; a failed
            # lookup is expected and must not abort the analysis.
            ip_info['hostname'] = 'Reverse DNS lookup failed'

        return ip_info

    def detect_suspicious_patterns(self):
        """
        Detect common patterns associated with email crimes.

        Analyses this check depends on are run on demand when their results
        are not yet stored in ``self.analysis_results``.

        Returns:
            list of human-readable indicator strings (possibly empty).
        """
        results = self.analysis_results
        auth = results.get('authentication') or self.analyze_authentication()
        sender = results.get('sender_analysis') or self.analyze_sender_info()
        msg_id = results.get('message_id_info') or self.extract_message_id()

        suspicious_indicators = []

        if auth.get('likely_spoofed'):
            suspicious_indicators.append(
                'Email failed authentication checks (SPF/DKIM)')

        if sender.get('potential_spoofing'):
            suspicious_indicators.append(
                'Mismatch between From and Return-Path addresses')

        if msg_id.get('message_id') == _NOT_FOUND:
            suspicious_indicators.append(
                'Missing Message-ID (unusual for legitimate emails)')

        subject = self._header('Subject', '').lower()
        suspicious_keywords = ['urgent', 'verify account', 'suspended',
                               'confirm', 'prize', 'winner']
        found_keywords = [kw for kw in suspicious_keywords if kw in subject]
        if found_keywords:
            suspicious_indicators.append(
                f'Suspicious keywords in subject: {", ".join(found_keywords)}')

        self.analysis_results['suspicious_indicators'] = suspicious_indicators
        return suspicious_indicators

    def generate_forensic_report(self):
        """
        Run every analysis and return the combined results.

        Returns:
            ``self.analysis_results``; 'ip_analysis' is present only when an
            originating IP was found.
        """
        self.extract_basic_headers()
        self.extract_received_headers()
        self.extract_originating_ip()
        self.analyze_authentication()
        self.extract_message_id()
        self.analyze_sender_info()
        self.detect_suspicious_patterns()

        orig_ip = self.analysis_results.get('originating_ip')
        if orig_ip:
            self.analysis_results['ip_analysis'] = \
                self.get_ip_geolocation_info(orig_ip)
        else:
            self.analysis_results.pop('ip_analysis', None)

        return self.analysis_results

    def print_report(self):
        """Generate the forensic report and print it to standard output."""
        report = self.generate_forensic_report()

        print("="*80)
        print(" EMAIL FORENSIC ANALYSIS REPORT ".center(80, "="))
        print("="*80)
        print(f"\nGenerated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")

        self._print_section("1. BASIC HEADER INFORMATION")
        for key, value in report['basic_headers'].items():
            print(f"{key:20s}: {value}")

        self._print_section("2. SENDER ANALYSIS")
        sender = report.get('sender_analysis', {})
        print(f"From Email : {sender.get('from_email') or 'N/A'}")
        print(f"Return Email : {sender.get('return_email') or 'N/A'}")
        print(f"Address Mismatch : {sender.get('address_mismatch', False)}")
        print(f"Potential Spoofing : {sender.get('potential_spoofing', False)}")

        self._print_section("3. AUTHENTICATION ANALYSIS")
        auth = report.get('authentication', {})
        print(f"SPF Passed : {auth.get('spf_passed', False)}")
        print(f"DKIM Present : {auth.get('dkim_present', False)}")
        print(f"Likely Spoofed : {auth.get('likely_spoofed', True)}")

        self._print_section("4. MESSAGE ID ANALYSIS")
        msg_id = report.get('message_id_info', {})
        print(f"Message ID : {msg_id.get('message_id', 'N/A')}")
        print(f"Domain : {msg_id.get('domain') or 'N/A'}")

        self._print_section("5. ORIGINATING IP INFORMATION")
        print(f"Originating IP : {report.get('originating_ip') or 'Not Found'}")
        if 'ip_analysis' in report:
            ip_info = report['ip_analysis']
            print(f"IP Version : IPv{ip_info.get('version', 'N/A')}")
            print(f"Is Private : {ip_info.get('is_private', 'N/A')}")
            print(f"Is Global : {ip_info.get('is_global', 'N/A')}")
            print(f"Hostname : {ip_info.get('hostname', 'N/A')}")

        self._print_section("6. EMAIL TRANSMISSION PATH")
        received = report.get('received_headers', [])
        if received:
            for hop in received:
                print(f"\nHop {hop['hop_number']}:")
                print(f" From Server : {hop.get('from_server', 'N/A')}")
                print(f" By Server : {hop.get('by_server', 'N/A')}")
                print(f" IP Address : {hop.get('ip_address', 'N/A')}")
                print(f" Timestamp : {hop.get('timestamp', 'N/A')}")
        else:
            print("No Received headers found")

        self._print_section("7. SUSPICIOUS INDICATORS")
        indicators = report.get('suspicious_indicators', [])
        if indicators:
            for idx, indicator in enumerate(indicators, 1):
                print(f"{idx}. {indicator}")
        else:
            print("No suspicious indicators detected")

        print("\n" + "="*80)
        print(" END OF REPORT ".center(80, "="))
        print("="*80)

    def export_json(self, filename='email_analysis.json'):
        """Regenerate the report and write it to *filename* as JSON.

        Args:
            filename: Destination path; an existing file is overwritten.
        """
        report = self.generate_forensic_report()
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=4)
        print(f"\nAnalysis exported to {filename}")

    # Helper methods
    def _header(self, name, default=_NOT_FOUND):
        """Return header *name* as a string, or *default* when it is absent."""
        value = self.email.get(name)
        return default if value is None else str(value)

    @staticmethod
    def _print_section(title):
        """Print a numbered section banner for the text report."""
        print("\n" + "="*80)
        print(title)
        print("="*80)

    @staticmethod
    def _is_valid_ip(candidate):
        """Return True when *candidate* parses as an IPv4 or IPv6 address."""
        try:
            ipaddress.ip_address(candidate)
        except ValueError:
            return False
        return True

    def _ip_from_text(self, value):
        """Return the IP address contained in a header value, or None.

        A dotted-quad anywhere in the text is tried first; otherwise the whole
        value, minus surrounding brackets and an 'IPv6:' tag, is tried.
        """
        ipv4_match = _IPV4_RE.search(value)
        if ipv4_match and self._is_valid_ip(ipv4_match.group(1)):
            return ipv4_match.group(1)

        stripped = value.strip().strip('[]').strip()
        if stripped.lower().startswith('ipv6:'):
            stripped = stripped[len('ipv6:'):]
        return stripped if self._is_valid_ip(stripped) else None

    def _extract_timestamp(self, received_header):
        """Extract the timestamp that follows the last ';' of the header."""
        # Comments earlier in the header may contain semicolons themselves,
        # so only the final one separates the date.
        _, separator, tail = received_header.rpartition(';')
        timestamp = ' '.join(tail.split())  # also undoes header line folding
        return timestamp if separator and timestamp else _NOT_FOUND

    def _extract_from_server(self, received_header):
        """Extract the token following the 'from' keyword."""
        from_match = _FROM_CLAUSE_RE.search(received_header)
        return from_match.group(1) if from_match else _NOT_FOUND

    def _extract_by_server(self, received_header):
        """Extract the token following the 'by' keyword."""
        by_match = _BY_CLAUSE_RE.search(received_header)
        return by_match.group(1) if by_match else _NOT_FOUND

    def _extract_ip_from_received(self, received_header):
        """Extract the first valid bracketed IP address, else 'Not Found'."""
        for ip_match in _BRACKETED_IP_RE.finditer(received_header):
            if self._is_valid_ip(ip_match.group(1)):
                return ip_match.group(1)
        return _NOT_FOUND

    def _extract_email_address(self, header_value):
        """Extract the first email address from a header value, or None."""
        email_match = _EMAIL_RE.search(header_value)
        return email_match.group(0) if email_match else None


def main():
    """
    Interactive entry point: read an email, print the forensic report and
    optionally export it as JSON.
    """
    print("Email Header Analysis Tool for Digital Forensics\n")
    print("Choose input method:")
    print("1. Paste raw email content")
    print("2. Load from file")
    print("3. Use sample email")

    choice = input("\nEnter choice (1-3): ").strip()

    if choice == '1':
        print("\nPaste the raw email (including headers). Press Ctrl+D (Linux/MacOS) or Ctrl+Z (Windows) when done: ")
        email_content = sys.stdin.read()
    elif choice == '2':
        filename = input("Enter email file path: ").strip()
        try:
            # Read bytes so that messages in any charset can be parsed.
            with open(filename, 'rb') as f:
                email_content = f.read()
        except FileNotFoundError:
            print(f"Error: File '{filename}' not found")
            return
        except OSError as exc:
            print(f"Error: Could not read '{filename}': {exc}")
            return
    else:
        # Sample email for demonstration
        email_content = """From: sender@example.com
To: recipient@example.com
Subject: Urgent Account Verification Required
Date: Mon, 9 Oct 2025 10:30:00 +0530
Message-ID: <12345.67890@mail.example.com>
Return-Path: different@suspicious.com
Received: from mail.example.com ([192.168.1.100])
    by server.example.com with SMTP;
    Mon, 9 Oct 2025 10:30:00 +0530
Received: from client.suspicious.com ([203.0.113.45])
    by mail.example.com with ESMTP;
    Mon, 9 Oct 2025 10:29:55 +0530
X-Originating-IP: [203.0.113.45]
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8

This is a sample email body for forensic analysis.
"""

    # Create analyzer instance and generate report
    analyzer = EmailHeaderAnalyzer(email_content)
    analyzer.print_report()

    # Ask if user wants to export to JSON
    export = input("\nExport analysis to JSON? (y/n): ").strip().lower()
    if export == 'y':
        filename = input("Enter filename (default: email_analysis.json): ").strip()
        if not filename:
            filename = 'email_analysis.json'
        analyzer.export_json(filename)


if __name__ == "__main__":
    main()
# END OF CODE