diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/GcpClientBuilder.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/GcpClientBuilder.java index c51c843..7043498 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/GcpClientBuilder.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/GcpClientBuilder.java @@ -27,6 +27,8 @@ import static com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.USE_STORAGE_WRITE_API_CONFIG; import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.rpc.FixedHeaderProvider; +import com.google.api.gax.rpc.HeaderProvider; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; @@ -37,6 +39,7 @@ import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig; import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; import com.wepay.kafka.connect.bigquery.exception.BigQueryStorageWriteApiConnectException; +import com.wepay.kafka.connect.bigquery.utils.Version; import java.io.ByteArrayInputStream; import java.io.FileInputStream; import java.io.IOException; @@ -60,6 +63,10 @@ public abstract class GcpClientBuilder { "https://www.googleapis.com/auth/devstorage.read_only", "https://www.googleapis.com/auth/devstorage.read_write" ); + private static final String USER_AGENT_HEADER_KEY = "user-agent"; + private static final String USER_AGENT_HEADER_FORMAT = "Google BigQuery Sink/%s (GPN: Aiven;)"; + + private HeaderProvider headerProvider = null; private String project = null; private KeySource keySource = null; private String key = null; @@ -70,7 +77,8 @@ public GcpClientBuilder withConfig(BigQuerySinkConfig config) { return withProject(config.getString(PROJECT_CONFIG)) .withKeySource(config.getKeySource()) .withKey(config.getKey()) - .withWriterApi(config.getBoolean(USE_STORAGE_WRITE_API_CONFIG)); + .withWriterApi(config.getBoolean(USE_STORAGE_WRITE_API_CONFIG)) + .withUserAgent(); } public GcpClientBuilder withProject(String project) { @@ -95,8 +103,16 @@ public GcpClientBuilder withKey(String key) { return this; } + public GcpClientBuilder withUserAgent() { + this.headerProvider = FixedHeaderProvider.create( + USER_AGENT_HEADER_KEY, + String.format(USER_AGENT_HEADER_FORMAT, Version.version()) + ); + return this; + } + public ClientT build() { - return doBuild(project, credentials()); + return doBuild(project, credentials(), headerProvider); } private GoogleCredentials credentials() { @@ -140,7 +156,7 @@ private GoogleCredentials credentials() { } } - protected abstract ClientT doBuild(String project, GoogleCredentials credentials); + protected abstract ClientT doBuild(String project, GoogleCredentials credentials, HeaderProvider headerProvider); public enum KeySource { FILE, JSON, APPLICATION_DEFAULT @@ -148,9 +164,10 @@ public enum KeySource { public static class BigQueryBuilder extends GcpClientBuilder { @Override - protected BigQuery doBuild(String project, GoogleCredentials credentials) { + protected BigQuery doBuild(String project, GoogleCredentials credentials, HeaderProvider headerProvider) { BigQueryOptions.Builder builder = BigQueryOptions.newBuilder() - .setProjectId(project); + .setProjectId(project) + .setHeaderProvider(headerProvider); if (credentials != null) { builder.setCredentials(credentials); @@ -164,9 +181,10 @@ protected BigQuery doBuild(String project, GoogleCredentials credentials) { public static class GcsBuilder extends GcpClientBuilder { @Override - protected Storage doBuild(String project, GoogleCredentials credentials) { + protected Storage doBuild(String project, GoogleCredentials credentials, HeaderProvider headerProvider) { StorageOptions.Builder builder = StorageOptions.newBuilder() - .setProjectId(project); + .setProjectId(project) + .setHeaderProvider(headerProvider); if (credentials != null) { builder.setCredentials(credentials); @@ -184,9 +202,10 @@ protected Storage doBuild(String project, GoogleCredentials credentials) { public static class BigQueryWriteSettingsBuilder extends GcpClientBuilder { @Override - protected BigQueryWriteSettings doBuild(String project, GoogleCredentials credentials) { + protected BigQueryWriteSettings doBuild(String project, GoogleCredentials credentials, HeaderProvider headerProvider) { BigQueryWriteSettings.Builder builder = BigQueryWriteSettings.newBuilder() - .setQuotaProjectId(project); + .setQuotaProjectId(project) + .setHeaderProvider(headerProvider); if (credentials != null) { builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials)); diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/storage/BigQueryBuilderTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/storage/BigQueryBuilderTest.java index 28deaa7..e52cd81 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/storage/BigQueryBuilderTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/storage/BigQueryBuilderTest.java @@ -32,6 +32,7 @@ import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; public class BigQueryBuilderTest { @@ -47,6 +48,7 @@ public void testBigQueryBuild() { .build(); assertEquals(actualSettings.getOptions().getProjectId(), "abcd"); + assertTrue(actualSettings.getOptions().getMergedHeaderProvider(HashMap::new).getHeaders().get("user-agent").contains("Aiven")); } } diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/storage/BigQueryWriteSettingsBuilderTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/storage/BigQueryWriteSettingsBuilderTest.java index 016b229..bf187ac 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/storage/BigQueryWriteSettingsBuilderTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/storage/BigQueryWriteSettingsBuilderTest.java @@ -24,6 +24,7 @@ package com.wepay.kafka.connect.bigquery.write.storage; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings; import com.wepay.kafka.connect.bigquery.GcpClientBuilder; @@ -47,6 +48,7 @@ public void testBigQueryWriteSettingsBuild() { .build(); assertEquals(actualSettings.getQuotaProjectId(), "abcd"); + assertTrue(actualSettings.getHeaderProvider().getHeaders().get("user-agent").contains("Aiven")); } } diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/storage/GcsBuilderTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/storage/GcsBuilderTest.java index f374ed3..47da014 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/storage/GcsBuilderTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/storage/GcsBuilderTest.java @@ -32,6 +32,7 @@ import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; public class GcsBuilderTest { @@ -47,6 +48,7 @@ public void testStorageBuild() { .build(); assertEquals(actualSettings.getOptions().getProjectId(), "abcd"); + assertTrue(actualSettings.getOptions().getMergedHeaderProvider(HashMap::new).getHeaders().get("user-agent").contains("Aiven")); } }