🐜 fix: Add asynchronous bus for Steam account

This commit is contained in:
Wanjohi
2025-06-10 16:01:59 +03:00
parent 661d9d2e56
commit 8b07adb0fc
3 changed files with 99 additions and 91 deletions

View File

@@ -1,12 +1,14 @@
import { z } from "zod";
import { fn } from "../utils";
import { Resource } from "sst";
import { Actor } from "../actor";
import { bus } from "sst/aws/bus";
import { Common } from "../common";
import { Examples } from "../examples";
import { createEvent } from "../event";
import { eq, and, isNull, desc } from "drizzle-orm";
import { steamTable, StatusEnum, Limitations } from "./steam.sql";
import { createTransaction, useTransaction } from "../drizzle/transaction";
import { afterTx, createTransaction, useTransaction } from "../drizzle/transaction";
export namespace Steam {
export const Info = z
@@ -122,9 +124,9 @@ export namespace Steam {
lastSyncedAt: input.lastSyncedAt ?? Common.utc(),
})
// await afterTx(async () =>
// bus.publish(Resource.Bus, Events.Created, { userID, steamID: input.id })
// );
await afterTx(async () =>
bus.publish(Resource.Bus, Events.Created, { userID, steamID: input.id })
);
return input.id
}),
@@ -149,9 +151,9 @@ export namespace Steam {
})
.where(eq(steamTable.id, input.steamID));
// await afterTx(async () =>
// bus.publish(Resource.Bus, Events.Updated, { userID, steamID: input.steamID })
// );
await afterTx(async () =>
bus.publish(Resource.Bus, Events.Updated, { userID, steamID: input.steamID })
);
return input.steamID
})

View File

@@ -110,89 +110,6 @@ export namespace SteamApi {
await Steam.updateOwner({ userID, steamID })
}
c.executionCtx.waitUntil((async () => {
try {
// Get friends info
const friends = await Client.getFriendsList(steamID);
const friendSteamIDs = friends.friendslist.friends.map(f => f.steamid);
// Steam API has a limit of requesting 100 friends at a go
const friendChunks = chunkArray(friendSteamIDs, 100);
const settled = await Promise.allSettled(
friendChunks.map(async (friendIDs) => {
const friendsInfo = await Client.getUserInfo(friendIDs)
return await Promise.all(
friendsInfo.map(async (friend) => {
const wasAdded = await Steam.create(friend);
if (!wasAdded) {
console.log(`Friend ${friend.id} already exists`)
}
await Friend.add({ friendSteamID: friend.id, steamID })
return friend.id
})
)
})
)
settled
.filter(result => result.status === 'rejected')
.forEach(result => console.warn('[putFriends] failed:', (result as PromiseRejectedResult).reason))
const prod = (Resource.App.stage === "production" || Resource.App.stage === "dev")
const friendIDs = [
steamID,
...(prod ? settled
.filter(result => result.status === "fulfilled")
.map(f => f.value)
.flat() : [])
]
await Promise.all(
friendIDs.map(async (currentSteamID) => {
// Get user library
const gameLibrary = await Client.getUserLibrary(currentSteamID);
const queryLib = await Promise.allSettled(
gameLibrary.response.games.map(async (game) => {
await Actor.provide(
"steam",
{
steamID: currentSteamID,
},
async () => {
await bus.publish(
Resource.Bus,
Library.Events.Add,
{
appID: game.appid,
totalPlaytime: game.playtime_forever,
lastPlayed: game.rtime_last_played ? new Date(game.rtime_last_played * 1000) : null,
}
)
}
)
})
)
queryLib
.filter(i => i.status === "rejected")
.forEach(e => console.warn(`[pushUserLib]: Failed to push user library to queue: ${e.reason}`))
})
)
} catch (error: any) {
console.error(`Failed to process Steam data for user ${userID}:`, error);
}
})())
return c.html(
`
<script>

View File

@@ -5,8 +5,10 @@ import { Actor } from "@nestri/core/actor";
import { Game } from "@nestri/core/game/index";
import { Steam } from "@nestri/core/steam/index";
import { Client } from "@nestri/core/client/index";
import { Friend } from "@nestri/core/friend/index";
import { Images } from "@nestri/core/images/index";
import { Library } from "@nestri/core/library/index";
import { chunkArray } from "@nestri/core/utils/index";
import { BaseGame } from "@nestri/core/base-game/index";
import { Categories } from "@nestri/core/categories/index";
import { ImageTypeEnum } from "@nestri/core/images/images.sql";
@@ -262,7 +264,94 @@ export const handler = bus.subscriber(
})
})
break;
break;
}
case "steam_account.created":
case "steam_account.updated": {
const userID = event.properties.userID;
try {
const steamID = event.properties.steamID;
// Get friends info
const friends = await Client.getFriendsList(steamID);
const friendSteamIDs = friends.friendslist.friends.map(f => f.steamid);
// Steam API has a limit of requesting 100 friends at a go
const friendChunks = chunkArray(friendSteamIDs, 100);
const settled = await Promise.allSettled(
friendChunks.map(async (friendIDs) => {
const friendsInfo = await Client.getUserInfo(friendIDs)
return await Promise.all(
friendsInfo.map(async (friend) => {
const wasAdded = await Steam.create(friend);
if (!wasAdded) {
console.log(`Friend ${friend.id} already exists`)
}
await Friend.add({ friendSteamID: friend.id, steamID })
return friend.id
})
)
})
)
settled
.filter(result => result.status === 'rejected')
.forEach(result => console.warn('[putFriends] failed:', (result as PromiseRejectedResult).reason))
const prod = (Resource.App.stage === "production" || Resource.App.stage === "dev")
const friendIDs = [
steamID,
...(prod ? settled
.filter(result => result.status === "fulfilled")
.map(f => f.value)
.flat() : [])
]
await Promise.all(
friendIDs.map(async (currentSteamID) => {
// Get user library
const gameLibrary = await Client.getUserLibrary(currentSteamID);
const queryLib = await Promise.allSettled(
gameLibrary.response.games.map(async (game) => {
await Actor.provide(
"steam",
{
steamID: currentSteamID,
},
async () => {
await bus.publish(
Resource.Bus,
Library.Events.Add,
{
appID: game.appid,
totalPlaytime: game.playtime_forever,
lastPlayed: game.rtime_last_played ? new Date(game.rtime_last_played * 1000) : null,
}
)
}
)
})
)
queryLib
.filter(i => i.status === "rejected")
.forEach(e => console.warn(`[pushUserLib]: Failed to push user library to queue: ${e.reason}`))
})
)
} catch (error: any) {
console.error(`Failed to process Steam data for user ${userID}:`, error);
}
break;
}
}
},