Skip to content

Commit

Permalink
Limit RW separation to remote store enabled clusters and update recov…
Browse files Browse the repository at this point in the history
…ery flow (#16760)

* Update search only replica recovery flow

This PR includes multiple changes to search replica recovery.
1. Change search only replica copies to recover as empty store instead of PEER. This will run a store recovery that syncs segments from remote store directly and eliminate any primary communication.
2. Remove search replicas from the in-sync allocation ID set and update routing table to exclude them from allAllocationIds.  This ensures primaries aren't tracking or validating the routing table for any search replica's presence.
3. Change search replica validation to require remote store.  There are versions of the above changes that are still possible with primary based node-node replication, but I don't think they are worth making  at this time.

Signed-off-by: Marc Handalian <[email protected]>

* more coverage

Signed-off-by: Marc Handalian <[email protected]>

* add changelog entry

Signed-off-by: Marc Handalian <[email protected]>

* add assertions that Search Replicas are not in the in-sync id set nor the AllAllocationIds set in the routing table

Signed-off-by: Marc Handalian <[email protected]>

* update async task to only run if the FF is enabled and we are a remote store cluster.

This check had previously only checked for segrep

Signed-off-by: Marc Handalian <[email protected]>

* clean up max shards logic

Signed-off-by: Marc Handalian <[email protected]>

* remove search replicas from check during renewPeerRecoveryRetentionLeases

Signed-off-by: Marc Handalian <[email protected]>

* Revert "update async task to only run if the FF is enabled and we are a remote store cluster."

reverting this, we already check for remote store earlier.

This reverts commit 48ca1a3.

Signed-off-by: Marc Handalian <[email protected]>

* Add more tests for failover case

Signed-off-by: Marc Handalian <[email protected]>

* Update remotestore restore logic and add test ensuring we can restore only writers when red

Signed-off-by: Marc Handalian <[email protected]>

* Fix Search replicas to honor node level recovery limits

Signed-off-by: Marc Handalian <[email protected]>

* Fix translog UUID mismatch on existing store recovery.

This commit adds PR feedback and recovery tests post node restart.

Signed-off-by: Marc Handalian <[email protected]>

* Fix spotless

Signed-off-by: Marc Handalian <[email protected]>

* Fix bug with remote restore and add more tests

Signed-off-by: Marc Handalian <[email protected]>

---------

Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 authored Jan 10, 2025
1 parent 4d94399 commit 8191de8
Show file tree
Hide file tree
Showing 22 changed files with 951 additions and 356 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Make entries for dependencies from server/build.gradle to gradle version catalog ([#16707](https://github.com/opensearch-project/OpenSearch/pull/16707))
- Allow extended plugins to be optional ([#16909](https://github.com/opensearch-project/OpenSearch/pull/16909))
- Use the correct type to widen the sort fields when merging top docs ([#16881](https://github.com/opensearch-project/OpenSearch/pull/16881))
- Limit reader writer separation to remote store enabled clusters [#16760](https://github.com/opensearch-project/OpenSearch/pull/16760)

### Deprecated
- Performing update operation with default pipeline or final pipeline is deprecated ([#16712](https://github.com/opensearch-project/OpenSearch/pull/16712))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.List;
Expand All @@ -23,7 +24,7 @@
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaFilteringAllocationIT extends OpenSearchIntegTestCase {
public class SearchReplicaFilteringAllocationIT extends RemoteStoreBaseIntegTestCase {

@Override
protected Settings featureFlagSettings() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,325 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.indices.recovery.RecoveryRequest;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;

import java.nio.file.Path;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.routing.RecoverySource.Type.EMPTY_STORE;
import static org.opensearch.cluster.routing.RecoverySource.Type.EXISTING_STORE;
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaReplicationAndRecoveryIT extends SegmentReplicationBaseIT {

private static final String REPOSITORY_NAME = "test-remote-store-repo";
protected Path absolutePath;

@Override
protected Settings nodeSettings(int nodeOrdinal) {
if (absolutePath == null) {
absolutePath = randomRepoPath().toAbsolutePath();
}
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath))
.build();
}

@After
public void teardown() {
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();

}

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.build();
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build();
}

public void testReplication() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final int docCount = 10;
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
waitForSearchableDocs(docCount, primary, replica);
}

public void testSegmentReplicationStatsResponseWithSearchReplica() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final List<String> nodes = internalCluster().startDataOnlyNodes(2);
createIndex(
INDEX_NAME,
Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.put("number_of_search_only_replicas", 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build()
);
ensureGreen(INDEX_NAME);

final int docCount = 5;
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
waitForSearchableDocs(docCount, nodes);

SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.setDetailed(true)
.execute()
.actionGet();

// Verify the number of indices
assertEquals(1, segmentReplicationStatsResponse.getReplicationStats().size());
// Verify total shards
assertEquals(2, segmentReplicationStatsResponse.getTotalShards());
// Verify the number of primary shards
assertEquals(1, segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).size());

SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0);
Set<SegmentReplicationShardStats> replicaStats = perGroupStats.getReplicaStats();
// Verify the number of replica stats
assertEquals(1, replicaStats.size());
for (SegmentReplicationShardStats replicaStat : replicaStats) {
assertNotNull(replicaStat.getCurrentReplicationState());
}
}

public void testSearchReplicaRecovery() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
final String replica = internalCluster().startDataOnlyNode();

// ensure search replicas are only allocated to "replica" node.
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", replica))
.execute()
.actionGet();

createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);
assertRecoverySourceType(replica, EMPTY_STORE);

final int docCount = 10;
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
flush(INDEX_NAME);
waitForSearchableDocs(10, primary, replica);

// Node stats should show remote download stats as nonzero, use this as a precondition to compare
// post restart.
assertDownloadStats(replica, true);
NodesStatsResponse nodesStatsResponse;
NodeStats nodeStats;

internalCluster().restartNode(replica);
ensureGreen(INDEX_NAME);
assertDocCounts(10, replica);

// assert existing store recovery
assertRecoverySourceType(replica, EXISTING_STORE);
assertDownloadStats(replica, false);
}

public void testRecoveryAfterDocsIndexed() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final int docCount = 10;
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);

final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
assertDocCounts(10, replica);

assertRecoverySourceType(replica, EMPTY_STORE);
// replica should have downloaded from remote
assertDownloadStats(replica, true);

client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0))
.get();

ensureGreen(INDEX_NAME);

client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1))
.get();
ensureGreen(INDEX_NAME);
assertDocCounts(10, replica);

internalCluster().restartNode(replica);

ensureGreen(INDEX_NAME);
assertDocCounts(10, replica);
assertRecoverySourceType(replica, EXISTING_STORE);
assertDownloadStats(replica, false);
}

private static void assertRecoverySourceType(String replica, RecoverySource.Type recoveryType) throws InterruptedException,
ExecutionException {
RecoveryResponse recoveryResponse = client().admin().indices().recoveries(new RecoveryRequest(INDEX_NAME)).get();
for (RecoveryState recoveryState : recoveryResponse.shardRecoveryStates().get(INDEX_NAME)) {
if (recoveryState.getPrimary() == false) {
assertEquals("All SR should be of expected recovery type", recoveryType, recoveryState.getRecoverySource().getType());
assertEquals("All SR should be on the specified node", replica, recoveryState.getTargetNode().getName());
}
}
}

private static void assertDownloadStats(String replica, boolean expectBytesDownloaded) throws InterruptedException, ExecutionException {
NodesStatsResponse nodesStatsResponse = client().admin().cluster().nodesStats(new NodesStatsRequest(replica)).get();
assertEquals(1, nodesStatsResponse.getNodes().size());
NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
assertEquals(replica, nodeStats.getNode().getName());
if (expectBytesDownloaded) {
assertTrue(nodeStats.getIndices().getSegments().getRemoteSegmentStats().getDownloadBytesStarted() > 0);
} else {
assertEquals(0, nodeStats.getIndices().getSegments().getRemoteSegmentStats().getDownloadBytesStarted());
}
}

public void testStopPrimary_RestoreOnNewNode() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final int docCount = 10;
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
assertDocCounts(docCount, primary);

final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
assertDocCounts(docCount, replica);
// stop the primary
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary));

assertBusy(() -> {
ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(INDEX_NAME).get();
assertEquals(ClusterHealthStatus.RED, clusterHealthResponse.getStatus());
});
assertDocCounts(docCount, replica);

String restoredPrimary = internalCluster().startDataOnlyNode();

client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());
ensureGreen(INDEX_NAME);
assertDocCounts(docCount, replica, restoredPrimary);

for (int i = docCount; i < docCount * 2; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
assertBusy(() -> assertDocCounts(20, replica, restoredPrimary));
}

public void testFailoverToNewPrimaryWithPollingReplication() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final int docCount = 10;
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);

final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
assertDocCounts(10, replica);

client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 1))
.get();
final String writer_replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

// stop the primary
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary));

assertBusy(() -> {
ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(INDEX_NAME).get();
assertEquals(ClusterHealthStatus.YELLOW, clusterHealthResponse.getStatus());
});
ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(INDEX_NAME).get();
assertEquals(ClusterHealthStatus.YELLOW, clusterHealthResponse.getStatus());
assertDocCounts(10, replica);

for (int i = docCount; i < docCount * 2; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
assertBusy(() -> assertDocCounts(20, replica, writer_replica));
}
}
Loading

0 comments on commit 8191de8

Please sign in to comment.