Relational database processor
In this chapter, we will implement a Todo-List relational database processor. This processor receives processed operations from the reactor and can use the prevState, resultingState, or data from the operations themselves to populate a database.
What is a Relational Database Processor?
A relational database processor is a specialized component that listens to document changes in your Powerhouse application and transforms that data into a traditional relational database format (like PostgreSQL, MySQL, or SQLite). This is incredibly useful for:
- Analytics and Reporting: Running complex SQL queries on your document data
- Integration: Connecting with existing business intelligence tools
Generate the Processor
To generate a relational database processor, run the following command:
ph generate --processor todo-indexer --processor-type relationalDb --document-types powerhouse/todolist
Breaking down this command:
- --processor todo-indexer: Creates a processor with the name "todo-indexer"
- --processor-type relationalDb: Specifies we want a relational database processor (vs other types like analytics or webhook processors)
- --document-types powerhouse/todolist: Tells the processor to only listen for changes to documents of type "powerhouse/todolist"
This command creates a processor named todo-indexer of type relational database that listens for changes from documents of type powerhouse/todolist.
What gets generated:
- A processor class file (processors/todo-indexer/index.ts)
- A database migration file (processors/todo-indexer/migrations.ts)
- A factory file for configuration (processors/todo-indexer/factory.ts)
- A schema file for TypeScript types (processors/todo-indexer/schema.ts)
Define Your Database Schema
Next, define your database schema in the processors/todo-indexer/migrations.ts file.
Understanding Database Migrations
Migrations are version-controlled database changes that ensure your database schema evolves safely over time. They contain:
- up() function: Creates or modifies database structures when the processor starts
- down() function: Safely removes changes when the processor is removed
This approach ensures your database schema stays in sync across different environments (development, staging, production).
The migration file contains up and down functions that are called when the processor is added or removed, respectively.
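The up/down pairing can be illustrated without a real database. The sketch below is an in-memory model, not the Powerhouse or Kysely API: FakeSchema, upSketch, and downSketch are hypothetical names, and a Set of table names stands in for the database. It shows why up should be safe to run more than once (a processor may start many times) and why down should undo exactly what up created.

```typescript
// Hypothetical in-memory stand-in for a database schema: just a set of table names.
class FakeSchema {
  private tables = new Set<string>();

  // Mirrors "create table if not exists": a second call is a no-op.
  createTableIfNotExists(name: string): void {
    this.tables.add(name);
  }

  // Mirrors "drop table if exists".
  dropTableIfExists(name: string): void {
    this.tables.delete(name);
  }

  hasTable(name: string): boolean {
    return this.tables.has(name);
  }

  tableCount(): number {
    return this.tables.size;
  }
}

// Runs when the processor is added or started.
function upSketch(schema: FakeSchema): void {
  schema.createTableIfNotExists("todo");
}

// Runs when the processor is removed; undoes everything upSketch created.
function downSketch(schema: FakeSchema): void {
  schema.dropTableIfExists("todo");
}

const fakeSchema = new FakeSchema();
upSketch(fakeSchema);
upSketch(fakeSchema); // a restart re-runs the migration without error
const tableExistsAfterUp = fakeSchema.hasTable("todo");
const tableCountAfterUp = fakeSchema.tableCount();
downSketch(fakeSchema);
const tableExistsAfterDown = fakeSchema.hasTable("todo");
```

Running up twice leaves exactly one table, and down leaves none, which is the property the ifNotExists() call in the generated migration is there to provide.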
In the migrations.ts file you'll find an example of the default schema for the todo table:
import { type IRelationalDb } from "document-drive/processors/types";
export async function up(db: IRelationalDb<any>): Promise<void> {
  // Create table - this runs when the processor starts
  await db.schema
    .createTable("todo") // Creates a new table named "todo"
    .addColumn("task", "varchar(255)") // Text column for the task description (max 255 characters)
    .addColumn("status", "boolean") // Boolean column for completion status (true/false)
    .addPrimaryKeyConstraint("todo_pkey", ["task"]) // Makes "task" the primary key (unique identifier)
    .ifNotExists() // Only create if table doesn't already exist
    .execute(); // Execute the SQL command
}
export async function down(db: IRelationalDb<any>): Promise<void> {
  // Drop table - this runs when the processor is removed
  await db.schema.dropTable("todo").execute();
}
Design Considerations:
- We're using task as the primary key, which means each task description must be unique
- The varchar(255) limit ensures reasonable memory usage
- The boolean status makes it easy to filter completed vs. incomplete tasks
- Consider adding timestamps (created_at, updated_at) for audit trails in production applications
Generate Database Types
After defining your database schema, generate TypeScript types for type-safe queries and better IDE support:
ph generate --migration-file processors/todo-indexer/migrations.ts
Why Generate Types?
TypeScript types provide several benefits:
- Type Safety: Catch errors at compile time instead of runtime
- IDE Support: Get autocomplete and IntelliSense for your database queries
- Documentation: Types serve as living documentation of your database structure
- Refactoring: Safe renaming and restructuring of database fields
Check your processors/todo-indexer/schema.ts file after generation - it will contain the TypeScript types for your database schema.
Example of generated types:
// This is what gets auto-generated based on your migration
// (exported as DB, the name the processor imports from schema.ts)
export interface DB {
  todo: {
    task: string;
    status: boolean;
  };
}
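To see what these types buy you, here is a small self-contained sketch. The interface is copied by hand under the hypothetical name SketchDB rather than imported, and pluckColumn is a hypothetical helper; in a real project you would import the generated type from schema.ts instead.

```typescript
// Hand-written mirror of the generated schema types (hypothetical name).
interface SketchDB {
  todo: {
    task: string;
    status: boolean;
  };
}

// Derive the row type from the schema instead of redefining it.
type TodoRowSketch = SketchDB["todo"];

// Only column names that exist on the "todo" table are accepted,
// and the return type follows the chosen column.
function pluckColumn<K extends keyof TodoRowSketch>(
  rows: TodoRowSketch[],
  column: K,
): TodoRowSketch[K][] {
  return rows.map((row) => row[column]);
}

const sampleRows: TodoRowSketch[] = [
  { task: "write migration", status: true },
  { task: "generate types", status: false },
];

const taskNames = pluckColumn(sampleRows, "task"); // typed as string[]
const statuses = pluckColumn(sampleRows, "status"); // typed as boolean[]

// The next line would fail to compile, because "title" is not a column:
// pluckColumn(sampleRows, "title");
```

A misspelled or renamed column is caught by the compiler here, where an untyped query would only fail at runtime.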
Configure the Processor Filter
Next, configure the processor filter in processors/todo-indexer/factory.ts:
Understanding Processor Filters
Filters determine which document changes your processor will respond to. This is crucial for performance and functionality:
- Performance: Only process relevant changes to avoid unnecessary work
- Isolation: Different processors can handle different document types
- Scalability: Distribute processing load across multiple processors
import {
  type ProcessorRecord,
  type IProcessorHostModule,
} from "document-drive/processors/types";
import { type RelationalDbProcessorFilter } from "document-drive/processors/relational";
import { TodoIndexerProcessor } from "./index.js";
export const todoIndexerProcessorFactory =
  (module: IProcessorHostModule) =>
  async (driveId: string): Promise<ProcessorRecord[]> => {
    // Create a namespace for the processor and the provided drive id
    // Namespaces prevent data collisions between different drives
    const namespace = TodoIndexerProcessor.getNamespace(driveId);
    // Create a namespaced db for the processor
    // This ensures each drive gets its own isolated database tables
    const store =
      await module.relationalDb.createNamespace<TodoIndexerProcessor>(
        namespace,
      );
    // Create a filter for the processor
    // This determines which document changes trigger the processor
    const filter: RelationalDbProcessorFilter = {
      branch: ["main"], // Only process changes from the "main" branch
      documentId: ["*"], // Process changes from any document ID (* = wildcard)
      documentType: ["powerhouse/todolist"], // Only process todolist documents
      scope: ["global"], // Process global changes (not user-specific)
    };
    // Create the processor instance
    const processor = new TodoIndexerProcessor(namespace, filter, store);
    return [
      {
        processor,
        filter,
      },
    ];
  };
Filter Options Explained:
- branch: Which document branches to monitor (usually "main" for production data)
- documentId: Specific document IDs to watch ("*" means all documents)
- documentType: Document types to process (ensures type safety)
- scope: Whether to process global changes or user-specific ones
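One way to picture how these four fields combine is the sketch below. It is not the reactor's actual matching code: FilterSketch, StrandHeaderSketch, fieldMatches, and matchesFilterSketch are hypothetical names, and the semantics are assumptions (values within one field are alternatives, "*" matches anything, and every field must match).

```typescript
// Shape of the filter as used in the factory above.
interface FilterSketch {
  branch: string[];
  documentId: string[];
  documentType: string[];
  scope: string[];
}

// Hypothetical subset of the metadata a strand carries.
interface StrandHeaderSketch {
  branch: string;
  documentId: string;
  documentType: string;
  scope: string;
}

// A field matches if the list contains the wildcard or the exact value.
function fieldMatches(allowed: string[], value: string): boolean {
  return allowed.indexOf("*") !== -1 || allowed.indexOf(value) !== -1;
}

// Assumed semantics: every field of the filter must match.
function matchesFilterSketch(f: FilterSketch, s: StrandHeaderSketch): boolean {
  return (
    fieldMatches(f.branch, s.branch) &&
    fieldMatches(f.documentId, s.documentId) &&
    fieldMatches(f.documentType, s.documentType) &&
    fieldMatches(f.scope, s.scope)
  );
}

const todoFilter: FilterSketch = {
  branch: ["main"],
  documentId: ["*"],
  documentType: ["powerhouse/todolist"],
  scope: ["global"],
};

const acceptsTodoList = matchesFilterSketch(todoFilter, {
  branch: "main",
  documentId: "any-document-id",
  documentType: "powerhouse/todolist",
  scope: "global",
});

const rejectsOtherType = matchesFilterSketch(todoFilter, {
  branch: "main",
  documentId: "any-document-id",
  documentType: "example/other-type", // made-up type for illustration
  scope: "global",
});

const rejectsOtherBranch = matchesFilterSketch(todoFilter, {
  branch: "draft",
  documentId: "any-document-id",
  documentType: "powerhouse/todolist",
  scope: "global",
});
```

Under these assumptions the wildcard on documentId lets every todo list through, while a different document type or branch is filtered out before the processor sees it.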
Implement the Processor Logic
Now implement the actual processor logic in processors/todo-indexer/index.ts by copying the code below:
Understanding the Processor Lifecycle
The processor has several key methods:
- initAndUpgrade(): Runs once when the processor starts (perfect for running migrations)
- onStrands(): Runs every time relevant document changes occur (this is where the main logic goes)
- onDisconnect(): Cleanup when the processor shuts down
What are "Strands"? Strands represent a batch of operations that happened to documents. Each strand contains:
- Document ID and metadata
- Array of operations (create, update, delete, etc.)
- Previous and resulting document states
import { type IRelationalDb } from "document-drive/processors/types";
import { RelationalDbProcessor } from "document-drive/processors/relational";
import { type InternalTransmitterUpdate } from "document-drive/server/listener/transmitter/internal";
import type { ToDoListDocument } from "../document-models/to-do-list/index.js";
// migrations.ts and schema.ts sit next to this file in processors/todo-indexer/
import { up } from "./migrations.js";
import { type DB } from "./schema.js";
// Define the document type this processor handles
type DocumentType = ToDoListDocument;
export class TodoIndexerProcessor extends RelationalDbProcessor<DB> {
  // Generate a unique namespace for this processor based on the drive ID
  // This prevents data conflicts between different drives
  static override getNamespace(driveId: string): string {
    // Default namespace: `${this.name}_${driveId.replaceAll("-", "_")}`
    return super.getNamespace(driveId);
  }
  // Initialize the processor and run database migrations
  // This method runs once when the processor starts up
  override async initAndUpgrade(): Promise<void> {
    await up(this.relationalDb); // Run the database migration to create tables
  }
  // Main processing logic - handles incoming document changes
  // This method is called whenever there are new document operations
  override async onStrands(
    strands: InternalTransmitterUpdate<DocumentType>[],
  ): Promise<void> {
    // Early return if no changes to process
    if (strands.length === 0) {
      return;
    }
    // Process each strand (batch of changes) individually
    for (const strand of strands) {
      // Skip strands with no operations
      if (strand.operations.length === 0) {
        continue;
      }
      // Process each operation within the strand
      for (const operation of strand.operations) {
        // Insert a record for each operation into the database
        // This is a simple example - you might want more sophisticated logic
        await this.relationalDb
          .insertInto("todo")
          .values({
            // Create a unique task identifier combining document ID, operation index, and type
            task: `${strand.documentId}-${operation.index}: ${operation.type}`,
            status: true, // Default to completed status
          })
          // Handle conflicts by doing nothing if the task already exists
          // This prevents duplicate entries if operations are replayed
          .onConflict((oc) => oc.column("task").doNothing())
          .execute(); // Execute the database query
      }
    }
  }
  // Cleanup method called when the processor disconnects
  // Use this for closing connections, clearing caches, etc.
  async onDisconnect() {
    // Add any cleanup logic here
    // For example: await this.relationalDb.destroy();
  }
}
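The processor above stores one row per operation, keyed by document ID and operation index, and ignores rows that already exist. The sketch below models that logic in memory so it can be run without a reactor: a Map stands in for the todo table, and skipping existing keys plays the role of onConflict(...).doNothing(). OperationSketch, StrandSketch, taskKey, and indexStrands are hypothetical names, and the strand shape is reduced to the fields the processor reads.

```typescript
// Reduced, hypothetical shapes: only the fields the processor reads.
interface OperationSketch {
  index: number;
  type: string;
}

interface StrandSketch {
  documentId: string;
  operations: OperationSketch[];
}

// Builds the same key as the processor: "<documentId>-<index>: <type>".
function taskKey(documentId: string, op: OperationSketch): string {
  return `${documentId}-${op.index}: ${op.type}`;
}

// In-memory "todo" table: key is the task column, value is the status column.
// Returns how many rows were actually inserted.
function indexStrands(
  table: Map<string, boolean>,
  strands: StrandSketch[],
): number {
  let inserted = 0;
  for (const strand of strands) {
    for (const op of strand.operations) {
      const key = taskKey(strand.documentId, op);
      if (table.has(key)) {
        continue; // conflict on the primary key: do nothing
      }
      table.set(key, true);
      inserted++;
    }
  }
  return inserted;
}

const todoTable = new Map<string, boolean>();
const batch: StrandSketch[] = [
  {
    documentId: "doc-1",
    operations: [
      { index: 0, type: "ADD_TODO_ITEM" },
      { index: 1, type: "ADD_TODO_ITEM" },
    ],
  },
];

const firstRun = indexStrands(todoTable, batch); // inserts both operations
const replayRun = indexStrands(todoTable, batch); // same batch again: nothing new
```

Because the key is derived only from the document ID, the operation index, and the operation type, replaying a batch cannot create duplicates.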
Expose Data Through a Subgraph
Generate a Subgraph
Generate a new subgraph to expose your processor data:
ph generate --subgraph todo
What is a Subgraph?
A subgraph is a GraphQL schema that exposes your processed data to clients. It:
- Provides a standardized API for accessing your relational database data
- Integrates with the Powerhouse supergraph for unified data access
- Supports both queries (reading data) and mutations (modifying data)
- Can join data across multiple processors and document types
Configure the Subgraph
Open ./subgraphs/todo/schema.ts and configure the schema:
import { gql } from "graphql-tag";
import type { DocumentNode } from "graphql";
export const schema: DocumentNode = gql`
  # Define the structure of a todo item as returned by GraphQL
  type ToDoListEntry {
    task: String! # The task description (! means required/non-null)
    status: Boolean! # The completion status (true = done, false = pending)
  }
  # Define available queries
  type Query {
    todos(driveId: ID!): [ToDoListEntry] # Get array of todos for a specific drive
  }
`;
Open ./subgraphs/todo/resolvers.ts and configure the resolvers:
// subgraphs/todo/resolvers.ts
import { type Subgraph } from "@powerhousedao/reactor-api";
import { type ToDoListDocument } from "document-models/to-do-list/index.js";
import { TodoIndexerProcessor } from "../../processors/todo-indexer/index.js";
export const getResolvers = (subgraph: Subgraph) => {
  const reactor = subgraph.reactor;
  const relationalDb = subgraph.relationalDb;
  return {
    Query: {
      todos: {
        // Resolver function for the "todos" query
        // Arguments: parent object, query arguments, context, GraphQL info
        resolve: async (_: any, args: { driveId: string }) => {
          // Query the database using the processor's static query method
          // This gives us access to the namespaced database for the specific drive
          const todos = await TodoIndexerProcessor.query(
            args.driveId,
            relationalDb,
          )
            .selectFrom("todo") // Select from the "todo" table
            .selectAll() // Get all columns
            .execute(); // Execute the query
          // Transform database results to match GraphQL schema
          return todos.map((todo) => ({
            task: todo.task, // Map database "task" column to GraphQL "task" field
            status: todo.status, // Map database "status" column to GraphQL "status" field
          }));
        },
      },
    },
  };
};
Now query the data via the supergraph.
Understanding the Supergraph
The Powerhouse supergraph is a unified GraphQL endpoint that combines:
- Document Models: Direct access to your Powerhouse documents
- Subgraphs: Custom data views from your processors
- Built-in APIs: System functionality like authentication and drives
This unified approach means you can query document state AND processed data in a single request, which is perfect for building rich user interfaces.
The Powerhouse supergraph for any given remote drive or reactor can be found under http://localhost:4001/graphql. The gateway / supergraph available on /graphql combines all the subgraphs, except for the drive subgraph (which is accessible via /d/:driveId). To access the endpoint, start the reactor and navigate to the URL with graphql appended. The following commands explain how you can test & try the supergraph.
- Start the reactor:
ph reactor
- The command prints an endpoint URL. Change it to the following URL to reach the supergraph:
http://localhost:4001/graphql
The supergraph allows you to both query & mutate data from the same endpoint. Read more about subgraphs
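The same endpoint can also be called from code. The following is a minimal sketch, assuming a runtime with a global fetch (Node 18+ or a browser) and a reactor running locally; buildTodosRequestBody and fetchTodos are hypothetical helper names, and the query is the todos query from the subgraph schema defined earlier.

```typescript
const SUPERGRAPH_URL = "http://localhost:4001/graphql";

// The "todos" query exposed by the subgraph schema.
const TODOS_QUERY = `
  query Todos($driveId: ID!) {
    todos(driveId: $driveId) {
      task
      status
    }
  }
`;

// Builds the JSON body of a GraphQL-over-HTTP POST request.
function buildTodosRequestBody(driveId: string): string {
  return JSON.stringify({ query: TODOS_QUERY, variables: { driveId } });
}

// Sends the query. It needs a running reactor, so it is defined but not called here.
async function fetchTodos(driveId: string): Promise<unknown> {
  const response = await fetch(SUPERGRAPH_URL, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: buildTodosRequestBody(driveId),
  });
  if (!response.ok) {
    throw new Error(`Supergraph request failed: ${response.status}`);
  }
  return response.json();
}

// Inspect the request body without touching the network.
const exampleBody = buildTodosRequestBody("powerhouse");
```

With the reactor running, fetchTodos("powerhouse") should resolve to a JSON object whose data.todos field holds the indexed rows, provided the drive exists and the processor has already handled some operations.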
Example: Complete Data Flow from Document Operations to Relational Database
Understanding the Complete Data Pipeline
This comprehensive example demonstrates the entire data flow in a Powerhouse application:
- Storage Layer: Create a drive (document storage container)
- Document Layer: Create a todo document and add operations
- Processing Layer: Watch the relational database processor automatically index changes
- API Layer: Query both original document state AND processed relational data
- Analysis: Compare the different data representations
Step 1: Create a Drive (Storage Container)
What's Happening: Every document needs a "drive" - think of it as a folder or database that contains related documents. This is where your todo documents will live.
mutation DriveCreation($name: String!) {
  addDrive(name: $name) {
    name
  }
}
Variables:
{
  "driveId": "powerhouse",
  "name": "tutorial"
}
💡 Behind the Scenes: This creates a new drive namespace. Your relational database processor will create isolated tables for this drive using the namespace pattern we defined earlier.
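The namespace pattern referred to here is the default shown in the comment inside the processor's getNamespace method: the class name, an underscore, and the drive ID with hyphens replaced by underscores. A minimal sketch of that rule follows; defaultNamespaceSketch is a hypothetical stand-alone function, while the real logic lives in RelationalDbProcessor.getNamespace and may differ in detail.

```typescript
// Mirrors the default documented in the processor comment:
// `${this.name}_${driveId.replaceAll("-", "_")}`
function defaultNamespaceSketch(processorName: string, driveId: string): string {
  // A global regex replace is used so the sketch also compiles on older targets.
  return `${processorName}_${driveId.replace(/-/g, "_")}`;
}

const namespaceForTutorial = defaultNamespaceSketch(
  "TodoIndexerProcessor",
  "powerhouse",
);
// → "TodoIndexerProcessor_powerhouse"

const namespaceForHyphenatedDrive = defaultNamespaceSketch(
  "TodoIndexerProcessor",
  "my-second-drive", // made-up drive ID for illustration
);
// → "TodoIndexerProcessor_my_second_drive"
```

Two drives therefore never share tables, because each drive ID maps to its own namespace.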
Step 2: Create a Todo Document
What's Happening: Now we're creating an actual todo list document inside our drive. This uses the document model we built in previous chapters.
mutation Mutation($driveId: String, $name: String) {
  ToDoList_createDocument(driveId: $driveId, name: $name)
}
Variables:
{
  "driveId": "powerhouse",
  "name": "tutorial"
}
Result:
{
  "data": {
    "ToDoList_createDocument": "72b73d31-4874-4b71-8cc3-289ed4cfbe2b"
  }
}
💡 Key Insight: The returned UUID (72b73d31-4874-4b71-8cc3-289ed4cfbe2b) is crucial - this is the document ID that will appear in our processor's database records, linking operations back to their source document. You will receive a different UUID.
Step 3: Add Todo Items (Generate Operations)
What's Happening: Each time we add a todo item, we're creating a new operation in the document's history. Our relational database processor is listening for these operations in real-time.
mutation Mutation(
  $driveId: String
  $docId: PHID
  $input: ToDoList_AddTodoItemInput
) {
  ToDoList_addTodoItem(driveId: $driveId, docId: $docId, input: $input)
}
Variables:
{
  "driveId": "powerhouse",
  "name": "tutorial",
  "docId": "72b73d31-4874-4b71-8cc3-289ed4cfbe2b",
  "input": {
    "id": "1",
    "text": "complete mutation"
  }
}
Result:
{
  "data": {
    "ToDoList_addTodoItem": 1
  }
}
💡 What Happens Next:
- Document Model: Stores the operation and updates document state
- Reactor: Broadcasts the operation to all listening processors
- Our Processor: Automatically receives the operation and creates a database record
- Database: Now contains: "72b73d31-4874-4b71-8cc3-289ed4cfbe2b-0: ADD_TODO_ITEM"
🔄 Repeat this step 2-3 times with different todo items to see multiple operations get processed. Each operation will have an incrementing index (0, 1, 2...).
Step 4: Query Both Data Sources
The Power of Dual Data Access: Now we can query BOTH the original document state AND our processed relational data in a single GraphQL request. This demonstrates the flexibility of the Powerhouse architecture.
query Query($driveId: ID!) {
  todos(driveId: $driveId) {
    task
    status
  }
  ToDoList {
    getDocuments {
      state {
        items {
          text
        }
      }
    }
  }
}
Variables:
{
  "driveId": "powerhouse"
}
Response:
{
  "data": {
    "todos": [
      {
        "task": "72b73d31-4874-4b71-8cc3-289ed4cfbe2b-0: ADD_TODO_ITEM",
        "status": true
      },
      {
        "task": "72b73d31-4874-4b71-8cc3-289ed4cfbe2b-1: ADD_TODO_ITEM",
        "status": true
      },
      {
        "task": "72b73d31-4874-4b71-8cc3-289ed4cfbe2b-2: ADD_TODO_ITEM",
        "status": true
      }
    ],
    "ToDoList": {
      "getDocuments": [
        {
          "state": {
            "items": [
              {
                "text": "complete mutation"
              },
              {
                "text": "add another todo"
              },
              {
                "text": "Now check the data"
              }
            ]
          }
        }
      ]
    }
  }
}
🔍 Data Analysis: Understanding What You're Seeing
Document Model Data (ToDoList.getDocuments):
- ✅ Current State: Shows the final todo items as they exist in the document
- ✅ User-Friendly: Displays actual todo text like "complete mutation"
- ✅ Real-Time: Always reflects the latest document state
- ❌ Limited History: Doesn't show how the document changed over time
Processed Relational Data (todos):
- ✅ Operation History: Shows each individual operation that occurred
- ✅ Audit Trail: You can see the sequence (0, 1, 2) of operations
- ✅ Analytics Ready: Perfect for counting operations, tracking changes
- ✅ Integration Friendly: Standard SQL database that other tools can access
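As a small illustration of the "Analytics Ready" point, the sketch below takes task strings in the format the processor writes, splits them back into document ID, operation index, and operation type, and counts operations per type. parseTask and countByType are hypothetical helpers working on plain strings; against the real database the same report would typically be a SQL query instead.

```typescript
interface ParsedTask {
  documentId: string;
  index: number;
  type: string;
}

// Splits "<documentId>-<index>: <type>" back into its parts.
// The greedy first group keeps the hyphens inside a UUID intact.
function parseTask(task: string): ParsedTask | null {
  const match = /^(.+)-(\d+): (.+)$/.exec(task);
  if (match === null) {
    return null;
  }
  return { documentId: match[1], index: Number(match[2]), type: match[3] };
}

// Counts operations per type, e.g. for a simple activity report.
function countByType(tasks: string[]): Map<string, number> {
  const counts = new Map<string, number>();
  for (const task of tasks) {
    const parsed = parseTask(task);
    if (parsed === null) {
      continue; // skip rows that do not follow the expected format
    }
    counts.set(parsed.type, (counts.get(parsed.type) ?? 0) + 1);
  }
  return counts;
}

// The three rows from the response above.
const sampleTasks = [
  "72b73d31-4874-4b71-8cc3-289ed4cfbe2b-0: ADD_TODO_ITEM",
  "72b73d31-4874-4b71-8cc3-289ed4cfbe2b-1: ADD_TODO_ITEM",
  "72b73d31-4874-4b71-8cc3-289ed4cfbe2b-2: ADD_TODO_ITEM",
];

const firstParsed = parseTask(sampleTasks[0]);
const operationCounts = countByType(sampleTasks);
```

For the sample rows, firstParsed carries the full UUID with index 0, and operationCounts maps ADD_TODO_ITEM to 3.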