Skip to content

Commit

Permalink
chore: add Aiven user-agent
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Apr 8, 2024
1 parent 4080bc4 commit 23ba7f8
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import static com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.USE_STORAGE_WRITE_API_CONFIG;

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
Expand All @@ -37,6 +39,7 @@
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException;
import com.wepay.kafka.connect.bigquery.exception.BigQueryStorageWriteApiConnectException;
import com.wepay.kafka.connect.bigquery.utils.Version;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
Expand All @@ -60,6 +63,10 @@ public abstract class GcpClientBuilder<ClientT> {
"https://www.googleapis.com/auth/devstorage.read_only",
"https://www.googleapis.com/auth/devstorage.read_write"
);
private static final String USER_AGENT_HEADER_KEY = "user-agent";
private static final String USER_AGENT_HEADER_FORMAT = "Google BigQuery Sink/%s (GPN: Aiven;)";

private HeaderProvider headerProvider = null;
private String project = null;
private KeySource keySource = null;
private String key = null;
Expand All @@ -70,7 +77,8 @@ public GcpClientBuilder<ClientT> withConfig(BigQuerySinkConfig config) {
return withProject(config.getString(PROJECT_CONFIG))
.withKeySource(config.getKeySource())
.withKey(config.getKey())
.withWriterApi(config.getBoolean(USE_STORAGE_WRITE_API_CONFIG));
.withWriterApi(config.getBoolean(USE_STORAGE_WRITE_API_CONFIG))
.withUserAgent();
}

public GcpClientBuilder<ClientT> withProject(String project) {
Expand All @@ -95,8 +103,16 @@ public GcpClientBuilder<ClientT> withKey(String key) {
return this;
}

public GcpClientBuilder<ClientT> withUserAgent() {
this.headerProvider = FixedHeaderProvider.create(
USER_AGENT_HEADER_KEY,
String.format(USER_AGENT_HEADER_FORMAT, Version.version())
);
return this;
}

public ClientT build() {
return doBuild(project, credentials());
return doBuild(project, credentials(), headerProvider);
}

private GoogleCredentials credentials() {
Expand Down Expand Up @@ -140,17 +156,18 @@ private GoogleCredentials credentials() {
}
}

protected abstract ClientT doBuild(String project, GoogleCredentials credentials);
protected abstract ClientT doBuild(String project, GoogleCredentials credentials, HeaderProvider headerProvider);

public enum KeySource {
FILE, JSON, APPLICATION_DEFAULT
}

public static class BigQueryBuilder extends GcpClientBuilder<BigQuery> {
@Override
protected BigQuery doBuild(String project, GoogleCredentials credentials) {
protected BigQuery doBuild(String project, GoogleCredentials credentials, HeaderProvider headerProvider) {
BigQueryOptions.Builder builder = BigQueryOptions.newBuilder()
.setProjectId(project);
.setProjectId(project)
.setHeaderProvider(headerProvider);

if (credentials != null) {
builder.setCredentials(credentials);
Expand All @@ -164,9 +181,10 @@ protected BigQuery doBuild(String project, GoogleCredentials credentials) {

public static class GcsBuilder extends GcpClientBuilder<Storage> {
@Override
protected Storage doBuild(String project, GoogleCredentials credentials) {
protected Storage doBuild(String project, GoogleCredentials credentials, HeaderProvider headerProvider) {
StorageOptions.Builder builder = StorageOptions.newBuilder()
.setProjectId(project);
.setProjectId(project)
.setHeaderProvider(headerProvider);

if (credentials != null) {
builder.setCredentials(credentials);
Expand All @@ -184,9 +202,10 @@ protected Storage doBuild(String project, GoogleCredentials credentials) {
public static class BigQueryWriteSettingsBuilder extends GcpClientBuilder<BigQueryWriteSettings> {

@Override
protected BigQueryWriteSettings doBuild(String project, GoogleCredentials credentials) {
protected BigQueryWriteSettings doBuild(String project, GoogleCredentials credentials, HeaderProvider headerProvider) {
BigQueryWriteSettings.Builder builder = BigQueryWriteSettings.newBuilder()
.setQuotaProjectId(project);
.setQuotaProjectId(project)
.setHeaderProvider(headerProvider);

if (credentials != null) {
builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class BigQueryBuilderTest {

Expand All @@ -47,6 +48,7 @@ public void testBigQueryBuild() {
.build();

assertEquals(actualSettings.getOptions().getProjectId(), "abcd");
assertTrue(actualSettings.getOptions().getMergedHeaderProvider(HashMap::new).getHeaders().get("user-agent").contains("Aiven"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
package com.wepay.kafka.connect.bigquery.write.storage;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.wepay.kafka.connect.bigquery.GcpClientBuilder;
Expand All @@ -47,6 +48,7 @@ public void testBigQueryWriteSettingsBuild() {
.build();

assertEquals(actualSettings.getQuotaProjectId(), "abcd");
assertTrue(actualSettings.getHeaderProvider().getHeaders().get("user-agent").contains("Aiven"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class GcsBuilderTest {

Expand All @@ -47,6 +48,7 @@ public void testStorageBuild() {
.build();

assertEquals(actualSettings.getOptions().getProjectId(), "abcd");
assertTrue(actualSettings.getOptions().getMergedHeaderProvider(HashMap::new).getHeaders().get("user-agent").contains("Aiven"));
}

}

0 comments on commit 23ba7f8

Please sign in to comment.