Compare commits
12 Commits
ac26f66516
..
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
1e422eebd1
|
|||
|
9473cb00f9
|
|||
|
f10cd27120
|
|||
|
98ec149773
|
|||
|
dbcf975d4d
|
|||
|
b1f33dce46
|
|||
|
ceadf4b65f
|
|||
|
aa60831d3e
|
|||
|
ce0ea63243
|
|||
|
2fab8ccc15
|
|||
|
912433edf6
|
|||
|
1db296206a
|
@@ -0,0 +1,426 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
# CSDF - Practical-1 (Email Header Analysis)
|
||||||
|
|
||||||
|
"""
|
||||||
|
THIS CODE HAS BEEN TESTED AND IS FULLY OPERATIONAL.
|
||||||
|
|
||||||
|
Problem Statement: Email Header Analysis - Write a program for Tracking Emails and Investigating Email Crimes. i.e. Write a program to analyze e–mail header.
|
||||||
|
|
||||||
|
Code from CyberSecurityAndDigitalForensics (SPPU - Final Year - Computer Engineering - Content) repository on KSKA Git: https://git.kska.io/sppu-be-comp-content/CyberSecurityAndDigitalForensics
|
||||||
|
"""
|
||||||
|
|
||||||
|
# BEGINNING OF CODE
|
||||||
|
import re
|
||||||
|
import json
|
||||||
|
from email import message_from_string, message_from_file
|
||||||
|
from email.parser import Parser
|
||||||
|
from datetime import datetime
|
||||||
|
import socket
|
||||||
|
import ipaddress
|
||||||
|
|
||||||
|
class EmailHeaderAnalyzer:
|
||||||
|
"""
|
||||||
|
A comprehensive email header analyzer for forensic investigation
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, email_content):
|
||||||
|
"""
|
||||||
|
Initialize the analyzer with email content
|
||||||
|
|
||||||
|
Args:
|
||||||
|
email_content: Raw email string or file object
|
||||||
|
"""
|
||||||
|
if isinstance(email_content, str):
|
||||||
|
self.email = message_from_string(email_content)
|
||||||
|
else:
|
||||||
|
self.email = message_from_file(email_content)
|
||||||
|
|
||||||
|
self.analysis_results = {}
|
||||||
|
|
||||||
|
def extract_basic_headers(self):
|
||||||
|
"""Extract basic email header information"""
|
||||||
|
headers = {
|
||||||
|
'From': self.email.get('From', 'Not Found'),
|
||||||
|
'To': self.email.get('To', 'Not Found'),
|
||||||
|
'Subject': self.email.get('Subject', 'Not Found'),
|
||||||
|
'Date': self.email.get('Date', 'Not Found'),
|
||||||
|
'Message-ID': self.email.get('Message-ID', 'Not Found'),
|
||||||
|
'Return-Path': self.email.get('Return-Path', 'Not Found'),
|
||||||
|
'Reply-To': self.email.get('Reply-To', 'Not Found'),
|
||||||
|
'MIME-Version': self.email.get('MIME-Version', 'Not Found'),
|
||||||
|
'Content-Type': self.email.get('Content-Type', 'Not Found')
|
||||||
|
}
|
||||||
|
|
||||||
|
self.analysis_results['basic_headers'] = headers
|
||||||
|
return headers
|
||||||
|
|
||||||
|
def extract_received_headers(self):
|
||||||
|
"""
|
||||||
|
Extract and parse all 'Received' headers to trace email path
|
||||||
|
Critical for tracking email transmission route
|
||||||
|
"""
|
||||||
|
received_headers = self.email.get_all('Received', [])
|
||||||
|
parsed_received = []
|
||||||
|
|
||||||
|
for idx, received in enumerate(received_headers):
|
||||||
|
hop_info = {
|
||||||
|
'hop_number': idx + 1,
|
||||||
|
'raw_header': received,
|
||||||
|
'timestamp': self._extract_timestamp(received),
|
||||||
|
'from_server': self._extract_from_server(received),
|
||||||
|
'by_server': self._extract_by_server(received),
|
||||||
|
'ip_address': self._extract_ip_from_received(received)
|
||||||
|
}
|
||||||
|
parsed_received.append(hop_info)
|
||||||
|
|
||||||
|
self.analysis_results['received_headers'] = parsed_received
|
||||||
|
return parsed_received
|
||||||
|
|
||||||
|
def extract_originating_ip(self):
|
||||||
|
"""
|
||||||
|
Extract the originating IP address (X-Originating-IP)
|
||||||
|
This is crucial for tracing the actual sender location
|
||||||
|
"""
|
||||||
|
originating_ip = self.email.get('X-Originating-IP', None)
|
||||||
|
|
||||||
|
if originating_ip:
|
||||||
|
# Clean up IP address (remove brackets if present)
|
||||||
|
originating_ip = re.search(r'(\d+\.\d+\.\d+\.\d+)', originating_ip)
|
||||||
|
if originating_ip:
|
||||||
|
originating_ip = originating_ip.group(1)
|
||||||
|
|
||||||
|
# If X-Originating-IP not found, try to get from first Received header
|
||||||
|
if not originating_ip:
|
||||||
|
received_headers = self.email.get_all('Received', [])
|
||||||
|
if received_headers:
|
||||||
|
originating_ip = self._extract_ip_from_received(received_headers[-1])
|
||||||
|
|
||||||
|
self.analysis_results['originating_ip'] = originating_ip
|
||||||
|
return originating_ip
|
||||||
|
|
||||||
|
def analyze_authentication(self):
|
||||||
|
"""
|
||||||
|
Analyze email authentication headers (SPF, DKIM, DMARC)
|
||||||
|
Helps detect spoofing and verify email authenticity
|
||||||
|
"""
|
||||||
|
auth_results = {
|
||||||
|
'SPF': self.email.get('Received-SPF', 'Not Found'),
|
||||||
|
'DKIM-Signature': self.email.get('DKIM-Signature', 'Not Found'),
|
||||||
|
'Authentication-Results': self.email.get('Authentication-Results', 'Not Found'),
|
||||||
|
'ARC-Authentication-Results': self.email.get('ARC-Authentication-Results', 'Not Found')
|
||||||
|
}
|
||||||
|
|
||||||
|
# Determine if email passed authentication
|
||||||
|
spf_pass = 'pass' in str(auth_results['SPF']).lower()
|
||||||
|
dkim_pass = 'DKIM-Signature' in str(auth_results['DKIM-Signature'])
|
||||||
|
|
||||||
|
auth_results['spf_passed'] = spf_pass
|
||||||
|
auth_results['dkim_present'] = dkim_pass
|
||||||
|
auth_results['likely_spoofed'] = not (spf_pass or dkim_pass)
|
||||||
|
|
||||||
|
self.analysis_results['authentication'] = auth_results
|
||||||
|
return auth_results
|
||||||
|
|
||||||
|
def extract_message_id(self):
|
||||||
|
"""
|
||||||
|
Extract and analyze Message-ID
|
||||||
|
Useful for tracking email threads and identifying patterns
|
||||||
|
"""
|
||||||
|
message_id = self.email.get('Message-ID', 'Not Found')
|
||||||
|
|
||||||
|
# Extract domain from Message-ID
|
||||||
|
domain = None
|
||||||
|
if message_id != 'Not Found':
|
||||||
|
domain_match = re.search(r'@([a-zA-Z0-9.-]+)', message_id)
|
||||||
|
if domain_match:
|
||||||
|
domain = domain_match.group(1)
|
||||||
|
|
||||||
|
message_id_info = {
|
||||||
|
'message_id': message_id,
|
||||||
|
'domain': domain
|
||||||
|
}
|
||||||
|
|
||||||
|
self.analysis_results['message_id_info'] = message_id_info
|
||||||
|
return message_id_info
|
||||||
|
|
||||||
|
def analyze_sender_info(self):
|
||||||
|
"""
|
||||||
|
Detailed analysis of sender information
|
||||||
|
Extracts email addresses and identifies potential spoofing
|
||||||
|
"""
|
||||||
|
from_header = self.email.get('From', '')
|
||||||
|
return_path = self.email.get('Return-Path', '')
|
||||||
|
|
||||||
|
# Extract email addresses
|
||||||
|
from_email = self._extract_email_address(from_header)
|
||||||
|
return_email = self._extract_email_address(return_path)
|
||||||
|
|
||||||
|
# Check for mismatch (potential spoofing indicator)
|
||||||
|
mismatch = from_email != return_email if from_email and return_email else False
|
||||||
|
|
||||||
|
sender_info = {
|
||||||
|
'from_header': from_header,
|
||||||
|
'from_email': from_email,
|
||||||
|
'return_path': return_path,
|
||||||
|
'return_email': return_email,
|
||||||
|
'address_mismatch': mismatch,
|
||||||
|
'potential_spoofing': mismatch
|
||||||
|
}
|
||||||
|
|
||||||
|
self.analysis_results['sender_analysis'] = sender_info
|
||||||
|
return sender_info
|
||||||
|
|
||||||
|
def get_ip_geolocation_info(self, ip_address):
|
||||||
|
"""
|
||||||
|
Get basic information about an IP address
|
||||||
|
Note: For production, integrate with geolocation APIs
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
ip_obj = ipaddress.ip_address(ip_address)
|
||||||
|
ip_info = {
|
||||||
|
'ip': ip_address,
|
||||||
|
'is_private': ip_obj.is_private,
|
||||||
|
'is_global': ip_obj.is_global,
|
||||||
|
'is_loopback': ip_obj.is_loopback,
|
||||||
|
'version': ip_obj.version
|
||||||
|
}
|
||||||
|
|
||||||
|
# Try reverse DNS lookup
|
||||||
|
try:
|
||||||
|
hostname = socket.gethostbyaddr(ip_address)[0]
|
||||||
|
ip_info['hostname'] = hostname
|
||||||
|
except:
|
||||||
|
ip_info['hostname'] = 'Reverse DNS lookup failed'
|
||||||
|
|
||||||
|
return ip_info
|
||||||
|
except ValueError:
|
||||||
|
return {'error': 'Invalid IP address'}
|
||||||
|
|
||||||
|
def detect_suspicious_patterns(self):
|
||||||
|
"""
|
||||||
|
Detect common patterns associated with email crimes
|
||||||
|
"""
|
||||||
|
suspicious_indicators = []
|
||||||
|
|
||||||
|
# Check for authentication failures
|
||||||
|
if self.analysis_results.get('authentication', {}).get('likely_spoofed'):
|
||||||
|
suspicious_indicators.append('Email failed authentication checks (SPF/DKIM)')
|
||||||
|
|
||||||
|
# Check for sender/return-path mismatch
|
||||||
|
if self.analysis_results.get('sender_analysis', {}).get('potential_spoofing'):
|
||||||
|
suspicious_indicators.append('Mismatch between From and Return-Path addresses')
|
||||||
|
|
||||||
|
# Check for missing Message-ID
|
||||||
|
if self.analysis_results.get('message_id_info', {}).get('message_id') == 'Not Found':
|
||||||
|
suspicious_indicators.append('Missing Message-ID (unusual for legitimate emails)')
|
||||||
|
|
||||||
|
# Check for suspicious keywords in subject
|
||||||
|
subject = self.email.get('Subject', '').lower()
|
||||||
|
suspicious_keywords = ['urgent', 'verify account', 'suspended', 'confirm', 'prize', 'winner']
|
||||||
|
found_keywords = [kw for kw in suspicious_keywords if kw in subject]
|
||||||
|
if found_keywords:
|
||||||
|
suspicious_indicators.append(f'Suspicious keywords in subject: {", ".join(found_keywords)}')
|
||||||
|
|
||||||
|
self.analysis_results['suspicious_indicators'] = suspicious_indicators
|
||||||
|
return suspicious_indicators
|
||||||
|
|
||||||
|
def generate_forensic_report(self):
|
||||||
|
"""
|
||||||
|
Generate a comprehensive forensic analysis report
|
||||||
|
"""
|
||||||
|
# Run all analysis methods
|
||||||
|
self.extract_basic_headers()
|
||||||
|
self.extract_received_headers()
|
||||||
|
self.extract_originating_ip()
|
||||||
|
self.analyze_authentication()
|
||||||
|
self.extract_message_id()
|
||||||
|
self.analyze_sender_info()
|
||||||
|
self.detect_suspicious_patterns()
|
||||||
|
|
||||||
|
# Analyze originating IP if available
|
||||||
|
orig_ip = self.analysis_results.get('originating_ip')
|
||||||
|
if orig_ip:
|
||||||
|
self.analysis_results['ip_analysis'] = self.get_ip_geolocation_info(orig_ip)
|
||||||
|
|
||||||
|
return self.analysis_results
|
||||||
|
|
||||||
|
def print_report(self):
|
||||||
|
"""Print a formatted forensic report"""
|
||||||
|
report = self.generate_forensic_report()
|
||||||
|
|
||||||
|
print("="*80)
|
||||||
|
print(" EMAIL FORENSIC ANALYSIS REPORT ".center(80, "="))
|
||||||
|
print("="*80)
|
||||||
|
print(f"\nGenerated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
||||||
|
|
||||||
|
# Basic Headers
|
||||||
|
print("\n" + "="*80)
|
||||||
|
print("1. BASIC HEADER INFORMATION")
|
||||||
|
print("="*80)
|
||||||
|
for key, value in report['basic_headers'].items():
|
||||||
|
print(f"{key:20s}: {value}")
|
||||||
|
|
||||||
|
# Sender Analysis
|
||||||
|
print("\n" + "="*80)
|
||||||
|
print("2. SENDER ANALYSIS")
|
||||||
|
print("="*80)
|
||||||
|
sender = report.get('sender_analysis', {})
|
||||||
|
print(f"From Email : {sender.get('from_email', 'N/A')}")
|
||||||
|
print(f"Return Email : {sender.get('return_email', 'N/A')}")
|
||||||
|
print(f"Address Mismatch : {sender.get('address_mismatch', False)}")
|
||||||
|
print(f"Potential Spoofing : {sender.get('potential_spoofing', False)}")
|
||||||
|
|
||||||
|
# Authentication
|
||||||
|
print("\n" + "="*80)
|
||||||
|
print("3. AUTHENTICATION ANALYSIS")
|
||||||
|
print("="*80)
|
||||||
|
auth = report.get('authentication', {})
|
||||||
|
print(f"SPF Passed : {auth.get('spf_passed', False)}")
|
||||||
|
print(f"DKIM Present : {auth.get('dkim_present', False)}")
|
||||||
|
print(f"Likely Spoofed : {auth.get('likely_spoofed', True)}")
|
||||||
|
|
||||||
|
# Message ID
|
||||||
|
print("\n" + "="*80)
|
||||||
|
print("4. MESSAGE ID ANALYSIS")
|
||||||
|
print("="*80)
|
||||||
|
msg_id = report.get('message_id_info', {})
|
||||||
|
print(f"Message ID : {msg_id.get('message_id', 'N/A')}")
|
||||||
|
print(f"Domain : {msg_id.get('domain', 'N/A')}")
|
||||||
|
|
||||||
|
# Originating IP
|
||||||
|
print("\n" + "="*80)
|
||||||
|
print("5. ORIGINATING IP INFORMATION")
|
||||||
|
print("="*80)
|
||||||
|
print(f"Originating IP : {report.get('originating_ip', 'Not Found')}")
|
||||||
|
|
||||||
|
if 'ip_analysis' in report:
|
||||||
|
ip_info = report['ip_analysis']
|
||||||
|
print(f"IP Version : IPv{ip_info.get('version', 'N/A')}")
|
||||||
|
print(f"Is Private : {ip_info.get('is_private', 'N/A')}")
|
||||||
|
print(f"Is Global : {ip_info.get('is_global', 'N/A')}")
|
||||||
|
print(f"Hostname : {ip_info.get('hostname', 'N/A')}")
|
||||||
|
|
||||||
|
# Transmission Path
|
||||||
|
print("\n" + "="*80)
|
||||||
|
print("6. EMAIL TRANSMISSION PATH")
|
||||||
|
print("="*80)
|
||||||
|
received = report.get('received_headers', [])
|
||||||
|
if received:
|
||||||
|
for hop in received:
|
||||||
|
print(f"\nHop {hop['hop_number']}:")
|
||||||
|
print(f" From Server : {hop.get('from_server', 'N/A')}")
|
||||||
|
print(f" By Server : {hop.get('by_server', 'N/A')}")
|
||||||
|
print(f" IP Address : {hop.get('ip_address', 'N/A')}")
|
||||||
|
print(f" Timestamp : {hop.get('timestamp', 'N/A')}")
|
||||||
|
else:
|
||||||
|
print("No Received headers found")
|
||||||
|
|
||||||
|
# Suspicious Indicators
|
||||||
|
print("\n" + "="*80)
|
||||||
|
print("7. SUSPICIOUS INDICATORS")
|
||||||
|
print("="*80)
|
||||||
|
indicators = report.get('suspicious_indicators', [])
|
||||||
|
if indicators:
|
||||||
|
for idx, indicator in enumerate(indicators, 1):
|
||||||
|
print(f"{idx}. {indicator}")
|
||||||
|
else:
|
||||||
|
print("No suspicious indicators detected")
|
||||||
|
|
||||||
|
print("\n" + "="*80)
|
||||||
|
print(" END OF REPORT ".center(80, "="))
|
||||||
|
print("="*80)
|
||||||
|
|
||||||
|
def export_json(self, filename='email_analysis.json'):
|
||||||
|
"""Export analysis results to JSON file"""
|
||||||
|
report = self.generate_forensic_report()
|
||||||
|
with open(filename, 'w') as f:
|
||||||
|
json.dump(report, f, indent=4)
|
||||||
|
print(f"\nAnalysis exported to {filename}")
|
||||||
|
|
||||||
|
# Helper methods
|
||||||
|
def _extract_timestamp(self, received_header):
|
||||||
|
"""Extract timestamp from Received header"""
|
||||||
|
timestamp_match = re.search(r';\s*(.+)$', received_header)
|
||||||
|
return timestamp_match.group(1).strip() if timestamp_match else 'Not Found'
|
||||||
|
|
||||||
|
def _extract_from_server(self, received_header):
|
||||||
|
"""Extract 'from' server information"""
|
||||||
|
from_match = re.search(r'from\s+([^\s]+)', received_header, re.IGNORECASE)
|
||||||
|
return from_match.group(1) if from_match else 'Not Found'
|
||||||
|
|
||||||
|
def _extract_by_server(self, received_header):
|
||||||
|
"""Extract 'by' server information"""
|
||||||
|
by_match = re.search(r'by\s+([^\s]+)', received_header, re.IGNORECASE)
|
||||||
|
return by_match.group(1) if by_match else 'Not Found'
|
||||||
|
|
||||||
|
def _extract_ip_from_received(self, received_header):
|
||||||
|
"""Extract IP address from Received header"""
|
||||||
|
ip_match = re.search(r'\[(\d+\.\d+\.\d+\.\d+)\]', received_header)
|
||||||
|
return ip_match.group(1) if ip_match else 'Not Found'
|
||||||
|
|
||||||
|
def _extract_email_address(self, header_value):
|
||||||
|
"""Extract email address from header value"""
|
||||||
|
email_match = re.search(r'[\w\.-]+@[\w\.-]+\.\w+', header_value)
|
||||||
|
return email_match.group(0) if email_match else None
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""
|
||||||
|
Main function to demonstrate email header analysis
|
||||||
|
"""
|
||||||
|
print("Email Header Analysis Tool for Digital Forensics\n")
|
||||||
|
print("Choose input method:")
|
||||||
|
print("1. Paste raw email content")
|
||||||
|
print("2. Load from file")
|
||||||
|
print("3. Use sample email")
|
||||||
|
|
||||||
|
choice = input("\nEnter choice (1-3): ").strip()
|
||||||
|
|
||||||
|
if choice == '1':
|
||||||
|
print("\nPaste the raw email (including headers). Press Ctrl+D (Linux/MacOS) or Ctrl+Z (Windows) when done: ")
|
||||||
|
import sys
|
||||||
|
email_content = sys.stdin.read()
|
||||||
|
|
||||||
|
elif choice == '2':
|
||||||
|
filename = input("Enter email file path: ").strip()
|
||||||
|
try:
|
||||||
|
with open(filename, 'r') as f:
|
||||||
|
email_content = f.read()
|
||||||
|
except FileNotFoundError:
|
||||||
|
print(f"Error: File '{filename}' not found")
|
||||||
|
return
|
||||||
|
|
||||||
|
else:
|
||||||
|
# Sample email for demonstration
|
||||||
|
email_content = """From: sender@example.com
|
||||||
|
To: recipient@example.com
|
||||||
|
Subject: Urgent Account Verification Required
|
||||||
|
Date: Mon, 9 Oct 2025 10:30:00 +0530
|
||||||
|
Message-ID: <12345.67890@mail.example.com>
|
||||||
|
Return-Path: different@suspicious.com
|
||||||
|
Received: from mail.example.com ([192.168.1.100]) by server.example.com with SMTP; Mon, 9 Oct 2025 10:30:00 +0530
|
||||||
|
Received: from client.suspicious.com ([203.0.113.45]) by mail.example.com with ESMTP; Mon, 9 Oct 2025 10:29:55 +0530
|
||||||
|
X-Originating-IP: [203.0.113.45]
|
||||||
|
MIME-Version: 1.0
|
||||||
|
Content-Type: text/plain; charset=utf-8
|
||||||
|
|
||||||
|
This is a sample email body for forensic analysis.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Create analyzer instance and generate report
|
||||||
|
analyzer = EmailHeaderAnalyzer(email_content)
|
||||||
|
analyzer.print_report()
|
||||||
|
|
||||||
|
# Ask if user wants to export to JSON
|
||||||
|
export = input("\nExport analysis to JSON? (y/n): ").strip().lower()
|
||||||
|
if export == 'y':
|
||||||
|
filename = input("Enter filename (default: email_analysis.json): ").strip()
|
||||||
|
if not filename:
|
||||||
|
filename = 'email_analysis.json'
|
||||||
|
analyzer.export_json(filename)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
# END OF CODE
|
||||||
@@ -0,0 +1,236 @@
|
|||||||
|
# CSDF - Practical-1 (Email Header Analysis)
|
||||||
|
|
||||||
|
"""
|
||||||
|
THIS CODE HAS BEEN TESTED AND IS FULLY OPERATIONAL.
|
||||||
|
|
||||||
|
Problem Statement: CAPTCHA image - Implement a program to generate and verify CAPTCHA image.
|
||||||
|
|
||||||
|
Code from CyberSecurityAndDigitalForensics (SPPU - Final Year - Computer Engineering - Content) repository on KSKA Git: https://git.kska.io/sppu-be-comp-content/CyberSecurityAndDigitalForensics
|
||||||
|
"""
|
||||||
|
|
||||||
|
# BEGINNING OF CODE
|
||||||
|
import random
|
||||||
|
import string
|
||||||
|
from captcha.image import ImageCaptcha
|
||||||
|
from PIL import Image
|
||||||
|
import hashlib
|
||||||
|
import time
|
||||||
|
import os
|
||||||
|
|
||||||
|
class CaptchaManager:
|
||||||
|
"""
|
||||||
|
A comprehensive CAPTCHA generation and verification system
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, length=6, width=280, height=90):
|
||||||
|
"""
|
||||||
|
Initialize the CAPTCHA manager
|
||||||
|
|
||||||
|
Args:
|
||||||
|
length: Number of characters in CAPTCHA
|
||||||
|
width: Image width in pixels
|
||||||
|
height: Image height in pixels
|
||||||
|
"""
|
||||||
|
self.length = length
|
||||||
|
self.image_captcha = ImageCaptcha(width=width, height=height)
|
||||||
|
self.captcha_store = {} # Store CAPTCHA with expiry time
|
||||||
|
self.expiry_time = 180 # 3 minutes in seconds
|
||||||
|
|
||||||
|
def generate_captcha_text(self):
|
||||||
|
"""
|
||||||
|
Generate random CAPTCHA text
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
String of random alphanumeric characters
|
||||||
|
"""
|
||||||
|
characters = string.ascii_uppercase + string.digits
|
||||||
|
# Exclude ambiguous characters like 0, O, I, 1
|
||||||
|
characters = characters.replace('0', '').replace('O', '').replace('I', '').replace('1', '')
|
||||||
|
captcha_text = ''.join(random.choice(characters) for _ in range(self.length))
|
||||||
|
return captcha_text
|
||||||
|
|
||||||
|
def generate_captcha_image(self, captcha_text=None, filename='captcha.png'):
|
||||||
|
"""
|
||||||
|
Generate CAPTCHA image
|
||||||
|
|
||||||
|
Args:
|
||||||
|
captcha_text: Text to display (auto-generated if None)
|
||||||
|
filename: Output filename
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (captcha_text, session_id, filename)
|
||||||
|
"""
|
||||||
|
if captcha_text is None:
|
||||||
|
captcha_text = self.generate_captcha_text()
|
||||||
|
|
||||||
|
# Generate image
|
||||||
|
data = self.image_captcha.generate(captcha_text)
|
||||||
|
self.image_captcha.write(captcha_text, filename)
|
||||||
|
|
||||||
|
# Create session ID
|
||||||
|
session_id = hashlib.md5(
|
||||||
|
f"{captcha_text}{time.time()}".encode()
|
||||||
|
).hexdigest()
|
||||||
|
|
||||||
|
# Store CAPTCHA with expiry time
|
||||||
|
self.captcha_store[session_id] = {
|
||||||
|
'text': captcha_text,
|
||||||
|
'timestamp': time.time()
|
||||||
|
}
|
||||||
|
|
||||||
|
return captcha_text, session_id, filename
|
||||||
|
|
||||||
|
def verify_captcha(self, session_id, user_input):
|
||||||
|
"""
|
||||||
|
Verify user's CAPTCHA input
|
||||||
|
|
||||||
|
Args:
|
||||||
|
session_id: Unique session identifier
|
||||||
|
user_input: User's input text
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary with verification result
|
||||||
|
"""
|
||||||
|
# Check if session exists
|
||||||
|
if session_id not in self.captcha_store:
|
||||||
|
return {
|
||||||
|
'success': False,
|
||||||
|
'message': 'Invalid session or CAPTCHA expired'
|
||||||
|
}
|
||||||
|
|
||||||
|
captcha_data = self.captcha_store[session_id]
|
||||||
|
current_time = time.time()
|
||||||
|
|
||||||
|
# Check expiry
|
||||||
|
if current_time - captcha_data['timestamp'] > self.expiry_time:
|
||||||
|
del self.captcha_store[session_id]
|
||||||
|
return {
|
||||||
|
'success': False,
|
||||||
|
'message': 'CAPTCHA expired. Please try again.'
|
||||||
|
}
|
||||||
|
|
||||||
|
# Verify CAPTCHA (case-insensitive)
|
||||||
|
if user_input.upper() == captcha_data['text'].upper():
|
||||||
|
del self.captcha_store[session_id] # Remove after successful verification
|
||||||
|
return {
|
||||||
|
'success': True,
|
||||||
|
'message': 'CAPTCHA verified successfully!'
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
return {
|
||||||
|
'success': False,
|
||||||
|
'message': 'Incorrect CAPTCHA. Please try again.'
|
||||||
|
}
|
||||||
|
|
||||||
|
def cleanup_expired(self):
|
||||||
|
"""
|
||||||
|
Remove expired CAPTCHAs from storage
|
||||||
|
"""
|
||||||
|
current_time = time.time()
|
||||||
|
expired_sessions = [
|
||||||
|
sid for sid, data in self.captcha_store.items()
|
||||||
|
if current_time - data['timestamp'] > self.expiry_time
|
||||||
|
]
|
||||||
|
for sid in expired_sessions:
|
||||||
|
del self.captcha_store[sid]
|
||||||
|
|
||||||
|
|
||||||
|
# Alternative simple implementation using Pillow
|
||||||
|
class SimpleCaptcha:
|
||||||
|
"""
|
||||||
|
Simple CAPTCHA implementation using Pillow
|
||||||
|
"""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def generate(length=6):
|
||||||
|
"""
|
||||||
|
Generate simple CAPTCHA with noise
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (captcha_text, Image object)
|
||||||
|
"""
|
||||||
|
from PIL import ImageDraw, ImageFont
|
||||||
|
|
||||||
|
# Generate random text
|
||||||
|
chars = ''.join(random.choice(string.ascii_uppercase + string.digits)
|
||||||
|
for _ in range(length))
|
||||||
|
|
||||||
|
# Create image
|
||||||
|
img = Image.new('RGB', (200, 100), color=(73, 109, 137))
|
||||||
|
draw = ImageDraw.Draw(img)
|
||||||
|
|
||||||
|
# Try to load a font, fallback to default if not available
|
||||||
|
try:
|
||||||
|
font = ImageFont.truetype('arial.ttf', 36)
|
||||||
|
except:
|
||||||
|
font = ImageFont.load_default()
|
||||||
|
|
||||||
|
# Draw characters with random positions
|
||||||
|
x = 20
|
||||||
|
for char in chars:
|
||||||
|
y = random.randint(25, 35)
|
||||||
|
draw.text((x, y), char, font=font, fill=(255, 255, 0))
|
||||||
|
x += 30
|
||||||
|
|
||||||
|
# Add noise (dots)
|
||||||
|
for _ in range(150):
|
||||||
|
x = random.randint(0, 200)
|
||||||
|
y = random.randint(0, 100)
|
||||||
|
draw.point((x, y), fill=(random.randint(0, 255),
|
||||||
|
random.randint(0, 255),
|
||||||
|
random.randint(0, 255)))
|
||||||
|
|
||||||
|
# Add lines for distortion
|
||||||
|
for _ in range(5):
|
||||||
|
start = (random.randint(0, 200), random.randint(0, 100))
|
||||||
|
end = (random.randint(0, 200), random.randint(0, 100))
|
||||||
|
draw.line([start, end], fill=(255, 255, 255), width=1)
|
||||||
|
|
||||||
|
return chars, img
|
||||||
|
|
||||||
|
|
||||||
|
# Example usage and testing
|
||||||
|
def main():
|
||||||
|
"""
|
||||||
|
Demonstration of CAPTCHA generation and verification
|
||||||
|
"""
|
||||||
|
print("=== CAPTCHA Generation and Verification Demo ===\n")
|
||||||
|
|
||||||
|
# Initialize CAPTCHA manager
|
||||||
|
manager = CaptchaManager(length=6)
|
||||||
|
|
||||||
|
# Generate CAPTCHA
|
||||||
|
captcha_text, session_id, filename = manager.generate_captcha_image()
|
||||||
|
print(f"CAPTCHA Generated: {captcha_text}")
|
||||||
|
print(f"Session ID: {session_id}")
|
||||||
|
print(f"Image saved as: {filename}\n")
|
||||||
|
|
||||||
|
# Simulate user input
|
||||||
|
print("Verification Tests:")
|
||||||
|
|
||||||
|
# Test 1: Correct input
|
||||||
|
result = manager.verify_captcha(session_id, captcha_text)
|
||||||
|
print(f"Test 1 (Correct): {result}")
|
||||||
|
|
||||||
|
# Generate new CAPTCHA for next test
|
||||||
|
captcha_text, session_id, _ = manager.generate_captcha_image('test_captcha_2.png')
|
||||||
|
|
||||||
|
# Test 2: Incorrect input
|
||||||
|
result = manager.verify_captcha(session_id, "WRONG")
|
||||||
|
print(f"Test 2 (Incorrect): {result}")
|
||||||
|
|
||||||
|
# Test 3: Invalid session
|
||||||
|
result = manager.verify_captcha("invalid_session", "TEST")
|
||||||
|
print(f"Test 3 (Invalid Session): {result}")
|
||||||
|
|
||||||
|
# Demonstrate simple CAPTCHA
|
||||||
|
print("\n=== Simple CAPTCHA Demo ===")
|
||||||
|
captcha_text, img = SimpleCaptcha.generate()
|
||||||
|
img.save('simple_captcha.png')
|
||||||
|
print(f"Simple CAPTCHA Text: {captcha_text}")
|
||||||
|
print("Image saved as: simple_captcha.png")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
# END OF CODE
|
||||||
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
BIN
Binary file not shown.
@@ -10,9 +10,13 @@ This repository contains essential resources for the Cyber Security and Digital
|
|||||||
|
|
||||||
1. [Unit 1 - Introduction to Cyber Security](Notes/Unit%201%20-%20Introduction%20to%20Cyber%20Security)
|
1. [Unit 1 - Introduction to Cyber Security](Notes/Unit%201%20-%20Introduction%20to%20Cyber%20Security)
|
||||||
2. [Unit 2 - Cyber Crime Issues and Cyber Attacks](Notes/Unit%202%20-%20Cyber%20Crime%20Issues%20and%20Cyber%20Attacks)
|
2. [Unit 2 - Cyber Crime Issues and Cyber Attacks](Notes/Unit%202%20-%20Cyber%20Crime%20Issues%20and%20Cyber%20Attacks)
|
||||||
|
3. [Unit 3 - 6](Notes/CSDF%20-%20Unit%203-6%20-%20Notes.pdf) *(provided by faculty)*
|
||||||
|
|
||||||
### Codes
|
### Codes
|
||||||
|
|
||||||
|
1. [Code-1.1 - Email Header Analysis](Codes/Code-1.1.py)
|
||||||
|
2. [Code-1.2 - CAPTCHA Image](Codes/Code-1.2.py)
|
||||||
|
|
||||||
### Assignments
|
### Assignments
|
||||||
|
|
||||||
1. Assignment - Unit 5:
|
1. Assignment - Unit 5:
|
||||||
@@ -36,6 +40,7 @@ This repository contains essential resources for the Cyber Security and Digital
|
|||||||
- [END-SEM](Question%20Papers/END-SEM)
|
- [END-SEM](Question%20Papers/END-SEM)
|
||||||
|
|
||||||
### [IN-SEM PYQ Answers](Notes/IN-SEM%20PYQ%20Answers)
|
### [IN-SEM PYQ Answers](Notes/IN-SEM%20PYQ%20Answers)
|
||||||
|
### [END-SEM PYQ Answers](Notes/END-SEM%20PYQ%20Answers)
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user