From e24ebd36ed4ca8075be3f55fa7fed5059b7ba9ac Mon Sep 17 00:00:00 2001 From: 133tosakarin <97331129+133tosakarin@users.noreply.github.com> Date: Fri, 2 Aug 2024 09:47:34 +0800 Subject: [PATCH] [fix] Change IoTConsensusService and PipeConsensusService from async to sync (#13077) --- .../iotdb/consensus/iot/IoTConsensus.java | 2 +- .../iot/service/IoTConsensusRPCService.java | 20 +- .../IoTConsensusRPCServiceProcessor.java | 185 ++++++++---------- .../iotdb/commons/service/ThriftService.java | 15 +- 4 files changed, 85 insertions(+), 137 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java index 46d911608f941..b380a100ccf70 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java @@ -133,7 +133,7 @@ public IoTConsensus(ConsensusConfig config, Registry registry) { @Override public synchronized void start() throws IOException { initAndRecover(); - service.initAsyncedServiceImpl(new IoTConsensusRPCServiceProcessor(this)); + service.initSyncedServiceImpl(new IoTConsensusRPCServiceProcessor(this)); try { registerManager.register(service); } catch (StartupException e) { diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java index 7be6dbefc99ad..b41281043bc8c 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java @@ -29,12 +29,9 @@ import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService; import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory; -import org.apache.thrift.TBaseAsyncProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.InvocationTargetException; - public class IoTConsensusRPCService extends ThriftService implements IoTConsensusRPCServiceMBean { private static final Logger logger = LoggerFactory.getLogger(IoTConsensusRPCService.class); @@ -54,17 +51,15 @@ public ServiceType getID() { } @Override - public void initAsyncedServiceImpl(Object iotConsensusRPCServiceProcessor) { + public void initSyncedServiceImpl(Object iotConsensusRPCServiceProcessor) { this.iotConsensusRPCServiceProcessor = (IoTConsensusRPCServiceProcessor) iotConsensusRPCServiceProcessor; - super.initAsyncedServiceImpl(this.iotConsensusRPCServiceProcessor); + super.initSyncedServiceImpl(iotConsensusRPCServiceProcessor); } @Override - public void initTProcessor() - throws ClassNotFoundException, IllegalAccessException, InstantiationException, - NoSuchMethodException, InvocationTargetException { - processor = new IoTConsensusIService.AsyncProcessor<>(iotConsensusRPCServiceProcessor); + public void initTProcessor() { + processor = new IoTConsensusIService.Processor<>(iotConsensusRPCServiceProcessor); } @Override @@ -73,20 +68,15 @@ public void initThriftServiceThread() try { thriftServiceThread = new ThriftServiceThread( - (TBaseAsyncProcessor) processor, + processor, getID().getName(), ThreadName.IOT_CONSENSUS_RPC_PROCESSOR.getName(), getBindIP(), getBindPort(), - config.getRpc().getRpcSelectorThreadNum(), - config.getRpc().getRpcMinConcurrentClientNum(), config.getRpc().getRpcMaxConcurrentClientNum(), config.getRpc().getThriftServerAwaitTimeForStopService(), new IoTConsensusRPCServiceHandler(iotConsensusRPCServiceProcessor), config.getRpc().isRpcThriftCompressionEnabled(), - config.getRpc().getConnectionTimeoutInMs(), - config.getRpc().getThriftMaxFrameSize(), - ThriftServiceThread.ServerType.SELECTOR, ZeroCopyRpcTransportFactory.INSTANCE); } catch (RPCServiceException e) { throw new IllegalAccessException(e.getMessage()); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java index 1c5f5354c82d0..185907334d6b8 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java @@ -55,14 +55,13 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; -import org.apache.thrift.async.AsyncMethodCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.stream.Collectors; -public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.AsyncIface { +public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Iface { private static final Logger LOGGER = LoggerFactory.getLogger(IoTConsensusRPCServiceProcessor.class); @@ -74,98 +73,85 @@ public IoTConsensusRPCServiceProcessor(IoTConsensus consensus) { } @Override - public void syncLogEntries( - TSyncLogEntriesReq req, AsyncMethodCallback resultHandler) { - try { - ConsensusGroupId groupId = - ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); - IoTConsensusServerImpl impl = consensus.getImpl(groupId); - if (impl == null) { - String message = - String.format( - "unexpected consensusGroupId %s for TSyncLogEntriesReq which size is %s", - groupId, req.getLogEntries().size()); - LOGGER.error(message); - TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); - status.setMessage(message); - resultHandler.onComplete(new TSyncLogEntriesRes(Collections.singletonList(status))); - return; - } - if (impl.isReadOnly()) { - String message = "fail to sync logEntries because system is read-only."; - LOGGER.error(message); - TSStatus status = new TSStatus(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()); - status.setMessage(message); - resultHandler.onComplete(new TSyncLogEntriesRes(Collections.singletonList(status))); - return; - } - if (!impl.isActive()) { - TSStatus status = new TSStatus(TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()); - status.setMessage("peer is inactive and not ready to receive sync log request"); - resultHandler.onComplete(new TSyncLogEntriesRes(Collections.singletonList(status))); - return; - } - BatchIndexedConsensusRequest logEntriesInThisBatch = - new BatchIndexedConsensusRequest(req.peerId); - // We use synchronized to ensure atomicity of executing multiple logs - for (TLogEntry entry : req.getLogEntries()) { - logEntriesInThisBatch.add( - impl.buildIndexedConsensusRequestForRemoteRequest( - entry.getSearchIndex(), - entry.getData().stream() - .map( - entry.isFromWAL() - ? IoTConsensusRequest::new - : ByteBufferConsensusRequest::new) - .collect(Collectors.toList()))); - } - long buildRequestTime = System.nanoTime(); - IConsensusRequest deserializedRequest = - impl.getStateMachine().deserializeRequest(logEntriesInThisBatch); - impl.getIoTConsensusServerMetrics() - .recordDeserializeCost(System.nanoTime() - buildRequestTime); - TSStatus writeStatus = - impl.syncLog(logEntriesInThisBatch.getSourcePeerId(), deserializedRequest); - LOGGER.debug( - "execute TSyncLogEntriesReq for {} with result {}", - req.consensusGroupId, - writeStatus.subStatus); - resultHandler.onComplete(new TSyncLogEntriesRes(writeStatus.subStatus)); - } catch (Exception e) { - resultHandler.onError(e); + public TSyncLogEntriesRes syncLogEntries(TSyncLogEntriesReq req) { + ConsensusGroupId groupId = + ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); + IoTConsensusServerImpl impl = consensus.getImpl(groupId); + if (impl == null) { + String message = + String.format( + "unexpected consensusGroupId %s for TSyncLogEntriesReq which size is %s", + groupId, req.getLogEntries().size()); + LOGGER.error(message); + TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); + status.setMessage(message); + return new TSyncLogEntriesRes(Collections.singletonList(status)); } + if (impl.isReadOnly()) { + String message = "fail to sync logEntries because system is read-only."; + LOGGER.error(message); + TSStatus status = new TSStatus(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()); + status.setMessage(message); + return new TSyncLogEntriesRes(Collections.singletonList(status)); + } + if (!impl.isActive()) { + TSStatus status = new TSStatus(TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()); + status.setMessage("peer is inactive and not ready to receive sync log request"); + return new TSyncLogEntriesRes(Collections.singletonList(status)); + } + BatchIndexedConsensusRequest logEntriesInThisBatch = + new BatchIndexedConsensusRequest(req.peerId); + // We use synchronized to ensure atomicity of executing multiple logs + for (TLogEntry entry : req.getLogEntries()) { + logEntriesInThisBatch.add( + impl.buildIndexedConsensusRequestForRemoteRequest( + entry.getSearchIndex(), + entry.getData().stream() + .map( + entry.isFromWAL() + ? IoTConsensusRequest::new + : ByteBufferConsensusRequest::new) + .collect(Collectors.toList()))); + } + long buildRequestTime = System.nanoTime(); + IConsensusRequest deserializedRequest = + impl.getStateMachine().deserializeRequest(logEntriesInThisBatch); + impl.getIoTConsensusServerMetrics().recordDeserializeCost(System.nanoTime() - buildRequestTime); + TSStatus writeStatus = + impl.syncLog(logEntriesInThisBatch.getSourcePeerId(), deserializedRequest); + LOGGER.debug( + "execute TSyncLogEntriesReq for {} with result {}", + req.consensusGroupId, + writeStatus.subStatus); + return new TSyncLogEntriesRes(writeStatus.subStatus); } @Override - public void inactivatePeer( - TInactivatePeerReq req, AsyncMethodCallback resultHandler) - throws TException { + public TInactivatePeerRes inactivatePeer(TInactivatePeerReq req) throws TException { if (req.isForDeletionPurpose()) { KillPoint.setKillPoint(IoTConsensusInactivatePeerKillPoints.BEFORE_INACTIVATE); } ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); IoTConsensusServerImpl impl = consensus.getImpl(groupId); + if (impl == null) { String message = String.format("unexpected consensusGroupId %s for inactivatePeer request", groupId); LOGGER.error(message); TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); status.setMessage(message); - resultHandler.onComplete(new TInactivatePeerRes(status)); - return; + return new TInactivatePeerRes(status); } impl.setActive(false); - resultHandler.onComplete( - new TInactivatePeerRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))); if (req.isForDeletionPurpose()) { KillPoint.setKillPoint(IoTConsensusInactivatePeerKillPoints.AFTER_INACTIVATE); } + return new TInactivatePeerRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); } @Override - public void activatePeer( - TActivatePeerReq req, AsyncMethodCallback resultHandler) throws TException { + public TActivatePeerRes activatePeer(TActivatePeerReq req) throws TException { ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); IoTConsensusServerImpl impl = consensus.getImpl(groupId); @@ -175,18 +161,15 @@ public void activatePeer( LOGGER.error(message); TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); status.setMessage(message); - resultHandler.onComplete(new TActivatePeerRes(status)); - return; + return new TActivatePeerRes(status); } KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_DONE); impl.setActive(true); - resultHandler.onComplete( - new TActivatePeerRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))); + return new TActivatePeerRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); } @Override - public void buildSyncLogChannel( - TBuildSyncLogChannelReq req, AsyncMethodCallback resultHandler) + public TBuildSyncLogChannelRes buildSyncLogChannel(TBuildSyncLogChannelReq req) throws TException { ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); @@ -197,8 +180,7 @@ public void buildSyncLogChannel( LOGGER.error(message); TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); status.setMessage(message); - resultHandler.onComplete(new TBuildSyncLogChannelRes(status)); - return; + return new TBuildSyncLogChannelRes(status); } TSStatus responseStatus; try { @@ -208,12 +190,11 @@ public void buildSyncLogChannel( responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); responseStatus.setMessage(e.getMessage()); } - resultHandler.onComplete(new TBuildSyncLogChannelRes(responseStatus)); + return new TBuildSyncLogChannelRes(responseStatus); } @Override - public void removeSyncLogChannel( - TRemoveSyncLogChannelReq req, AsyncMethodCallback resultHandler) + public TRemoveSyncLogChannelRes removeSyncLogChannel(TRemoveSyncLogChannelReq req) throws TException { ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); @@ -224,8 +205,7 @@ public void removeSyncLogChannel( LOGGER.error(message); TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); status.setMessage(message); - resultHandler.onComplete(new TRemoveSyncLogChannelRes(status)); - return; + return new TRemoveSyncLogChannelRes(status); } TSStatus responseStatus; try { @@ -235,12 +215,11 @@ public void removeSyncLogChannel( responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); responseStatus.setMessage(e.getMessage()); } - resultHandler.onComplete(new TRemoveSyncLogChannelRes(responseStatus)); + return new TRemoveSyncLogChannelRes(responseStatus); } @Override - public void waitSyncLogComplete( - TWaitSyncLogCompleteReq req, AsyncMethodCallback resultHandler) + public TWaitSyncLogCompleteRes waitSyncLogComplete(TWaitSyncLogCompleteReq req) throws TException { ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); @@ -251,18 +230,15 @@ public void waitSyncLogComplete( LOGGER.error(message); TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); status.setMessage(message); - resultHandler.onComplete(new TWaitSyncLogCompleteRes(true, 0, 0)); - return; + return new TWaitSyncLogCompleteRes(true, 0, 0); } long searchIndex = impl.getSearchIndex(); long safeIndex = impl.getMinSyncIndex(); - resultHandler.onComplete( - new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex, safeIndex)); + return new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex, safeIndex); } @Override - public void sendSnapshotFragment( - TSendSnapshotFragmentReq req, AsyncMethodCallback resultHandler) + public TSendSnapshotFragmentRes sendSnapshotFragment(TSendSnapshotFragmentReq req) throws TException { ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); @@ -273,8 +249,7 @@ public void sendSnapshotFragment( LOGGER.error(message); TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); status.setMessage(message); - resultHandler.onComplete(new TSendSnapshotFragmentRes(status)); - return; + return new TSendSnapshotFragmentRes(status); } TSStatus responseStatus; try { @@ -284,12 +259,11 @@ public void sendSnapshotFragment( responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); responseStatus.setMessage(e.getMessage()); } - resultHandler.onComplete(new TSendSnapshotFragmentRes(responseStatus)); + return new TSendSnapshotFragmentRes(responseStatus); } @Override - public void triggerSnapshotLoad( - TTriggerSnapshotLoadReq req, AsyncMethodCallback resultHandler) + public TTriggerSnapshotLoadRes triggerSnapshotLoad(TTriggerSnapshotLoadReq req) throws TException { ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); @@ -300,20 +274,16 @@ public void triggerSnapshotLoad( LOGGER.error(message); TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); status.setMessage(message); - resultHandler.onComplete(new TTriggerSnapshotLoadRes(status)); - return; + return new TTriggerSnapshotLoadRes(status); } impl.loadSnapshot(req.snapshotId); KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_TRANSITION); - resultHandler.onComplete( - new TTriggerSnapshotLoadRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))); + return new TTriggerSnapshotLoadRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); } @Override - public void cleanupTransferredSnapshot( - TCleanupTransferredSnapshotReq req, - AsyncMethodCallback resultHandler) - throws TException { + public TCleanupTransferredSnapshotRes cleanupTransferredSnapshot( + TCleanupTransferredSnapshotReq req) throws TException { ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); IoTConsensusServerImpl impl = consensus.getImpl(groupId); @@ -323,8 +293,7 @@ public void cleanupTransferredSnapshot( LOGGER.error(message); TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); status.setMessage(message); - resultHandler.onComplete(new TCleanupTransferredSnapshotRes(status)); - return; + return new TCleanupTransferredSnapshotRes(status); } TSStatus responseStatus; try { @@ -335,7 +304,7 @@ public void cleanupTransferredSnapshot( responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); responseStatus.setMessage(e.getMessage()); } - resultHandler.onComplete(new TCleanupTransferredSnapshotRes(responseStatus)); + return new TCleanupTransferredSnapshotRes(responseStatus); } public void handleClientExit() {} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java index 98341903d2c27..f214de293efda 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java @@ -76,16 +76,9 @@ public void stop() { JMXService.deregisterMBean(mbeanName); } - boolean setSyncedImpl = false; - boolean setAsyncedImpl = false; + public void initSyncedServiceImpl(Object serviceImpl) {} - public void initSyncedServiceImpl(Object serviceImpl) { - setSyncedImpl = true; - } - - public void initAsyncedServiceImpl(Object serviceImpl) { - setAsyncedImpl = true; - } + public void initAsyncServiceImpl(Object serviceImpl) {} public abstract void initTProcessor() throws ClassNotFoundException, IllegalAccessException, InstantiationException, @@ -111,10 +104,6 @@ public void startService() throws StartupException { try { reset(); initTProcessor(); - if (!setSyncedImpl && !setAsyncedImpl) { - throw new StartupException( - getID().getName(), "At least one service implementation should be set."); - } initThriftServiceThread(); thriftServiceThread.setThreadStopLatch(stopLatch); thriftServiceThread.start();