
How we built the data model and document processing pipeline behind our dataset engine — 5 PostgreSQL tables with multi-dimensional embedding columns, a semantic text chunker, and a BullMQ flow that turns uploaded files into searchable segments.
Auxx.ai automates customer support for Shopify businesses. AI needs context — product catalogs, return policies, shipping docs, FAQ pages. That context lives in datasets. Without them, the AI is guessing. With them, it knows your business.
This is part 1 of a 3-part series on how we built a production dataset engine with hybrid search from scratch. This post covers the data model and the document processing pipeline — from file upload to chunked, searchable segments. Part 2 covers embedding generation and vector search. Part 3 covers hybrid search and the dataset management UI.
We evaluated off-the-shelf vector databases early on — Pinecone, Weaviate, Qdrant. They solve storage, but not the pipeline around it. Extraction, chunking, multi-tenant isolation, embedding model flexibility, search analytics — we'd still need to build 80% of the system ourselves. And we'd have another service to deploy, another connection pool, another billing line item.
What we built instead: a PostgreSQL-native dataset engine using pgvector. The entire system is 5 schema tables, 5 services, 3 search implementations, and about 3,500 lines of TypeScript. It handles PDF/DOCX/HTML/TXT extraction, semantic chunking, multi-dimensional embedding storage, and three search strategies (vector, full-text, hybrid).
The schema has a clean hierarchy:
Organization
└── Dataset (name, embeddingModel, chunkSettings, vectorDimension)
└── Document (file, status, checksum, content)
└── DocumentSegment (chunk, position, embedding columns, searchVector)
Dataset ← DatasetSearchQuery ← DatasetSearchResult → DocumentSegment
Five tables. Dataset holds configuration. Document tracks the file lifecycle. DocumentSegment stores the chunked content with embeddings. DatasetSearchQuery and DatasetSearchResult track every search for analytics.
The most important design decision is where configuration lives. Embedding model and vector dimension are on the Dataset, not the Document. All documents in a dataset share the same embedding model and dimension. This makes multi-dataset search possible — you know the dimension before querying, which lets you group datasets by dimension and minimize embedding API calls. Document-level overrides exist only for chunk settings (size, overlap, delimiter).
// packages/database/src/db/schema/dataset.ts
const DEFAULT_CHUNK_SETTINGS: ChunkSettings = {
strategy: 'FIXED_SIZE',
size: 1024,
overlap: 50,
delimiter: '\n\n',
preprocessing: {
normalizeWhitespace: true,
removeUrlsAndEmails: false,
},
}
export const Dataset = pgTable(
'Dataset',
{
id: text().$defaultFn(() => createId()).primaryKey().notNull(),
name: text().notNull(),
description: text(),
status: datasetStatus().default('ACTIVE').notNull(),
documentCount: integer().default(0).notNull(),
totalSize: bigint({ mode: 'number' }).default(0).notNull(),
lastIndexedAt: timestamp({ precision: 3 }),
// Consolidated chunking settings as JSONB
chunkSettings: jsonb().$type<ChunkSettings>().default(DEFAULT_CHUNK_SETTINGS).notNull(),
// Embedding model in "provider:model" format (e.g., "openai:text-embedding-3-large")
embeddingModel: text(),
vectorDimension: integer(),
vectorDbType: vectorDbType().default('POSTGRESQL').notNull(),
searchConfig: jsonb().default({ searchType: 'hybrid' }).notNull(),
organizationId: text().notNull().references(/* ... */),
createdById: text().notNull().references(/* ... */),
// ...timestamps
},
(table) => [
uniqueIndex('Dataset_organizationId_name_key').using(
'btree', table.organizationId.asc().nullsLast(), table.name.asc().nullsLast()
),
index('idx_dataset_org_status').using('btree', table.organizationId.asc(), table.status.asc()),
]
)
A few things worth calling out.
chunkSettings as JSONB. Strategy, size, overlap, delimiter, preprocessing options — these are tightly coupled and always read/written together. JSONB avoids a 6-column spread and makes defaults easy: { ...DEFAULTS, ...userOverrides }.
provider:model format for embedding model. Splitting provider and model into two columns buys nothing — you always need both to generate an embedding. The colon-delimited string is the natural key for the ProviderRegistry lookup. "openai:text-embedding-3-large" routes to the OpenAI provider, picks the model, and knows the supported dimensions.
Unique constraint on (organizationId, name). Dataset names are unique per organization. This prevents confusion when searching — "which product-catalog dataset?" is never ambiguous.
Status is ACTIVE/INACTIVE, not soft-delete. Archiving a dataset stops it from being searchable but preserves the data. Hard delete exists for GDPR/cleanup.
// packages/database/src/db/schema/document.ts
export const Document = pgTable(
'Document',
{
id: text().$defaultFn(() => createId()).primaryKey().notNull(),
title: text().notNull(),
filename: text().notNull(),
mimeType: text().notNull(),
type: documentType().notNull(),
size: bigint({ mode: 'number' }).notNull(),
checksum: text().notNull(),
status: documentStatus().default('UPLOADED').notNull(),
content: text(),
// Processing metrics
processedAt: timestamp({ precision: 3 }),
errorMessage: text(),
processingTime: integer(),
totalChunks: integer().default(0).notNull(),
datasetId: text().notNull().references(/* ... */),
enabled: boolean().default(true).notNull(),
chunkSettings: jsonb(), // Optional document-specific override
mediaAssetId: text().references(/* ... */),
// ...timestamps
},
(table) => [
uniqueIndex('Document_datasetId_checksum_key').using(
'btree', table.datasetId.asc().nullsLast(), table.checksum.asc().nullsLast()
),
index('idx_document_dataset_enabled')
.using('btree', table.datasetId.asc())
.where(sql`enabled = true`),
]
)
Status progression is: UPLOADED → PROCESSING → INDEXED | FAILED | ARCHIVED.
Storing extracted content on the Document. Segments have chunks, but the full extracted text lives on the Document. This enables re-chunking without re-extraction — change your chunk size, and we split the existing content again instead of re-parsing the PDF. This makes chunk size tuning a fast feedback loop.
totalChunks on Document, not computed. Could be SELECT COUNT(*) FROM segments WHERE documentId = ?, but storing it avoids N+1 in list views where you show 50 documents with chunk counts.
Checksum-based deduplication. The unique constraint on (datasetId, checksum) means re-uploading the same PDF is a no-op, not a duplicate. The checksum is computed at upload time — before any processing happens.
Partial index on enabled = true. Search queries only care about enabled documents. The partial index means enabled-document lookups dont scan disabled rows.
This is the most interesting table in the schema.
// packages/database/src/db/schema/document-segment.ts
const tsvectorGenerated = customType<{ data: string }>({
dataType() {
return `tsvector GENERATED ALWAYS AS (to_tsvector('english', content)) STORED`
},
})
export const DocumentSegment = pgTable(
'DocumentSegment',
{
id: text().$defaultFn(() => createId()).primaryKey().notNull(),
content: text().notNull(),
position: integer().notNull(),
startOffset: integer().notNull(),
endOffset: integer().notNull(),
tokenCount: integer().notNull(),
// Multi-dimension embedding columns
embedding_512: vector({ dimensions: 512 }),
embedding_768: vector({ dimensions: 768 }),
embedding_1024: vector({ dimensions: 1024 }),
embedding_1536: vector({ dimensions: 1536 }),
embedding_3072: vector({ dimensions: 3072 }),
embeddingModel: text(),
embeddingDimension: integer(),
enabled: boolean().default(true).notNull(),
indexStatus: indexStatus().default('PENDING').notNull(),
// Stored generated tsvector for full-text search
searchVector: tsvectorGenerated('searchVector'),
documentId: text().notNull().references(/* ... */),
organizationId: text().notNull().references(/* ... */),
// ...timestamps
},
(table) => [
// HNSW indexes per dimension — partial, only indexed + enabled segments
index('idx_embedding_512_hnsw')
.using('hnsw', table.embedding_512.op('vector_cosine_ops'))
.where(sql`(embedding_512 IS NOT NULL AND enabled = true AND "indexStatus" = 'INDEXED'::"IndexStatus")`)
.with({ m: '16', ef_construction: '64' }),
index('idx_embedding_768_hnsw')
.using('hnsw', table.embedding_768.op('vector_cosine_ops'))
.where(sql`(embedding_768 IS NOT NULL AND enabled = true AND "indexStatus" = 'INDEXED'::"IndexStatus")`)
.with({ m: '16', ef_construction: '64' }),
// ... same for 1024, 1536
// NOTE: embedding_3072 has no index (pgvector max indexed dimension is 2000)
// Queries on 3072-dimension embeddings will use sequential scan
// GIN index for full-text search
index('idx_document_segment_search_vector').using('gin', table.searchVector),
]
)
5 embedding columns, not 1 dynamic column. PostgreSQL vectors are fixed-dimension per column. You cant store a 512-d and a 1536-d vector in the same vector(N) column. The multi-column approach means each dataset picks its dimension at creation, and segments use the corresponding column. The others stay null. This avoids dynamic DDL — no ALTER TABLE ADD COLUMN at runtime, no type casting, each dimension gets its own optimized index.
Null columns are free in PostgreSQL. An unused vector(3072) column costs nothing in storage — PostgreSQL doesnt store null values. A dataset using 512-d embeddings pays zero overhead for the unused columns.
HNSW indexes with partial filtering. Each dimension column gets an HNSW index, but only on segments where enabled = true AND indexStatus = 'INDEXED'. PENDING and FAILED segments are excluded from the index entirely. This keeps the index smaller and avoids searching segments that arent ready.
No HNSW index on 3072. pgvectors maximum indexed dimension is 2000. The 3072-d column falls back to sequential scan. For most use cases, 1536 is the sweet spot — text-embedding-3-large at 1536-d gives excellent quality with full HNSW index support.
searchVector is a stored generated column. The tsvector is auto-computed from content by PostgreSQL itself — not by our application code. When we insert a segment, the tsvector is generated at the database level. Its indexed with GIN for fast full-text search. At query time, we only need to match against the index, not process text.
Position + offsets for reconstruction. Each segment stores position (ordinal), startOffset, endOffset (byte offsets into original content), and tokenCount. This enables highlighting search results in the context of the original document.
When a document is uploaded, the first step is extracting text content from the file. A PDF is not text. A DOCX is a zip archive of XML. The extraction pipeline handles all of this.
// packages/lib/src/datasets/extractors/extractor-factory.ts
// Auto-import all extractors to register them
import './text-extractor'
import './pdf-extractor'
import './docx-extractor'
import './html-extractor'
export class ExtractorFactory {
static async extractWithFallback(
fileContent: Buffer,
mimeType: string,
extension: string,
metadata: { fileName?: string; documentId?: string },
options: ExtractorFactoryOptions = {}
): Promise<ExtractorFactoryResult> {
const fallbacksAttempted: string[] = []
// Get compatible extractors in priority order
const compatibleExtractors = ExtractorRegistry.getCompatibleExtractors(mimeType, extension)
// Try preferred extractor first if specified
if (options.preferredExtractor) {
const result = await ExtractorFactory.trySpecificExtractor(/* ... */)
if (result) return { ...result, extractorUsed: options.preferredExtractor, fallbacksAttempted }
fallbacksAttempted.push(options.preferredExtractor)
}
// Try extractors in priority order
for (const extractorInfo of compatibleExtractors) {
if (!extractorInfo.isAvailable) continue
try {
const result = await ExtractorFactory.trySpecificExtractor(extractorInfo.name, /* ... */)
if (result) return { ...result, extractorUsed: extractorInfo.name, fallbacksAttempted }
} catch (error) {
fallbacksAttempted.push(extractorInfo.name)
if (options.fallbackEnabled === false) throw error
}
}
throw new Error(
`All extraction attempts failed. Tried: ${fallbacksAttempted.join(', ')}`
)
}
}
Fallback chain, not single extractor. If the PDF extractor fails (corrupted file, scanned image with no text layer), the factory tries compatible fallbacks. The fallbacksAttempted array in the result tracks what was tried — useful for debugging "why did my PDF get extracted as plain text?"
Extractors self-register via imports. Each extractor file (e.g., pdf-extractor.ts) registers itself with the ExtractorRegistry on import. The factory imports all extractors at the top, which triggers registration. Adding a new extractor is: write the file, add the import.
Retry with backoff. Each extractor gets up to 2 retries with linear backoff (1s, 2s). PDF parsing can fail on first attempt due to memory pressure or temporary file system issues. Retrying is cheaper than failing the entire document.
Result includes metadata. The extraction result returns content, word count, and metadata (page count for PDFs, document properties for DOCX). This populates the Documents processing metrics without a second pass.
Splitting text into chunks sounds simple. It isnt. A naive split at character position 1024 will cut mid-sentence, mid-word, or mid-paragraph. The chunker walks backward from the target position looking for the best natural break point.
// packages/lib/src/datasets/processors/text-chunker.ts
export class TextChunker {
/**
* Find the best break point within a chunk window
* Prioritizes: custom delimiter > paragraphs > sentences > words > exact position
*/
private static findBestBreakPoint(
content: string,
start: number,
maxEnd: number,
options: ChunkingOptions
): number {
if (maxEnd >= content.length) return content.length
const window = content.slice(start, maxEnd)
const minDelimiterPosition = Math.floor(window.length * 0.1)
const minFallbackPosition = Math.floor(window.length * 0.5)
// 1. Custom delimiter (user-provided, respects intent)
if (options.delimiter) {
const delimiterPos = window.lastIndexOf(options.delimiter)
if (delimiterPos >= minDelimiterPosition) {
return start + delimiterPos + options.delimiter.length
}
}
// 2. Paragraph breaks (\n\n)
const paraBreak = window.lastIndexOf('\n\n')
if (paraBreak >= minFallbackPosition) return start + paraBreak + 2
// 3. Single newlines (\n)
const lineBreak = window.lastIndexOf('\n')
if (lineBreak >= minFallbackPosition) return start + lineBreak + 1
// 4. Sentence endings (. ? ! with newline variants)
const sentenceEndings = ['. ', '? ', '! ', '.\n', '?\n', '!\n']
let bestSentenceBreak = -1
for (const ending of sentenceEndings) {
const pos = window.lastIndexOf(ending)
if (pos > bestSentenceBreak) bestSentenceBreak = pos
}
if (bestSentenceBreak >= minFallbackPosition) {
return start + bestSentenceBreak + 1
}
// 5. Word breaks (space) — more relaxed threshold (30%)
const wordBreak = window.lastIndexOf(' ')
if (wordBreak >= Math.floor(window.length * 0.3)) return start + wordBreak + 1
// 6. Exact position (fallback)
return maxEnd
}
}
The priority order matters. Custom delimiters come first because the user set them intentionally — if they said "split on ---", we should respect that even if it means shorter chunks. Paragraph breaks are next because they represent natural topic boundaries. Sentence endings are the fallback for prose-heavy content. Word breaks prevent mid-word splits. Exact position is the last resort.
Two minimum thresholds. Custom delimiters use a relaxed 10% minimum (allow splitting early to respect user intent). Fallback heuristics use a stricter 50% minimum (dont create tiny chunks just because theres a paragraph break at position 30 of a 1024-char window).
Overlap is character-based, not token-based. Token counting requires model-specific tokenization. Character overlap (default: 50 chars) is model-agnostic and fast. For most English text, 50 characters is about 10-12 tokens — enough context for embedding coherence at chunk boundaries.
Minimum effective step size = 20% of chunk size. If overlap is so large that the sliding window advances less than 20% per step, youre generating near-duplicate chunks. The chunker validates this upfront:
// packages/lib/src/datasets/processors/text-chunker.ts
const effectiveStep = options.chunkSize - options.chunkOverlap
if (effectiveStep < options.chunkSize * 0.2) {
throw new Error(
`Overlap too large: effective step (${effectiveStep}) must be at least 20% of chunk size (${options.chunkSize}). ` +
`Maximum overlap for this chunk size: ${Math.floor(options.chunkSize * 0.8)}`
)
}
The chunker also includes content analysis — it can examine paragraph structure, sentence length, and structural complexity (code blocks, lists) to auto-optimize chunk size. Short paragraphs get larger chunks to include multiple paragraphs. Very long paragraphs get smaller chunks to maintain coherence.
The document processor ties everything together. It takes an uploaded file and turns it into searchable segments with embeddings.
// packages/lib/src/datasets/workers/document-processor.ts
export class DocumentProcessor {
static async processDocumentWithFlow(
jobData: DocumentProcessingJobData,
reporter: DocumentExecutionReporter,
signal?: AbortSignal
): Promise<WorkerJobResult> {
const { documentId, datasetId, organizationId, userId } = jobData
const startTime = Date.now()
// Update status to PROCESSING
await documentService.update(documentId, organizationId, {
status: DocumentStatus.PROCESSING
})
// 1. Extract content
await reporter.emit(DocumentEventType.EXTRACTION_STARTED, { step: 'extraction' })
const extractionResult = await DocumentProcessor.extractContent(jobData)
// 2. Fetch dataset settings BEFORE cleaning (preserves whitespace fix)
const [dataset] = await db
.select({ chunkSettings: schema.Dataset.chunkSettings })
.from(schema.Dataset)
.where(eq(schema.Dataset.id, datasetId))
.limit(1)
// Resolve settings: document > dataset > defaults
const settings = documentChunkSettings ?? datasetChunkSettings
// 3. Clean content WITH preprocessing settings
const cleanedContent = DocumentProcessor.preprocessContent(
extractionResult.content!,
settings?.preprocessing
)
// 4. Delete existing segments (for reprocessing)
await db.delete(schema.DocumentSegment)
.where(eq(schema.DocumentSegment.documentId, documentId))
// 5. Create segments
const segments = await DocumentProcessor.createSegments(
documentId, datasetId, cleanedContent, /* ... */ settings
)
// 6. Create embedding flow (BullMQ FlowProducer)
if (segments.length > 0) {
await createDocumentProcessingFlow({
documentId, datasetId, organizationId, userId,
segments: segments.map(s => ({ segmentId: s!.id, content: s!.content })),
})
}
}
}
The pipeline has 6 steps: extract, fetch settings, clean, delete old segments, create new segments, queue embeddings. Each step emits events via the reporter for real-time progress in the UI.
BullMQ FlowProducer for parent-child jobs. Extraction and embedding are separate concerns with different failure modes and retry strategies. A PDF extraction failure shouldnt retry embedding generation. FlowProducer expresses this as a dependency graph — the child job (embeddings) only runs after the parent (extraction + chunking) succeeds.
Settings resolution: document > dataset > defaults. Documents can override chunk settings from the dataset. This lets you use 512-char chunks for dense legal text while the rest of the dataset uses 1024. Settings are fetched before content cleaning — this is important because the normalizeWhitespace option in preprocessing determines whether whitespace is collapsed or preserved.
Delete-then-create for reprocessing. When a document is reprocessed (e.g., chunk settings changed), existing segments are deleted before new ones are created. This is a hard replace, not an upsert — simpler and correct.
AbortSignal support. The processor checks signal?.aborted between steps. Long-running extractions (large PDFs) can be cancelled without waiting for the full pipeline to complete.
Status is the source of truth, not job state. BullMQ jobs are ephemeral — they get cleaned up. Document.status (PROCESSING/INDEXED/FAILED) is the permanent record. The finalize job at the end of the embedding flow transitions the status and records processing metrics (time, chunk count).
Two more tables round out the schema. Theyre not afterthought logging — theyre first-class tables with indexes and relations.
DatasetSearchQuery records every search: query text, search type (vector/text/hybrid), result count, response time, similarity threshold, filters used. DatasetSearchResult records every result: rank, score, and a reference to the matching segment.
This enables "what are users searching for?", "which queries return no results?", "whats the average response time?" — the data you need to iterate on search quality. More on how we use this in Part 3.
| Decision | Trade-off | Why we chose it |
|---|---|---|
| 5 embedding columns, not 1 dynamic | Looks ugly in the schema | PostgreSQL vectors are fixed-dimension — no runtime DDL, null columns are free |
| Extracted content on Document | Duplicates segment content | Enables re-chunking without re-extraction |
totalChunks on Document | Denormalized count | Avoids N+1 in list views with 50+ documents |
| Stored generated tsvector | Slight write overhead | 100x faster full-text search at query time |
| BullMQ FlowProducer | More complex than a single job | Separate failure modes for extraction vs embedding |
| Semantic boundary detection | ~100 lines more than naive split | Dramatically better embedding quality |
| Checksum deduplication | Extra hash computation at upload | Prevents silent duplicate documents |
Part 2 covers the embedding layer — multi-provider generation, smart chunking for token limits, quota management, and how vector search works when datasets have different embedding dimensions.