Skip to content

Commit

Permalink
Upgrade to Kafka 3.8 and Avro 1.12
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 7, 2025
1 parent 40318f6 commit 0dd243a
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ public void init(final ProcessorContext context) {
}

@Override
public void process(final Record<Integer, String> record) {
if (1 == record.key() && "foo".equals(record.value())) {
public void process(final Record<Integer, String> inputRecord) {
if (1 == inputRecord.key() && "foo".equals(inputRecord.value())) {
throw throwable;
}
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -141,12 +141,13 @@ void shouldNotCaptureThrowable(final SoftAssertions softly) {
this.mapper = new Processor<>() {

@Override
public void init(final ProcessorContext<Double, Long> context) {
public void init(final ProcessorContext context) {
// do nothing
}

@Override
public void process(final Record<Integer, String> record) {
if (1 == record.key() && "foo".equals(record.value())) {
public void process(final Record<Integer, String> inputRecord) {
if (1 == inputRecord.key() && "foo".equals(inputRecord.value())) {
throw throwable;
}
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -175,12 +176,12 @@ public void init(final ProcessorContext<Double, Long> context) {
}

@Override
public void process(final Record<Integer, String> record) {
if (1 == record.key() && "foo".equals(record.value())) {
public void process(final Record<Integer, String> inputRecord) {
if (1 == inputRecord.key() && "foo".equals(inputRecord.value())) {
throw new RuntimeException("Cannot process");
}
if (2 == record.key() && "bar".equals(record.value())) {
this.context.forward(record.withKey(2.0).withValue(2L));
if (2 == inputRecord.key() && "bar".equals(inputRecord.value())) {
this.context.forward(inputRecord.withKey(2.0).withValue(2L));
return;
}
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -244,9 +245,9 @@ public void init(final ProcessorContext<Double, Long> context) {
}

@Override
public void process(final Record<Integer, String> record) {
if (record.key() == null && record.value() == null) {
this.context.forward(record.withKey(2.0).withValue(2L));
public void process(final Record<Integer, String> inputRecord) {
if (inputRecord.key() == null && inputRecord.value() == null) {
this.context.forward(inputRecord.withKey(2.0).withValue(2L));
return;
}
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -287,12 +288,13 @@ void shouldHandleErrorOnNullInput(final SoftAssertions softly) {
this.mapper = new Processor<>() {

@Override
public void init(final ProcessorContext<Double, Long> context) {
public void init(final ProcessorContext context) {
// do nothing
}

@Override
public void process(final Record<Integer, String> record) {
if (record.key() == null && record.value() == null) {
public void process(final Record<Integer, String> inputRecord) {
if (inputRecord.key() == null && inputRecord.value() == null) {
throw new RuntimeException("Cannot process");
}
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -348,9 +350,9 @@ public void init(final ProcessorContext<Double, Long> context) {
}

@Override
public void process(final Record<Integer, String> record) {
if (record.key() == null && record.value() == null) {
this.context.forward(record.withKey(3.0).withValue(3L));
public void process(final Record<Integer, String> inputRecord) {
if (inputRecord.key() == null && inputRecord.value() == null) {
this.context.forward(inputRecord.withKey(3.0).withValue(3L));
return;
}
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -397,13 +399,13 @@ public void init(final ProcessorContext<Double, Long> context) {
}

@Override
public void process(final Record<Integer, String> record) {
if (2 == record.key() && "bar".equals(record.value())) {
this.context.forward(record.withKey(2.0).withValue(2L));
public void process(final Record<Integer, String> inputRecord) {
if (2 == inputRecord.key() && "bar".equals(inputRecord.value())) {
this.context.forward(inputRecord.withKey(2.0).withValue(2L));
return;
}
if (3 == record.key() && "baz".equals(record.value())) {
this.context.forward(record.<Double>withKey(null).withValue(null));
if (3 == inputRecord.key() && "baz".equals(inputRecord.value())) {
this.context.forward(inputRecord.<Double>withKey(null).withValue(null));
return;
}
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ public void init(final FixedKeyProcessorContext<Integer, Long> context) {
}

@Override
public void process(final FixedKeyRecord<Integer, String> record) {
if ("foo".equals(record.value())) {
public void process(final FixedKeyRecord<Integer, String> inputRecord) {
if ("foo".equals(inputRecord.value())) {
throw throwable;
}
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -143,8 +143,8 @@ public void init(final FixedKeyProcessorContext<Integer, Long> context) {
}

@Override
public void process(final FixedKeyRecord<Integer, String> record) {
if ("foo".equals(record.value())) {
public void process(final FixedKeyRecord<Integer, String> inputRecord) {
if ("foo".equals(inputRecord.value())) {
throw throwable;
}
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -173,12 +173,12 @@ public void init(final FixedKeyProcessorContext<Integer, Long> context) {
}

@Override
public void process(final FixedKeyRecord<Integer, String> record) {
if ("foo".equals(record.value())) {
public void process(final FixedKeyRecord<Integer, String> inputRecord) {
if ("foo".equals(inputRecord.value())) {
throw new RuntimeException("Cannot process");
}
if ("bar".equals(record.value())) {
this.context.forward(record.withValue(2L));
if ("bar".equals(inputRecord.value())) {
this.context.forward(inputRecord.withValue(2L));
return;
}
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -242,9 +242,9 @@ public void init(final FixedKeyProcessorContext<Integer, Long> context) {
}

@Override
public void process(final FixedKeyRecord<Integer, String> record) {
if (record.value() == null) {
this.context.forward(record.withValue(2L));
public void process(final FixedKeyRecord<Integer, String> inputRecord) {
if (inputRecord.value() == null) {
this.context.forward(inputRecord.withValue(2L));
return;
}
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -289,8 +289,8 @@ public void init(final FixedKeyProcessorContext<Integer, Long> context) {
}

@Override
public void process(final FixedKeyRecord<Integer, String> record) {
if (record.value() == null) {
public void process(final FixedKeyRecord<Integer, String> inputRecord) {
if (inputRecord.value() == null) {
throw new RuntimeException("Cannot process");
}
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -346,9 +346,9 @@ public void init(final FixedKeyProcessorContext<Integer, Long> context) {
}

@Override
public void process(final FixedKeyRecord<Integer, String> record) {
if ("bar".equals(record.value())) {
this.context.forward(record.withValue(null));
public void process(final FixedKeyRecord<Integer, String> inputRecord) {
if ("bar".equals(inputRecord.value())) {
this.context.forward(inputRecord.withValue(null));
return;
}
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,13 @@ void shouldNotCaptureThrowable(final SoftAssertions softly) {
this.mapper = new Processor<>() {

@Override
public void init(final ProcessorContext<Double, Long> context) {
public void init(final ProcessorContext context) {
// do nothing
}

@Override
public void process(final Record<Integer, String> record) {
if (1 == record.key() && "foo".equals(record.value())) {
public void process(final Record<Integer, String> inputRecord) {
if (1 == inputRecord.key() && "foo".equals(inputRecord.value())) {
throw throwable;
}
throw new UnsupportedOperationException();
Expand All @@ -106,12 +107,12 @@ public void init(final ProcessorContext<Double, Long> context) {
}

@Override
public void process(final Record<Integer, String> record) {
if (1 == record.key() && "foo".equals(record.value())) {
public void process(final Record<Integer, String> inputRecord) {
if (1 == inputRecord.key() && "foo".equals(inputRecord.value())) {
throw new RuntimeException("Cannot process");
}
if (2 == record.key() && "bar".equals(record.value())) {
this.context.forward(record.withKey(2.0).withValue(2L));
if (2 == inputRecord.key() && "bar".equals(inputRecord.value())) {
this.context.forward(inputRecord.withKey(2.0).withValue(2L));
return;
}
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -152,9 +153,9 @@ public void init(final ProcessorContext<Double, Long> context) {
}

@Override
public void process(final Record<Integer, String> record) {
if (record.key() == null && record.value() == null) {
this.context.forward(record.withKey(2.0).withValue(2L));
public void process(final Record<Integer, String> inputRecord) {
if (inputRecord.key() == null && inputRecord.value() == null) {
this.context.forward(inputRecord.withKey(2.0).withValue(2L));
return;
}
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -184,12 +185,13 @@ void shouldHandleErrorOnNullInput(final SoftAssertions softly) {
this.mapper = new Processor<>() {

@Override
public void init(final ProcessorContext<Double, Long> context) {
public void init(final ProcessorContext context) {
// do nothing
}

@Override
public void process(final Record<Integer, String> record) {
if (record.key() == null && record.value() == null) {
public void process(final Record<Integer, String> inputRecord) {
if (inputRecord.key() == null && inputRecord.value() == null) {
throw new RuntimeException("Cannot process");
}
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -222,9 +224,9 @@ public void init(final ProcessorContext<Double, Long> context) {
}

@Override
public void process(final Record<Integer, String> record) {
if (record.key() == null && record.value() == null) {
this.context.forward(record.withKey(3.0).withValue(3L));
public void process(final Record<Integer, String> inputRecord) {
if (inputRecord.key() == null && inputRecord.value() == null) {
this.context.forward(inputRecord.withKey(3.0).withValue(3L));
return;
}
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -260,13 +262,13 @@ public void init(final ProcessorContext<Double, Long> context) {
}

@Override
public void process(final Record<Integer, String> record) {
if (2 == record.key() && "bar".equals(record.value())) {
this.context.forward(record.withKey(2.0).withValue(2L));
public void process(final Record<Integer, String> inputRecord) {
if (2 == inputRecord.key() && "bar".equals(inputRecord.value())) {
this.context.forward(inputRecord.withKey(2.0).withValue(2L));
return;
}
if (3 == record.key() && "baz".equals(record.value())) {
this.context.forward(record.<Double>withKey(null).withValue(null));
if (3 == inputRecord.key() && "baz".equals(inputRecord.value())) {
this.context.forward(inputRecord.<Double>withKey(null).withValue(null));
return;
}
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ public void init(final FixedKeyProcessorContext<Integer, Long> context) {
}

@Override
public void process(final FixedKeyRecord<Integer, String> record) {
if ("foo".equals(record.value())) {
public void process(final FixedKeyRecord<Integer, String> inputRecord) {
if ("foo".equals(inputRecord.value())) {
throw throwable;
}
throw new UnsupportedOperationException();
Expand All @@ -107,12 +107,12 @@ public void init(final FixedKeyProcessorContext<Integer, Long> context) {
}

@Override
public void process(final FixedKeyRecord<Integer, String> record) {
if ("foo".equals(record.value())) {
public void process(final FixedKeyRecord<Integer, String> inputRecord) {
if ("foo".equals(inputRecord.value())) {
throw new RuntimeException("Cannot process");
}
if ("bar".equals(record.value())) {
this.context.forward(record.withValue(2L));
if ("bar".equals(inputRecord.value())) {
this.context.forward(inputRecord.withValue(2L));
return;
}
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -153,9 +153,9 @@ public void init(final FixedKeyProcessorContext<Integer, Long> context) {
}

@Override
public void process(final FixedKeyRecord<Integer, String> record) {
if (record.value() == null) {
this.context.forward(record.withValue(2L));
public void process(final FixedKeyRecord<Integer, String> inputRecord) {
if (inputRecord.value() == null) {
this.context.forward(inputRecord.withValue(2L));
return;
}
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -189,8 +189,8 @@ public void init(final FixedKeyProcessorContext<Integer, Long> context) {
}

@Override
public void process(final FixedKeyRecord<Integer, String> record) {
if (record.value() == null) {
public void process(final FixedKeyRecord<Integer, String> inputRecord) {
if (inputRecord.value() == null) {
throw new RuntimeException("Cannot process");
}
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -223,9 +223,9 @@ public void init(final FixedKeyProcessorContext<Integer, Long> context) {
}

@Override
public void process(final FixedKeyRecord<Integer, String> record) {
if ("bar".equals(record.value())) {
this.context.forward(record.withValue(null));
public void process(final FixedKeyRecord<Integer, String> inputRecord) {
if ("bar".equals(inputRecord.value())) {
this.context.forward(inputRecord.withValue(null));
return;
}
throw new UnsupportedOperationException();
Expand Down
Loading

0 comments on commit 0dd243a

Please sign in to comment.