Files
netris-nestri/packages/moq/transfork/publisher.ts
Wanjohi 379db1c87b feat: Add streaming support (#125)
This adds:
- [x] Keyboard and mouse handling on the frontend
- [x] Video and audio streaming from the backend to the frontend
- [x] Input server that works with Websockets

Update - 17/11
- [ ] Master docker container to run this
- [ ] Steam runtime
- [ ] Entrypoint.sh

---------

Co-authored-by: Kristian Ollikainen <14197772+DatCaptainHorse@users.noreply.github.com>
Co-authored-by: Kristian Ollikainen <DatCaptainHorse@users.noreply.github.com>
2024-12-08 14:54:56 +03:00

174 lines
4.0 KiB
TypeScript

import { Watch } from "../common/async"
import { Closed } from "./error"
import * as Message from "./message"
import type { GroupReader, TrackReader } from "./model"
import { type Stream, Writer } from "./stream"
/**
 * Serves locally published tracks to the remote peer of a WebTransport session.
 *
 * Tracks are registered with `publish()`. Each `run*` method handles a single
 * incoming control stream of the matching message type.
 */
export class Publisher {
	#quic: WebTransport

	// Our announced broadcasts, keyed by the serialized track path.
	// A Map compares array keys by identity, so a `string[]` key would never match
	// an equal path that arrives as a different array instance (e.g. a decoded message).
	#announce = new Map<string, TrackReader>()

	// Their subscribed tracks, keyed by subscribe id.
	#subscribe = new Map<bigint, Subscribed>()

	constructor(quic: WebTransport) {
		this.#quic = quic
	}

	// Serialize a path into an unambiguous Map key.
	// JSON is used instead of join() so ["a,b"] and ["a", "b"] stay distinct.
	#key(path: string[]): string {
		return JSON.stringify(path)
	}

	/**
	 * Publish a track so it can be announced and subscribed to.
	 *
	 * @param track - The track to publish; looked up later by the contents of `track.path`.
	 * @throws Error if a track with an equal path has already been published.
	 */
	publish(track: TrackReader) {
		const key = this.#key(track.path)
		if (this.#announce.has(key)) {
			throw new Error(`already announced: ${track.path.toString()}`)
		}

		this.#announce.set(key, track)

		// TODO: clean up announcements
		// track.closed().then(() => this.#announce.delete(this.#key(track.path)))
	}

	// Look up a published track by the contents of its path.
	#get(path: string[]): TrackReader | undefined {
		return this.#announce.get(this.#key(path))
	}

	/**
	 * Answer an announce-interest request.
	 *
	 * Writes one "active" Announce message for every published track whose path
	 * starts with `msg.prefix`, carrying the remainder of the path after the prefix,
	 * then waits for the stream's reader to close.
	 *
	 * @param msg - The request; only `msg.prefix` is read.
	 * @param stream - The stream the announcements are written to.
	 */
	async runAnnounce(msg: Message.AnnounceInterest, stream: Stream) {
		for (const announce of this.#announce.values()) {
			if (announce.path.length < msg.prefix.length) continue

			// Compare element by element: `!==` on two arrays only compares identity
			// and would reject every track.
			const matches = msg.prefix.every((part, index) => announce.path[index] === part)
			if (!matches) continue

			const suffix = announce.path.slice(msg.prefix.length)
			const active = new Message.Announce(suffix, "active")
			await active.encode(stream.writer)
		}

		// TODO support updates.
		// Until then, just keep the stream open.
		await stream.reader.closed()
	}

	/**
	 * Serve a subscription to one of our published tracks.
	 *
	 * Resets the stream with code 404 when the path is unknown. Otherwise starts
	 * forwarding the track in the background, writes an Info message, and then
	 * reads subscribe updates until the stream ends or fails, at which point the
	 * subscription is closed.
	 *
	 * @param msg - The subscribe request (`id` and `path` are read).
	 * @param stream - The control stream for this subscription.
	 * @throws Error if a subscription with the same id is still active.
	 */
	async runSubscribe(msg: Message.Subscribe, stream: Stream) {
		if (this.#subscribe.has(msg.id)) {
			throw new Error(`duplicate subscribe for id: ${msg.id}`)
		}

		const track = this.#get(msg.path)
		if (!track) {
			await stream.writer.reset(404)
			return
		}

		const subscribe = new Subscribed(msg, track, this.#quic)

		// Track the subscription so the duplicate-id check above can actually fire.
		this.#subscribe.set(msg.id, subscribe)

		// TODO close the stream when done
		subscribe.run().catch((err) => console.warn("failed to run subscribe: ", err))

		try {
			const info = new Message.Info(track.priority)
			info.order = track.order
			info.latest = track.latest
			await info.encode(stream.writer)

			for (;;) {
				// TODO try_decode
				const update = await Message.SubscribeUpdate.decode_maybe(stream.reader)
				if (!update) {
					subscribe.close()
					break
				}

				// TODO use the update
			}
		} catch (err) {
			subscribe.close(Closed.from(err))
		} finally {
			// The id may be reused once this subscription is over.
			this.#subscribe.delete(msg.id)
		}
	}

	/**
	 * Datagrams are not supported: resets the stream with code 501.
	 *
	 * @throws Error always, after the stream has been reset.
	 */
	async runDatagrams(msg: Message.Datagrams, stream: Stream) {
		await stream.writer.reset(501)
		throw new Error("datagrams not implemented")
	}

	/**
	 * Fetch is not supported: resets the stream with code 501.
	 *
	 * @throws Error always, after the stream has been reset.
	 */
	async runFetch(msg: Message.Fetch, stream: Stream) {
		await stream.writer.reset(501)
		throw new Error("fetch not implemented")
	}

	/**
	 * Answer an info request for one of our published tracks.
	 *
	 * Resets the stream with code 404 when the path is unknown; otherwise writes
	 * an Info message built from the track's priority, order and latest values.
	 *
	 * @throws Error always when the track exists (see note below).
	 */
	async runInfo(msg: Message.InfoRequest, stream: Stream) {
		const track = this.#get(msg.path)
		if (!track) {
			await stream.writer.reset(404)
			return
		}

		const info = new Message.Info(track.priority)
		info.order = track.order
		info.latest = track.latest
		await info.encode(stream.writer)

		// NOTE(review): this throws even though the Info reply was just written.
		// Kept as-is because callers may rely on it; confirm whether it is a leftover.
		throw new Error("info not implemented")
	}
}
/**
 * A single active subscription: forwards every group of a track to the peer,
 * each group on its own stream opened via `Writer.open`.
 */
class Subscribed {
	#id: bigint
	#track: TrackReader
	#quic: WebTransport

	// Holds `undefined` while the subscription is open, then the close reason.
	#closed = new Watch<Closed | undefined>(undefined)

	constructor(msg: Message.Subscribe, track: TrackReader, quic: WebTransport) {
		this.#id = msg.id
		this.#track = track
		this.#quic = quic
	}

	/**
	 * Forward groups from the track until the track has no more groups or the
	 * subscription is closed.
	 *
	 * Each group is written in the background; a failure to write a group is
	 * logged and does not stop the subscription.
	 */
	async run() {
		// Tag both outcomes so the winner of the race can be told apart.
		const closed = this.closed().then(() => ({ done: true as const }))
		const nextGroup = async () => ({ done: false as const, group: await this.#track.nextGroup() })

		for (;;) {
			// Race, not all: Promise.all would not resolve until the subscription is
			// closed, so no group would ever be forwarded.
			const next = await Promise.race([nextGroup(), closed])
			if (next.done) return
			if (!next.group) break

			this.#runGroup(next.group).catch((err) => console.warn("failed to run group: ", err))
		}

		// TODO wait until all groups are done
		this.close()
	}

	// Open a stream for the group and write each frame as a u53 length followed by the payload.
	// NOTE(review): the stream is not explicitly closed once the group ends — confirm whether it should be.
	async #runGroup(group: GroupReader) {
		const msg = new Message.Group(this.#id, group.id)
		const stream = await Writer.open(this.#quic, msg)

		for (;;) {
			const frame = await group.readFrame()
			if (!frame) break

			await stream.u53(frame.byteLength)
			await stream.write(frame)
		}
	}

	/**
	 * Mark the subscription as closed and close the underlying track reader.
	 *
	 * @param err - The close reason; defaults to a plain `Closed`.
	 */
	close(err = new Closed()) {
		this.#closed.update(err)
		this.#track.close()
	}

	/**
	 * Wait until the subscription has been closed.
	 *
	 * @returns The close reason, or a plain `Closed` if the watch ends without one.
	 */
	async closed(): Promise<Closed> {
		let [closed, next] = this.#closed.value()
		for (;;) {
			if (closed !== undefined) return closed
			if (!next) return new Closed()
			;[closed, next] = await next
		}
	}
}