Compare commits
3 Commits
ac26f66516
...
2fab8ccc15
| Author | SHA1 | Date | |
|---|---|---|---|
|
2fab8ccc15
|
|||
|
912433edf6
|
|||
|
1db296206a
|
@@ -0,0 +1,426 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
# CSDF - Practical-1 (Email Header Analysis)
|
||||||
|
|
||||||
|
"""
|
||||||
|
THIS CODE HAS BEEN TESTED AND IS FULLY OPERATIONAL.
|
||||||
|
|
||||||
|
Problem Statement: Email Header Analysis - Write a program for Tracking Emails and Investigating Email Crimes. i.e. Write a program to analyze e–mail header.
|
||||||
|
|
||||||
|
Code from CyberSecurityAndDigitalForensics (SPPU - Final Year - Computer Engineering - Content) repository on KSKA Git: https://git.kska.io/sppu-be-comp-content/CyberSecurityAndDigitalForensics
|
||||||
|
"""
|
||||||
|
|
||||||
|
# BEGINNING OF CODE
|
||||||
|
import re
|
||||||
|
import json
|
||||||
|
from email import message_from_string, message_from_file
|
||||||
|
from email.parser import Parser
|
||||||
|
from datetime import datetime
|
||||||
|
import socket
|
||||||
|
import ipaddress
|
||||||
|
|
||||||
|
class EmailHeaderAnalyzer:
|
||||||
|
"""
|
||||||
|
A comprehensive email header analyzer for forensic investigation
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, email_content):
|
||||||
|
"""
|
||||||
|
Initialize the analyzer with email content
|
||||||
|
|
||||||
|
Args:
|
||||||
|
email_content: Raw email string or file object
|
||||||
|
"""
|
||||||
|
if isinstance(email_content, str):
|
||||||
|
self.email = message_from_string(email_content)
|
||||||
|
else:
|
||||||
|
self.email = message_from_file(email_content)
|
||||||
|
|
||||||
|
self.analysis_results = {}
|
||||||
|
|
||||||
|
def extract_basic_headers(self):
|
||||||
|
"""Extract basic email header information"""
|
||||||
|
headers = {
|
||||||
|
'From': self.email.get('From', 'Not Found'),
|
||||||
|
'To': self.email.get('To', 'Not Found'),
|
||||||
|
'Subject': self.email.get('Subject', 'Not Found'),
|
||||||
|
'Date': self.email.get('Date', 'Not Found'),
|
||||||
|
'Message-ID': self.email.get('Message-ID', 'Not Found'),
|
||||||
|
'Return-Path': self.email.get('Return-Path', 'Not Found'),
|
||||||
|
'Reply-To': self.email.get('Reply-To', 'Not Found'),
|
||||||
|
'MIME-Version': self.email.get('MIME-Version', 'Not Found'),
|
||||||
|
'Content-Type': self.email.get('Content-Type', 'Not Found')
|
||||||
|
}
|
||||||
|
|
||||||
|
self.analysis_results['basic_headers'] = headers
|
||||||
|
return headers
|
||||||
|
|
||||||
|
def extract_received_headers(self):
|
||||||
|
"""
|
||||||
|
Extract and parse all 'Received' headers to trace email path
|
||||||
|
Critical for tracking email transmission route
|
||||||
|
"""
|
||||||
|
received_headers = self.email.get_all('Received', [])
|
||||||
|
parsed_received = []
|
||||||
|
|
||||||
|
for idx, received in enumerate(received_headers):
|
||||||
|
hop_info = {
|
||||||
|
'hop_number': idx + 1,
|
||||||
|
'raw_header': received,
|
||||||
|
'timestamp': self._extract_timestamp(received),
|
||||||
|
'from_server': self._extract_from_server(received),
|
||||||
|
'by_server': self._extract_by_server(received),
|
||||||
|
'ip_address': self._extract_ip_from_received(received)
|
||||||
|
}
|
||||||
|
parsed_received.append(hop_info)
|
||||||
|
|
||||||
|
self.analysis_results['received_headers'] = parsed_received
|
||||||
|
return parsed_received
|
||||||
|
|
||||||
|
def extract_originating_ip(self):
|
||||||
|
"""
|
||||||
|
Extract the originating IP address (X-Originating-IP)
|
||||||
|
This is crucial for tracing the actual sender location
|
||||||
|
"""
|
||||||
|
originating_ip = self.email.get('X-Originating-IP', None)
|
||||||
|
|
||||||
|
if originating_ip:
|
||||||
|
# Clean up IP address (remove brackets if present)
|
||||||
|
originating_ip = re.search(r'(\d+\.\d+\.\d+\.\d+)', originating_ip)
|
||||||
|
if originating_ip:
|
||||||
|
originating_ip = originating_ip.group(1)
|
||||||
|
|
||||||
|
# If X-Originating-IP not found, try to get from first Received header
|
||||||
|
if not originating_ip:
|
||||||
|
received_headers = self.email.get_all('Received', [])
|
||||||
|
if received_headers:
|
||||||
|
originating_ip = self._extract_ip_from_received(received_headers[-1])
|
||||||
|
|
||||||
|
self.analysis_results['originating_ip'] = originating_ip
|
||||||
|
return originating_ip
|
||||||
|
|
||||||
|
def analyze_authentication(self):
|
||||||
|
"""
|
||||||
|
Analyze email authentication headers (SPF, DKIM, DMARC)
|
||||||
|
Helps detect spoofing and verify email authenticity
|
||||||
|
"""
|
||||||
|
auth_results = {
|
||||||
|
'SPF': self.email.get('Received-SPF', 'Not Found'),
|
||||||
|
'DKIM-Signature': self.email.get('DKIM-Signature', 'Not Found'),
|
||||||
|
'Authentication-Results': self.email.get('Authentication-Results', 'Not Found'),
|
||||||
|
'ARC-Authentication-Results': self.email.get('ARC-Authentication-Results', 'Not Found')
|
||||||
|
}
|
||||||
|
|
||||||
|
# Determine if email passed authentication
|
||||||
|
spf_pass = 'pass' in str(auth_results['SPF']).lower()
|
||||||
|
dkim_pass = 'DKIM-Signature' in str(auth_results['DKIM-Signature'])
|
||||||
|
|
||||||
|
auth_results['spf_passed'] = spf_pass
|
||||||
|
auth_results['dkim_present'] = dkim_pass
|
||||||
|
auth_results['likely_spoofed'] = not (spf_pass or dkim_pass)
|
||||||
|
|
||||||
|
self.analysis_results['authentication'] = auth_results
|
||||||
|
return auth_results
|
||||||
|
|
||||||
|
def extract_message_id(self):
|
||||||
|
"""
|
||||||
|
Extract and analyze Message-ID
|
||||||
|
Useful for tracking email threads and identifying patterns
|
||||||
|
"""
|
||||||
|
message_id = self.email.get('Message-ID', 'Not Found')
|
||||||
|
|
||||||
|
# Extract domain from Message-ID
|
||||||
|
domain = None
|
||||||
|
if message_id != 'Not Found':
|
||||||
|
domain_match = re.search(r'@([a-zA-Z0-9.-]+)', message_id)
|
||||||
|
if domain_match:
|
||||||
|
domain = domain_match.group(1)
|
||||||
|
|
||||||
|
message_id_info = {
|
||||||
|
'message_id': message_id,
|
||||||
|
'domain': domain
|
||||||
|
}
|
||||||
|
|
||||||
|
self.analysis_results['message_id_info'] = message_id_info
|
||||||
|
return message_id_info
|
||||||
|
|
||||||
|
def analyze_sender_info(self):
|
||||||
|
"""
|
||||||
|
Detailed analysis of sender information
|
||||||
|
Extracts email addresses and identifies potential spoofing
|
||||||
|
"""
|
||||||
|
from_header = self.email.get('From', '')
|
||||||
|
return_path = self.email.get('Return-Path', '')
|
||||||
|
|
||||||
|
# Extract email addresses
|
||||||
|
from_email = self._extract_email_address(from_header)
|
||||||
|
return_email = self._extract_email_address(return_path)
|
||||||
|
|
||||||
|
# Check for mismatch (potential spoofing indicator)
|
||||||
|
mismatch = from_email != return_email if from_email and return_email else False
|
||||||
|
|
||||||
|
sender_info = {
|
||||||
|
'from_header': from_header,
|
||||||
|
'from_email': from_email,
|
||||||
|
'return_path': return_path,
|
||||||
|
'return_email': return_email,
|
||||||
|
'address_mismatch': mismatch,
|
||||||
|
'potential_spoofing': mismatch
|
||||||
|
}
|
||||||
|
|
||||||
|
self.analysis_results['sender_analysis'] = sender_info
|
||||||
|
return sender_info
|
||||||
|
|
||||||
|
def get_ip_geolocation_info(self, ip_address):
|
||||||
|
"""
|
||||||
|
Get basic information about an IP address
|
||||||
|
Note: For production, integrate with geolocation APIs
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
ip_obj = ipaddress.ip_address(ip_address)
|
||||||
|
ip_info = {
|
||||||
|
'ip': ip_address,
|
||||||
|
'is_private': ip_obj.is_private,
|
||||||
|
'is_global': ip_obj.is_global,
|
||||||
|
'is_loopback': ip_obj.is_loopback,
|
||||||
|
'version': ip_obj.version
|
||||||
|
}
|
||||||
|
|
||||||
|
# Try reverse DNS lookup
|
||||||
|
try:
|
||||||
|
hostname = socket.gethostbyaddr(ip_address)[0]
|
||||||
|
ip_info['hostname'] = hostname
|
||||||
|
except:
|
||||||
|
ip_info['hostname'] = 'Reverse DNS lookup failed'
|
||||||
|
|
||||||
|
return ip_info
|
||||||
|
except ValueError:
|
||||||
|
return {'error': 'Invalid IP address'}
|
||||||
|
|
||||||
|
def detect_suspicious_patterns(self):
|
||||||
|
"""
|
||||||
|
Detect common patterns associated with email crimes
|
||||||
|
"""
|
||||||
|
suspicious_indicators = []
|
||||||
|
|
||||||
|
# Check for authentication failures
|
||||||
|
if self.analysis_results.get('authentication', {}).get('likely_spoofed'):
|
||||||
|
suspicious_indicators.append('Email failed authentication checks (SPF/DKIM)')
|
||||||
|
|
||||||
|
# Check for sender/return-path mismatch
|
||||||
|
if self.analysis_results.get('sender_analysis', {}).get('potential_spoofing'):
|
||||||
|
suspicious_indicators.append('Mismatch between From and Return-Path addresses')
|
||||||
|
|
||||||
|
# Check for missing Message-ID
|
||||||
|
if self.analysis_results.get('message_id_info', {}).get('message_id') == 'Not Found':
|
||||||
|
suspicious_indicators.append('Missing Message-ID (unusual for legitimate emails)')
|
||||||
|
|
||||||
|
# Check for suspicious keywords in subject
|
||||||
|
subject = self.email.get('Subject', '').lower()
|
||||||
|
suspicious_keywords = ['urgent', 'verify account', 'suspended', 'confirm', 'prize', 'winner']
|
||||||
|
found_keywords = [kw for kw in suspicious_keywords if kw in subject]
|
||||||
|
if found_keywords:
|
||||||
|
suspicious_indicators.append(f'Suspicious keywords in subject: {", ".join(found_keywords)}')
|
||||||
|
|
||||||
|
self.analysis_results['suspicious_indicators'] = suspicious_indicators
|
||||||
|
return suspicious_indicators
|
||||||
|
|
||||||
|
def generate_forensic_report(self):
|
||||||
|
"""
|
||||||
|
Generate a comprehensive forensic analysis report
|
||||||
|
"""
|
||||||
|
# Run all analysis methods
|
||||||
|
self.extract_basic_headers()
|
||||||
|
self.extract_received_headers()
|
||||||
|
self.extract_originating_ip()
|
||||||
|
self.analyze_authentication()
|
||||||
|
self.extract_message_id()
|
||||||
|
self.analyze_sender_info()
|
||||||
|
self.detect_suspicious_patterns()
|
||||||
|
|
||||||
|
# Analyze originating IP if available
|
||||||
|
orig_ip = self.analysis_results.get('originating_ip')
|
||||||
|
if orig_ip:
|
||||||
|
self.analysis_results['ip_analysis'] = self.get_ip_geolocation_info(orig_ip)
|
||||||
|
|
||||||
|
return self.analysis_results
|
||||||
|
|
||||||
|
def print_report(self):
|
||||||
|
"""Print a formatted forensic report"""
|
||||||
|
report = self.generate_forensic_report()
|
||||||
|
|
||||||
|
print("="*80)
|
||||||
|
print(" EMAIL FORENSIC ANALYSIS REPORT ".center(80, "="))
|
||||||
|
print("="*80)
|
||||||
|
print(f"\nGenerated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
||||||
|
|
||||||
|
# Basic Headers
|
||||||
|
print("\n" + "="*80)
|
||||||
|
print("1. BASIC HEADER INFORMATION")
|
||||||
|
print("="*80)
|
||||||
|
for key, value in report['basic_headers'].items():
|
||||||
|
print(f"{key:20s}: {value}")
|
||||||
|
|
||||||
|
# Sender Analysis
|
||||||
|
print("\n" + "="*80)
|
||||||
|
print("2. SENDER ANALYSIS")
|
||||||
|
print("="*80)
|
||||||
|
sender = report.get('sender_analysis', {})
|
||||||
|
print(f"From Email : {sender.get('from_email', 'N/A')}")
|
||||||
|
print(f"Return Email : {sender.get('return_email', 'N/A')}")
|
||||||
|
print(f"Address Mismatch : {sender.get('address_mismatch', False)}")
|
||||||
|
print(f"Potential Spoofing : {sender.get('potential_spoofing', False)}")
|
||||||
|
|
||||||
|
# Authentication
|
||||||
|
print("\n" + "="*80)
|
||||||
|
print("3. AUTHENTICATION ANALYSIS")
|
||||||
|
print("="*80)
|
||||||
|
auth = report.get('authentication', {})
|
||||||
|
print(f"SPF Passed : {auth.get('spf_passed', False)}")
|
||||||
|
print(f"DKIM Present : {auth.get('dkim_present', False)}")
|
||||||
|
print(f"Likely Spoofed : {auth.get('likely_spoofed', True)}")
|
||||||
|
|
||||||
|
# Message ID
|
||||||
|
print("\n" + "="*80)
|
||||||
|
print("4. MESSAGE ID ANALYSIS")
|
||||||
|
print("="*80)
|
||||||
|
msg_id = report.get('message_id_info', {})
|
||||||
|
print(f"Message ID : {msg_id.get('message_id', 'N/A')}")
|
||||||
|
print(f"Domain : {msg_id.get('domain', 'N/A')}")
|
||||||
|
|
||||||
|
# Originating IP
|
||||||
|
print("\n" + "="*80)
|
||||||
|
print("5. ORIGINATING IP INFORMATION")
|
||||||
|
print("="*80)
|
||||||
|
print(f"Originating IP : {report.get('originating_ip', 'Not Found')}")
|
||||||
|
|
||||||
|
if 'ip_analysis' in report:
|
||||||
|
ip_info = report['ip_analysis']
|
||||||
|
print(f"IP Version : IPv{ip_info.get('version', 'N/A')}")
|
||||||
|
print(f"Is Private : {ip_info.get('is_private', 'N/A')}")
|
||||||
|
print(f"Is Global : {ip_info.get('is_global', 'N/A')}")
|
||||||
|
print(f"Hostname : {ip_info.get('hostname', 'N/A')}")
|
||||||
|
|
||||||
|
# Transmission Path
|
||||||
|
print("\n" + "="*80)
|
||||||
|
print("6. EMAIL TRANSMISSION PATH")
|
||||||
|
print("="*80)
|
||||||
|
received = report.get('received_headers', [])
|
||||||
|
if received:
|
||||||
|
for hop in received:
|
||||||
|
print(f"\nHop {hop['hop_number']}:")
|
||||||
|
print(f" From Server : {hop.get('from_server', 'N/A')}")
|
||||||
|
print(f" By Server : {hop.get('by_server', 'N/A')}")
|
||||||
|
print(f" IP Address : {hop.get('ip_address', 'N/A')}")
|
||||||
|
print(f" Timestamp : {hop.get('timestamp', 'N/A')}")
|
||||||
|
else:
|
||||||
|
print("No Received headers found")
|
||||||
|
|
||||||
|
# Suspicious Indicators
|
||||||
|
print("\n" + "="*80)
|
||||||
|
print("7. SUSPICIOUS INDICATORS")
|
||||||
|
print("="*80)
|
||||||
|
indicators = report.get('suspicious_indicators', [])
|
||||||
|
if indicators:
|
||||||
|
for idx, indicator in enumerate(indicators, 1):
|
||||||
|
print(f"{idx}. {indicator}")
|
||||||
|
else:
|
||||||
|
print("No suspicious indicators detected")
|
||||||
|
|
||||||
|
print("\n" + "="*80)
|
||||||
|
print(" END OF REPORT ".center(80, "="))
|
||||||
|
print("="*80)
|
||||||
|
|
||||||
|
def export_json(self, filename='email_analysis.json'):
|
||||||
|
"""Export analysis results to JSON file"""
|
||||||
|
report = self.generate_forensic_report()
|
||||||
|
with open(filename, 'w') as f:
|
||||||
|
json.dump(report, f, indent=4)
|
||||||
|
print(f"\nAnalysis exported to {filename}")
|
||||||
|
|
||||||
|
# Helper methods
|
||||||
|
def _extract_timestamp(self, received_header):
|
||||||
|
"""Extract timestamp from Received header"""
|
||||||
|
timestamp_match = re.search(r';\s*(.+)$', received_header)
|
||||||
|
return timestamp_match.group(1).strip() if timestamp_match else 'Not Found'
|
||||||
|
|
||||||
|
def _extract_from_server(self, received_header):
|
||||||
|
"""Extract 'from' server information"""
|
||||||
|
from_match = re.search(r'from\s+([^\s]+)', received_header, re.IGNORECASE)
|
||||||
|
return from_match.group(1) if from_match else 'Not Found'
|
||||||
|
|
||||||
|
def _extract_by_server(self, received_header):
|
||||||
|
"""Extract 'by' server information"""
|
||||||
|
by_match = re.search(r'by\s+([^\s]+)', received_header, re.IGNORECASE)
|
||||||
|
return by_match.group(1) if by_match else 'Not Found'
|
||||||
|
|
||||||
|
def _extract_ip_from_received(self, received_header):
|
||||||
|
"""Extract IP address from Received header"""
|
||||||
|
ip_match = re.search(r'\[(\d+\.\d+\.\d+\.\d+)\]', received_header)
|
||||||
|
return ip_match.group(1) if ip_match else 'Not Found'
|
||||||
|
|
||||||
|
def _extract_email_address(self, header_value):
|
||||||
|
"""Extract email address from header value"""
|
||||||
|
email_match = re.search(r'[\w\.-]+@[\w\.-]+\.\w+', header_value)
|
||||||
|
return email_match.group(0) if email_match else None
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""
|
||||||
|
Main function to demonstrate email header analysis
|
||||||
|
"""
|
||||||
|
print("Email Header Analysis Tool for Digital Forensics\n")
|
||||||
|
print("Choose input method:")
|
||||||
|
print("1. Paste raw email content")
|
||||||
|
print("2. Load from file")
|
||||||
|
print("3. Use sample email")
|
||||||
|
|
||||||
|
choice = input("\nEnter choice (1-3): ").strip()
|
||||||
|
|
||||||
|
if choice == '1':
|
||||||
|
print("\nPaste the raw email (including headers). Press Ctrl+D (Linux/MacOS) or Ctrl+Z (Windows) when done: ")
|
||||||
|
import sys
|
||||||
|
email_content = sys.stdin.read()
|
||||||
|
|
||||||
|
elif choice == '2':
|
||||||
|
filename = input("Enter email file path: ").strip()
|
||||||
|
try:
|
||||||
|
with open(filename, 'r') as f:
|
||||||
|
email_content = f.read()
|
||||||
|
except FileNotFoundError:
|
||||||
|
print(f"Error: File '{filename}' not found")
|
||||||
|
return
|
||||||
|
|
||||||
|
else:
|
||||||
|
# Sample email for demonstration
|
||||||
|
email_content = """From: sender@example.com
|
||||||
|
To: recipient@example.com
|
||||||
|
Subject: Urgent Account Verification Required
|
||||||
|
Date: Mon, 9 Oct 2025 10:30:00 +0530
|
||||||
|
Message-ID: <12345.67890@mail.example.com>
|
||||||
|
Return-Path: different@suspicious.com
|
||||||
|
Received: from mail.example.com ([192.168.1.100]) by server.example.com with SMTP; Mon, 9 Oct 2025 10:30:00 +0530
|
||||||
|
Received: from client.suspicious.com ([203.0.113.45]) by mail.example.com with ESMTP; Mon, 9 Oct 2025 10:29:55 +0530
|
||||||
|
X-Originating-IP: [203.0.113.45]
|
||||||
|
MIME-Version: 1.0
|
||||||
|
Content-Type: text/plain; charset=utf-8
|
||||||
|
|
||||||
|
This is a sample email body for forensic analysis.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Create analyzer instance and generate report
|
||||||
|
analyzer = EmailHeaderAnalyzer(email_content)
|
||||||
|
analyzer.print_report()
|
||||||
|
|
||||||
|
# Ask if user wants to export to JSON
|
||||||
|
export = input("\nExport analysis to JSON? (y/n): ").strip().lower()
|
||||||
|
if export == 'y':
|
||||||
|
filename = input("Enter filename (default: email_analysis.json): ").strip()
|
||||||
|
if not filename:
|
||||||
|
filename = 'email_analysis.json'
|
||||||
|
analyzer.export_json(filename)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
# END OF CODE
|
||||||
@@ -0,0 +1,236 @@
|
|||||||
|
# CSDF - Practical-1 (Email Header Analysis)
|
||||||
|
|
||||||
|
"""
|
||||||
|
THIS CODE HAS BEEN TESTED AND IS FULLY OPERATIONAL.
|
||||||
|
|
||||||
|
Problem Statement: CAPTCHA image - Implement a program to generate and verify CAPTCHA image.
|
||||||
|
|
||||||
|
Code from CyberSecurityAndDigitalForensics (SPPU - Final Year - Computer Engineering - Content) repository on KSKA Git: https://git.kska.io/sppu-be-comp-content/CyberSecurityAndDigitalForensics
|
||||||
|
"""
|
||||||
|
|
||||||
|
# BEGINNING OF CODE
|
||||||
|
import random
|
||||||
|
import string
|
||||||
|
from captcha.image import ImageCaptcha
|
||||||
|
from PIL import Image
|
||||||
|
import hashlib
|
||||||
|
import time
|
||||||
|
import os
|
||||||
|
|
||||||
|
class CaptchaManager:
|
||||||
|
"""
|
||||||
|
A comprehensive CAPTCHA generation and verification system
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, length=6, width=280, height=90):
|
||||||
|
"""
|
||||||
|
Initialize the CAPTCHA manager
|
||||||
|
|
||||||
|
Args:
|
||||||
|
length: Number of characters in CAPTCHA
|
||||||
|
width: Image width in pixels
|
||||||
|
height: Image height in pixels
|
||||||
|
"""
|
||||||
|
self.length = length
|
||||||
|
self.image_captcha = ImageCaptcha(width=width, height=height)
|
||||||
|
self.captcha_store = {} # Store CAPTCHA with expiry time
|
||||||
|
self.expiry_time = 180 # 3 minutes in seconds
|
||||||
|
|
||||||
|
def generate_captcha_text(self):
|
||||||
|
"""
|
||||||
|
Generate random CAPTCHA text
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
String of random alphanumeric characters
|
||||||
|
"""
|
||||||
|
characters = string.ascii_uppercase + string.digits
|
||||||
|
# Exclude ambiguous characters like 0, O, I, 1
|
||||||
|
characters = characters.replace('0', '').replace('O', '').replace('I', '').replace('1', '')
|
||||||
|
captcha_text = ''.join(random.choice(characters) for _ in range(self.length))
|
||||||
|
return captcha_text
|
||||||
|
|
||||||
|
def generate_captcha_image(self, captcha_text=None, filename='captcha.png'):
|
||||||
|
"""
|
||||||
|
Generate CAPTCHA image
|
||||||
|
|
||||||
|
Args:
|
||||||
|
captcha_text: Text to display (auto-generated if None)
|
||||||
|
filename: Output filename
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (captcha_text, session_id, filename)
|
||||||
|
"""
|
||||||
|
if captcha_text is None:
|
||||||
|
captcha_text = self.generate_captcha_text()
|
||||||
|
|
||||||
|
# Generate image
|
||||||
|
data = self.image_captcha.generate(captcha_text)
|
||||||
|
self.image_captcha.write(captcha_text, filename)
|
||||||
|
|
||||||
|
# Create session ID
|
||||||
|
session_id = hashlib.md5(
|
||||||
|
f"{captcha_text}{time.time()}".encode()
|
||||||
|
).hexdigest()
|
||||||
|
|
||||||
|
# Store CAPTCHA with expiry time
|
||||||
|
self.captcha_store[session_id] = {
|
||||||
|
'text': captcha_text,
|
||||||
|
'timestamp': time.time()
|
||||||
|
}
|
||||||
|
|
||||||
|
return captcha_text, session_id, filename
|
||||||
|
|
||||||
|
def verify_captcha(self, session_id, user_input):
|
||||||
|
"""
|
||||||
|
Verify user's CAPTCHA input
|
||||||
|
|
||||||
|
Args:
|
||||||
|
session_id: Unique session identifier
|
||||||
|
user_input: User's input text
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary with verification result
|
||||||
|
"""
|
||||||
|
# Check if session exists
|
||||||
|
if session_id not in self.captcha_store:
|
||||||
|
return {
|
||||||
|
'success': False,
|
||||||
|
'message': 'Invalid session or CAPTCHA expired'
|
||||||
|
}
|
||||||
|
|
||||||
|
captcha_data = self.captcha_store[session_id]
|
||||||
|
current_time = time.time()
|
||||||
|
|
||||||
|
# Check expiry
|
||||||
|
if current_time - captcha_data['timestamp'] > self.expiry_time:
|
||||||
|
del self.captcha_store[session_id]
|
||||||
|
return {
|
||||||
|
'success': False,
|
||||||
|
'message': 'CAPTCHA expired. Please try again.'
|
||||||
|
}
|
||||||
|
|
||||||
|
# Verify CAPTCHA (case-insensitive)
|
||||||
|
if user_input.upper() == captcha_data['text'].upper():
|
||||||
|
del self.captcha_store[session_id] # Remove after successful verification
|
||||||
|
return {
|
||||||
|
'success': True,
|
||||||
|
'message': 'CAPTCHA verified successfully!'
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
return {
|
||||||
|
'success': False,
|
||||||
|
'message': 'Incorrect CAPTCHA. Please try again.'
|
||||||
|
}
|
||||||
|
|
||||||
|
def cleanup_expired(self):
|
||||||
|
"""
|
||||||
|
Remove expired CAPTCHAs from storage
|
||||||
|
"""
|
||||||
|
current_time = time.time()
|
||||||
|
expired_sessions = [
|
||||||
|
sid for sid, data in self.captcha_store.items()
|
||||||
|
if current_time - data['timestamp'] > self.expiry_time
|
||||||
|
]
|
||||||
|
for sid in expired_sessions:
|
||||||
|
del self.captcha_store[sid]
|
||||||
|
|
||||||
|
|
||||||
|
# Alternative simple implementation using Pillow
|
||||||
|
class SimpleCaptcha:
|
||||||
|
"""
|
||||||
|
Simple CAPTCHA implementation using Pillow
|
||||||
|
"""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def generate(length=6):
|
||||||
|
"""
|
||||||
|
Generate simple CAPTCHA with noise
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (captcha_text, Image object)
|
||||||
|
"""
|
||||||
|
from PIL import ImageDraw, ImageFont
|
||||||
|
|
||||||
|
# Generate random text
|
||||||
|
chars = ''.join(random.choice(string.ascii_uppercase + string.digits)
|
||||||
|
for _ in range(length))
|
||||||
|
|
||||||
|
# Create image
|
||||||
|
img = Image.new('RGB', (200, 100), color=(73, 109, 137))
|
||||||
|
draw = ImageDraw.Draw(img)
|
||||||
|
|
||||||
|
# Try to load a font, fallback to default if not available
|
||||||
|
try:
|
||||||
|
font = ImageFont.truetype('arial.ttf', 36)
|
||||||
|
except:
|
||||||
|
font = ImageFont.load_default()
|
||||||
|
|
||||||
|
# Draw characters with random positions
|
||||||
|
x = 20
|
||||||
|
for char in chars:
|
||||||
|
y = random.randint(25, 35)
|
||||||
|
draw.text((x, y), char, font=font, fill=(255, 255, 0))
|
||||||
|
x += 30
|
||||||
|
|
||||||
|
# Add noise (dots)
|
||||||
|
for _ in range(150):
|
||||||
|
x = random.randint(0, 200)
|
||||||
|
y = random.randint(0, 100)
|
||||||
|
draw.point((x, y), fill=(random.randint(0, 255),
|
||||||
|
random.randint(0, 255),
|
||||||
|
random.randint(0, 255)))
|
||||||
|
|
||||||
|
# Add lines for distortion
|
||||||
|
for _ in range(5):
|
||||||
|
start = (random.randint(0, 200), random.randint(0, 100))
|
||||||
|
end = (random.randint(0, 200), random.randint(0, 100))
|
||||||
|
draw.line([start, end], fill=(255, 255, 255), width=1)
|
||||||
|
|
||||||
|
return chars, img
|
||||||
|
|
||||||
|
|
||||||
|
# Example usage and testing
|
||||||
|
def main():
|
||||||
|
"""
|
||||||
|
Demonstration of CAPTCHA generation and verification
|
||||||
|
"""
|
||||||
|
print("=== CAPTCHA Generation and Verification Demo ===\n")
|
||||||
|
|
||||||
|
# Initialize CAPTCHA manager
|
||||||
|
manager = CaptchaManager(length=6)
|
||||||
|
|
||||||
|
# Generate CAPTCHA
|
||||||
|
captcha_text, session_id, filename = manager.generate_captcha_image()
|
||||||
|
print(f"CAPTCHA Generated: {captcha_text}")
|
||||||
|
print(f"Session ID: {session_id}")
|
||||||
|
print(f"Image saved as: {filename}\n")
|
||||||
|
|
||||||
|
# Simulate user input
|
||||||
|
print("Verification Tests:")
|
||||||
|
|
||||||
|
# Test 1: Correct input
|
||||||
|
result = manager.verify_captcha(session_id, captcha_text)
|
||||||
|
print(f"Test 1 (Correct): {result}")
|
||||||
|
|
||||||
|
# Generate new CAPTCHA for next test
|
||||||
|
captcha_text, session_id, _ = manager.generate_captcha_image('test_captcha_2.png')
|
||||||
|
|
||||||
|
# Test 2: Incorrect input
|
||||||
|
result = manager.verify_captcha(session_id, "WRONG")
|
||||||
|
print(f"Test 2 (Incorrect): {result}")
|
||||||
|
|
||||||
|
# Test 3: Invalid session
|
||||||
|
result = manager.verify_captcha("invalid_session", "TEST")
|
||||||
|
print(f"Test 3 (Invalid Session): {result}")
|
||||||
|
|
||||||
|
# Demonstrate simple CAPTCHA
|
||||||
|
print("\n=== Simple CAPTCHA Demo ===")
|
||||||
|
captcha_text, img = SimpleCaptcha.generate()
|
||||||
|
img.save('simple_captcha.png')
|
||||||
|
print(f"Simple CAPTCHA Text: {captcha_text}")
|
||||||
|
print("Image saved as: simple_captcha.png")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
# END OF CODE
|
||||||
Binary file not shown.
Binary file not shown.
Reference in New Issue
Block a user