diff --git a/Codes/Code-1.1.py b/Codes/Code-1.1.py new file mode 100644 index 0000000..a93d7a7 --- /dev/null +++ b/Codes/Code-1.1.py @@ -0,0 +1,426 @@ +#!/usr/bin/env python3 + +# CSDF - Practical-1 (Email Header Analysis) + +""" +THIS CODE HAS BEEN TESTED AND IS FULLY OPERATIONAL. + +Problem Statement: Email Header Analysis - Write a program for Tracking Emails and Investigating Email Crimes. i.e. Write a program to analyze e–mail header. + +Code from CyberSecurityAndDigitalForensics (SPPU - Final Year - Computer Engineering - Content) repository on KSKA Git: https://git.kska.io/sppu-be-comp-content/CyberSecurityAndDigitalForensics +""" + +# BEGINNING OF CODE +import re +import json +from email import message_from_string, message_from_file +from email.parser import Parser +from datetime import datetime +import socket +import ipaddress + +class EmailHeaderAnalyzer: + """ + A comprehensive email header analyzer for forensic investigation + """ + + def __init__(self, email_content): + """ + Initialize the analyzer with email content + + Args: + email_content: Raw email string or file object + """ + if isinstance(email_content, str): + self.email = message_from_string(email_content) + else: + self.email = message_from_file(email_content) + + self.analysis_results = {} + + def extract_basic_headers(self): + """Extract basic email header information""" + headers = { + 'From': self.email.get('From', 'Not Found'), + 'To': self.email.get('To', 'Not Found'), + 'Subject': self.email.get('Subject', 'Not Found'), + 'Date': self.email.get('Date', 'Not Found'), + 'Message-ID': self.email.get('Message-ID', 'Not Found'), + 'Return-Path': self.email.get('Return-Path', 'Not Found'), + 'Reply-To': self.email.get('Reply-To', 'Not Found'), + 'MIME-Version': self.email.get('MIME-Version', 'Not Found'), + 'Content-Type': self.email.get('Content-Type', 'Not Found') + } + + self.analysis_results['basic_headers'] = headers + return headers + + def extract_received_headers(self): + """ + Extract and parse all 'Received' headers to trace email path + Critical for tracking email transmission route + """ + received_headers = self.email.get_all('Received', []) + parsed_received = [] + + for idx, received in enumerate(received_headers): + hop_info = { + 'hop_number': idx + 1, + 'raw_header': received, + 'timestamp': self._extract_timestamp(received), + 'from_server': self._extract_from_server(received), + 'by_server': self._extract_by_server(received), + 'ip_address': self._extract_ip_from_received(received) + } + parsed_received.append(hop_info) + + self.analysis_results['received_headers'] = parsed_received + return parsed_received + + def extract_originating_ip(self): + """ + Extract the originating IP address (X-Originating-IP) + This is crucial for tracing the actual sender location + """ + originating_ip = self.email.get('X-Originating-IP', None) + + if originating_ip: + # Clean up IP address (remove brackets if present) + originating_ip = re.search(r'(\d+\.\d+\.\d+\.\d+)', originating_ip) + if originating_ip: + originating_ip = originating_ip.group(1) + + # If X-Originating-IP not found, try to get from first Received header + if not originating_ip: + received_headers = self.email.get_all('Received', []) + if received_headers: + originating_ip = self._extract_ip_from_received(received_headers[-1]) + + self.analysis_results['originating_ip'] = originating_ip + return originating_ip + + def analyze_authentication(self): + """ + Analyze email authentication headers (SPF, DKIM, DMARC) + Helps detect spoofing and verify email authenticity + """ + auth_results = { + 'SPF': self.email.get('Received-SPF', 'Not Found'), + 'DKIM-Signature': self.email.get('DKIM-Signature', 'Not Found'), + 'Authentication-Results': self.email.get('Authentication-Results', 'Not Found'), + 'ARC-Authentication-Results': self.email.get('ARC-Authentication-Results', 'Not Found') + } + + # Determine if email passed authentication + spf_pass = 'pass' in str(auth_results['SPF']).lower() + dkim_pass = 'DKIM-Signature' in str(auth_results['DKIM-Signature']) + + auth_results['spf_passed'] = spf_pass + auth_results['dkim_present'] = dkim_pass + auth_results['likely_spoofed'] = not (spf_pass or dkim_pass) + + self.analysis_results['authentication'] = auth_results + return auth_results + + def extract_message_id(self): + """ + Extract and analyze Message-ID + Useful for tracking email threads and identifying patterns + """ + message_id = self.email.get('Message-ID', 'Not Found') + + # Extract domain from Message-ID + domain = None + if message_id != 'Not Found': + domain_match = re.search(r'@([a-zA-Z0-9.-]+)', message_id) + if domain_match: + domain = domain_match.group(1) + + message_id_info = { + 'message_id': message_id, + 'domain': domain + } + + self.analysis_results['message_id_info'] = message_id_info + return message_id_info + + def analyze_sender_info(self): + """ + Detailed analysis of sender information + Extracts email addresses and identifies potential spoofing + """ + from_header = self.email.get('From', '') + return_path = self.email.get('Return-Path', '') + + # Extract email addresses + from_email = self._extract_email_address(from_header) + return_email = self._extract_email_address(return_path) + + # Check for mismatch (potential spoofing indicator) + mismatch = from_email != return_email if from_email and return_email else False + + sender_info = { + 'from_header': from_header, + 'from_email': from_email, + 'return_path': return_path, + 'return_email': return_email, + 'address_mismatch': mismatch, + 'potential_spoofing': mismatch + } + + self.analysis_results['sender_analysis'] = sender_info + return sender_info + + def get_ip_geolocation_info(self, ip_address): + """ + Get basic information about an IP address + Note: For production, integrate with geolocation APIs + """ + try: + ip_obj = ipaddress.ip_address(ip_address) + ip_info = { + 'ip': ip_address, + 'is_private': ip_obj.is_private, + 'is_global': ip_obj.is_global, + 'is_loopback': ip_obj.is_loopback, + 'version': ip_obj.version + } + + # Try reverse DNS lookup + try: + hostname = socket.gethostbyaddr(ip_address)[0] + ip_info['hostname'] = hostname + except: + ip_info['hostname'] = 'Reverse DNS lookup failed' + + return ip_info + except ValueError: + return {'error': 'Invalid IP address'} + + def detect_suspicious_patterns(self): + """ + Detect common patterns associated with email crimes + """ + suspicious_indicators = [] + + # Check for authentication failures + if self.analysis_results.get('authentication', {}).get('likely_spoofed'): + suspicious_indicators.append('Email failed authentication checks (SPF/DKIM)') + + # Check for sender/return-path mismatch + if self.analysis_results.get('sender_analysis', {}).get('potential_spoofing'): + suspicious_indicators.append('Mismatch between From and Return-Path addresses') + + # Check for missing Message-ID + if self.analysis_results.get('message_id_info', {}).get('message_id') == 'Not Found': + suspicious_indicators.append('Missing Message-ID (unusual for legitimate emails)') + + # Check for suspicious keywords in subject + subject = self.email.get('Subject', '').lower() + suspicious_keywords = ['urgent', 'verify account', 'suspended', 'confirm', 'prize', 'winner'] + found_keywords = [kw for kw in suspicious_keywords if kw in subject] + if found_keywords: + suspicious_indicators.append(f'Suspicious keywords in subject: {", ".join(found_keywords)}') + + self.analysis_results['suspicious_indicators'] = suspicious_indicators + return suspicious_indicators + + def generate_forensic_report(self): + """ + Generate a comprehensive forensic analysis report + """ + # Run all analysis methods + self.extract_basic_headers() + self.extract_received_headers() + self.extract_originating_ip() + self.analyze_authentication() + self.extract_message_id() + self.analyze_sender_info() + self.detect_suspicious_patterns() + + # Analyze originating IP if available + orig_ip = self.analysis_results.get('originating_ip') + if orig_ip: + self.analysis_results['ip_analysis'] = self.get_ip_geolocation_info(orig_ip) + + return self.analysis_results + + def print_report(self): + """Print a formatted forensic report""" + report = self.generate_forensic_report() + + print("="*80) + print(" EMAIL FORENSIC ANALYSIS REPORT ".center(80, "=")) + print("="*80) + print(f"\nGenerated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") + + # Basic Headers + print("\n" + "="*80) + print("1. BASIC HEADER INFORMATION") + print("="*80) + for key, value in report['basic_headers'].items(): + print(f"{key:20s}: {value}") + + # Sender Analysis + print("\n" + "="*80) + print("2. SENDER ANALYSIS") + print("="*80) + sender = report.get('sender_analysis', {}) + print(f"From Email : {sender.get('from_email', 'N/A')}") + print(f"Return Email : {sender.get('return_email', 'N/A')}") + print(f"Address Mismatch : {sender.get('address_mismatch', False)}") + print(f"Potential Spoofing : {sender.get('potential_spoofing', False)}") + + # Authentication + print("\n" + "="*80) + print("3. AUTHENTICATION ANALYSIS") + print("="*80) + auth = report.get('authentication', {}) + print(f"SPF Passed : {auth.get('spf_passed', False)}") + print(f"DKIM Present : {auth.get('dkim_present', False)}") + print(f"Likely Spoofed : {auth.get('likely_spoofed', True)}") + + # Message ID + print("\n" + "="*80) + print("4. MESSAGE ID ANALYSIS") + print("="*80) + msg_id = report.get('message_id_info', {}) + print(f"Message ID : {msg_id.get('message_id', 'N/A')}") + print(f"Domain : {msg_id.get('domain', 'N/A')}") + + # Originating IP + print("\n" + "="*80) + print("5. ORIGINATING IP INFORMATION") + print("="*80) + print(f"Originating IP : {report.get('originating_ip', 'Not Found')}") + + if 'ip_analysis' in report: + ip_info = report['ip_analysis'] + print(f"IP Version : IPv{ip_info.get('version', 'N/A')}") + print(f"Is Private : {ip_info.get('is_private', 'N/A')}") + print(f"Is Global : {ip_info.get('is_global', 'N/A')}") + print(f"Hostname : {ip_info.get('hostname', 'N/A')}") + + # Transmission Path + print("\n" + "="*80) + print("6. EMAIL TRANSMISSION PATH") + print("="*80) + received = report.get('received_headers', []) + if received: + for hop in received: + print(f"\nHop {hop['hop_number']}:") + print(f" From Server : {hop.get('from_server', 'N/A')}") + print(f" By Server : {hop.get('by_server', 'N/A')}") + print(f" IP Address : {hop.get('ip_address', 'N/A')}") + print(f" Timestamp : {hop.get('timestamp', 'N/A')}") + else: + print("No Received headers found") + + # Suspicious Indicators + print("\n" + "="*80) + print("7. SUSPICIOUS INDICATORS") + print("="*80) + indicators = report.get('suspicious_indicators', []) + if indicators: + for idx, indicator in enumerate(indicators, 1): + print(f"{idx}. {indicator}") + else: + print("No suspicious indicators detected") + + print("\n" + "="*80) + print(" END OF REPORT ".center(80, "=")) + print("="*80) + + def export_json(self, filename='email_analysis.json'): + """Export analysis results to JSON file""" + report = self.generate_forensic_report() + with open(filename, 'w') as f: + json.dump(report, f, indent=4) + print(f"\nAnalysis exported to {filename}") + + # Helper methods + def _extract_timestamp(self, received_header): + """Extract timestamp from Received header""" + timestamp_match = re.search(r';\s*(.+)$', received_header) + return timestamp_match.group(1).strip() if timestamp_match else 'Not Found' + + def _extract_from_server(self, received_header): + """Extract 'from' server information""" + from_match = re.search(r'from\s+([^\s]+)', received_header, re.IGNORECASE) + return from_match.group(1) if from_match else 'Not Found' + + def _extract_by_server(self, received_header): + """Extract 'by' server information""" + by_match = re.search(r'by\s+([^\s]+)', received_header, re.IGNORECASE) + return by_match.group(1) if by_match else 'Not Found' + + def _extract_ip_from_received(self, received_header): + """Extract IP address from Received header""" + ip_match = re.search(r'\[(\d+\.\d+\.\d+\.\d+)\]', received_header) + return ip_match.group(1) if ip_match else 'Not Found' + + def _extract_email_address(self, header_value): + """Extract email address from header value""" + email_match = re.search(r'[\w\.-]+@[\w\.-]+\.\w+', header_value) + return email_match.group(0) if email_match else None + + +def main(): + """ + Main function to demonstrate email header analysis + """ + print("Email Header Analysis Tool for Digital Forensics\n") + print("Choose input method:") + print("1. Paste raw email content") + print("2. Load from file") + print("3. Use sample email") + + choice = input("\nEnter choice (1-3): ").strip() + + if choice == '1': + print("\nPaste the raw email (including headers). Press Ctrl+D (Linux/MacOS) or Ctrl+Z (Windows) when done: ") + import sys + email_content = sys.stdin.read() + + elif choice == '2': + filename = input("Enter email file path: ").strip() + try: + with open(filename, 'r') as f: + email_content = f.read() + except FileNotFoundError: + print(f"Error: File '{filename}' not found") + return + + else: + # Sample email for demonstration + email_content = """From: sender@example.com +To: recipient@example.com +Subject: Urgent Account Verification Required +Date: Mon, 9 Oct 2025 10:30:00 +0530 +Message-ID: <12345.67890@mail.example.com> +Return-Path: different@suspicious.com +Received: from mail.example.com ([192.168.1.100]) by server.example.com with SMTP; Mon, 9 Oct 2025 10:30:00 +0530 +Received: from client.suspicious.com ([203.0.113.45]) by mail.example.com with ESMTP; Mon, 9 Oct 2025 10:29:55 +0530 +X-Originating-IP: [203.0.113.45] +MIME-Version: 1.0 +Content-Type: text/plain; charset=utf-8 + +This is a sample email body for forensic analysis. +""" + + # Create analyzer instance and generate report + analyzer = EmailHeaderAnalyzer(email_content) + analyzer.print_report() + + # Ask if user wants to export to JSON + export = input("\nExport analysis to JSON? (y/n): ").strip().lower() + if export == 'y': + filename = input("Enter filename (default: email_analysis.json): ").strip() + if not filename: + filename = 'email_analysis.json' + analyzer.export_json(filename) + + +if __name__ == "__main__": + main() +# END OF CODE