Skip to content

Commit

Permalink
feat: Add Retryable Configuration for GRPC Sink (using CEL) (#44)
Browse files Browse the repository at this point in the history
Add new config to enable response based retry in gRPC Sink Config

Example Parameter:
SINK_GRPC_RESPONSE_RETRY_CEL_EXPRESSION ="GenericResponse.success == false && GenericResponse.errors.exists(e, e.code == "400")"

Reason for the PR:
EGLC users who want to migrate expect this feature to be supported in Firehose

Notes :

CEL Expression: https://cel.dev/
cel-java: https://github.com/google/cel-java

* [feat] Add CEL evaluator and add it to grpc sink

* Remove blank check

* remove unintended change

* [feat] add success field checking

* [feat] add evaluator method

* [feat] handle descriptor update

* Add test for evaluator

* Update test

* Update SINK_GRPC_RESPONSE_RETRY_CEL_EXPRESSION default value

* Add test for GrpcSink

* Rename descriptor to payloadDescriptor

* Checkstyle update

* Refactor instantiation logic to separate method

* Remove schema refresh

* Remove projectnessie and use implementation from cel-java

* update checkstyle

* Add comment

* Update docs

* Move the evaluator instantiation to factory method

* Remove unused sink config

* Add more testcases

* revert protoc version

* Add more test cases

* Add more comprehensive documentation

* Rename default class and update docs

* Refactor typical cel functionality to util class

* Add checking for expression result

* Use built in UnsupportedOperationException

* Update build-info-extractor

* Update to classpath("org.jfrog.buildinfo:build-info-extractor-gradle:4.33.1")

* Remove OperationNotSupportedException.java

* Remove jfrog build info on dependencies

* - Tidy up tests
- Make exception message more verbose

* Bump version

* Makes error type for retryable error configurable through env

* Add 1 more test case

* Update default value

* Use default value of true on CEL Expression config to retry on default case. Remove implementation of DefaultGrpcResponsePayloadEvaluator.java
  • Loading branch information
ekawinataa authored Jul 24, 2024
1 parent a368c89 commit ba3220e
Show file tree
Hide file tree
Showing 10 changed files with 418 additions and 11 deletions.
43 changes: 43 additions & 0 deletions docs/docs/sinks/grpc-sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,46 @@ Defines the amount of time (in milliseconds) gRPC clients are willing to wait fo

- Example value: `1000`
- Type: `optional`

### `SINK_GRPC_RESPONSE_RETRY_CEL_EXPRESSION`

Defines the CEL(Common Expression Language) expression used to evaluate whether gRPC sink call should be retried or not based on the gRPC response.
The given CEL expression should evaluate to a boolean value. If the expression evaluates to true, the unsuccessful gRPC sink call will be retried, otherwise it won't.
Currently, we support all standard CEL macro including: has, all, exists, exists_one, map, map_filter, filter
For more information about CEL please refer to this documentation : https://github.com/google/cel-spec/blob/master/doc/langdef.md

- Example value: `com.gotocompany.generic.GrpcResponse.success == false && com.gotocompany.generic.GenericResponse.errors.exists(e, int(e.code) >= 400 && int(e.code) <= 500)`
- Type: `optional`
- Default value: ``
- Use cases :
Example response proto :
```
syntax = "proto3";
package com.gotocompany.generic;
GenericResponse {
bool success = 1;
repeated Error errors = 2;
}
Error {
string code = 1;
string reason = 2;
}
```

Example retry rule :
- Retry on specific error code : `com.gotocompany.generic.GenericResponse.errors.exists(e, e.code == "400")`
- Retry on specific error code range : `com.gotocompany.generic.GenericResponse.errors.exists(e, int(e.code) >= 400 && int(e.code) <= 500)`
- Retry on error codes outside from specific error codes : `com.gotocompany.generic.GenericResponse.errors.exists(e, !(int(e.code) in [400, 500, 600]))`
- Disable retry on all cases : `false`
- Retry on all error codes : `true`

### `SINK_GRPC_RESPONSE_RETRY_ERROR_TYPE`

Defines the ErrorType to assign for a retryable error. This is used in conjunction with `SINK_GRPC_RESPONSE_RETRY_CEL_EXPRESSION` and `ERROR_TYPES_FOR_RETRY`.
Value must be defined in com.gotocompany.depot.error.ErrorType

- Example value: `SINK_RETRYABLE_ERROR`
- Type: `optional`
- Default Value: `DEFAULT_ERROR`
11 changes: 11 additions & 0 deletions src/main/java/com/gotocompany/firehose/config/GrpcSinkConfig.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.gotocompany.firehose.config;

import com.gotocompany.depot.error.ErrorType;
import com.gotocompany.firehose.config.converter.GrpcMetadataConverter;
import com.gotocompany.firehose.config.converter.GrpcSinkRetryErrorTypeConverter;
import org.aeonbits.owner.Config;

import java.util.Map;
Expand Down Expand Up @@ -31,6 +33,15 @@ public interface GrpcSinkConfig extends AppConfig {
@Config.Key("SINK_GRPC_ARG_DEADLINE_MS")
Long getSinkGrpcArgDeadlineMS();

@Config.Key("SINK_GRPC_RESPONSE_RETRY_CEL_EXPRESSION")
@DefaultValue("true")
String getSinkGrpcResponseRetryCELExpression();

@Config.Key("SINK_GRPC_RESPONSE_RETRY_ERROR_TYPE")
@DefaultValue("DEFAULT_ERROR")
@ConverterClass(GrpcSinkRetryErrorTypeConverter.class)
ErrorType getSinkGrpcRetryErrorType();

@Key("SINK_GRPC_METADATA")
@DefaultValue("")
@ConverterClass(GrpcMetadataConverter.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.gotocompany.firehose.config.converter;

import com.gotocompany.depot.error.ErrorType;
import org.aeonbits.owner.Converter;

import java.lang.reflect.Method;
import java.util.Locale;

public class GrpcSinkRetryErrorTypeConverter implements Converter<ErrorType> {
@Override
public ErrorType convert(Method method, String s) {
return ErrorType.valueOf(s.trim().toUpperCase(Locale.ROOT));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.gotocompany.firehose.evaluator;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.gotocompany.firehose.utils.CelUtils;
import dev.cel.common.types.CelKind;
import dev.cel.compiler.CelCompiler;
import dev.cel.runtime.CelRuntime;
import dev.cel.runtime.CelRuntimeFactory;
import lombok.extern.slf4j.Slf4j;

/**
* Implementation of PayloadEvaluator that evaluates gRPC responses using CEL (Common Expression Language).
*/
@Slf4j
public class GrpcResponseCelPayloadEvaluator implements PayloadEvaluator<Message> {

private final Descriptors.Descriptor descriptor;
private CelRuntime.Program celProgram;

/**
* Constructs a GrpcResponseCelPayloadEvaluator with the specified descriptor and CEL expression.
*
* @param descriptor the descriptor of the gRPC message
* @param celExpression the CEL expression to evaluate against the message
*/
public GrpcResponseCelPayloadEvaluator(Descriptors.Descriptor descriptor, String celExpression) {
this.descriptor = descriptor;
buildCelEnvironment(celExpression);
}

/**
* Evaluates the given gRPC message payload using the CEL program.
*
* @param payload the gRPC message to be evaluated
* @return true if the payload passes the evaluation, false otherwise
*/
@Override
public boolean evaluate(Message payload) {
if (!descriptor.getFullName().equals(payload.getDescriptorForType().getFullName())) {
throw new IllegalArgumentException(String.format("Payload %s does not match descriptor %s",
payload.getDescriptorForType().getFullName(), descriptor.getFullName()));
}
return (boolean) CelUtils.evaluate(this.celProgram, payload);
}

/**
* Builds the CEL environment required to evaluate the CEL expression.
*
* @param celExpression the CEL expression to evaluate against the message
* @throws IllegalArgumentException if the CEL expression is invalid or if the evaluator cannot be constructed
*/
private void buildCelEnvironment(String celExpression) {
CelCompiler celCompiler = CelUtils.initializeCelCompiler(this.descriptor);
CelRuntime celRuntime = CelRuntimeFactory.standardCelRuntimeBuilder()
.build();
this.celProgram = CelUtils.initializeCelProgram(celExpression, celRuntime, celCompiler,
celType -> celType.kind().equals(CelKind.BOOL));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.gotocompany.firehose.evaluator;

/**
* A generic interface for evaluating payloads.
*
* @param <T> the type of payload to be evaluated
*/
public interface PayloadEvaluator<T> {
/**
* Evaluates the given payload.
*
* @param payload the payload to be evaluated
* @return true if the payload passes the evaluation, false otherwise
*/
boolean evaluate(T payload);
}
29 changes: 26 additions & 3 deletions src/main/java/com/gotocompany/firehose/sink/grpc/GrpcSink.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package com.gotocompany.firehose.sink.grpc;



import com.gotocompany.depot.error.ErrorInfo;
import com.gotocompany.depot.error.ErrorType;
import com.gotocompany.firehose.config.GrpcSinkConfig;
import com.gotocompany.firehose.evaluator.PayloadEvaluator;
import com.gotocompany.firehose.exception.DefaultException;
import com.gotocompany.firehose.exception.DeserializerException;
import com.gotocompany.firehose.message.Message;
import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
Expand All @@ -21,13 +25,21 @@
public class GrpcSink extends AbstractSink {

private final GrpcClient grpcClient;
private final StencilClient stencilClient;
private final GrpcSinkConfig grpcSinkConfig;
private List<Message> messages;
private StencilClient stencilClient;
private PayloadEvaluator<com.google.protobuf.Message> retryEvaluator;

public GrpcSink(FirehoseInstrumentation firehoseInstrumentation, GrpcClient grpcClient, StencilClient stencilClient) {
public GrpcSink(FirehoseInstrumentation firehoseInstrumentation,
GrpcClient grpcClient,
StencilClient stencilClient,
GrpcSinkConfig grpcSinkConfig,
PayloadEvaluator<com.google.protobuf.Message> retryEvaluator) {
super(firehoseInstrumentation, "grpc");
this.grpcClient = grpcClient;
this.stencilClient = stencilClient;
this.grpcSinkConfig = grpcSinkConfig;
this.retryEvaluator = retryEvaluator;
}

@Override
Expand All @@ -43,6 +55,7 @@ protected List<Message> execute() throws Exception {
if (!success) {
getFirehoseInstrumentation().logWarn("Grpc Service returned error");
failedMessages.add(message);
setRetryableErrorInfo(message, response);
}
}
getFirehoseInstrumentation().logDebug("Failed messages count: {}", failedMessages.size());
Expand All @@ -60,4 +73,14 @@ public void close() throws IOException {
this.messages = new ArrayList<>();
stencilClient.close();
}

private void setRetryableErrorInfo(Message message, DynamicMessage dynamicMessage) {
boolean eligibleToRetry = retryEvaluator.evaluate(dynamicMessage);
if (eligibleToRetry) {
getFirehoseInstrumentation().logDebug("Retrying grpc service");
message.setErrorInfo(new ErrorInfo(new DefaultException("Retryable gRPC Error"), grpcSinkConfig.getSinkGrpcRetryErrorType()));
return;
}
message.setErrorInfo(new ErrorInfo(new DefaultException("Non Retryable gRPC Error"), ErrorType.SINK_NON_RETRYABLE_ERROR));
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.gotocompany.firehose.sink.grpc;


import com.google.protobuf.Message;
import com.gotocompany.firehose.config.AppConfig;
import com.gotocompany.firehose.config.GrpcSinkConfig;
import com.gotocompany.firehose.evaluator.GrpcResponseCelPayloadEvaluator;
import com.gotocompany.firehose.evaluator.PayloadEvaluator;
import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
import com.gotocompany.firehose.proto.ProtoToMetadataMapper;
import com.gotocompany.firehose.sink.grpc.client.GrpcClient;
Expand Down Expand Up @@ -32,15 +35,21 @@ public static AbstractSink create(Map<String, String> configuration, StatsDRepor
grpcConfig.getSinkGrpcServiceHost(), grpcConfig.getSinkGrpcServicePort(), grpcConfig.getSinkGrpcMethodUrl(), grpcConfig.getSinkGrpcResponseSchemaProtoClass());
firehoseInstrumentation.logDebug(grpcSinkConfig);
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress(grpcConfig.getSinkGrpcServiceHost(), grpcConfig.getSinkGrpcServicePort())
.keepAliveTime(grpcConfig.getSinkGrpcArgKeepaliveTimeMS(), TimeUnit.MILLISECONDS)
.keepAliveTimeout(grpcConfig.getSinkGrpcArgKeepaliveTimeoutMS(), TimeUnit.MILLISECONDS)
.usePlaintext().build();
.keepAliveTime(grpcConfig.getSinkGrpcArgKeepaliveTimeMS(), TimeUnit.MILLISECONDS)
.keepAliveTimeout(grpcConfig.getSinkGrpcArgKeepaliveTimeoutMS(), TimeUnit.MILLISECONDS)
.usePlaintext().build();
AppConfig appConfig = ConfigFactory.create(AppConfig.class, configuration);
ProtoToMetadataMapper protoToMetadataMapper = new ProtoToMetadataMapper(stencilClient.get(appConfig.getInputSchemaProtoClass()), grpcConfig.getSinkGrpcMetadata());
GrpcClient grpcClient = new GrpcClient(new FirehoseInstrumentation(statsDReporter, GrpcClient.class), grpcConfig, managedChannel, stencilClient, protoToMetadataMapper);
firehoseInstrumentation.logInfo("GRPC connection established");
PayloadEvaluator<Message> grpcResponseRetryEvaluator = instantiatePayloadEvaluator(grpcConfig, stencilClient);
return new GrpcSink(new FirehoseInstrumentation(statsDReporter, GrpcSink.class), grpcClient, stencilClient, grpcConfig, grpcResponseRetryEvaluator);
}

return new GrpcSink(new FirehoseInstrumentation(statsDReporter, GrpcSink.class), grpcClient, stencilClient);
private static PayloadEvaluator<Message> instantiatePayloadEvaluator(GrpcSinkConfig grpcSinkConfig, StencilClient stencilClient) {
return new GrpcResponseCelPayloadEvaluator(
stencilClient.get(grpcSinkConfig.getSinkGrpcResponseSchemaProtoClass()),
grpcSinkConfig.getSinkGrpcResponseRetryCELExpression());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.gotocompany.firehose.converter;

import com.gotocompany.depot.error.ErrorType;
import com.gotocompany.firehose.config.converter.GrpcSinkRetryErrorTypeConverter;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;


public class GrpcSinkRetryErrorTypeConverterTest {
@Test
public void shouldConvertToAppropriateEnumType() {
Map<String, ErrorType> stringToExpectedValue = Arrays.stream(ErrorType.values())
.collect(Collectors.toMap(ErrorType::toString, Function.identity()));
GrpcSinkRetryErrorTypeConverter grpcSinkRetryErrorTypeConverter = new GrpcSinkRetryErrorTypeConverter();

stringToExpectedValue.keySet().stream()
.forEach(key -> {
ErrorType expectedValue = stringToExpectedValue.get(key);
ErrorType actualValue = grpcSinkRetryErrorTypeConverter.convert(null, key);
Assertions.assertEquals(expectedValue, actualValue);
});
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowExceptionForInvalidValue() {
GrpcSinkRetryErrorTypeConverter grpcSinkRetryErrorTypeConverter = new GrpcSinkRetryErrorTypeConverter();
grpcSinkRetryErrorTypeConverter.convert(null, "ErrorType.UNREGISTERED");
}
}
Loading

0 comments on commit ba3220e

Please sign in to comment.