From 0086d93be3b0a432cc94d158c365b948755b0576 Mon Sep 17 00:00:00 2001 From: Michael Manning Date: Wed, 28 Jan 2026 17:54:34 +1100 Subject: [PATCH] Initial commit for a test Python project for email migration between two IMAP servers. --- migrate_emails.py | 212 +++++++++++++++++++++++++++++++ migrate_emails_db.py | 287 ++++++++++++++++++++++++++++++++++++++++++ migrate_emails_old.oy | 171 +++++++++++++++++++++++++ 3 files changed, 670 insertions(+) create mode 100644 migrate_emails.py create mode 100644 migrate_emails_db.py create mode 100644 migrate_emails_old.oy diff --git a/migrate_emails.py b/migrate_emails.py new file mode 100644 index 0000000..22331dd --- /dev/null +++ b/migrate_emails.py @@ -0,0 +1,212 @@ +import imaplib +import argparse +import sys +import re +import time +import email +from email.policy import default + +# Increase limit for large emails +imaplib._MAXLINE = 10000000 + +def parse_args(): + parser = argparse.ArgumentParser(description="Migrate emails via IMAP with Deduplication.") + + # Source Creds + parser.add_argument('--src-host', required=True, help="Source IMAP Hostname") + parser.add_argument('--src-user', required=True, help="Source Username") + parser.add_argument('--src-pass', required=True, help="Source Password") + + # Dest Creds + parser.add_argument('--dst-host', required=True, help="Destination IMAP Hostname") + parser.add_argument('--dst-user', required=True, help="Destination Username") + parser.add_argument('--dst-pass', required=True, help="Destination Password") + + parser.add_argument('--dry-run', action='store_true', help="Simulate actions without moving data") + parser.add_argument('--ssl', action='store_true', default=True, help="Use SSL (Default: True)") + + return parser.parse_args() + +def connect_imap(host, user, password, use_ssl=True): + try: + if use_ssl: + mail = imaplib.IMAP4_SSL(host) + else: + mail = imaplib.IMAP4(host) + mail.login(user, password) + return mail + except Exception as e: + print(f"Error connecting to {host}: {e}") + sys.exit(1) + +def get_folder_list(mail): + status, folders = mail.list() + if status != 'OK': + print("Failed to retrieve folder list.") + return [] + + clean_folders = [] + for folder in folders: + decoded = folder.decode('utf-8') + # Regex to extract folder name from standard IMAP response + parts = re.search(r' \".\" \"?(.+?)\"?$', decoded) or re.search(r' \S \S (.+)$', decoded) + if parts: + name = parts.group(1).replace('"', '') + clean_folders.append(name) + else: + clean_folders.append(decoded.split(' ')[-1].replace('"', '')) + return clean_folders + +def get_destination_message_ids(dst, folder_name): + """ + Fetches only the Message-IDs from the destination folder + to build a set for deduplication. + """ + existing_ids = set() + + # Select folder on destination + try: + status, _ = dst.select(f'"{folder_name}"', readonly=True) + if status != 'OK': + return existing_ids # Folder probably empty or doesn't exist yet + except: + return existing_ids + + status, messages = dst.search(None, 'ALL') + msg_ids = messages[0].split() + + if not msg_ids: + return existing_ids + + print(f" -> Indexing {len(msg_ids)} existing emails on destination for deduplication...") + + # Fetch headers in batches would be faster, but simple iteration is safer for compatibility + # We fetch ONLY the header fields to save bandwidth + for num in msg_ids: + try: + # PEEK prevents marking read, though usually not critical on destination + typ, data = dst.fetch(num, '(BODY.PEEK[HEADER.FIELDS (MESSAGE-ID)])') + if typ == 'OK': + header_data = data[0][1] + # Parse the byte string into an email object to extract ID reliably + msg = email.message_from_bytes(header_data, policy=default) + msg_id = msg.get("Message-ID") + if msg_id: + existing_ids.add(msg_id.strip()) + except Exception: + continue + + return existing_ids + +def migrate_folder(src, dst, folder_name, dry_run): + print(f"\n--- Processing: {folder_name} ---") + + # 1. Prepare Destination (Create & Index) + existing_ids = set() + if not dry_run: + try: + dst.create(f'"{folder_name}"') + except imaplib.IMAP4.error: + pass + + # Build dedupe index + existing_ids = get_destination_message_ids(dst, folder_name) + + # 2. Select Source + status, _ = src.select(f'"{folder_name}"', readonly=True) + if status != 'OK': + print(f"Skipping {folder_name}: Could not select on source.") + return + + # 3. Get Source Messages + status, messages = src.search(None, 'ALL') + msg_ids = messages[0].split() + total = len(msg_ids) + print(f"Found {total} emails in source '{folder_name}'.") + + moved_count = 0 + skipped_count = 0 + + for msg_id in msg_ids: + # Fetch Flags, Date, and Body + res, msg_data = src.fetch(msg_id, '(FLAGS INTERNALDATE BODY.PEEK[])') + + if res != 'OK': + print(f"X", end="", flush=True) + continue + + raw_email = None + flags = None + date_str = None + + for part in msg_data: + if isinstance(part, tuple): + flags_match = re.search(r'FLAGS \((.*?)\)', part[0].decode('utf-8')) + if flags_match: + flags = flags_match.group(1) + + date_match = re.search(r'INTERNALDATE \"(.*?)\"', part[0].decode('utf-8')) + if date_match: + date_str = date_match.group(1) + + raw_email = part[1] + + if raw_email: + # Check Deduplication + msg_obj = email.message_from_bytes(raw_email, policy=default) + src_msg_id = msg_obj.get("Message-ID") + + if src_msg_id and src_msg_id.strip() in existing_ids: + # Duplicate found + print("S", end="", flush=True) # S for Skipped + skipped_count += 1 + continue + + # Not a duplicate, proceed to append + if dry_run: + print(".", end="", flush=True) + moved_count += 1 + else: + delivery_time = imaplib.Time2Internaldate(time.localtime()) + if date_str: + delivery_time = f'"{date_str}"' + + flag_str = f'({flags})' if flags else None + + try: + dst.append(f'"{folder_name}"', flag_str, delivery_time, raw_email) + print(".", end="", flush=True) + moved_count += 1 + # Update local index so we don't duplicate if source has duplicates + if src_msg_id: + existing_ids.add(src_msg_id.strip()) + except Exception as e: + print(f"E", end="", flush=True) # E for Error + + print(f"\nResult: {moved_count} moved, {skipped_count} skipped (duplicates).") + +def main(): + args = parse_args() + + print("Connecting to Source...") + src = connect_imap(args.src_host, args.src_user, args.src_pass, args.ssl) + + print("Connecting to Destination...") + dst = connect_imap(args.dst_host, args.dst_user, args.dst_pass, args.ssl) + + print("Fetching folder list...") + folders = get_folder_list(src) + print(f"Detected folders: {folders}") + + if args.dry_run: + print("\n*** DRY RUN MODE ***") + + for folder in folders: + migrate_folder(src, dst, folder, args.dry_run) + + print("\n\nMigration Complete.") + src.logout() + dst.logout() + +if __name__ == "__main__": + main() diff --git a/migrate_emails_db.py b/migrate_emails_db.py new file mode 100644 index 0000000..560a2d3 --- /dev/null +++ b/migrate_emails_db.py @@ -0,0 +1,287 @@ +import imaplib +import argparse +import sys +import re +import time +import email +import sqlite3 +from email.policy import default + +# Increase limit for large emails +imaplib._MAXLINE = 10000000 + +# ------------------------------------------------------------------------- +# DATABASE MANAGER +# ------------------------------------------------------------------------- +class MigrationDB: + def __init__(self, db_path="migration_history.db"): + self.conn = sqlite3.connect(db_path) + self.create_table() + + def create_table(self): + query = """ + CREATE TABLE IF NOT EXISTS migration_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + src_host TEXT, + src_user TEXT, + folder TEXT, + message_id TEXT, + status TEXT, + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, + UNIQUE(src_host, src_user, folder, message_id) + ) + """ + self.conn.execute(query) + self.conn.commit() + + def is_processed(self, src_host, src_user, folder, message_id): + """Checks if a message ID has been successfully migrated.""" + cursor = self.conn.cursor() + cursor.execute(""" + SELECT 1 FROM migration_log + WHERE src_host=? AND src_user=? AND folder=? AND message_id=? AND status='success' + """, (src_host, src_user, folder, message_id)) + return cursor.fetchone() is not None + + def log_success(self, src_host, src_user, folder, message_id): + try: + query = """ + INSERT OR REPLACE INTO migration_log (src_host, src_user, folder, message_id, status) + VALUES (?, ?, ?, ?, 'success') + """ + self.conn.execute(query, (src_host, src_user, folder, message_id)) + self.conn.commit() + except sqlite3.Error as e: + print(f"DB Error: {e}") + + def log_error(self, src_host, src_user, folder, message_id, error_msg): + # We append error status but don't replace if it was previously success + try: + query = """ + INSERT OR IGNORE INTO migration_log (src_host, src_user, folder, message_id, status) + VALUES (?, ?, ?, ?, ?) + """ + self.conn.execute(query, (src_host, src_user, folder, message_id, f"error: {error_msg}")) + self.conn.commit() + except sqlite3.Error: + pass + + def close(self): + self.conn.close() + +# ------------------------------------------------------------------------- +# IMAP UTILS +# ------------------------------------------------------------------------- + +def parse_args(): + parser = argparse.ArgumentParser(description="Migrate emails with SQLite State Tracking.") + + parser.add_argument('--src-host', required=True) + parser.add_argument('--src-user', required=True) + parser.add_argument('--src-pass', required=True) + + parser.add_argument('--dst-host', required=True) + parser.add_argument('--dst-user', required=True) + parser.add_argument('--dst-pass', required=True) + + parser.add_argument('--dry-run', action='store_true', help="Simulate actions") + parser.add_argument('--ssl', action='store_true', default=True, help="Use SSL") + parser.add_argument('--db', default="migration_history.db", help="Path to SQLite DB") + + return parser.parse_args() + +def connect_imap(host, user, password, use_ssl=True): + try: + if use_ssl: + mail = imaplib.IMAP4_SSL(host) + else: + mail = imaplib.IMAP4(host) + mail.login(user, password) + return mail + except Exception as e: + print(f"Error connecting to {host}: {e}") + sys.exit(1) + +def get_folder_list(mail): + status, folders = mail.list() + if status != 'OK': + return [] + + clean_folders = [] + for folder in folders: + decoded = folder.decode('utf-8') + parts = re.search(r' \".\" \"?(.+?)\"?$', decoded) or re.search(r' \S \S (.+)$', decoded) + if parts: + name = parts.group(1).replace('"', '') + clean_folders.append(name) + else: + clean_folders.append(decoded.split(' ')[-1].replace('"', '')) + return clean_folders + +def get_message_id_from_bytes(raw_header): + """Safe extraction of Message-ID from bytes""" + try: + msg = email.message_from_bytes(raw_header, policy=default) + mid = msg.get("Message-ID") + return mid.strip() if mid else None + except: + return None + +# ------------------------------------------------------------------------- +# MIGRATION LOGIC +# ------------------------------------------------------------------------- + +def migrate_folder(src, dst, folder_name, db, args): + print(f"\n--- Processing: {folder_name} ---") + + # 1. Ensure Destination Exists + if not args.dry_run: + try: + dst.create(f'"{folder_name}"') + except imaplib.IMAP4.error: + pass + + # 2. Select Source + status, _ = src.select(f'"{folder_name}"', readonly=True) + if status != 'OK': + print(f"Skipping {folder_name}: Could not select.") + return + + # 3. Batch Fetch ALL Source Message-IDs first + # This prevents the connection from timing out during a long read loop + # and allows us to filter against the DB instantly. + print(" -> Scanning source messages...") + status, messages = src.search(None, 'ALL') + if status != 'OK' or not messages[0]: + print(" -> Folder empty.") + return + + msg_nums = messages[0].split() + total_msgs = len(msg_nums) + + # We fetch headers in bulk (e.g. 1:1000) or simply iterate. + # For stability, we iterate but only fetch headers first. + + to_migrate = [] # List of (imap_sequence_num, message_id) + + print(f" -> Analyzing {total_msgs} emails against local DB...") + + # Pre-fetch check loop + for num in msg_nums: + # Fetch ONLY the Message-ID header (Very fast) + typ, data = src.fetch(num, '(BODY.PEEK[HEADER.FIELDS (MESSAGE-ID)])') + if typ != 'OK': + continue + + header_data = data[0][1] + msg_id = get_message_id_from_bytes(header_data) + + if not msg_id: + # If no Message-ID, we cannot reliably track it in DB. + # We treat it as "always migrate" or skip. Here we skip to avoid dupes. + # You can change logic to migrate these blindly if needed. + continue + + # Check DB + if db.is_processed(args.src_host, args.src_user, folder_name, msg_id): + # Already migrated + continue + else: + to_migrate.append((num, msg_id)) + + print(f" -> {len(to_migrate)} new emails to migrate. ({total_msgs - len(to_migrate)} skipped).") + + # 4. Migration Loop + count = 0 + for num, msg_id in to_migrate: + count += 1 + + # Fetch Full Body + Flags + res, msg_data = src.fetch(num, '(FLAGS INTERNALDATE BODY.PEEK[])') + if res != 'OK': + print("X", end="", flush=True) + db.log_error(args.src_host, args.src_user, folder_name, msg_id, "Fetch Error") + continue + + raw_email = None + flags = None + date_str = None + + for part in msg_data: + if isinstance(part, tuple): + flags_match = re.search(r'FLAGS \((.*?)\)', part[0].decode('utf-8')) + if flags_match: + flags = flags_match.group(1) + + date_match = re.search(r'INTERNALDATE \"(.*?)\"', part[0].decode('utf-8')) + if date_match: + date_str = date_match.group(1) + + raw_email = part[1] + + if not raw_email: + continue + + if args.dry_run: + print(".", end="", flush=True) + else: + try: + delivery_time = imaplib.Time2Internaldate(time.localtime()) + if date_str: + delivery_time = f'"{date_str}"' + + flag_str = f'({flags})' if flags else None + + dst.append(f'"{folder_name}"', flag_str, delivery_time, raw_email) + + # SUCCESS: Log to DB + db.log_success(args.src_host, args.src_user, folder_name, msg_id) + print(".", end="", flush=True) + + except Exception as e: + print(f"E", end="", flush=True) + db.log_error(args.src_host, args.src_user, folder_name, msg_id, str(e)) + # If connection dropped here, the script crashes, + # but the DB has saved progress up to the previous email. + +def main(): + args = parse_args() + db = MigrationDB(args.db) + + try: + print(f"Logging to database: {args.db}") + + print("Connecting to Source...") + src = connect_imap(args.src_host, args.src_user, args.src_pass, args.ssl) + + print("Connecting to Destination...") + dst = connect_imap(args.dst_host, args.dst_user, args.dst_pass, args.ssl) + + print("Fetching folder list...") + folders = get_folder_list(src) + + if args.dry_run: + print("\n*** DRY RUN MODE ***") + + for folder in folders: + try: + migrate_folder(src, dst, folder, db, args) + except (imaplib.IMAP4.abort, ConnectionResetError, BrokenPipeError) as e: + print(f"\nCRITICAL CONNECTION ERROR on folder {folder}: {e}") + print("The script will exit. Please run it again; it will resume from where it left off.") + sys.exit(1) + except Exception as e: + print(f"\nUnexpected error processing {folder}: {e}") + + finally: + try: + src.logout() + except: pass + try: + dst.logout() + except: pass + db.close() + print("\n\nProcess Finished.") + +if __name__ == "__main__": + main() diff --git a/migrate_emails_old.oy b/migrate_emails_old.oy new file mode 100644 index 0000000..a219bec --- /dev/null +++ b/migrate_emails_old.oy @@ -0,0 +1,171 @@ +import imaplib +import argparse +import sys +import re +import time +from email.utils import parsedate_to_datetime + +# Increase limit for large emails (default is often too small) +imaplib._MAXLINE = 10000000 + +def parse_args(): + parser = argparse.ArgumentParser(description="Migrate emails via IMAP.") + + # Source Creds + parser.add_argument('--src-host', required=True, help="Source IMAP Hostname") + parser.add_argument('--src-user', required=True, help="Source Username") + parser.add_argument('--src-pass', required=True, help="Source Password") + + # Dest Creds + parser.add_argument('--dst-host', required=True, help="Destination IMAP Hostname") + parser.add_argument('--dst-user', required=True, help="Destination Username") + parser.add_argument('--dst-pass', required=True, help="Destination Password") + + parser.add_argument('--dry-run', action='store_true', help="Simulate actions without moving data") + parser.add_argument('--ssl', action='store_true', default=True, help="Use SSL (Default: True)") + + return parser.parse_args() + +def connect_imap(host, user, password, use_ssl=True): + try: + if use_ssl: + mail = imaplib.IMAP4_SSL(host) + else: + mail = imaplib.IMAP4(host) + mail.login(user, password) + return mail + except Exception as e: + print(f"Error connecting to {host}: {e}") + sys.exit(1) + +def get_folder_list(mail): + """Returns a list of folder names.""" + status, folders = mail.list() + if status != 'OK': + print("Failed to retrieve folder list.") + return [] + + clean_folders = [] + for folder in folders: + # Regex to extract the actual folder name typically in quotes or plain text + # Format usually: (\HasNoChildren) "/" "FolderName" + # We try to grab everything after the delimiter + decoded = folder.decode('utf-8') + # Split by the delimiter (usually the second item in the tuple response) + parts = re.search(r' \".\" \"?(.+?)\"?$', decoded) or re.search(r' \S \S (.+)$', decoded) + + if parts: + name = parts.group(1).replace('"', '') + clean_folders.append(name) + else: + # Fallback raw extraction if regex fails + clean_folders.append(decoded.split(' ')[-1].replace('"', '')) + + return clean_folders + +def migrate_folder(src, dst, folder_name, dry_run): + print(f"\n--- Processing: {folder_name} ---") + + # Select Source (Read-Only to be safe) + status, _ = src.select(f'"{folder_name}"', readonly=True) + if status != 'OK': + print(f"Skipping {folder_name}: Could not select on source.") + return + + # Check/Create Destination + if not dry_run: + try: + dst.create(f'"{folder_name}"') + except imaplib.IMAP4.error: + # Usually means folder exists + pass + + # Search all messages + status, messages = src.search(None, 'ALL') + msg_ids = messages[0].split() + total = len(msg_ids) + print(f"Found {total} emails in {folder_name}.") + + count = 0 + for msg_id in msg_ids: + count += 1 + + # 1. Fetch Data & Flags + # Using BODY.PEEK[] prevents marking the email as read on the source + res, msg_data = src.fetch(msg_id, '(FLAGS INTERNALDATE BODY.PEEK[])') + + if res != 'OK': + print(f"Error reading message {msg_id}") + continue + + # msg_data returned as a list of tuples. + # [0] contains (metadata, content), [1] is closing parenthesis usually + raw_email = None + flags = None + date_str = None + + for part in msg_data: + if isinstance(part, tuple): + # Parse Flags + flags_match = re.search(r'FLAGS \((.*?)\)', part[0].decode('utf-8')) + if flags_match: + flags = flags_match.group(1) + + # Parse Internal Date + date_match = re.search(r'INTERNALDATE \"(.*?)\"', part[0].decode('utf-8')) + if date_match: + date_str = date_match.group(1) + + raw_email = part[1] + + # 2. Append to Destination + if dry_run: + # Just print a dot progress + print(".", end="", flush=True) + else: + if raw_email: + # Prepare Time + # imaplib.Time2Internaldate is available but often the raw string from source works best + # If date_str is available, we use it to preserve the receive date + delivery_time = imaplib.Time2Internaldate(time.localtime()) + if date_str: + delivery_time = f'"{date_str}"' + + # Prepare Flags (ensure they are properly formatted) + flag_str = f'({flags})' if flags else None + + try: + dst.append(f'"{folder_name}"', flag_str, delivery_time, raw_email) + print(".", end="", flush=True) + except Exception as e: + print(f"X ({e})", end="", flush=True) + + print(f"\nMoved {count}/{total} emails from {folder_name}.") + +def main(): + args = parse_args() + + print("Connecting to Source...") + src = connect_imap(args.src_host, args.src_user, args.src_pass, args.ssl) + + print("Connecting to Destination...") + dst = connect_imap(args.dst_host, args.dst_user, args.dst_pass, args.ssl) + + print("Fetching folder list...") + folders = get_folder_list(src) + print(f"Detected folders: {folders}") + + if args.dry_run: + print("\n*** DRY RUN MODE: No changes will be made ***") + + for folder in folders: + # Skip weird Gmail folders if necessary like '[Gmail]' root if it causes issues + # For now, we attempt all. + migrate_folder(src, dst, folder, args.dry_run) + + print("\n\nMigration Complete.") + src.logout() + dst.logout() + +if __name__ == "__main__": + main()