Skip to content

Commit

Permalink
feat: bq Storage api changes (#4)
Browse files Browse the repository at this point in the history
* feat: bq streaming api changes

* chore: version bump
  • Loading branch information
lavkesh authored May 5, 2023
1 parent f913212 commit af19231
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 12 deletions.
12 changes: 6 additions & 6 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ lombok {
}

group 'com.gotocompany'
version '0.8.2'
version '0.8.3'

def projName = "firehose"

sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8

repositories {
mavenLocal()
mavenCentral()
jcenter()
maven {
Expand Down Expand Up @@ -85,7 +86,7 @@ dependencies {
exclude group: "log4j", module: "log4j"
}
implementation 'io.confluent:monitoring-interceptors:3.0.0'
implementation "io.grpc:grpc-all:1.38.0"
implementation 'io.grpc:grpc-all:1.53.0'
implementation group: 'org.jfrog.buildinfo', name: 'build-info-extractor', version: '2.6.3'
implementation group: 'com.google.gradle', name: 'osdetector-gradle-plugin', version: '1.2.1'
implementation group: 'org.apache.ivy', name: 'ivy', version: '2.2.0'
Expand All @@ -98,10 +99,9 @@ dependencies {
implementation 'com.gojek.parquet:parquet-hadoop:1.11.9'
implementation group: 'com.github.os72', name: 'protobuf-dynamic', version: '1.0.1'
implementation platform('com.google.cloud:libraries-bom:20.5.0')
implementation 'com.google.cloud:google-cloud-storage:1.114.0'
implementation 'com.google.cloud:google-cloud-bigquery:1.115.0'
implementation 'org.apache.logging.log4j:log4j-core:2.17.1'
implementation group: 'com.gotocompany', name: 'depot', version: '0.4.1'
implementation 'com.google.cloud:google-cloud-storage:2.20.1'
implementation 'org.apache.logging.log4j:log4j-core:2.20.0'
implementation group: 'com.gotocompany', name: 'depot', version: '0.4.2'
implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j'

testImplementation group: 'junit', name: 'junit', version: '4.11'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ private Future<List<Message>> scheduleTask(List<Message> messages) {

@Override
public void close() throws IOException {
sinkPool.close();
consumerAndOffsetManager.close();
tracer.close();
sinkPool.close();
firehoseInstrumentation.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public FirehoseConsumer buildConsumer() {
ConsumerAndOffsetManager consumerAndOffsetManager = new ConsumerAndOffsetManager(sinks, offsetManager, firehoseKafkaConsumer, kafkaConsumerConfig, new FirehoseInstrumentation(statsDReporter, ConsumerAndOffsetManager.class));
SinkPool sinkPool = new SinkPool(
new LinkedBlockingQueue<>(sinks),
sinks,
Executors.newCachedThreadPool(),
sinkPoolConfig.getSinkPoolQueuePollTimeoutMS());
return new FirehoseAsyncConsumer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ public void process() throws IOException {

@Override
public void close() throws IOException {
sink.close();
tracer.close();
consumerAndOffsetManager.close();
firehoseInstrumentation.close();
sink.close();
}
}
6 changes: 4 additions & 2 deletions src/main/java/com/gotocompany/firehose/launch/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ private static void multiThreadedConsumers(KafkaConsumerConfig kafkaConsumerConf
firehoseConsumer.process();
}
} catch (Exception | Error e) {
ensureThreadInterruptStateIsClearedAndClose(firehoseConsumer, firehoseInstrumentation);
firehoseInstrumentation.captureFatalError("firehose_error_event", e, "Caught exception or error, exiting the application");
System.exit(1);
} finally {
Expand All @@ -73,9 +74,10 @@ private static void multiThreadedConsumers(KafkaConsumerConfig kafkaConsumerConf
}

private static void ensureThreadInterruptStateIsClearedAndClose(FirehoseConsumer firehoseConsumer, FirehoseInstrumentation firehoseInstrumentation) {
Thread.interrupted();
try {
firehoseConsumer.close();
if (firehoseConsumer != null) {
firehoseConsumer.close();
}
} catch (IOException e) {
firehoseInstrumentation.captureFatalError("firehose_error_event", e, "Exception on closing firehose consumer");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,6 @@ protected void prepare(List<Message> messages) throws DeserializerException, IOE

@Override
public void close() throws IOException {

sink.close();
}
}
13 changes: 13 additions & 0 deletions src/main/java/com/gotocompany/firehose/sink/SinkPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand All @@ -18,9 +20,11 @@
import java.util.stream.Collectors;

@AllArgsConstructor
@Slf4j
public class SinkPool implements AutoCloseable {
private final Set<SinkFuture> sinkFutures = new HashSet<>();
private final BlockingQueue<Sink> workerSinks;
private final List<Sink> allSinks;
private final ExecutorService executorService;
private final long pollTimeOutMillis;

Expand Down Expand Up @@ -59,6 +63,15 @@ public Future<List<Message>> submitTask(List<Message> messages) {

@Override
public void close() {
allSinks.forEach(sink -> {
try {
log.info("Closing sink");
sink.close();
} catch (IOException e) {
log.error("Error happened while closing sink");
e.printStackTrace();
}
});
executorService.shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class SinkPoolTest {
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
sinkPool = new SinkPool(workerSinks, executorService, 5);
sinkPool = new SinkPool(workerSinks, new ArrayList<>(), executorService, 5);
}

@Test
Expand Down

0 comments on commit af19231

Please sign in to comment.