Skip to content

Commit

Permalink
Merge pull request #11 from thefrontside/cl/websocket
Browse files Browse the repository at this point in the history
🎉 Add WebSocket resource
  • Loading branch information
cowboyd authored Oct 4, 2024
2 parents 4f2e012 + 7f6022e commit df6d765
Show file tree
Hide file tree
Showing 8 changed files with 394 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/verify.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ jobs:

- run: deno lint

- run: deno test
- run: deno test --allow-net
3 changes: 2 additions & 1 deletion deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"./deno-deploy",
"./task-buffer",
"./tinyexec",
"./examples"
"./examples",
"./websocket"
]
}
7 changes: 7 additions & 0 deletions websocket/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# WebSocket

Use the [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket)
API as an Effection resource. Instead of a fragile, spring-loaded confederation
of 'open', 'close', 'error', and 'message' event handlers, `useWebSocket()`
organizes them for you so that you can consume all events from the server as a
plain stream that has state-readiness and proper error handling baked in.
5 changes: 5 additions & 0 deletions websocket/deno.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"name": "@effection-contrib/websocket",
"version": "1.0.0",
"exports": "./mod.ts"
}
1 change: 1 addition & 0 deletions websocket/mod.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./websocket.ts";
143 changes: 143 additions & 0 deletions websocket/websocket.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import { describe, it } from "bdd";
import { expect } from "expect";
import {
call,
createQueue,
type Operation,
resource,
run,
type Subscription,
suspend,
useScope,
} from "effection";

import { useWebSocket, type WebSocketResource } from "./websocket.ts";

describe("WebSocket", () => {
it("can send messages from the client to the server", async () => {
await run(function* () {
let [client, server] = yield* useTestingPair();

let subscription = yield* server.socket;

client.socket.send("hello from client");

let { value } = yield* subscription.next();

expect(value).toMatchObject({ data: "hello from client" });
});
});

it("can send messages from the server to the client", async () => {
await run(function* () {
let [client, server] = yield* useTestingPair();

let subscription = yield* client.socket;

server.socket.send("hello from server");

let { value } = yield* subscription.next();

expect(value).toMatchObject({ data: "hello from server" });
});
});

it("closes the client when the server closes", async () => {
await run(function* () {
let [client, server] = yield* useTestingPair();
let messages = yield* client.socket;

server.close();

let event = yield* drain(messages);

expect(event.type).toEqual("close");
expect(event.wasClean).toEqual(true);
});
});
it("closes the server when the client closes", async () => {
await run(function* () {
let [client, server] = yield* useTestingPair();
let messages = yield* server.socket;

client.close();

let event = yield* drain(messages);

expect(event.type).toEqual("close");
expect(event.wasClean).toEqual(true);
});
});
});

export interface TestSocket {
close(): void;
socket: WebSocketResource<unknown>;
}

export interface TestingPairOptions {
fail?: Response;
}

function useTestingPair(
{ fail }: TestingPairOptions = {},
): Operation<[TestSocket, TestSocket]> {
return resource(function* (provide) {
let sockets = createQueue<TestSocket, never>();

let scope = yield* useScope();

let server = yield* call(() =>
Deno.serve({
port: 9901,
onListen() {},
}, (req) => {
if (req.headers.get("upgrade") != "websocket") {
return new Response(null, { status: 501 });
} else if (fail) {
return fail;
}
const { socket, response } = Deno.upgradeWebSocket(req);

scope.run(function* () {
sockets.add({
close: () => socket.close(),
socket: yield* useWebSocket(() => socket),
});
yield* suspend();
});

return response;
})
);

let client = new WebSocket(
`ws://${server.addr.hostname}:${server.addr.port}`,
);

let next = yield* sockets.next();

let local = {
close: () => client.close(),
socket: yield* useWebSocket(() => client),
};

let remote = next.value;

try {
yield* provide([local, remote]);
} finally {
yield* call(() => server.shutdown());
}
});
}

function* drain<T, TClose>(
subscription: Subscription<T, TClose>,
): Operation<TClose> {
let next = yield* subscription.next();
while (!next.done) {
next = yield* subscription.next();
}
return next.value;
}
170 changes: 170 additions & 0 deletions websocket/websocket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import { createSignal, once, race, resource, spawn } from "effection";
import type { Operation, Stream } from "effection";
import { withResolvers } from "./with-resolvers.ts";

/**
* Handle to a
* [`WebSocket`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) object
* that can be consumed as an Effection stream. It has all the same properties as
* the underlying `WebSocket` apart from the event handlers. Instead, the resource
* itself is a subscribale stream. When the socket is closed, the stream will
* complete with a [`CloseEvent`](https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent)
*
* A WebSocketResource does not have an explicit close method. Rather, the underlying
* socket will be automatically closed when the resource passes out of scope.
*/
export interface WebSocketResource<T>
extends Stream<MessageEvent<T>, CloseEvent> {
/**
* the type of data that this websocket accepts
*/
readonly binaryType: BinaryType;
readonly bufferedAmmount: number;
readonly extensions: string;
readonly protocol: string;
readonly readyState: number;
readonly url: string;
send(data: WebSocketData): void;
}

/**
* Create a [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket)
* resource using the native
* [WebSocket constructor](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/WebSocket)
* available on the current platform.
*
* The resource will not be returned until a connection has been
* succesffuly established with the server and the
* [`open`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/open_event)
* has been received. Once initialized, it will crash if it receives
* an [`error`]() event at any time.
*
* Once created, the websocket resource can be use to consume events from the server:
*
* ```ts
* let socket = yield* useWebSocket("ws://websocket.example.org");
*
* for (let event of yield* each(socket)) {
* console.log('event data: ', event.data);
* yield* each.next();
* }
*
* ```
*
* @param url - The URL of the target WebSocket server to connect to. The URL must use one of the following schemes: ws, wss, http, or https, and cannot include a URL fragment. If a relative URL is provided, it is relative to the base URL of the calling script. For more detail, see https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/WebSocket#url
*
* @param prototol - A single string or an array of strings representing the sub-protocol(s) that the client would like to use, in order of preference. If it is omitted, an empty array is used by default, i.e. []. For more details, see
* @returns an operation yielding a {@link WebSocketResource}
*/
export function useWebSocket<T>(
url: string,
protocols?: string,
): Operation<WebSocketResource<T>>;

/**
* Create a [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket)
* resource, but delegate the creation of the underlying websocket to a function
* of your choice. This is necessary on platforms that do not have a global
* `WebSocket` constructor such as NodeJS <= 20.
*
* The resource will not be returned until a connection has been
* succesffuly established with the server and the
* [`open`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/open_event)
* has been received. Once initialized, it will crash if it receives
* an [`error`]() event at any time.
*
* Once created, the websocket resource can be use to consume events from the server:
*
* ```ts
* import * as ws from 'ws';
*
* function* example() {
* let socket = yield* useWebSocket(() => new ws.WebSocket("ws://websocket.example.org"));
*
* for (let event of yield* each(socket)) {
* console.log('event data: ', event.data);
* yield* each.next();
* }
* }
*
* ```
* @param create - a function that will construct the underlying [`WebSocket`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) object that this resource wil use
* @returns an operation yielding a {@link WebSocketResource}
*/
export function useWebSocket<T>(
create: () => WebSocket,
): Operation<WebSocketResource<T>>;

/**
* @ignore the catch-all version that supports both forms above.
*/
export function useWebSocket<T>(
url: string | (() => WebSocket),
protocols?: string,
): Operation<WebSocketResource<T>> {
return resource(function* (provide) {
let socket = typeof url === "string"
? new WebSocket(url, protocols)
: url();

let messages = createSignal<MessageEvent<T>, CloseEvent>();
let { operation: closed, resolve: close } = withResolvers<CloseEvent>();

yield* spawn(function* () {
throw yield* once(socket, "error");
});

yield* once(socket, "open");

yield* spawn(function* () {
let subscription = yield* messages;
let next = yield* subscription.next();
while (!next.done) {
next = yield* subscription.next();
}
close(next.value);
});

try {
socket.addEventListener("message", messages.send);
socket.addEventListener("close", messages.close);

yield* race([
closed,
provide({
get binaryType() {
return socket.binaryType;
},
get bufferedAmmount() {
return socket.bufferedAmount;
},
get extensions() {
return socket.extensions;
},
get protocol() {
return socket.protocol;
},
get readyState() {
return socket.readyState;
},
get url() {
return socket.url;
},
send: (data) => socket.send(data),
[Symbol.iterator]: messages[Symbol.iterator],
}),
]);
} finally {
socket.close(1000, "released");
yield* closed;
socket.removeEventListener("message", messages.send);
socket.removeEventListener("close", messages.close);
}
});
}

/**
* @ignore until we can get jsdocs working for type unions
*/
export type WebSocketData = Parameters<WebSocket["send"]>[0];
Loading

0 comments on commit df6d765

Please sign in to comment.