Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#306 - [DRAFT] Introduce Injection of PerTableConfig params into TargetClients by the Factory #307

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions api/src/main/java/io/onetable/spi/sync/TargetClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import org.apache.hadoop.conf.Configuration;

import io.onetable.client.PerTableConfig;
import io.onetable.model.OneTable;
import io.onetable.model.OneTableMetadata;
import io.onetable.model.schema.OnePartitionField;
Expand Down Expand Up @@ -89,5 +88,5 @@ public interface TargetClient {
String getTableFormat();

/** Initializes the client with provided configuration */
void init(PerTableConfig perTableConfig, Configuration configuration);
void init(Configuration configuration);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,34 @@

package io.onetable.client;

import java.beans.Expression;
import java.beans.Statement;
import java.lang.reflect.Method;
import java.util.ServiceLoader;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;

import org.apache.hadoop.conf.Configuration;

import io.onetable.exception.NotSupportedException;
import io.onetable.spi.sync.TargetClient;

@Log4j2
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class TableFormatClientFactory {
private static final TableFormatClientFactory INSTANCE = new TableFormatClientFactory();
public static final String IO_ONETABLE_CLIENT_PER_TABLE_CONFIG =
"io.onetable.client.PerTableConfig";

/**
* Returns the single shared factory instance held in the static {@code INSTANCE} field.
*
* @return the shared {@code TableFormatClientFactory}
*/
public static TableFormatClientFactory getInstance() {
return INSTANCE;
}

/**
* Create a fully initialized instance of the TargetClient represented by the given Table Format
* name. Initialization is done with the config provideed through PerTableConfig and Conifuration
* name. Initialization is done with the config provided through PerTableConfig and Configuration
* params.
*
* @param tableFormat
Expand All @@ -50,10 +57,68 @@ public TargetClient createForFormat(
String tableFormat, PerTableConfig perTableConfig, Configuration configuration) {
TargetClient targetClient = createTargetClientForName(tableFormat);

targetClient.init(perTableConfig, configuration);
injectPerTableConfigAsNeeded(perTableConfig, targetClient);

targetClient.init(configuration);
return targetClient;
}

/**
 * Copies values from the given {@code PerTableConfig} onto the target client by reflection.
 *
 * <p>Every public, zero-argument {@code getXxx} method declared on the PerTableConfig type is
 * paired with a {@code setXxx} method of the same suffix on the concrete client class. When the
 * client exposes such a setter it is invoked with the value read from {@code perTableConfig};
 * when it does not, the property is silently skipped, because each client only implements
 * setters for the parameters it needs.
 *
 * @param perTableConfig source of the values to inject
 * @param targetClient client instance that receives the values through its setters
 * @throws RuntimeException if the PerTableConfig type cannot be inspected, or if reading a value
 *     or invoking an existing setter fails
 */
private void injectPerTableConfigAsNeeded(
    PerTableConfig perTableConfig, TargetClient targetClient) {
  Method[] methods;
  // Collect the getters of the PerTableConfig interface; they determine which setters to
  // look for on each targetClient.
  try {
    Class<?> cls = Class.forName(IO_ONETABLE_CLIENT_PER_TABLE_CONFIG);
    methods = cls.getMethods();
  } catch (Throwable e) {
    log.error(
        "Unable to determine the methods of the PerTableConfig interface for injection: "
            + e.getMessage());
    throw new RuntimeException(e);
  }

  for (Method method : methods) {
    String getterName = method.getName();
    // Only real getters are candidates: the name must start with "get" and take no arguments.
    // A plain replaceFirst("get", "set") would also rewrite a "get" found in the middle of a
    // name and would needlessly invoke non-getter methods on the config.
    if (!getterName.startsWith("get") || method.getParameterCount() != 0) {
      continue;
    }
    String targetMethodName = "set" + getterName.substring("get".length());
    try {
      // java.beans.Statement resolves the setter against the runtime class of the target, so
      // setters declared only on the implementation (not on TargetClient) are still found.
      Statement stmt =
          new Statement(
              targetClient,
              targetMethodName,
              new Object[] {getConfigValue(perTableConfig, method)});
      stmt.execute();
    } catch (NoSuchMethodException ignored) {
      // NOP - each client only implements setters for the params it needs
    } catch (Exception e) {
      // The exception message is passed as a format argument, never as part of the pattern,
      // so a '%' inside it cannot make String.format itself throw.
      log.error(
          String.format(
              "Unable to execute injection on class %s due to:%s", targetClient, e.getMessage()));
      throw new RuntimeException(e);
    }
  }
}

/**
 * Reads one value from the given {@code PerTableConfig} by invoking the named getter with no
 * arguments through {@link java.beans.Expression}.
 *
 * @param perTableConfig object the getter is invoked on
 * @param method getter whose name identifies the method to call
 * @return the value returned by the getter (may be {@code null})
 * @throws Exception whatever the lookup or the invoked getter throws; the failure is logged
 *     before being rethrown unchanged
 */
private Object getConfigValue(PerTableConfig perTableConfig, Method method) throws Exception {
  try {
    Expression expression = new Expression(perTableConfig, method.getName(), new Object[0]);
    expression.execute();
    return expression.getValue();
  } catch (Throwable e) {
    log.error(
        "Unable to get the injectable value from PerTableConfig for injection: "
            + e.getMessage());
    throw e;
  }
}

/**
* Create an instance of the TargetClient via the default no-arg constructor. Expectation is that
* target client specific settings may be provided prior to the calling of TargetClient.init()
Expand Down
54 changes: 36 additions & 18 deletions core/src/main/java/io/onetable/delta/DeltaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ public class DeltaClient implements TargetClient {
private DeltaSchemaExtractor schemaExtractor;
private DeltaPartitionExtractor partitionExtractor;
private DeltaDataFileUpdatesExtractor dataFileUpdatesExtractor;

private String tableName;
private String tableDataPath;
private int logRetentionInHours;
private TransactionState transactionState;

Expand All @@ -100,21 +100,13 @@ public DeltaClient(PerTableConfig perTableConfig, SparkSession sparkSession) {
DeltaSchemaExtractor schemaExtractor,
DeltaPartitionExtractor partitionExtractor,
DeltaDataFileUpdatesExtractor dataFileUpdatesExtractor) {

_init(
tableDataPath,
tableName,
logRetentionInHours,
sparkSession,
schemaExtractor,
partitionExtractor,
dataFileUpdatesExtractor);
this.tableDataPath = tableDataPath;
this.tableName = tableName;
this.logRetentionInHours = logRetentionInHours;
_init(sparkSession, schemaExtractor, partitionExtractor, dataFileUpdatesExtractor);
}

private void _init(
String tableDataPath,
String tableName,
int logRetentionInHours,
SparkSession sparkSession,
DeltaSchemaExtractor schemaExtractor,
DeltaPartitionExtractor partitionExtractor,
Expand All @@ -129,23 +121,49 @@ private void _init(
this.dataFileUpdatesExtractor = dataFileUpdatesExtractor;
this.deltaLog = deltaLog;
this.tableName = tableName;
this.logRetentionInHours = logRetentionInHours;
}

@Override
public void init(PerTableConfig perTableConfig, Configuration configuration) {
public void init(Configuration configuration) {
SparkSession sparkSession = DeltaClientUtils.buildSparkSession(configuration);

_init(
perTableConfig.getTableDataPath(),
perTableConfig.getTableName(),
perTableConfig.getTargetMetadataRetentionInHours(),
sparkSession,
DeltaSchemaExtractor.getInstance(),
DeltaPartitionExtractor.getInstance(),
DeltaDataFileUpdatesExtractor.builder().build());
}

/**
* Stores the table name on this client. Intended for injection by TableFormatClientFactory,
* which derives this setter from the {@code getTableName} getter of PerTableConfig and calls it
* before {@code init(Configuration)}.
*
* @param tableName table name to store in the {@code tableName} field
*/
public void setTableName(String tableName) {
this.tableName = tableName;
}

/**
* Stores the table data path on this client. Intended for injection by
* TableFormatClientFactory, which derives this setter from the {@code getTableDataPath} getter
* of PerTableConfig and calls it before {@code init(Configuration)}.
*
* @param tableDataPath data path to store in the {@code tableDataPath} field
*/
public void setTableDataPath(String tableDataPath) {
this.tableDataPath = tableDataPath;
}

/**
* Stores the metadata retention period on this client. Intended for injection by
* TableFormatClientFactory, which derives this setter from the {@code
* getTargetMetadataRetentionInHours} getter of PerTableConfig and calls it before {@code
* init(Configuration)}.
*
* @param logRetentionInHours retention in hours, stored in the {@code logRetentionInHours} field
*/
public void setTargetMetadataRetentionInHours(int logRetentionInHours) {
this.logRetentionInHours = logRetentionInHours;
}

@Override
public void beginSync(OneTable table) {
this.transactionState =
Expand Down
29 changes: 24 additions & 5 deletions core/src/main/java/io/onetable/hudi/HudiTargetClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,19 +165,38 @@ private void _init(
}

@Override
public void init(PerTableConfig perTableConfig, Configuration configuration) {
public void init(Configuration configuration) {
_init(
perTableConfig.getTableDataPath(),
perTableConfig.getTargetMetadataRetentionInHours(),
tableDataPath,
timelineRetentionInHours,
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.defaultValue(),
BaseFileUpdatesExtractor.of(
new HoodieJavaEngineContext(configuration),
new CachingPath(perTableConfig.getTableDataPath())),
new HoodieJavaEngineContext(configuration), new CachingPath(tableDataPath)),
AvroSchemaConverter.getInstance(),
HudiTableManager.of(configuration),
CommitState::new);
}

/**
* Stores the metadata retention period on this client. Intended for injection by
* TableFormatClientFactory, which derives this setter from the {@code
* getTargetMetadataRetentionInHours} getter of PerTableConfig and calls it before {@code
* init(Configuration)}; {@code init} then forwards the stored value to {@code _init}.
*
* @param timelineRetentionInHours retention in hours, stored in the {@code
*     timelineRetentionInHours} field
*/
public void setTargetMetadataRetentionInHours(int timelineRetentionInHours) {
this.timelineRetentionInHours = timelineRetentionInHours;
}

/**
* Stores the table data path on this client. Intended for injection by
* TableFormatClientFactory, which derives this setter from the {@code getTableDataPath} getter
* of PerTableConfig and calls it before {@code init(Configuration)}; {@code init} reads the
* stored value when it builds the {@code CachingPath} and calls {@code _init}.
*
* @param tableDataPath data path to store in the {@code tableDataPath} field
*/
public void setTableDataPath(String tableDataPath) {
this.tableDataPath = tableDataPath;
}

@FunctionalInterface
interface CommitStateCreator {
CommitState create(
Expand Down
77 changes: 64 additions & 13 deletions core/src/main/java/io/onetable/iceberg/IcebergClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public class IcebergClient implements TargetClient {
private IcebergPartitionSpecSync partitionSpecSync;
private IcebergDataFileUpdatesSync dataFileUpdatesExtractor;
private IcebergTableManager tableManager;
private String tableName;
private String[] namespace;
private String basePath;
private TableIdentifier tableIdentifier;
private IcebergCatalogConfig catalogConfig;
Expand All @@ -82,8 +84,18 @@ public IcebergClient() {}
IcebergPartitionSpecSync partitionSpecSync,
IcebergDataFileUpdatesSync dataFileUpdatesExtractor,
IcebergTableManager tableManager) {
this.tableName = perTableConfig.getTableName();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why move this out of the _init method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@the-other-tim-brown - not sure I am answering the right question but... because PerTableConfig was removed from the init() which calls _init() and the ctor's also call _init so they should handle the PerTableConfig specifics. This is assuming that the ctor's are mostly (if not entirely) used by tests and they would be setting these things there rather than via the factory.

Maybe you see something wrong with what I did there that I am still missing?

this.basePath = perTableConfig.getTableBasePath();
this.configuration = configuration;
this.snapshotRetentionInHours = perTableConfig.getTargetMetadataRetentionInHours();
namespace = perTableConfig.getNamespace();
this.tableIdentifier =
namespace == null
? TableIdentifier.of(tableName)
: TableIdentifier.of(Namespace.of(namespace), tableName);
this.tableManager = tableManager;
this.catalogConfig = (IcebergCatalogConfig) perTableConfig.getIcebergCatalogConfig();
_init(
perTableConfig,
configuration,
schemaExtractor,
schemaSync,
Expand All @@ -94,7 +106,6 @@ public IcebergClient() {}
}

private void _init(
PerTableConfig perTableConfig,
Configuration configuration,
IcebergSchemaExtractor schemaExtractor,
IcebergSchemaSync schemaSync,
Expand All @@ -107,17 +118,8 @@ private void _init(
this.partitionSpecExtractor = partitionSpecExtractor;
this.partitionSpecSync = partitionSpecSync;
this.dataFileUpdatesExtractor = dataFileUpdatesExtractor;
String tableName = perTableConfig.getTableName();
this.basePath = perTableConfig.getTableBasePath();
this.configuration = configuration;
this.snapshotRetentionInHours = perTableConfig.getTargetMetadataRetentionInHours();
String[] namespace = perTableConfig.getNamespace();
this.tableIdentifier =
namespace == null
? TableIdentifier.of(tableName)
: TableIdentifier.of(Namespace.of(namespace), tableName);
this.tableManager = tableManager;
this.catalogConfig = (IcebergCatalogConfig) perTableConfig.getIcebergCatalogConfig();

if (tableManager.tableExists(catalogConfig, tableIdentifier, basePath)) {
// Load the table state if it already exists
Expand All @@ -128,9 +130,8 @@ private void _init(
}

@Override
public void init(PerTableConfig perTableConfig, Configuration configuration) {
public void init(Configuration configuration) {
_init(
perTableConfig,
configuration,
IcebergSchemaExtractor.getInstance(),
IcebergSchemaSync.getInstance(),
Expand All @@ -142,6 +143,56 @@ public void init(PerTableConfig perTableConfig, Configuration configuration) {
IcebergTableManager.of(configuration));
}

/**
* Stores the table name on this client. Intended for injection by TableFormatClientFactory,
* which derives this setter from the {@code getTableName} getter of PerTableConfig and calls it
* before {@code init(Configuration)}.
*
* <p>NOTE(review): in the visible part of this change, {@code tableIdentifier} is built from
* the table name only in the constructor - confirm that the {@code init} path also derives it
* from the injected value.
*
* @param tableName table name to store in the {@code tableName} field
*/
public void setTableName(String tableName) {
this.tableName = tableName;
}

/**
* Stores the namespace on this client. Intended for injection by TableFormatClientFactory,
* which derives this setter from the {@code getNamespace} getter of PerTableConfig and calls it
* before {@code init(Configuration)}.
*
* <p>The array reference is stored as-is (no copy is made). The constructor treats a {@code
* null} namespace as "no namespace" when building the table identifier.
*
* @param namespace namespace levels to store in the {@code namespace} field; may be {@code null}
*/
public void setNamespace(String[] namespace) {
this.namespace = namespace;
}

/**
* Stores the table base path on this client. Intended for injection by
* TableFormatClientFactory, which derives this setter from the {@code getTableBasePath} getter
* of PerTableConfig and calls it before {@code init(Configuration)}.
*
* @param basePath base path to store in the {@code basePath} field
*/
public void setTableBasePath(String basePath) {
this.basePath = basePath;
}

/**
* Stores the Iceberg catalog configuration on this client. Intended for injection by
* TableFormatClientFactory, which derives this setter from the {@code getIcebergCatalogConfig}
* getter of PerTableConfig and calls it before {@code init(Configuration)}.
*
* <p>NOTE(review): the constructor casts the getter's result to {@code IcebergCatalogConfig},
* so the getter's declared type is presumably broader - confirm the factory's reflective lookup
* resolves this setter for the runtime value it passes.
*
* @param catalogConfig catalog configuration to store in the {@code catalogConfig} field
*/
public void setIcebergCatalogConfig(IcebergCatalogConfig catalogConfig) {
this.catalogConfig = catalogConfig;
}

/**
* Stores the metadata retention period on this client. Intended for injection by
* TableFormatClientFactory, which derives this setter from the {@code
* getTargetMetadataRetentionInHours} getter of PerTableConfig and calls it before {@code
* init(Configuration)}.
*
* @param snapshotRetentionInHours retention in hours, stored in the {@code
*     snapshotRetentionInHours} field
*/
public void setTargetMetadataRetentionInHours(int snapshotRetentionInHours) {
this.snapshotRetentionInHours = snapshotRetentionInHours;
}

@Override
public void beginSync(OneTable oneTable) {
initializeTableIfRequired(oneTable);
Expand Down
Loading