
Building a Robust Document Parsing System with Cloudflare Queues and Workflows


Introduction

At pdfparse.net, we run a document parsing system that turns PDFs into structured, queryable data at scale. Our stack is Cloudflare-native, which means global distribution, predictable costs, and no servers to babysit.

This article focuses on invoice parsing because it combines structured fields, messy layouts, and long OCR runtimes. We will build a pipeline that accepts large batches, survives failures, and keeps users informed about progress end to end.


Architecture Overview

At a high level, the pipeline looks like this:

HTTP API → Queue → Workflow → Mistral Batch OCR → Database

We keep the pieces small and loosely coupled so each one can be retried safely without restarting the entire job.


Project Setup

Prerequisites

  1. Cloudflare account with Workers paid plan (for Workflows)
  2. Mistral AI API key
  3. Node.js 18+ and pnpm

Dependencies

We keep the dependency list focused:

{
  "dependencies": {
    "@mistralai/mistralai": "^1.0.0",
    "drizzle-orm": "^0.44.0",
    "effect": "^3.19.0",
    "zod": "^3.23.0"
  },
  "devDependencies": {
    "wrangler": "^3.90.0",
    "@cloudflare/workers-types": "^4.0.0"
  }
}

Project Structure

document-parser/
├── packages/
│   ├── db/                 # Shared database schema + migrations
│   ├── queue-manager/      # HTTP + queue consumer
│   └── workflows/          # Durable workflow logic

Database Setup (D1 + Drizzle)

We store jobs, batch processes, and extracted invoice fields in D1.

import { sqliteTable, text, integer, numeric } from "drizzle-orm/sqlite-core";

export const jobs = sqliteTable("jobs", {
  id: integer("id").primaryKey({ autoIncrement: true }),
  jobStatus: text("job_status").default("queued").notNull(),
  createdAt: integer("created_at", { mode: "timestamp" })
    .$defaultFn(() => new Date()),
});

export const batch_processes = sqliteTable("batch_processes", {
  id: integer("id").primaryKey({ autoIncrement: true }),
  batchId: text("batch_id").notNull().unique(),
  workflowInstanceId: text("workflow_instance_id").notNull().unique(),
  jobId: integer("job_id").references(() => jobs.id, { onDelete: "cascade" }),
  status: text("status").default("init"),
  completedRequests: integer("completed_requests"),
});

export const invoices = sqliteTable("invoices", {
  invoiceId: text("invoice_id").primaryKey(),
  vendorName: text("vendor_name").notNull(),
  invoiceDate: text("invoice_date").notNull(),
  dueDate: text("due_date").notNull(),
  totalAmountUsd: numeric("total_amount_usd").notNull()
});

Create the database and run migrations:

wrangler d1 create document-parser-db
pnpm drizzle-kit generate
wrangler d1 migrations apply document-parser-db --remote
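
The generate command reads a drizzle config file from the db package. A minimal sketch, assuming the schema lives at src/schema.ts and migrations are written to ./migrations:

// drizzle.config.ts (paths are assumptions)
import { defineConfig } from "drizzle-kit";

export default defineConfig({
  dialect: "sqlite",
  schema: "./src/schema.ts",
  out: "./migrations",
});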

Worker Bindings

Add bindings to both the queue worker and workflow:

{
  "r2_buckets": [
    { "binding": "DOCUMENT_BUCKET", "bucket_name": "<YOUR_BUCKET_NAME>" }
  ],
  "d1_databases": [
    {
      "binding": "DB",
      "database_name": "<YOUR_DB_NAME>",
      "database_id": "<YOUR_DB_ID>"
    }
  ],
  "queues": {
    "producers": [{ "binding": "JOB_QUEUE", "queue": "<YOUR_QUEUE_NAME>" }],
    "consumers": [{ "queue": "<YOUR_QUEUE_NAME>" }]
  },
  "workflows": [
    { "binding": "WORKFLOW", "name": "<YOUR_WORKFLOW_NAME>" }
  ]
}

Also add a MISTRAL_API_KEY secret and an R2 access URL for presigned links.
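
Concretely, the secret is set with Wrangler on each worker that calls Mistral:

wrangler secret put MISTRAL_API_KEY

and the R2 access URL can live as a plain var alongside the bindings above (the name matches the env.R2_URL used later in the workflow):

"vars": { "R2_URL": "<YOUR_R2_ACCESS_URL>" }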


Queue Manager

The queue manager handles two things: accepting document IDs via HTTP and fanning them out into batches.

import { createDB, jobs } from "@repo/db";

type Env = {
  DB: D1Database;
  JOB_QUEUE: Queue;
  WORKFLOW: Workflow;
};

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    if (request.method !== "POST") {
      return new Response("Not Found", { status: 404 });
    }

    const { documentIds } = await request.json<{ documentIds: string[] }>();
    const db = await createDB(env.DB);

    const [job] = await db
      .insert(jobs)
      .values({ jobStatus: "queued" })
      .returning();

    const batches = chunk(documentIds, 50);

    await env.JOB_QUEUE.sendBatch(
      batches.map((batch, index) => ({
        body: {
          jobId: job.id,
          documentKeys: batch,
          batchIndex: index,
          totalBatches: batches.length,
        },
      }))
    );

    return Response.json({ jobId: job.id });
  },

  async queue(batch: MessageBatch<any>, env: Env): Promise<void> {
    for (const message of batch.messages) {
      await env.WORKFLOW.create({ params: message.body });
      message.ack();
    }
  },
};

Why 50 documents per batch? It is a good balance between throughput and retry cost. If files are large or highly variable, use smaller batches to keep failure recovery cheap.
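
The chunk helper used in the fetch handler is not a library import; a minimal sketch looks like this:

// Split an array into fixed-size slices (the last slice may be smaller).
function chunk<T>(items: T[], size: number): T[][] {
  const result: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    result.push(items.slice(i, i + size));
  }
  return result;
}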


Workflows: Durable, Retryable OCR

Workflows are a great fit for OCR: they can pause, retry, and keep state across minutes or hours without manual orchestration.
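
Every step below lives inside a Workflow class's run() method. Here is a minimal skeleton of that class, with the class name and Env shape as assumptions:

import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from "cloudflare:workers";

// Mirrors the bindings added earlier.
type Env = {
  DB: D1Database;
  DOCUMENT_BUCKET: R2Bucket;
  MISTRAL_API_KEY: string;
  R2_URL: string;
};

// Matches the queue message body sent by the queue manager.
type Params = {
  jobId: number;
  documentKeys: string[];
  batchIndex: number;
  totalBatches: number;
};

export class DocumentParserWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
    const { jobId, documentKeys } = event.payload;
    const env = this.env;
    // Steps 1-4 below run here; each step.do() call is checkpointed
    // and retried independently by the Workflows runtime.
  }
}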

Step 1: Submit a Batch (with Effect.ts)

We start by assembling a batch payload and handing it off to Mistral. We use Effect.ts to make failure handling explicit and consistent. It keeps errors typed and composable, which matters when you are orchestrating network calls, file uploads, and database updates.

const batchJob = await step.do("submit-batch", async () => {
  const program = pipe(
    Effect.gen(function* (_) {
      const urls = yield* _(generatePresignedUrls(documentKeys, env.R2_URL));

      const requests = urls.map((url, i) => ({
        custom_id: documentKeys[i],
        body: {
          model: "mistral-ocr-latest",
          document: { type: "document_url", document_url: url },
        },
      }));

      const jsonl = requests.map((r) => JSON.stringify(r)).join("\n");
      const client = new Mistral({ apiKey: env.MISTRAL_API_KEY });
      const file = new File([jsonl], `batch_${jobId}.jsonl`, {
        type: "application/x-ndjson",
      });

      const uploadedFile = yield* _(
        Effect.tryPromise({
          try: () => client.files.upload({ file, purpose: "batch" }),
          catch: (error) => new BatchUploadError({ cause: error }),
        })
      );

      return yield* _(
        Effect.tryPromise({
          try: () =>
            client.batch.jobs.create({
              inputFiles: [uploadedFile.id],
              model: "mistral-ocr-latest",
              endpoint: "/v1/ocr",
              metadata: { jobId: String(jobId) },
            }),
          catch: (error) => new BatchCreationError({ cause: error }),
        })
      );
    })
  );

  return Effect.runPromise(program);
});
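
BatchUploadError and BatchCreationError are not defined above; one way to define them is with Effect's Data.TaggedError, which keeps them typed as they flow through the pipeline:

import { Data } from "effect";

class BatchUploadError extends Data.TaggedError("BatchUploadError")<{
  cause: unknown;
}> {}

class BatchCreationError extends Data.TaggedError("BatchCreationError")<{
  cause: unknown;
}> {}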

Step 2: Poll for Completion

We wait for Mistral to finish by polling inside the workflow. Time spent sleeping or waiting between retries is not billed, so long polling stays cost-effective. Retries are configured through the retries option in the config object passed as the second argument to step.do().

const results = await step.do("poll-status", {
  retries: { limit: 60, delay: "30 seconds" },
  async () => {
    const program = pipe(
      Effect.gen(function* (_) {
        const status = yield* _(
          Effect.tryPromise({
            try: () => client.batch.jobs.get({ jobId: batchJob.id }),
            catch: (error) => new NetworkError({ cause: error }),
          })
        );

        if (status.status !== "SUCCESS") {
          throw new Error("Still processing");
        }

        return parseJsonlResults(await downloadFile(status.outputFile));
      })
    );

    return Effect.runPromise(program);
  }
);
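
If Mistral reports a terminal failure, spending the remaining retry attempts is wasted work. One option is Cloudflare's NonRetryableError, thrown before the "Still processing" check above; this sketch assumes FAILED and CANCELLED are the terminal statuses you care about:

import { NonRetryableError } from "cloudflare:workflows";

// Inside the poll-status callback, before throwing "Still processing":
if (status.status === "FAILED" || status.status === "CANCELLED") {
  // Stops the step from retrying; the batch can then be marked "failed"
  // in batch_processes instead of burning the remaining attempts.
  throw new NonRetryableError(`Batch ${batchJob.id} ended in ${status.status}`);
}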

Step 3: Save Results

Once results are ready, we persist the extracted fields and keep writes simple.

await step.do("save-results", async () => {
  for (const result of results) {
    await db.insert(invoices).values({ /* ... */ });
  }
});
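
Zod is already in the dependency list, so each OCR result can be validated before it touches D1. A sketch, assuming the parsed JSON keys mirror the invoices columns (amounts kept as strings to match the numeric column):

import { z } from "zod";

const invoiceSchema = z.object({
  invoiceId: z.string(),
  vendorName: z.string(),
  invoiceDate: z.string(),
  dueDate: z.string(),
  totalAmountUsd: z.string(),
});

// Inside save-results: skip rows that fail validation rather than
// failing the whole batch.
const parsed = invoiceSchema.safeParse(result);
if (parsed.success) {
  await db.insert(invoices).values(parsed.data);
}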

Step 4: Update Batch Status

After saving results, we update the batch record and mark the parent job complete once all batches finish.

await step.do("update-status", async () => {
  await db
    .update(batch_processes)
    .set({
      status: "completed",
      completedRequests: results.length,
    })
    .where(eq(batch_processes.batchId, batchJob.id));

  const allBatches = await db
    .select()
    .from(batch_processes)
    .where(eq(batch_processes.jobId, jobId));

  const allComplete = allBatches.every(
    (b) => b.status === "completed" || b.status === "failed"
  );

  if (allComplete) {
    await db
      .update(jobs)
      .set({ jobStatus: "completed" })
      .where(eq(jobs.id, jobId));
  }
});

Failure Modes We Plan For

A few failure modes are worth planning for explicitly:

  1. Transient Mistral API or network errors: each workflow step retries under its configured retry policy, so a flaky call does not fail the whole batch.
  2. OCR runs that take a long time: the polling step keeps throwing until the batch reports success, and the workflow retries it on a delay without burning compute while it waits.
  3. A batch that never succeeds: its batch_processes row is marked "failed", while sibling batches and the parent job still run to completion.
  4. Queue delivery problems: messages that are never acked are redelivered by Cloudflare Queues, and because each batch maps to its own workflow instance, recovery stays scoped to that batch.

Together, this keeps the pipeline stable without manual cleanup.


Observability and Status

We expose job status directly from D1. Each job has a jobStatus and per-batch metadata, which lets us show progress like “12/24 batches complete.” Internally, we log workflow step timings to identify slow OCR runs and throttling.

If you need an external-facing dashboard, this structure makes it trivial to compute percent complete without storing extra state.
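
For instance, a small read path (a hypothetical getJobStatus helper, wired into the queue manager's fetch handler) could compute that progress string straight from the two tables:

import { eq } from "drizzle-orm";
import { createDB, jobs, batch_processes } from "@repo/db";

// Hypothetical helper for a GET /jobs/:id route in the queue manager.
async function getJobStatus(jobId: number, env: Env): Promise<Response> {
  const db = await createDB(env.DB);

  const [job] = await db.select().from(jobs).where(eq(jobs.id, jobId));
  if (!job) return Response.json({ error: "not found" }, { status: 404 });

  const batches = await db
    .select()
    .from(batch_processes)
    .where(eq(batch_processes.jobId, jobId));

  const completed = batches.filter((b) => b.status === "completed").length;

  return Response.json({
    jobId,
    status: job.jobStatus,
    progress: `${completed}/${batches.length} batches complete`,
  });
}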
