From ddd5edd6bd25328dc1d8db536e4dcd52532cf1cb Mon Sep 17 00:00:00 2001 From: Matteo Ferrando Date: Tue, 22 Oct 2024 00:32:09 -0400 Subject: [PATCH] separate in 2 rpc --- src/isolate/server/definitions/server.proto | 4 +- src/isolate/server/definitions/server_pb2.py | 4 +- .../server/definitions/server_pb2_grpc.py | 43 +++++++++++++++++++ src/isolate/server/server.py | 37 ++++++++++------ tests/test_server.py | 9 +++- 5 files changed, 81 insertions(+), 16 deletions(-) diff --git a/src/isolate/server/definitions/server.proto b/src/isolate/server/definitions/server.proto index 9212b64..934913d 100644 --- a/src/isolate/server/definitions/server.proto +++ b/src/isolate/server/definitions/server.proto @@ -6,7 +6,9 @@ import "google/protobuf/struct.proto"; service Isolate { // Run the given function on the specified environment. Streams logs // and the result originating from that function. - rpc Run (RunRequest) returns (stream PartialRunResult) {} + rpc Run (BoundFunction) returns (stream PartialRunResult) {} + + rpc RunFunction (RunRequest) returns (stream PartialRunResult) {} // Submit a function to be run without waiting for results. rpc Submit (SubmitRequest) returns (SubmitResponse) {} diff --git a/src/isolate/server/definitions/server_pb2.py b/src/isolate/server/definitions/server_pb2.py index b1bf725..1374bff 100644 --- a/src/isolate/server/definitions/server_pb2.py +++ b/src/isolate/server/definitions/server_pb2.py @@ -16,7 +16,7 @@ from google.protobuf import struct_pb2 as google_dot_protobuf_dot_struct__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cserver.proto\x1a\x0c\x63ommon.proto\x1a\x1cgoogle/protobuf/struct.proto\"\x9d\x01\n\rBoundFunction\x12,\n\x0c\x65nvironments\x18\x01 \x03(\x0b\x32\x16.EnvironmentDefinition\x12#\n\x08\x66unction\x18\x02 \x01(\x0b\x32\x11.SerializedObject\x12*\n\nsetup_func\x18\x03 \x01(\x0b\x32\x11.SerializedObjectH\x00\x88\x01\x01\x42\r\n\x0b_setup_func\"O\n\nRunRequest\x12 \n\x08\x66unction\x18\x01 \x01(\x0b\x32\x0e.BoundFunction\x12\x1f\n\x08metadata\x18\x02 \x01(\x0b\x32\r.TaskMetadata\"d\n\x15\x45nvironmentDefinition\x12\x0c\n\x04kind\x18\x01 \x01(\t\x12.\n\rconfiguration\x18\x02 \x01(\x0b\x32\x17.google.protobuf.Struct\x12\r\n\x05\x66orce\x18\x03 \x01(\x08\"R\n\rSubmitRequest\x12 \n\x08\x66unction\x18\x01 \x01(\x0b\x32\x0e.BoundFunction\x12\x1f\n\x08metadata\x18\x02 \x01(\x0b\x32\r.TaskMetadata\"\x90\x01\n\x0cTaskMetadata\x12\x36\n\rlogger_labels\x18\x01 \x03(\x0b\x32\x1f.TaskMetadata.LoggerLabelsEntry\x12\x13\n\x0bstream_logs\x18\x02 \x01(\x08\x1a\x33\n\x11LoggerLabelsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"!\n\x0eSubmitResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"F\n\x12SetMetadataRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x1f\n\x08metadata\x18\x02 \x01(\x0b\x32\r.TaskMetadata\"\x15\n\x13SetMetadataResponse\"\r\n\x0bListRequest\"\x1b\n\x08TaskInfo\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"(\n\x0cListResponse\x12\x18\n\x05tasks\x18\x01 \x03(\x0b\x32\t.TaskInfo\" \n\rCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\x10\n\x0e\x43\x61ncelResponse2\xf1\x01\n\x07Isolate\x12)\n\x03Run\x12\x0b.RunRequest\x1a\x11.PartialRunResult\"\x00\x30\x01\x12+\n\x06Submit\x12\x0e.SubmitRequest\x1a\x0f.SubmitResponse\"\x00\x12:\n\x0bSetMetadata\x12\x13.SetMetadataRequest\x1a\x14.SetMetadataResponse\"\x00\x12%\n\x04List\x12\x0c.ListRequest\x1a\r.ListResponse\"\x00\x12+\n\x06\x43\x61ncel\x12\x0e.CancelRequest\x1a\x0f.CancelResponse\"\x00\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cserver.proto\x1a\x0c\x63ommon.proto\x1a\x1cgoogle/protobuf/struct.proto\"\x9d\x01\n\rBoundFunction\x12,\n\x0c\x65nvironments\x18\x01 \x03(\x0b\x32\x16.EnvironmentDefinition\x12#\n\x08\x66unction\x18\x02 \x01(\x0b\x32\x11.SerializedObject\x12*\n\nsetup_func\x18\x03 \x01(\x0b\x32\x11.SerializedObjectH\x00\x88\x01\x01\x42\r\n\x0b_setup_func\"O\n\nRunRequest\x12 \n\x08\x66unction\x18\x01 \x01(\x0b\x32\x0e.BoundFunction\x12\x1f\n\x08metadata\x18\x02 \x01(\x0b\x32\r.TaskMetadata\"d\n\x15\x45nvironmentDefinition\x12\x0c\n\x04kind\x18\x01 \x01(\t\x12.\n\rconfiguration\x18\x02 \x01(\x0b\x32\x17.google.protobuf.Struct\x12\r\n\x05\x66orce\x18\x03 \x01(\x08\"R\n\rSubmitRequest\x12 \n\x08\x66unction\x18\x01 \x01(\x0b\x32\x0e.BoundFunction\x12\x1f\n\x08metadata\x18\x02 \x01(\x0b\x32\r.TaskMetadata\"\x90\x01\n\x0cTaskMetadata\x12\x36\n\rlogger_labels\x18\x01 \x03(\x0b\x32\x1f.TaskMetadata.LoggerLabelsEntry\x12\x13\n\x0bstream_logs\x18\x02 \x01(\x08\x1a\x33\n\x11LoggerLabelsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"!\n\x0eSubmitResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"F\n\x12SetMetadataRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x1f\n\x08metadata\x18\x02 \x01(\x0b\x32\r.TaskMetadata\"\x15\n\x13SetMetadataResponse\"\r\n\x0bListRequest\"\x1b\n\x08TaskInfo\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"(\n\x0cListResponse\x12\x18\n\x05tasks\x18\x01 \x03(\x0b\x32\t.TaskInfo\" \n\rCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\x10\n\x0e\x43\x61ncelResponse2\xa7\x02\n\x07Isolate\x12,\n\x03Run\x12\x0e.BoundFunction\x1a\x11.PartialRunResult\"\x00\x30\x01\x12\x31\n\x0bRunFunction\x12\x0b.RunRequest\x1a\x11.PartialRunResult\"\x00\x30\x01\x12+\n\x06Submit\x12\x0e.SubmitRequest\x1a\x0f.SubmitResponse\"\x00\x12:\n\x0bSetMetadata\x12\x13.SetMetadataRequest\x1a\x14.SetMetadataResponse\"\x00\x12%\n\x04List\x12\x0c.ListRequest\x1a\r.ListResponse\"\x00\x12+\n\x06\x43\x61ncel\x12\x0e.CancelRequest\x1a\x0f.CancelResponse\"\x00\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -54,5 +54,5 @@ _globals['_CANCELRESPONSE']._serialized_start=884 _globals['_CANCELRESPONSE']._serialized_end=900 _globals['_ISOLATE']._serialized_start=903 - _globals['_ISOLATE']._serialized_end=1144 + _globals['_ISOLATE']._serialized_end=1198 # @@protoc_insertion_point(module_scope) diff --git a/src/isolate/server/definitions/server_pb2_grpc.py b/src/isolate/server/definitions/server_pb2_grpc.py index e643a79..3031aa8 100644 --- a/src/isolate/server/definitions/server_pb2_grpc.py +++ b/src/isolate/server/definitions/server_pb2_grpc.py @@ -42,6 +42,11 @@ def __init__(self, channel): """ self.Run = channel.unary_stream( '/Isolate/Run', + request_serializer=server__pb2.BoundFunction.SerializeToString, + response_deserializer=common__pb2.PartialRunResult.FromString, + _registered_method=True) + self.RunFunction = channel.unary_stream( + '/Isolate/RunFunction', request_serializer=server__pb2.RunRequest.SerializeToString, response_deserializer=common__pb2.PartialRunResult.FromString, _registered_method=True) @@ -78,6 +83,12 @@ def Run(self, request, context): context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') + def RunFunction(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + def Submit(self, request, context): """Submit a function to be run without waiting for results. """ @@ -111,6 +122,11 @@ def add_IsolateServicer_to_server(servicer, server): rpc_method_handlers = { 'Run': grpc.unary_stream_rpc_method_handler( servicer.Run, + request_deserializer=server__pb2.BoundFunction.FromString, + response_serializer=common__pb2.PartialRunResult.SerializeToString, + ), + 'RunFunction': grpc.unary_stream_rpc_method_handler( + servicer.RunFunction, request_deserializer=server__pb2.RunRequest.FromString, response_serializer=common__pb2.PartialRunResult.SerializeToString, ), @@ -160,6 +176,33 @@ def Run(request, request, target, '/Isolate/Run', + server__pb2.BoundFunction.SerializeToString, + common__pb2.PartialRunResult.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def RunFunction(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_stream( + request, + target, + '/Isolate/RunFunction', server__pb2.RunRequest.SerializeToString, common__pb2.PartialRunResult.FromString, options, diff --git a/src/isolate/server/server.py b/src/isolate/server/server.py index 1cddb19..1957cbc 100644 --- a/src/isolate/server/server.py +++ b/src/isolate/server/server.py @@ -363,24 +363,37 @@ def set_metadata(self, task: RunTask, metadata: definitions.TaskMetadata) -> Non # Stream_logs defaults to False if not set task.stream_logs = metadata.stream_logs - def Run( + def RunFunction( self, request: definitions.RunRequest, context: ServicerContext, ) -> Iterator[definitions.PartialRunResult]: try: - if isinstance(request, definitions.RunRequest): - task = RunTask(request=request.function) - self.set_metadata(task, request.metadata) - elif isinstance(request, definitions.BoundFunction): - task = RunTask(request=request) - else: - raise GRPCException( - "Invalid request type.", - StatusCode.INVALID_ARGUMENT, - ) + task = RunTask(request=request.function) + self.set_metadata(task, request.metadata) + + # HACK: we can support only one task at a time + # TODO: move away from this when we use submit for env-aware tasks + self.background_tasks["RUN"] = task + yield from self._run_task(task) + except GRPCException as exc: + return self.abort_with_msg( + exc.message, + context, + code=exc.code, + ) + finally: + self.background_tasks.pop("RUN", None) + + def Run( + self, + request: definitions.BoundFunction, + context: ServicerContext, + ) -> Iterator[definitions.PartialRunResult]: + try: + task = RunTask(request=request) - # HACK: we can support only one task at a time for Run + # HACK: we can support only one task at a time # TODO: move away from this when we use submit for env-aware tasks self.background_tasks["RUN"] = task yield from self._run_task(task) diff --git a/tests/test_server.py b/tests/test_server.py index 1ead109..02e25b4 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -131,7 +131,14 @@ def run_request( } return_value = _NOT_SET - for result in stub.Run(request): + if isinstance(request, definitions.BoundFunction): + func = stub.Run + elif isinstance(request, definitions.RunRequest): + func = stub.RunFunction + else: + raise ValueError(f"Unknown request type: {type(request)}") + + for result in func(request): for _log in result.logs: log = from_grpc(_log) log_store[log.source].append(log)