import imaplib import argparse import sys import re import time import email import sqlite3 from email.policy import default # Increase limit for large emails imaplib._MAXLINE = 10000000 # ------------------------------------------------------------------------- # DATABASE MANAGER # ------------------------------------------------------------------------- class MigrationDB: def __init__(self, db_path="migration_history.db"): self.conn = sqlite3.connect(db_path) self.create_table() def create_table(self): query = """ CREATE TABLE IF NOT EXISTS migration_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, src_host TEXT, src_user TEXT, folder TEXT, message_id TEXT, status TEXT, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, UNIQUE(src_host, src_user, folder, message_id) ) """ self.conn.execute(query) self.conn.commit() def is_processed(self, src_host, src_user, folder, message_id): """Checks if a message ID has been successfully migrated.""" cursor = self.conn.cursor() cursor.execute(""" SELECT 1 FROM migration_log WHERE src_host=? AND src_user=? AND folder=? AND message_id=? AND status='success' """, (src_host, src_user, folder, message_id)) return cursor.fetchone() is not None def log_success(self, src_host, src_user, folder, message_id): try: query = """ INSERT OR REPLACE INTO migration_log (src_host, src_user, folder, message_id, status) VALUES (?, ?, ?, ?, 'success') """ self.conn.execute(query, (src_host, src_user, folder, message_id)) self.conn.commit() except sqlite3.Error as e: print(f"DB Error: {e}") def log_error(self, src_host, src_user, folder, message_id, error_msg): # We append error status but don't replace if it was previously success try: query = """ INSERT OR IGNORE INTO migration_log (src_host, src_user, folder, message_id, status) VALUES (?, ?, ?, ?, ?) 
""" self.conn.execute(query, (src_host, src_user, folder, message_id, f"error: {error_msg}")) self.conn.commit() except sqlite3.Error: pass def close(self): self.conn.close() # ------------------------------------------------------------------------- # IMAP UTILS # ------------------------------------------------------------------------- def parse_args(): parser = argparse.ArgumentParser(description="Migrate emails with SQLite State Tracking.") parser.add_argument('--src-host', required=True) parser.add_argument('--src-user', required=True) parser.add_argument('--src-pass', required=True) parser.add_argument('--dst-host', required=True) parser.add_argument('--dst-user', required=True) parser.add_argument('--dst-pass', required=True) parser.add_argument('--dry-run', action='store_true', help="Simulate actions") parser.add_argument('--ssl', action='store_true', default=True, help="Use SSL") parser.add_argument('--db', default="migration_history.db", help="Path to SQLite DB") return parser.parse_args() def connect_imap(host, user, password, use_ssl=True): try: if use_ssl: mail = imaplib.IMAP4_SSL(host) else: mail = imaplib.IMAP4(host) mail.login(user, password) return mail except Exception as e: print(f"Error connecting to {host}: {e}") sys.exit(1) def get_folder_list(mail): status, folders = mail.list() if status != 'OK': return [] clean_folders = [] for folder in folders: decoded = folder.decode('utf-8') parts = re.search(r' \".\" \"?(.+?)\"?$', decoded) or re.search(r' \S \S (.+)$', decoded) if parts: name = parts.group(1).replace('"', '') clean_folders.append(name) else: clean_folders.append(decoded.split(' ')[-1].replace('"', '')) return clean_folders def get_message_id_from_bytes(raw_header): """Safe extraction of Message-ID from bytes""" try: msg = email.message_from_bytes(raw_header, policy=default) mid = msg.get("Message-ID") return mid.strip() if mid else None except: return None # ------------------------------------------------------------------------- # 
# -------------------------------------------------------------------------
# MIGRATION LOGIC
# -------------------------------------------------------------------------


def migrate_folder(src, dst, folder_name, db, args):
    """Migrate one folder from src to dst, skipping messages logged in the DB.

    Phase 1 fetches only Message-ID headers and filters them against the
    SQLite log; phase 2 fetches full bodies for the remainder and APPENDs
    them to the destination, logging each outcome so a crash can resume.
    """
    print(f"\n--- Processing: {folder_name} ---")

    # 1. Ensure the destination folder exists (CREATE errors if it does).
    if not args.dry_run:
        try:
            dst.create(f'"{folder_name}"')
        except imaplib.IMAP4.error:
            pass

    # 2. Select the source folder read-only so source flags are untouched.
    status, _ = src.select(f'"{folder_name}"', readonly=True)
    if status != 'OK':
        print(f"Skipping {folder_name}: Could not select.")
        return

    # 3. Enumerate every message up front, then filter against the local DB.
    #    Scanning headers first keeps the long read loop short and lets us
    #    skip already-migrated mail without fetching bodies.
    print(" -> Scanning source messages...")
    status, messages = src.search(None, 'ALL')
    if status != 'OK' or not messages[0]:
        print(" -> Folder empty.")
        return

    msg_nums = messages[0].split()
    total_msgs = len(msg_nums)

    to_migrate = []  # list of (imap_sequence_num, message_id)
    print(f" -> Analyzing {total_msgs} emails against local DB...")

    for num in msg_nums:
        # Fetch ONLY the Message-ID header (cheap; BODY.PEEK avoids \Seen).
        typ, data = src.fetch(num, '(BODY.PEEK[HEADER.FIELDS (MESSAGE-ID)])')
        if typ != 'OK':
            continue
        msg_id = get_message_id_from_bytes(data[0][1])
        if not msg_id:
            # Without a Message-ID we cannot dedupe reliably; skip to avoid
            # duplicates. Change this to migrate blindly if preferred.
            continue
        if not db.is_processed(args.src_host, args.src_user, folder_name, msg_id):
            to_migrate.append((num, msg_id))

    print(f" -> {len(to_migrate)} new emails to migrate. "
          f"({total_msgs - len(to_migrate)} skipped).")

    # 4. Migration loop: fetch full message + metadata, then APPEND.
    for num, msg_id in to_migrate:
        res, msg_data = src.fetch(num, '(FLAGS INTERNALDATE BODY.PEEK[])')
        if res != 'OK':
            print("X", end="", flush=True)
            db.log_error(args.src_host, args.src_user, folder_name, msg_id,
                         "Fetch Error")
            continue

        raw_email = None
        flags = None
        date_str = None
        for part in msg_data:
            if isinstance(part, tuple):
                meta = part[0].decode('utf-8')  # decode once, not per regex
                flags_match = re.search(r'FLAGS \((.*?)\)', meta)
                if flags_match:
                    flags = flags_match.group(1)
                date_match = re.search(r'INTERNALDATE \"(.*?)\"', meta)
                if date_match:
                    date_str = date_match.group(1)
                raw_email = part[1]

        if not raw_email:
            continue

        if args.dry_run:
            print(".", end="", flush=True)
            continue

        try:
            # Preserve the original INTERNALDATE when the server gave us one.
            delivery_time = imaplib.Time2Internaldate(time.localtime())
            if date_str:
                delivery_time = f'"{date_str}"'
            # BUG FIX: \Recent is session-only per RFC 3501 and must not
            # appear in an APPEND flag list — many servers reject it.
            flag_str = None
            if flags:
                kept = [f for f in flags.split() if f.lower() != '\\recent']
                if kept:
                    flag_str = f'({" ".join(kept)})'
            dst.append(f'"{folder_name}"', flag_str, delivery_time, raw_email)
            # SUCCESS: persist progress immediately so a crash can resume.
            db.log_success(args.src_host, args.src_user, folder_name, msg_id)
            print(".", end="", flush=True)
        except Exception as e:
            print("E", end="", flush=True)
            db.log_error(args.src_host, args.src_user, folder_name, msg_id,
                         str(e))
            # If the connection dropped here the next fetch aborts the run,
            # but the DB has saved progress up to the previous email.


def main():
    """Entry point: connect both ends, then migrate every source folder."""
    args = parse_args()
    db = MigrationDB(args.db)
    try:
        print(f"Logging to database: {args.db}")
        print("Connecting to Source...")
        src = connect_imap(args.src_host, args.src_user, args.src_pass, args.ssl)
        print("Connecting to Destination...")
        dst = connect_imap(args.dst_host, args.dst_user, args.dst_pass, args.ssl)

        print("Fetching folder list...")
        folders = get_folder_list(src)

        if args.dry_run:
            print("\n*** DRY RUN MODE ***")

        for folder in folders:
            try:
                migrate_folder(src, dst, folder, db, args)
            except (imaplib.IMAP4.abort, ConnectionResetError,
                    BrokenPipeError) as e:
                # Connection-level failures are unrecoverable mid-run; the
                # DB log makes the next invocation resume where we stopped.
                print(f"\nCRITICAL CONNECTION ERROR on folder {folder}: {e}")
                print("The script will exit. Please run it again; "
                      "it will resume from where it left off.")
                sys.exit(1)
            except Exception as e:
                print(f"\nUnexpected error processing {folder}: {e}")
    finally:
        # Best-effort cleanup: the sockets may already be dead, and `dst`
        # may never have been bound if the destination login failed.
        try:
            src.logout()
        except Exception:
            pass
        try:
            dst.logout()
        except Exception:
            pass
        db.close()

    print("\n\nProcess Finished.")


if __name__ == "__main__":
    main()