From a1da49f8e1f3639fa2c7ba179c6984c44e22ec08 Mon Sep 17 00:00:00 2001 From: Mike Skells Date: Thu, 7 Nov 2024 17:36:32 +0000 Subject: [PATCH 1/2] add telemetry --- .../config/validators/ClassValidator.java | 35 +++++++++++++++++++ .../kafka/connect/gcs/GcsSinkConfig.java | 30 ++++++++++++---- .../aiven/kafka/connect/gcs/GcsSinkTask.java | 9 +++-- 3 files changed, 64 insertions(+), 10 deletions(-) create mode 100644 gcs-sink-connector/src/main/java/io/aiven/kafka/connect/common/config/validators/ClassValidator.java diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/common/config/validators/ClassValidator.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/common/config/validators/ClassValidator.java new file mode 100644 index 000000000..24513b34b --- /dev/null +++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/common/config/validators/ClassValidator.java @@ -0,0 +1,35 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.config.validators; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; + +public class ClassValidator implements ConfigDef.Validator { + private final Class baseClass; + + public ClassValidator(final Class baseClass) { + this.baseClass = baseClass; + } + + @Override + public void ensureValid(final String name, final Object value) { + if (value != null && !baseClass.isAssignableFrom((Class) value)) { + throw new ConfigException(name, value, "must be a subclass of " + baseClass.getName()); + } + } +} diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java index 5ac3a26ba..04507ec85 100644 --- a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java +++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java @@ -27,6 +27,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import com.google.api.gax.tracing.ApiTracerFactory; +import io.aiven.kafka.connect.common.config.validators.ClassValidator; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.types.Password; @@ -59,6 +61,7 @@ public final class GcsSinkConfig extends AivenCommonConfig { public static final String GCS_BUCKET_NAME_CONFIG = "gcs.bucket.name"; public static final String GCS_OBJECT_CONTENT_ENCODING_CONFIG = "gcs.object.content.encoding"; public static final String GCS_USER_AGENT = "gcs.user.agent"; + public static final String GCS_METRICS = "gcs.metrics.class"; private static final String GROUP_FILE = "File"; public static final String FILE_NAME_PREFIX_CONFIG = "file.name.prefix"; public static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template"; @@ -144,14 +147,16 @@ private static void addGcsConfigGroup(final ConfigDef configDef) { configDef.define(GCS_BUCKET_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyString(), ConfigDef.Importance.HIGH, - "The GCS bucket name to store output files in.", GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, // NOPMD - // the - // gcsGroupCounter - // updated - // value - // never - // used + "The GCS bucket name to store output files in.", GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, GCS_BUCKET_NAME_CONFIG); + configDef.define(GCS_METRICS, ConfigDef.Type.CLASS, + null, new ClassValidator(ApiTracerFactory.class), + ConfigDef.Importance.LOW, + "class for GCS metrics. Default is not to attache metrics", + GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, // NOPMD + // retryPolicyGroupCounter updated value never used + GCS_METRICS); + } private static void addGcsRetryPolicies(final ConfigDef configDef) { @@ -438,4 +443,15 @@ public String getGcsEndpoint() { public String getUserAgent() { return getString(GCS_USER_AGENT); } + + public ApiTracerFactory getApiTracerFactory() { + if (getClass(GCS_METRICS) == null) { + return null; + } + try { + return getClass(GCS_METRICS).asSubclass(ApiTracerFactory.class).getDeclaredConstructor().newInstance(); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } } diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java index eda879e4a..bed787104 100644 --- a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java +++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java @@ -71,7 +71,7 @@ public void start(final Map props) { Objects.requireNonNull(props, "props cannot be null"); this.config = new GcsSinkConfig(props); - this.storage = StorageOptions.newBuilder() + StorageOptions.Builder builder = StorageOptions.newBuilder() .setHost(config.getGcsEndpoint()) .setCredentials(config.getCredentials()) .setHeaderProvider(FixedHeaderProvider.create(USER_AGENT_HEADER_KEY, config.getUserAgent())) @@ -81,8 +81,11 @@ public void start(final Map props) { .setRetryDelayMultiplier(config.getGcsRetryBackoffDelayMultiplier()) .setTotalTimeout(config.getGcsRetryBackoffTotalTimeout()) .setMaxAttempts(config.getGcsRetryBackoffMaxAttempts()) - .build()) - .build() + .build()); + if (config.getApiTracerFactory() != null) { + builder.setApiTracerFactory(config.getApiTracerFactory()); + } + this.storage = builder.build() .getService(); initRest(); if (Objects.nonNull(config.getKafkaRetryBackoffMs())) { From bc2241f34de4be3ae13733e430d1e482933880a5 Mon Sep 17 00:00:00 2001 From: Mike Skells Date: Mon, 11 Nov 2024 09:58:01 +0000 Subject: [PATCH 2/2] add telemetry --- .../java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java index 04507ec85..417f66264 100644 --- a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java +++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java @@ -445,13 +445,16 @@ public String getUserAgent() { } public ApiTracerFactory getApiTracerFactory() { - if (getClass(GCS_METRICS) == null) { + final Class cls = getClass(GCS_METRICS); + if (cls == null) { return null; } try { - return getClass(GCS_METRICS).asSubclass(ApiTracerFactory.class).getDeclaredConstructor().newInstance(); + return cls.asSubclass(ApiTracerFactory.class).getDeclaredConstructor().newInstance(); } catch (ReflectiveOperationException e) { - throw new RuntimeException(e); + var ex = new ConfigException("Failed to create GCS metrics for "+cls+" : " + e.getMessage()); + ex.initCause(e); + throw ex; } } }