You already have material in databases, SaaS tools, files, and event streams. A cognitive ETL pipeline pulls that material in, coerces it into typed graph structure through a shape, and lands it as governed entities you can query — each tracing back to where it came from.
register a source ──▶ extract through a shape ──▶ review the delta ──▶ read entities
This is the same extract loop whether the source is a contract PDF, a row dump from your database, a Slack export, or a webhook payload. You register it, run it through a shape, and review before anything commits.
Prerequisites: a configured client and an existing shape in your project to extract through. List shapes with pb.shapes.list().

Set up the client

import { createPenumbra } from "@penumbra-systems/platform";

const pb = createPenumbra({ apiKey: process.env.PENUMBRA_API_KEY });

Run the pipeline

1. Register the source

Register the material so it can be extracted. Use whatever you already run as the source — a document, a database export, a SaaS record, an event payload. The returned id is what you extract and read against.
const source = await pb.sources.register({
  type: "document",
  name: "Acme MSA.pdf",
  // ...source reference or content, per your upload flow
});
The same registration step covers anything you can hand the pipeline. The type and name describe where the material came from; the content or reference follows your upload flow.
2. Extract through a shape (staged)

Extraction coerces the source into entities and relationships that match the shape. Stage it with apply: false so you can review before it lands.
const receipt = await pb.extract({
  source: { id: source.id },
  shapeId: "shp_contract_terms",
  apply: false,
});

console.log(receipt.status, receipt.deltaId); // "staged", "..."
The same source can be extracted through different shapes to surface different structure. The shape is the lens you read the material with — point it at a contract shape to land terms, or a research shape to land findings.
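Since the shape is just a parameter of the extract call, running one source through several shapes can be sketched as a loop. This is a minimal illustration under assumptions, not a helper the platform ships: `ExtractClient` is a local structural type that mirrors only the `pb.extract` call shown above, `stageThroughShapes` is a hypothetical name, and the receipt is assumed to carry only the `status` and `deltaId` fields logged above.

```typescript
// Local structural types mirroring only the extract call used in this guide.
// Assumption: the receipt exposes the two fields logged above and nothing more.
interface StagedReceipt {
  status: string;
  deltaId: string;
}

interface ExtractClient {
  extract(args: {
    source: { id: string };
    shapeId: string;
    apply: boolean;
  }): Promise<StagedReceipt>;
}

interface StagedShape {
  shapeId: string;
  deltaId: string;
}

// Hypothetical helper: stage one extraction per shape and never apply.
// Returns the delta staged for each shape so each can be planned on its own.
async function stageThroughShapes(
  pb: ExtractClient,
  sourceId: string,
  shapeIds: string[],
): Promise<StagedShape[]> {
  const staged: StagedShape[] = [];
  for (const shapeId of shapeIds) {
    const receipt = await pb.extract({
      source: { id: sourceId },
      shapeId,
      apply: false, // stage only; each delta is reviewed before it lands
    });
    staged.push({ shapeId, deltaId: receipt.deltaId });
  }
  return staged;
}
```

You would call it with the registered source and the shape ids from your own project, for example `stageThroughShapes(pb, source.id, ["shp_contract_terms", "shp_research_findings"])`, where the second id is made up for illustration. Each returned delta still goes through its own plan and apply.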
3. Review the delta

Extraction stages a delta: a set of changes that has not touched the graph yet. Plan it to see what applying it would add.
const plan = await pb.deltas.plan(receipt.deltaId);
console.log(plan);
The plan is your review gate. Nothing in the source reaches the queryable graph until you decide it should.
4. Apply

Commit the delta when the plan looks right.
await pb.deltas.apply(receipt.deltaId);
If an applied delta turns out wrong, you can undo it with pb.deltas.revert(receipt.deltaId). Preview the undo first with pb.deltas.revertPreview(receipt.deltaId).
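The preview-then-revert order can be sketched as a small guard. This is an illustration under assumptions: `RevertClient` mirrors only the two `pb.deltas` calls named above, the preview's return shape is not documented here so it is typed as `unknown`, and `looksSafe` is a hypothetical callback you would supply to inspect the preview.

```typescript
// Local structural type mirroring only the two revert calls named in this guide.
// Assumption: return shapes are not documented here, so they stay `unknown`.
interface RevertClient {
  deltas: {
    revertPreview(deltaId: string): Promise<unknown>;
    revert(deltaId: string): Promise<unknown>;
  };
}

// Hypothetical helper: preview the undo first, revert only if the caller accepts it.
// Resolves to true when the revert ran and false when it was held back.
async function revertWithPreview(
  pb: RevertClient,
  deltaId: string,
  looksSafe: (preview: unknown) => boolean,
): Promise<boolean> {
  const preview = await pb.deltas.revertPreview(deltaId);
  if (!looksSafe(preview)) {
    return false; // leave the applied delta in place
  }
  await pb.deltas.revert(deltaId);
  return true;
}
```

Keeping the decision in a callback means the same guard works whether a person reads the preview or a script checks it.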
5. Read what landed

Starting from the source, read exactly the entities it grounds, along with entity counts per shape.
const entities = await pb.sources.entities(source.id);
const stats = await pb.sources.entityStats(source.id);

console.log(stats);

Run it on a schedule

The four-step loop is the unit you automate. Point it at a system you already run, and on each pull you register the new material, extract it staged, plan, and apply. Because every extraction stages a delta before committing, you keep a review gate even when the pipeline runs unattended — and an undo path through pb.deltas.revert if a batch lands wrong.
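One pass of that loop can be sketched as a single function that a scheduler of your choice calls per pull. This is a sketch under assumptions rather than the platform's own runner: `PipelineClient` is a local structural type that mirrors only the calls used in this guide, `runPull` and `approve` are hypothetical names, and the plan is typed as `unknown` because its shape is not described here.

```typescript
// Local structural type mirroring only the calls used in this guide.
// Return shapes the guide does not show are typed as unknown.
interface PipelineClient {
  sources: {
    // The guide elides the content/reference field; it is left out here too.
    register(input: { type: string; name: string }): Promise<{ id: string }>;
  };
  extract(args: {
    source: { id: string };
    shapeId: string;
    apply: boolean;
  }): Promise<{ status: string; deltaId: string }>;
  deltas: {
    plan(deltaId: string): Promise<unknown>;
    apply(deltaId: string): Promise<unknown>;
  };
}

interface PullOutcome {
  sourceId: string;
  deltaId: string;
  applied: boolean;
}

// Hypothetical helper: register -> extract (staged) -> plan -> apply.
// `approve` is the review gate; when it returns false the delta stays staged.
async function runPull(
  pb: PipelineClient,
  item: { type: string; name: string },
  shapeId: string,
  approve: (plan: unknown) => boolean | Promise<boolean>,
): Promise<PullOutcome> {
  const source = await pb.sources.register(item);
  const receipt = await pb.extract({
    source: { id: source.id },
    shapeId,
    apply: false, // always stage first, even when unattended
  });
  const plan = await pb.deltas.plan(receipt.deltaId);
  const applied = await approve(plan);
  if (applied) {
    await pb.deltas.apply(receipt.deltaId);
  }
  return { sourceId: source.id, deltaId: receipt.deltaId, applied };
}
```

Returning the delta id even when nothing was applied gives you a handle for later manual review, or for pb.deltas.revert if an applied batch turns out wrong.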

What you have now

The material from your systems is no longer scattered across formats. It is typed entities in a governed graph, each tracing back to the source it came from. You can search them, check whether they are fit to act on, and pull more sources through the same loop. By choosing the shape to extract through, what you actually authored is an ontology: the shape decides what structure the material from your systems lands as.

Sources reference

Every pb.sources method.

Semantic git

How staging, planning, applying, and reverting work.