From e489624213af9286bf7e887972fc03fd458a0ef1 Mon Sep 17 00:00:00 2001 From: cannorin Date: Wed, 1 Jan 2025 18:51:53 +0000 Subject: [PATCH] Update misskey-js/streaming.ts --- index.ts | 3 + misskey-js/streaming.ts | 231 ++++++++++++++-------------------------- 2 files changed, 80 insertions(+), 154 deletions(-) diff --git a/index.ts b/index.ts index c50ca75..c594329 100644 --- a/index.ts +++ b/index.ts @@ -218,6 +218,9 @@ function initializeStream() { { token: Bun.env["MISSKEY_CREDENTIAL"] ?? "", }, + { + binaryType: "arraybuffer" + } ) as unknown as MisskeyStream; channel = stream.useChannel("main"); diff --git a/misskey-js/streaming.ts b/misskey-js/streaming.ts index bbf143b..2cb90ed 100644 --- a/misskey-js/streaming.ts +++ b/misskey-js/streaming.ts @@ -1,25 +1,18 @@ -import { EventEmitter } from "eventemitter3"; -import ReconnectingWebsocket, { - type Options as WebsocketOptions, -} from "reconnecting-websocket"; -import type { BroadcastEvents, Channels } from "./streaming.types.js"; +import { EventEmitter } from 'eventemitter3'; +import ReconnectingWebsocket, { type Options } from 'reconnecting-websocket'; +import type { BroadcastEvents, Channels } from './streaming.types.js'; -export function urlQuery( - obj: Record, -): string { +export function urlQuery(obj: Record): string { const params = Object.entries(obj) - .filter(([, v]) => (Array.isArray(v) ? v.length : v !== undefined)) - .reduce( - // biome-ignore lint/style/noNonNullAssertion: - // biome-ignore lint/suspicious/noAssignInExpressions: - // biome-ignore lint/style/noCommaOperator: - (a, [k, v]) => ((a[k] = v!), a), - {} as Record, - ); + .filter(([, v]) => Array.isArray(v) ? v.length : v !== undefined) + // biome-ignore lint/suspicious/noAssignInExpressions: + // biome-ignore lint/style/noCommaOperator: + // biome-ignore lint/style/noNonNullAssertion: + .reduce((a, [k, v]) => (a[k] = v!, a), {} as Record); return Object.entries(params) .map((e) => `${e[0]}=${encodeURIComponent(e[1])}`) - .join("&"); + .join('&'); } type AnyOf> = T[keyof T]; @@ -30,23 +23,16 @@ export type StreamEvents = { } & BroadcastEvents; export interface IStream extends EventEmitter { - state: "initializing" | "reconnecting" | "connected"; + state: 'initializing' | 'reconnecting' | 'connected'; - useChannel( - channel: C, - params?: Channels[C]["params"], - name?: string, - ): IChannelConnection; + useChannel(channel: C, params?: Channels[C]['params'], name?: string): IChannelConnection; removeSharedConnection(connection: SharedConnection): void; removeSharedConnectionPool(pool: Pool): void; disconnectToChannel(connection: NonSharedConnection): void; send(typeOrPayload: string): void; send(typeOrPayload: string, payload: unknown): void; send(typeOrPayload: Record | unknown[]): void; - send( - typeOrPayload: string | Record | unknown[], - payload?: unknown, - ): void; + send(typeOrPayload: string | Record | unknown[], payload?: unknown): void; ping(): void; heartbeat(): void; close(): void; @@ -56,29 +42,25 @@ export interface IStream extends EventEmitter { * Misskey stream connection */ // eslint-disable-next-line import/no-default-export -export default class Stream - extends EventEmitter - implements IStream { +export default class Stream extends EventEmitter implements IStream { private stream: ReconnectingWebsocket; - public state: "initializing" | "reconnecting" | "connected" = "initializing"; + public state: 'initializing' | 'reconnecting' | 'connected' = 'initializing'; private sharedConnectionPools: Pool[] = []; private sharedConnections: SharedConnection[] = []; private nonSharedConnections: NonSharedConnection[] = []; private idCounter = 0; - constructor( - origin: string, - user: { token: string } | null, - options?: WebsocketOptions, - ) { + constructor(origin: string, user: { token: string; } | null, options: { + WebSocket?: Options['WebSocket']; + binaryType?: ReconnectingWebsocket['binaryType']; + } = {}) { super(); this.genId = this.genId.bind(this); this.useChannel = this.useChannel.bind(this); this.useSharedConnection = this.useSharedConnection.bind(this); this.removeSharedConnection = this.removeSharedConnection.bind(this); - this.removeSharedConnectionPool = - this.removeSharedConnectionPool.bind(this); + this.removeSharedConnectionPool = this.removeSharedConnectionPool.bind(this); this.connectToChannel = this.connectToChannel.bind(this); this.disconnectToChannel = this.disconnectToChannel.bind(this); this.onOpen = this.onOpen.bind(this); @@ -94,102 +76,70 @@ export default class Stream _t: Date.now(), }); - const wsOrigin = origin - .replace("http://", "ws://") - .replace("https://", "wss://"); + const wsOrigin = origin.replace('http://', 'ws://').replace('https://', 'wss://'); - this.stream = new ReconnectingWebsocket( - `${wsOrigin}/streaming?${query}`, - "", - { - minReconnectionDelay: 1, // https://github.com/pladaria/reconnecting-websocket/issues/91 - ...(options ?? {}), - }, - ); - this.stream.binaryType = "arraybuffer"; - this.stream.addEventListener("open", this.onOpen); - this.stream.addEventListener("close", this.onClose); - this.stream.addEventListener("message", this.onMessage); + this.stream = new ReconnectingWebsocket(`${wsOrigin}/streaming?${query}`, '', { + minReconnectionDelay: 1, // https://github.com/pladaria/reconnecting-websocket/issues/91 + WebSocket: options.WebSocket, + }); + if (options.binaryType) { + this.stream.binaryType = options.binaryType; + } + this.stream.addEventListener('open', this.onOpen); + this.stream.addEventListener('close', this.onClose); + this.stream.addEventListener('message', this.onMessage); } private genId(): string { return (++this.idCounter).toString(); } - public useChannel( - channel: C, - params?: Channels[C]["params"], - name?: string, - ): Connection { + public useChannel(channel: C, params?: Channels[C]['params'], name?: string): Connection { if (params) { return this.connectToChannel(channel, params); } return this.useSharedConnection(channel, name); } - private useSharedConnection( - channel: C, - name?: string, - ): SharedConnection { - let pool = this.sharedConnectionPools.find((p) => p.channel === channel); + private useSharedConnection(channel: C, name?: string): SharedConnection { + let pool = this.sharedConnectionPools.find(p => p.channel === channel); if (pool == null) { pool = new Pool(this, channel, this.genId()); this.sharedConnectionPools.push(pool); } - const connection = new SharedConnection( - this, - channel, - pool, - name, - ); + const connection = new SharedConnection(this, channel, pool, name); this.sharedConnections.push(connection as unknown as SharedConnection); return connection; } public removeSharedConnection(connection: SharedConnection): void { - this.sharedConnections = this.sharedConnections.filter( - (c) => c !== connection, - ); + this.sharedConnections = this.sharedConnections.filter(c => c !== connection); } public removeSharedConnectionPool(pool: Pool): void { - this.sharedConnectionPools = this.sharedConnectionPools.filter( - (p) => p !== pool, - ); + this.sharedConnectionPools = this.sharedConnectionPools.filter(p => p !== pool); } - private connectToChannel( - channel: C, - params: Channels[C]["params"], - ): NonSharedConnection { - const connection = new NonSharedConnection( - this, - channel, - this.genId(), - params, - ); - this.nonSharedConnections.push( - connection as unknown as NonSharedConnection, - ); + private connectToChannel(channel: C, params: Channels[C]['params']): NonSharedConnection { + const connection = new NonSharedConnection(this, channel, this.genId(), params); + this.nonSharedConnections.push(connection as unknown as NonSharedConnection); return connection; } public disconnectToChannel(connection: NonSharedConnection): void { - this.nonSharedConnections = this.nonSharedConnections.filter( - (c) => c !== connection, - ); + this.nonSharedConnections = this.nonSharedConnections.filter(c => c !== connection); } /** * Callback of when open connection */ private onOpen(): void { - const isReconnect = this.state === "reconnecting"; + const isReconnect = this.state === 'reconnecting'; - this.state = "connected"; - this.emit("_connected_"); + this.state = 'connected'; + this.emit('_connected_'); // チャンネル再接続 if (isReconnect) { @@ -202,27 +152,27 @@ export default class Stream * Callback of when close connection */ private onClose(): void { - if (this.state === "connected") { - this.state = "reconnecting"; - this.emit("_disconnected_"); + if (this.state === 'connected') { + this.state = 'reconnecting'; + this.emit('_disconnected_'); } } /** * Callback of when received a message from connection */ - private onMessage(message: { data: string }): void { + private onMessage(message: { data: string; }): void { const { type, body } = JSON.parse(message.data); - if (type === "channel") { + if (type === 'channel') { const id = body.id; let connections: Connection[]; - connections = this.sharedConnections.filter((c) => c.id === id); + connections = this.sharedConnections.filter(c => c.id === id); if (connections.length === 0) { - const found = this.nonSharedConnections.find((c) => c.id === id); + const found = this.nonSharedConnections.find(c => c.id === id); if (found) { connections = [found]; } @@ -241,20 +191,15 @@ export default class Stream * Send a message to connection * ! ストリーム上のやり取りはすべてJSONで行われます ! */ - public send(typeOrPayload: string): void; - public send(typeOrPayload: string, payload: unknown): void; - public send(typeOrPayload: Record | unknown[]): void; - public send( - typeOrPayload: string | Record | unknown[], - payload?: unknown, - ): void { - if (typeof typeOrPayload === "string") { - this.stream.send( - JSON.stringify({ - type: typeOrPayload, - ...(payload === undefined ? {} : { body: payload }), - }), - ); + public send(typeOrPayload: string): void + public send(typeOrPayload: string, payload: unknown): void + public send(typeOrPayload: Record | unknown[]): void + public send(typeOrPayload: string | Record | unknown[], payload?: unknown): void { + if (typeof typeOrPayload === 'string') { + this.stream.send(JSON.stringify({ + type: typeOrPayload, + ...(payload === undefined ? {} : { body: payload }), + })); return; } @@ -262,11 +207,11 @@ export default class Stream } public ping(): void { - this.stream.send("ping"); + this.stream.send('ping'); } public heartbeat(): void { - this.stream.send("h"); + this.stream.send('h'); } /** @@ -298,7 +243,7 @@ class Pool { this.stream = stream; this.id = id; - this.stream.on("_disconnected_", this.onStreamDisconnected); + this.stream.on('_disconnected_', this.onStreamDisconnected); } private onStreamDisconnected(): void { @@ -335,41 +280,31 @@ class Pool { public connect(): void { if (this.isConnected) return; this.isConnected = true; - this.stream.send("connect", { + this.stream.send('connect', { channel: this.channel, id: this.id, }); } private disconnect(): void { - this.stream.off("_disconnected_", this.onStreamDisconnected); - this.stream.send("disconnect", { id: this.id }); + this.stream.off('_disconnected_', this.onStreamDisconnected); + this.stream.send('disconnect', { id: this.id }); this.stream.removeSharedConnectionPool(this); } } -export interface IChannelConnection< - Channel extends AnyOf = AnyOf, -> extends EventEmitter { +export interface IChannelConnection = AnyOf> extends EventEmitter { id: string; name?: string; inCount: number; outCount: number; channel: string; - send( - type: T, - body: Channel["receives"][T], - ): void; + send(type: T, body: Channel['receives'][T]): void; dispose(): void; } -export abstract class Connection< - Channel extends AnyOf = AnyOf, -> - extends EventEmitter - implements IChannelConnection -{ +export abstract class Connection = AnyOf> extends EventEmitter implements IChannelConnection { public channel: string; protected stream: Stream; public abstract id: string; @@ -390,11 +325,8 @@ export abstract class Connection< } } - public send( - type: T, - body: Channel["receives"][T], - ): void { - this.stream.send("ch", { + public send(type: T, body: Channel['receives'][T]): void { + this.stream.send('ch', { id: this.id, type: type, body: body, @@ -406,9 +338,7 @@ export abstract class Connection< public abstract dispose(): void; } -class SharedConnection< - Channel extends AnyOf = AnyOf, -> extends Connection { +class SharedConnection = AnyOf> extends Connection { private pool: Pool; public get id(): string { @@ -431,18 +361,11 @@ class SharedConnection< } } -class NonSharedConnection< - Channel extends AnyOf = AnyOf, -> extends Connection { +class NonSharedConnection = AnyOf> extends Connection { public id: string; - protected params: Channel["params"]; + protected params: Channel['params']; - constructor( - stream: Stream, - channel: string, - id: string, - params: Channel["params"], - ) { + constructor(stream: Stream, channel: string, id: string, params: Channel['params']) { super(stream, channel); this.connect = this.connect.bind(this); @@ -455,7 +378,7 @@ class NonSharedConnection< } public connect(): void { - this.stream.send("connect", { + this.stream.send('connect', { channel: this.channel, id: this.id, params: this.params, @@ -464,7 +387,7 @@ class NonSharedConnection< public dispose(): void { this.removeAllListeners(); - this.stream.send("disconnect", { id: this.id }); + this.stream.send('disconnect', { id: this.id }); this.stream.disconnectToChannel(this as unknown as NonSharedConnection); } }