From 7f6022eef45b1467f65e287633e54ccc0ecf09de Mon Sep 17 00:00:00 2001 From: Charles Lowell Date: Wed, 2 Oct 2024 12:25:44 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=89=20Add=20WebSocket=20resource?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/verify.yaml | 2 +- deno.json | 3 +- websocket/README.md | 7 ++ websocket/deno.json | 5 + websocket/mod.ts | 1 + websocket/websocket.test.ts | 143 ++++++++++++++++++++++++++++ websocket/websocket.ts | 170 ++++++++++++++++++++++++++++++++++ websocket/with-resolvers.ts | 65 +++++++++++++ 8 files changed, 394 insertions(+), 2 deletions(-) create mode 100644 websocket/README.md create mode 100644 websocket/deno.json create mode 100644 websocket/mod.ts create mode 100644 websocket/websocket.test.ts create mode 100644 websocket/websocket.ts create mode 100644 websocket/with-resolvers.ts diff --git a/.github/workflows/verify.yaml b/.github/workflows/verify.yaml index fd08475..f5ecbbc 100644 --- a/.github/workflows/verify.yaml +++ b/.github/workflows/verify.yaml @@ -26,4 +26,4 @@ jobs: - run: deno lint - - run: deno test + - run: deno test --allow-net diff --git a/deno.json b/deno.json index 56a7212..750ccbe 100644 --- a/deno.json +++ b/deno.json @@ -29,6 +29,7 @@ "./deno-deploy", "./task-buffer", "./tinyexec", - "./examples" + "./examples", + "./websocket" ] } diff --git a/websocket/README.md b/websocket/README.md new file mode 100644 index 0000000..df06f5a --- /dev/null +++ b/websocket/README.md @@ -0,0 +1,7 @@ +# WebSocket + +Use the [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) +API as an Effection resource. Instead of a fragile, spring-loaded confederation +of 'open', 'close', 'error', and 'message' event handlers, `useWebSocket()` +organizes them for you so that you can consume all events from the server as a +plain stream that has state-readiness and proper error handling baked in. diff --git a/websocket/deno.json b/websocket/deno.json new file mode 100644 index 0000000..0cb6e1c --- /dev/null +++ b/websocket/deno.json @@ -0,0 +1,5 @@ +{ + "name": "@effection-contrib/websocket", + "version": "1.0.0", + "exports": "./mod.ts" +} diff --git a/websocket/mod.ts b/websocket/mod.ts new file mode 100644 index 0000000..5adfd3a --- /dev/null +++ b/websocket/mod.ts @@ -0,0 +1 @@ +export * from "./websocket.ts"; diff --git a/websocket/websocket.test.ts b/websocket/websocket.test.ts new file mode 100644 index 0000000..98b5f9c --- /dev/null +++ b/websocket/websocket.test.ts @@ -0,0 +1,143 @@ +import { describe, it } from "bdd"; +import { expect } from "expect"; +import { + call, + createQueue, + type Operation, + resource, + run, + type Subscription, + suspend, + useScope, +} from "effection"; + +import { useWebSocket, type WebSocketResource } from "./websocket.ts"; + +describe("WebSocket", () => { + it("can send messages from the client to the server", async () => { + await run(function* () { + let [client, server] = yield* useTestingPair(); + + let subscription = yield* server.socket; + + client.socket.send("hello from client"); + + let { value } = yield* subscription.next(); + + expect(value).toMatchObject({ data: "hello from client" }); + }); + }); + + it("can send messages from the server to the client", async () => { + await run(function* () { + let [client, server] = yield* useTestingPair(); + + let subscription = yield* client.socket; + + server.socket.send("hello from server"); + + let { value } = yield* subscription.next(); + + expect(value).toMatchObject({ data: "hello from server" }); + }); + }); + + it("closes the client when the server closes", async () => { + await run(function* () { + let [client, server] = yield* useTestingPair(); + let messages = yield* client.socket; + + server.close(); + + let event = yield* drain(messages); + + expect(event.type).toEqual("close"); + expect(event.wasClean).toEqual(true); + }); + }); + it("closes the server when the client closes", async () => { + await run(function* () { + let [client, server] = yield* useTestingPair(); + let messages = yield* server.socket; + + client.close(); + + let event = yield* drain(messages); + + expect(event.type).toEqual("close"); + expect(event.wasClean).toEqual(true); + }); + }); +}); + +export interface TestSocket { + close(): void; + socket: WebSocketResource; +} + +export interface TestingPairOptions { + fail?: Response; +} + +function useTestingPair( + { fail }: TestingPairOptions = {}, +): Operation<[TestSocket, TestSocket]> { + return resource(function* (provide) { + let sockets = createQueue(); + + let scope = yield* useScope(); + + let server = yield* call(() => + Deno.serve({ + port: 9901, + onListen() {}, + }, (req) => { + if (req.headers.get("upgrade") != "websocket") { + return new Response(null, { status: 501 }); + } else if (fail) { + return fail; + } + const { socket, response } = Deno.upgradeWebSocket(req); + + scope.run(function* () { + sockets.add({ + close: () => socket.close(), + socket: yield* useWebSocket(() => socket), + }); + yield* suspend(); + }); + + return response; + }) + ); + + let client = new WebSocket( + `ws://${server.addr.hostname}:${server.addr.port}`, + ); + + let next = yield* sockets.next(); + + let local = { + close: () => client.close(), + socket: yield* useWebSocket(() => client), + }; + + let remote = next.value; + + try { + yield* provide([local, remote]); + } finally { + yield* call(() => server.shutdown()); + } + }); +} + +function* drain( + subscription: Subscription, +): Operation { + let next = yield* subscription.next(); + while (!next.done) { + next = yield* subscription.next(); + } + return next.value; +} diff --git a/websocket/websocket.ts b/websocket/websocket.ts new file mode 100644 index 0000000..9c6d4ce --- /dev/null +++ b/websocket/websocket.ts @@ -0,0 +1,170 @@ +import { createSignal, once, race, resource, spawn } from "effection"; +import type { Operation, Stream } from "effection"; +import { withResolvers } from "./with-resolvers.ts"; + +/** + * Handle to a + * [`WebSocket`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) object + * that can be consumed as an Effection stream. It has all the same properties as + * the underlying `WebSocket` apart from the event handlers. Instead, the resource + * itself is a subscribale stream. When the socket is closed, the stream will + * complete with a [`CloseEvent`](https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent) + * + * A WebSocketResource does not have an explicit close method. Rather, the underlying + * socket will be automatically closed when the resource passes out of scope. + */ +export interface WebSocketResource + extends Stream, CloseEvent> { + /** + * the type of data that this websocket accepts + */ + readonly binaryType: BinaryType; + readonly bufferedAmmount: number; + readonly extensions: string; + readonly protocol: string; + readonly readyState: number; + readonly url: string; + send(data: WebSocketData): void; +} + +/** + * Create a [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) + * resource using the native + * [WebSocket constructor](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/WebSocket) + * available on the current platform. + * + * The resource will not be returned until a connection has been + * succesffuly established with the server and the + * [`open`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/open_event) + * has been received. Once initialized, it will crash if it receives + * an [`error`]() event at any time. + * + * Once created, the websocket resource can be use to consume events from the server: + * + * ```ts + * let socket = yield* useWebSocket("ws://websocket.example.org"); + * + * for (let event of yield* each(socket)) { + * console.log('event data: ', event.data); + * yield* each.next(); + * } + * + * ``` + * + * @param url - The URL of the target WebSocket server to connect to. The URL must use one of the following schemes: ws, wss, http, or https, and cannot include a URL fragment. If a relative URL is provided, it is relative to the base URL of the calling script. For more detail, see https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/WebSocket#url + * + * @param prototol - A single string or an array of strings representing the sub-protocol(s) that the client would like to use, in order of preference. If it is omitted, an empty array is used by default, i.e. []. For more details, see + + * @returns an operation yielding a {@link WebSocketResource} + */ +export function useWebSocket( + url: string, + protocols?: string, +): Operation>; + +/** + * Create a [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) + * resource, but delegate the creation of the underlying websocket to a function + * of your choice. This is necessary on platforms that do not have a global + * `WebSocket` constructor such as NodeJS <= 20. + * + * The resource will not be returned until a connection has been + * succesffuly established with the server and the + * [`open`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/open_event) + * has been received. Once initialized, it will crash if it receives + * an [`error`]() event at any time. + * + * Once created, the websocket resource can be use to consume events from the server: + * + * ```ts + * import * as ws from 'ws'; + * + * function* example() { + * let socket = yield* useWebSocket(() => new ws.WebSocket("ws://websocket.example.org")); + * + * for (let event of yield* each(socket)) { + * console.log('event data: ', event.data); + * yield* each.next(); + * } + * } + * + * ``` + * @param create - a function that will construct the underlying [`WebSocket`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) object that this resource wil use + * @returns an operation yielding a {@link WebSocketResource} + */ +export function useWebSocket( + create: () => WebSocket, +): Operation>; + +/** + * @ignore the catch-all version that supports both forms above. + */ +export function useWebSocket( + url: string | (() => WebSocket), + protocols?: string, +): Operation> { + return resource(function* (provide) { + let socket = typeof url === "string" + ? new WebSocket(url, protocols) + : url(); + + let messages = createSignal, CloseEvent>(); + let { operation: closed, resolve: close } = withResolvers(); + + yield* spawn(function* () { + throw yield* once(socket, "error"); + }); + + yield* once(socket, "open"); + + yield* spawn(function* () { + let subscription = yield* messages; + let next = yield* subscription.next(); + while (!next.done) { + next = yield* subscription.next(); + } + close(next.value); + }); + + try { + socket.addEventListener("message", messages.send); + socket.addEventListener("close", messages.close); + + yield* race([ + closed, + provide({ + get binaryType() { + return socket.binaryType; + }, + get bufferedAmmount() { + return socket.bufferedAmount; + }, + get extensions() { + return socket.extensions; + }, + get protocol() { + return socket.protocol; + }, + get readyState() { + return socket.readyState; + }, + get url() { + return socket.url; + }, + send: (data) => socket.send(data), + [Symbol.iterator]: messages[Symbol.iterator], + }), + ]); + } finally { + socket.close(1000, "released"); + yield* closed; + socket.removeEventListener("message", messages.send); + socket.removeEventListener("close", messages.close); + } + }); +} + +/** + * @ignore until we can get jsdocs working for type unions + */ +export type WebSocketData = Parameters[0]; diff --git a/websocket/with-resolvers.ts b/websocket/with-resolvers.ts new file mode 100644 index 0000000..906a3a5 --- /dev/null +++ b/websocket/with-resolvers.ts @@ -0,0 +1,65 @@ +import { + action, + Err, + Ok, + type Operation, + type Result, + suspend, +} from "effection"; + +export interface WithResolvers { + operation: Operation; + resolve(value: T): void; + reject(error: Error): void; +} + +export function withResolvers(): WithResolvers { + let subscribers: Set> = new Set(); + let settlement: Result | undefined = undefined; + let operation = action(function* (resolve, reject) { + let resolver = { resolve, reject }; + if (settlement) { + notify(settlement, resolver); + } else { + try { + subscribers.add(resolver); + yield* suspend(); + } finally { + subscribers.delete(resolver); + } + } + }); + + let settle = (result: Result) => { + if (!settlement) { + settlement = result; + settle = () => {}; + } + for (let subscriber of subscribers) { + subscribers.delete(subscriber); + notify(settlement, subscriber); + } + }; + + let resolve = (value: T) => { + settle(Ok(value)); + }; + let reject = (error: Error) => { + settle(Err(error)); + }; + + return { operation, resolve, reject }; +} + +interface Resolver { + resolve(value: T): void; + reject(error: Error): void; +} + +function notify(result: Result, resolver: Resolver): void { + if (result.ok) { + resolver.resolve(result.value); + } else { + resolver.reject(result.error); + } +}