Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,9 @@ research_and_plans/
packages/logs.txt
.vscode/mcp.json
.pi/readcache/
packages/backend/.test-db.sqlite
packages/backend/.test-db.sqlite-shm
packages/backend/.test-db.sqlite-wal
.test-db.sqlite
.test-db.sqlite-shm
.test-db.sqlite-wal
Binary file removed packages/backend/.test-db.sqlite
Binary file not shown.
Empty file.
251 changes: 94 additions & 157 deletions packages/backend/src/db/migrate.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,66 @@
import { migrate as migrateSqlite } from 'drizzle-orm/bun-sqlite/migrator';
import { migrate as migratePg } from 'drizzle-orm/postgres-js/migrator';
import { sql } from 'drizzle-orm';
import { getDatabase, getCurrentDialect } from './client';
import { logger } from '../utils/logger';
import path from 'node:path';
import fs from 'node:fs';
import os from 'node:os';
import crypto from 'node:crypto';
import { sqliteJournal, pgJournal } from './migrations-bundle';
import path from 'node:path';
import sqliteJournal from '../../drizzle/migrations/meta/_journal.json';
import pgJournal from '../../drizzle/migrations_pg/meta/_journal.json';

const DRIZZLE_MIGRATIONS_SCHEMA = 'drizzle';
const DRIZZLE_MIGRATIONS_TABLE = '__drizzle_migrations';

interface MigrationJournalEntry {
when: number;
tag: string;
// Bun types embeddedFiles as Blob[] but the runtime objects are BunFile with a name property.
type EmbeddedFile = Blob & { name: string };

// Populated at startup in a compiled binary; empty when running from source.
const embedded = new Map(
(Bun.embeddedFiles as EmbeddedFile[]).map((f) => [f.name, f])
);

// Filesystem paths used as a fallback in dev/source mode.
const DEV_MIGRATIONS_DIR = {
sqlite: path.join(import.meta.dir, '../../drizzle/migrations'),
postgres: path.join(import.meta.dir, '../../drizzle/migrations_pg'),
} as const;

// Shape expected by db.dialect.migrate() — mirrors drizzle-orm's internal MigrationMeta.
interface MigrationMeta {
sql: string[];
bps: boolean;
folderMillis: number;
hash: string;
}

type Journal = { entries: Array<{ tag: string; when: number; breakpoints: boolean }> };

async function readSql(tag: string, devDir: string): Promise<{ content: string; source: 'embedded' | 'filesystem' }> {
const asset = embedded.get(`${tag}.sql`);
if (asset) return { content: await asset.text(), source: 'embedded' };
return { content: await Bun.file(path.join(devDir, `${tag}.sql`)).text(), source: 'filesystem' };
}

async function buildMigrations(journal: Journal, devDir: string): Promise<MigrationMeta[]> {
const results = await Promise.all(
journal.entries.map(async (entry) => {
const { content, source } = await readSql(entry.tag, devDir);
return {
meta: { tag: entry.tag, source },
migration: {
sql: content.split('--> statement-breakpoint'),
bps: entry.breakpoints,
folderMillis: entry.when,
hash: crypto.createHash('sha256').update(content).digest('hex'),
},
};
})
);

const sources = new Set(results.map((r) => r.meta.source));
logger.info(
`Loaded ${results.length} migrations from ${sources.size === 1 ? [...sources][0] : 'mixed'} source`
);

return results.map((r) => r.migration);
}

function normalizeSqlStatement(statement: string): string {
Expand All @@ -37,43 +83,21 @@ function toIdempotentStatement(statement: string): string {

async function attemptPostgresDuplicateColumnRepair(
db: any,
migrationsPath: string,
migrations: MigrationMeta[],
journal: Journal,
migrationError: any
): Promise<boolean> {
const failedQuery = typeof migrationError?.query === 'string' ? migrationError.query : '';
if (!failedQuery) {
return false;
}

const journalPath = path.join(migrationsPath, 'meta', '_journal.json');
if (!fs.existsSync(journalPath)) {
return false;
}
if (!failedQuery) return false;

const journal = JSON.parse(fs.readFileSync(journalPath, 'utf8')) as {
entries?: MigrationJournalEntry[];
};
const entries = Array.isArray(journal.entries) ? journal.entries : [];
const normalizedFailedQuery = normalizeSqlStatement(failedQuery);

for (const entry of entries) {
const migrationPath = path.join(migrationsPath, `${entry.tag}.sql`);
if (!fs.existsSync(migrationPath)) {
continue;
}
for (let i = 0; i < migrations.length; i++) {
const migration = migrations[i]!;
const entry = journal.entries[i]!;
const statements = migration.sql.map((s) => s.trim()).filter((s) => s.length > 0);

const migrationSql = fs.readFileSync(migrationPath, 'utf8');
const statements = migrationSql
.split('--> statement-breakpoint')
.map((statement) => statement.trim())
.filter((statement) => statement.length > 0);

const includesFailedQuery = statements.some(
(statement) => normalizeSqlStatement(statement) === normalizedFailedQuery
);
if (!includesFailedQuery) {
continue;
}
if (!statements.some((s) => normalizeSqlStatement(s) === normalizedFailedQuery)) continue;

logger.warn(
`Detected duplicate-column migration drift in ${entry.tag}; applying idempotent repair`
Expand All @@ -82,37 +106,37 @@ async function attemptPostgresDuplicateColumnRepair(
await db.execute(sql.raw(`CREATE SCHEMA IF NOT EXISTS "${DRIZZLE_MIGRATIONS_SCHEMA}"`));
await db.execute(
sql.raw(`
CREATE TABLE IF NOT EXISTS "${DRIZZLE_MIGRATIONS_SCHEMA}"."${DRIZZLE_MIGRATIONS_TABLE}" (
id SERIAL PRIMARY KEY,
hash text NOT NULL,
created_at bigint
)
`)
CREATE TABLE IF NOT EXISTS "${DRIZZLE_MIGRATIONS_SCHEMA}"."${DRIZZLE_MIGRATIONS_TABLE}" (
id SERIAL PRIMARY KEY,
hash text NOT NULL,
created_at bigint
)
`)
);

for (const statement of statements) {
const repairedStatement = toIdempotentStatement(statement);
try {
await db.execute(sql.raw(repairedStatement));
} catch (statementError: any) {
const isAddColumnStatement = /ALTER\s+TABLE[\s\S]+ADD\s+COLUMN/i.test(repairedStatement);
if (isAddColumnStatement && isDuplicateColumnError(statementError)) {
if (
/ALTER\s+TABLE[\s\S]+ADD\s+COLUMN/i.test(repairedStatement) &&
isDuplicateColumnError(statementError)
)
continue;
}
throw statementError;
}
}

const hash = crypto.createHash('sha256').update(migrationSql).digest('hex');
await db.execute(
sql.raw(`
INSERT INTO "${DRIZZLE_MIGRATIONS_SCHEMA}"."${DRIZZLE_MIGRATIONS_TABLE}" ("hash", "created_at")
SELECT '${hash}', ${entry.when}
WHERE NOT EXISTS (
SELECT 1 FROM "${DRIZZLE_MIGRATIONS_SCHEMA}"."${DRIZZLE_MIGRATIONS_TABLE}"
WHERE "created_at" = ${entry.when}
)
`)
INSERT INTO "${DRIZZLE_MIGRATIONS_SCHEMA}"."${DRIZZLE_MIGRATIONS_TABLE}" ("hash", "created_at")
SELECT '${migration.hash}', ${migration.folderMillis}
WHERE NOT EXISTS (
SELECT 1 FROM "${DRIZZLE_MIGRATIONS_SCHEMA}"."${DRIZZLE_MIGRATIONS_TABLE}"
WHERE "created_at" = ${migration.folderMillis}
)
`)
);

return true;
Expand All @@ -121,118 +145,38 @@ async function attemptPostgresDuplicateColumnRepair(
return false;
}

/**
* When running as a compiled Bun binary, drizzle migration SQL files are
* embedded as assets via `bun build --compile --asset-naming="[name].[ext]"`.
* They are accessible through `Bun.embeddedFiles` with their original basenames.
* The `_journal.json` files are NOT embedded as assets (bun transpiles JSON to
* JS); instead they are statically imported via migrations-bundle.ts.
*
* This function extracts the relevant SQL files and writes the journal to a
* temporary directory so the standard drizzle migrator can read them.
*
* Returns the path to the temp directory on success, or null if no embedded
* migration files were found (i.e., running from source in development mode).
*/

/** Bun embedded-file entry: a Blob with an additional `name` (original path). */
interface BunEmbeddedFile extends Blob {
name: string;
}

async function extractEmbeddedMigrations(dialect: 'sqlite' | 'postgres'): Promise<string | null> {
const embeddedFiles: BunEmbeddedFile[] =
typeof globalThis.Bun !== 'undefined' && 'embeddedFiles' in globalThis.Bun
? (globalThis.Bun as unknown as { embeddedFiles: BunEmbeddedFile[] }).embeddedFiles
: [];

if (embeddedFiles.length === 0) {
return null;
}

// The journal is bundled as JS via migrations-bundle.ts. Use it to get the
// set of expected SQL filenames for this dialect.
const journal = dialect === 'sqlite' ? sqliteJournal : pgJournal;
const expectedTags = new Set(journal.entries.map((e: { tag: string }) => e.tag));

// SQL files are embedded with their original basename (--asset-naming="[name].[ext]").
// Match by checking if the basename (without .sql) is a known journal tag.
const relevant = embeddedFiles.filter((f) => {
const tag = f.name.replace(/\.sql$/, '');
return expectedTags.has(tag);
});

if (relevant.length === 0) {
return null;
}

// Use a stable, per-process temp directory — reused on retry, cleaned up on exit.
const tmpDir = path.join(os.tmpdir(), `plexus-migrations-${process.pid}-${dialect}`);
fs.mkdirSync(path.join(tmpDir, 'meta'), { recursive: true });

// Register a one-time cleanup handler.
process.once('exit', () => {
try {
fs.rmSync(tmpDir, { recursive: true, force: true });
} catch {
// Best-effort cleanup; ignore errors.
}
});

// Write SQL files.
for (const file of relevant) {
fs.writeFileSync(path.join(tmpDir, file.name), await file.text());
}

// Write the journal (bundled as JS, not available as an embedded asset).
fs.writeFileSync(path.join(tmpDir, 'meta', '_journal.json'), JSON.stringify(journal));

logger.info(`Extracted ${relevant.length} embedded migration files to ${tmpDir}`);
return tmpDir;
}

export async function runMigrations() {
try {
const db = getDatabase();
const dialect = getCurrentDialect();

logger.info(`Running ${dialect} migrations...`);

const migrationsBase = path.resolve(__dirname, '../../drizzle');

if (dialect === 'sqlite') {
const embeddedPath = await extractEmbeddedMigrations('sqlite');
const migrationsPath =
embeddedPath ||
(process.env.DRIZZLE_MIGRATIONS_PATH
? process.env.DRIZZLE_MIGRATIONS_PATH.replace('/migrations_pg', '/migrations')
: path.join(migrationsBase, 'migrations'));
logger.info(`SQLite migrations path: ${migrationsPath}`);
await migrateSqlite(db as any, {
migrationsFolder: migrationsPath,
});
const migrations = await buildMigrations(sqliteJournal as Journal, DEV_MIGRATIONS_DIR.sqlite);
(db as any).dialect.migrate(migrations, (db as any).session, { migrationsFolder: '' });
} else {
const embeddedPath = await extractEmbeddedMigrations('postgres');
const migrationsPath =
embeddedPath ||
process.env.DRIZZLE_MIGRATIONS_PATH ||
path.join(migrationsBase, 'migrations_pg');
logger.info(`PostgreSQL migrations path: ${migrationsPath}`);
const migrations = await buildMigrations(pgJournal as Journal, DEV_MIGRATIONS_DIR.postgres);
try {
await migratePg(db as any, {
migrationsFolder: migrationsPath,
await (db as any).dialect.migrate(migrations, (db as any).session, {
migrationsFolder: '',
migrationsSchema: DRIZZLE_MIGRATIONS_SCHEMA,
migrationsTable: DRIZZLE_MIGRATIONS_TABLE,
});
} catch (error: any) {
if (isDuplicateColumnError(error)) {
const repaired = await attemptPostgresDuplicateColumnRepair(
db as any,
migrationsPath,
db,
migrations,
pgJournal as Journal,
error
);
if (repaired) {
logger.warn('Retrying PostgreSQL migrations after duplicate-column repair');
await migratePg(db as any, {
migrationsFolder: migrationsPath,
await (db as any).dialect.migrate(migrations, (db as any).session, {
migrationsFolder: '',
migrationsSchema: DRIZZLE_MIGRATIONS_SCHEMA,
migrationsTable: DRIZZLE_MIGRATIONS_TABLE,
});
} else {
throw error;
Expand All @@ -246,13 +190,6 @@ export async function runMigrations() {
logger.info('Migrations completed successfully');
} catch (error: any) {
logger.error('Migration failed', error);

if (error.message?.includes("Can't find meta/_journal.json file")) {
logger.error('Drizzle journal file path issue detected.');
logger.error('This is often caused by migration file structure.');
logger.error('Try regenerating migrations with: bunx drizzle-kit generate');
}

throw error;
}
}
14 changes: 0 additions & 14 deletions packages/backend/src/db/migrations-bundle.ts

This file was deleted.

Loading