diff --git a/api/src/main/java/io/onetable/spi/sync/TargetClient.java b/api/src/main/java/io/onetable/spi/sync/TargetClient.java index bd9b4962b..ceec93ae9 100644 --- a/api/src/main/java/io/onetable/spi/sync/TargetClient.java +++ b/api/src/main/java/io/onetable/spi/sync/TargetClient.java @@ -23,7 +23,6 @@ import org.apache.hadoop.conf.Configuration; -import io.onetable.client.PerTableConfig; import io.onetable.model.OneTable; import io.onetable.model.OneTableMetadata; import io.onetable.model.schema.OnePartitionField; @@ -89,5 +88,5 @@ public interface TargetClient { String getTableFormat(); /** Initializes the client with provided configuration */ - void init(PerTableConfig perTableConfig, Configuration configuration); + void init(Configuration configuration); } diff --git a/api/src/main/java/io/onetable/client/CatalogConfig.java b/core/src/main/java/io/onetable/client/CatalogConfig.java similarity index 100% rename from api/src/main/java/io/onetable/client/CatalogConfig.java rename to core/src/main/java/io/onetable/client/CatalogConfig.java diff --git a/api/src/main/java/io/onetable/client/HudiSourceConfig.java b/core/src/main/java/io/onetable/client/HudiSourceConfig.java similarity index 100% rename from api/src/main/java/io/onetable/client/HudiSourceConfig.java rename to core/src/main/java/io/onetable/client/HudiSourceConfig.java diff --git a/api/src/main/java/io/onetable/client/PerTableConfig.java b/core/src/main/java/io/onetable/client/PerTableConfig.java similarity index 100% rename from api/src/main/java/io/onetable/client/PerTableConfig.java rename to core/src/main/java/io/onetable/client/PerTableConfig.java diff --git a/core/src/main/java/io/onetable/client/TableFormatClientFactory.java b/core/src/main/java/io/onetable/client/TableFormatClientFactory.java index 9d085d526..4554aff53 100644 --- a/core/src/main/java/io/onetable/client/TableFormatClientFactory.java +++ b/core/src/main/java/io/onetable/client/TableFormatClientFactory.java @@ -18,19 +18,26 @@ 
package io.onetable.client; +import java.beans.Expression; +import java.beans.Statement; +import java.lang.reflect.Method; import java.util.ServiceLoader; import lombok.AccessLevel; import lombok.NoArgsConstructor; +import lombok.extern.log4j.Log4j2; import org.apache.hadoop.conf.Configuration; import io.onetable.exception.NotSupportedException; import io.onetable.spi.sync.TargetClient; +@Log4j2 @NoArgsConstructor(access = AccessLevel.PRIVATE) public class TableFormatClientFactory { private static final TableFormatClientFactory INSTANCE = new TableFormatClientFactory(); + public static final String IO_ONETABLE_CLIENT_PER_TABLE_CONFIG = + "io.onetable.client.PerTableConfig"; public static TableFormatClientFactory getInstance() { return INSTANCE; @@ -38,7 +45,7 @@ public static TableFormatClientFactory getInstance() { /** * Create a fully initialized instance of the TargetClient represented by the given Table Format - * name. Initialization is done with the config provideed through PerTableConfig and Conifuration + * name. Initialization is done with the config provided through PerTableConfig and Configuration * params. 
* * @param tableFormat @@ -50,10 +57,68 @@ public TargetClient createForFormat( String tableFormat, PerTableConfig perTableConfig, Configuration configuration) { TargetClient targetClient = createTargetClientForName(tableFormat); - targetClient.init(perTableConfig, configuration); + injectPerTableConfigAsNeeded(perTableConfig, targetClient); + + targetClient.init(configuration); return targetClient; } + private void injectPerTableConfigAsNeeded( + PerTableConfig perTableConfig, TargetClient targetClient) { + Method[] methods = null; + // let's get all the getters from the perTableConfig interface + // to determine what getters to look for on each targetClient + try { + Class cls = Class.forName(IO_ONETABLE_CLIENT_PER_TABLE_CONFIG); + methods = cls.getMethods(); + } catch (Throwable e) { + log.error( + "Unable to determine the methods of the PerTableConfig interface for injection: " + + e.getMessage()); + throw new RuntimeException(e); + } + + for (Method method : methods) { + // translate the getters into setters + String targetMethodName = method.getName().replaceFirst("get", "set"); + try { + // invoke the setter after a cast to the actual implementation class + // otherwise the setters won't be found on the TargetClient reference. + // For now, just catch the NoSuchMethodException rather than use reflection + // to interrogate each implementation first. 
+ Statement stmt = + new Statement( + targetClient.getClass().cast(targetClient), + targetMethodName, + new Object[] {getConfigValue(perTableConfig, method)}); + stmt.execute(); + } catch (NoSuchMethodException nsme) { + // NOP - each client only implements setters for the params it needs + } catch (Exception e) { + log.error( + String.format( + "Unable to execute injection on class %s due to:" + e.getMessage(), targetClient)); + throw new RuntimeException(e); + } + } + } + + private Object getConfigValue(PerTableConfig perTableConfig, Method method) throws Exception { + try { + Class cls = perTableConfig.getClass(); + + String targetMethodName = method.getName().replaceFirst("get", "set"); + Expression expression = new Expression(perTableConfig, method.getName(), new Object[0]); + expression.execute(); + return expression.getValue(); + } catch (Throwable e) { + log.error( + "Unable to get the injectable value from PerTableConfig for injection: " + + e.getMessage()); + throw e; + } + } + /** * Create an instance of the TargetClient via the default no-arg constructor. 
Expectation is that * target client specific settings may be provided prior to the calling of TargetClient.init() diff --git a/core/src/main/java/io/onetable/delta/DeltaClient.java b/core/src/main/java/io/onetable/delta/DeltaClient.java index 8b2ffdbbe..bdc8f78e1 100644 --- a/core/src/main/java/io/onetable/delta/DeltaClient.java +++ b/core/src/main/java/io/onetable/delta/DeltaClient.java @@ -73,8 +73,8 @@ public class DeltaClient implements TargetClient { private DeltaSchemaExtractor schemaExtractor; private DeltaPartitionExtractor partitionExtractor; private DeltaDataFileUpdatesExtractor dataFileUpdatesExtractor; - private String tableName; + private String tableDataPath; private int logRetentionInHours; private TransactionState transactionState; @@ -100,21 +100,13 @@ public DeltaClient(PerTableConfig perTableConfig, SparkSession sparkSession) { DeltaSchemaExtractor schemaExtractor, DeltaPartitionExtractor partitionExtractor, DeltaDataFileUpdatesExtractor dataFileUpdatesExtractor) { - - _init( - tableDataPath, - tableName, - logRetentionInHours, - sparkSession, - schemaExtractor, - partitionExtractor, - dataFileUpdatesExtractor); + this.tableDataPath = tableDataPath; + this.tableName = tableName; + this.logRetentionInHours = logRetentionInHours; + _init(sparkSession, schemaExtractor, partitionExtractor, dataFileUpdatesExtractor); } private void _init( - String tableDataPath, - String tableName, - int logRetentionInHours, SparkSession sparkSession, DeltaSchemaExtractor schemaExtractor, DeltaPartitionExtractor partitionExtractor, @@ -129,23 +121,49 @@ private void _init( this.dataFileUpdatesExtractor = dataFileUpdatesExtractor; this.deltaLog = deltaLog; this.tableName = tableName; - this.logRetentionInHours = logRetentionInHours; } @Override - public void init(PerTableConfig perTableConfig, Configuration configuration) { + public void init(Configuration configuration) { SparkSession sparkSession = DeltaClientUtils.buildSparkSession(configuration); _init( - 
perTableConfig.getTableDataPath(), - perTableConfig.getTableName(), - perTableConfig.getTargetMetadataRetentionInHours(), sparkSession, DeltaSchemaExtractor.getInstance(), DeltaPartitionExtractor.getInstance(), DeltaDataFileUpdatesExtractor.builder().build()); } + /** + * For injection purposes from TableFormatClientFactory. To be set prior to calling the init + * method. + * + * @param tableName + */ + public void setTableName(String tableName) { + this.tableName = tableName; + } + + /** + * For injection purposes from TableFormatClientFactory. To be set prior to calling the init + * method. + * + * @param tableDataPath + */ + public void setTableDataPath(String tableDataPath) { + this.tableDataPath = tableDataPath; + } + + /** + * For injection purposes from TableFormatClientFactory. To be set prior to calling the init + * method. + * + * @param logRetentionInHours + */ + public void setTargetMetadataRetentionInHours(int logRetentionInHours) { + this.logRetentionInHours = logRetentionInHours; + } + @Override public void beginSync(OneTable table) { this.transactionState = diff --git a/core/src/main/java/io/onetable/hudi/HudiTargetClient.java b/core/src/main/java/io/onetable/hudi/HudiTargetClient.java index 3c34b91a0..c6f12ddb0 100644 --- a/core/src/main/java/io/onetable/hudi/HudiTargetClient.java +++ b/core/src/main/java/io/onetable/hudi/HudiTargetClient.java @@ -165,19 +165,38 @@ private void _init( } @Override - public void init(PerTableConfig perTableConfig, Configuration configuration) { + public void init(Configuration configuration) { _init( - perTableConfig.getTableDataPath(), - perTableConfig.getTargetMetadataRetentionInHours(), + tableDataPath, + timelineRetentionInHours, HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.defaultValue(), BaseFileUpdatesExtractor.of( - new HoodieJavaEngineContext(configuration), - new CachingPath(perTableConfig.getTableDataPath())), + new HoodieJavaEngineContext(configuration), new CachingPath(tableDataPath)), 
AvroSchemaConverter.getInstance(), HudiTableManager.of(configuration), CommitState::new); } + /** + * For injection purposes from TableFormatClientFactory. To be set prior to calling the init + * method. + * + * @param timelineRetentionInHours + */ + public void setTargetMetadataRetentionInHours(int timelineRetentionInHours) { + this.timelineRetentionInHours = timelineRetentionInHours; + } + + /** + * For injection purposes from TableFormatClientFactory. To be set prior to calling the init + * method. + * + * @param tableDataPath + */ + public void setTableDataPath(String tableDataPath) { + this.tableDataPath = tableDataPath; + } + @FunctionalInterface interface CommitStateCreator { CommitState create( diff --git a/core/src/main/java/io/onetable/iceberg/IcebergClient.java b/core/src/main/java/io/onetable/iceberg/IcebergClient.java index 36f48a07d..01f8ce5ac 100644 --- a/core/src/main/java/io/onetable/iceberg/IcebergClient.java +++ b/core/src/main/java/io/onetable/iceberg/IcebergClient.java @@ -62,6 +62,8 @@ public class IcebergClient implements TargetClient { private IcebergPartitionSpecSync partitionSpecSync; private IcebergDataFileUpdatesSync dataFileUpdatesExtractor; private IcebergTableManager tableManager; + private String tableName; + private String[] namespace; private String basePath; private TableIdentifier tableIdentifier; private IcebergCatalogConfig catalogConfig; @@ -82,8 +84,18 @@ public IcebergClient() {} IcebergPartitionSpecSync partitionSpecSync, IcebergDataFileUpdatesSync dataFileUpdatesExtractor, IcebergTableManager tableManager) { + this.tableName = perTableConfig.getTableName(); + this.basePath = perTableConfig.getTableBasePath(); + this.configuration = configuration; + this.snapshotRetentionInHours = perTableConfig.getTargetMetadataRetentionInHours(); + namespace = perTableConfig.getNamespace(); + this.tableIdentifier = + namespace == null + ? 
TableIdentifier.of(tableName)
+            : TableIdentifier.of(Namespace.of(namespace), tableName);
+    this.tableManager = tableManager;
+    this.catalogConfig = (IcebergCatalogConfig) perTableConfig.getIcebergCatalogConfig();
     _init(
-        perTableConfig,
         configuration,
         schemaExtractor,
         schemaSync,
@@ -94,7 +106,6 @@ public IcebergClient() {}
   }
 
   private void _init(
-      PerTableConfig perTableConfig,
       Configuration configuration,
       IcebergSchemaExtractor schemaExtractor,
       IcebergSchemaSync schemaSync,
@@ -107,17 +118,13 @@ private void _init(
     this.partitionSpecExtractor = partitionSpecExtractor;
     this.partitionSpecSync = partitionSpecSync;
     this.dataFileUpdatesExtractor = dataFileUpdatesExtractor;
-    String tableName = perTableConfig.getTableName();
-    this.basePath = perTableConfig.getTableBasePath();
     this.configuration = configuration;
-    this.snapshotRetentionInHours = perTableConfig.getTargetMetadataRetentionInHours();
-    String[] namespace = perTableConfig.getNamespace();
+    // built here from the tableName/namespace fields so the setter-injected init() path gets it too
     this.tableIdentifier =
         namespace == null
             ? TableIdentifier.of(tableName)
             : TableIdentifier.of(Namespace.of(namespace), tableName);
     this.tableManager = tableManager;
-    this.catalogConfig = (IcebergCatalogConfig) perTableConfig.getIcebergCatalogConfig();
 
     if (tableManager.tableExists(catalogConfig, tableIdentifier, basePath)) {
       // Load the table state if it already exists
@@ -128,9 +135,8 @@ private void _init(
   }
 
   @Override
-  public void init(PerTableConfig perTableConfig, Configuration configuration) {
+  public void init(Configuration configuration) {
     _init(
-        perTableConfig,
         configuration,
         IcebergSchemaExtractor.getInstance(),
         IcebergSchemaSync.getInstance(),
@@ -142,6 +148,56 @@ public void init(PerTableConfig perTableConfig, Configuration configuration) {
         IcebergTableManager.of(configuration));
   }
 
+  /**
+   * For injection purposes from TableFormatClientFactory. To be set prior to calling the init
+   * method.
+ * + * @param tableName + */ + public void setTableName(String tableName) { + this.tableName = tableName; + } + + /** + * For injection purposes from TableFormatClientFactory. To be set prior to calling the init + * method. + * + * @param namespace + */ + public void setNamespace(String[] namespace) { + this.namespace = namespace; + } + + /** + * For injection purposes from TableFormatClientFactory. To be set prior to calling the init + * method. + * + * @param basePath + */ + public void setTableBasePath(String basePath) { + this.basePath = basePath; + } + + /** + * For injection purposes from TableFormatClientFactory. To be set prior to calling the init + * method. + * + * @param catalogConfig + */ + public void setIcebergCatalogConfig(IcebergCatalogConfig catalogConfig) { + this.catalogConfig = catalogConfig; + } + + /** + * For injection purposes from TableFormatClientFactory. To be set prior to calling the init + * method. + * + * @param snapshotRetentionInHours + */ + public void setTargetMetadataRetentionInHours(int snapshotRetentionInHours) { + this.snapshotRetentionInHours = snapshotRetentionInHours; + } + @Override public void beginSync(OneTable oneTable) { initializeTableIfRequired(oneTable); diff --git a/core/src/test/java/io/onetable/client/TestTableFormatClientFactory.java b/core/src/test/java/io/onetable/client/TestTableFormatClientFactory.java index 6897f50b8..46e0954a3 100644 --- a/core/src/test/java/io/onetable/client/TestTableFormatClientFactory.java +++ b/core/src/test/java/io/onetable/client/TestTableFormatClientFactory.java @@ -46,7 +46,7 @@ public void testTableClientFromNameForDELTA() { getPerTableConfig(Arrays.asList(TableFormat.DELTA), SyncMode.INCREMENTAL); Configuration conf = new Configuration(); conf.setStrings("spark.master", "local"); - tc.init(perTableConfig, conf); + TableFormatClientFactory.getInstance().createForFormat(TableFormat.DELTA, perTableConfig, conf); assertEquals(tc.getTableFormat(), TableFormat.DELTA); } @@ -59,7 
+59,7 @@ public void testTableClientFromNameForHUDI() { getPerTableConfig(Arrays.asList(TableFormat.HUDI), SyncMode.INCREMENTAL); Configuration conf = new Configuration(); conf.setStrings("spark.master", "local"); - tc.init(perTableConfig, conf); + TableFormatClientFactory.getInstance().createForFormat(TableFormat.HUDI, perTableConfig, conf); assertEquals(tc.getTableFormat(), TableFormat.HUDI); } @@ -72,7 +72,8 @@ public void testTableClientFromNameForICEBERG() { getPerTableConfig(Arrays.asList(TableFormat.ICEBERG), SyncMode.INCREMENTAL); Configuration conf = new Configuration(); conf.setStrings("spark.master", "local"); - tc.init(perTableConfig, conf); + TableFormatClientFactory.getInstance() + .createForFormat(TableFormat.ICEBERG, perTableConfig, conf); assertEquals(tc.getTableFormat(), TableFormat.ICEBERG); }