# Files
# MigrateEmails/migrate_emails_old.oy
#
# 172 lines
# 5.9 KiB
# Plaintext

import imaplib
import argparse
import sys
import re
import time
from email.utils import parsedate_to_datetime
# Increase limit for large emails (default is often too small).
# This overrides imaplib's module-level line-length cap with 10,000,000 so
# that very long response lines do not trip it.
# NOTE(review): _MAXLINE is a private imaplib attribute (leading underscore);
# confirm it is still honoured when changing Python versions.
imaplib._MAXLINE = 10000000
def parse_args():
    """Parse the command-line options of the migration script.

    Reads ``sys.argv`` and exits with a usage message (argparse behaviour)
    when a required option is missing.

    Returns:
        argparse.Namespace with the attributes ``src_host``, ``src_user``,
        ``src_pass``, ``dst_host``, ``dst_user``, ``dst_pass``, ``dry_run``
        and ``ssl``.
    """
    parser = argparse.ArgumentParser(description="Migrate emails via IMAP.")
    # Source Creds
    parser.add_argument('--src-host', required=True, help="Source IMAP Hostname")
    parser.add_argument('--src-user', required=True, help="Source Username")
    # NOTE(review): passwords given as arguments are presumably visible in the
    # process list and shell history -- consider an alternative input channel.
    parser.add_argument('--src-pass', required=True, help="Source Password")
    # Dest Creds
    parser.add_argument('--dst-host', required=True, help="Destination IMAP Hostname")
    parser.add_argument('--dst-user', required=True, help="Destination Username")
    parser.add_argument('--dst-pass', required=True, help="Destination Password")
    parser.add_argument('--dry-run', action='store_true', help="Simulate actions without moving data")
    # SSL is on by default.  '--ssl' alone could never switch it off (a
    # store_true flag whose default is already True), so it is kept only for
    # backward compatibility and '--no-ssl' writes False to the same dest.
    parser.add_argument('--ssl', dest='ssl', action='store_true', default=True, help="Use SSL (Default: True)")
    parser.add_argument('--no-ssl', dest='ssl', action='store_false', help="Disable SSL and connect in plain text")
    return parser.parse_args()
def connect_imap(host, user, password, use_ssl=True):
    """Open an authenticated IMAP session, or terminate the program.

    Args:
        host: IMAP server hostname.
        user: Login name.
        password: Login password.
        use_ssl: When true connect with ``imaplib.IMAP4_SSL``, otherwise
            with plain ``imaplib.IMAP4``.

    Returns:
        The logged-in imaplib connection object.

    Any exception raised while connecting or logging in is reported on
    stdout and the process exits with status 1.
    """
    try:
        # Pick the client class first, then connect and authenticate.
        client_factory = imaplib.IMAP4_SSL if use_ssl else imaplib.IMAP4
        session = client_factory(host)
        session.login(user, password)
    except Exception as exc:
        print(f"Error connecting to {host}: {exc}")
        sys.exit(1)
    else:
        return session
# One LIST response line:  (attributes) delimiter name
# e.g.  (\HasNoChildren) "/" "Sent Items"   or   (\Noinferiors) NIL INBOX
# The delimiter is NIL or a quoted string that may contain an escaped char.
_LIST_RESPONSE_RE = re.compile(
    r'^\((?P<attrs>[^)]*)\)\s+(?P<delim>NIL|"(?:\\.|[^"\\])*")\s+(?P<name>.+)$',
    re.IGNORECASE,
)


def get_folder_list(mail):
    """Returns a list of folder names.

    Args:
        mail: Object whose ``list()`` returns ``(status, data)`` the way
            ``imaplib.IMAP4.list`` does.

    Returns:
        The mailbox names as plain strings, in server order.  Double quotes
        are removed from every name, and names are otherwise left exactly as
        the server sent them (no further decoding is attempted).  An empty
        list is returned when the status is not ``'OK'``.
    """
    status, folders = mail.list()
    if status != 'OK':
        print("Failed to retrieve folder list.")
        return []
    clean_folders = []
    for folder in folders or []:
        if folder is None:
            # imaplib reports "no data" as a None entry.
            continue
        if isinstance(folder, tuple):
            # (header, literal): the server sent the name as an IMAP literal,
            # so the second item is the name itself.
            name = folder[1].decode('utf-8')
        else:
            decoded = folder.decode('utf-8')
            if not decoded:
                # Empty remainder that follows a literal entry.
                continue
            parsed = _LIST_RESPONSE_RE.match(decoded)
            if parsed:
                name = parsed.group('name')
            else:
                # Fallback raw extraction if the line has an unexpected shape.
                name = decoded.split(' ')[-1]
        name = name.replace('"', '')
        if name:
            clean_folders.append(name)
    return clean_folders
# Patterns for the metadata items of a FETCH response (compiled once).
_FETCH_FLAGS_RE = re.compile(r'FLAGS \((.*?)\)')
_FETCH_DATE_RE = re.compile(r'INTERNALDATE \"(.*?)\"')


def _parse_fetch_response(msg_data):
    """Split an imaplib FETCH response into (raw_email, flags, date_str)."""
    raw_email = None
    metadata = []
    for part in msg_data:
        if isinstance(part, tuple):
            # (header, literal): the header precedes the message body.
            metadata.append(part[0])
            raw_email = part[1]
        elif isinstance(part, bytes):
            # Trailing text after the body; a server may place FLAGS or
            # INTERNALDATE here instead of in the header.
            metadata.append(part)
    text = b' '.join(metadata).decode('utf-8', errors='replace')
    flags_match = _FETCH_FLAGS_RE.search(text)
    date_match = _FETCH_DATE_RE.search(text)
    flags = flags_match.group(1) if flags_match else None
    date_str = date_match.group(1) if date_match else None
    return raw_email, flags, date_str


def _appendable_flags(flags):
    """Format fetched flags for APPEND, dropping the session-only Recent flag."""
    if not flags:
        return None
    kept = [flag for flag in flags.split() if flag.lower() != '\\recent']
    return f"({' '.join(kept)})" if kept else None


def migrate_folder(src, dst, folder_name, dry_run):
    """Copy every message of one source folder to the same-named destination folder.

    Args:
        src: Logged-in source connection (imaplib-style ``select``,
            ``search`` and ``fetch``).
        dst: Logged-in destination connection (imaplib-style ``create`` and
            ``append``).
        dst is not touched when ``dry_run`` is true.
        folder_name: Mailbox name; it is wrapped in double quotes as-is.
        dry_run: When true, messages are only fetched and counted.

    Returns:
        None.  Progress is printed: ``.`` per migrated message, ``X (...)``
        per failure, then a summary counting only the messages that were
        actually appended (or, in dry-run, successfully fetched).
    """
    print(f"\n--- Processing: {folder_name} ---")
    mailbox = f'"{folder_name}"'
    # Select Source (Read-Only to be safe)
    status, _ = src.select(mailbox, readonly=True)
    if status != 'OK':
        print(f"Skipping {folder_name}: Could not select on source.")
        return
    # Check/Create Destination
    if not dry_run:
        try:
            dst.create(mailbox)
        except imaplib.IMAP4.error:
            # Usually means folder exists; a real problem surfaces on append.
            pass
    # Search all messages
    status, messages = src.search(None, 'ALL')
    if status != 'OK' or not messages or messages[0] is None:
        print(f"Skipping {folder_name}: Could not search on source.")
        return
    msg_ids = messages[0].split()
    total = len(msg_ids)
    print(f"Found {total} emails in {folder_name}.")
    count = 0  # messages that really made it (or would, in dry-run)
    for msg_id in msg_ids:
        # Using BODY.PEEK[] prevents marking the email as read on the source
        res, msg_data = src.fetch(msg_id, '(FLAGS INTERNALDATE BODY.PEEK[])')
        if res != 'OK':
            print(f"Error reading message {msg_id}")
            continue
        raw_email, flags, date_str = _parse_fetch_response(msg_data)
        if dry_run:
            # Just print a dot progress
            count += 1
            print(".", end="", flush=True)
            continue
        if not raw_email:
            print("X (no message body)", end="", flush=True)
            continue
        # Reuse the source INTERNALDATE (already in IMAP format) to preserve
        # the receive date; fall back to "now" when the server sent none.
        if date_str:
            delivery_time = f'"{date_str}"'
        else:
            delivery_time = imaplib.Time2Internaldate(time.localtime())
        flag_str = _appendable_flags(flags)
        try:
            typ, detail = dst.append(mailbox, flag_str, delivery_time, raw_email)
        except Exception as e:
            # Best effort: report and carry on with the next message.
            print(f"X ({e})", end="", flush=True)
            continue
        if typ != 'OK':
            # imaplib returns a NO status instead of raising.
            print(f"X ({detail})", end="", flush=True)
            continue
        count += 1
        print(".", end="", flush=True)
    verb = "Would move" if dry_run else "Moved"
    print(f"\n{verb} {count}/{total} emails from {folder_name}.")
def _logout_quietly(connection):
    """Log out of an IMAP connection, reporting instead of raising on failure."""
    try:
        connection.logout()
    except (imaplib.IMAP4.error, OSError) as exc:
        print(f"Warning: logout failed: {exc}")


def main():
    """Run the migration: connect to both servers and copy every source folder.

    Both connections are logged out even when the run is interrupted by an
    exception or by the ``sys.exit`` that a failed connection triggers.
    """
    args = parse_args()
    print("Connecting to Source...")
    src = connect_imap(args.src_host, args.src_user, args.src_pass, args.ssl)
    dst = None
    try:
        print("Connecting to Destination...")
        dst = connect_imap(args.dst_host, args.dst_user, args.dst_pass, args.ssl)
        print("Fetching folder list...")
        folders = get_folder_list(src)
        print(f"Detected folders: {folders}")
        if args.dry_run:
            print("\n*** DRY RUN MODE: No changes will be made ***")
        for folder in folders:
            # Skip weird Gmail folders if necessary like '[Gmail]' root if it causes issues
            # For now, we attempt all.
            migrate_folder(src, dst, folder, args.dry_run)
        print("\n\nMigration Complete.")
    finally:
        # Always release the server sessions, whatever happened above.
        _logout_quietly(src)
        if dst is not None:
            _logout_quietly(dst)


if __name__ == "__main__":
    main()