Allow for custom record grouping #300

Open: wants to merge 1 commit into base: main
@@ -17,24 +17,33 @@
package io.aiven.kafka.connect.common.config;

import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

import io.aiven.kafka.connect.common.config.validators.ClassValidator;
import io.aiven.kafka.connect.common.config.validators.FileCompressionTypeValidator;
import io.aiven.kafka.connect.common.config.validators.FilenameTemplateValidator;
import io.aiven.kafka.connect.common.config.validators.NonNegativeValidator;
import io.aiven.kafka.connect.common.config.validators.OutputFieldsEncodingValidator;
import io.aiven.kafka.connect.common.config.validators.OutputFieldsValidator;
import io.aiven.kafka.connect.common.config.validators.OutputTypeValidator;
import io.aiven.kafka.connect.common.config.validators.TimeZoneValidator;
import io.aiven.kafka.connect.common.config.validators.TimestampSourceValidator;
import io.aiven.kafka.connect.common.grouper.CustomRecordGrouperBuilder;
import io.aiven.kafka.connect.common.grouper.RecordGrouperFactory;
import io.aiven.kafka.connect.common.templating.Template;

@SuppressWarnings("PMD.TooManyMethods")
public class AivenCommonConfig extends AbstractConfig {
public static final String FORMAT_OUTPUT_FIELDS_CONFIG = "format.output.fields";
public static final String FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG = "format.output.fields.value.encoding";
@@ -45,6 +54,7 @@ public class AivenCommonConfig {
public static final String FILE_NAME_TIMESTAMP_TIMEZONE = "file.name.timestamp.timezone";
public static final String FILE_NAME_TIMESTAMP_SOURCE = "file.name.timestamp.source";
public static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template";
public static final String CUSTOM_RECORD_GROUPER_BUILDER = "file.record.grouper.builder";

private static final String GROUP_COMPRESSION = "File Compression";
private static final String GROUP_FORMAT = "Format";
@@ -53,12 +63,73 @@ public class AivenCommonConfig {
private static final String GROUP_RETRY_BACKOFF_POLICY = "Retry backoff policy";
public static final String KAFKA_RETRY_BACKOFF_MS_CONFIG = "kafka.retry.backoff.ms";

@SuppressWarnings({ "PMD.this-escape", "PMD.ConstructorCallsOverridableMethod" })
protected AivenCommonConfig(final ConfigDef definition, final Map<?, ?> originals) {
super(definition, originals);
// TODO: calls getOutputFields, can be overridden in subclasses.
validate(); // NOPMD ConstructorCallsOverridableMethod
validate(); // NOPMD
}

protected static int addFileConfigGroup(final ConfigDef configDef, final String groupFile, final String type,
int fileGroupCounter, final CompressionType defaultCompressionType) {
Comment on lines +73 to +74

Contributor: This method can be simplified. The method is only called with groupFile = "File" and with fileGroupCounter set to 0. Both can be removed from the call and simply specified within the method.

Contributor (author): The group is defined in the child classes, and for GCS it is the second parameter.

configDef.define(FILE_NAME_TEMPLATE_CONFIG, ConfigDef.Type.STRING, null, null, ConfigDef.Importance.MEDIUM,
"The template for file names on " + type + ". "
+ "Supports `{{ variable }}` placeholders for substituting variables. "
+ "Currently supported variables are `topic`, `partition`, and `start_offset` "
+ "(the offset of the first record in the file). "
+ "Only some combinations of variables are valid, which currently are:\n"
+ "- `topic`, `partition`, `start_offset`.\n"
+ "There is also the `key`-only variable `{{key}}` for grouping by keys. If a "
+ CUSTOM_RECORD_GROUPER_BUILDER + " is set, the template will be passed"
+ " to that builder and validated according to its rules, which may be more or less constrained.",
groupFile, fileGroupCounter++, // NOPMD UnusedAssignment
ConfigDef.Width.LONG, FILE_NAME_TEMPLATE_CONFIG);

configDef.define(FILE_COMPRESSION_TYPE_CONFIG, ConfigDef.Type.STRING,
defaultCompressionType == null ? null : defaultCompressionType.name, new FileCompressionTypeValidator(),
ConfigDef.Importance.MEDIUM,
"The compression type used for files put on " + type + ". " + "The supported values are: "
+ CompressionType.SUPPORTED_COMPRESSION_TYPES + ".",
groupFile, fileGroupCounter++, // NOPMD UnusedAssignment
ConfigDef.Width.NONE, FILE_COMPRESSION_TYPE_CONFIG,
FixedSetRecommender.ofSupportedValues(CompressionType.names()));

configDef.define(FILE_MAX_RECORDS, ConfigDef.Type.INT, 0, new NonNegativeValidator(),
ConfigDef.Importance.MEDIUM,
"The maximum number of records to put in a single file. " + "Must be a non-negative integer number. "
+ "0 is interpreted as \"unlimited\", which is the default.",
groupFile, fileGroupCounter++, ConfigDef.Width.SHORT, FILE_MAX_RECORDS);

configDef.define(FILE_NAME_TIMESTAMP_TIMEZONE, ConfigDef.Type.STRING, ZoneOffset.UTC.toString(),
new TimeZoneValidator(), ConfigDef.Importance.LOW,
"Specifies the timezone in which the date and time for the timestamp variable will be treated. "
+ "Use standard short and long names. Default is UTC.",
groupFile, fileGroupCounter++, // NOPMD UnusedAssignment
ConfigDef.Width.SHORT, FILE_NAME_TIMESTAMP_TIMEZONE);

configDef.define(FILE_NAME_TIMESTAMP_SOURCE, ConfigDef.Type.STRING, TimestampSource.Type.WALLCLOCK.name(),
new TimestampSourceValidator(), ConfigDef.Importance.LOW,
"Specifies the timestamp variable source. Default is wall-clock.", groupFile, fileGroupCounter++, // NOPMD
// UnusedAssignment
ConfigDef.Width.SHORT, FILE_NAME_TIMESTAMP_SOURCE);

configDef.define(CUSTOM_RECORD_GROUPER_BUILDER, ConfigDef.Type.CLASS, null,
new ClassValidator(CustomRecordGrouperBuilder.class), ConfigDef.Importance.LOW,
"Specifies a custom record grouper. The default record grouper is defined by "
+ FILE_NAME_TEMPLATE_CONFIG,
groupFile, fileGroupCounter++, // NOPMD UnusedAssignment
ConfigDef.Width.SHORT, CUSTOM_RECORD_GROUPER_BUILDER);

return fileGroupCounter;

}
Comment on lines +73 to +125

Contributor: Perhaps this method could be re-implemented as a class that implements ExtraConfiguration? In which case the method changes I noted above become local to the ExtraConfiguration.configure() method, and the type and defaultCompressionType parameters become part of the constructor call.

Contributor (author): Are you proposing that all of the internal Aiven properties get handled this way? Most of the lines above that you commented on are existing (duplicated) code that I moved here. Only CUSTOM_RECORD_GROUPER_BUILDER is added, but I didn't want to add it in two places, so I moved this common block here. Maybe it should be two PRs, one for the common change and one for this specific addition?

Contributor: I added a comment elsewhere about making this type of change generic. Greg spoke about using the embed() method. I am thinking that it might make sense to define a "Builder" pattern to define additional config options, then modify AivenCommonConfig to accept the Builder and apply it via a call to embed(). @aindriu-aiven What do you think?

Contributor: @mkeskells would you take a look at #330 and see if it has any impact on what you are trying to do here? I want to make sure we are in alignment.


protected static void addCommonConfig(final ConfigDef configDef) {
addKafkaBackoffPolicy(configDef);
addExtensionConfig(configDef);
}

@SuppressWarnings("PMD.this-escape")
private void validate() {
// Special checks for output json envelope config.
final List<OutputField> outputFields = getOutputFields();
@@ -68,8 +139,17 @@ private void validate() {
FORMAT_OUTPUT_ENVELOPE_CONFIG, false, FORMAT_OUTPUT_FIELDS_CONFIG);
throw new ConfigException(msg);
}
if (getCustomRecordGrouperBuilder() == null) {
// if there is a custom record grouper builder, it will validate the filename template
new FilenameTemplateValidator(FILE_NAME_TEMPLATE_CONFIG).ensureValid(FILE_NAME_TEMPLATE_CONFIG,
getString(FILE_NAME_TEMPLATE_CONFIG));
}
validateKeyFilenameTemplate();
}
protected static void addExtensionConfig(final ConfigDef configDef) {
final ServiceLoader<ExtraConfiguration> extraConfigurations = ServiceLoader.load(ExtraConfiguration.class);
extraConfigurations.forEach(extraConfiguration -> extraConfiguration.configure(configDef));
Contributor: Is being present on the classpath a sufficient condition for inclusion? When used in test environments with test ExtraConfigurations, you wouldn't be able to turn them off. If someone packaged multiple plugins together, each with their own specific ExtraConfigurations, how would each connector get the configurations specific to them?

Contributor (author): I would expect that a user would package only the ones that were needed. You could add a parameter to the request that specifies the additional groups, but that would mean those extra configs require the user to request them, so maybe Configurable should specify which additional configuration extras are needed. Not sure if that would work though. WDYT?

}
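To make the loading mechanics concrete: addExtensionConfig discovers implementations through java.util.ServiceLoader, which instantiates every provider registered on the classpath. A minimal standalone sketch of the mechanism (the Plugin interface here is a hypothetical stand-in for ExtraConfiguration, not part of this PR):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class ServiceLoaderSketch {
    // Stand-in for ExtraConfiguration; a real provider is activated by
    // listing its fully qualified class name in
    // META-INF/services/<interface FQCN> inside the provider's jar.
    public interface Plugin {
        String name();
    }

    // Collects whatever providers the classpath happens to contain.
    // Presence on the classpath is the only activation condition, which
    // is exactly the concern raised in the thread above.
    public static List<String> discover() {
        final List<String> names = new ArrayList<>();
        ServiceLoader.load(Plugin.class).forEach(p -> names.add(p.name()));
        return names;
    }
}
```

With no provider-configuration file on the classpath, discover() returns an empty list; packaging a jar that registers an implementation is enough to change the connector's ConfigDef, with no way to opt out at configuration time.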

protected static void addKafkaBackoffPolicy(final ConfigDef configDef) {
configDef.define(KAFKA_RETRY_BACKOFF_MS_CONFIG, ConfigDef.Type.LONG, null, new ConfigDef.Validator() {
@@ -173,13 +253,15 @@ public final Template getFilenameTemplate() {
}

protected final void validateKeyFilenameTemplate() {
// Special checks for {{key}} filename template.
final Template filenameTemplate = getFilenameTemplate();
final String groupType = RecordGrouperFactory.resolveRecordGrouperType(filenameTemplate);
if (isKeyBased(groupType) && getMaxRecordsPerFile() > 1) {
final String msg = String.format("When %s is %s, %s must be either 1 or not set", FILE_NAME_TEMPLATE_CONFIG,
filenameTemplate, FILE_MAX_RECORDS);
throw new ConfigException(msg);
// Special checks for {{key}} filename template, if there isnt a custom record grouper.
if (getCustomRecordGrouperBuilder() == null) {
final Template filenameTemplate = getFilenameTemplate();
final String groupType = RecordGrouperFactory.resolveRecordGrouperType(filenameTemplate);
if (isKeyBased(groupType) && getMaxRecordsPerFile() > 1) {
final String msg = String.format("When %s is %s, %s must be either 1 or not set",
FILE_NAME_TEMPLATE_CONFIG, filenameTemplate, FILE_MAX_RECORDS);
throw new ConfigException(msg);
}
}
}

@@ -229,4 +311,10 @@ private Boolean isKeyBased(final String groupType) {
return RecordGrouperFactory.KEY_RECORD.equals(groupType)
|| RecordGrouperFactory.KEY_TOPIC_PARTITION_RECORD.equals(groupType);
}

public Class<? extends CustomRecordGrouperBuilder> getCustomRecordGrouperBuilder() {
final Class<?> result = getClass(CUSTOM_RECORD_GROUPER_BUILDER);
Contributor: Also consider AbstractConfig#getConfiguredInstance.

// it has already been validated to be a subclass of CustomRecordGrouperBuilder
return result == null ? null : result.asSubclass(CustomRecordGrouperBuilder.class);
}
}
@@ -0,0 +1,21 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.config;

public interface Configurable {
Contributor: At first I thought this was an interface that meant the object would modify the AivenCommonConfig itself. But I see now that this is an interface specifying that the object is configured by AivenCommonConfig. I also see that it is only used by the RecordGrouperBuilder. I think that this interface is extraneous and should be removed. RecordGrouperBuilder can implement the method directly, and it should probably be called setConfiguration.

Contributor (author): It's a separate concern IMO. I think that when additional external plugins (of which I am proposing this as the first, but the timestamp source could also apply, as we discussed on that PR) require additional configuration, there will have to be code that allows that plugin to be configured. We would, I believe, want the consumption of that configuration to be documented in one place only, not across several.

void configure(AivenCommonConfig config);
}
@@ -0,0 +1,23 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.config;

import org.apache.kafka.common.config.ConfigDef;

public interface ExtraConfiguration {
Contributor: I really like this interface and where this design goes. I think that a number of static methods can be converted to implementations of ExtraConfiguration.

Contributor (author): I was expecting this to be a way for external components to declare that they need to participate in the configuration process, not for the code in this repo.

Contributor: The general practice for extensibly adding configurations is via config embedding: https://kafka.apache.org/38/javadoc/org/apache/kafka/common/config/ConfigDef.html#embed(java.lang.String,java.lang.String,int,org.apache.kafka.common.config.ConfigDef)

So rather than having the mutable ConfigDef passed inward and giving the plugin control over the whole namespace, the plugin emits an immutable ConfigDef that is then embedded in an outer namespace under some prefix that the plugin is given exclusive control over.

Contributor (author): TIL, again. Thanks, that makes sense. So maybe the ExtraConfiguration should expose a prefix (or the prefix is the full class name); that would avoid conflicts. I did see the parameter behaviour but didn't see how this was done. So would you propose that the ExtraConfiguration is just one group (I would think so)?

Contributor (@Claudenw, Nov 4, 2024): @gharris1727 @mkeskells I would like to see a class that would add the functionality in blocks. So, for example, ExtraConfiguration becomes an abstract class that has a final method void configure(ConfigDef config) that simply calls config.embed(). The instances of ExtraConfiguration would then contain all the setup for the additional components. For this change the FILE_NAME_TIMESTAMP_TIMEZONE and FILE_NAME_TIMESTAMP_SOURCE are defined within an instance of ExtraConfiguration. Basically this is a builder for embed calls. Perhaps a "Builder" name would be better.

void configure(ConfigDef configDef);
}
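The embedding approach described in the thread above can be illustrated without Kafka classes. This sketch shows only the namespace discipline, using plain maps as stand-ins; the real API is ConfigDef.embed, which does the same prefixing over ConfigKey entries plus group and ordering bookkeeping:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class EmbedSketch {
    // The plugin emits its own self-contained option definitions
    // (option name -> documentation), knowing nothing about the host.
    public static Map<String, String> pluginOptions() {
        final Map<String, String> defs = new LinkedHashMap<>();
        defs.put("class", "The custom record grouper builder class.");
        defs.put("option", "A plugin-specific tuning option.");
        return defs;
    }

    // The host copies them under a prefix it owns, so two plugins with
    // identical option names cannot collide and a plugin cannot touch
    // options outside its prefix.
    public static Map<String, String> embed(final String prefix,
            final Map<String, String> child) {
        final Map<String, String> out = new LinkedHashMap<>();
        child.forEach((k, v) -> out.put(prefix + k, v));
        return out;
    }
}
```

For example, embed("file.record.grouper.", pluginOptions()) yields keys such as file.record.grouper.class, mirroring what embedding under a host-chosen prefix produces.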
@@ -0,0 +1,35 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.config.validators;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

public class ClassValidator implements ConfigDef.Validator {
private final Class<?> baseClass;

public ClassValidator(final Class<?> baseClass) {
this.baseClass = baseClass;
}

@Override
public void ensureValid(final String name, final Object value) {
if (value != null && !baseClass.isAssignableFrom((Class<?>) value)) {
throw new ConfigException(name, value, "must be a subclass of " + baseClass.getName());
}
}
}
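The check at the heart of ClassValidator is Class.isAssignableFrom, which accepts the base class itself and any subtype, while a null value passes because the option is optional. A standalone illustration of those semantics (the GrouperBuilder and CustomBuilder types here are hypothetical stand-ins):

```java
public class AssignabilityDemo {
    public interface GrouperBuilder { }

    public static class CustomBuilder implements GrouperBuilder { }

    // Mirrors the decision in ClassValidator.ensureValid: null passes,
    // the base class or any subtype passes, anything else is rejected
    // (the real validator throws ConfigException instead of returning).
    public static boolean isValid(final Class<?> base, final Object value) {
        return value == null || base.isAssignableFrom((Class<?>) value);
    }
}
```

Note that, like the validator, this cast throws ClassCastException if the configured value is not a Class at all; ConfigDef.Type.CLASS guarantees that before the validator runs.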
@@ -0,0 +1,30 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.config.validators;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

public class NonNegativeValidator implements ConfigDef.Validator {
@Override
public void ensureValid(final String name, final Object value) {
assert value instanceof Integer;
if ((Integer) value < 0) {
throw new ConfigException(name, value, "must be a non-negative integer number");
}
}
}
@@ -0,0 +1,30 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.grouper;

import io.aiven.kafka.connect.common.config.Configurable;
import io.aiven.kafka.connect.common.config.TimestampSource;
import io.aiven.kafka.connect.common.templating.Template;

public interface CustomRecordGrouperBuilder extends Configurable {
void setFilenameTemplate(Template filenameTemplate);
void setMaxRecordsPerFile(Integer maxRecordsPerFile);
void setTimestampSource(TimestampSource timestampSource);
void setSchemaBased(boolean schemaBased);

RecordGrouper build();
}
Expand Up @@ -41,7 +41,7 @@ public interface RecordGrouper {
/**
* Get all records associated with files, grouped by the file name.
*
* @return map of records assotiated with files
* @return map of records associated with files
*/
Map<String, List<SinkRecord>> records();

@@ -16,6 +16,7 @@

package io.aiven.kafka.connect.common.grouper;

import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -143,6 +144,26 @@ public static String resolveRecordGrouperType(final Template template) {
@SuppressWarnings("PMD.CognitiveComplexity")
public static RecordGrouper newRecordGrouper(final AivenCommonConfig config) {
final Template fileNameTemplate = config.getFilenameTemplate();
final boolean isSchemaBased = config.getFormatType() == FormatType.PARQUET
|| config.getFormatType() == FormatType.AVRO;
Comment on lines +147 to +148

Contributor: This looks like a calculation that is needed in multiple places. It should be moved into the AivenCommonConfig class as a method; perhaps isSchemaBased() would be a good name there.

Contributor (author): Seems logical, will do.

if (config.getCustomRecordGrouperBuilder() != null) {
try {
final CustomRecordGrouperBuilder builder = config.getCustomRecordGrouperBuilder()
.getDeclaredConstructor()
.newInstance();

builder.configure(config);
builder.setMaxRecordsPerFile(config.getMaxRecordsPerFile());
builder.setFilenameTemplate(fileNameTemplate);
builder.setSchemaBased(isSchemaBased);
builder.setTimestampSource(config.getFilenameTimestampSource());

Comment on lines +151 to +160

Contributor: All of the options here are derived from the AivenCommonConfig, so there really is only one argument to the builder. In which case you could simply add an AivenCommonConfig to the CustomRecordGrouper implementation constructor.

Contributor (author): I was looking to expose the concerns that are already in the codebase to decide about. I would have thought that the existing record groupers could work in the same way. For what the builder needs, the first call on line 155 is enough, but the other calls expose the decision parameters explicitly, so if a new concern arises a plugin would be forced to consider the implications of it.

return builder.build();
} catch (final NoSuchMethodException | InstantiationException | IllegalAccessException
| InvocationTargetException e) {
throw new IllegalArgumentException("Failed to create custom record grouper", e);
}
}
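The instantiation pattern above, a no-arg getDeclaredConstructor().newInstance() with the four reflective checked exceptions wrapped into one unchecked exception, can be reproduced standalone (the Builder interface here is a stand-in for CustomRecordGrouperBuilder):

```java
import java.lang.reflect.InvocationTargetException;

public class ReflectiveFactory {
    public interface Builder {
        String build();
    }

    // A builder must be a public class with a public no-arg constructor;
    // a nested implementation must also be static, or reflection fails.
    public static class DefaultBuilder implements Builder {
        @Override
        public String build() {
            return "grouper";
        }
    }

    // Same shape as the factory code: reflective no-arg construction,
    // wrapping all four reflection failures in IllegalArgumentException.
    public static Builder newBuilder(final Class<? extends Builder> cls) {
        try {
            return cls.getDeclaredConstructor().newInstance();
        } catch (NoSuchMethodException | InstantiationException
                | IllegalAccessException | InvocationTargetException e) {
            throw new IllegalArgumentException("Failed to create custom record grouper", e);
        }
    }
}
```

One consequence visible in the catch list: a builder class without an accessible no-arg constructor fails at grouper-creation time, not at config-validation time, since ClassValidator only checks the type hierarchy.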
final String grType = resolveRecordGrouperType(fileNameTemplate);
if (KEY_RECORD.equals(grType)) {
return new KeyRecordGrouper(fileNameTemplate);
@@ -151,13 +172,13 @@ public static RecordGrouper newRecordGrouper(final AivenCommonConfig config) {
} else {
final Integer maxRecordsPerFile = config.getMaxRecordsPerFile() == 0 ? null : config.getMaxRecordsPerFile();
if (TOPIC_PARTITION_KEY_RECORD.equals(grType)) {
return config.getFormatType() == FormatType.PARQUET || config.getFormatType() == FormatType.AVRO
return isSchemaBased
? new SchemaBasedTopicPartitionKeyRecordGrouper(fileNameTemplate, maxRecordsPerFile,
config.getFilenameTimestampSource())
: new TopicPartitionKeyRecordGrouper(fileNameTemplate, maxRecordsPerFile,
config.getFilenameTimestampSource());
} else {
return config.getFormatType() == FormatType.PARQUET || config.getFormatType() == FormatType.AVRO
return isSchemaBased
? new SchemaBasedTopicPartitionRecordGrouper(fileNameTemplate, maxRecordsPerFile,
config.getFilenameTimestampSource())
: new TopicPartitionRecordGrouper(fileNameTemplate, maxRecordsPerFile,