Pipeline

The pipeline (pipeline.py) sits between adapters and output generators. It takes raw adapter output and enriches it into the full ExtractionResult.

Entry Point

result = build_result(messages, calendar_events, source_files, source_format)

build_result() runs three operations in sequence:

  1. Thread grouping -- group_threads(messages)
  2. Contact aggregation -- aggregate_contacts(messages)
  3. Provenance computation -- SHA-256 hash, version, timestamp

Thread Grouping

group_threads() uses a three-signal strategy to group messages into conversation threads:

Signal 1: References / In-Reply-To (RFC 5322)

The most reliable signal. Uses a union-find (disjoint set) algorithm:

  1. Build a lookup from Message-ID to Message
  2. For each message with a Message-ID, create a set containing just that ID
  3. For each In-Reply-To or References header, union the sets together
  4. Messages in the same set belong to the same thread

Union-find is preferred over simple grouping because it handles:

  • Forked conversations -- Same subject, different thread roots
  • Subject changes -- "Re: Budget" renamed to "Budget Update" but references still link them
  • Cross-source merging -- Messages from different archives that reference each other

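Path compression flattens the parent chain on every lookup, so finding a thread root stays fast (amortized near-constant time, especially when combined with union by rank or size) even for deep reply chains.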
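
The four steps above can be sketched as follows. This is a minimal illustration, not the code in pipeline.py: the UnionFind class, the group_by_references() helper, and the dict keys message_id, in_reply_to, and references are all hypothetical names chosen for the example.

```python
from collections import defaultdict


class UnionFind:
    """Disjoint-set structure keyed by Message-ID strings (illustrative only)."""

    def __init__(self):
        self.parent = {}

    def add(self, item):
        # Each new ID starts as the root of its own single-element set.
        self.parent.setdefault(item, item)

    def find(self, item):
        self.add(item)
        root = item
        while self.parent[root] != root:
            root = self.parent[root]
        # Path compression: point every node on the walked path at the root.
        while self.parent[item] != root:
            self.parent[item], item = root, self.parent[item]
        return root

    def union(self, a, b):
        root_a, root_b = self.find(a), self.find(b)
        if root_a != root_b:
            self.parent[root_b] = root_a


def group_by_references(messages):
    """Group message IDs into threads using In-Reply-To and References.

    `messages` is a list of dicts with hypothetical keys 'message_id',
    'in_reply_to', and 'references'.
    """
    uf = UnionFind()
    for msg in messages:
        mid = msg.get("message_id")
        if not mid:
            continue  # left for the other signals
        uf.add(mid)
        linked = list(msg.get("references") or [])
        if msg.get("in_reply_to"):
            linked.append(msg["in_reply_to"])
        for ref in linked:
            uf.union(mid, ref)

    groups = defaultdict(list)
    for msg in messages:
        mid = msg.get("message_id")
        if mid:
            groups[uf.find(mid)].append(mid)
    return sorted(sorted(ids) for ids in groups.values())
```

Note that two messages end up in the same group even when the message they both reference is absent from the archive, because the referenced ID still gets its own node in the structure.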

Signal 2: Conversation-Index (PST)

Microsoft Outlook stores a proprietary Conversation-Index in transport headers. The PST adapter extracts this and stores it in the message's extra dict. It's used as a secondary signal when references are missing.
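
As a rough sketch of how such a value could be turned into a grouping key: a Conversation-Index starts with a 22-byte header (one reserved byte, a 5-byte timestamp, and a 16-byte GUID), and each reply appends a 5-byte child block, so messages in one conversation share the same header. The helper below assumes the value is available as a base64 string, as it appears in the Thread-Index transport header. How the PST adapter actually stores it in the extra dict is not specified here, and conversation_key() is a hypothetical name.

```python
import base64

# Reserved byte (1) + timestamp (5) + conversation GUID (16).
HEADER_LEN = 22


def conversation_key(conversation_index_b64):
    """Return a hex key shared by all messages in one conversation, or None.

    Assumes a base64-encoded Conversation-Index; this input format is an
    assumption made for the example.
    """
    try:
        raw = base64.b64decode(conversation_index_b64, validate=True)
    except (ValueError, TypeError):
        return None  # not valid base64, or not a string/bytes value
    if len(raw) < HEADER_LEN:
        return None  # too short to contain a full header
    return raw[:HEADER_LEN].hex()
```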

Signal 3: Normalized Subject Fallback

Messages without Message-IDs (common in older archives or poorly formatted exports) fall back to subject-based grouping. Subjects are normalized by stripping reply and forward prefixes: Re:, RE:, re:, Fwd:, fwd:, FW:, Fw:, fw:.
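
A minimal sketch of this kind of normalization is shown below. It is an illustration under assumptions, not the project's implementation: normalize_subject() is a hypothetical name, the regex matches the prefixes case-insensitively (a superset of the list above), and stripping stacked prefixes, collapsing whitespace, and lowercasing the result are choices made for the example.

```python
import re

# One or more leading "re:", "fwd:" or "fw:" prefixes, in any letter case.
_PREFIX_RE = re.compile(r"^\s*(?:(?:re|fwd|fw)\s*:\s*)+", re.IGNORECASE)


def normalize_subject(subject):
    """Strip reply/forward prefixes and return a comparable grouping key."""
    stripped = _PREFIX_RE.sub("", subject or "")
    # Assumption: collapse runs of whitespace and lowercase for comparison.
    return " ".join(stripped.split()).lower()
```

With this sketch, "Re: RE: Fwd: Budget" and "budget" map to the same key, while a subject such as "Regarding lunch" is left alone because "Re" is not followed by a colon.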

Thread Assembly

After grouping, threads are assembled into Thread objects:

  • Messages sorted chronologically within each thread
  • Participants deduplicated across all messages
  • First/last date computed from message timestamps
  • Threads sorted by message count (busiest first)

Only messages of kind EMAIL are threaded. Calendar events, contacts, tasks, and notes are excluded.
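
The assembly rules above could look roughly like this. Msg and ThreadSketch are stand-in types invented for the example; the real Message and Thread classes may have different fields, and lowercasing addresses when deduplicating participants is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class Msg:
    """Hypothetical stand-in for the pipeline's message type."""
    sender: str
    recipients: list
    date: datetime


@dataclass
class ThreadSketch:
    """Hypothetical stand-in for the pipeline's Thread type."""
    messages: list
    participants: list
    first_date: datetime
    last_date: datetime


def assemble_threads(groups):
    """Turn lists of grouped messages into sorted thread objects."""
    threads = []
    for msgs in groups:
        if not msgs:
            continue
        # Chronological order within the thread.
        ordered = sorted(msgs, key=lambda m: m.date)
        # A dict keeps first-seen order while removing duplicates.
        seen = {}
        for m in ordered:
            for addr in [m.sender, *m.recipients]:
                seen.setdefault(addr.lower(), None)
        threads.append(
            ThreadSketch(ordered, list(seen), ordered[0].date, ordered[-1].date)
        )
    # Busiest threads first.
    threads.sort(key=lambda t: len(t.messages), reverse=True)
    return threads
```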

Contact Aggregation

aggregate_contacts() builds a contact list from all observed email addresses:

  • Scans every message's from_address and all_recipients
  • Groups by lowercase email address
  • Tracks per-contact:
    • sent_count -- messages where this contact was the sender
    • received_count -- messages where this contact was a recipient
    • message_count -- total (sent + received)
    • first_seen / last_seen -- date range
    • domains -- all email domains observed for this contact
    • name -- best available display name (updated if a better name is found later)

Contacts are sorted by message_count descending (most active first).

Empty addresses (no email and no name) are skipped.
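
The sketch below illustrates the bookkeeping described in this section. It is not the actual aggregate_contacts(): the ContactSketch type, the dict-based message shape with 'from', 'to', and 'date' keys, the fallback to the display name as a key when no email is present, and the rule that a longer name counts as "better" are all assumptions made for the example.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


@dataclass
class ContactSketch:
    """Hypothetical contact record mirroring the fields listed above."""
    email: str
    name: str = ""
    sent_count: int = 0
    received_count: int = 0
    first_seen: Optional[datetime] = None
    last_seen: Optional[datetime] = None
    domains: set = field(default_factory=set)

    @property
    def message_count(self):
        return self.sent_count + self.received_count


def aggregate_contacts_sketch(messages):
    """messages: dicts with 'from' (name, email), 'to' [(name, email)], 'date'."""
    contacts = {}

    def observe(name, email, date, sent):
        name, email = (name or "").strip(), (email or "").strip()
        if not email and not name:
            return  # empty address: skip
        key = (email or name).lower()  # assumption: name is the fallback key
        contact = contacts.setdefault(key, ContactSketch(email=email.lower()))
        if sent:
            contact.sent_count += 1
        else:
            contact.received_count += 1
        if len(name) > len(contact.name):  # assumption: longer name is better
            contact.name = name
        if "@" in email:
            contact.domains.add(email.rsplit("@", 1)[1].lower())
        if date is not None:
            if contact.first_seen is None or date < contact.first_seen:
                contact.first_seen = date
            if contact.last_seen is None or date > contact.last_seen:
                contact.last_seen = date

    for msg in messages:
        observe(*msg["from"], msg.get("date"), sent=True)
        for name, email in msg.get("to", []):
            observe(name, email, msg.get("date"), sent=False)
    # Most active contacts first.
    return sorted(contacts.values(), key=lambda c: c.message_count, reverse=True)
```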

Provenance

build_result() automatically computes:

  • source_hash -- SHA-256 digest of all source files (sorted by path). For directory sources such as Maildir, the hash covers the directory path and its file count.
  • tool_version -- imported from pikoclaw.__version__
  • extracted_at -- current UTC timestamp in ISO 8601

These are always computed. Provenance is not optional.
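
As an illustration of the three provenance fields, here is a minimal sketch. The function names, the exact byte layout fed to the hash for directories, and passing tool_version in as a parameter (rather than importing it) are assumptions for the example; the real computation in build_result() may differ in detail.

```python
import hashlib
from datetime import datetime, timezone
from pathlib import Path


def compute_source_hash(source_files):
    """SHA-256 over all sources, visited in sorted path order."""
    digest = hashlib.sha256()
    for path in sorted(Path(p) for p in source_files):
        if path.is_dir():
            # Directory source (e.g. Maildir): hash path and file count.
            # The "path:count" encoding is an assumption for this sketch.
            count = sum(1 for f in path.rglob("*") if f.is_file())
            digest.update(f"{path}:{count}".encode("utf-8"))
        else:
            # Stream file contents in chunks to avoid loading large archives.
            with path.open("rb") as fh:
                for chunk in iter(lambda: fh.read(1 << 20), b""):
                    digest.update(chunk)
    return digest.hexdigest()


def provenance_sketch(source_files, tool_version):
    """Build the three provenance fields described above."""
    return {
        "source_hash": compute_source_hash(source_files),
        "tool_version": tool_version,
        "extracted_at": datetime.now(timezone.utc).isoformat(),
    }
```

Sorting the paths before hashing makes the digest independent of the order in which sources were passed in.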

Multi-Source Merging

When processing multiple sources, the CLI collects all messages and calendar events, then calls build_result() once with the combined data. This means:

  • Threading works across sources (a message in one PST can thread with a reply in an MBOX)
  • Contact aggregation merges all sources into a single contact list
  • The source_format is set to "mixed" when multiple formats are used
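
The collect-then-build flow could be sketched as below. The per-source dict keys and both helper names are hypothetical, and how the CLI actually represents each source is not specified here; the only behavior taken from this page is that data from all sources is combined before a single build_result() call and that differing formats yield "mixed".

```python
def combined_source_format(formats):
    """Return the single shared format, or "mixed" if formats differ."""
    distinct = set(formats)
    if not distinct:
        raise ValueError("at least one source is required")
    return distinct.pop() if len(distinct) == 1 else "mixed"


def merge_sources(per_source):
    """Flatten per-source adapter output into one set of arguments.

    `per_source` is a list of dicts with hypothetical keys 'messages',
    'calendar_events', 'source_file', and 'source_format'.
    """
    messages, calendar_events, source_files, formats = [], [], [], []
    for src in per_source:
        messages.extend(src["messages"])
        calendar_events.extend(src["calendar_events"])
        source_files.append(src["source_file"])
        formats.append(src["source_format"])
    return messages, calendar_events, source_files, combined_source_format(formats)
```

The returned tuple matches the argument order of build_result(messages, calendar_events, source_files, source_format), so threading and contact aggregation see every source at once.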