Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add streaming support to the Reasoning Engine Python client. #4619

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions google/cloud/aiplatform_v1beta1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,7 @@
from .types.reasoning_engine import ReasoningEngineSpec
from .types.reasoning_engine_execution_service import QueryReasoningEngineRequest
from .types.reasoning_engine_execution_service import QueryReasoningEngineResponse
from .types.reasoning_engine_execution_service import StreamQueryReasoningEngineRequest
from .types.reasoning_engine_service import CreateReasoningEngineOperationMetadata
from .types.reasoning_engine_service import CreateReasoningEngineRequest
from .types.reasoning_engine_service import DeleteReasoningEngineRequest
Expand Down Expand Up @@ -1845,6 +1846,7 @@
"QueryExtensionResponse",
"QueryReasoningEngineRequest",
"QueryReasoningEngineResponse",
"StreamQueryReasoningEngineRequest",
"QuestionAnsweringCorrectnessInput",
"QuestionAnsweringCorrectnessInstance",
"QuestionAnsweringCorrectnessResult",
Expand Down
15 changes: 15 additions & 0 deletions google/cloud/aiplatform_v1beta1/gapic_metadata.json
Original file line number Diff line number Diff line change
Expand Up @@ -4682,6 +4682,11 @@
"methods": [
"query_reasoning_engine"
]
},
"StreamQueryReasoningEngine": {
"methods": [
"stream_query_reasoning_engine"
]
}
}
},
Expand All @@ -4692,6 +4697,11 @@
"methods": [
"query_reasoning_engine"
]
},
"StreamQueryReasoningEngine": {
"methods": [
"stream_query_reasoning_engine"
]
}
}
},
Expand All @@ -4702,6 +4712,11 @@
"methods": [
"query_reasoning_engine"
]
},
"StreamQueryReasoningEngine": {
"methods": [
"stream_query_reasoning_engine"
]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
MutableMapping,
MutableSequence,
Optional,
Iterable,
Sequence,
Tuple,
Type,
Expand All @@ -49,6 +50,7 @@
OptionalRetry = Union[retries.Retry, object, None] # type: ignore

from google.cloud.aiplatform_v1beta1.types import reasoning_engine_execution_service
from google.api import httpbody_pb2 # type: ignore
from google.cloud.location import locations_pb2 # type: ignore
from google.iam.v1 import iam_policy_pb2 # type: ignore
from google.iam.v1 import policy_pb2 # type: ignore
Expand Down Expand Up @@ -799,6 +801,134 @@ def sample_query_reasoning_engine():
# Done; return the response.
return response

def stream_query_reasoning_engine(self,
request: Optional[Union[reasoning_engine_execution_service.StreamQueryReasoningEngineRequest, dict]] = None,
*,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> Iterable[httpbody_pb2.HttpBody]:
r"""Streams queries using a reasoning engine.

.. code-block:: python

# This snippet has been automatically generated and should be regarded as a
# code template only.
# It will require modifications to work:
# - It may require correct/in-range values for request initialization.
# - It may require specifying regional endpoints when creating the service
# client as shown in:
# https://googleapis.dev/python/google-api-core/latest/client_options.html
from google.cloud import aiplatform_v1beta1

def sample_stream_query_reasoning_engine():
# Create a client
client = aiplatform_v1beta1.ReasoningEngineExecutionServiceClient()

# Initialize request argument(s)
request = aiplatform_v1beta1.StreamQueryReasoningEngineRequest(
name="name_value",
)

# Make the request
stream = client.stream_query_reasoning_engine(request=request)

# Handle the response
for response in stream:
print(response)

Args:
request (Union[google.cloud.aiplatform_v1beta1.types.StreamQueryReasoningEngineRequest, dict]):
The request object. Request message for
[ReasoningEngineExecutionService.StreamQuery][].
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.

Returns:
Iterable[google.api.httpbody_pb2.HttpBody]:
Message that represents an arbitrary HTTP body. It should only be used for
payload formats that can't be represented as JSON,
such as raw binary or an HTML page.

This message can be used both in streaming and
non-streaming API methods in the request as well as
the response.

It can be used as a top-level request field, which is
convenient if one wants to extract parameters from
either the URL or HTTP template into the request
fields and also want access to the raw HTTP body.

Example:

message GetResourceRequest {
// A unique request id. string request_id = 1;

// The raw HTTP body is bound to this field.
google.api.HttpBody http_body = 2;

}

service ResourceService {
rpc GetResource(GetResourceRequest)
returns (google.api.HttpBody);

rpc UpdateResource(google.api.HttpBody)
returns (google.protobuf.Empty);

}

Example with streaming methods:

service CaldavService {
rpc GetCalendar(stream google.api.HttpBody)
returns (stream google.api.HttpBody);

rpc UpdateCalendar(stream google.api.HttpBody)
returns (stream google.api.HttpBody);

}

Use of this type only changes how the request and
response bodies are handled, all other features will
continue to work unchanged.

"""
# Create or coerce a protobuf request object.
# - Use the request object if provided (there's no risk of modifying the input as
# there are no flattened fields), or create one.
if not isinstance(request, reasoning_engine_execution_service.StreamQueryReasoningEngineRequest):
request = reasoning_engine_execution_service.StreamQueryReasoningEngineRequest(request)

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = self._transport._wrapped_methods[self._transport.stream_query_reasoning_engine]

# Certain fields should be provided within the metadata header;
# add these here.
metadata = tuple(metadata) + (
gapic_v1.routing_header.to_grpc_metadata((
("name", request.name),
)),
)

# Validate the universe domain.
self._validate_universe_domain()

# Send the request.
response = rpc(
request,
retry=retry,
timeout=timeout,
metadata=metadata,
)

# Done; return the response.
return response

def __enter__(self) -> "ReasoningEngineExecutionServiceClient":
return self

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from google.oauth2 import service_account # type: ignore

from google.cloud.aiplatform_v1beta1.types import reasoning_engine_execution_service
from google.api import httpbody_pb2 # type: ignore
from google.cloud.location import locations_pb2 # type: ignore
from google.iam.v1 import iam_policy_pb2 # type: ignore
from google.iam.v1 import policy_pb2 # type: ignore
Expand Down Expand Up @@ -138,6 +139,11 @@ def _prep_wrapped_messages(self, client_info):
default_timeout=None,
client_info=client_info,
),
self.stream_query_reasoning_engine: gapic_v1.method.wrap_method(
self.stream_query_reasoning_engine,
default_timeout=None,
client_info=client_info,
),
self.get_location: gapic_v1.method.wrap_method(
self.get_location,
default_timeout=None,
Expand Down Expand Up @@ -211,6 +217,15 @@ def query_reasoning_engine(
]:
raise NotImplementedError()

@property
def stream_query_reasoning_engine(self) -> Callable[
[reasoning_engine_execution_service.StreamQueryReasoningEngineRequest],
Union[
httpbody_pb2.HttpBody,
Awaitable[httpbody_pb2.HttpBody]
]]:
raise NotImplementedError()

@property
def list_operations(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import grpc # type: ignore

from google.cloud.aiplatform_v1beta1.types import reasoning_engine_execution_service
from google.api import httpbody_pb2 # type: ignore
from google.cloud.location import locations_pb2 # type: ignore
from google.iam.v1 import iam_policy_pb2 # type: ignore
from google.iam.v1 import policy_pb2 # type: ignore
Expand Down Expand Up @@ -270,6 +271,32 @@ def query_reasoning_engine(
)
return self._stubs["query_reasoning_engine"]

@property
def stream_query_reasoning_engine(self) -> Callable[
[reasoning_engine_execution_service.StreamQueryReasoningEngineRequest],
httpbody_pb2.HttpBody]:
r"""Return a callable for the stream query reasoning engine method over gRPC.

Streams queries using a reasoning engine.

Returns:
Callable[[~.StreamQueryReasoningEngineRequest],
~.HttpBody]:
A function that, when called, will call the underlying RPC
on the server.
"""
# Generate a "stub function" on-the-fly which will actually make
# the request.
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if 'stream_query_reasoning_engine' not in self._stubs:
self._stubs['stream_query_reasoning_engine'] = self.grpc_channel.unary_stream(
'/google.cloud.aiplatform.v1beta1.ReasoningEngineExecutionService/StreamQueryReasoningEngine',
request_serializer=reasoning_engine_execution_service.StreamQueryReasoningEngineRequest.serialize,
response_deserializer=httpbody_pb2.HttpBody.FromString,
)
return self._stubs['stream_query_reasoning_engine']

def close(self):
self.grpc_channel.close()

Expand Down
Loading