fix: Clean up and add Neon DB

This commit is contained in:
Wanjohi
2025-09-20 04:38:16 +03:00
parent df4af84d55
commit a045fc9c91
10 changed files with 329 additions and 272 deletions

View File

@@ -1,17 +1,21 @@
import { AsyncLocalStorage } from "node:async_hooks";
export function createContext<T>() {
const storage = new AsyncLocalStorage<T>();
return {
use() {
const result = storage.getStore();
if (!result) {
throw new Error("No context available");
}
return result;
},
provide<R>(value: T, fn: () => R) {
return storage.run<R>(value, fn);
},
};
}
export namespace Context {
export class NotFound extends Error {}
export function create<T>() {
const storage = new AsyncLocalStorage<T>();
return {
use() {
const result = storage.getStore();
if (!result) {
throw new NotFound();
}
return result;
},
provide<R>(value: T, fn: () => R) {
return storage.run<R>(value, fn);
},
};
}
}

View File

@@ -1,16 +1,102 @@
import ws from "ws";
import { Resource } from "sst";
import postgres from "postgres";
import { drizzle } from "drizzle-orm/postgres-js";
import { memo } from "../utils";
import { Context } from "../context";
import { ExtractTablesWithRelations } from "drizzle-orm";
import { Pool, neonConfig } from "@neondatabase/serverless";
import { PgTransaction, PgTransactionConfig } from "drizzle-orm/pg-core";
import { NeonQueryResultHKT, drizzle } from "drizzle-orm/neon-serverless";
const client = postgres({
idle_timeout: 30000,
connect_timeout: 30000,
host: Resource.Database.host,
database: Resource.Database.database,
user: Resource.Database.username,
password: Resource.Database.password,
port: Resource.Database.port,
max: parseInt(process.env.POSTGRES_POOL_MAX || "1"),
});
neonConfig.webSocketConstructor = ws;
export const db = drizzle(client, {});
export namespace Database {
function addPoolerSuffix(original: string): string {
const firstDotIndex = original.indexOf(".");
if (firstDotIndex === -1) return original + "-pooler";
return (
original.slice(0, firstDotIndex) +
"-pooler" +
original.slice(firstDotIndex)
);
}
const client = memo(() => {
const dbHost = addPoolerSuffix(Resource.Database.host);
const pool = new Pool({
connectionString: `postgres://${Resource.Database.user}:${Resource.Database.password}@${dbHost}/${Resource.Database.name}?sslmode=require`,
});
const db = drizzle(pool);
return db;
});
export type Transaction = PgTransaction<
NeonQueryResultHKT,
Record<string, never>,
ExtractTablesWithRelations<Record<string, never>>
>;
export type TxOrDb = Transaction | ReturnType<typeof client>;
const TransactionContext = Context.create<{
tx: TxOrDb;
effects: (() => void | Promise<void>)[];
}>();
export async function use<T>(callback: (trx: TxOrDb) => Promise<T>) {
try {
const { tx } = TransactionContext.use();
return tx.transaction(callback);
} catch (err) {
if (err instanceof Context.NotFound) {
const effects: (() => void | Promise<void>)[] = [];
const result = await TransactionContext.provide(
{
effects,
tx: client(),
},
() => callback(client()),
);
await Promise.all(effects.map((x) => x()));
return result;
}
throw err;
}
}
export async function fn<Input, T>(
callback: (input: Input, trx: TxOrDb) => Promise<T>,
) {
return (input: Input) => use(async (tx) => callback(input, tx));
}
export async function effect(effect: () => any | Promise<any>) {
try {
const { effects } = TransactionContext.use();
effects.push(effect);
} catch {
await effect();
}
}
export async function transaction<T>(
callback: (tx: TxOrDb) => Promise<T>,
config?: PgTransactionConfig,
) {
try {
const { tx } = TransactionContext.use();
return callback(tx);
} catch (err) {
if (err instanceof Context.NotFound) {
const effects: (() => void | Promise<void>)[] = [];
const result = await client().transaction(async (tx) => {
return TransactionContext.provide({ tx, effects }, () =>
callback(tx),
);
}, config);
await Promise.all(effects.map((x) => x()));
return result;
}
throw err;
}
}
}

View File

@@ -1,63 +0,0 @@
import { db } from ".";
import {
PgTransaction,
PgTransactionConfig
} from "drizzle-orm/pg-core";
import {
PostgresJsQueryResultHKT
} from "drizzle-orm/postgres-js";
import { ExtractTablesWithRelations } from "drizzle-orm";
import { createContext } from "../context";
export type Transaction = PgTransaction<
PostgresJsQueryResultHKT,
Record<string, never>,
ExtractTablesWithRelations<Record<string, never>>
>;
type TxOrDb = Transaction | typeof db;
const TransactionContext = createContext<{
tx: Transaction;
effects: (() => void | Promise<void>)[];
}>();
export async function useTransaction<T>(callback: (trx: TxOrDb) => Promise<T>) {
try {
const { tx } = TransactionContext.use();
return callback(tx);
} catch {
return callback(db);
}
}
export async function afterTx(effect: () => any | Promise<any>) {
try {
const { effects } = TransactionContext.use();
effects.push(effect);
} catch {
await effect();
}
}
export async function createTransaction<T>(
callback: (tx: Transaction) => Promise<T>,
isolationLevel?: PgTransactionConfig["isolationLevel"],
): Promise<T> {
try {
const { tx } = TransactionContext.use();
return callback(tx);
} catch {
const effects: (() => void | Promise<void>)[] = [];
const result = await db.transaction(
async (tx) => {
return TransactionContext.provide({ tx, effects }, () => callback(tx));
},
{
isolationLevel: isolationLevel || "read committed",
},
);
await Promise.all(effects.map((x) => x()));
return result as T;
}
}

View File

@@ -1,15 +1,17 @@
import { z } from "zod";
import { fn } from "../utils";
import { Resource } from "sst";
import { fn, memo } from "../utils";
import { Polar as PolarSdk } from "@polar-sh/sdk";
import { validateEvent } from "@polar-sh/sdk/webhooks";
export namespace Polar {
export const client = () =>
new PolarSdk({
accessToken: Resource.POLAR_API_KEY.value,
server: Resource.App.stage !== "production" ? "sandbox" : "production",
});
export const client = memo(
() =>
new PolarSdk({
accessToken: Resource.POLAR_API_KEY.value,
server: Resource.App.stage !== "production" ? "sandbox" : "production",
}),
);
export const fromUserEmail = fn(z.string().min(1), async (email) => {
try {

View File

@@ -1,5 +1,6 @@
import { z } from "zod";
import { Common } from "../common";
import { Database } from "../drizzle";
import { createEvent } from "../event";
import { Polar } from "../polar/index";
import { createID, fn } from "../utils";
@@ -7,180 +8,165 @@ import { userTable } from "./user.sql";
import { Examples } from "../examples";
import { and, eq, isNull, asc } from "drizzle-orm";
import { ErrorCodes, VisibleError } from "../error";
import { createTransaction, useTransaction } from "../drizzle/transaction";
export namespace User {
export const Info = z
.object({
id: z.string().openapi({
description: Common.IdDescription,
example: Examples.User.id,
}),
name: z.string().regex(/^[a-zA-Z ]{1,32}$/, "Use a friendly name.").openapi({
description: "The name of this account",
example: Examples.User.name
}),
polarCustomerID: z.string().nullable().openapi({
description: "Associated Polar.sh customer identifier",
example: Examples.User.polarCustomerID,
}),
avatarUrl: z.string().url().nullable().openapi({
description: "The url to the profile picture",
example: Examples.User.avatarUrl
}),
email: z.string().openapi({
description: "Primary email address for user notifications and authentication",
example: Examples.User.email,
}),
lastLogin: z.date().openapi({
description: "Timestamp of user's most recent authentication",
example: Examples.User.lastLogin
})
})
export const Info = z
.object({
id: z.string().openapi({
description: Common.IdDescription,
example: Examples.User.id,
}),
name: z
.string()
.regex(/^[a-zA-Z ]{1,32}$/, "Use a friendly name.")
.openapi({
ref: "User",
description: "User account entity with core identification and authentication details",
example: Examples.User,
});
description: "The name of this account",
example: Examples.User.name,
}),
polarCustomerID: z.string().nullable().openapi({
description: "Associated Polar.sh customer identifier",
example: Examples.User.polarCustomerID,
}),
avatarUrl: z.string().url().nullable().openapi({
description: "The url to the profile picture",
example: Examples.User.avatarUrl,
}),
email: z.string().openapi({
description:
"Primary email address for user notifications and authentication",
example: Examples.User.email,
}),
lastLogin: z.date().openapi({
description: "Timestamp of user's most recent authentication",
example: Examples.User.lastLogin,
}),
})
.openapi({
ref: "User",
description:
"User account entity with core identification and authentication details",
example: Examples.User,
});
export type Info = z.infer<typeof Info>;
export type Info = z.infer<typeof Info>;
export class UserExistsError extends VisibleError {
constructor(username: string) {
super(
"already_exists",
ErrorCodes.Validation.ALREADY_EXISTS,
`A user with this email ${username} already exists`
);
}
export class UserExistsError extends VisibleError {
constructor(username: string) {
super(
"already_exists",
ErrorCodes.Validation.ALREADY_EXISTS,
`A user with this email ${username} already exists`,
);
}
}
export const Events = {
Created: createEvent(
"user.created",
z.object({
userID: Info.shape.id,
}),
),
};
export const Events = {
Created: createEvent(
"user.created",
z.object({
userID: Info.shape.id,
}),
),
};
export const create = fn(
Info
.omit({
lastLogin: true,
polarCustomerID: true,
}).partial({
avatarUrl: true,
id: true
}),
async (input) => {
const userID = createID("user")
export const create = fn(
Info.omit({
lastLogin: true,
polarCustomerID: true,
}).partial({
avatarUrl: true,
id: true,
}),
async (input) => {
const userID = createID("user");
const customer = await Polar.fromUserEmail(input.email)
const customer = await Polar.fromUserEmail(input.email);
const id = input.id ?? userID;
const id = input.id ?? userID;
await createTransaction(async (tx) => {
const result = await tx
.insert(userTable)
.values({
id,
avatarUrl: input.avatarUrl,
email: input.email,
name: input.name,
polarCustomerID: customer?.id,
lastLogin: Common.utc()
})
.onConflictDoNothing({
target: [userTable.email]
})
if (result.count === 0) {
throw new UserExistsError(input.email)
}
})
return id;
})
export const fromEmail = fn(
Info.shape.email.min(1),
async (email) =>
useTransaction(async (tx) =>
tx
.select()
.from(userTable)
.where(
and(
eq(userTable.email, email),
isNull(userTable.timeDeleted)
)
)
.orderBy(asc(userTable.timeCreated))
.execute()
.then(rows => rows.map(serialize).at(0))
)
)
export const fromID = fn(
Info.shape.id.min(1),
(id) =>
useTransaction(async (tx) =>
tx
.select()
.from(userTable)
.where(
and(
eq(userTable.id, id),
isNull(userTable.timeDeleted)
)
)
.orderBy(asc(userTable.timeCreated))
.execute()
.then(rows => rows.map(serialize).at(0))
),
)
export const remove = fn(
Info.shape.id.min(1),
(id) =>
useTransaction(async (tx) => {
await tx
.update(userTable)
.set({
timeDeleted: Common.utc(),
})
.where(and(eq(userTable.id, id)))
.execute();
return id;
}),
);
export const acknowledgeLogin = fn(
Info.shape.id,
(id) =>
useTransaction(async (tx) =>
tx
.update(userTable)
.set({
lastLogin: Common.utc(),
})
.where(and(eq(userTable.id, id)))
.execute()
),
)
export function serialize(
input: typeof userTable.$inferSelect
): z.infer<typeof Info> {
return {
id: input.id,
name: input.name,
email: input.email,
await Database.transaction(async (tx) => {
const result = await tx
.insert(userTable)
.values({
id,
avatarUrl: input.avatarUrl,
lastLogin: input.lastLogin,
polarCustomerID: input.polarCustomerID,
email: input.email,
name: input.name,
polarCustomerID: customer?.id,
lastLogin: Common.utc(),
})
.onConflictDoNothing({
target: [userTable.email],
});
if (result.rowCount === 0) {
throw new UserExistsError(input.email);
}
}
}
});
return id;
},
);
export const fromEmail = fn(Info.shape.email.min(1), async (email) =>
Database.transaction(async (tx) =>
tx
.select()
.from(userTable)
.where(and(eq(userTable.email, email), isNull(userTable.timeDeleted)))
.orderBy(asc(userTable.timeCreated))
.execute()
.then((rows) => rows.map(serialize).at(0)),
),
);
export const fromID = fn(Info.shape.id.min(1), (id) =>
Database.transaction(async (tx) =>
tx
.select()
.from(userTable)
.where(and(eq(userTable.id, id), isNull(userTable.timeDeleted)))
.orderBy(asc(userTable.timeCreated))
.execute()
.then((rows) => rows.map(serialize).at(0)),
),
);
export const remove = fn(Info.shape.id.min(1), (id) =>
Database.transaction(async (tx) => {
await tx
.update(userTable)
.set({
timeDeleted: Common.utc(),
})
.where(and(eq(userTable.id, id)))
.execute();
return id;
}),
);
export const acknowledgeLogin = fn(Info.shape.id, (id) =>
Database.transaction(async (tx) =>
tx
.update(userTable)
.set({
lastLogin: Common.utc(),
})
.where(and(eq(userTable.id, id)))
.execute(),
),
);
export function serialize(
input: typeof userTable.$inferSelect,
): z.infer<typeof Info> {
return {
id: input.id,
name: input.name,
email: input.email,
avatarUrl: input.avatarUrl,
lastLogin: input.lastLogin,
polarCustomerID: input.polarCustomerID,
};
}
}

View File

@@ -1,5 +1,6 @@
export * from "./id"
export * from "./fn"
export * from "./log"
export * from "./invite"
export * from "./helper"
export * from "./id";
export * from "./fn";
export * from "./log";
export * from "./invite";
export * from "./helper";
export * from "./memo";

View File

@@ -0,0 +1,18 @@
export function memo<T>(fn: () => T, cleanup?: (input: T) => Promise<void>) {
let value: T | undefined;
let loaded = false;
const result = (): T => {
if (loaded) return value as T;
loaded = true;
value = fn();
return value as T;
};
result.reset = async () => {
if (cleanup && value) await cleanup(value);
loaded = false;
value = undefined;
};
return result;
}