Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do election in order based on failed primary rank to avoid voting conflicts #1018

Merged
merged 9 commits into from
Jan 11, 2025
74 changes: 73 additions & 1 deletion src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,7 @@ void clusterInit(void) {
server.cluster->failover_auth_time = 0;
server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_rank = 0;
server.cluster->failover_failed_primary_rank = 0;
server.cluster->failover_auth_epoch = 0;
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
server.cluster->lastVoteEpoch = 0;
Expand Down Expand Up @@ -3113,6 +3114,20 @@ int clusterProcessPacket(clusterLink *link) {
if (sender_claims_to_be_primary && sender_claimed_config_epoch > sender->configEpoch) {
sender->configEpoch = sender_claimed_config_epoch;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_FSYNC_CONFIG);

/* This change is in #1009; revert it after #1009 gets merged. */
if (server.cluster->failover_auth_time && sender->configEpoch == server.cluster->failover_auth_epoch) {
/* Another node has claimed this epoch. If we have an ongoing
 * election, we can reset it since there won't be enough votes, and
 * we can start a new one ASAP. */
server.cluster->failover_auth_time = 0;
serverLog(LL_WARNING,
"I have a failover election for epoch %llu in progress and "
"received node %.40s (%s) claiming this epoch, resetting the election.",
(unsigned long long)sender->configEpoch, sender->name, sender->human_nodename);
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
}

}
/* Update the replication offset info for this node. */
sender->repl_offset = ntohu64(hdr->offset);
Expand Down Expand Up @@ -4383,6 +4398,45 @@ int clusterGetReplicaRank(void) {
return rank;
}

/* This function returns the "rank" of this instance's primary, in the context
 * of the list of all failed primaries. A failed primary is ignored if its
 * failure time exceeds cluster-node-timeout * cluster-replica-validity-factor
 * (only when that factor is non-zero).
 *
 * If multiple primary nodes go down at the same time, there is a certain
 * probability that their replicas will initiate their elections at the same
 * time, leading to insufficient votes.
 *
 * The failed primary rank is used to add a delay before starting an election,
 * so that replicas of different failed primaries are elected in rank order
 * and voting conflicts are avoided. */
int clusterGetFailedPrimaryRank(void) {
    /* Only meaningful on a replica that has a known primary. */
    serverAssert(nodeIsReplica(myself));
    serverAssert(myself->replicaof);

    int rank = 0;
    mstime_t now = mstime();
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes);
    while ((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        /* Skip nodes that do not participate in the rank: only failed,
         * voting primaries that still have replicas are counted. */
        if (!nodeFailed(node) || !clusterNodeIsVotingPrimary(node) || node->num_replicas == 0) continue;

        /* If cluster-replica-validity-factor is enabled, skip primaries whose
         * failure is too old for their replicas to attempt a failover. */
        if (server.cluster_replica_validity_factor) {
            if ((now - node->fail_time) > (server.cluster_node_timeout * server.cluster_replica_validity_factor))
                continue;
        }

        /* Our rank is the number of failed primaries whose name sorts
         * before our own primary's name. */
        if (memcmp(node->name, myself->replicaof->name, CLUSTER_NAMELEN) < 0) rank++;
    }
    dictReleaseIterator(di);

    return rank;
}

/* This function is called by clusterHandleReplicaFailover() in order to
* let the replica log why it is not able to failover. Sometimes there are
* not the conditions, but since the failover function is called again and
Expand Down Expand Up @@ -4553,6 +4607,11 @@ void clusterHandleReplicaFailover(void) {
* Specifically 1 second * rank. This way replicas that have a probably
* less updated replication offset, are penalized. */
server.cluster->failover_auth_time += server.cluster->failover_auth_rank * 1000;
/* We add another delay that is proportional to the failed primary rank.
 * Specifically 0.5 second * rank. This way the replicas of the failed
 * primaries are elected in rank order, avoiding vote conflicts. */
server.cluster->failover_failed_primary_rank = clusterGetFailedPrimaryRank();
server.cluster->failover_auth_time += server.cluster->failover_failed_primary_rank * 500;
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
/* However if this is a manual failover, no delay is needed. */
if (server.cluster->mf_end) {
server.cluster->failover_auth_time = mstime();
Expand All @@ -4561,8 +4620,9 @@ void clusterHandleReplicaFailover(void) {
}
serverLog(LL_NOTICE,
"Start of election delayed for %lld milliseconds "
"(rank #%d, offset %lld).",
"(rank #%d, primary rank #%d, offset %lld).",
server.cluster->failover_auth_time - mstime(), server.cluster->failover_auth_rank,
server.cluster->failover_failed_primary_rank,
replicationGetReplicaOffset());
/* Now that we have a scheduled election, broadcast our offset
* to all the other replicas so that they'll updated their offsets
Expand All @@ -4575,6 +4635,9 @@ void clusterHandleReplicaFailover(void) {
* replicas for the same primary since we computed our election delay.
* Update the delay if our rank changed.
*
 * It is also possible that we received a message telling us that a
 * shard is back up. Update the delay if our failed_primary_rank changed.
*
* Not performed if this is a manual failover. */
if (server.cluster->failover_auth_sent == 0 && server.cluster->mf_end == 0) {
int newrank = clusterGetReplicaRank();
Expand All @@ -4585,6 +4648,15 @@ void clusterHandleReplicaFailover(void) {
serverLog(LL_NOTICE, "Replica rank updated to #%d, added %lld milliseconds of delay.", newrank,
added_delay);
}

int new_failed_primary_rank = clusterGetFailedPrimaryRank();
if (new_failed_primary_rank != server.cluster->failover_failed_primary_rank) {
long long added_delay = (new_failed_primary_rank - server.cluster->failover_failed_primary_rank) * 500;
server.cluster->failover_auth_time += added_delay;
server.cluster->failover_failed_primary_rank = new_failed_primary_rank;
serverLog(LL_NOTICE, "Failed primary rank updated to #%d, added %lld milliseconds of delay.",
new_failed_primary_rank, added_delay);
}
}

/* Return ASAP if we can't still start the election. */
Expand Down
15 changes: 8 additions & 7 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -377,13 +377,14 @@ struct clusterState {
clusterNode *importing_slots_from[CLUSTER_SLOTS];
clusterNode *slots[CLUSTER_SLOTS];
/* The following fields are used to take the replica state on elections. */
mstime_t failover_auth_time; /* Time of previous or next election. */
int failover_auth_count; /* Number of votes received so far. */
int failover_auth_sent; /* True if we already asked for votes. */
int failover_auth_rank; /* This replica rank for current auth request. */
uint64_t failover_auth_epoch; /* Epoch of the current election. */
int cant_failover_reason; /* Why a replica is currently not able to
failover. See the CANT_FAILOVER_* macros. */
mstime_t failover_auth_time; /* Time of previous or next election. */
int failover_auth_count; /* Number of votes received so far. */
int failover_auth_sent; /* True if we already asked for votes. */
int failover_auth_rank; /* This replica rank for current auth request. */
int failover_failed_primary_rank; /* The rank of this instance in the context of all failed primary list. */
uint64_t failover_auth_epoch; /* Epoch of the current election. */
int cant_failover_reason; /* Why a replica is currently not able to
* failover. See the CANT_FAILOVER_* macros. */
/* Manual failover state in common. */
mstime_t mf_end; /* Manual failover time limit (ms unixtime).
It is zero if there is no MF in progress. */
Expand Down
31 changes: 31 additions & 0 deletions tests/unit/cluster/failover2.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,34 @@ start_cluster 3 4 {tags {external:skip cluster} overrides {cluster-ping-interval
}

} ;# start_cluster

enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
start_cluster 32 15 {tags {external:skip cluster} overrides {cluster-ping-interval 1000 cluster-node-timeout 15000}} {
    test "Multiple primary nodes are down, rank them based on the failed primary" {
        # Pause (simulate the failure of) the first 15 primary nodes.
        for {set j 0} {$j < 15} {incr j} {
            pause_process [srv -$j pid]
        }

        # Make sure that at least one replica starts a failover.
        wait_for_condition 1000 100 {
            [s -40 role] == "master"
        } else {
            fail "No failover detected"
        }

        # Wait for the cluster state to become ok on every live node.
        for {set j 0} {$j < [llength $::servers]} {incr j} {
            if {[process_is_paused [srv -$j pid]]} continue
            wait_for_condition 1000 100 {
                [CI $j cluster_state] eq "ok"
            } else {
                fail "Cluster node $j cluster_state:[CI $j cluster_state]"
            }
        }

        # Resume the paused primary nodes, to speed up the shutdown.
        for {set j 0} {$j < 31} {incr j} {
            resume_process [srv -$j pid]
        }
    }
} ;# start_cluster
Loading