import imaplib import argparse import sys import re import time from email.utils import parsedate_to_datetime # Increase limit for large emails (default is often too small) imaplib._MAXLINE = 10000000 def parse_args(): parser = argparse.ArgumentParser(description="Migrate emails via IMAP.") # Source Creds parser.add_argument('--src-host', required=True, help="Source IMAP Hostname") parser.add_argument('--src-user', required=True, help="Source Username") parser.add_argument('--src-pass', required=True, help="Source Password") # Dest Creds parser.add_argument('--dst-host', required=True, help="Destination IMAP Hostname") parser.add_argument('--dst-user', required=True, help="Destination Username") parser.add_argument('--dst-pass', required=True, help="Destination Password") parser.add_argument('--dry-run', action='store_true', help="Simulate actions without moving data") parser.add_argument('--ssl', action='store_true', default=True, help="Use SSL (Default: True)") return parser.parse_args() def connect_imap(host, user, password, use_ssl=True): try: if use_ssl: mail = imaplib.IMAP4_SSL(host) else: mail = imaplib.IMAP4(host) mail.login(user, password) return mail except Exception as e: print(f"Error connecting to {host}: {e}") sys.exit(1) def get_folder_list(mail): """Returns a list of folder names.""" status, folders = mail.list() if status != 'OK': print("Failed to retrieve folder list.") return [] clean_folders = [] for folder in folders: # Regex to extract the actual folder name typically in quotes or plain text # Format usually: (\HasNoChildren) "/" "FolderName" # We try to grab everything after the delimiter decoded = folder.decode('utf-8') # Split by the delimiter (usually the second item in the tuple response) parts = re.search(r' \".\" \"?(.+?)\"?$', decoded) or re.search(r' \S \S (.+)$', decoded) if parts: name = parts.group(1).replace('"', '') clean_folders.append(name) else: # Fallback raw extraction if regex fails clean_folders.append(decoded.split(' ')[-1].replace('"', '')) return clean_folders def migrate_folder(src, dst, folder_name, dry_run): print(f"\n--- Processing: {folder_name} ---") # Select Source (Read-Only to be safe) status, _ = src.select(f'"{folder_name}"', readonly=True) if status != 'OK': print(f"Skipping {folder_name}: Could not select on source.") return # Check/Create Destination if not dry_run: try: dst.create(f'"{folder_name}"') except imaplib.IMAP4.error: # Usually means folder exists pass # Search all messages status, messages = src.search(None, 'ALL') msg_ids = messages[0].split() total = len(msg_ids) print(f"Found {total} emails in {folder_name}.") count = 0 for msg_id in msg_ids: count += 1 # 1. Fetch Data & Flags # Using BODY.PEEK[] prevents marking the email as read on the source res, msg_data = src.fetch(msg_id, '(FLAGS INTERNALDATE BODY.PEEK[])') if res != 'OK': print(f"Error reading message {msg_id}") continue # msg_data returned as a list of tuples. # [0] contains (metadata, content), [1] is closing parenthesis usually raw_email = None flags = None date_str = None for part in msg_data: if isinstance(part, tuple): # Parse Flags flags_match = re.search(r'FLAGS \((.*?)\)', part[0].decode('utf-8')) if flags_match: flags = flags_match.group(1) # Parse Internal Date date_match = re.search(r'INTERNALDATE \"(.*?)\"', part[0].decode('utf-8')) if date_match: date_str = date_match.group(1) raw_email = part[1] # 2. Append to Destination if dry_run: # Just print a dot progress print(".", end="", flush=True) else: if raw_email: # Prepare Time # imaplib.Time2Internaldate is available but often the raw string from source works best # If date_str is available, we use it to preserve the receive date delivery_time = imaplib.Time2Internaldate(time.localtime()) if date_str: delivery_time = f'"{date_str}"' # Prepare Flags (ensure they are properly formatted) flag_str = f'({flags})' if flags else None try: dst.append(f'"{folder_name}"', flag_str, delivery_time, raw_email) print(".", end="", flush=True) except Exception as e: print(f"X ({e})", end="", flush=True) print(f"\nMoved {count}/{total} emails from {folder_name}.") def main(): args = parse_args() print("Connecting to Source...") src = connect_imap(args.src_host, args.src_user, args.src_pass, args.ssl) print("Connecting to Destination...") dst = connect_imap(args.dst_host, args.dst_user, args.dst_pass, args.ssl) print("Fetching folder list...") folders = get_folder_list(src) print(f"Detected folders: {folders}") if args.dry_run: print("\n*** DRY RUN MODE: No changes will be made ***") for folder in folders: # Skip weird Gmail folders if necessary like '[Gmail]' root if it causes issues # For now, we attempt all. migrate_folder(src, dst, folder, args.dry_run) print("\n\nMigration Complete.") src.logout() dst.logout() if __name__ == "__main__": main()