#!/usr/bin/env python3 """Lädt Anhänge aus einem Gmail-Postfach per IMAP herunter.""" from __future__ import annotations import argparse import email import getpass import imaplib import os import re import sys from dataclasses import dataclass from datetime import datetime from email.header import decode_header, make_header from email.message import Message from email.utils import parsedate_to_datetime from pathlib import Path GMAIL_IMAP_HOST = "imap.gmail.com" GMAIL_IMAP_PORT = 993 @dataclass(frozen=True) class DownloadStats: messages_seen: int = 0 attachments_saved: int = 0 attachments_skipped: int = 0 def decode_mime_header(value: str | None, fallback: str = "") -> str: if not value: return fallback try: return str(make_header(decode_header(value))).strip() except Exception: return value.strip() def safe_filename(name: str, fallback: str) -> str: cleaned = decode_mime_header(name, fallback) cleaned = cleaned.replace("\x00", "") cleaned = re.sub(r"[\\/:\*\?\"<>\|]+", "_", cleaned) cleaned = re.sub(r"\s+", " ", cleaned).strip(" .") return cleaned or fallback def unique_path(path: Path, overwrite: bool) -> Path: if overwrite or not path.exists(): return path stem = path.stem suffix = path.suffix parent = path.parent counter = 2 while True: candidate = parent / f"{stem}-{counter}{suffix}" if not candidate.exists(): return candidate counter += 1 def message_folder(message: Message, message_id: bytes) -> str: date_header = decode_mime_header(message.get("Date")) subject = safe_filename(decode_mime_header(message.get("Subject"), "ohne-betreff"), "ohne-betreff") date_prefix = "ohne-datum" if date_header: try: date_prefix = parsedate_to_datetime(date_header).strftime("%Y-%m-%d_%H-%M-%S") except Exception: pass short_id = message_id.decode("ascii", errors="ignore") or "mail" return safe_filename(f"{date_prefix}_{short_id}_{subject[:80]}", f"mail-{short_id}") def build_search_query(args: argparse.Namespace) -> str: parts = [args.search] if args.since: parts.append(f'SINCE "{args.since}"') if args.before: parts.append(f'BEFORE "{args.before}"') return f"({' '.join(parts)})" def parse_imap_date(value: str | None, option_name: str) -> str | None: if not value: return None try: return datetime.strptime(value, "%Y-%m-%d").strftime("%d-%b-%Y") except ValueError: raise argparse.ArgumentTypeError(f"{option_name} muss im Format YYYY-MM-DD angegeben werden.") def iter_attachment_parts(message: Message): attachment_index = 1 for part in message.walk(): if part.is_multipart(): continue disposition = part.get_content_disposition() filename = part.get_filename() if disposition != "attachment" and not filename: continue fallback = f"anhang-{attachment_index}" attachment_index += 1 yield part, safe_filename(filename or fallback, fallback) def download_attachments(args: argparse.Namespace) -> DownloadStats: password = args.password or os.environ.get("GMAIL_APP_PASSWORD") if not password: password = getpass.getpass("Gmail App-Passwort: ") stats = DownloadStats() with imaplib.IMAP4_SSL(GMAIL_IMAP_HOST, GMAIL_IMAP_PORT) as client: client.login(args.email, password) if args.list_mailboxes: status, mailbox_data = client.list() if status != "OK": raise RuntimeError("IMAP-Postfächer konnten nicht gelesen werden.") for mailbox in mailbox_data: print(f"KI-AGENT: {mailbox.decode('utf-8', errors='replace')}") return stats output_dir = args.output.expanduser().resolve() output_dir.mkdir(parents=True, exist_ok=True) status, select_data = client.select(args.mailbox, readonly=True) if status != "OK": details = b" ".join(select_data or []).decode("utf-8", errors="replace") raise RuntimeError( f'IMAP-Postfach "{args.mailbox}" konnte nicht geöffnet werden. ' f"Nutze --list-mailboxes, um verfügbare Postfächer anzuzeigen. {details}".strip() ) status, search_data = client.search(None, build_search_query(args)) if status != "OK": raise RuntimeError("IMAP-Suche ist fehlgeschlagen.") message_ids = search_data[0].split() stats = DownloadStats(messages_seen=len(message_ids)) for position, message_id in enumerate(message_ids, start=1): status, fetch_data = client.fetch(message_id, "(RFC822)") if status != "OK" or not fetch_data: print(f"KI-AGENT: Mail {message_id!r} konnte nicht gelesen werden.", file=sys.stderr) continue raw_message = next((item[1] for item in fetch_data if isinstance(item, tuple)), None) if not raw_message: continue message = email.message_from_bytes(raw_message) target_dir = output_dir / message_folder(message, message_id) if args.group_by_message else output_dir target_dir.mkdir(parents=True, exist_ok=True) for part, filename in iter_attachment_parts(message): payload = part.get_payload(decode=True) if payload is None: stats = DownloadStats( stats.messages_seen, stats.attachments_saved, stats.attachments_skipped + 1, ) continue target = unique_path(target_dir / filename, args.overwrite) target.write_bytes(payload) stats = DownloadStats( stats.messages_seen, stats.attachments_saved + 1, stats.attachments_skipped, ) print(f"KI-AGENT: Gespeichert: {target}") if args.progress and position % args.progress == 0: print(f"KI-AGENT: {position}/{len(message_ids)} Mails verarbeitet.") return stats def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Lädt alle Anhänge aus einem Gmail-Postfach per IMAP herunter.", ) parser.add_argument("--email", required=True, help="Gmail-Adresse, z. B. name@gmail.com") parser.add_argument( "--password", help="Gmail App-Passwort. Sicherer: Umgebungsvariable GMAIL_APP_PASSWORD verwenden.", ) parser.add_argument( "--output", type=Path, default=Path("gmail-anhaenge"), help="Zielordner für Anhänge. Standard: gmail-anhaenge", ) parser.add_argument( "--mailbox", default="[Gmail]/All Mail", help='IMAP-Postfach/Label. Standard: "[Gmail]/All Mail"', ) parser.add_argument( "--list-mailboxes", action="store_true", help="Verfügbare IMAP-Postfächer anzeigen und beenden.", ) parser.add_argument( "--search", default="ALL", help='IMAP-Suche. Beispiele: ALL, UNSEEN, FROM "person@example.com"', ) parser.add_argument("--since", type=lambda value: parse_imap_date(value, "--since"), help="Ab Datum YYYY-MM-DD") parser.add_argument("--before", type=lambda value: parse_imap_date(value, "--before"), help="Vor Datum YYYY-MM-DD") parser.add_argument( "--group-by-message", action="store_true", help="Anhänge je Mail in einen eigenen Unterordner speichern.", ) parser.add_argument("--overwrite", action="store_true", help="Bestehende Dateien überschreiben.") parser.add_argument( "--progress", type=int, default=100, help="Fortschritt alle N Mails ausgeben. 0 deaktiviert die Ausgabe.", ) return parser.parse_args() def main() -> int: try: stats = download_attachments(parse_args()) except KeyboardInterrupt: print("\nKI-AGENT: Abgebrochen.", file=sys.stderr) return 130 except Exception as exc: print(f"KI-AGENT: Fehler: {exc}", file=sys.stderr) return 1 print( "KI-AGENT: Fertig. " f"{stats.messages_seen} Mails geprüft, " f"{stats.attachments_saved} Anhänge gespeichert, " f"{stats.attachments_skipped} Anhänge übersprungen." ) return 0 if __name__ == "__main__": raise SystemExit(main())