From 126839e6aec80f71f14e39f12b88ef182eda4bc5 Mon Sep 17 00:00:00 2001 From: Wanjohi Date: Fri, 18 Apr 2025 14:42:27 +0300 Subject: [PATCH] =?UTF-8?q?=E2=AD=90=20feat(core):=20Add=20usage=20billing?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/core/package.json | 3 +- packages/core/src/billing/bandwidth.ts | 155 +++++++++++++++ packages/core/src/billing/gpu.ts | 262 +++++++++++++++++++++++++ packages/core/src/billing/index.ts | 3 + packages/core/src/billing/storage.ts | 78 ++++++++ 5 files changed, 500 insertions(+), 1 deletion(-) create mode 100644 packages/core/src/billing/bandwidth.ts create mode 100644 packages/core/src/billing/gpu.ts create mode 100644 packages/core/src/billing/index.ts create mode 100644 packages/core/src/billing/storage.ts diff --git a/packages/core/package.json b/packages/core/package.json index 4ca20d09..4a4b27de 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -32,7 +32,8 @@ "@instantdb/admin": "^0.17.7", "@openauthjs/openauth": "*", "@openauthjs/openevent": "^0.0.27", - "@polar-sh/sdk": "^0.26.1", + "@polar-sh/ingestion": "^0.2.2", + "@polar-sh/sdk": "^0.32.10", "drizzle-kit": "^0.30.5", "drizzle-orm": "^0.40.0", "postgres": "^3.4.5" diff --git a/packages/core/src/billing/bandwidth.ts b/packages/core/src/billing/bandwidth.ts new file mode 100644 index 00000000..83e789de --- /dev/null +++ b/packages/core/src/billing/bandwidth.ts @@ -0,0 +1,155 @@ +import { + type IngestionContext, + IngestionStrategy, + type IngestionStrategyCustomer, + type IngestionStrategyExternalCustomer, +} from "@polar-sh/ingestion"; + +// Define the context specific to bandwidth usage +export type BandwidthStrategyContext = IngestionContext<{ + megabytesTransferred: number; + durationSeconds: number; + direction: "inbound" | "outbound" | "both"; + protocol?: string; +}>; + +// Bandwidth rates per MB +const DEFAULT_CREDITS_PER_MB = 0.01; // 1 credit per 100 MB + +// Client interface for bandwidth tracking +export interface BandwidthClient { + // Track bandwidth usage + recordBandwidthUsage: (options: { + megabytesTransferred: number; + durationSeconds?: number; + direction?: "inbound" | "outbound" | "both"; + protocol?: string; + metadata?: Record; + }) => Promise<{ creditsUsed: number }>; + + // Start a bandwidth tracking session + startBandwidthSession: () => { + sessionId: string; + startTime: number; + }; + + // End a bandwidth tracking session with the total transferred + endBandwidthSession: (options: { + sessionId: string; + megabytesTransferred: number; + direction?: "inbound" | "outbound" | "both"; + protocol?: string; + metadata?: Record; + }) => Promise<{ + creditsUsed: number; + durationSeconds: number; + }>; +} + +// Bandwidth tracking strategy +export class BandwidthStrategy extends IngestionStrategy { + private creditsPerMB: number; + private activeSessions: Map = new Map(); + + constructor(options: { creditsPerMB?: number } = {}) { + super(); + this.creditsPerMB = options.creditsPerMB ?? DEFAULT_CREDITS_PER_MB; + } + + // Calculate credits for a specific amount of bandwidth + private calculateCredits(megabytesTransferred: number): number { + return megabytesTransferred * this.creditsPerMB; + } + + override client( + customer: IngestionStrategyCustomer | IngestionStrategyExternalCustomer + ): BandwidthClient { + const executionHandler = this.createExecutionHandler(); + + return { + // Record a single bandwidth usage event + recordBandwidthUsage: async ({ + megabytesTransferred, + durationSeconds = 0, + direction = "both", + protocol = "webrtc", + metadata = {} + }) => { + // Calculate credits used + const creditsUsed = this.calculateCredits(megabytesTransferred); + + // Create the bandwidth usage context + const usageContext: BandwidthStrategyContext = { + megabytesTransferred, + durationSeconds, + direction, + protocol, + ...metadata + }; + + // Send the usage data to Polar + await executionHandler(usageContext, customer); + + return { creditsUsed }; + }, + + // Start tracking a bandwidth session + startBandwidthSession: () => { + const sessionId = `bw-${Date.now()}-${Math.random().toString(36).substring(2, 9)}`; + const startTime = Date.now(); + + // Store the session data + this.activeSessions.set(sessionId, { + startTime, + customer + }); + + return { sessionId, startTime }; + }, + + // End a bandwidth tracking session + endBandwidthSession: async ({ + sessionId, + megabytesTransferred, + direction = "both", + protocol = "webrtc", + metadata = {} + }) => { + // Get the session data + const sessionData = this.activeSessions.get(sessionId); + if (!sessionData) { + throw new Error(`Session ${sessionId} not found`); + } + + // Calculate duration + const durationSeconds = (Date.now() - sessionData.startTime) / 1000; + + // Calculate credits used + const creditsUsed = this.calculateCredits(megabytesTransferred); + + // Create the bandwidth usage context + const usageContext: BandwidthStrategyContext = { + megabytesTransferred, + durationSeconds, + direction, + protocol, + ...metadata + }; + + // Send the usage data to Polar + await executionHandler(usageContext, customer); + + // Clean up session data + this.activeSessions.delete(sessionId); + + return { + creditsUsed, + durationSeconds + }; + } + }; + } +} \ No newline at end of file diff --git a/packages/core/src/billing/gpu.ts b/packages/core/src/billing/gpu.ts new file mode 100644 index 00000000..9ceb0197 --- /dev/null +++ b/packages/core/src/billing/gpu.ts @@ -0,0 +1,262 @@ +import { + type IngestionContext, + IngestionStrategy, + type IngestionStrategyCustomer, + type IngestionStrategyExternalCustomer, +} from "@polar-sh/ingestion"; + +// Define known GPU types and their credit rates +export type GPUModel = "3080" | "4080" | "4090" | string; + +// Map of GPU models to their credit costs per minute +const GPU_CREDIT_RATES: Record = { + "3080": 3, + "4080": 6, + "4090": 9, + // Default for unknown models + "default": 1 +}; + +// Define the context specific to GPU usage +export type GPUStrategyContext = IngestionContext<{ + gpuModel: GPUModel; + durationMinutes: number; + utilizationPercent: number; + memoryUsedMB: number; +}>; + +// Define the client interface we'll return +export interface GPUClient { + startSession: (options: { + gpuModel: GPUModel; + instanceId: string; + onMetrics?: (metrics: { + utilizationPercent: number; + memoryUsedMB: number; + }) => void; + }) => Promise<{ sessionId: string }>; + + endSession: (sessionId: string) => Promise<{ + durationMinutes: number; + creditsUsed: number; + finalMetrics: { + avgUtilization: number; + peakMemoryUsed: number; + } + }>; + + getActiveSessionMetrics: (sessionId: string) => Promise<{ + durationMinutes: number; + currentUtilization: number; + memoryUsedMB: number; + } | null>; +} + +export class GPUUsageStrategy extends IngestionStrategy { + private activeSessions: Map = new Map(); + + private metricsPollingInterval: NodeJS.Timeout | null = null; + + constructor() { + super(); + // Start the metrics polling system + this.startMetricsPolling(); + } + + private startMetricsPolling() { + // Poll every 30 seconds to collect metrics from active sessions + this.metricsPollingInterval = setInterval(() => { + this.collectMetricsForActiveSessions(); + }, 30000); + } + + private async collectMetricsForActiveSessions() { + for (const [sessionId, sessionData] of this.activeSessions.entries()) { + try { + // In a real implementation, you would call your GPU monitoring service here + const metrics = await this.fetchGPUMetrics(sessionData.instanceId, sessionData.gpuModel); + + // Store the metrics + sessionData.metrics.utilizationSamples.push(metrics.utilizationPercent); + sessionData.metrics.memoryUsageSamples.push(metrics.memoryUsedMB); + sessionData.metrics.lastUpdateTime = Date.now(); + + // Update the session data + this.activeSessions.set(sessionId, sessionData); + } catch (error) { + console.error(`Failed to collect metrics for session ${sessionId}:`, error); + } + } + } + + // This would connect to your actual GPU monitoring system + private async fetchGPUMetrics(instanceId: string, gpuModel: GPUModel): Promise<{ + utilizationPercent: number; + memoryUsedMB: number; + }> { + // Mock implementation - in a real system you would: + // 1. Call your GPU monitoring API + // 2. Parse and return the actual metrics + + // For this example, we're generating random values + return { + utilizationPercent: Math.floor(Math.random() * 100), + memoryUsedMB: Math.floor(Math.random() * + (gpuModel === "4090" ? 24000 : + gpuModel === "4080" ? 16000 : + gpuModel === "3080" ? 10000 : 8000)) + }; + } + + // Get credit rate for a given GPU model + private getCreditsForGPU(gpuModel: GPUModel): number { + return GPU_CREDIT_RATES[gpuModel] || GPU_CREDIT_RATES.default; + } + + // Calculate average from array of samples + private calculateAverage(samples: number[]): number { + if (samples.length === 0) return 0; + return samples.reduce((sum, value) => sum + value, 0) / samples.length; + } + + // Find maximum value in array of samples + private findPeakValue(samples: number[]): number { + if (samples.length === 0) return 0; + return Math.max(...samples); + } + + // Clean up resources when the strategy is no longer needed + public cleanup() { + if (this.metricsPollingInterval) { + clearInterval(this.metricsPollingInterval); + this.metricsPollingInterval = null; + } + } + + override client( + customer: IngestionStrategyCustomer | IngestionStrategyExternalCustomer + ): GPUClient { + const executionHandler = this.createExecutionHandler(); + + return { + startSession: async ({ gpuModel, instanceId, onMetrics }) => { + const sessionId = `${instanceId}-${Date.now()}`; + + // Initialize the session tracking + this.activeSessions.set(sessionId, { + startTime: Date.now(), + gpuModel, + instanceId, + customer, + metrics: { + utilizationSamples: [], + memoryUsageSamples: [], + lastUpdateTime: Date.now() + } + }); + + // Set up a callback for metrics if provided + if (onMetrics) { + const metricsInterval = setInterval(async () => { + const sessionData = this.activeSessions.get(sessionId); + if (!sessionData) { + clearInterval(metricsInterval); + return; + } + + const latestMetrics = await this.fetchGPUMetrics(instanceId, gpuModel); + onMetrics(latestMetrics); + }, 60000); // Send metrics callback once per minute + } + + return { sessionId }; + }, + + endSession: async (sessionId) => { + const sessionData = this.activeSessions.get(sessionId); + if (!sessionData) { + throw new Error(`Session ${sessionId} not found`); + } + + // Calculate duration in minutes + const durationMinutes = (Date.now() - sessionData.startTime) / (1000 * 60); + + // Calculate average utilization and peak memory + const avgUtilization = this.calculateAverage(sessionData.metrics.utilizationSamples); + const peakMemoryUsed = this.findPeakValue(sessionData.metrics.memoryUsageSamples); + + // Get credit rate for this GPU model + const creditsPerMinute = this.getCreditsForGPU(sessionData.gpuModel); + + // Calculate total credits used + const totalCredits = creditsPerMinute * durationMinutes; + + // Create the usage context for this GPU session + const usageContext: GPUStrategyContext = { + gpuModel: sessionData.gpuModel, + durationMinutes, + utilizationPercent: avgUtilization, + memoryUsedMB: peakMemoryUsed + }; + + // Send the usage data to Polar + await executionHandler(usageContext, sessionData.customer); + + // Clean up session tracking + this.activeSessions.delete(sessionId); + + return { + durationMinutes, + creditsUsed: Math.ceil(totalCredits), + finalMetrics: { + avgUtilization, + peakMemoryUsed + } + }; + }, + + getActiveSessionMetrics: async (sessionId) => { + const sessionData = this.activeSessions.get(sessionId); + if (!sessionData) { + return null; + } + + const currentDurationMinutes = (Date.now() - sessionData.startTime) / (1000 * 60); + + // Get the most recent metrics or fetch new ones if needed + const lastMetricsUpdateAgeMs = Date.now() - sessionData.metrics.lastUpdateTime; + + // If metrics are older than 2 minutes, fetch fresh data + let currentUtilization = 0; + let memoryUsedMB = 0; + + if (lastMetricsUpdateAgeMs > 120000) { + const freshMetrics = await this.fetchGPUMetrics(sessionData.instanceId, sessionData.gpuModel); + currentUtilization = freshMetrics.utilizationPercent; + memoryUsedMB = freshMetrics.memoryUsedMB; + } else if (sessionData.metrics.utilizationSamples.length > 0) { + // Use the most recent sample + const lastIdx = sessionData.metrics.utilizationSamples.length - 1; + currentUtilization = sessionData.metrics.utilizationSamples[lastIdx]; + memoryUsedMB = sessionData.metrics.memoryUsageSamples[lastIdx]; + } + + return { + durationMinutes: currentDurationMinutes, + currentUtilization, + memoryUsedMB + }; + } + }; + } +} \ No newline at end of file diff --git a/packages/core/src/billing/index.ts b/packages/core/src/billing/index.ts new file mode 100644 index 00000000..63924d91 --- /dev/null +++ b/packages/core/src/billing/index.ts @@ -0,0 +1,3 @@ +export namespace Billing { + +} \ No newline at end of file diff --git a/packages/core/src/billing/storage.ts b/packages/core/src/billing/storage.ts new file mode 100644 index 00000000..b7b91e30 --- /dev/null +++ b/packages/core/src/billing/storage.ts @@ -0,0 +1,78 @@ +import { + type IngestionContext, + IngestionStrategy, + type IngestionStrategyCustomer, + type IngestionStrategyExternalCustomer, +} from "@polar-sh/ingestion"; + +// Define the context specific to storage usage +export type StorageStrategyContext = IngestionContext<{ + storageSizeGB: number; + storageType: string; + storageLocation: string; +}>; + +// Define the client interface for storage operations +export interface StorageClient { + // Record a storage snapshot for a customer + recordStorageUsage: (options: { + storageSizeGB: number; + storageType?: string; + storageLocation?: string; + metadata?: Record; + }) => Promise; + + // Calculate costs for a specific storage amount + calculateStorageCost: (sizeGB: number) => { + creditsUsed: number; + ratePerGB: number; + }; +} + +// Storage strategy implementation +export class StorageStrategy extends IngestionStrategy { + // Rate in credits per GB + private creditsPerGB: number; + + constructor(options: { creditsPerGB?: number } = {}) { + super(); + // Default to 3 credits per GB, but allow customization + this.creditsPerGB = options.creditsPerGB ?? 3; + } + + override client( + customer: IngestionStrategyCustomer | IngestionStrategyExternalCustomer + ): StorageClient { + const executionHandler = this.createExecutionHandler(); + + return { + // Record storage usage for a customer + recordStorageUsage: async ({ + storageSizeGB, + storageType = "default", + storageLocation = "default", + metadata = {} + }) => { + // Create the storage usage context + const usageContext: StorageStrategyContext = { + storageSizeGB, + storageType, + storageLocation, + ...metadata + }; + + // Send the usage data to Polar + await executionHandler(usageContext, customer); + }, + + // Calculate the cost for a specific storage amount + calculateStorageCost: (sizeGB: number) => { + const creditsUsed = sizeGB * this.creditsPerGB; + return { + creditsUsed, + ratePerGB: this.creditsPerGB + }; + } + }; + } +} \ No newline at end of file