# Files: MigrateEmails/migrate_emails_db.py
# 288 lines, 9.8 KiB, Python
import imaplib
import argparse
import sys
import re
import time
import email
import sqlite3
from email.policy import default

# imaplib rejects protocol lines longer than _MAXLINE; raise the cap so
# FETCH responses for large emails do not abort the session.
imaplib._MAXLINE = 10000000
# -------------------------------------------------------------------------
# DATABASE MANAGER
# -------------------------------------------------------------------------
class MigrationDB:
    """SQLite-backed log of migrated messages, used to resume interrupted runs.

    Each row records one (source host, source user, folder, Message-ID)
    combination plus a status string; the UNIQUE constraint makes that
    4-tuple the natural key, so re-logging the same message updates in place.
    """

    def __init__(self, db_path="migration_history.db"):
        # Single connection for the whole run; callers must call close().
        self.conn = sqlite3.connect(db_path)
        self.create_table()

    def create_table(self):
        """Create the migration_log table if it does not exist yet."""
        query = """
            CREATE TABLE IF NOT EXISTS migration_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                src_host TEXT,
                src_user TEXT,
                folder TEXT,
                message_id TEXT,
                status TEXT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(src_host, src_user, folder, message_id)
            )
        """
        self.conn.execute(query)
        self.conn.commit()

    def is_processed(self, src_host, src_user, folder, message_id):
        """Return True if this message was already migrated successfully."""
        cursor = self.conn.cursor()
        cursor.execute("""
            SELECT 1 FROM migration_log
            WHERE src_host=? AND src_user=? AND folder=? AND message_id=? AND status='success'
        """, (src_host, src_user, folder, message_id))
        return cursor.fetchone() is not None

    def log_success(self, src_host, src_user, folder, message_id):
        """Record a successful migration, replacing any earlier error row."""
        try:
            query = """
                INSERT OR REPLACE INTO migration_log (src_host, src_user, folder, message_id, status)
                VALUES (?, ?, ?, ?, 'success')
            """
            self.conn.execute(query, (src_host, src_user, folder, message_id))
            self.conn.commit()
        except sqlite3.Error as e:
            print(f"DB Error: {e}")

    def log_error(self, src_host, src_user, folder, message_id, error_msg):
        """Record a failed attempt without ever downgrading a 'success' row.

        BUG FIX: the original used INSERT OR IGNORE, which also ignored the
        new error whenever an *older error row* already existed, so the stored
        status never reflected the latest failure.  The UPSERT below updates
        any existing non-success row instead (SQLite >= 3.24, bundled with
        Python 3.7+).
        """
        try:
            query = """
                INSERT INTO migration_log (src_host, src_user, folder, message_id, status)
                VALUES (?, ?, ?, ?, ?)
                ON CONFLICT(src_host, src_user, folder, message_id)
                DO UPDATE SET status = excluded.status
                WHERE migration_log.status != 'success'
            """
            self.conn.execute(query, (src_host, src_user, folder, message_id, f"error: {error_msg}"))
            self.conn.commit()
        except sqlite3.Error as e:
            # Best-effort: a bookkeeping failure must not abort the migration,
            # but it should not be silent either (original swallowed it).
            print(f"DB Error: {e}")

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()
# -------------------------------------------------------------------------
# IMAP UTILS
# -------------------------------------------------------------------------
def parse_args():
    """Parse command-line options for the migration run.

    BUG FIX: the original declared ``--ssl`` with ``action='store_true'``
    AND ``default=True``, so SSL could never actually be turned off.
    ``BooleanOptionalAction`` (Python 3.9+) keeps ``--ssl`` working and adds
    ``--no-ssl`` to disable it — backward compatible for existing callers.
    """
    parser = argparse.ArgumentParser(description="Migrate emails with SQLite State Tracking.")
    parser.add_argument('--src-host', required=True)
    parser.add_argument('--src-user', required=True)
    parser.add_argument('--src-pass', required=True)
    parser.add_argument('--dst-host', required=True)
    parser.add_argument('--dst-user', required=True)
    parser.add_argument('--dst-pass', required=True)
    parser.add_argument('--dry-run', action='store_true', help="Simulate actions")
    parser.add_argument('--ssl', action=argparse.BooleanOptionalAction, default=True,
                        help="Use SSL (disable with --no-ssl)")
    parser.add_argument('--db', default="migration_history.db", help="Path to SQLite DB")
    return parser.parse_args()
def connect_imap(host, user, password, use_ssl=True):
    """Open an IMAP connection to *host*, log in, and return the client.

    Any failure (connect or login) aborts the whole program: without both
    mailboxes the migration cannot proceed.
    """
    imap_cls = imaplib.IMAP4_SSL if use_ssl else imaplib.IMAP4
    try:
        client = imap_cls(host)
        client.login(user, password)
        return client
    except Exception as exc:
        print(f"Error connecting to {host}: {exc}")
        sys.exit(1)
def get_folder_list(mail):
    """Return the mailbox names reported by the server's LIST command.

    A LIST line typically looks like ``(\\Flags) "." "Folder Name"``.  The
    first pattern handles a quoted single-character delimiter; the second is
    a looser fallback; failing both, the last whitespace token is used.
    Surrounding quotes are stripped from every name.
    """
    status, listing = mail.list()
    if status != 'OK':
        return []
    quoted = re.compile(r' \".\" \"?(.+?)\"?$')
    loose = re.compile(r' \S \S (.+)$')
    names = []
    for raw in listing:
        line = raw.decode('utf-8')
        match = quoted.search(line) or loose.search(line)
        if match:
            names.append(match.group(1).replace('"', ''))
        else:
            names.append(line.split(' ')[-1].replace('"', ''))
    return names
def get_message_id_from_bytes(raw_header):
    """Extract the stripped Message-ID header from raw header bytes.

    Returns None when the header is absent or the bytes cannot be parsed.

    BUG FIX: the original used a bare ``except:``, which also swallowed
    SystemExit and KeyboardInterrupt; narrowed to ``Exception``.
    """
    try:
        msg = email.message_from_bytes(raw_header, policy=default)
        mid = msg.get("Message-ID")
        return mid.strip() if mid else None
    except Exception:
        return None
# -------------------------------------------------------------------------
# MIGRATION LOGIC
# -------------------------------------------------------------------------
def migrate_folder(src, dst, folder_name, db, args):
    """Migrate all not-yet-processed messages of one folder from src to dst.

    Two passes over the folder: first a cheap header-only scan to build the
    work list filtered against the SQLite log, then the actual body transfer.
    Progress is printed as one character per message ('.', 'X', 'E').
    """
    print(f"\n--- Processing: {folder_name} ---")
    # 1. Ensure Destination Exists
    if not args.dry_run:
        try:
            dst.create(f'"{folder_name}"')
        except imaplib.IMAP4.error:
            # CREATE fails when the mailbox already exists — that is fine.
            pass
    # 2. Select Source
    status, _ = src.select(f'"{folder_name}"', readonly=True)
    if status != 'OK':
        print(f"Skipping {folder_name}: Could not select.")
        return
    # 3. Batch Fetch ALL Source Message-IDs first
    # This prevents the connection from timing out during a long read loop
    # and allows us to filter against the DB instantly.
    print(" -> Scanning source messages...")
    status, messages = src.search(None, 'ALL')
    if status != 'OK' or not messages[0]:
        print(" -> Folder empty.")
        return
    msg_nums = messages[0].split()
    total_msgs = len(msg_nums)
    # We fetch headers in bulk (e.g. 1:1000) or simply iterate.
    # For stability, we iterate but only fetch headers first.
    to_migrate = []  # List of (imap_sequence_num, message_id)
    print(f" -> Analyzing {total_msgs} emails against local DB...")
    # Pre-fetch check loop
    for num in msg_nums:
        # Fetch ONLY the Message-ID header (Very fast)
        typ, data = src.fetch(num, '(BODY.PEEK[HEADER.FIELDS (MESSAGE-ID)])')
        if typ != 'OK':
            continue
        # NOTE(review): assumes the first response item is the (envelope,
        # literal) tuple — true for typical servers, but some return extra
        # untagged items; verify against the servers in use.
        header_data = data[0][1]
        msg_id = get_message_id_from_bytes(header_data)
        if not msg_id:
            # If no Message-ID, we cannot reliably track it in DB.
            # We treat it as "always migrate" or skip. Here we skip to avoid dupes.
            # You can change logic to migrate these blindly if needed.
            continue
        # Check DB
        if db.is_processed(args.src_host, args.src_user, folder_name, msg_id):
            # Already migrated
            continue
        else:
            to_migrate.append((num, msg_id))
    print(f" -> {len(to_migrate)} new emails to migrate. ({total_msgs - len(to_migrate)} skipped).")
    # 4. Migration Loop
    count = 0
    for num, msg_id in to_migrate:
        count += 1
        # Fetch Full Body + Flags
        res, msg_data = src.fetch(num, '(FLAGS INTERNALDATE BODY.PEEK[])')
        if res != 'OK':
            print("X", end="", flush=True)
            db.log_error(args.src_host, args.src_user, folder_name, msg_id, "Fetch Error")
            continue
        raw_email = None
        flags = None
        date_str = None
        # Tuple items carry (metadata-bytes, literal); FLAGS and INTERNALDATE
        # are scraped from the metadata part, the literal is the raw message.
        for part in msg_data:
            if isinstance(part, tuple):
                flags_match = re.search(r'FLAGS \((.*?)\)', part[0].decode('utf-8'))
                if flags_match:
                    flags = flags_match.group(1)
                date_match = re.search(r'INTERNALDATE \"(.*?)\"', part[0].decode('utf-8'))
                if date_match:
                    date_str = date_match.group(1)
                raw_email = part[1]
        if not raw_email:
            continue
        if args.dry_run:
            print(".", end="", flush=True)
        else:
            try:
                # Default to "now", but prefer the original INTERNALDATE so
                # the destination keeps the message's received time.
                delivery_time = imaplib.Time2Internaldate(time.localtime())
                if date_str:
                    delivery_time = f'"{date_str}"'
                flag_str = f'({flags})' if flags else None
                dst.append(f'"{folder_name}"', flag_str, delivery_time, raw_email)
                # SUCCESS: Log to DB
                db.log_success(args.src_host, args.src_user, folder_name, msg_id)
                print(".", end="", flush=True)
            except Exception as e:
                print(f"E", end="", flush=True)
                db.log_error(args.src_host, args.src_user, folder_name, msg_id, str(e))
                # If connection dropped here, the script crashes,
                # but the DB has saved progress up to the previous email.
def main():
    """Entry point: connect both mailboxes, then migrate folder by folder.

    Connection-level failures abort the run (the SQLite log allows a clean
    resume on the next invocation); any other per-folder error is reported
    and the next folder is attempted.
    """
    args = parse_args()
    db = MigrationDB(args.db)
    try:
        print(f"Logging to database: {args.db}")
        print("Connecting to Source...")
        src = connect_imap(args.src_host, args.src_user, args.src_pass, args.ssl)
        print("Connecting to Destination...")
        dst = connect_imap(args.dst_host, args.dst_user, args.dst_pass, args.ssl)
        print("Fetching folder list...")
        folders = get_folder_list(src)
        if args.dry_run:
            print("\n*** DRY RUN MODE ***")
        for folder in folders:
            try:
                migrate_folder(src, dst, folder, db, args)
            except (imaplib.IMAP4.abort, ConnectionResetError, BrokenPipeError) as e:
                print(f"\nCRITICAL CONNECTION ERROR on folder {folder}: {e}")
                print("The script will exit. Please run it again; it will resume from where it left off.")
                sys.exit(1)
            except Exception as e:
                print(f"\nUnexpected error processing {folder}: {e}")
    finally:
        # Best-effort cleanup.  BUG FIX: the original used bare ``except:``,
        # which also hid KeyboardInterrupt; ``Exception`` still covers both
        # logout failures and the NameError raised when src/dst were never
        # assigned (connect_imap exits before assignment on failure).
        try:
            src.logout()
        except Exception:
            pass
        try:
            dst.logout()
        except Exception:
            pass
        db.close()
        print("\n\nProcess Finished.")


if __name__ == "__main__":
    main()