Pipeline
The pipeline (pipeline.py) sits between adapters and output generators. It takes raw adapter output and enriches it into the full ExtractionResult.
Entry Point
build_result() runs three operations in sequence:
- Thread grouping -- group_threads(messages)
- Contact aggregation -- aggregate_contacts(messages)
- Provenance computation -- SHA-256 hash, version, timestamp
Thread Grouping
group_threads() uses a three-signal strategy to group messages into conversation threads:
Signal 1: References / In-Reply-To (RFC 5322)
The most reliable signal. Uses a union-find (disjoint set) algorithm:
- Build a lookup from Message-ID to Message
- For each message with a Message-ID, create a set containing just that ID
- For each In-Reply-To or References header, union the sets together
- Messages in the same set belong to the same thread
Union-find is preferred over simple grouping because it handles:
- Forked conversations -- Same subject, different thread roots
- Subject changes -- "Re: Budget" renamed to "Budget Update" but references still link them
- Cross-source merging -- Messages from different archives that reference each other
Path compression keeps lookups near constant time (amortized), even for deep reply chains.
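The reference-based grouping described above can be sketched roughly as follows. This is a minimal illustration, not the code in pipeline.py: the `Msg` dataclass, the `UnionFind` class, and `group_by_references()` are hypothetical names, and the real Message type likely carries more fields.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Msg:
    # Hypothetical stand-in for the pipeline's Message type.
    message_id: str
    in_reply_to: Optional[str] = None
    references: list[str] = field(default_factory=list)


class UnionFind:
    """Disjoint-set structure keyed by Message-ID strings."""

    def __init__(self) -> None:
        self.parent: dict[str, str] = {}

    def add(self, item: str) -> None:
        # Each new ID starts out as the root of its own one-element set.
        self.parent.setdefault(item, item)

    def find(self, item: str) -> str:
        # First pass: walk up to the root (iteratively, so deep chains are safe).
        root = item
        while self.parent[root] != root:
            root = self.parent[root]
        # Second pass: path compression, pointing every visited node at the root.
        while self.parent[item] != root:
            next_item = self.parent[item]
            self.parent[item] = root
            item = next_item
        return root

    def union(self, a: str, b: str) -> None:
        self.add(a)
        self.add(b)
        root_a, root_b = self.find(a), self.find(b)
        if root_a != root_b:
            self.parent[root_b] = root_a


def group_by_references(messages: list[Msg]) -> list[list[Msg]]:
    uf = UnionFind()
    for m in messages:
        uf.add(m.message_id)
    for m in messages:
        linked = list(m.references)
        if m.in_reply_to:
            linked.append(m.in_reply_to)
        for ref in linked:
            # Referenced IDs that are absent from the archive still link siblings.
            uf.union(m.message_id, ref)
    groups: dict[str, list[Msg]] = {}
    for m in messages:
        groups.setdefault(uf.find(m.message_id), []).append(m)
    return list(groups.values())
```

In this sketch, two replies that both point at a Message-ID missing from the archive still end up in the same group, because the missing ID gets its own node in the structure.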
Signal 2: Conversation-Index (PST)
Microsoft Outlook stores a proprietary Conversation-Index in transport headers. The PST adapter extracts this and stores it in the message's extra dict. It's used as a secondary signal when references are missing.
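One plausible way to turn a Conversation-Index into a grouping key is sketched below. It assumes the value is stored base64-encoded under a hypothetical `"conversation_index"` key in the extra dict, and relies on the documented layout in which every message of a conversation shares the same 22-byte header, with each reply appending a 5-byte block. How the pipeline actually consumes the value is not specified here.

```python
import base64
from typing import Optional

# Conversation header: 1 reserved byte + 5 timestamp bytes + 16-byte GUID.
HEADER_LEN = 22


def conversation_key(extra: dict) -> Optional[str]:
    """Return a hex key shared by all messages of one conversation, or None."""
    raw = extra.get("conversation_index")  # hypothetical key name
    if not raw:
        return None
    try:
        data = base64.b64decode(raw, validate=True)
    except (ValueError, TypeError):
        # Malformed values are ignored so other signals can take over.
        return None
    if len(data) < HEADER_LEN:
        return None
    # Child blocks after the header differ per reply, so only the header is used.
    return data[:HEADER_LEN].hex()
```

Messages whose key is `None` would simply fall through to the next signal.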
Signal 3: Normalized Subject Fallback
Messages without Message-IDs (common in older archives or poorly-formatted exports) fall back to subject-based grouping. Subjects are normalized by stripping prefixes: Re:, RE:, re:, Fwd:, fwd:, FW:, Fw:, fw:.
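A small sketch of the prefix stripping follows. The function name `normalize_subject()` is hypothetical, and two details are assumptions rather than documented behavior: matching is fully case-insensitive (a superset of the variants listed above), and stacked prefixes such as "Re: Fwd:" are stripped together.

```python
import re

# One or more leading reply/forward prefixes, matched case-insensitively.
_PREFIX_RE = re.compile(r"^\s*(?:(?:re|fwd|fw)\s*:\s*)+", re.IGNORECASE)


def normalize_subject(subject: str) -> str:
    """Strip leading Re:/Fwd:/FW: prefixes and surrounding whitespace."""
    return _PREFIX_RE.sub("", subject or "").strip()


# Messages whose normalized subjects match would share a thread key.
subjects = ["Budget", "Re: Budget", "FW: RE: Budget"]
keys = {normalize_subject(s) for s in subjects}
```

Words that merely start with "re", such as "Regarding: Budget", are left untouched because the pattern requires the colon directly after the prefix.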
Thread Assembly
After grouping, threads are assembled into Thread objects:
- Messages sorted chronologically within each thread
- Participants deduplicated across all messages
- First/last date computed from message timestamps
- Threads sorted by message count (busiest first)
Only messages of kind EMAIL are threaded. Calendar events, contacts, tasks, and notes are excluded.
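The assembly steps can be illustrated with the sketch below. The `Msg` and `Thread` dataclasses and `assemble_threads()` are hypothetical simplifications; the sketch assumes every message has a date and that participants are deduplicated by lowercase address.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class Msg:
    # Hypothetical, simplified message: sender, recipients, timestamp.
    sender: str
    recipients: list[str]
    date: datetime


@dataclass
class Thread:
    messages: list[Msg]
    participants: list[str]
    first_date: datetime
    last_date: datetime


def assemble_threads(groups: list[list[Msg]]) -> list[Thread]:
    threads = []
    for group in groups:
        if not group:
            continue
        # Chronological order within the thread.
        ordered = sorted(group, key=lambda m: m.date)
        # A dict preserves first-seen order while deduplicating participants.
        seen: dict[str, None] = {}
        for m in ordered:
            for addr in [m.sender, *m.recipients]:
                seen.setdefault(addr.lower(), None)
        threads.append(
            Thread(
                messages=ordered,
                participants=list(seen),
                first_date=ordered[0].date,
                last_date=ordered[-1].date,
            )
        )
    # Busiest threads first.
    threads.sort(key=lambda t: len(t.messages), reverse=True)
    return threads
```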
Contact Aggregation
aggregate_contacts() builds a contact list from all observed email addresses:
- Scans every message's from_address and all_recipients
- Groups by lowercase email address
- Tracks per-contact:
  - sent_count -- messages where this contact was the sender
  - received_count -- messages where this contact was a recipient
  - message_count -- total (sent + received)
  - first_seen / last_seen -- date range
  - domains -- all email domains observed for this contact
  - name -- best available display name (updated if a better name is found later)
Contacts are sorted by message_count descending (most active first).
Empty addresses (no email and no name) are skipped.
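A condensed sketch of this aggregation is shown below. The `Address`, `Msg`, and `Contact` dataclasses are hypothetical stand-ins, and two choices are assumptions: an address with a name but no email is keyed by its lowercase name, and a "better" name is taken to mean a non-empty name replacing an empty one.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


@dataclass
class Address:
    email: str = ""
    name: str = ""


@dataclass
class Msg:
    from_address: Address
    all_recipients: list[Address]
    date: datetime


@dataclass
class Contact:
    email: str
    name: str = ""
    sent_count: int = 0
    received_count: int = 0
    first_seen: Optional[datetime] = None
    last_seen: Optional[datetime] = None
    domains: set[str] = field(default_factory=set)

    @property
    def message_count(self) -> int:
        return self.sent_count + self.received_count


def aggregate_contacts(messages: list[Msg]) -> list[Contact]:
    contacts: dict[str, Contact] = {}

    def touch(addr: Address, date: datetime, sent: bool) -> None:
        email = addr.email.strip().lower()
        key = email or addr.name.strip().lower()
        if not key:
            return  # no email and no name: skip
        contact = contacts.setdefault(key, Contact(email=email))
        if addr.name and not contact.name:
            contact.name = addr.name  # assumption: any name beats no name
        if "@" in email:
            contact.domains.add(email.rsplit("@", 1)[1])
        if sent:
            contact.sent_count += 1
        else:
            contact.received_count += 1
        if contact.first_seen is None or date < contact.first_seen:
            contact.first_seen = date
        if contact.last_seen is None or date > contact.last_seen:
            contact.last_seen = date

    for m in messages:
        touch(m.from_address, m.date, sent=True)
        for recipient in m.all_recipients:
            touch(recipient, m.date, sent=False)
    # Most active contacts first.
    return sorted(contacts.values(), key=lambda c: c.message_count, reverse=True)
```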
Provenance
build_result() automatically computes:
- source_hash -- SHA-256 digest of all source files (sorted by path). For directories (Maildir), the hash covers the path and the file count instead.
- tool_version -- imported from pikoclaw.__version__
- extracted_at -- current UTC timestamp in ISO 8601
These are always computed. Provenance is not optional.
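The hash and timestamp computation might look like the sketch below. The helper names `compute_source_hash()` and `extracted_at_now()` are hypothetical, and the exact bytes fed to the digest (file contents for files, a `path:count` string for directories) are assumptions about a detail the description leaves open.

```python
import hashlib
from datetime import datetime, timezone
from pathlib import Path


def compute_source_hash(paths: list[Path]) -> str:
    """SHA-256 over all sources, visited in sorted path order."""
    digest = hashlib.sha256()
    for path in sorted(paths, key=str):
        if path.is_dir():
            # Directories (e.g. Maildir): hash the path plus the file count.
            file_count = sum(1 for p in path.rglob("*") if p.is_file())
            digest.update(f"{path}:{file_count}".encode("utf-8"))
        else:
            # Regular files: stream the contents in 1 MiB chunks.
            with path.open("rb") as fh:
                for chunk in iter(lambda: fh.read(1 << 20), b""):
                    digest.update(chunk)
    return digest.hexdigest()


def extracted_at_now() -> str:
    """Current UTC time as an ISO 8601 string with an explicit offset."""
    return datetime.now(timezone.utc).isoformat()
```

Sorting the paths first makes the digest independent of the order in which sources were passed in.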
Multi-Source Merging
When processing multiple sources, the CLI collects all messages and calendar events, then calls build_result() once with the combined data. This means:
- Threading works across sources (a message in one PST can thread with a reply in an MBOX)
- Contact aggregation merges all sources into a single contact list
- The source_format is set to "mixed" when multiple formats are used
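The source_format rule can be expressed as a small helper. This is only a sketch: `resolve_source_format()` is a hypothetical name, and the format strings such as `"pst"` and `"mbox"` are illustrative rather than the tool's actual values.

```python
def resolve_source_format(formats: list[str]) -> str:
    """Return the single format in use, or "mixed" if more than one appears."""
    distinct = {f.lower() for f in formats}
    if not distinct:
        raise ValueError("at least one source is required")
    if len(distinct) == 1:
        return distinct.pop()
    return "mixed"


# Two PST archives stay "pst"; adding an MBOX makes the result "mixed".
single = resolve_source_format(["pst", "pst"])
combined = resolve_source_format(["pst", "mbox"])
```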