Initial commit for a test Python project for email migration between two IMAP servers.
This commit is contained in:
212
migrate_emails.py
Normal file
212
migrate_emails.py
Normal file
@@ -0,0 +1,212 @@
|
||||
import imaplib
|
||||
import argparse
|
||||
import sys
|
||||
import re
|
||||
import time
|
||||
import email
|
||||
from email.policy import default
|
||||
|
||||
# Increase limit for large emails
|
||||
imaplib._MAXLINE = 10000000
|
||||
|
||||
def parse_args():
    """Parse command-line arguments for the IMAP migration tool.

    Returns:
        argparse.Namespace: source/destination credentials plus the
        ``dry_run`` and ``ssl`` booleans.
    """
    parser = argparse.ArgumentParser(description="Migrate emails via IMAP with Deduplication.")

    # Source Creds
    parser.add_argument('--src-host', required=True, help="Source IMAP Hostname")
    parser.add_argument('--src-user', required=True, help="Source Username")
    parser.add_argument('--src-pass', required=True, help="Source Password")

    # Dest Creds
    parser.add_argument('--dst-host', required=True, help="Destination IMAP Hostname")
    parser.add_argument('--dst-user', required=True, help="Destination Username")
    parser.add_argument('--dst-pass', required=True, help="Destination Password")

    parser.add_argument('--dry-run', action='store_true', help="Simulate actions without moving data")
    # BUG FIX: the original declared --ssl with action='store_true' AND
    # default=True, which made the option a no-op -- args.ssl was always
    # True and SSL could never be turned off, even though connect_imap()
    # supports plain IMAP. Keep --ssl accepted for backward compatibility
    # and add --no-ssl to actually disable it.
    parser.add_argument('--ssl', dest='ssl', action='store_true', default=True,
                        help="Use SSL (Default: True)")
    parser.add_argument('--no-ssl', dest='ssl', action='store_false',
                        help="Disable SSL and use a plain IMAP connection")

    return parser.parse_args()
def connect_imap(host, user, password, use_ssl=True):
    """Open an IMAP connection to *host* and log in as *user*.

    Exits the whole process with status 1 when connecting or logging in
    fails, so callers always receive a ready-to-use, authenticated client.
    """
    factory = imaplib.IMAP4_SSL if use_ssl else imaplib.IMAP4
    try:
        client = factory(host)
        client.login(user, password)
        return client
    except Exception as exc:
        print(f"Error connecting to {host}: {exc}")
        sys.exit(1)
def get_folder_list(mail):
    """Return the folder names advertised by *mail* (an IMAP client).

    Each raw LIST response line is parsed with a strict pattern first
    (``... "." "Name"``) and a looser delimiter pattern second; when
    neither matches, the last whitespace-separated token is used.
    Returns an empty list if the LIST command itself fails.
    """
    status, raw_entries = mail.list()
    if status != 'OK':
        print("Failed to retrieve folder list.")
        return []

    names = []
    for entry in raw_entries:
        line = entry.decode('utf-8')
        # Regex to extract folder name from standard IMAP response
        match = re.search(r' \".\" \"?(.+?)\"?$', line) or re.search(r' \S \S (.+)$', line)
        if match:
            names.append(match.group(1).replace('"', ''))
        else:
            names.append(line.split(' ')[-1].replace('"', ''))
    return names
def get_destination_message_ids(dst, folder_name):
    """
    Fetches only the Message-IDs from the destination folder
    to build a set for deduplication.

    Args:
        dst: Authenticated imaplib client for the destination server.
        folder_name: Folder to index; selected read-only so nothing is
            marked as read as a side effect.

    Returns:
        set[str]: Stripped Message-ID header values present in the folder.
        Empty when the folder cannot be selected or searched.
    """
    existing_ids = set()

    # Select folder on destination
    try:
        status, _ = dst.select(f'"{folder_name}"', readonly=True)
        if status != 'OK':
            return existing_ids  # Folder probably empty or doesn't exist yet
    except Exception:
        # BUG FIX: was a bare `except:` which also swallowed SystemExit and
        # KeyboardInterrupt; narrow to Exception so Ctrl-C still works.
        return existing_ids

    status, messages = dst.search(None, 'ALL')
    # BUG FIX: the original ignored the SEARCH status and would crash on a
    # malformed/failed response before reaching messages[0].split().
    if status != 'OK' or not messages or not messages[0]:
        return existing_ids
    msg_ids = messages[0].split()

    if not msg_ids:
        return existing_ids

    print(f" -> Indexing {len(msg_ids)} existing emails on destination for deduplication...")

    # Fetch headers in batches would be faster, but simple iteration is safer
    # for compatibility. We fetch ONLY the header fields to save bandwidth.
    for num in msg_ids:
        try:
            # PEEK prevents marking read, though usually not critical on destination
            typ, data = dst.fetch(num, '(BODY.PEEK[HEADER.FIELDS (MESSAGE-ID)])')
            if typ == 'OK':
                header_data = data[0][1]
                # Parse the byte string into an email object to extract ID reliably
                msg = email.message_from_bytes(header_data, policy=default)
                msg_id = msg.get("Message-ID")
                if msg_id:
                    existing_ids.add(msg_id.strip())
        except Exception:
            # Best-effort indexing: a single unparseable message must not
            # abort the whole dedupe pass.
            continue

    return existing_ids
def migrate_folder(src, dst, folder_name, dry_run):
    """Copy every message in *folder_name* from *src* to *dst*.

    Messages whose Message-ID already exists on the destination are
    skipped; FLAGS and INTERNALDATE are preserved on append. Progress is
    printed one character per message: '.' moved, 'S' skipped duplicate,
    'X' fetch failure, 'E' append error. In dry-run mode nothing is
    created, indexed, or appended — moves are only counted.
    """
    print(f"\n--- Processing: {folder_name} ---")

    # 1. Prepare Destination (Create & Index)
    existing_ids = set()
    if not dry_run:
        try:
            dst.create(f'"{folder_name}"')
        except imaplib.IMAP4.error:
            # CREATE fails when the folder already exists — presumably the
            # common case here, so it is deliberately ignored.
            pass

        # Build dedupe index
        existing_ids = get_destination_message_ids(dst, folder_name)

    # 2. Select Source (read-only so source flags stay untouched)
    status, _ = src.select(f'"{folder_name}"', readonly=True)
    if status != 'OK':
        print(f"Skipping {folder_name}: Could not select on source.")
        return

    # 3. Get Source Messages
    status, messages = src.search(None, 'ALL')
    msg_ids = messages[0].split()
    total = len(msg_ids)
    print(f"Found {total} emails in source '{folder_name}'.")

    moved_count = 0
    skipped_count = 0

    for msg_id in msg_ids:
        # Fetch Flags, Date, and Body (PEEK avoids setting \Seen on source)
        res, msg_data = src.fetch(msg_id, '(FLAGS INTERNALDATE BODY.PEEK[])')

        if res != 'OK':
            print(f"X", end="", flush=True)
            continue

        raw_email = None
        flags = None
        date_str = None

        # imaplib returns the FETCH response as a mix of bytes and
        # (metadata, literal) tuples; only the tuples carry the message
        # body, with FLAGS/INTERNALDATE embedded in the metadata bytes.
        for part in msg_data:
            if isinstance(part, tuple):
                flags_match = re.search(r'FLAGS \((.*?)\)', part[0].decode('utf-8'))
                if flags_match:
                    flags = flags_match.group(1)

                date_match = re.search(r'INTERNALDATE \"(.*?)\"', part[0].decode('utf-8'))
                if date_match:
                    date_str = date_match.group(1)

                raw_email = part[1]

        if raw_email:
            # Check Deduplication
            msg_obj = email.message_from_bytes(raw_email, policy=default)
            src_msg_id = msg_obj.get("Message-ID")

            if src_msg_id and src_msg_id.strip() in existing_ids:
                # Duplicate found
                print("S", end="", flush=True)  # S for Skipped
                skipped_count += 1
                continue

            # Not a duplicate, proceed to append
            if dry_run:
                print(".", end="", flush=True)
                moved_count += 1
            else:
                # Fall back to "now" when the server gave no INTERNALDATE.
                delivery_time = imaplib.Time2Internaldate(time.localtime())
                if date_str:
                    delivery_time = f'"{date_str}"'

                flag_str = f'({flags})' if flags else None

                try:
                    dst.append(f'"{folder_name}"', flag_str, delivery_time, raw_email)
                    print(".", end="", flush=True)
                    moved_count += 1
                    # Update local index so we don't duplicate if source has duplicates
                    if src_msg_id:
                        existing_ids.add(src_msg_id.strip())
                except Exception as e:
                    print(f"E", end="", flush=True)  # E for Error

    print(f"\nResult: {moved_count} moved, {skipped_count} skipped (duplicates).")
def main():
    """Entry point: connect to both servers and migrate every source folder."""
    opts = parse_args()

    print("Connecting to Source...")
    source = connect_imap(opts.src_host, opts.src_user, opts.src_pass, opts.ssl)

    print("Connecting to Destination...")
    target = connect_imap(opts.dst_host, opts.dst_user, opts.dst_pass, opts.ssl)

    print("Fetching folder list...")
    folder_names = get_folder_list(source)
    print(f"Detected folders: {folder_names}")

    if opts.dry_run:
        print("\n*** DRY RUN MODE ***")

    for name in folder_names:
        migrate_folder(source, target, name, opts.dry_run)

    print("\n\nMigration Complete.")
    source.logout()
    target.logout()


if __name__ == "__main__":
    main()
Reference in New Issue
Block a user