A small client for Rockets that uses JSON-RPC as its communication contract over a WebSocket.
You can install this package from PyPI:
pip install rockets
Rockets provides two types of clients to support asynchronous and synchronous usage.
The AsyncClient exposes all of its functionality as async functions, hence an asyncio event loop is needed to complete pending execution via await or run_until_complete().
For simplicity, a synchronous Client is provided which automagically executes in a synchronous, blocking fashion.
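The relationship between the two clients can be pictured with a minimal sketch. It is not how Rockets implements Client: `FakeAsyncClient` and `BlockingClient` are hypothetical names, and the request simply echoes its arguments instead of talking to a server.

```python
import asyncio


class FakeAsyncClient:
    """Hypothetical stand-in for an async client; no network involved."""

    async def request(self, method, params=None):
        await asyncio.sleep(0)  # pretend to wait for the server
        return {"method": method, "params": params}


class BlockingClient:
    """Hypothetical synchronous facade that drives the coroutine itself."""

    def __init__(self):
        self._async_client = FakeAsyncClient()
        self._loop = asyncio.new_event_loop()

    def request(self, method, params=None):
        # Block the caller until the coroutine has finished.
        coro = self._async_client.request(method, params)
        return self._loop.run_until_complete(coro)

    def close(self):
        self._loop.close()


client = BlockingClient()
result = client.request("mymethod", {"ping": True})
client.close()
print(result)
```

The caller never sees the event loop; the trade-off is that each call blocks until the response arrives.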
Create a client and connect:
from rockets import Client
# The client does not connect during __init__; connect explicitly,
# or let it connect automatically on any notify/request/send.
client = Client('myhost:8080')
client.connect()
print(client.connected())
Close the connection with the socket cleanly:
from rockets import Client
client = Client('myhost:8080')
client.connect()
client.disconnect()
print(client.connected())
Listen to server notifications:
from rockets import Client
client = Client('myhost:8080')
client.notifications.subscribe(lambda msg: print("Got message:", msg.data))
NOTE: The notification object is of type Notification.
Listen to any server message:
from rockets import Client
client = Client('myhost:8080')
client.ws_observable.subscribe(lambda msg: print("Got message:", msg))
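When looking at every server message, it can help to tell the JSON-RPC 2.0 message kinds apart by the fields they carry. The sketch below assumes each message is available as raw JSON text, which may not match what ws_observable actually emits; `classify` is a hypothetical helper.

```python
import json


def classify(raw_message):
    """Classify a raw JSON-RPC 2.0 message by the fields it carries."""
    message = json.loads(raw_message)
    if "method" in message:
        # A method with an id is a request; without an id, a notification.
        return "request" if "id" in message else "notification"
    if "result" in message or "error" in message:
        return "response"
    return "unknown"


kinds = [
    classify('{"jsonrpc": "2.0", "method": "update", "params": [1]}'),
    classify('{"jsonrpc": "2.0", "id": 3, "result": true}'),
]
print(kinds)
```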
Send notifications to the server:
from rockets import Client
client = Client('myhost:8080')
client.notify('mymethod', {'ping': True})
Make a synchronous, blocking request:
from rockets import Client
client = Client('myhost:8080')
response = client.request('mymethod', {'ping': True})
print(response)
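On the wire, the difference between a notification and a request follows JSON-RPC 2.0: a request carries an `id` so the response can be matched to it, while a notification has none and gets no response. The sketch below builds both message kinds by hand; the helper names are hypothetical and the exact payload Rockets produces may differ.

```python
import itertools
import json

_ids = itertools.count(1)  # simple id generator for this sketch


def make_request(method, params=None):
    """Build a JSON-RPC 2.0 request; the id lets the reply be matched."""
    message = {"jsonrpc": "2.0", "method": method, "id": next(_ids)}
    if params is not None:
        message["params"] = params
    return message


def make_notification(method, params=None):
    """Build a JSON-RPC 2.0 notification; no id, so no reply is expected."""
    message = {"jsonrpc": "2.0", "method": method}
    if params is not None:
        message["params"] = params
    return message


request = make_request("mymethod", {"ping": True})
notification = make_notification("mymethod", {"ping": True})
print(json.dumps(request))
print(json.dumps(notification))
```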
Handle a request error:
from rockets import Client, RequestError
client = Client('myhost:8080')
try:
    client.request('mymethod')
except RequestError as err:
    print(err.code, err.message)
NOTE: Any error that may occur will be a RequestError.
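The `code` and `message` attributes mirror the error object defined by JSON-RPC 2.0. As a rough illustration of how a response could be turned into an exception, the sketch below uses a hypothetical `RpcError` class and a hand-written response; it is not the Rockets implementation.

```python
import json


class RpcError(Exception):
    """Hypothetical exception carrying a JSON-RPC error code and message."""

    def __init__(self, code, message):
        super().__init__(message)
        self.code = code
        self.message = message


def unwrap(raw_response):
    """Return the result of a JSON-RPC response or raise RpcError."""
    response = json.loads(raw_response)
    if "error" in response:
        error = response["error"]
        raise RpcError(error["code"], error["message"])
    return response["result"]


# -32601 is the code JSON-RPC 2.0 reserves for "Method not found".
raw = '{"jsonrpc": "2.0", "id": 1, "error": {"code": -32601, "message": "Method not found"}}'
try:
    unwrap(raw)
except RpcError as err:
    caught = (err.code, err.message)
    print(*caught)
```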
Make an asynchronous request, using the AsyncClient and asyncio:
import asyncio
from rockets import AsyncClient
client = AsyncClient('myhost:8080')
request_task = client.async_request('mymethod', {'ping': True})
asyncio.get_event_loop().run_until_complete(request_task)
print(request_task.result())
Alternatively, you can use add_done_callback() on the returned RequestTask; the callback is called once the request has finished:
import asyncio
from rockets import AsyncClient
client = AsyncClient('myhost:8080')
request_task = client.async_request('mymethod', {'ping': True})
request_task.add_done_callback(lambda task: print(task.result()))
asyncio.get_event_loop().run_until_complete(request_task)
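The same callback pattern exists on plain asyncio tasks, which may help when reasoning about ordering: the callback runs on the event loop after the task has finished. The sketch uses a stub coroutine, `fake_request`, in place of a real request, so no server is needed.

```python
import asyncio

results = []


async def fake_request():
    """Stub coroutine standing in for a request to a server."""
    await asyncio.sleep(0)
    return {"pong": True}


async def main():
    task = asyncio.ensure_future(fake_request())
    # The callback receives the finished task and reads its result.
    task.add_done_callback(lambda t: results.append(t.result()))
    await task
    await asyncio.sleep(0)  # give the loop a turn to run the callback


asyncio.run(main())
print(results)
```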
If the RequestTask is not needed, i.e. no cancel() or add_progress_callback() is desired, use the request() coroutine:
import asyncio
from rockets import AsyncClient
client = AsyncClient('myhost:8080')
coro = client.request('mymethod', {'ping': True})
result = asyncio.get_event_loop().run_until_complete(coro)
print(result)
If you are already in an async function or in a Jupyter notebook cell, you may use await to execute an asynchronous request:
# Inside a notebook cell here
import asyncio
from rockets import AsyncClient
client = AsyncClient('myhost:8080')
result = await client.request('mymethod', {'ping': True})
print(result)
Cancel a request:
from rockets import AsyncClient
client = AsyncClient('myhost:8080')
request_task = client.async_request('mymethod')
request_task.cancel()
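What cancellation looks like from the caller's side can be sketched with a plain asyncio task. Whether RequestTask behaves identically is an assumption; the stub `slow_request` stands in for a long-running request.

```python
import asyncio


async def slow_request():
    """Stub for a request that would take a long time to complete."""
    await asyncio.sleep(3600)
    return "never reached"


async def main():
    task = asyncio.ensure_future(slow_request())
    await asyncio.sleep(0)  # let the task start running
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # awaiting a cancelled task raises CancelledError
    return task.cancelled()


was_cancelled = asyncio.run(main())
print(was_cancelled)
```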
Get progress updates for a request:
from rockets import AsyncClient
client = AsyncClient('myhost:8080')
request_task = client.async_request('mymethod')
request_task.add_progress_callback(lambda progress: print(progress))
NOTE: The progress object is of type RequestProgress.
Make a batch request:
from rockets import Client, Request, Notification
client = Client('myhost:8080')
request = Request('myrequest')
notification = Notification('mynotify')
responses = client.batch([request, notification])
for response in responses:
    print(response)
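In JSON-RPC 2.0 a batch is a JSON array of messages, and the server replies only to the entries that carry an `id`; notifications produce no response. The sketch below builds such a payload by hand and pairs hand-written responses with their requests. The payload Rockets sends may differ in detail.

```python
import json

batch = [
    {"jsonrpc": "2.0", "method": "myrequest", "id": 1},
    {"jsonrpc": "2.0", "method": "mynotify"},  # notification: no id
    {"jsonrpc": "2.0", "method": "myrequest", "id": 2},
]
payload = json.dumps(batch)

# Hand-written reply; a server may return responses in any order.
raw_reply = '[{"jsonrpc": "2.0", "id": 2, "result": "b"}, {"jsonrpc": "2.0", "id": 1, "result": "a"}]'
by_id = {response["id"]: response["result"] for response in json.loads(raw_reply)}

# Only messages with an id expect a response.
expected_ids = [message["id"] for message in batch if "id" in message]
results = [by_id[request_id] for request_id in expected_ids]
print(results)
```

Matching by `id` rather than by position keeps the pairing correct even when the reply order differs from the request order.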
Cancel a batch request:
from rockets import AsyncClient, Request, Notification
client = AsyncClient('myhost:8080')
request = Request('myrequest')
notification = Notification('mynotify')
request_task = client.async_batch([request, notification])
request_task.cancel()