
Kafka Connect: How to connect to Hive Metastore with SSL #11925

Open
duc-dn opened this issue Jan 8, 2025 · 0 comments
Labels: question (Further information is requested)

Comments

duc-dn commented Jan 8, 2025

Query engine

No response

Question

Hi team,
I am using Kafka Connect to sink data in Iceberg format with a Hive catalog.
However, my hive-metastore service requires SSL for client connections.
I tried adding the following config keys to IcebergSinkConfig.java:

  private static final String HIVE_METASTORE_CLIENT_AUTH_MODE = "iceberg.catalog.hive.metastore.client.auth.mode";
  private static final String HIVE_METASTORE_PLAIN_PASSWORD = "iceberg.catalog.hive.metastore.client.plain.password";
  private static final String HIVE_METASTORE_PLAIN_USERNAME = "iceberg.catalog.hive.metastore.client.plain.username";
  private static final String HIVE_METASTORE_USE_SSL = "iceberg.catalog.hive.metastore.use.SSL";
  private static final String HIVE_METASTORE_TRUSTSTORE_TYPE = "iceberg.catalog.hive.metastore.truststore.type";
  private static final String HIVE_METASTORE_TRUSTSTORE_PATH = "iceberg.catalog.hive.metastore.truststore.path";
  private static final String HIVE_METASTORE_TRUSTSTORE_PASSWORD = "iceberg.catalog.hive.metastore.truststore.password";
When running the connector, these configs are loaded into the IcebergSinkConfig values:
2025-01-07 04:06:38,486 INFO [ductest|task-0] IcebergSinkConfig values: 
	iceberg.catalog = landingdev
	iceberg.catalog.hive.metastore.client.auth.mode = PLAIN
	iceberg.catalog.hive.metastore.client.plain.password = sdfdsfdsfds
	iceberg.catalog.hive.metastore.client.plain.username = devadmin
	iceberg.catalog.hive.metastore.truststore.password = vcxvcxv
	iceberg.catalog.hive.metastore.truststore.path = file:///opt/hms/secrets/truststore.jks
	iceberg.catalog.hive.metastore.truststore.type = JKS
	iceberg.catalog.hive.metastore.use.SSL = true
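
For reference, here is a minimal standalone sketch of the kind of SSL check I have in mind against the metastore, outside of Kafka Connect. The metastore.* key names are my assumption based on Hive's MetastoreConf (not confirmed against my Hive version), and the truststore password is a placeholder:

// Standalone sketch: connect to the metastore over SSL with the options set
// directly on the client Configuration. Key names are assumed from Hive's
// MetastoreConf; the password is a placeholder, not the real secret.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

public class MetastoreSslCheck {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("hive.metastore.uris",
        "thrift://lh-lakehouse-hive-metastore-svc.zen.svc.cluster.local:9083");
    conf.set("metastore.use.SSL", "true");
    conf.set("metastore.truststore.path", "/opt/hms/secrets/truststore.jks");
    conf.set("metastore.truststore.type", "JKS");
    conf.set("metastore.truststore.password", "<placeholder>"); // not the real secret

    // If SSL is picked up, the client should log "Opened an SSL connection to metastore".
    HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
    System.out.println(client.getAllDatabases());
    client.close();
  }
}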

But I faced an error:

2025-01-07 04:06:45,107 INFO [ductest|task-0] HMS client filtering is enabled. (org.apache.hadoop.hive.metastore.HiveMetaStoreClient) [task-thread-ductest-0]
2025-01-07 04:06:45,107 INFO [ductest|task-0] Resolved metastore uris: [thrift://lh-lakehouse-hive-metastore-svc.zen.svc.cluster.local:9083] (org.apache.hadoop.hive.metastore.HiveMetaStoreClient) [task-thread-ductest-0]
2025-01-07 04:06:45,107 INFO [ductest|task-0] Trying to connect to metastore with URI (thrift://lh-lakehouse-hive-metastore-svc.zen.svc.cluster.local:9083) in binary transport mode (org.apache.hadoop.hive.metastore.HiveMetaStoreClient) [task-thread-ductest-0]
2025-01-07 04:06:45,137 INFO [ductest|task-0] Opened a connection to metastore, URI (thrift://lh-lakehouse-hive-metastore-svc.zen.svc.cluster.local:9083) current connections: 1 (org.apache.hadoop.hive.metastore.HiveMetaStoreClient) [task-thread-ductest-0]
2025-01-07 04:06:45,280 WARN [ductest|task-0] set_ugi() not successful, Likely cause: new client talking to old server. Continuing without it. (org.apache.hadoop.hive.metastore.HiveMetaStoreClient) [task-thread-ductest-0]
org.apache.thrift.transport.TTransportException: Socket is closed by peer.
	at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:184)
	at org.apache.thrift.transport.TTransport.readAll(TTransport.java:109)
	at org.apache.thrift.protocol.TBinaryProtocol.readStringBody(TBinaryProtocol.java:417)
	at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:256)
	at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_set_ugi(ThriftHiveMetastore.java:5837)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.set_ugi(ThriftHiveMetastore.java:5823)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:811)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:277)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
	at org.apache.hadoop.hive.metastore.utils.JavaUtils.newInstance(JavaUtils.java:84)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:101)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:154)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:125)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:118)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.apache.iceberg.common.DynMethods$UnboundMethod.invokeChecked(DynMethods.java:60)
	at org.apache.iceberg.common.DynMethods$UnboundMethod.invoke(DynMethods.java:72)
	at org.apache.iceberg.common.DynMethods$StaticMethod.invoke(DynMethods.java:185)
	at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:63)
	at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:34)
	at org.apache.iceberg.ClientPoolImpl.get(ClientPoolImpl.java:125)
	at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:56)
	at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
	at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
	at org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:146)
	at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:97)
	at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:80)
	at org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:49)
	at io.tabular.iceberg.connect.data.IcebergWriterFactory.createWriter(IcebergWriterFactory.java:54)
	at io.tabular.iceberg.connect.channel.Worker.lambda$writerForTable$4(Worker.java:157)
	at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1220)
	at io.tabular.iceberg.connect.channel.Worker.writerForTable(Worker.java:156)
	at io.tabular.iceberg.connect.channel.Worker.lambda$routeRecordStatically$1(Worker.java:112)
	at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4235)
	at io.tabular.iceberg.connect.channel.Worker.routeRecordStatically(Worker.java:110)
	at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:99)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at io.tabular.iceberg.connect.channel.Worker.write(Worker.java:85)
	at io.tabular.iceberg.connect.channel.TaskImpl.put(TaskImpl.java:42)
	at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:76)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:857)
2025-01-07 04:06:45,286 INFO [ductest|task-0] RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=1000750000 (auth:SIMPLE) retries=1 delay=1 lifetime=0 (org.apache.hadoop.hive.metastore.RetryingMetaStoreClient) [task-thread-ductest-0]

I enabled SSL with iceberg.catalog.hive.metastore.use.SSL = true, but it doesn't seem to have any effect. The logs still show "Opened a connection to metastore" instead of "Opened an SSL connection to metastore", which is what I would expect based on the HiveMetaStoreClient class (https://github.dev/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java, around line 833).
So, can you recommend a way to resolve this problem?
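
In case it makes the intent clearer, this is a minimal sketch of the SSL-enabled HiveCatalog I am ultimately trying to get the connector to build. The metastore.* key names are again my assumption from Hive's MetastoreConf, and the password is a placeholder:

// Minimal sketch of an SSL-enabled HiveCatalog built outside Kafka Connect.
// Key names are assumed from Hive's MetastoreConf; secrets are placeholders.
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.hive.HiveCatalog;

public class HiveSslCatalogSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set("metastore.use.SSL", "true");
    conf.set("metastore.truststore.path", "/opt/hms/secrets/truststore.jks");
    conf.set("metastore.truststore.type", "JKS");
    conf.set("metastore.truststore.password", "<placeholder>"); // not the real secret

    HiveCatalog catalog = new HiveCatalog();
    catalog.setConf(conf); // hand the SSL settings to the catalog's Hadoop Configuration
    catalog.initialize(
        "landingdev",
        Map.of(CatalogProperties.URI,
            "thrift://lh-lakehouse-hive-metastore-svc.zen.svc.cluster.local:9083"));

    // Simple smoke test: listing namespaces forces a metastore round trip.
    catalog.listNamespaces().forEach(System.out::println);
  }
}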

duc-dn added the question (Further information is requested) label on Jan 8, 2025