mirror of
https://github.com/nestriness/nestri.git
synced 2025-12-12 08:45:38 +02:00
⭐ feat: Upgrade to asynchronous event bus with retry queue and backoff strategy (#290)
## Description <!-- Briefly describe the purpose and scope of your changes --> <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Introduced a retry and dead-letter queue system for more robust event processing. - Added a retry handler for processing failed Lambda invocations with exponential backoff. - Enhanced event handling to support retry logic and improved error management. - **Refactor** - Replaced SQS-based library event processing with an event bus-based approach. - Updated event names and structure for improved clarity and consistency. - Removed legacy library queue and related infrastructure. - **Chores** - Updated dependencies to include the AWS Lambda client. - Cleaned up unused code and removed deprecated event handling logic. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
@@ -20,6 +20,7 @@
|
||||
"dependencies": {
|
||||
"@actor-core/bun": "^0.8.0",
|
||||
"@actor-core/file-system": "^0.8.0",
|
||||
"@aws-sdk/client-lambda": "^3.821.0",
|
||||
"@aws-sdk/client-s3": "^3.806.0",
|
||||
"@aws-sdk/client-sqs": "^3.806.0",
|
||||
"@nestri/core": "workspace:",
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { z } from "zod";
|
||||
import { Hono } from "hono";
|
||||
import { Resource } from "sst";
|
||||
import { bus } from "sst/aws/bus";
|
||||
import { Actor } from "@nestri/core/actor";
|
||||
import { describeRoute } from "hono-openapi";
|
||||
import { User } from "@nestri/core/user/index";
|
||||
@@ -12,10 +13,8 @@ import { Friend } from "@nestri/core/friend/index";
|
||||
import { Library } from "@nestri/core/library/index";
|
||||
import { chunkArray } from "@nestri/core/utils/helper";
|
||||
import { ErrorCodes, VisibleError } from "@nestri/core/error";
|
||||
import { SendMessageCommand, SQSClient } from "@aws-sdk/client-sqs";
|
||||
import { ErrorResponses, validator, Result, notPublic } from "./utils";
|
||||
|
||||
const sqs = new SQSClient({});
|
||||
|
||||
export namespace SteamApi {
|
||||
export const route = new Hono()
|
||||
@@ -168,20 +167,17 @@ export namespace SteamApi {
|
||||
steamID: currentSteamID,
|
||||
},
|
||||
async () => {
|
||||
const payload = await Library.Events.Queue.create({
|
||||
appID: game.appid,
|
||||
lastPlayed: game.rtime_last_played ? new Date(game.rtime_last_played * 1000) : null,
|
||||
totalPlaytime: game.playtime_forever
|
||||
});
|
||||
|
||||
await sqs.send(
|
||||
new SendMessageCommand({
|
||||
QueueUrl: Resource.LibraryQueue.url,
|
||||
// Prevent bombarding Steam with requests at the same time
|
||||
DelaySeconds: 10,
|
||||
MessageBody: JSON.stringify(payload),
|
||||
})
|
||||
await bus.publish(
|
||||
Resource.Bus,
|
||||
Library.Events.Add,
|
||||
{
|
||||
appID: game.appid,
|
||||
totalPlaytime: game.playtime_forever,
|
||||
lastPlayed: game.rtime_last_played ? new Date(game.rtime_last_played * 1000) : null,
|
||||
}
|
||||
)
|
||||
|
||||
}
|
||||
)
|
||||
})
|
||||
|
||||
@@ -9,12 +9,14 @@ import { Images } from "@nestri/core/images/index";
|
||||
import { Library } from "@nestri/core/library/index";
|
||||
import { BaseGame } from "@nestri/core/base-game/index";
|
||||
import { Categories } from "@nestri/core/categories/index";
|
||||
import { ImageTypeEnum } from "@nestri/core/images/images.sql";
|
||||
import { PutObjectCommand, S3Client, HeadObjectCommand } from "@aws-sdk/client-s3";
|
||||
|
||||
const s3 = new S3Client({});
|
||||
|
||||
export const handler = bus.subscriber(
|
||||
[
|
||||
Library.Events.Add,
|
||||
BaseGame.Events.New,
|
||||
Steam.Events.Updated,
|
||||
Steam.Events.Created,
|
||||
@@ -114,125 +116,154 @@ export const handler = bus.subscriber(
|
||||
|
||||
const images = await Client.createHeroArt(input);
|
||||
|
||||
const settled =
|
||||
await Promise.allSettled(
|
||||
images.map(async (image) => {
|
||||
await Images.create({
|
||||
type: image.type,
|
||||
imageHash: image.hash,
|
||||
baseGameID: input.appID,
|
||||
position: image.position,
|
||||
fileSize: image.fileSize,
|
||||
sourceUrl: image.sourceUrl,
|
||||
dimensions: image.dimensions,
|
||||
extractedColor: image.averageColor,
|
||||
});
|
||||
await Promise.all(
|
||||
images.map(async (image) => {
|
||||
await Images.create({
|
||||
type: image.type,
|
||||
imageHash: image.hash,
|
||||
baseGameID: input.appID,
|
||||
position: image.position,
|
||||
fileSize: image.fileSize,
|
||||
sourceUrl: image.sourceUrl,
|
||||
dimensions: image.dimensions,
|
||||
extractedColor: image.averageColor,
|
||||
});
|
||||
|
||||
try {
|
||||
//Check whether the image already exists
|
||||
await s3.send(
|
||||
new HeadObjectCommand({
|
||||
Bucket: Resource.Storage.name,
|
||||
Key: `images/${image.hash}`,
|
||||
})
|
||||
);
|
||||
try {
|
||||
//Check whether the image already exists
|
||||
await s3.send(
|
||||
new HeadObjectCommand({
|
||||
Bucket: Resource.Storage.name,
|
||||
Key: `images/${image.hash}`,
|
||||
})
|
||||
);
|
||||
|
||||
} catch (e) {
|
||||
// Save to s3 because it doesn't already exist
|
||||
await s3.send(
|
||||
new PutObjectCommand({
|
||||
Bucket: Resource.Storage.name,
|
||||
Key: `images/${image.hash}`,
|
||||
Body: image.buffer,
|
||||
...(image.format && { ContentType: `image/${image.format}` }),
|
||||
Metadata: {
|
||||
type: image.type,
|
||||
appID: input.appID,
|
||||
}
|
||||
})
|
||||
)
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
settled
|
||||
.filter(r => r.status === "rejected")
|
||||
.forEach(r => console.warn("[processHeroArt] failed:", (r as PromiseRejectedResult).reason));
|
||||
} catch (e) {
|
||||
// Save to s3 because it doesn't already exist
|
||||
await s3.send(
|
||||
new PutObjectCommand({
|
||||
Bucket: Resource.Storage.name,
|
||||
Key: `images/${image.hash}`,
|
||||
Body: image.buffer,
|
||||
...(image.format && { ContentType: `image/${image.format}` }),
|
||||
Metadata: {
|
||||
type: image.type,
|
||||
appID: input.appID,
|
||||
}
|
||||
})
|
||||
)
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
break;
|
||||
}
|
||||
// case "steam_account.updated":
|
||||
// case "steam_account.created": {
|
||||
// //Get user library and commit it to the db
|
||||
// const steamID = event.properties.steamID;
|
||||
case "library.add": {
|
||||
|
||||
// await Actor.provide(
|
||||
// event.metadata.actor.type,
|
||||
// event.metadata.actor.properties,
|
||||
// async () => {
|
||||
// //Get user library
|
||||
// const gameLibrary = await Client.getUserLibrary(steamID);
|
||||
await Actor.provide(
|
||||
event.metadata.actor.type,
|
||||
event.metadata.actor.properties,
|
||||
async () => {
|
||||
const game = event.properties
|
||||
// First check whether the base_game exists, if not get it
|
||||
const appID = game.appID.toString();
|
||||
const exists = await BaseGame.fromID(appID);
|
||||
|
||||
// const myLibrary = new Map(gameLibrary.response.games.map(g => [g.appid, g]))
|
||||
if (!exists) {
|
||||
const appInfo = await Client.getAppInfo(appID);
|
||||
|
||||
// const queryLib = await Promise.allSettled(
|
||||
// gameLibrary.response.games.map(async (game) => {
|
||||
// return await Client.getAppInfo(game.appid.toString())
|
||||
// })
|
||||
// )
|
||||
await BaseGame.create({
|
||||
id: appID,
|
||||
name: appInfo.name,
|
||||
size: appInfo.size,
|
||||
slug: appInfo.slug,
|
||||
links: appInfo.links,
|
||||
score: appInfo.score,
|
||||
description: appInfo.description,
|
||||
releaseDate: appInfo.releaseDate,
|
||||
primaryGenre: appInfo.primaryGenre,
|
||||
compatibility: appInfo.compatibility,
|
||||
controllerSupport: appInfo.controllerSupport,
|
||||
})
|
||||
|
||||
// queryLib
|
||||
// .filter(i => i.status === "rejected")
|
||||
// .forEach(e => console.warn(`[getAppInfo]: Failed to get game metadata: ${e.reason}`))
|
||||
const allCategories = [...appInfo.tags, ...appInfo.genres, ...appInfo.publishers, ...appInfo.developers, ...appInfo.categories, ...appInfo.franchises]
|
||||
|
||||
// const gameInfo = queryLib.filter(i => i.status === "fulfilled").map(f => f.value)
|
||||
const uniqueCategories = Array.from(
|
||||
new Map(allCategories.map(c => [`${c.type}:${c.slug}`, c])).values()
|
||||
);
|
||||
|
||||
// const queryGames = gameInfo.map(async (game) => {
|
||||
// await BaseGame.create(game);
|
||||
await Promise.all(
|
||||
uniqueCategories.map(async (cat) => {
|
||||
//Create category if it doesn't exist
|
||||
await Categories.create({
|
||||
type: cat.type, slug: cat.slug, name: cat.name
|
||||
})
|
||||
|
||||
// const allCategories = [...game.tags, ...game.genres, ...game.publishers, ...game.developers];
|
||||
//Create game if it doesn't exist
|
||||
await Game.create({ baseGameID: appID, categorySlug: cat.slug, categoryType: cat.type })
|
||||
})
|
||||
)
|
||||
|
||||
// const uniqueCategories = Array.from(
|
||||
// new Map(allCategories.map(c => [`${c.type}:${c.slug}`, c])).values()
|
||||
// );
|
||||
const imageUrls = appInfo.images
|
||||
|
||||
// const gameSettled = await Promise.allSettled(
|
||||
// uniqueCategories.map(async (cat) => {
|
||||
// //Use a single db transaction to get or set the category
|
||||
// await Categories.create({
|
||||
// type: cat.type, slug: cat.slug, name: cat.name
|
||||
// })
|
||||
await Promise.all(
|
||||
ImageTypeEnum.enumValues.map(async (type) => {
|
||||
switch (type) {
|
||||
case "backdrop": {
|
||||
await bus.publish(Resource.Bus, BaseGame.Events.New, { appID, type: "backdrop", url: imageUrls.backdrop })
|
||||
break;
|
||||
}
|
||||
case "banner": {
|
||||
await bus.publish(Resource.Bus, BaseGame.Events.New, { appID, type: "banner", url: imageUrls.banner })
|
||||
break;
|
||||
}
|
||||
case "icon": {
|
||||
await bus.publish(Resource.Bus, BaseGame.Events.New, { appID, type: "icon", url: imageUrls.icon })
|
||||
break;
|
||||
}
|
||||
case "logo": {
|
||||
await bus.publish(Resource.Bus, BaseGame.Events.New, { appID, type: "logo", url: imageUrls.logo })
|
||||
break;
|
||||
}
|
||||
case "poster": {
|
||||
await bus.publish(
|
||||
Resource.Bus,
|
||||
BaseGame.Events.New,
|
||||
{ appID, type: "poster", url: imageUrls.poster }
|
||||
)
|
||||
break;
|
||||
}
|
||||
case "heroArt": {
|
||||
await bus.publish(
|
||||
Resource.Bus,
|
||||
BaseGame.Events.NewHeroArt,
|
||||
{ appID, backdropUrl: imageUrls.backdrop, screenshots: imageUrls.screenshots }
|
||||
)
|
||||
break;
|
||||
}
|
||||
case "boxArt": {
|
||||
await bus.publish(
|
||||
Resource.Bus,
|
||||
BaseGame.Events.NewBoxArt,
|
||||
{ appID, logoUrl: imageUrls.logo, backgroundUrl: imageUrls.backdrop }
|
||||
)
|
||||
break;
|
||||
}
|
||||
}
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
// // Use a single db transaction to get or create the game
|
||||
// await Game.create({ baseGameID: game.id, categorySlug: cat.slug, categoryType: cat.type })
|
||||
// })
|
||||
// )
|
||||
// Add to user's library
|
||||
await Library.add({
|
||||
baseGameID: appID,
|
||||
lastPlayed: game.lastPlayed ? new Date(game.lastPlayed) : null,
|
||||
totalPlaytime: game.totalPlaytime,
|
||||
})
|
||||
})
|
||||
|
||||
// gameSettled
|
||||
// .filter(r => r.status === "rejected")
|
||||
// .forEach(r => console.warn("[uniqueCategories] failed:", (r as PromiseRejectedResult).reason));
|
||||
|
||||
// const currentGameInLibrary = myLibrary.get(parseInt(game.id))
|
||||
// if (currentGameInLibrary) {
|
||||
// await Library.add({
|
||||
// baseGameID: game.id,
|
||||
// lastPlayed: currentGameInLibrary.rtime_last_played ? new Date(currentGameInLibrary.rtime_last_played * 1000) : null,
|
||||
// totalPlaytime: currentGameInLibrary.playtime_forever,
|
||||
// })
|
||||
// } else {
|
||||
// throw new Error(`Game is not in library, but was found in app info:${game.id}`)
|
||||
// }
|
||||
// })
|
||||
|
||||
// const settled = await Promise.allSettled(queryGames);
|
||||
|
||||
// settled
|
||||
// .filter(i => i.status === "rejected")
|
||||
// .forEach(e => console.warn(`[gameCreate]: Failed to create game: ${e.reason}`))
|
||||
// })
|
||||
|
||||
// break;
|
||||
// }
|
||||
break;
|
||||
}
|
||||
}
|
||||
},
|
||||
);
|
||||
@@ -1,122 +0,0 @@
|
||||
import "zod-openapi/extend";
|
||||
import { Resource } from "sst";
|
||||
import { bus } from "sst/aws/bus";
|
||||
import { SQSHandler } from "aws-lambda";
|
||||
import { Actor } from "@nestri/core/actor";
|
||||
import { Game } from "@nestri/core/game/index";
|
||||
import { Client } from "@nestri/core/client/index";
|
||||
import { Library } from "@nestri/core/library/index";
|
||||
import { BaseGame } from "@nestri/core/base-game/index";
|
||||
import { Categories } from "@nestri/core/categories/index";
|
||||
import { ImageTypeEnum } from "@nestri/core/images/images.sql";
|
||||
|
||||
export const handler: SQSHandler = async (event) => {
|
||||
for (const record of event.Records) {
|
||||
const parsed = JSON.parse(
|
||||
record.body,
|
||||
) as typeof Library.Events.Queue.$payload;
|
||||
|
||||
await Actor.provide(
|
||||
parsed.metadata.actor.type,
|
||||
parsed.metadata.actor.properties,
|
||||
async () => {
|
||||
const game = parsed.properties
|
||||
// First check whether the base_game exists, if not get it
|
||||
const appID = game.appID.toString();
|
||||
const exists = await BaseGame.fromID(appID);
|
||||
|
||||
if (!exists) {
|
||||
const appInfo = await Client.getAppInfo(appID);
|
||||
|
||||
await BaseGame.create({
|
||||
id: appID,
|
||||
name: appInfo.name,
|
||||
size: appInfo.size,
|
||||
slug: appInfo.slug,
|
||||
links: appInfo.links,
|
||||
score: appInfo.score,
|
||||
description: appInfo.description,
|
||||
releaseDate: appInfo.releaseDate,
|
||||
primaryGenre: appInfo.primaryGenre,
|
||||
compatibility: appInfo.compatibility,
|
||||
controllerSupport: appInfo.controllerSupport,
|
||||
})
|
||||
|
||||
const allCategories = [...appInfo.tags, ...appInfo.genres, ...appInfo.publishers, ...appInfo.developers, ...appInfo.categories, ...appInfo.franchises]
|
||||
|
||||
const uniqueCategories = Array.from(
|
||||
new Map(allCategories.map(c => [`${c.type}:${c.slug}`, c])).values()
|
||||
);
|
||||
|
||||
await Promise.all(
|
||||
uniqueCategories.map(async (cat) => {
|
||||
//Create category if it doesn't exist
|
||||
await Categories.create({
|
||||
type: cat.type, slug: cat.slug, name: cat.name
|
||||
})
|
||||
|
||||
//Create game if it doesn't exist
|
||||
await Game.create({ baseGameID: appID, categorySlug: cat.slug, categoryType: cat.type })
|
||||
})
|
||||
)
|
||||
|
||||
const imageUrls = appInfo.images
|
||||
|
||||
await Promise.all(
|
||||
ImageTypeEnum.enumValues.map(async (type) => {
|
||||
switch (type) {
|
||||
case "backdrop": {
|
||||
await bus.publish(Resource.Bus, BaseGame.Events.New, { appID, type: "backdrop", url: imageUrls.backdrop })
|
||||
break;
|
||||
}
|
||||
case "banner": {
|
||||
await bus.publish(Resource.Bus, BaseGame.Events.New, { appID, type: "banner", url: imageUrls.banner })
|
||||
break;
|
||||
}
|
||||
case "icon": {
|
||||
await bus.publish(Resource.Bus, BaseGame.Events.New, { appID, type: "icon", url: imageUrls.icon })
|
||||
break;
|
||||
}
|
||||
case "logo": {
|
||||
await bus.publish(Resource.Bus, BaseGame.Events.New, { appID, type: "logo", url: imageUrls.logo })
|
||||
break;
|
||||
}
|
||||
case "poster": {
|
||||
await bus.publish(
|
||||
Resource.Bus,
|
||||
BaseGame.Events.New,
|
||||
{ appID, type: "poster", url: imageUrls.poster }
|
||||
)
|
||||
break;
|
||||
}
|
||||
case "heroArt": {
|
||||
await bus.publish(
|
||||
Resource.Bus,
|
||||
BaseGame.Events.NewHeroArt,
|
||||
{ appID, backdropUrl: imageUrls.backdrop, screenshots: imageUrls.screenshots }
|
||||
)
|
||||
break;
|
||||
}
|
||||
case "boxArt": {
|
||||
await bus.publish(
|
||||
Resource.Bus,
|
||||
BaseGame.Events.NewBoxArt,
|
||||
{ appID, logoUrl: imageUrls.logo, backgroundUrl: imageUrls.backdrop }
|
||||
)
|
||||
break;
|
||||
}
|
||||
}
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
// Add to user's library
|
||||
await Library.add({
|
||||
baseGameID: appID,
|
||||
lastPlayed: game.lastPlayed ? new Date(game.lastPlayed) : null,
|
||||
totalPlaytime: game.totalPlaytime,
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
93
packages/functions/src/queues/retry.ts
Normal file
93
packages/functions/src/queues/retry.ts
Normal file
@@ -0,0 +1,93 @@
|
||||
import { Resource } from "sst";
|
||||
import type { SQSHandler } from "aws-lambda";
|
||||
import {
|
||||
SQSClient,
|
||||
SendMessageCommand
|
||||
} from "@aws-sdk/client-sqs";
|
||||
import {
|
||||
LambdaClient,
|
||||
InvokeCommand,
|
||||
GetFunctionCommand,
|
||||
ResourceNotFoundException,
|
||||
} from "@aws-sdk/client-lambda";
|
||||
|
||||
const lambda = new LambdaClient({});
|
||||
lambda.middlewareStack.remove("recursionDetectionMiddleware");
|
||||
const sqs = new SQSClient({});
|
||||
sqs.middlewareStack.remove("recursionDetectionMiddleware");
|
||||
|
||||
export const handler: SQSHandler = async (evt) => {
|
||||
for (const record of evt.Records) {
|
||||
const parsed = JSON.parse(record.body);
|
||||
console.log("body", parsed);
|
||||
const functionName = parsed.requestContext.functionArn
|
||||
.replace(":$LATEST", "")
|
||||
.split(":")
|
||||
.pop();
|
||||
if (parsed.responsePayload) {
|
||||
const attempt = (parsed.requestPayload.attempts || 0) + 1;
|
||||
|
||||
const info = await lambda.send(
|
||||
new GetFunctionCommand({
|
||||
FunctionName: functionName,
|
||||
}),
|
||||
);
|
||||
const max =
|
||||
Number.parseInt(
|
||||
info.Configuration?.Environment?.Variables?.RETRIES || "",
|
||||
) || 0;
|
||||
console.log("max retries", max);
|
||||
if (attempt > max) {
|
||||
console.log(`giving up after ${attempt} retries`);
|
||||
// send to dlq
|
||||
await sqs.send(
|
||||
new SendMessageCommand({
|
||||
QueueUrl: Resource.Dlq.url,
|
||||
MessageBody: JSON.stringify({
|
||||
requestPayload: parsed.requestPayload,
|
||||
requestContext: parsed.requestContext,
|
||||
responsePayload: parsed.responsePayload,
|
||||
}),
|
||||
}),
|
||||
);
|
||||
return;
|
||||
}
|
||||
const seconds = Math.min(Math.pow(2, attempt), 900);
|
||||
console.log(
|
||||
"delaying retry by ",
|
||||
seconds,
|
||||
"seconds for attempt",
|
||||
attempt,
|
||||
);
|
||||
parsed.requestPayload.attempts = attempt;
|
||||
await sqs.send(
|
||||
new SendMessageCommand({
|
||||
QueueUrl: Resource.RetryQueue.url,
|
||||
DelaySeconds: seconds,
|
||||
MessageBody: JSON.stringify({
|
||||
requestPayload: parsed.requestPayload,
|
||||
requestContext: parsed.requestContext,
|
||||
}),
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
if (!parsed.responsePayload) {
|
||||
console.log("triggering function");
|
||||
try {
|
||||
await lambda.send(
|
||||
new InvokeCommand({
|
||||
InvocationType: "Event",
|
||||
Payload: Buffer.from(JSON.stringify(parsed.requestPayload)),
|
||||
FunctionName: functionName,
|
||||
}),
|
||||
);
|
||||
} catch (e) {
|
||||
if (e instanceof ResourceNotFoundException) {
|
||||
return;
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user