import imaplib
import argparse
import sys
import re
import time
import email
from email.policy import default

# Raise imaplib's response-line cap so large FETCH responses don't abort.
# NOTE(review): _MAXLINE is a private imaplib attribute; confirm on upgrades.
imaplib._MAXLINE = 10000000


def parse_args():
    """Parse command-line options for the IMAP migration tool."""
    parser = argparse.ArgumentParser(description="Migrate emails via IMAP with Deduplication.")
    # Source Creds
    parser.add_argument('--src-host', required=True, help="Source IMAP Hostname")
    parser.add_argument('--src-user', required=True, help="Source Username")
    parser.add_argument('--src-pass', required=True, help="Source Password")
    # Dest Creds
    parser.add_argument('--dst-host', required=True, help="Destination IMAP Hostname")
    parser.add_argument('--dst-user', required=True, help="Destination Username")
    parser.add_argument('--dst-pass', required=True, help="Destination Password")
    parser.add_argument('--dry-run', action='store_true',
                        help="Simulate actions without moving data")
    # BUGFIX: the original declared --ssl as store_true with default=True,
    # which made SSL impossible to disable.  --ssl is kept for backward
    # compatibility; --no-ssl now actually turns it off (same args.ssl dest).
    parser.add_argument('--ssl', dest='ssl', action='store_true', default=True,
                        help="Use SSL (Default: True)")
    parser.add_argument('--no-ssl', dest='ssl', action='store_false',
                        help="Disable SSL (plain IMAP)")
    return parser.parse_args()


def connect_imap(host, user, password, use_ssl=True):
    """Connect and log in to an IMAP server.

    Returns the authenticated IMAP4/IMAP4_SSL connection.
    Exits the process on any connect/auth failure -- a migration cannot
    proceed with only one side connected, so this is a fatal boundary.
    """
    try:
        if use_ssl:
            mail = imaplib.IMAP4_SSL(host)
        else:
            mail = imaplib.IMAP4(host)
        mail.login(user, password)
        return mail
    except Exception as e:
        print(f"Error connecting to {host}: {e}")
        sys.exit(1)


def get_folder_list(mail):
    """Return folder names parsed from the server's LIST response.

    Handles the common `(<flags>) "<delim>" "<name>"` shape; falls back to
    the last whitespace-separated token for non-standard responses.
    """
    status, folders = mail.list()
    if status != 'OK':
        print("Failed to retrieve folder list.")
        return []

    clean_folders = []
    for folder in folders:
        # NOTE(review): IMAP folder names are modified UTF-7 on the wire;
        # this assumes names that are also valid UTF-8 -- confirm for
        # mailboxes with non-ASCII folder names.
        decoded = folder.decode('utf-8')
        # Regex to extract folder name from standard IMAP response
        parts = (re.search(r' \".\" \"?(.+?)\"?$', decoded)
                 or re.search(r' \S \S (.+)$', decoded))
        if parts:
            clean_folders.append(parts.group(1).replace('"', ''))
        else:
            clean_folders.append(decoded.split(' ')[-1].replace('"', ''))
    return clean_folders


def get_destination_message_ids(dst, folder_name):
    """Return the set of Message-ID headers already present in `folder_name`
    on the destination server, used to deduplicate before appending.
    """
    existing_ids = set()

    # Select folder on destination (read-only: we only index it here)
    try:
        status, _ = dst.select(f'"{folder_name}"', readonly=True)
        if status != 'OK':
            return existing_ids  # Folder probably empty or doesn't exist yet
    except imaplib.IMAP4.error:
        # BUGFIX: was a bare `except:`, which also swallowed
        # KeyboardInterrupt/SystemExit.
        return existing_ids

    status, messages = dst.search(None, 'ALL')
    if status != 'OK':  # BUGFIX: SEARCH status was previously unchecked
        return existing_ids
    msg_ids = messages[0].split()
    if not msg_ids:
        return existing_ids

    print(f" -> Indexing {len(msg_ids)} existing emails on destination for deduplication...")

    # Fetching headers in batches would be faster, but per-message fetch is
    # safer for server compatibility.  Only the Message-ID header travels.
    for num in msg_ids:
        try:
            # PEEK prevents marking read, though usually not critical on destination
            typ, data = dst.fetch(num, '(BODY.PEEK[HEADER.FIELDS (MESSAGE-ID)])')
            # Guard: some servers interleave non-tuple parts in responses.
            if typ != 'OK' or not data or not isinstance(data[0], tuple):
                continue
            header_data = data[0][1]
            # Parse the byte string into an email object to extract ID reliably
            msg = email.message_from_bytes(header_data, policy=default)
            msg_id = msg.get("Message-ID")
            if msg_id:
                existing_ids.add(msg_id.strip())
        except Exception:
            continue

    return existing_ids


def migrate_folder(src, dst, folder_name, dry_run):
    """Copy every message in `folder_name` from src to dst, skipping duplicates.

    Duplicates are detected by Message-ID against an index built from the
    destination folder.  Progress characters: '.' copied, 'S' skipped
    duplicate, 'X' source fetch failed, 'E' destination append failed.
    """
    print(f"\n--- Processing: {folder_name} ---")

    # 1. Prepare Destination (Create & Index)
    existing_ids = set()
    if not dry_run:
        try:
            dst.create(f'"{folder_name}"')
        except imaplib.IMAP4.error:
            pass  # Folder already exists
        # Build dedupe index
        existing_ids = get_destination_message_ids(dst, folder_name)

    # 2. Select Source
    status, _ = src.select(f'"{folder_name}"', readonly=True)
    if status != 'OK':
        print(f"Skipping {folder_name}: Could not select on source.")
        return

    # 3. Get Source Messages
    status, messages = src.search(None, 'ALL')
    if status != 'OK':  # BUGFIX: SEARCH status was previously unchecked
        print(f"Skipping {folder_name}: SEARCH failed on source.")
        return
    msg_ids = messages[0].split()
    total = len(msg_ids)
    print(f"Found {total} emails in source '{folder_name}'.")

    moved_count = 0
    skipped_count = 0

    for msg_id in msg_ids:
        # Fetch Flags, Date, and Body (PEEK leaves source flags untouched)
        res, msg_data = src.fetch(msg_id, '(FLAGS INTERNALDATE BODY.PEEK[])')
        if res != 'OK':
            print("X", end="", flush=True)
            continue

        raw_email = None
        flags = None
        date_str = None
        for part in msg_data:
            if isinstance(part, tuple):
                meta = part[0].decode('utf-8')
                flags_match = re.search(r'FLAGS \((.*?)\)', meta)
                if flags_match:
                    flags = flags_match.group(1)
                date_match = re.search(r'INTERNALDATE \"(.*?)\"', meta)
                if date_match:
                    date_str = date_match.group(1)
                raw_email = part[1]

        if not raw_email:
            continue

        # Check Deduplication
        msg_obj = email.message_from_bytes(raw_email, policy=default)
        src_msg_id = msg_obj.get("Message-ID")
        if src_msg_id and src_msg_id.strip() in existing_ids:
            print("S", end="", flush=True)  # S for Skipped
            skipped_count += 1
            continue

        # Not a duplicate, proceed to append
        if dry_run:
            print(".", end="", flush=True)
            moved_count += 1
            continue

        # Preserve the original INTERNALDATE when the source reported one.
        delivery_time = imaplib.Time2Internaldate(time.localtime())
        if date_str:
            delivery_time = f'"{date_str}"'

        if flags:
            # BUGFIX: \Recent is session-only and must not be set via APPEND
            # (RFC 3501); some servers reject the whole command if present.
            flags = flags.replace('\\Recent', '').strip()
        flag_str = f'({flags})' if flags else None

        try:
            dst.append(f'"{folder_name}"', flag_str, delivery_time, raw_email)
            print(".", end="", flush=True)
            moved_count += 1
            # Update local index so we don't re-copy when the source itself
            # contains duplicate Message-IDs.
            if src_msg_id:
                existing_ids.add(src_msg_id.strip())
        except Exception:
            # Best-effort per message: a failed append must not abort the run.
            print("E", end="", flush=True)  # E for Error

    print(f"\nResult: {moved_count} moved, {skipped_count} skipped (duplicates).")


def main():
    """Entry point: connect both ends, then migrate every source folder."""
    args = parse_args()

    print("Connecting to Source...")
    src = connect_imap(args.src_host, args.src_user, args.src_pass, args.ssl)
    print("Connecting to Destination...")
    dst = connect_imap(args.dst_host, args.dst_user, args.dst_pass, args.ssl)

    try:
        print("Fetching folder list...")
        folders = get_folder_list(src)
        print(f"Detected folders: {folders}")

        if args.dry_run:
            print("\n*** DRY RUN MODE ***")

        for folder in folders:
            migrate_folder(src, dst, folder, args.dry_run)

        print("\n\nMigration Complete.")
    finally:
        # BUGFIX: always log out, even if a folder migration raised.
        for conn in (src, dst):
            try:
                conn.logout()
            except Exception:
                pass


if __name__ == "__main__":
    main()