Skip to content

Commit

Permalink
[fix] Change IoTConsensusService and PipeConsensusService from async …
Browse files Browse the repository at this point in the history
…to sync (#13077)
  • Loading branch information
133tosakarin authored and OneSizeFitsQuorum committed Sep 25, 2024
1 parent ef3738b commit e24ebd3
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public IoTConsensus(ConsensusConfig config, Registry registry) {
@Override
public synchronized void start() throws IOException {
initAndRecover();
service.initAsyncedServiceImpl(new IoTConsensusRPCServiceProcessor(this));
service.initSyncedServiceImpl(new IoTConsensusRPCServiceProcessor(this));
try {
registerManager.register(service);
} catch (StartupException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,9 @@
import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService;
import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory;

import org.apache.thrift.TBaseAsyncProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.InvocationTargetException;

public class IoTConsensusRPCService extends ThriftService implements IoTConsensusRPCServiceMBean {

private static final Logger logger = LoggerFactory.getLogger(IoTConsensusRPCService.class);
Expand All @@ -54,17 +51,15 @@ public ServiceType getID() {
}

@Override
public void initAsyncedServiceImpl(Object iotConsensusRPCServiceProcessor) {
public void initSyncedServiceImpl(Object iotConsensusRPCServiceProcessor) {
this.iotConsensusRPCServiceProcessor =
(IoTConsensusRPCServiceProcessor) iotConsensusRPCServiceProcessor;
super.initAsyncedServiceImpl(this.iotConsensusRPCServiceProcessor);
super.initSyncedServiceImpl(iotConsensusRPCServiceProcessor);
}

@Override
public void initTProcessor()
throws ClassNotFoundException, IllegalAccessException, InstantiationException,
NoSuchMethodException, InvocationTargetException {
processor = new IoTConsensusIService.AsyncProcessor<>(iotConsensusRPCServiceProcessor);
public void initTProcessor() {
processor = new IoTConsensusIService.Processor<>(iotConsensusRPCServiceProcessor);
}

@Override
Expand All @@ -73,20 +68,15 @@ public void initThriftServiceThread()
try {
thriftServiceThread =
new ThriftServiceThread(
(TBaseAsyncProcessor<?>) processor,
processor,
getID().getName(),
ThreadName.IOT_CONSENSUS_RPC_PROCESSOR.getName(),
getBindIP(),
getBindPort(),
config.getRpc().getRpcSelectorThreadNum(),
config.getRpc().getRpcMinConcurrentClientNum(),
config.getRpc().getRpcMaxConcurrentClientNum(),
config.getRpc().getThriftServerAwaitTimeForStopService(),
new IoTConsensusRPCServiceHandler(iotConsensusRPCServiceProcessor),
config.getRpc().isRpcThriftCompressionEnabled(),
config.getRpc().getConnectionTimeoutInMs(),
config.getRpc().getThriftMaxFrameSize(),
ThriftServiceThread.ServerType.SELECTOR,
ZeroCopyRpcTransportFactory.INSTANCE);
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,13 @@
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.stream.Collectors;

public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.AsyncIface {
public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Iface {

private static final Logger LOGGER =
LoggerFactory.getLogger(IoTConsensusRPCServiceProcessor.class);
Expand All @@ -74,98 +73,85 @@ public IoTConsensusRPCServiceProcessor(IoTConsensus consensus) {
}

@Override
public void syncLogEntries(
TSyncLogEntriesReq req, AsyncMethodCallback<TSyncLogEntriesRes> resultHandler) {
try {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
IoTConsensusServerImpl impl = consensus.getImpl(groupId);
if (impl == null) {
String message =
String.format(
"unexpected consensusGroupId %s for TSyncLogEntriesReq which size is %s",
groupId, req.getLogEntries().size());
LOGGER.error(message);
TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
status.setMessage(message);
resultHandler.onComplete(new TSyncLogEntriesRes(Collections.singletonList(status)));
return;
}
if (impl.isReadOnly()) {
String message = "fail to sync logEntries because system is read-only.";
LOGGER.error(message);
TSStatus status = new TSStatus(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode());
status.setMessage(message);
resultHandler.onComplete(new TSyncLogEntriesRes(Collections.singletonList(status)));
return;
}
if (!impl.isActive()) {
TSStatus status = new TSStatus(TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode());
status.setMessage("peer is inactive and not ready to receive sync log request");
resultHandler.onComplete(new TSyncLogEntriesRes(Collections.singletonList(status)));
return;
}
BatchIndexedConsensusRequest logEntriesInThisBatch =
new BatchIndexedConsensusRequest(req.peerId);
// We use synchronized to ensure atomicity of executing multiple logs
for (TLogEntry entry : req.getLogEntries()) {
logEntriesInThisBatch.add(
impl.buildIndexedConsensusRequestForRemoteRequest(
entry.getSearchIndex(),
entry.getData().stream()
.map(
entry.isFromWAL()
? IoTConsensusRequest::new
: ByteBufferConsensusRequest::new)
.collect(Collectors.toList())));
}
long buildRequestTime = System.nanoTime();
IConsensusRequest deserializedRequest =
impl.getStateMachine().deserializeRequest(logEntriesInThisBatch);
impl.getIoTConsensusServerMetrics()
.recordDeserializeCost(System.nanoTime() - buildRequestTime);
TSStatus writeStatus =
impl.syncLog(logEntriesInThisBatch.getSourcePeerId(), deserializedRequest);
LOGGER.debug(
"execute TSyncLogEntriesReq for {} with result {}",
req.consensusGroupId,
writeStatus.subStatus);
resultHandler.onComplete(new TSyncLogEntriesRes(writeStatus.subStatus));
} catch (Exception e) {
resultHandler.onError(e);
public TSyncLogEntriesRes syncLogEntries(TSyncLogEntriesReq req) {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
IoTConsensusServerImpl impl = consensus.getImpl(groupId);
if (impl == null) {
String message =
String.format(
"unexpected consensusGroupId %s for TSyncLogEntriesReq which size is %s",
groupId, req.getLogEntries().size());
LOGGER.error(message);
TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
status.setMessage(message);
return new TSyncLogEntriesRes(Collections.singletonList(status));
}
if (impl.isReadOnly()) {
String message = "fail to sync logEntries because system is read-only.";
LOGGER.error(message);
TSStatus status = new TSStatus(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode());
status.setMessage(message);
return new TSyncLogEntriesRes(Collections.singletonList(status));
}
if (!impl.isActive()) {
TSStatus status = new TSStatus(TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode());
status.setMessage("peer is inactive and not ready to receive sync log request");
return new TSyncLogEntriesRes(Collections.singletonList(status));
}
BatchIndexedConsensusRequest logEntriesInThisBatch =
new BatchIndexedConsensusRequest(req.peerId);
// We use synchronized to ensure atomicity of executing multiple logs
for (TLogEntry entry : req.getLogEntries()) {
logEntriesInThisBatch.add(
impl.buildIndexedConsensusRequestForRemoteRequest(
entry.getSearchIndex(),
entry.getData().stream()
.map(
entry.isFromWAL()
? IoTConsensusRequest::new
: ByteBufferConsensusRequest::new)
.collect(Collectors.toList())));
}
long buildRequestTime = System.nanoTime();
IConsensusRequest deserializedRequest =
impl.getStateMachine().deserializeRequest(logEntriesInThisBatch);
impl.getIoTConsensusServerMetrics().recordDeserializeCost(System.nanoTime() - buildRequestTime);
TSStatus writeStatus =
impl.syncLog(logEntriesInThisBatch.getSourcePeerId(), deserializedRequest);
LOGGER.debug(
"execute TSyncLogEntriesReq for {} with result {}",
req.consensusGroupId,
writeStatus.subStatus);
return new TSyncLogEntriesRes(writeStatus.subStatus);
}

@Override
public void inactivatePeer(
TInactivatePeerReq req, AsyncMethodCallback<TInactivatePeerRes> resultHandler)
throws TException {
public TInactivatePeerRes inactivatePeer(TInactivatePeerReq req) throws TException {
if (req.isForDeletionPurpose()) {
KillPoint.setKillPoint(IoTConsensusInactivatePeerKillPoints.BEFORE_INACTIVATE);
}
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
IoTConsensusServerImpl impl = consensus.getImpl(groupId);

if (impl == null) {
String message =
String.format("unexpected consensusGroupId %s for inactivatePeer request", groupId);
LOGGER.error(message);
TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
status.setMessage(message);
resultHandler.onComplete(new TInactivatePeerRes(status));
return;
return new TInactivatePeerRes(status);
}
impl.setActive(false);
resultHandler.onComplete(
new TInactivatePeerRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
if (req.isForDeletionPurpose()) {
KillPoint.setKillPoint(IoTConsensusInactivatePeerKillPoints.AFTER_INACTIVATE);
}
return new TInactivatePeerRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}

@Override
public void activatePeer(
TActivatePeerReq req, AsyncMethodCallback<TActivatePeerRes> resultHandler) throws TException {
public TActivatePeerRes activatePeer(TActivatePeerReq req) throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
IoTConsensusServerImpl impl = consensus.getImpl(groupId);
Expand All @@ -175,18 +161,15 @@ public void activatePeer(
LOGGER.error(message);
TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
status.setMessage(message);
resultHandler.onComplete(new TActivatePeerRes(status));
return;
return new TActivatePeerRes(status);
}
KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_DONE);
impl.setActive(true);
resultHandler.onComplete(
new TActivatePeerRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
return new TActivatePeerRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}

@Override
public void buildSyncLogChannel(
TBuildSyncLogChannelReq req, AsyncMethodCallback<TBuildSyncLogChannelRes> resultHandler)
public TBuildSyncLogChannelRes buildSyncLogChannel(TBuildSyncLogChannelReq req)
throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
Expand All @@ -197,8 +180,7 @@ public void buildSyncLogChannel(
LOGGER.error(message);
TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
status.setMessage(message);
resultHandler.onComplete(new TBuildSyncLogChannelRes(status));
return;
return new TBuildSyncLogChannelRes(status);
}
TSStatus responseStatus;
try {
Expand All @@ -208,12 +190,11 @@ public void buildSyncLogChannel(
responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
responseStatus.setMessage(e.getMessage());
}
resultHandler.onComplete(new TBuildSyncLogChannelRes(responseStatus));
return new TBuildSyncLogChannelRes(responseStatus);
}

@Override
public void removeSyncLogChannel(
TRemoveSyncLogChannelReq req, AsyncMethodCallback<TRemoveSyncLogChannelRes> resultHandler)
public TRemoveSyncLogChannelRes removeSyncLogChannel(TRemoveSyncLogChannelReq req)
throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
Expand All @@ -224,8 +205,7 @@ public void removeSyncLogChannel(
LOGGER.error(message);
TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
status.setMessage(message);
resultHandler.onComplete(new TRemoveSyncLogChannelRes(status));
return;
return new TRemoveSyncLogChannelRes(status);
}
TSStatus responseStatus;
try {
Expand All @@ -235,12 +215,11 @@ public void removeSyncLogChannel(
responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
responseStatus.setMessage(e.getMessage());
}
resultHandler.onComplete(new TRemoveSyncLogChannelRes(responseStatus));
return new TRemoveSyncLogChannelRes(responseStatus);
}

@Override
public void waitSyncLogComplete(
TWaitSyncLogCompleteReq req, AsyncMethodCallback<TWaitSyncLogCompleteRes> resultHandler)
public TWaitSyncLogCompleteRes waitSyncLogComplete(TWaitSyncLogCompleteReq req)
throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
Expand All @@ -251,18 +230,15 @@ public void waitSyncLogComplete(
LOGGER.error(message);
TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
status.setMessage(message);
resultHandler.onComplete(new TWaitSyncLogCompleteRes(true, 0, 0));
return;
return new TWaitSyncLogCompleteRes(true, 0, 0);
}
long searchIndex = impl.getSearchIndex();
long safeIndex = impl.getMinSyncIndex();
resultHandler.onComplete(
new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex, safeIndex));
return new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex, safeIndex);
}

@Override
public void sendSnapshotFragment(
TSendSnapshotFragmentReq req, AsyncMethodCallback<TSendSnapshotFragmentRes> resultHandler)
public TSendSnapshotFragmentRes sendSnapshotFragment(TSendSnapshotFragmentReq req)
throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
Expand All @@ -273,8 +249,7 @@ public void sendSnapshotFragment(
LOGGER.error(message);
TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
status.setMessage(message);
resultHandler.onComplete(new TSendSnapshotFragmentRes(status));
return;
return new TSendSnapshotFragmentRes(status);
}
TSStatus responseStatus;
try {
Expand All @@ -284,12 +259,11 @@ public void sendSnapshotFragment(
responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
responseStatus.setMessage(e.getMessage());
}
resultHandler.onComplete(new TSendSnapshotFragmentRes(responseStatus));
return new TSendSnapshotFragmentRes(responseStatus);
}

@Override
public void triggerSnapshotLoad(
TTriggerSnapshotLoadReq req, AsyncMethodCallback<TTriggerSnapshotLoadRes> resultHandler)
public TTriggerSnapshotLoadRes triggerSnapshotLoad(TTriggerSnapshotLoadReq req)
throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
Expand All @@ -300,20 +274,16 @@ public void triggerSnapshotLoad(
LOGGER.error(message);
TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
status.setMessage(message);
resultHandler.onComplete(new TTriggerSnapshotLoadRes(status));
return;
return new TTriggerSnapshotLoadRes(status);
}
impl.loadSnapshot(req.snapshotId);
KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_TRANSITION);
resultHandler.onComplete(
new TTriggerSnapshotLoadRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
return new TTriggerSnapshotLoadRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}

@Override
public void cleanupTransferredSnapshot(
TCleanupTransferredSnapshotReq req,
AsyncMethodCallback<TCleanupTransferredSnapshotRes> resultHandler)
throws TException {
public TCleanupTransferredSnapshotRes cleanupTransferredSnapshot(
TCleanupTransferredSnapshotReq req) throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
IoTConsensusServerImpl impl = consensus.getImpl(groupId);
Expand All @@ -323,8 +293,7 @@ public void cleanupTransferredSnapshot(
LOGGER.error(message);
TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
status.setMessage(message);
resultHandler.onComplete(new TCleanupTransferredSnapshotRes(status));
return;
return new TCleanupTransferredSnapshotRes(status);
}
TSStatus responseStatus;
try {
Expand All @@ -335,7 +304,7 @@ public void cleanupTransferredSnapshot(
responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
responseStatus.setMessage(e.getMessage());
}
resultHandler.onComplete(new TCleanupTransferredSnapshotRes(responseStatus));
return new TCleanupTransferredSnapshotRes(responseStatus);
}

public void handleClientExit() {}
Expand Down
Loading

0 comments on commit e24ebd3

Please sign in to comment.