Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for custom record grouping #300

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

mkeskells
Copy link
Contributor

Introduce a new property file.record.grouper.builder to specify a builder for a grouper
enable the grouper to define additional properties and associated documentation

Minor refactors of the 'File' common configuration shared between S3 and GCS introduce some more validators

@mkeskells mkeskells requested review from a team as code owners September 27, 2024 14:36
@mkeskells mkeskells marked this pull request as draft September 27, 2024 14:37
@mkeskells
Copy link
Contributor Author

mkeskells commented Sep 27, 2024

Added tests

@mkeskells mkeskells marked this pull request as ready for review September 28, 2024 23:19
@mkeskells
Copy link
Contributor Author

Hi aiven-maintainers

  • Can I get a review of this PR please

Introduce a new property `file.record.grouper.builder` to specify a builder for a
grouper
enable the grouper to define additional properties and associated documentation

Minor refactors of the 'File' common configuration shared between S3 and GCS
introduce some more validators

Add tests for custom record grouper factory
add tests for additional config definition
@mkeskells mkeskells force-pushed the i-297-custom-record-grouping-part2 branch from 97a23bd to 9e1646a Compare October 8, 2024 12:21
@z-kovacs
Copy link

hi @jjaakola-aiven @AnatolyPopov @gharris1727 @eliax1996
apologies - I am not sure what group shall I ping, could you pls take a look at this PR?

your help is much appreciated!
Thanks!

Copy link
Contributor

@Claudenw Claudenw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ExtraConfiguration idea and extensible configuration is a great idea but I think there are some issues that need to be worked out.

  1. How are parameter name collisions resolved? i.e. if two implementations of ExtraConfiguration use the same property name what happens. I would suggest that at a minimum a warning should be placed in the log indicating the collision.
  2. does ExtraConfiguration need to provide any validation functionality?
  3. In addition to the sink side, how does this play on the source side? Are there any issues there?

Comment on lines +73 to +125
protected static int addFileConfigGroup(final ConfigDef configDef, final String groupFile, final String type,
int fileGroupCounter, final CompressionType defaultCompressionType) {
configDef.define(FILE_NAME_TEMPLATE_CONFIG, ConfigDef.Type.STRING, null, null, ConfigDef.Importance.MEDIUM,
"The template for file names on " + type + ". "
+ "Supports `{{ variable }}` placeholders for substituting variables. "
+ "Currently supported variables are `topic`, `partition`, and `start_offset` "
+ "(the offset of the first record in the file). "
+ "Only some combinations of variables are valid, which currently are:\n"
+ "- `topic`, `partition`, `start_offset`."
+ "There is also `key` only variable {{key}} for grouping by keys" + "If a "
+ CUSTOM_RECORD_GROUPER_BUILDER + " is set, the template will be passed"
+ " to that builder and validated according to to its rules which may be more or less constrained.",
groupFile, fileGroupCounter++, // NOPMD UnusedAssignment
ConfigDef.Width.LONG, FILE_NAME_TEMPLATE_CONFIG);

configDef.define(FILE_COMPRESSION_TYPE_CONFIG, ConfigDef.Type.STRING,
defaultCompressionType == null ? null : defaultCompressionType.name, new FileCompressionTypeValidator(),
ConfigDef.Importance.MEDIUM,
"The compression type used for files put on " + type + ". " + "The supported values are: "
+ CompressionType.SUPPORTED_COMPRESSION_TYPES + ".",
groupFile, fileGroupCounter++, // NOPMD UnusedAssignment
ConfigDef.Width.NONE, FILE_COMPRESSION_TYPE_CONFIG,
FixedSetRecommender.ofSupportedValues(CompressionType.names()));

configDef.define(FILE_MAX_RECORDS, ConfigDef.Type.INT, 0, new NonNegativeValidator(),
ConfigDef.Importance.MEDIUM,
"The maximum number of records to put in a single file. " + "Must be a non-negative integer number. "
+ "0 is interpreted as \"unlimited\", which is the default.",
groupFile, fileGroupCounter++, ConfigDef.Width.SHORT, FILE_MAX_RECORDS);

configDef.define(FILE_NAME_TIMESTAMP_TIMEZONE, ConfigDef.Type.STRING, ZoneOffset.UTC.toString(),
new TimeZoneValidator(), ConfigDef.Importance.LOW,
"Specifies the timezone in which the dates and time for the timestamp variable will be treated. "
+ "Use standard shot and long names. Default is UTC",
groupFile, fileGroupCounter++, // NOPMD UnusedAssignment
ConfigDef.Width.SHORT, FILE_NAME_TIMESTAMP_TIMEZONE);

configDef.define(FILE_NAME_TIMESTAMP_SOURCE, ConfigDef.Type.STRING, TimestampSource.Type.WALLCLOCK.name(),
new TimestampSourceValidator(), ConfigDef.Importance.LOW,
"Specifies the the timestamp variable source. Default is wall-clock.", groupFile, fileGroupCounter++, // NOPMD
// UnusedAssignment
ConfigDef.Width.SHORT, FILE_NAME_TIMESTAMP_SOURCE);

configDef.define(CUSTOM_RECORD_GROUPER_BUILDER, ConfigDef.Type.CLASS, null,
new ClassValidator(CustomRecordGrouperBuilder.class), ConfigDef.Importance.LOW,
"Specifies a custom record grouper. The default record grouper is defined by "
+ FILE_NAME_TEMPLATE_CONFIG,
groupFile, fileGroupCounter++, // NOPMD UnusedAssignment
ConfigDef.Width.SHORT, CUSTOM_RECORD_GROUPER_BUILDER);

return fileGroupCounter;

}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps this method could be re-implemented as a class that implements ExtraConfiguration? In which case the method changes I noted above become local to the ExtraConfiguration.configure() method and the type and defaultCompressionType parameters become part of the constructor call?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you proposing that all of the internal Aiven properties get handled this way?

Most of the lines above that you commented on are existed (duplicated) code that I moved here. Only the CUSTOM_RECORD_GROUPER_BUILDER is added, but I didnt want to add it in 2 places, so I moved this common block here. Maybe it shoul dbe 2 PR, one for the common chnage and one for this specific add?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a comment elsewhere about making this type of change generic. Greg spoke about using the embed() method. I am thinking that it might make sense to define a "Builder" pattern to define additional config options. Then modify AivenCommonConfig to have accept the Builder and apply it via a call to embed()

@aindriu-aiven What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mkeskells would you take a look at #330 and see if it has any impact on what you are trying to do here? I want to make sure we are in alignment.


package io.aiven.kafka.connect.common.config;

public interface Configurable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At first I thought this was an interface that meant the object would modify the AivenCommonConfig itself. But I see now that this an interface that specifies the object is configured by AivenCommonsConfig. I also see that it is only used by the RecordGrouperBuilder.

I think that this interface is extraneous and should be removed. RecordGrouperBuilder can implement the method directly and there is should probably be called setConfiguration

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its a seperate concern IMO
I think that when additional external plugin (or which I am proposing this as the first, but also the timestamp source could appl as we discussed on that PR) require additional configuration there will have to be code that allow that plugin to be configured. We would I believe want to have the consumption of that confugyration being documented in one place only, not across several


import org.apache.kafka.common.config.ConfigDef;

public interface ExtraConfiguration {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like this interface and where this design goes. I think that a number of static methods can be converted to implementations of ExtraConfiguration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was expecting this to be for a was for external components to declare that they need to participate in the configuration process, not for the code in this repo

Comment on lines +147 to +148
final boolean isSchemaBased = config.getFormatType() == FormatType.PARQUET
|| config.getFormatType() == FormatType.AVRO;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a calculation that is needed in multiple places. It should be moved into the AivenCommonsConfig class as a method. perhaps isSchemaBased() would be a good name there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems logical, will do

Comment on lines +151 to +160
final CustomRecordGrouperBuilder builder = config.getCustomRecordGrouperBuilder()
.getDeclaredConstructor()
.newInstance();

builder.configure(config);
builder.setMaxRecordsPerFile(config.getMaxRecordsPerFile());
builder.setFilenameTemplate(fileNameTemplate);
builder.setSchemaBased(isSchemaBased);
builder.setTimestampSource(config.getFilenameTimestampSource());

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of the options here are derived from the AvienCommonConfig. So there really is only 1 argument to the builder. In which case you could simply add an AivenCommonsConfig to the CustomRecordGrouper implementation constructor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was looking to expose the concerns that are already in the codebase to decide about. I would have though that the existing record groupers could work in the same way. For what the builder needs, the first acall on 155 is enough, but the other call expose the decision parameters explicitely, so if a new concern arrises a plugin would be forced to consider the impleications of it

Comment on lines +73 to +74
protected static int addFileConfigGroup(final ConfigDef configDef, final String groupFile, final String type,
int fileGroupCounter, final CompressionType defaultCompressionType) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method can be simplified. The method is only called with the groupFile = "File" and with the fileGroupCounter set to 0. Both can be removed from the call and simply specified within the method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the group is defined in the child classes, and ifor GCS is is the second parameter.

Copy link
Contributor

@gharris1727 gharris1727 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey thanks for the PR!

I had some comments about typical practices for using ConfigDefs, but I have no opinion on adding this extension/plugin interface, and i'll let others comment on that.


import org.apache.kafka.common.config.ConfigDef;

public interface ExtraConfiguration {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The general practice for extensibly adding configurations is via config embedding: https://kafka.apache.org/38/javadoc/org/apache/kafka/common/config/ConfigDef.html#embed(java.lang.String,java.lang.String,int,org.apache.kafka.common.config.ConfigDef)

So rather than having the mutable ConfigDef passed inward and giving the plugin control over the whole namespace, the plugin emits an immutable ConfigDef that then is embedded in an outer namespace under some prefix that the plugin is given exclusive control over.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL, again - thanks
that makes sence
So maybe the Extraconfiguratoin should expose a prefix (or the prefix is the full class name)
that would avoid conflicts. I di see the parameter behaviour but didnt see how this was done

SO would you propose that the ExtraConfiguration is just one group (I would thing so)?

Copy link
Contributor

@Claudenw Claudenw Nov 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gharris1727 @mkeskells I would like to see a class that would add the functionality in blocks. So, for example, ExtraConfiguration becomes an abstract class that has a final method void configure(ConfigDev config) that simply calls the config.embed(). The instances of ExtraConfiguration would then contain all the setup for the additional components. For this change the FILE_NAME_TIMESTAMP_TIMEZONE and FILE_NAME_TIMESTAMP_SOURCE are defined within an instance of ExtraConfiguration.

Basically this is a builder for embed calls. Perhaps a "Builder" name would be better.

@@ -229,4 +311,10 @@ private Boolean isKeyBased(final String groupType) {
return RecordGrouperFactory.KEY_RECORD.equals(groupType)
|| RecordGrouperFactory.KEY_TOPIC_PARTITION_RECORD.equals(groupType);
}

public Class<? extends CustomRecordGrouperBuilder> getCustomRecordGrouperBuilder() {
final Class<?> result = getClass(CUSTOM_RECORD_GROUPER_BUILDER);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also consider AbstractConfig#getConfiguredInstance

validateKeyFilenameTemplate();
}
protected static void addExtensionConfig(final ConfigDef configDef) {
final ServiceLoader<ExtraConfiguration> extraConfigurations = ServiceLoader.load(ExtraConfiguration.class);
extraConfigurations.forEach(extraConfiguration -> extraConfiguration.configure(configDef));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is being present on the classpath a sufficient condition for inclusion?
When used in test environments with test ExtraConfigurations, you wouldn't be able to turn them off.
If someone packaged multiple plugins together, each with their own specific ExtraConfigurations, how would each connector get the configurations specific to them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect that a suer would package only the ones that were needed

You could add a a parameter to the request that specifies the additional groups, but that would mean that those extra config require the user to request them, so maybe Configurable would specify which additional configuration extras eare needed

Not sure if that would work though. WDYT

@@ -64,10 +63,11 @@ void incorrectFilenameTemplates(final String template) {
.filter(x -> GcsSinkConfig.FILE_NAME_TEMPLATE_CONFIG.equals(x.name()))
.findFirst()
.get();
assertFalse(configValue.errorMessages().isEmpty());
assertTrue(configValue.errorMessages().isEmpty());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤨

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants