Added code for practical-1.1, i.e. email header analysis.

This commit is contained in:
K
2025-10-09 20:24:13 +05:30
parent ac26f66516
commit 1db296206a
+426
View File
@@ -0,0 +1,426 @@
#!/usr/bin/env python3
# CSDF - Practical-1 (Email Header Analysis)
"""
THIS CODE HAS BEEN TESTED AND IS FULLY OPERATIONAL.
Problem Statement: Email Header Analysis - Write a program for tracking emails and investigating email crimes, i.e., write a program to analyze email headers.
Code from CyberSecurityAndDigitalForensics (SPPU - Final Year - Computer Engineering - Content) repository on KSKA Git: https://git.kska.io/sppu-be-comp-content/CyberSecurityAndDigitalForensics
"""
# BEGINNING OF CODE
import re
import json
from email import message_from_string, message_from_file
from email.parser import Parser
from datetime import datetime
import socket
import ipaddress
class EmailHeaderAnalyzer:
    """
    A comprehensive email header analyzer for forensic investigation.

    The raw message is parsed once in the constructor. Every ``extract_*`` /
    ``analyze_*`` method returns its result and also stores it in
    ``self.analysis_results`` under a fixed key, so that
    ``generate_forensic_report`` can assemble one dictionary.
    """

    def __init__(self, email_content):
        """
        Initialize the analyzer with email content.

        Args:
            email_content: Raw email as ``str``, as ``bytes``/``bytearray``
                (decoded as UTF-8, undecodable bytes replaced), or a
                text-mode file object.
        """
        if isinstance(email_content, (bytes, bytearray)):
            # message_from_file() cannot consume raw bytes; decode first.
            email_content = bytes(email_content).decode('utf-8', errors='replace')
        if isinstance(email_content, str):
            self.email = message_from_string(email_content)
        else:
            self.email = message_from_file(email_content)
        self.analysis_results = {}

    def extract_basic_headers(self):
        """Extract basic email header information.

        Returns:
            dict mapping each header name to its value, or 'Not Found'
            when the header is absent.
        """
        header_names = (
            'From',
            'To',
            'Subject',
            'Date',
            'Message-ID',
            'Return-Path',
            'Reply-To',
            'MIME-Version',
            'Content-Type',
        )
        headers = {name: self.email.get(name, 'Not Found') for name in header_names}
        self.analysis_results['basic_headers'] = headers
        return headers

    def extract_received_headers(self):
        """
        Extract and parse all 'Received' headers to trace the email path.

        Hops are numbered in the order the headers appear in the message.
        Mail servers typically prepend their Received header, so hop 1 is
        usually the most recent hop and the last hop the earliest one.

        Returns:
            list of dicts with keys 'hop_number', 'raw_header', 'timestamp',
            'from_server', 'by_server' and 'ip_address'.
        """
        received_headers = self.email.get_all('Received', [])
        parsed_received = []
        for hop_number, received in enumerate(received_headers, 1):
            received = str(received)
            parsed_received.append({
                'hop_number': hop_number,
                'raw_header': received,
                'timestamp': self._extract_timestamp(received),
                'from_server': self._extract_from_server(received),
                'by_server': self._extract_by_server(received),
                'ip_address': self._extract_ip_from_received(received),
            })
        self.analysis_results['received_headers'] = parsed_received
        return parsed_received

    def extract_originating_ip(self):
        """
        Extract the originating IP address.

        The X-Originating-IP header is preferred. When it is missing or
        holds no IPv4 address, the last listed Received header (usually the
        earliest hop) is used instead.

        Returns:
            The IP address as a string, or None when none could be found.
        """
        originating_ip = None
        header_value = self.email.get('X-Originating-IP', None)
        if header_value:
            # Clean up IP address (remove brackets if present)
            ip_match = re.search(r'(\d+\.\d+\.\d+\.\d+)', str(header_value))
            if ip_match:
                originating_ip = ip_match.group(1)
        if not originating_ip:
            received_headers = self.email.get_all('Received', [])
            if received_headers:
                candidate = self._extract_ip_from_received(str(received_headers[-1]))
                # The helper signals "no IP" with a truthy placeholder string;
                # never report that placeholder as if it were an address.
                if candidate != 'Not Found':
                    originating_ip = candidate
        self.analysis_results['originating_ip'] = originating_ip
        return originating_ip

    def analyze_authentication(self):
        """
        Analyze email authentication headers (SPF, DKIM, DMARC).

        Returns:
            dict with the raw header values plus three flags:
            'spf_passed' (SPF result is "pass"), 'dkim_present' (a
            DKIM-Signature header exists; the signature is NOT verified)
            and 'likely_spoofed' (neither of the two flags is set).
        """
        auth_results = {
            'SPF': self.email.get('Received-SPF', 'Not Found'),
            'DKIM-Signature': self.email.get('DKIM-Signature', 'Not Found'),
            'Authentication-Results': self.email.get('Authentication-Results', 'Not Found'),
            'ARC-Authentication-Results': self.email.get('ARC-Authentication-Results', 'Not Found')
        }
        if auth_results['SPF'] != 'Not Found':
            # Received-SPF starts with the result keyword; matching only the
            # leading token avoids false hits such as "bypass" in a comment.
            spf_value = str(auth_results['SPF']).strip().lower()
            spf_pass = re.match(r'pass\b', spf_value) is not None
        else:
            # Some receivers record SPF only inside Authentication-Results.
            spf_pass = re.search(
                r'\bspf\s*=\s*pass\b',
                str(auth_results['Authentication-Results']),
                re.IGNORECASE,
            ) is not None
        # The header value never contains its own name, so test for presence.
        dkim_pass = auth_results['DKIM-Signature'] != 'Not Found'
        auth_results['spf_passed'] = spf_pass
        auth_results['dkim_present'] = dkim_pass
        auth_results['likely_spoofed'] = not (spf_pass or dkim_pass)
        self.analysis_results['authentication'] = auth_results
        return auth_results

    def extract_message_id(self):
        """
        Extract the Message-ID and the domain that follows its '@'.

        Returns:
            dict with 'message_id' ('Not Found' if absent) and 'domain'
            (None if absent or not parseable).
        """
        message_id = self.email.get('Message-ID', 'Not Found')
        domain = None
        if message_id != 'Not Found':
            domain_match = re.search(r'@([a-zA-Z0-9.-]+)', str(message_id))
            if domain_match:
                domain = domain_match.group(1)
        message_id_info = {
            'message_id': message_id,
            'domain': domain
        }
        self.analysis_results['message_id_info'] = message_id_info
        return message_id_info

    def analyze_sender_info(self):
        """
        Compare the From address with the Return-Path address.

        Returns:
            dict with the raw headers, the extracted addresses and the
            flags 'address_mismatch' / 'potential_spoofing', which are True
            only when both addresses exist and differ (ignoring case).
        """
        from_header = self.email.get('From', '')
        return_path = self.email.get('Return-Path', '')
        from_email = self._extract_email_address(from_header)
        return_email = self._extract_email_address(return_path)
        # Compare case-insensitively so that a mere capitalisation
        # difference (Alice@Example.com vs alice@example.com) is not flagged.
        if from_email and return_email:
            mismatch = from_email.lower() != return_email.lower()
        else:
            mismatch = False
        sender_info = {
            'from_header': from_header,
            'from_email': from_email,
            'return_path': return_path,
            'return_email': return_email,
            'address_mismatch': mismatch,
            'potential_spoofing': mismatch
        }
        self.analysis_results['sender_analysis'] = sender_info
        return sender_info

    def get_ip_geolocation_info(self, ip_address):
        """
        Get basic information about an IP address.

        Note: no geolocation is performed here; for production, integrate
        with a geolocation API. A reverse DNS lookup is attempted.

        Args:
            ip_address: IPv4 or IPv6 address as a string.

        Returns:
            dict with 'ip', 'is_private', 'is_global', 'is_loopback',
            'version' and 'hostname', or {'error': 'Invalid IP address'}
            when the value cannot be parsed as an IP address.
        """
        try:
            ip_obj = ipaddress.ip_address(ip_address)
        except ValueError:
            return {'error': 'Invalid IP address'}
        ip_info = {
            'ip': ip_address,
            'is_private': ip_obj.is_private,
            'is_global': ip_obj.is_global,
            'is_loopback': ip_obj.is_loopback,
            'version': ip_obj.version
        }
        # Try reverse DNS lookup; socket.herror/gaierror are OSError subclasses.
        try:
            ip_info['hostname'] = socket.gethostbyaddr(ip_address)[0]
        except OSError:
            ip_info['hostname'] = 'Reverse DNS lookup failed'
        return ip_info

    def detect_suspicious_patterns(self):
        """
        Detect common patterns associated with email crimes.

        Reads the results already stored in ``self.analysis_results`` by
        analyze_authentication, analyze_sender_info and extract_message_id
        (checks whose result is missing are skipped) and scans the subject
        for a fixed list of keywords.

        Returns:
            list of human-readable indicator strings (possibly empty).
        """
        suspicious_indicators = []
        results = self.analysis_results
        if results.get('authentication', {}).get('likely_spoofed'):
            suspicious_indicators.append('Email failed authentication checks (SPF/DKIM)')
        if results.get('sender_analysis', {}).get('potential_spoofing'):
            suspicious_indicators.append('Mismatch between From and Return-Path addresses')
        if results.get('message_id_info', {}).get('message_id') == 'Not Found':
            suspicious_indicators.append('Missing Message-ID (unusual for legitimate emails)')
        subject = str(self.email.get('Subject', '')).lower()
        suspicious_keywords = ['urgent', 'verify account', 'suspended', 'confirm', 'prize', 'winner']
        found_keywords = [kw for kw in suspicious_keywords if kw in subject]
        if found_keywords:
            suspicious_indicators.append(f'Suspicious keywords in subject: {", ".join(found_keywords)}')
        self.analysis_results['suspicious_indicators'] = suspicious_indicators
        return suspicious_indicators

    def generate_forensic_report(self):
        """
        Run every analysis step and return the combined results dictionary.

        detect_suspicious_patterns runs last because it reads the results
        of the earlier steps. 'ip_analysis' is added only when an
        originating IP was found.
        """
        self.extract_basic_headers()
        self.extract_received_headers()
        self.extract_originating_ip()
        self.analyze_authentication()
        self.extract_message_id()
        self.analyze_sender_info()
        self.detect_suspicious_patterns()
        orig_ip = self.analysis_results.get('originating_ip')
        if orig_ip:
            self.analysis_results['ip_analysis'] = self.get_ip_geolocation_info(orig_ip)
        return self.analysis_results

    def print_report(self):
        """Generate the forensic report and print it to stdout."""
        report = self.generate_forensic_report()

        def section(title):
            # Banner shared by every numbered section of the report.
            print("\n" + "="*80)
            print(title)
            print("="*80)

        print("="*80)
        print(" EMAIL FORENSIC ANALYSIS REPORT ".center(80, "="))
        print("="*80)
        print(f"\nGenerated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")

        section("1. BASIC HEADER INFORMATION")
        for key, value in report['basic_headers'].items():
            print(f"{key:20s}: {value}")

        section("2. SENDER ANALYSIS")
        sender = report.get('sender_analysis', {})
        print(f"From Email : {sender.get('from_email', 'N/A')}")
        print(f"Return Email : {sender.get('return_email', 'N/A')}")
        print(f"Address Mismatch : {sender.get('address_mismatch', False)}")
        print(f"Potential Spoofing : {sender.get('potential_spoofing', False)}")

        section("3. AUTHENTICATION ANALYSIS")
        auth = report.get('authentication', {})
        print(f"SPF Passed : {auth.get('spf_passed', False)}")
        print(f"DKIM Present : {auth.get('dkim_present', False)}")
        print(f"Likely Spoofed : {auth.get('likely_spoofed', True)}")

        section("4. MESSAGE ID ANALYSIS")
        msg_id = report.get('message_id_info', {})
        print(f"Message ID : {msg_id.get('message_id', 'N/A')}")
        print(f"Domain : {msg_id.get('domain', 'N/A')}")

        section("5. ORIGINATING IP INFORMATION")
        # The key is always present after the report runs, so a plain
        # .get(key, default) would print "None" when no IP was found.
        print(f"Originating IP : {report.get('originating_ip') or 'Not Found'}")
        if 'ip_analysis' in report:
            ip_info = report['ip_analysis']
            print(f"IP Version : IPv{ip_info.get('version', 'N/A')}")
            print(f"Is Private : {ip_info.get('is_private', 'N/A')}")
            print(f"Is Global : {ip_info.get('is_global', 'N/A')}")
            print(f"Hostname : {ip_info.get('hostname', 'N/A')}")

        section("6. EMAIL TRANSMISSION PATH")
        received = report.get('received_headers', [])
        if received:
            for hop in received:
                print(f"\nHop {hop['hop_number']}:")
                print(f" From Server : {hop.get('from_server', 'N/A')}")
                print(f" By Server : {hop.get('by_server', 'N/A')}")
                print(f" IP Address : {hop.get('ip_address', 'N/A')}")
                print(f" Timestamp : {hop.get('timestamp', 'N/A')}")
        else:
            print("No Received headers found")

        section("7. SUSPICIOUS INDICATORS")
        indicators = report.get('suspicious_indicators', [])
        if indicators:
            for idx, indicator in enumerate(indicators, 1):
                print(f"{idx}. {indicator}")
        else:
            print("No suspicious indicators detected")

        print("\n" + "="*80)
        print(" END OF REPORT ".center(80, "="))
        print("="*80)

    def export_json(self, filename='email_analysis.json'):
        """Regenerate the report and write it to ``filename`` as JSON.

        Args:
            filename: Destination path; an existing file is overwritten.
        """
        report = self.generate_forensic_report()
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=4)
        print(f"\nAnalysis exported to {filename}")

    # Helper methods
    def _extract_timestamp(self, received_header):
        """Extract the timestamp that follows the last ';' of a Received header.

        Folded (multi-line) headers are handled by collapsing every run of
        whitespace to a single space. Returns 'Not Found' when the header
        has no ';' or nothing follows it.
        """
        _, separator, tail = received_header.rpartition(';')
        timestamp = ' '.join(tail.split())
        if not separator or not timestamp:
            return 'Not Found'
        return timestamp

    def _extract_from_server(self, received_header):
        """Extract the token after the 'from' keyword, or 'Not Found'."""
        # The look-behind keeps "envelope-from" and similar from matching.
        from_match = re.search(r'(?<![\w.-])from\s+(\S+)', received_header, re.IGNORECASE)
        return from_match.group(1) if from_match else 'Not Found'

    def _extract_by_server(self, received_header):
        """Extract the token after the 'by' keyword, or 'Not Found'."""
        # The look-behind keeps words ending in "by" (e.g. "standby") from matching.
        by_match = re.search(r'(?<![\w.-])by\s+(\S+)', received_header, re.IGNORECASE)
        return by_match.group(1) if by_match else 'Not Found'

    def _extract_ip_from_received(self, received_header):
        """Extract the first valid bracketed IP address from a Received header.

        Accepts '[192.0.2.1]', '[2001:db8::1]' and '[IPv6:2001:db8::1]'.
        Bracketed text that is not a valid address is skipped. Returns
        'Not Found' when the header holds no valid bracketed address.
        """
        bracketed = re.finditer(r'\[(?:IPv6:)?([0-9A-Fa-f:.]+)\]', received_header, re.IGNORECASE)
        for ip_match in bracketed:
            candidate = ip_match.group(1)
            try:
                ipaddress.ip_address(candidate)
            except ValueError:
                continue
            return candidate
        return 'Not Found'

    def _extract_email_address(self, header_value):
        """Extract the first email address from a header value, or None."""
        if not header_value:
            return None
        email_match = re.search(r'[\w\.-]+@[\w\.-]+\.\w+', str(header_value))
        return email_match.group(0) if email_match else None
def main():
    """
    Interactive entry point demonstrating email header analysis.

    Asks for an input method (pasted text, a file, or a built-in sample),
    prints the forensic report and optionally exports it to JSON. Returns
    early, after printing an error, when the file cannot be read or the
    supplied email is empty.
    """
    print("Email Header Analysis Tool for Digital Forensics\n")
    print("Choose input method:")
    print("1. Paste raw email content")
    print("2. Load from file")
    print("3. Use sample email")
    choice = input("\nEnter choice (1-3): ").strip()
    if choice == '1':
        print("\nPaste the raw email (including headers). Press Ctrl+D (Linux/MacOS) or Ctrl+Z (Windows) when done: ")
        import sys
        email_content = sys.stdin.read()
    elif choice == '2':
        filename = input("Enter email file path: ").strip()
        try:
            # errors='replace' keeps the tool usable on messages that carry
            # bytes which are not valid UTF-8.
            with open(filename, 'r', encoding='utf-8', errors='replace') as f:
                email_content = f.read()
        except FileNotFoundError:
            print(f"Error: File '{filename}' not found")
            return
        except OSError as exc:
            # Permission denied, path is a directory, etc.
            print(f"Error: Could not read '{filename}': {exc}")
            return
    else:
        # Sample email for demonstration (any choice other than 1 or 2).
        # The blank line separates the headers from the body.
        email_content = """From: sender@example.com
To: recipient@example.com
Subject: Urgent Account Verification Required
Date: Mon, 9 Oct 2025 10:30:00 +0530
Message-ID: <12345.67890@mail.example.com>
Return-Path: different@suspicious.com
Received: from mail.example.com ([192.168.1.100]) by server.example.com with SMTP; Mon, 9 Oct 2025 10:30:00 +0530
Received: from client.suspicious.com ([203.0.113.45]) by mail.example.com with ESMTP; Mon, 9 Oct 2025 10:29:55 +0530
X-Originating-IP: [203.0.113.45]
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8

This is a sample email body for forensic analysis.
"""
    if not email_content.strip():
        print("Error: No email content provided")
        return
    # Create analyzer instance and generate report
    analyzer = EmailHeaderAnalyzer(email_content)
    analyzer.print_report()
    # Ask if user wants to export to JSON
    try:
        export = input("\nExport analysis to JSON? (y/n): ").strip().lower()
        if export == 'y':
            filename = input("Enter filename (default: email_analysis.json): ").strip()
            if not filename:
                filename = 'email_analysis.json'
            analyzer.export_json(filename)
    except EOFError:
        # stdin may already be at end-of-file (option 1 reads it to EOF, or
        # input was piped); skip the optional export instead of crashing.
        print()
# Run the interactive tool only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
# END OF CODE