feat: Add streaming support (#125)

This adds:
- [x] Keyboard and mouse handling on the frontend
- [x] Video and audio streaming from the backend to the frontend
- [x] Input server that works with Websockets

Update - 17/11
- [ ] Master docker container to run this
- [ ] Steam runtime
- [ ] Entrypoint.sh

---------

Co-authored-by: Kristian Ollikainen <14197772+DatCaptainHorse@users.noreply.github.com>
Co-authored-by: Kristian Ollikainen <DatCaptainHorse@users.noreply.github.com>
This commit is contained in:
Wanjohi
2024-12-08 14:54:56 +03:00
committed by GitHub
parent 5eb21eeadb
commit 379db1c87b
137 changed files with 12737 additions and 5234 deletions

View File

@@ -0,0 +1,63 @@
import * as Hex from "../common/hex"
import { Connection } from "./connection"
import * as Message from "./message"
import { Stream } from "./stream"
export interface ClientConfig {
url: string
// If set, the server fingerprint will be fetched from this URL.
// This is required to use self-signed certificates with Chrome (May 2023)
fingerprint?: string
}
export class Client {
#fingerprint: Promise<WebTransportHash | undefined>
readonly config: ClientConfig
constructor(config: ClientConfig) {
this.config = config
this.#fingerprint = this.#fetchFingerprint(config.fingerprint).catch((e) => {
console.warn("failed to fetch fingerprint: ", e)
return undefined
})
}
async connect(): Promise<Connection> {
// Helper function to make creating a promise easier
const options: WebTransportOptions = {}
const fingerprint = await this.#fingerprint
if (fingerprint) options.serverCertificateHashes = [fingerprint]
const quic = new WebTransport(this.config.url, options)
await quic.ready
const client = new Message.SessionClient([Message.Version.FORK_02])
const stream = await Stream.open(quic, client)
const server = await Message.SessionServer.decode(stream.reader)
if (server.version !== Message.Version.FORK_02) {
throw new Error(`unsupported server version: ${server.version}`)
}
console.log(`established connection: version=${server.version}`)
return new Connection(quic, stream)
}
async #fetchFingerprint(url?: string): Promise<WebTransportHash | undefined> {
if (!url) return
// TODO remove this fingerprint when Chrome WebTransport accepts the system CA
const response = await fetch(url)
const bytes = Hex.decode(await response.text())
return {
algorithm: "sha-256",
value: bytes,
}
}
}

View File

@@ -0,0 +1,158 @@
import { asError } from "../common/error"
import * as Message from "./message"
import { Reader, Stream } from "./stream"
import type { Queue } from "../common/async"
import { Closed } from "./error"
import type { Track, TrackReader } from "./model"
import { Publisher } from "./publisher"
import { type Announced, Subscriber } from "./subscriber"
export class Connection {
// The established WebTransport session.
#quic: WebTransport
// Use to receive/send session messages.
#session: Stream
// Module for contributing tracks.
#publisher: Publisher
// Module for distributing tracks.
#subscriber: Subscriber
// Async work running in the background
#running: Promise<void>
constructor(quic: WebTransport, session: Stream) {
this.#quic = quic
this.#session = session
this.#publisher = new Publisher(this.#quic)
this.#subscriber = new Subscriber(this.#quic)
this.#running = this.#run()
}
close(code = 0, reason = "") {
this.#quic.close({ closeCode: code, reason })
}
async #run(): Promise<void> {
const session = this.#runSession().catch((err) => new Error("failed to run session: ", err))
const bidis = this.#runBidis().catch((err) => new Error("failed to run bidis: ", err))
const unis = this.#runUnis().catch((err) => new Error("failed to run unis: ", err))
await Promise.all([session, bidis, unis])
}
publish(track: TrackReader) {
this.#publisher.publish(track)
}
async announced(prefix: string[] = []): Promise<Queue<Announced>> {
return this.#subscriber.announced(prefix)
}
async subscribe(track: Track): Promise<TrackReader> {
return await this.#subscriber.subscribe(track)
}
async #runSession() {
// Receive messages until the connection is closed.
for (;;) {
const msg = await Message.SessionInfo.decode_maybe(this.#session.reader)
if (!msg) break
// TODO use the session info
}
}
async #runBidis() {
for (;;) {
const next = await Stream.accept(this.#quic)
if (!next) {
break
}
const [msg, stream] = next
this.#runBidi(msg, stream).catch((err) => stream.writer.reset(Closed.extract(err)))
}
}
async #runBidi(msg: Message.Bi, stream: Stream) {
console.debug("received bi stream: ", msg)
if (msg instanceof Message.SessionClient) {
throw new Error("duplicate session stream")
}
if (msg instanceof Message.AnnounceInterest) {
if (!this.#subscriber) {
throw new Error("not a subscriber")
}
return await this.#publisher.runAnnounce(msg, stream)
}
if (msg instanceof Message.Subscribe) {
if (!this.#publisher) {
throw new Error("not a publisher")
}
return await this.#publisher.runSubscribe(msg, stream)
}
if (msg instanceof Message.Datagrams) {
if (!this.#publisher) {
throw new Error("not a publisher")
}
return await this.#publisher.runDatagrams(msg, stream)
}
if (msg instanceof Message.Fetch) {
if (!this.#publisher) {
throw new Error("not a publisher")
}
return await this.#publisher.runFetch(msg, stream)
}
if (msg instanceof Message.InfoRequest) {
if (!this.#publisher) {
throw new Error("not a publisher")
}
return await this.#publisher.runInfo(msg, stream)
}
}
async #runUnis() {
for (;;) {
const next = await Reader.accept(this.#quic)
if (!next) {
break
}
const [msg, stream] = next
this.#runUni(msg, stream).catch((err) => stream.stop(Closed.extract(err)))
}
}
async #runUni(msg: Message.Uni, stream: Reader) {
console.debug("received uni stream: ", msg)
if (msg instanceof Message.Group) {
if (!this.#subscriber) {
throw new Error("not a subscriber")
}
return this.#subscriber.runGroup(msg, stream)
}
}
async closed(): Promise<Error> {
try {
await this.#running
return new Error("closed")
} catch (e) {
return asError(e)
}
}
}

View File

@@ -0,0 +1,20 @@
export class Closed extends Error {
readonly code?: number
constructor(code?: number) {
super(`closed code=${code}`)
this.code = code
}
static from(err: unknown): Closed {
return new Closed(Closed.extract(err))
}
static extract(err: unknown): number {
if (err instanceof WebTransportError && err.streamErrorCode !== null) {
return err.streamErrorCode
}
return 0
}
}

View File

@@ -0,0 +1,45 @@
import type { Reader, Writer } from "./stream"
export class FrameReader {
#stream: Reader
constructor(stream: Reader) {
this.#stream = stream
}
// Returns the next frame
async read(): Promise<Uint8Array | undefined> {
if (await this.#stream.done()) return
const size = await this.#stream.u53()
const payload = await this.#stream.read(size)
return payload
}
async stop(code: number) {
await this.#stream.stop(code)
}
}
export class FrameWriter {
#stream: Writer
constructor(stream: Writer) {
this.#stream = stream
}
// Writes the next frame
async write(payload: Uint8Array) {
await this.#stream.u53(payload.byteLength)
await this.#stream.write(payload)
}
async close() {
await this.#stream.close()
}
async reset(code: number) {
await this.#stream.reset(code)
}
}

View File

@@ -0,0 +1,7 @@
export { Client } from "./client"
export type { ClientConfig } from "./client"
export { Connection } from "./connection"
export { Track, Group } from "./model"
export { Announced } from "./subscriber"

View File

@@ -0,0 +1,428 @@
import type { Reader, Writer } from "./stream"
export enum Version {
DRAFT_00 = 0xff000000,
DRAFT_01 = 0xff000001,
DRAFT_02 = 0xff000002,
DRAFT_03 = 0xff000003,
FORK_00 = 0xff0bad00,
FORK_01 = 0xff0bad01,
FORK_02 = 0xff0bad02,
}
export class Extensions {
entries: Map<bigint, Uint8Array>
constructor() {
this.entries = new Map()
}
set(id: bigint, value: Uint8Array) {
this.entries.set(id, value)
}
get(id: bigint): Uint8Array | undefined {
return this.entries.get(id)
}
remove(id: bigint): Uint8Array | undefined {
const value = this.entries.get(id)
this.entries.delete(id)
return value
}
async encode(w: Writer) {
await w.u53(this.entries.size)
for (const [id, value] of this.entries) {
await w.u62(id)
await w.u53(value.length)
await w.write(value)
}
}
static async decode(r: Reader): Promise<Extensions> {
const count = await r.u53()
const params = new Extensions()
for (let i = 0; i < count; i++) {
const id = await r.u62()
const size = await r.u53()
const value = await r.read(size)
if (params.entries.has(id)) {
throw new Error(`duplicate parameter id: ${id}`)
}
params.entries.set(id, value)
}
return params
}
}
export enum Order {
Any = 0,
Ascending = 1,
Descending = 2,
}
export class SessionClient {
versions: Version[]
extensions: Extensions
static StreamID = 0x0
constructor(versions: Version[], extensions = new Extensions()) {
this.versions = versions
this.extensions = extensions
}
async encode(w: Writer) {
await w.u53(this.versions.length)
for (const v of this.versions) {
await w.u53(v)
}
await this.extensions.encode(w)
}
static async decode(r: Reader): Promise<SessionClient> {
const versions = []
const count = await r.u53()
for (let i = 0; i < count; i++) {
versions.push(await r.u53())
}
const extensions = await Extensions.decode(r)
return new SessionClient(versions, extensions)
}
}
export class SessionServer {
version: Version
extensions: Extensions
constructor(version: Version, extensions = new Extensions()) {
this.version = version
this.extensions = extensions
}
async encode(w: Writer) {
await w.u53(this.version)
await this.extensions.encode(w)
}
static async decode(r: Reader): Promise<SessionServer> {
const version = await r.u53()
const extensions = await Extensions.decode(r)
return new SessionServer(version, extensions)
}
}
export class SessionInfo {
bitrate: number
constructor(bitrate: number) {
this.bitrate = bitrate
}
async encode(w: Writer) {
await w.u53(this.bitrate)
}
static async decode(r: Reader): Promise<SessionInfo> {
const bitrate = await r.u53()
return new SessionInfo(bitrate)
}
static async decode_maybe(r: Reader): Promise<SessionInfo | undefined> {
if (await r.done()) return
return await SessionInfo.decode(r)
}
}
export type AnnounceStatus = "active" | "closed"
export class Announce {
suffix: string[]
status: AnnounceStatus
constructor(suffix: string[], status: AnnounceStatus) {
this.suffix = suffix
this.status = status
}
async encode(w: Writer) {
await w.u53(this.status === "active" ? 1 : 0)
await w.path(this.suffix)
}
static async decode(r: Reader): Promise<Announce> {
const status = (await r.u53()) === 1 ? "active" : "closed"
const suffix = await r.path()
return new Announce(suffix, status)
}
static async decode_maybe(r: Reader): Promise<Announce | undefined> {
if (await r.done()) return
return await Announce.decode(r)
}
}
export class AnnounceInterest {
static StreamID = 0x1
constructor(public prefix: string[]) {}
async encode(w: Writer) {
await w.path(this.prefix)
}
static async decode(r: Reader): Promise<AnnounceInterest> {
const prefix = await r.path()
return new AnnounceInterest(prefix)
}
}
export class SubscribeUpdate {
priority: number
order = Order.Any
expires = 0 // ms
start?: bigint
end?: bigint
constructor(priority: number) {
this.priority = priority
}
async encode(w: Writer) {
await w.u53(this.priority)
await w.u53(this.order)
await w.u53(this.expires)
await w.u62(this.start ? this.start + 1n : 0n)
await w.u62(this.end ? this.end + 1n : 0n)
}
static async decode(r: Reader): Promise<SubscribeUpdate> {
const priority = await r.u53()
const order = await r.u53()
if (order > 2) {
throw new Error(`invalid order: ${order}`)
}
const expires = await r.u53()
const start = await r.u62()
const end = await r.u62()
const update = new SubscribeUpdate(priority)
update.order = order
update.expires = expires
update.start = start === 0n ? undefined : start - 1n
update.end = end === 0n ? undefined : end - 1n
return update
}
static async decode_maybe(r: Reader): Promise<SubscribeUpdate | undefined> {
if (await r.done()) return
return await SubscribeUpdate.decode(r)
}
}
export class Subscribe extends SubscribeUpdate {
id: bigint
path: string[]
static StreamID = 0x2
constructor(id: bigint, path: string[], priority: number) {
super(priority)
this.id = id
this.path = path
}
async encode(w: Writer) {
await w.u62(this.id)
await w.path(this.path)
await super.encode(w)
}
static async decode(r: Reader): Promise<Subscribe> {
const id = await r.u62()
const path = await r.path()
const update = await SubscribeUpdate.decode(r)
const subscribe = new Subscribe(id, path, update.priority)
subscribe.order = update.order
subscribe.expires = update.expires
subscribe.start = update.start
subscribe.end = update.end
return subscribe
}
}
export class Datagrams extends Subscribe {
static StreamID = 0x3
}
export class Info {
priority: number
order = Order.Descending
expires = 0
latest?: number
constructor(priority: number) {
this.priority = priority
}
async encode(w: Writer) {
await w.u53(this.priority)
await w.u53(this.order)
await w.u53(this.expires)
await w.u53(this.latest ? this.latest + 1 : 0)
}
static async decode(r: Reader): Promise<Info> {
const priority = await r.u53()
const order = await r.u53()
const latest = await r.u53()
const info = new Info(priority)
info.latest = latest === 0 ? undefined : latest - 1
info.order = order
return info
}
}
export class InfoRequest {
path: string[]
static StreamID = 0x5
constructor(path: string[]) {
this.path = path
}
async encode(w: Writer) {
await w.path(this.path)
}
static async decode(r: Reader): Promise<InfoRequest> {
const path = await r.path()
return new InfoRequest(path)
}
}
export class FetchUpdate {
priority: number
constructor(priority: number) {
this.priority = priority
}
async encode(w: Writer) {
await w.u53(this.priority)
}
static async decode(r: Reader): Promise<FetchUpdate> {
return new FetchUpdate(await r.u53())
}
static async decode_maybe(r: Reader): Promise<FetchUpdate | undefined> {
if (await r.done()) return
return await FetchUpdate.decode(r)
}
}
export class Fetch extends FetchUpdate {
path: string[]
static StreamID = 0x4
constructor(path: string[], priority: number) {
super(priority)
this.path = path
}
async encode(w: Writer) {
await w.path(this.path)
await super.encode(w)
}
static async decode(r: Reader): Promise<Fetch> {
const path = await r.path()
const update = await FetchUpdate.decode(r)
const fetch = new Fetch(path, update.priority)
return fetch
}
}
export class Group {
subscribe: bigint
sequence: number
static StreamID = 0x0
constructor(subscribe: bigint, sequence: number) {
this.subscribe = subscribe
this.sequence = sequence
}
async encode(w: Writer) {
await w.u62(this.subscribe)
await w.u53(this.sequence)
}
static async decode(r: Reader): Promise<Group> {
return new Group(await r.u62(), await r.u53())
}
}
export class GroupDrop {
sequence: number
count: number
error: number
constructor(sequence: number, count: number, error: number) {
this.sequence = sequence
this.count = count
this.error = error
}
async encode(w: Writer) {
await w.u53(this.sequence)
await w.u53(this.count)
await w.u53(this.error)
}
static async decode(r: Reader): Promise<GroupDrop> {
return new GroupDrop(await r.u53(), await r.u53(), await r.u53())
}
}
export class Frame {
payload: Uint8Array
constructor(payload: Uint8Array) {
this.payload = payload
}
async encode(w: Writer) {
await w.u53(this.payload.byteLength)
await w.write(this.payload)
}
static async decode(r: Reader): Promise<Frame> {
const size = await r.u53()
const payload = await r.read(size)
return new Frame(payload)
}
}
export type Bi = SessionClient | AnnounceInterest | Subscribe | Datagrams | Fetch | InfoRequest
export type Uni = Group

View File

@@ -0,0 +1,170 @@
import { Watch } from "../common/async"
import { Closed } from "./error"
import { Order } from "./message"
export class Track {
readonly path: string[]
readonly priority: number
order = Order.Any
// TODO use an array
latest = new Watch<GroupReader | undefined>(undefined)
readers = 0
closed?: Closed
constructor(path: string[], priority: number) {
this.path = path
this.priority = priority
}
appendGroup(): Group {
const next = this.latest.value()[0]?.id ?? 0
return this.createGroup(next)
}
createGroup(sequence: number): Group {
if (this.closed) throw this.closed
const group = new Group(sequence)
const [current, _] = this.latest.value()
// TODO use an array
if (!current || current.id < sequence) {
const reader = new GroupReader(group)
this.latest.update(reader)
}
return group
}
close(closed = new Closed()) {
if (this.closed) return
this.closed = closed
this.latest.close()
}
reader(): TrackReader {
// VERY important that readers are closed to decrement the count
this.readers += 1
return new TrackReader(this)
}
}
export class TrackReader {
latest?: number
#track: Track
constructor(track: Track) {
this.#track = track
}
async nextGroup(): Promise<GroupReader | undefined> {
let [current, next] = this.#track.latest.value()
for (;;) {
if (current && this.latest !== current.id) {
this.latest = current.id
return current
}
if (this.#track.closed) throw this.#track.closed
if (!next) return
;[current, next] = await next
}
}
get path() {
return this.#track.path
}
get order() {
return this.#track.order
}
get priority() {
return this.#track.priority
}
close() {
this.#track.readers -= 1
if (this.#track.readers <= 0) this.#track.close()
}
}
export class Group {
readonly id: number
chunks = new Watch<Uint8Array[]>([])
readers = 0
closed?: Closed
constructor(id: number) {
this.id = id
}
writeFrame(frame: Uint8Array) {
if (this.closed) throw this.closed
this.chunks.update((chunks) => [...chunks, frame])
}
writeFrames(...frames: Uint8Array[]) {
if (this.closed) throw this.closed
this.chunks.update((chunks) => [...chunks, ...frames])
this.close()
}
reader(): GroupReader {
this.readers += 1
return new GroupReader(this)
}
get length(): number {
return this.chunks.value()[0].length
}
close(closed = new Closed()) {
if (this.closed) return
this.closed = closed
this.chunks.close()
}
}
export class GroupReader {
#group: Group
#index = 0
constructor(group: Group) {
this.#group = group
}
async readFrame(): Promise<Uint8Array | undefined> {
let [chunks, next] = this.#group.chunks.value()
for (;;) {
if (this.#index < chunks.length) {
this.#index += 1
return chunks[this.#index - 1]
}
if (this.#group.closed) throw this.#group.closed
if (!next) return
;[chunks, next] = await next
}
}
get index(): number {
return this.#index
}
get id(): number {
return this.#group.id
}
close() {
this.#group.readers -= 1
if (this.#group.readers <= 0) this.#group.close()
}
}

View File

@@ -0,0 +1,173 @@
import { Watch } from "../common/async"
import { Closed } from "./error"
import * as Message from "./message"
import type { GroupReader, TrackReader } from "./model"
import { type Stream, Writer } from "./stream"
export class Publisher {
#quic: WebTransport
// Our announced broadcasts.
#announce = new Map<string[], TrackReader>()
// Their subscribed tracks.
#subscribe = new Map<bigint, Subscribed>()
constructor(quic: WebTransport) {
this.#quic = quic
}
// Publish a track
publish(track: TrackReader) {
if (this.#announce.has(track.path)) {
throw new Error(`already announced: ${track.path.toString()}`)
}
this.#announce.set(track.path, track)
// TODO: clean up announcements
// track.closed().then(() => this.#announce.delete(track.path))
}
#get(path: string[]): TrackReader | undefined {
return this.#announce.get(path)
}
async runAnnounce(msg: Message.AnnounceInterest, stream: Stream) {
for (const announce of this.#announce.values()) {
if (announce.path.length < msg.prefix.length) continue
const prefix = announce.path.slice(0, msg.prefix.length)
if (prefix !== msg.prefix) continue
const suffix = announce.path.slice(msg.prefix.length)
const active = new Message.Announce(suffix, "active")
await active.encode(stream.writer)
}
// TODO support updates.
// Until then, just keep the stream open.
await stream.reader.closed()
}
async runSubscribe(msg: Message.Subscribe, stream: Stream) {
if (this.#subscribe.has(msg.id)) {
throw new Error(`duplicate subscribe for id: ${msg.id}`)
}
const track = this.#get(msg.path)
if (!track) {
await stream.writer.reset(404)
return
}
const subscribe = new Subscribed(msg, track, this.#quic)
// TODO close the stream when done
subscribe.run().catch((err) => console.warn("failed to run subscribe: ", err))
try {
const info = new Message.Info(track.priority)
info.order = track.order
info.latest = track.latest
await info.encode(stream.writer)
for (;;) {
// TODO try_decode
const update = await Message.SubscribeUpdate.decode_maybe(stream.reader)
if (!update) {
subscribe.close()
break
}
// TODO use the update
}
} catch (err) {
subscribe.close(Closed.from(err))
}
}
async runDatagrams(msg: Message.Datagrams, stream: Stream) {
await stream.writer.reset(501)
throw new Error("datagrams not implemented")
}
async runFetch(msg: Message.Fetch, stream: Stream) {
await stream.writer.reset(501)
throw new Error("fetch not implemented")
}
async runInfo(msg: Message.InfoRequest, stream: Stream) {
const track = this.#get(msg.path)
if (!track) {
await stream.writer.reset(404)
return
}
const info = new Message.Info(track.priority)
info.order = track.order
info.latest = track.latest
await info.encode(stream.writer)
throw new Error("info not implemented")
}
}
class Subscribed {
#id: bigint
#track: TrackReader
#quic: WebTransport
#closed = new Watch<Closed | undefined>(undefined)
constructor(msg: Message.Subscribe, track: TrackReader, quic: WebTransport) {
this.#id = msg.id
this.#track = track
this.#quic = quic
}
async run() {
const closed = this.closed()
for (;;) {
const [group, done] = await Promise.all([this.#track.nextGroup(), closed])
if (done) return
if (!group) break
this.#runGroup(group).catch((err) => console.warn("failed to run group: ", err))
}
// TODO wait until all groups are done
this.close()
}
async #runGroup(group: GroupReader) {
const msg = new Message.Group(this.#id, group.id)
const stream = await Writer.open(this.#quic, msg)
for (;;) {
const frame = await group.readFrame()
if (!frame) break
await stream.u53(frame.byteLength)
await stream.write(frame)
}
}
close(err = new Closed()) {
this.#closed.update(err)
this.#track.close()
}
async closed(): Promise<Closed> {
let [closed, next] = this.#closed.value()
for (;;) {
if (closed !== undefined) return closed
if (!next) return new Closed()
;[closed, next] = await next
}
}
}

View File

@@ -0,0 +1,417 @@
import * as Message from "./message"
const MAX_U6 = 2 ** 6 - 1
const MAX_U14 = 2 ** 14 - 1
const MAX_U30 = 2 ** 30 - 1
const MAX_U31 = 2 ** 31 - 1
const MAX_U53 = Number.MAX_SAFE_INTEGER
const MAX_U62: bigint = 2n ** 62n - 1n
export class Stream {
reader: Reader
writer: Writer
constructor(props: {
writable: WritableStream<Uint8Array>
readable: ReadableStream<Uint8Array>
}) {
this.writer = new Writer(props.writable)
this.reader = new Reader(props.readable)
}
static async accept(quic: WebTransport): Promise<[Message.Bi, Stream] | undefined> {
const reader = quic.incomingBidirectionalStreams.getReader()
const next = await reader.read()
reader.releaseLock()
if (next.done) return
const stream = new Stream(next.value)
let msg: Message.Bi
const typ = await stream.reader.u8()
if (typ === Message.SessionClient.StreamID) {
msg = await Message.SessionClient.decode(stream.reader)
} else if (typ === Message.AnnounceInterest.StreamID) {
msg = await Message.AnnounceInterest.decode(stream.reader)
} else if (typ === Message.Subscribe.StreamID) {
msg = await Message.Subscribe.decode(stream.reader)
} else if (typ === Message.Datagrams.StreamID) {
msg = await Message.Datagrams.decode(stream.reader)
} else if (typ === Message.Fetch.StreamID) {
msg = await Message.Fetch.decode(stream.reader)
} else if (typ === Message.InfoRequest.StreamID) {
msg = await Message.InfoRequest.decode(stream.reader)
} else {
throw new Error(`unknown stream type: ${typ}`)
}
console.debug("accepted stream", msg)
return [msg, stream]
}
static async open(quic: WebTransport, msg: Message.Bi): Promise<Stream> {
const stream = new Stream(await quic.createBidirectionalStream())
if (msg instanceof Message.SessionClient) {
await stream.writer.u8(Message.SessionClient.StreamID)
} else if (msg instanceof Message.AnnounceInterest) {
await stream.writer.u8(Message.AnnounceInterest.StreamID)
} else if (msg instanceof Message.Subscribe) {
await stream.writer.u8(Message.Subscribe.StreamID)
} else if (msg instanceof Message.Datagrams) {
await stream.writer.u8(Message.Datagrams.StreamID)
} else if (msg instanceof Message.Fetch) {
await stream.writer.u8(Message.Fetch.StreamID)
} else if (msg instanceof Message.InfoRequest) {
await stream.writer.u8(Message.InfoRequest.StreamID)
} else {
// Make sure we're not missing any types.
const _: never = msg
throw new Error("invalid message type")
}
await msg.encode(stream.writer)
console.debug("opened stream", msg)
return stream
}
async close(code?: number) {
if (code === undefined) {
await this.writer.close()
} else {
await this.writer.reset(code)
await this.reader.stop(code)
}
}
}
// Reader wraps a stream and provides convience methods for reading pieces from a stream
// Unfortunately we can't use a BYOB reader because it's not supported with WebTransport+WebWorkers yet.
export class Reader {
#buffer: Uint8Array
#stream: ReadableStream<Uint8Array>
#reader: ReadableStreamDefaultReader<Uint8Array>
constructor(stream: ReadableStream<Uint8Array>, buffer = new Uint8Array()) {
this.#buffer = buffer
this.#stream = stream
this.#reader = this.#stream.getReader()
}
// Adds more data to the buffer, returning true if more data was added.
async #fill(): Promise<boolean> {
const result = await this.#reader.read()
if (result.done) {
return false
}
const buffer = new Uint8Array(result.value)
if (this.#buffer.byteLength === 0) {
this.#buffer = buffer
} else {
const temp = new Uint8Array(this.#buffer.byteLength + buffer.byteLength)
temp.set(this.#buffer)
temp.set(buffer, this.#buffer.byteLength)
this.#buffer = temp
}
return true
}
// Add more data to the buffer until it's at least size bytes.
async #fillTo(size: number) {
while (this.#buffer.byteLength < size) {
if (!(await this.#fill())) {
throw new Error("unexpected end of stream")
}
}
}
// Consumes the first size bytes of the buffer.
#slice(size: number): Uint8Array {
const result = new Uint8Array(this.#buffer.buffer, this.#buffer.byteOffset, size)
this.#buffer = new Uint8Array(this.#buffer.buffer, this.#buffer.byteOffset + size)
return result
}
async read(size: number): Promise<Uint8Array> {
if (size === 0) return new Uint8Array()
await this.#fillTo(size)
return this.#slice(size)
}
async readAll(): Promise<Uint8Array> {
while (await this.#fill()) {}
return this.#slice(this.#buffer.byteLength)
}
async string(maxLength?: number): Promise<string> {
const length = await this.u53()
if (maxLength !== undefined && length > maxLength) {
throw new Error(`string length ${length} exceeds max length ${maxLength}`)
}
const buffer = await this.read(length)
return new TextDecoder().decode(buffer)
}
async path(): Promise<string[]> {
const parts = await this.u53()
const path = []
for (let i = 0; i < parts; i++) {
path.push(await this.string())
}
return path
}
async u8(): Promise<number> {
await this.#fillTo(1)
return this.#slice(1)[0]
}
// Returns a Number using 53-bits, the max Javascript can use for integer math
async u53(): Promise<number> {
const v = await this.u62()
if (v > MAX_U53) {
throw new Error("value larger than 53-bits; use v62 instead")
}
return Number(v)
}
// NOTE: Returns a bigint instead of a number since it may be larger than 53-bits
async u62(): Promise<bigint> {
await this.#fillTo(1)
const size = (this.#buffer[0] & 0xc0) >> 6
if (size === 0) {
const first = this.#slice(1)[0]
return BigInt(first) & 0x3fn
}
if (size === 1) {
await this.#fillTo(2)
const slice = this.#slice(2)
const view = new DataView(slice.buffer, slice.byteOffset, slice.byteLength)
return BigInt(view.getInt16(0)) & 0x3fffn
}
if (size === 2) {
await this.#fillTo(4)
const slice = this.#slice(4)
const view = new DataView(slice.buffer, slice.byteOffset, slice.byteLength)
return BigInt(view.getUint32(0)) & 0x3fffffffn
}
await this.#fillTo(8)
const slice = this.#slice(8)
const view = new DataView(slice.buffer, slice.byteOffset, slice.byteLength)
return view.getBigUint64(0) & 0x3fffffffffffffffn
}
async done(): Promise<boolean> {
if (this.#buffer.byteLength > 0) return false
return !(await this.#fill())
}
async stop(code: number) {
this.#reader.releaseLock()
await this.#stream.cancel(code)
}
async closed() {
return this.#reader.closed
}
release(): [Uint8Array, ReadableStream<Uint8Array>] {
this.#reader.releaseLock()
return [this.#buffer, this.#stream]
}
static async accept(quic: WebTransport): Promise<[Message.Group, Reader] | undefined> {
const reader = quic.incomingUnidirectionalStreams.getReader()
const next = await reader.read()
reader.releaseLock()
if (next.done) return
const stream = new Reader(next.value)
let msg: Message.Uni
const typ = await stream.u8()
if (typ === Message.Group.StreamID) {
msg = await Message.Group.decode(stream)
} else {
throw new Error(`unknown stream type: ${typ}`)
}
return [msg, stream]
}
}
// Writer wraps a stream and writes chunks of data
export class Writer {
#scratch: Uint8Array
#writer: WritableStreamDefaultWriter<Uint8Array>
#stream: WritableStream<Uint8Array>
constructor(stream: WritableStream<Uint8Array>) {
this.#stream = stream
this.#scratch = new Uint8Array(8)
this.#writer = this.#stream.getWriter()
}
async u8(v: number) {
await this.write(setUint8(this.#scratch, v))
}
async i32(v: number) {
if (Math.abs(v) > MAX_U31) {
throw new Error(`overflow, value larger than 32-bits: ${v}`)
}
// We don't use a VarInt, so it always takes 4 bytes.
// This could be improved but nothing is standardized yet.
await this.write(setInt32(this.#scratch, v))
}
async u53(v: number) {
if (v < 0) {
throw new Error(`underflow, value is negative: ${v}`)
}
if (v > MAX_U53) {
throw new Error(`overflow, value larger than 53-bits: ${v}`)
}
await this.write(setVint53(this.#scratch, v))
}
async u62(v: bigint) {
if (v < 0) {
throw new Error(`underflow, value is negative: ${v}`)
}
if (v >= MAX_U62) {
throw new Error(`overflow, value larger than 62-bits: ${v}`)
}
await this.write(setVint62(this.#scratch, v))
}
async write(v: Uint8Array) {
await this.#writer.write(v)
}
async string(str: string) {
const data = new TextEncoder().encode(str)
await this.u53(data.byteLength)
await this.write(data)
}
async path(path: string[]) {
await this.u53(path.length)
for (const part of path) {
await this.string(part)
}
}
async close() {
this.#writer.releaseLock()
await this.#stream.close()
}
async reset(code: number) {
this.#writer.releaseLock()
await this.#stream.abort(code)
}
release(): WritableStream<Uint8Array> {
this.#writer.releaseLock()
return this.#stream
}
static async open(quic: WebTransport, msg: Message.Uni): Promise<Writer> {
const stream = new Writer(await quic.createUnidirectionalStream())
if (msg instanceof Message.Group) {
await stream.u8(Message.Group.StreamID)
} else {
// Make sure we're not missing any types.
const _: never = msg
throw new Error("invalid message type")
}
return stream
}
}
export function setUint8(dst: Uint8Array, v: number): Uint8Array {
dst[0] = v
return dst.slice(0, 1)
}
export function setUint16(dst: Uint8Array, v: number): Uint8Array {
const view = new DataView(dst.buffer, dst.byteOffset, 2)
view.setUint16(0, v)
return new Uint8Array(view.buffer, view.byteOffset, view.byteLength)
}
export function setInt32(dst: Uint8Array, v: number): Uint8Array {
const view = new DataView(dst.buffer, dst.byteOffset, 4)
view.setInt32(0, v)
return new Uint8Array(view.buffer, view.byteOffset, view.byteLength)
}
export function setUint32(dst: Uint8Array, v: number): Uint8Array {
const view = new DataView(dst.buffer, dst.byteOffset, 4)
view.setUint32(0, v)
return new Uint8Array(view.buffer, view.byteOffset, view.byteLength)
}
export function setVint53(dst: Uint8Array, v: number): Uint8Array {
if (v <= MAX_U6) {
return setUint8(dst, v)
}
if (v <= MAX_U14) {
return setUint16(dst, v | 0x4000)
}
if (v <= MAX_U30) {
return setUint32(dst, v | 0x80000000)
}
if (v <= MAX_U53) {
return setUint64(dst, BigInt(v) | 0xc000000000000000n)
}
throw new Error(`overflow, value larger than 53-bits: ${v}`)
}
export function setVint62(dst: Uint8Array, v: bigint): Uint8Array {
if (v < MAX_U6) {
return setUint8(dst, Number(v))
}
if (v < MAX_U14) {
return setUint16(dst, Number(v) | 0x4000)
}
if (v <= MAX_U30) {
return setUint32(dst, Number(v) | 0x80000000)
}
if (v <= MAX_U62) {
return setUint64(dst, BigInt(v) | 0xc000000000000000n)
}
throw new Error(`overflow, value larger than 62-bits: ${v}`)
}
export function setUint64(dst: Uint8Array, v: bigint): Uint8Array {
const view = new DataView(dst.buffer, dst.byteOffset, 8)
view.setBigUint64(0, v)
return new Uint8Array(view.buffer, view.byteOffset, view.byteLength)
}

View File

@@ -0,0 +1,169 @@
import { Queue, Watch } from "../common/async"
import { Closed } from "./error"
import { FrameReader } from "./frame"
import * as Message from "./message"
import type { Track, TrackReader } from "./model"
import { type Reader, Stream } from "./stream"
export class Subscriber {
#quic: WebTransport
// Our subscribed tracks.
#subscribe = new Map<bigint, Subscribe>()
#subscribeNext = 0n
constructor(quic: WebTransport) {
this.#quic = quic
}
async announced(prefix: string[]): Promise<Queue<Announced>> {
const announced = new Queue<Announced>()
const msg = new Message.AnnounceInterest(prefix)
const stream = await Stream.open(this.#quic, msg)
this.runAnnounced(stream, announced, prefix)
.then(() => announced.close())
.catch((err) => announced.abort(err))
return announced
}
async runAnnounced(stream: Stream, announced: Queue<Announced>, prefix: string[]) {
const toggle: Map<string[], Announced> = new Map()
try {
for (;;) {
const announce = await Message.Announce.decode_maybe(stream.reader)
if (!announce) {
break
}
const existing = toggle.get(announce.suffix)
if (existing) {
if (announce.status === "active") {
throw new Error("duplicate announce")
}
existing.close()
toggle.delete(announce.suffix)
} else {
if (announce.status === "closed") {
throw new Error("unknown announce")
}
const path = prefix.concat(announce.suffix)
const item = new Announced(path)
await announced.push(item)
toggle.set(announce.suffix, item)
}
}
} finally {
for (const item of toggle.values()) {
item.close()
}
}
}
// TODO: Deduplicate identical subscribes
async subscribe(track: Track): Promise<TrackReader> {
const id = this.#subscribeNext++
const msg = new Message.Subscribe(id, track.path, track.priority)
const stream = await Stream.open(this.#quic, msg)
const subscribe = new Subscribe(id, stream, track)
this.#subscribe.set(subscribe.id, subscribe)
try {
const ok = await Message.Info.decode(stream.reader)
/*
for (;;) {
const dropped = await Message.GroupDrop.decode(stream.reader)
console.debug("dropped", dropped)
}
*/
return subscribe.track.reader()
} catch (err) {
console.error(err)
this.#subscribe.delete(subscribe.id)
await subscribe.close(Closed.from(err))
throw err
}
}
async runGroup(group: Message.Group, stream: Reader) {
const subscribe = this.#subscribe.get(group.subscribe)
if (!subscribe) return
const writer = subscribe.track.createGroup(group.sequence)
const reader = new FrameReader(stream)
for (;;) {
const frame = await reader.read()
if (!frame) break
writer.writeFrame(frame)
}
writer.close()
}
}
export class Announced {
readonly path: string[]
#closed = new Watch<Closed | undefined>(undefined)
constructor(path: string[]) {
this.path = path
}
close(err = new Closed()) {
this.#closed.update(err)
}
async closed(): Promise<Closed> {
let [closed, next] = this.#closed.value()
for (;;) {
if (closed !== undefined) return closed
if (!next) return new Closed()
;[closed, next] = await next
}
}
}
export class Subscribe {
readonly id: bigint
readonly track: Track
readonly stream: Stream
// A queue of received streams for this subscription.
#closed = new Watch<Closed | undefined>(undefined)
constructor(id: bigint, stream: Stream, track: Track) {
this.id = id
this.track = track
this.stream = stream
}
async run() {
try {
await this.closed()
await this.close()
} catch (err) {
await this.close(Closed.from(err))
}
}
async close(closed?: Closed) {
this.track.close(closed)
await this.stream.close(closed?.code)
}
async closed() {
await this.stream.reader.closed()
}
}

View File

@@ -0,0 +1,9 @@
{
"extends": "../tsconfig.json",
"include": ["."],
"references": [
{
"path": "../common"
}
]
}