diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingProcessorTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingProcessorTopologyTest.java index 5e3907f..44e9c31 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingProcessorTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingProcessorTopologyTest.java @@ -103,8 +103,8 @@ public void init(final ProcessorContext context) { } @Override - public void process(final Record record) { - if (1 == record.key() && "foo".equals(record.value())) { + public void process(final Record inputRecord) { + if (1 == inputRecord.key() && "foo".equals(inputRecord.value())) { throw throwable; } throw new UnsupportedOperationException(); @@ -141,12 +141,13 @@ void shouldNotCaptureThrowable(final SoftAssertions softly) { this.mapper = new Processor<>() { @Override - public void init(final ProcessorContext context) { + public void init(final ProcessorContext context) { + // do nothing } @Override - public void process(final Record record) { - if (1 == record.key() && "foo".equals(record.value())) { + public void process(final Record inputRecord) { + if (1 == inputRecord.key() && "foo".equals(inputRecord.value())) { throw throwable; } throw new UnsupportedOperationException(); @@ -175,12 +176,12 @@ public void init(final ProcessorContext context) { } @Override - public void process(final Record record) { - if (1 == record.key() && "foo".equals(record.value())) { + public void process(final Record inputRecord) { + if (1 == inputRecord.key() && "foo".equals(inputRecord.value())) { throw new RuntimeException("Cannot process"); } - if (2 == record.key() && "bar".equals(record.value())) { - this.context.forward(record.withKey(2.0).withValue(2L)); + if (2 == inputRecord.key() && "bar".equals(inputRecord.value())) { + this.context.forward(inputRecord.withKey(2.0).withValue(2L)); return; } throw new UnsupportedOperationException(); @@ -244,9 +245,9 @@ public void init(final ProcessorContext context) { } @Override - public void process(final Record record) { - if (record.key() == null && record.value() == null) { - this.context.forward(record.withKey(2.0).withValue(2L)); + public void process(final Record inputRecord) { + if (inputRecord.key() == null && inputRecord.value() == null) { + this.context.forward(inputRecord.withKey(2.0).withValue(2L)); return; } throw new UnsupportedOperationException(); @@ -287,12 +288,13 @@ void shouldHandleErrorOnNullInput(final SoftAssertions softly) { this.mapper = new Processor<>() { @Override - public void init(final ProcessorContext context) { + public void init(final ProcessorContext context) { + // do nothing } @Override - public void process(final Record record) { - if (record.key() == null && record.value() == null) { + public void process(final Record inputRecord) { + if (inputRecord.key() == null && inputRecord.value() == null) { throw new RuntimeException("Cannot process"); } throw new UnsupportedOperationException(); @@ -348,9 +350,9 @@ public void init(final ProcessorContext context) { } @Override - public void process(final Record record) { - if (record.key() == null && record.value() == null) { - this.context.forward(record.withKey(3.0).withValue(3L)); + public void process(final Record inputRecord) { + if (inputRecord.key() == null && inputRecord.value() == null) { + this.context.forward(inputRecord.withKey(3.0).withValue(3L)); return; } throw new UnsupportedOperationException(); @@ -397,13 +399,13 @@ public void init(final ProcessorContext context) { } @Override - public void process(final Record record) { - if (2 == record.key() && "bar".equals(record.value())) { - this.context.forward(record.withKey(2.0).withValue(2L)); + public void process(final Record inputRecord) { + if (2 == inputRecord.key() && "bar".equals(inputRecord.value())) { + this.context.forward(inputRecord.withKey(2.0).withValue(2L)); return; } - if (3 == record.key() && "baz".equals(record.value())) { - this.context.forward(record.withKey(null).withValue(null)); + if (3 == inputRecord.key() && "baz".equals(inputRecord.value())) { + this.context.forward(inputRecord.withKey(null).withValue(null)); return; } throw new UnsupportedOperationException(); diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueProcessorTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueProcessorTopologyTest.java index 4e9589b..5ee9bf2 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueProcessorTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueProcessorTopologyTest.java @@ -102,8 +102,8 @@ public void init(final FixedKeyProcessorContext context) { } @Override - public void process(final FixedKeyRecord record) { - if ("foo".equals(record.value())) { + public void process(final FixedKeyRecord inputRecord) { + if ("foo".equals(inputRecord.value())) { throw throwable; } throw new UnsupportedOperationException(); @@ -143,8 +143,8 @@ public void init(final FixedKeyProcessorContext context) { } @Override - public void process(final FixedKeyRecord record) { - if ("foo".equals(record.value())) { + public void process(final FixedKeyRecord inputRecord) { + if ("foo".equals(inputRecord.value())) { throw throwable; } throw new UnsupportedOperationException(); @@ -173,12 +173,12 @@ public void init(final FixedKeyProcessorContext context) { } @Override - public void process(final FixedKeyRecord record) { - if ("foo".equals(record.value())) { + public void process(final FixedKeyRecord inputRecord) { + if ("foo".equals(inputRecord.value())) { throw new RuntimeException("Cannot process"); } - if ("bar".equals(record.value())) { - this.context.forward(record.withValue(2L)); + if ("bar".equals(inputRecord.value())) { + this.context.forward(inputRecord.withValue(2L)); return; } throw new UnsupportedOperationException(); @@ -242,9 +242,9 @@ public void init(final FixedKeyProcessorContext context) { } @Override - public void process(final FixedKeyRecord record) { - if (record.value() == null) { - this.context.forward(record.withValue(2L)); + public void process(final FixedKeyRecord inputRecord) { + if (inputRecord.value() == null) { + this.context.forward(inputRecord.withValue(2L)); return; } throw new UnsupportedOperationException(); @@ -289,8 +289,8 @@ public void init(final FixedKeyProcessorContext context) { } @Override - public void process(final FixedKeyRecord record) { - if (record.value() == null) { + public void process(final FixedKeyRecord inputRecord) { + if (inputRecord.value() == null) { throw new RuntimeException("Cannot process"); } throw new UnsupportedOperationException(); @@ -346,9 +346,9 @@ public void init(final FixedKeyProcessorContext context) { } @Override - public void process(final FixedKeyRecord record) { - if ("bar".equals(record.value())) { - this.context.forward(record.withValue(null)); + public void process(final FixedKeyRecord inputRecord) { + if ("bar".equals(inputRecord.value())) { + this.context.forward(inputRecord.withValue(null)); return; } throw new UnsupportedOperationException(); diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingProcessorTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingProcessorTopologyTest.java index ec65f3c..507dae1 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingProcessorTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingProcessorTopologyTest.java @@ -77,12 +77,13 @@ void shouldNotCaptureThrowable(final SoftAssertions softly) { this.mapper = new Processor<>() { @Override - public void init(final ProcessorContext context) { + public void init(final ProcessorContext context) { + // do nothing } @Override - public void process(final Record record) { - if (1 == record.key() && "foo".equals(record.value())) { + public void process(final Record inputRecord) { + if (1 == inputRecord.key() && "foo".equals(inputRecord.value())) { throw throwable; } throw new UnsupportedOperationException(); @@ -106,12 +107,12 @@ public void init(final ProcessorContext context) { } @Override - public void process(final Record record) { - if (1 == record.key() && "foo".equals(record.value())) { + public void process(final Record inputRecord) { + if (1 == inputRecord.key() && "foo".equals(inputRecord.value())) { throw new RuntimeException("Cannot process"); } - if (2 == record.key() && "bar".equals(record.value())) { - this.context.forward(record.withKey(2.0).withValue(2L)); + if (2 == inputRecord.key() && "bar".equals(inputRecord.value())) { + this.context.forward(inputRecord.withKey(2.0).withValue(2L)); return; } throw new UnsupportedOperationException(); @@ -152,9 +153,9 @@ public void init(final ProcessorContext context) { } @Override - public void process(final Record record) { - if (record.key() == null && record.value() == null) { - this.context.forward(record.withKey(2.0).withValue(2L)); + public void process(final Record inputRecord) { + if (inputRecord.key() == null && inputRecord.value() == null) { + this.context.forward(inputRecord.withKey(2.0).withValue(2L)); return; } throw new UnsupportedOperationException(); @@ -184,12 +185,13 @@ void shouldHandleErrorOnNullInput(final SoftAssertions softly) { this.mapper = new Processor<>() { @Override - public void init(final ProcessorContext context) { + public void init(final ProcessorContext context) { + // do nothing } @Override - public void process(final Record record) { - if (record.key() == null && record.value() == null) { + public void process(final Record inputRecord) { + if (inputRecord.key() == null && inputRecord.value() == null) { throw new RuntimeException("Cannot process"); } throw new UnsupportedOperationException(); @@ -222,9 +224,9 @@ public void init(final ProcessorContext context) { } @Override - public void process(final Record record) { - if (record.key() == null && record.value() == null) { - this.context.forward(record.withKey(3.0).withValue(3L)); + public void process(final Record inputRecord) { + if (inputRecord.key() == null && inputRecord.value() == null) { + this.context.forward(inputRecord.withKey(3.0).withValue(3L)); return; } throw new UnsupportedOperationException(); @@ -260,13 +262,13 @@ public void init(final ProcessorContext context) { } @Override - public void process(final Record record) { - if (2 == record.key() && "bar".equals(record.value())) { - this.context.forward(record.withKey(2.0).withValue(2L)); + public void process(final Record inputRecord) { + if (2 == inputRecord.key() && "bar".equals(inputRecord.value())) { + this.context.forward(inputRecord.withKey(2.0).withValue(2L)); return; } - if (3 == record.key() && "baz".equals(record.value())) { - this.context.forward(record.withKey(null).withValue(null)); + if (3 == inputRecord.key() && "baz".equals(inputRecord.value())) { + this.context.forward(inputRecord.withKey(null).withValue(null)); return; } throw new UnsupportedOperationException(); diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingValueProcessorTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingValueProcessorTopologyTest.java index 58ed2e4..020b521 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingValueProcessorTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingValueProcessorTopologyTest.java @@ -82,8 +82,8 @@ public void init(final FixedKeyProcessorContext context) { } @Override - public void process(final FixedKeyRecord record) { - if ("foo".equals(record.value())) { + public void process(final FixedKeyRecord inputRecord) { + if ("foo".equals(inputRecord.value())) { throw throwable; } throw new UnsupportedOperationException(); @@ -107,12 +107,12 @@ public void init(final FixedKeyProcessorContext context) { } @Override - public void process(final FixedKeyRecord record) { - if ("foo".equals(record.value())) { + public void process(final FixedKeyRecord inputRecord) { + if ("foo".equals(inputRecord.value())) { throw new RuntimeException("Cannot process"); } - if ("bar".equals(record.value())) { - this.context.forward(record.withValue(2L)); + if ("bar".equals(inputRecord.value())) { + this.context.forward(inputRecord.withValue(2L)); return; } throw new UnsupportedOperationException(); @@ -153,9 +153,9 @@ public void init(final FixedKeyProcessorContext context) { } @Override - public void process(final FixedKeyRecord record) { - if (record.value() == null) { - this.context.forward(record.withValue(2L)); + public void process(final FixedKeyRecord inputRecord) { + if (inputRecord.value() == null) { + this.context.forward(inputRecord.withValue(2L)); return; } throw new UnsupportedOperationException(); @@ -189,8 +189,8 @@ public void init(final FixedKeyProcessorContext context) { } @Override - public void process(final FixedKeyRecord record) { - if (record.value() == null) { + public void process(final FixedKeyRecord inputRecord) { + if (inputRecord.value() == null) { throw new RuntimeException("Cannot process"); } throw new UnsupportedOperationException(); @@ -223,9 +223,9 @@ public void init(final FixedKeyProcessorContext context) { } @Override - public void process(final FixedKeyRecord record) { - if ("bar".equals(record.value())) { - this.context.forward(record.withValue(null)); + public void process(final FixedKeyRecord inputRecord) { + if ("bar".equals(inputRecord.value())) { + this.context.forward(inputRecord.withValue(null)); return; } throw new UnsupportedOperationException(); diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingProcessorTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingProcessorTopologyTest.java index 99a2c22..e66d78d 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingProcessorTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingProcessorTopologyTest.java @@ -95,8 +95,8 @@ public void init(final ProcessorContext context) { } @Override - public void process(final Record record) { - if (1 == record.key() && "foo".equals(record.value())) { + public void process(final Record inputRecord) { + if (1 == inputRecord.key() && "foo".equals(inputRecord.value())) { throw throwable; } throw new UnsupportedOperationException(); @@ -126,12 +126,13 @@ void shouldNotCaptureThrowable(final SoftAssertions softly) { this.mapper = new Processor<>() { @Override - public void init(final ProcessorContext context) { + public void init(final ProcessorContext context) { + // do nothing } @Override - public void process(final Record record) { - if (1 == record.key() && "foo".equals(record.value())) { + public void process(final Record inputRecord) { + if (1 == inputRecord.key() && "foo".equals(inputRecord.value())) { throw throwable; } throw new UnsupportedOperationException(); @@ -160,12 +161,12 @@ public void init(final ProcessorContext context) { } @Override - public void process(final Record record) { - if (1 == record.key() && "foo".equals(record.value())) { + public void process(final Record inputRecord) { + if (1 == inputRecord.key() && "foo".equals(inputRecord.value())) { throw new RuntimeException("Cannot process"); } - if (2 == record.key() && "bar".equals(record.value())) { - this.context.forward(record.withKey(2.0).withValue(2L)); + if (2 == inputRecord.key() && "bar".equals(inputRecord.value())) { + this.context.forward(inputRecord.withKey(2.0).withValue(2L)); return; } throw new UnsupportedOperationException(); @@ -207,9 +208,9 @@ public void init(final ProcessorContext context) { } @Override - public void process(final Record record) { - if (record.key() == null && record.value() == null) { - this.context.forward(record.withKey(2.0).withValue(2L)); + public void process(final Record inputRecord) { + if (inputRecord.key() == null && inputRecord.value() == null) { + this.context.forward(inputRecord.withKey(2.0).withValue(2L)); return; } throw new UnsupportedOperationException(); @@ -244,12 +245,13 @@ void shouldHandleErrorOnNullInput(final SoftAssertions softly) { this.mapper = new Processor<>() { @Override - public void init(final ProcessorContext context) { + public void init(final ProcessorContext context) { + // do nothing } @Override - public void process(final Record record) { - if (record.key() == null && record.value() == null) { + public void process(final Record inputRecord) { + if (inputRecord.key() == null && inputRecord.value() == null) { throw new RuntimeException("Cannot process"); } throw new UnsupportedOperationException(); @@ -283,9 +285,9 @@ public void init(final ProcessorContext context) { } @Override - public void process(final Record record) { - if (record.key() == null && record.value() == null) { - this.context.forward(record.withKey(3.0).withValue(3L)); + public void process(final Record inputRecord) { + if (inputRecord.key() == null && inputRecord.value() == null) { + this.context.forward(inputRecord.withKey(3.0).withValue(3L)); return; } throw new UnsupportedOperationException(); @@ -326,13 +328,13 @@ public void init(final ProcessorContext context) { } @Override - public void process(final Record record) { - if (2 == record.key() && "bar".equals(record.value())) { - this.context.forward(record.withKey(2.0).withValue(2L)); + public void process(final Record inputRecord) { + if (2 == inputRecord.key() && "bar".equals(inputRecord.value())) { + this.context.forward(inputRecord.withKey(2.0).withValue(2L)); return; } - if (3 == record.key() && "baz".equals(record.value())) { - this.context.forward(record.withKey(null).withValue(null)); + if (3 == inputRecord.key() && "baz".equals(inputRecord.value())) { + this.context.forward(inputRecord.withKey(null).withValue(null)); return; } throw new UnsupportedOperationException(); diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueProcessorTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueProcessorTopologyTest.java index 6edc0d2..bc12401 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueProcessorTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueProcessorTopologyTest.java @@ -96,8 +96,8 @@ public void init(final FixedKeyProcessorContext context) { } @Override - public void process(final FixedKeyRecord record) { - if ("foo".equals(record.value())) { + public void process(final FixedKeyRecord inputRecord) { + if ("foo".equals(inputRecord.value())) { throw throwable; } throw new UnsupportedOperationException(); @@ -130,8 +130,8 @@ public void init(final FixedKeyProcessorContext context) { } @Override - public void process(final FixedKeyRecord record) { - if ("foo".equals(record.value())) { + public void process(final FixedKeyRecord inputRecord) { + if ("foo".equals(inputRecord.value())) { throw throwable; } throw new UnsupportedOperationException(); @@ -160,12 +160,12 @@ public void init(final FixedKeyProcessorContext context) { } @Override - public void process(final FixedKeyRecord record) { - if ("foo".equals(record.value())) { + public void process(final FixedKeyRecord inputRecord) { + if ("foo".equals(inputRecord.value())) { throw new RuntimeException("Cannot process"); } - if ("bar".equals(record.value())) { - this.context.forward(record.withValue(2L)); + if ("bar".equals(inputRecord.value())) { + this.context.forward(inputRecord.withValue(2L)); return; } throw new UnsupportedOperationException(); @@ -207,9 +207,9 @@ public void init(final FixedKeyProcessorContext context) { } @Override - public void process(final FixedKeyRecord record) { - if (record.value() == null) { - this.context.forward(record.withValue(2L)); + public void process(final FixedKeyRecord inputRecord) { + if (inputRecord.value() == null) { + this.context.forward(inputRecord.withValue(2L)); return; } throw new UnsupportedOperationException(); @@ -248,8 +248,8 @@ public void init(final FixedKeyProcessorContext context) { } @Override - public void process(final FixedKeyRecord record) { - if (record.value() == null) { + public void process(final FixedKeyRecord inputRecord) { + if (inputRecord.value() == null) { throw new RuntimeException("Cannot process"); } throw new UnsupportedOperationException(); @@ -283,9 +283,9 @@ public void init(final FixedKeyProcessorContext context) { } @Override - public void process(final FixedKeyRecord record) { - if ("bar".equals(record.value())) { - this.context.forward(record.withValue(null)); + public void process(final FixedKeyRecord inputRecord) { + if ("bar".equals(inputRecord.value())) { + this.context.forward(inputRecord.withValue(null)); return; } throw new UnsupportedOperationException();