diff --git a/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchNode.java b/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchNode.java index 3840c022d0a67..3f661e154952d 100644 --- a/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchNode.java +++ b/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchNode.java @@ -174,7 +174,6 @@ public class OpenSearchNode implements TestClusterConfiguration { private boolean isWorkingDirConfigured = false; private String httpPort = "0"; private String transportPort = "0"; - private String streamPort = "0"; private Path confPathData; private String keystorePassword = ""; private boolean preserveDataDir = false; @@ -1176,7 +1175,6 @@ private void createConfiguration() { baseConfig.put("node.portsfile", "true"); baseConfig.put("http.port", httpPort); baseConfig.put("transport.port", transportPort); - baseConfig.put("node.attr.transport.stream.port", streamPort); // Default the watermarks to absurdly low to prevent the tests from failing on nodes without enough disk space baseConfig.put("cluster.routing.allocation.disk.watermark.low", "1b"); @@ -1450,10 +1448,6 @@ void setTransportPort(String transportPort) { this.transportPort = transportPort; } - void setStreamPort(String streamPort) { - this.streamPort = streamPort; - } - void setDataPath(Path dataPath) { this.confPathData = dataPath; } diff --git a/buildSrc/src/main/java/org/opensearch/gradle/testclusters/RunTask.java b/buildSrc/src/main/java/org/opensearch/gradle/testclusters/RunTask.java index cf7a504fd3893..8781631bf8761 100644 --- a/buildSrc/src/main/java/org/opensearch/gradle/testclusters/RunTask.java +++ b/buildSrc/src/main/java/org/opensearch/gradle/testclusters/RunTask.java @@ -61,7 +61,6 @@ public class RunTask extends DefaultTestClustersTask { public static final String CUSTOM_SETTINGS_PREFIX = "tests.opensearch."; private static final int DEFAULT_HTTP_PORT = 9200; private static final int DEFAULT_TRANSPORT_PORT = 9300; - private static final int DEFAULT_STREAM_PORT = 9880; private static final int DEFAULT_DEBUG_PORT = 5005; public static final String LOCALHOST_ADDRESS_PREFIX = "127.0.0.1:"; @@ -141,7 +140,6 @@ public void beforeStart() { int debugPort = DEFAULT_DEBUG_PORT; int httpPort = DEFAULT_HTTP_PORT; int transportPort = DEFAULT_TRANSPORT_PORT; - int streamPort = DEFAULT_STREAM_PORT; Map additionalSettings = System.getProperties() .entrySet() @@ -167,9 +165,7 @@ public void beforeStart() { firstNode.setHttpPort(String.valueOf(httpPort)); httpPort++; firstNode.setTransportPort(String.valueOf(transportPort)); - firstNode.setStreamPort(String.valueOf(streamPort)); transportPort++; - streamPort++; firstNode.setting("discovery.seed_hosts", LOCALHOST_ADDRESS_PREFIX + DEFAULT_TRANSPORT_PORT); cluster.setPreserveDataDir(preserveData); for (OpenSearchNode node : cluster.getNodes()) { @@ -177,9 +173,7 @@ public void beforeStart() { node.setHttpPort(String.valueOf(httpPort)); httpPort++; node.setTransportPort(String.valueOf(transportPort)); - node.setStreamPort(String.valueOf(streamPort)); transportPort++; - streamPort++; node.setting("discovery.seed_hosts", LOCALHOST_ADDRESS_PREFIX + DEFAULT_TRANSPORT_PORT); } additionalSettings.forEach(node::setting); diff --git a/modules/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/arrow/flight/ArrowFlightServerIT.java b/modules/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/arrow/flight/ArrowFlightServerIT.java index 28a3374fc6ae6..8bf127f7e697b 100644 --- a/modules/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/arrow/flight/ArrowFlightServerIT.java +++ b/modules/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/arrow/flight/ArrowFlightServerIT.java @@ -25,7 +25,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 3) +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 5) public class ArrowFlightServerIT extends OpenSearchIntegTestCase { private FlightClientManager flightClientManager; diff --git a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/BaseFlightStreamPlugin.java b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/BaseFlightStreamPlugin.java index 2fd256af1dd89..1ae8c1fad4d5a 100644 --- a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/BaseFlightStreamPlugin.java +++ b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/BaseFlightStreamPlugin.java @@ -12,22 +12,29 @@ import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.IndexScopedSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsFilter; import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.indices.breaker.CircuitBreakerService; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; +import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.ClusterPlugin; import org.opensearch.plugins.NetworkPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.SecureTransportSettingsProvider; import org.opensearch.plugins.StreamManagerPlugin; import org.opensearch.repositories.RepositoriesService; +import org.opensearch.rest.RestController; +import org.opensearch.rest.RestHandler; import org.opensearch.script.ScriptService; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ExecutorBuilder; @@ -44,7 +51,7 @@ * BaseFlightStreamPlugin is a plugin that implements the StreamManagerPlugin interface. * It provides the necessary components for handling flight streams in the OpenSearch cluster. */ -public abstract class BaseFlightStreamPlugin extends Plugin implements StreamManagerPlugin, NetworkPlugin, ClusterPlugin { +public abstract class BaseFlightStreamPlugin extends Plugin implements StreamManagerPlugin, NetworkPlugin, ClusterPlugin, ActionPlugin { /** * Constructor for BaseFlightStreamPlugin. @@ -107,6 +114,16 @@ public abstract Map> getSecureTransports( Tracer tracer ); + @Override + public abstract Map> getAuxTransports( + Settings settings, + ThreadPool threadPool, + CircuitBreakerService circuitBreakerService, + NetworkService networkService, + ClusterSettings clusterSettings, + Tracer tracer + ); + /** * Returns the StreamManager instance for managing flight streams. */ @@ -132,4 +149,18 @@ public abstract Map> getSecureTransports( */ @Override public abstract void onNodeStarted(DiscoveryNode localNode); + + @Override + public abstract List getRestHandlers( + Settings settings, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster + ); + + @Override + public abstract List> getActions(); } diff --git a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/FlightStreamPlugin.java b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/FlightStreamPlugin.java index 1911c922ef8da..df3ada46c2964 100644 --- a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/FlightStreamPlugin.java +++ b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/FlightStreamPlugin.java @@ -13,11 +13,15 @@ import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.IndexScopedSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsFilter; import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; @@ -27,6 +31,8 @@ import org.opensearch.env.NodeEnvironment; import org.opensearch.plugins.SecureTransportSettingsProvider; import org.opensearch.repositories.RepositoriesService; +import org.opensearch.rest.RestController; +import org.opensearch.rest.RestHandler; import org.opensearch.script.ScriptService; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ExecutorBuilder; @@ -90,6 +96,18 @@ public Map> getSecureTransports( return Map.of(); } + @Override + public Map> getAuxTransports( + Settings settings, + ThreadPool threadPool, + CircuitBreakerService circuitBreakerService, + NetworkService networkService, + ClusterSettings clusterSettings, + Tracer tracer + ) { + return Map.of(); + } + @Override public Supplier getStreamManager() { return () -> null; @@ -109,6 +127,24 @@ public List> getSettings() { public void onNodeStarted(DiscoveryNode localNode) { } + + @Override + public List getRestHandlers( + Settings settings, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster + ) { + return List.of(); + } + + @Override + public List> getActions() { + return List.of(); + } }; } } @@ -192,6 +228,44 @@ public Map> getSecureTransports( ); } + @Override + public Map> getAuxTransports( + Settings settings, + ThreadPool threadPool, + CircuitBreakerService circuitBreakerService, + NetworkService networkService, + ClusterSettings clusterSettings, + Tracer tracer + ) { + return delegate.getAuxTransports(settings, threadPool, circuitBreakerService, networkService, clusterSettings, tracer); + } + + @Override + public List getRestHandlers( + Settings settings, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster + ) { + return delegate.getRestHandlers( + settings, + restController, + clusterSettings, + indexScopedSettings, + settingsFilter, + indexNameExpressionResolver, + nodesInCluster + ); + } + + @Override + public List> getActions() { + return delegate.getActions(); + } + /** * Gets the StreamManager instance for managing flight streams. */ diff --git a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/api/FlightServerInfoAction.java b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/api/FlightServerInfoAction.java new file mode 100644 index 0000000000000..d47092e030662 --- /dev/null +++ b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/api/FlightServerInfoAction.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.arrow.flight.api; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; + +import java.util.List; + +import static org.opensearch.rest.RestRequest.Method.GET; + +public class FlightServerInfoAction extends BaseRestHandler { + + public FlightServerInfoAction() {} + + @Override + public String getName() { + return "flight_server_info_action"; + } + + @Override + public List routes() { + return List.of(new Route(GET, "/_flight/info"), new Route(GET, "/_flight/info/{nodeId}")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + String nodeId = request.param("nodeId"); + if (nodeId != null) { + // Query specific node + NodesFlightInfoRequest nodesRequest = new NodesFlightInfoRequest(nodeId); + return channel -> client.execute(NodesFlightInfoAction.INSTANCE, nodesRequest, new RestToXContentListener<>(channel)); + } else { + NodesFlightInfoRequest nodesRequest = new NodesFlightInfoRequest(); + return channel -> client.execute(NodesFlightInfoAction.INSTANCE, nodesRequest, new RestToXContentListener<>(channel)); + } + } +} diff --git a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/api/NodeFlightInfo.java b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/api/NodeFlightInfo.java new file mode 100644 index 0000000000000..416f01b169a51 --- /dev/null +++ b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/api/NodeFlightInfo.java @@ -0,0 +1,70 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.arrow.flight.api; + +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.transport.BoundTransportAddress; +import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; + +public class NodeFlightInfo extends BaseNodeResponse implements ToXContentObject { + private final BoundTransportAddress boundAddress; + + public NodeFlightInfo(StreamInput in) throws IOException { + super(in); + boundAddress = new BoundTransportAddress(in); + } + + public NodeFlightInfo(DiscoveryNode node, BoundTransportAddress boundAddress) { + super(node); + this.boundAddress = boundAddress; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + boundAddress.writeTo(out); + } + + public BoundTransportAddress getBoundAddress() { + return boundAddress; + } + + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(); + builder.startObject("flight_server"); + + builder.startArray("bound_addresses"); + for (TransportAddress address : boundAddress.boundAddresses()) { + builder.startObject(); + builder.field("host", address.address().getHostString()); + builder.field("port", address.address().getPort()); + builder.endObject(); + } + builder.endArray(); + + TransportAddress publishAddress = boundAddress.publishAddress(); + builder.startObject("publish_address"); + builder.field("host", publishAddress.address().getHostString()); + builder.field("port", publishAddress.address().getPort()); + builder.endObject(); + + builder.endObject(); + builder.endObject(); + return builder; + } + +} diff --git a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/api/NodesFlightInfoAction.java b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/api/NodesFlightInfoAction.java new file mode 100644 index 0000000000000..a970c500b92e2 --- /dev/null +++ b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/api/NodesFlightInfoAction.java @@ -0,0 +1,20 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.arrow.flight.api; + +import org.opensearch.action.ActionType; + +public class NodesFlightInfoAction extends ActionType { + public static final NodesFlightInfoAction INSTANCE = new NodesFlightInfoAction(); + public static final String NAME = "cluster:admin/flight/info"; + + private NodesFlightInfoAction() { + super(NAME, NodesFlightInfoResponse::new); + } +} diff --git a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/api/NodesFlightInfoRequest.java b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/api/NodesFlightInfoRequest.java new file mode 100644 index 0000000000000..7ea7a73477ba2 --- /dev/null +++ b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/api/NodesFlightInfoRequest.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.arrow.flight.api; + +import org.opensearch.action.support.nodes.BaseNodesRequest; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.transport.TransportRequest; + +import java.io.IOException; + +public class NodesFlightInfoRequest extends BaseNodesRequest { + + public NodesFlightInfoRequest(StreamInput in) throws IOException { + super(in); + } + + public NodesFlightInfoRequest(String... nodesIds) { + super(nodesIds); + } + + public static class NodeFlightInfoRequest extends TransportRequest { + NodesFlightInfoRequest request; + + public NodeFlightInfoRequest(StreamInput in) throws IOException { + super(in); + } + + NodeFlightInfoRequest(NodesFlightInfoRequest request) { + this.request = request; + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } +} diff --git a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/api/NodesFlightInfoResponse.java b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/api/NodesFlightInfoResponse.java new file mode 100644 index 0000000000000..6ba21a60a729e --- /dev/null +++ b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/api/NodesFlightInfoResponse.java @@ -0,0 +1,73 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.arrow.flight.api; + +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.cluster.ClusterName; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; + +public class NodesFlightInfoResponse extends BaseNodesResponse implements ToXContentObject { + public NodesFlightInfoResponse(StreamInput in) throws IOException { + super(in); + } + + public NodesFlightInfoResponse(ClusterName clusterName, List nodes, List failures) { + super(clusterName, nodes, failures); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readList(NodeFlightInfo::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeList(nodes); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startObject("_nodes"); + builder.field("total", getNodes().size()); + builder.field("successful", getNodes().size()); + builder.field("failed", failures().size()); + builder.endObject(); + + builder.field("cluster_name", getClusterName().value()); + + builder.startObject("nodes"); + for (NodeFlightInfo nodeInfo : getNodes()) { + builder.field(nodeInfo.getNode().getId()); + nodeInfo.toXContent(builder, params); + } + builder.endObject(); + + if (!failures().isEmpty()) { + builder.startArray("failures"); + for (FailedNodeException failure : failures()) { + builder.startObject(); + builder.field("node_id", failure.nodeId()); + builder.field("reason", failure.getMessage()); + builder.endObject(); + } + builder.endArray(); + } + + builder.endObject(); + return builder; + } +} diff --git a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/api/TransportNodesFlightInfoAction.java b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/api/TransportNodesFlightInfoAction.java new file mode 100644 index 0000000000000..b31ce2df34129 --- /dev/null +++ b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/api/TransportNodesFlightInfoAction.java @@ -0,0 +1,79 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.arrow.flight.api; + +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.nodes.TransportNodesAction; +import org.opensearch.arrow.flight.bootstrap.FlightService; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; + +public class TransportNodesFlightInfoAction extends TransportNodesAction< + NodesFlightInfoRequest, + NodesFlightInfoResponse, + NodesFlightInfoRequest.NodeFlightInfoRequest, + NodeFlightInfo> { + + private final FlightService flightService; + + @Inject + public TransportNodesFlightInfoAction( + Settings settings, + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + FlightService flightService + ) { + super( + NodesFlightInfoAction.NAME, + threadPool, + clusterService, + transportService, + actionFilters, + NodesFlightInfoRequest::new, + NodesFlightInfoRequest.NodeFlightInfoRequest::new, + ThreadPool.Names.MANAGEMENT, + NodeFlightInfo.class + ); + this.flightService = flightService; + } + + @Override + protected NodesFlightInfoResponse newResponse( + NodesFlightInfoRequest request, + List nodeFlightInfos, + List failures + ) { + return new NodesFlightInfoResponse(clusterService.getClusterName(), nodeFlightInfos, failures); + } + + @Override + protected NodesFlightInfoRequest.NodeFlightInfoRequest newNodeRequest(NodesFlightInfoRequest request) { + return new NodesFlightInfoRequest.NodeFlightInfoRequest(request); + } + + @Override + protected NodeFlightInfo newNodeResponse(StreamInput in) throws IOException { + return new NodeFlightInfo(in); + } + + @Override + protected NodeFlightInfo nodeOperation(NodesFlightInfoRequest.NodeFlightInfoRequest request) { + return new NodeFlightInfo(clusterService.localNode(), flightService.getBoundAddress()); + } +} diff --git a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightClientManager.java b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightClientManager.java index 19bdbb4df13b2..7c84710e64dd9 100644 --- a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightClientManager.java +++ b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightClientManager.java @@ -11,19 +11,33 @@ import org.apache.arrow.flight.OSFlightClient; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.VisibleForTesting; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.Version; +import org.opensearch.arrow.flight.api.NodeFlightInfo; +import org.opensearch.arrow.flight.api.NodesFlightInfoAction; +import org.opensearch.arrow.flight.api.NodesFlightInfoRequest; +import org.opensearch.arrow.flight.api.NodesFlightInfoResponse; import org.opensearch.arrow.flight.bootstrap.tls.SslContextProvider; +import org.opensearch.client.Client; import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterStateListener; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.transport.TransportAddress; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import io.netty.channel.EventLoopGroup; @@ -40,25 +54,30 @@ */ public class FlightClientManager implements ClusterStateListener, AutoCloseable { private static final Version MIN_SUPPORTED_VERSION = Version.fromString("3.0.0"); - + private static final Logger logger = LogManager.getLogger(FlightClientManager.class); + static final int LOCATION_TIMEOUT_MS = 1000; private final ClientPool clientPool; private final ClientConfiguration clientConfig; + private final Client client; + private final Map nodeLocations = new ConcurrentHashMap<>(); /** * Creates a new FlightClientManager instance. * - * @param allocator Supplier for buffer allocation - * @param clusterService Service for cluster state management + * @param allocator Supplier for buffer allocation + * @param clusterService Service for cluster state management * @param sslContextProvider Provider for SSL/TLS context configuration - * @param elg Event loop group for network operations - * @param grpcExecutor Executor service for gRPC operations + * @param elg Event loop group for network operations + * @param grpcExecutor Executor service for gRPC operations + * @param client OpenSearch client */ public FlightClientManager( BufferAllocator allocator, ClusterService clusterService, SslContextProvider sslContextProvider, EventLoopGroup elg, - ExecutorService grpcExecutor + ExecutorService grpcExecutor, + Client client ) { this.clientConfig = new ClientConfiguration( Objects.requireNonNull(allocator, "BufferAllocator cannot be null"), @@ -67,28 +86,29 @@ public FlightClientManager( Objects.requireNonNull(elg, "EventLoopGroup cannot be null"), Objects.requireNonNull(grpcExecutor, "ExecutorService cannot be null") ); + this.client = Objects.requireNonNull(client, "Client cannot be null"); this.clientPool = new ClientPool(); clusterService.addListener(this); } /** * Returns a Flight client for a given node ID. + * * @param nodeId The ID of the node for which to retrieve the Flight client * @return An OpenSearchFlightClient instance for the specified node */ public OSFlightClient getFlightClient(String nodeId) { - FlightClientHolder clientHolder = clientPool.getOrCreateClient(nodeId, this::buildFlightClient); - return clientHolder == null ? null : clientHolder.flightClient; + return clientPool.getOrCreateClient(nodeId, this::buildFlightClient); } /** * Returns the location of a Flight client for a given node ID. + * * @param nodeId The ID of the node for which to retrieve the location * @return The Location of the Flight client for the specified node */ public Location getFlightClientLocation(String nodeId) { - FlightClientHolder clientHolder = clientPool.getOrCreateClient(nodeId, this::buildFlightClient); - return clientHolder == null ? null : clientHolder.location; + return nodeLocations.get(nodeId); } /** @@ -105,36 +125,97 @@ public String getLocalNodeId() { */ @Override public void close() throws Exception { + nodeLocations.clear(); clientPool.close(); } - private FlightClientHolder buildFlightClient(String nodeId) { + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (event.nodesChanged()) { + updateNodeLocations(event.state().nodes()); + } + } + + private void updateNodeLocations(DiscoveryNodes nodes) { + nodeLocations.keySet().removeIf(nodeId -> !nodes.nodeExists(nodeId)); + for (DiscoveryNode node : nodes) { + if (!nodeLocations.containsKey(node.getId()) && isValidNode(node)) { + CompletableFuture locationFuture = new CompletableFuture<>(); + requestNodeLocation(node, locationFuture); + locationFuture.thenAccept(location -> { nodeLocations.put(node.getId(), location); }).exceptionally(throwable -> { + logger.error("Failed to get Flight server location for node: {}", node.getId(), throwable); + return null; + }); + } + } + } + + private OSFlightClient buildFlightClient(String nodeId) { DiscoveryNode node = getNodeFromCluster(nodeId); if (!isValidNode(node)) { return null; } - Location location = createClientLocation(node); - OSFlightClient client = buildClient(location); - return new FlightClientHolder(client, location); + Location location = nodeLocations.get(nodeId); + if (location != null) { + return buildClient(location); + } + + // If location is not available, request it + CompletableFuture locationFuture = new CompletableFuture<>(); + + requestNodeLocation(node, locationFuture); + + try { + // Wait for a limited time to get the location + location = locationFuture.get(LOCATION_TIMEOUT_MS, TimeUnit.MILLISECONDS); + return buildClient(location); + } catch (TimeoutException e) { + throw new IllegalStateException("Timeout waiting for Flight server location for node: " + nodeId, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Interrupted while waiting for Flight server location", e); + } catch (ExecutionException e) { + throw new IllegalStateException("Error getting Flight server location for node: " + nodeId, e.getCause()); + } + } + + private void requestNodeLocation(DiscoveryNode node, CompletableFuture future) { + NodesFlightInfoRequest request = new NodesFlightInfoRequest(node.getId()); + client.execute(NodesFlightInfoAction.INSTANCE, request, new ActionListener<>() { + @Override + public void onResponse(NodesFlightInfoResponse response) { + NodeFlightInfo nodeInfo = response.getNodesMap().get(node.getId()); + if (nodeInfo != null) { + TransportAddress publishAddress = nodeInfo.getBoundAddress().publishAddress(); + String address = node.getAddress().getAddress(); + int flightPort = publishAddress.address().getPort(); + Location location = clientConfig.sslContextProvider.isSslEnabled() + ? Location.forGrpcTls(address, flightPort) + : Location.forGrpcInsecure(address, flightPort); + + future.complete(location); + } else { + future.completeExceptionally(new IllegalStateException("No Flight info received for node: " + node.getId())); + } + } + + @Override + public void onFailure(Exception e) { + future.completeExceptionally(e); + logger.error("Failed to get Flight server info for node: {}", node.getId(), e); + } + }); } private DiscoveryNode getNodeFromCluster(String nodeId) { return Objects.requireNonNull(clientConfig.clusterService).state().nodes().get(nodeId); } - private boolean isValidNode(DiscoveryNode node) { + private static boolean isValidNode(DiscoveryNode node) { return node != null && !node.getVersion().before(MIN_SUPPORTED_VERSION) && FeatureFlags.isEnabled(ARROW_STREAMS_SETTING); } - private Location createClientLocation(DiscoveryNode node) { - String address = node.getAddress().getAddress(); - int clientPort = Integer.parseInt(node.getAttributes().get("transport.stream.port")); - return clientConfig.sslContextProvider.isSslEnabled() - ? Location.forGrpcTls(address, clientPort) - : Location.forGrpcInsecure(address, clientPort); - } - private OSFlightClient buildClient(Location location) { return OSFlightClient.builder( clientConfig.allocator, @@ -146,17 +227,6 @@ private OSFlightClient buildClient(Location location) { ).build(); } - /** - * Initializes the Flight clients for the current cluster state. - * @param event The ClusterChangedEvent containing the current cluster state - */ - @Override - public void clusterChanged(ClusterChangedEvent event) { - if (event.nodesChanged()) { - updateFlightClients(); - } - } - @VisibleForTesting void updateFlightClients() { Set currentNodes = getCurrentClusterNodes(); @@ -175,13 +245,10 @@ private void initializeFlightClients() { } @VisibleForTesting - Map getFlightClients() { + Map getFlightClients() { return clientPool.getClients(); } - /** - * Holds configuration for Flight client creation - */ private static class ClientConfiguration { private final BufferAllocator allocator; private final ClusterService clusterService; @@ -208,9 +275,9 @@ private static class ClientConfiguration { * Manages the pool of Flight clients */ private static class ClientPool implements AutoCloseable { - private final Map flightClients = new ConcurrentHashMap<>(); + private final Map flightClients = new ConcurrentHashMap<>(); - FlightClientHolder getOrCreateClient(String nodeId, Function clientBuilder) { + OSFlightClient getOrCreateClient(String nodeId, Function clientBuilder) { return flightClients.computeIfAbsent(nodeId, clientBuilder); } @@ -218,29 +285,16 @@ void removeStaleClients(Set currentNodes) { flightClients.keySet().removeIf(nodeId -> !currentNodes.contains(nodeId)); } - Map getClients() { + Map getClients() { return flightClients; } @Override public void close() throws Exception { - for (FlightClientHolder clientHolder : flightClients.values()) { - clientHolder.flightClient.close(); + for (OSFlightClient flightClient : flightClients.values()) { + flightClient.close(); } flightClients.clear(); } } - - /** - * Holds a Flight client and its associated location - */ - private static class FlightClientHolder { - final OSFlightClient flightClient; - final Location location; - - FlightClientHolder(OSFlightClient flightClient, Location location) { - this.flightClient = Objects.requireNonNull(flightClient, "FlightClient cannot be null"); - this.location = Objects.requireNonNull(location, "Location cannot be null"); - } - } } diff --git a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightServerTransport.java b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightServerTransport.java new file mode 100644 index 0000000000000..8327dfd04ce55 --- /dev/null +++ b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightServerTransport.java @@ -0,0 +1,169 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.arrow.flight.bootstrap; + +import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.transport.PortsRange; +import org.opensearch.core.common.transport.BoundTransportAddress; +import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.plugins.NetworkPlugin; +import org.opensearch.transport.BindTransportException; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import static java.util.Collections.emptyList; +import static org.opensearch.common.settings.Setting.intSetting; +import static org.opensearch.common.settings.Setting.listSetting; +import static org.opensearch.transport.Transport.resolveTransportPublishPort; + +public class FlightServerTransport extends NetworkPlugin.AuxTransport { + + public static final String FLIGHT_TRANSPORT_SETTING_KEY = "transport-flight"; + + public static final Setting SETTING_FLIGHT_PORTS = AUX_TRANSPORT_PORTS.getConcreteSettingForNamespace( + FLIGHT_TRANSPORT_SETTING_KEY + ); + + public static final Setting> SETTING_FLIGHT_HOST = listSetting( + "flight.host", + emptyList(), + Function.identity(), + Setting.Property.NodeScope + ); + + public static final Setting> SETTING_FLIGHT_BIND_HOST = listSetting( + "flight.bind_host", + SETTING_FLIGHT_HOST, + Function.identity(), + Setting.Property.NodeScope + ); + + public static final Setting> SETTING_FLIGHT_PUBLISH_HOST = listSetting( + "flight.publish_host", + SETTING_FLIGHT_HOST, + Function.identity(), + Setting.Property.NodeScope + ); + + public static final Setting SETTING_FLIGHT_PUBLISH_PORT = intSetting( + "flight.publish_port", + -1, + -1, + Setting.Property.NodeScope + ); + + private final Settings settings; + private final NetworkService networkService; + private final PortsRange port; + private final String[] bindHosts; + private final String[] publishHosts; + private volatile BoundTransportAddress boundAddress; + private final Function startServerCallback; + + public FlightServerTransport( + Settings settings, + NetworkService networkService, + Function startServerCallback + ) { + this.settings = settings; + this.networkService = networkService; + this.port = SETTING_FLIGHT_PORTS.get(settings); + + List bindHosts = SETTING_FLIGHT_BIND_HOST.get(settings); + this.bindHosts = bindHosts.toArray(new String[0]); + + List publishHosts = SETTING_FLIGHT_PUBLISH_HOST.get(settings); + this.publishHosts = publishHosts.toArray(new String[0]); + this.startServerCallback = startServerCallback; + } + + @Override + protected void doStart() { + InetAddress[] hostAddresses; + try { + hostAddresses = networkService.resolveBindHostAddresses(bindHosts); + } catch (IOException e) { + throw new BindTransportException("Failed to resolve host [" + Arrays.toString(bindHosts) + "]", e); + } + + List boundAddresses = new ArrayList<>(hostAddresses.length); + for (InetAddress address : hostAddresses) { + boundAddresses.add(bindAddress(address, port)); + } + + final InetAddress publishInetAddress; + try { + publishInetAddress = networkService.resolvePublishHostAddresses(publishHosts); + } catch (Exception e) { + throw new BindTransportException("Failed to resolve publish address", e); + } + + final int publishPort = resolveTransportPublishPort(SETTING_FLIGHT_PUBLISH_PORT.get(settings), boundAddresses, publishInetAddress); + + if (publishPort < 0) { + throw new BindTransportException( + "Failed to auto-resolve flight publish port, multiple bound addresses " + + boundAddresses + + " with distinct ports and none of them matched the publish address (" + + publishInetAddress + + "). Please specify a unique port by setting " + + SETTING_FLIGHT_PUBLISH_PORT.getKey() + ); + } + + TransportAddress publishAddress = new TransportAddress(new InetSocketAddress(publishInetAddress, publishPort)); + this.boundAddress = new BoundTransportAddress(boundAddresses.toArray(new TransportAddress[0]), publishAddress); + } + + public BoundTransportAddress boundAddress() { + return boundAddress; + } + + private TransportAddress bindAddress(final InetAddress hostAddress, final PortsRange portsRange) { + final AtomicReference lastException = new AtomicReference<>(); + final AtomicReference boundSocket = new AtomicReference<>(); + final TransportAddress[] address = new TransportAddress[1]; + boolean success = portsRange.iterate(portNumber -> { + boundSocket.set(new InetSocketAddress(hostAddress, portNumber)); + address[0] = new TransportAddress(boundSocket.get()); + try { + return startServerCallback.apply(address[0]); + } catch (Exception e) { + lastException.set(e); + return false; + } + }); + + if (!success) { + throw new BindTransportException("Failed to bind to [" + hostAddress + "]", lastException.get()); + } + + return address[0]; + } + + @Override + protected void doStop() { + + } + + @Override + protected void doClose() { + + } + +} diff --git a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightService.java b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightService.java index d1497a224a53c..1621dada6d4db 100644 --- a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightService.java +++ b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightService.java @@ -26,16 +26,20 @@ import org.opensearch.arrow.spi.StreamReader; import org.opensearch.arrow.spi.StreamTicket; import org.opensearch.arrow.spi.StreamTicketFactory; +import org.opensearch.client.Client; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.transport.BoundTransportAddress; +import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.tasks.TaskId; import org.opensearch.plugins.SecureTransportSettingsProvider; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; +import java.net.InetSocketAddress; import java.security.AccessController; import java.security.PrivilegedExceptionAction; import java.util.Objects; @@ -59,10 +63,11 @@ public class FlightService extends AbstractLifecycleComponent { private static final String GRPC_WORKER_ELG = "os-grpc-worker-ELG"; private static final String GRPC_BOSS_ELG = "os-grpc-boss-ELG"; private static final int SHUTDOWN_TIMEOUT_SECONDS = 5; - private static final String TRANSPORT_STREAM_PORT = "transport.stream.port"; + private FlightServerTransport flightTransport; private final ServerComponents serverComponents; private final NetworkResources networkResources; + Client client; /** * Constructor for FlightService. @@ -89,12 +94,15 @@ private void initializeServerConfig(Settings settings) { /** * Initializes the FlightService with the provided ClusterService and ThreadPool. * It sets up the SSL context provider, client manager, and stream manager. + * * @param clusterService The ClusterService instance. - * @param threadPool The ThreadPool instance. + * @param threadPool The ThreadPool instance. + * @param client */ - public void initialize(ClusterService clusterService, ThreadPool threadPool) { + public void initialize(ClusterService clusterService, ThreadPool threadPool, Client client) { serverComponents.setClusterService(Objects.requireNonNull(clusterService, "ClusterService cannot be null")); serverComponents.setThreadPool(Objects.requireNonNull(threadPool, "ThreadPool cannot be null")); + this.client = client; } /** @@ -107,12 +115,25 @@ public void setSecureTransportSettingsProvider(SecureTransportSettingsProvider s ); } + public void setFlightTransport(FlightServerTransport flightTransport) { + this.flightTransport = flightTransport; + } + /** * Starts the FlightService by initializing the stream manager. */ @Override protected void doStart() { - serverComponents.initializeStreamManager(); + try { + serverComponents.initialize(); + networkResources.initialize(serverComponents); + flightTransport.doStart(); + serverComponents.initializeStreamManager(); + } catch (Exception e) { + logger.error("Failed to start Flight server", e); + cleanup(); + throw new RuntimeException("Failed to start Flight server", e); + } } /** @@ -120,6 +141,7 @@ protected void doStart() { */ @Override protected void doStop() { + flightTransport.doStop(); serverComponents.close(); networkResources.close(); } @@ -130,7 +152,7 @@ protected void doStop() { */ @Override protected void doClose() { - + flightTransport.doClose(); } /** @@ -145,18 +167,11 @@ public void onNodeStart(DiscoveryNode localNode) { if (isDedicatedClusterManagerNode(localNode)) { doClose(); - return; } + } - try { - serverComponents.initialize(); - networkResources.initialize(serverComponents); - startFlightServer(localNode); - } catch (Exception e) { - logger.error("Failed to start Flight server", e); - cleanup(); - throw new RuntimeException("Failed to start Flight server", e); - } + public BoundTransportAddress getBoundAddress() { + return flightTransport.boundAddress(); } private void cleanup() { @@ -167,27 +182,27 @@ private void cleanup() { } } - private void startFlightServer(DiscoveryNode localNode) { - Location serverLocation = createServerLocation(localNode); + public boolean startFlightServer(TransportAddress transportAddress) { + + InetSocketAddress address = transportAddress.address(); + Location serverLocation = getSslContextProvider().isSslEnabled() + ? Location.forGrpcTls(address.getHostString(), address.getPort()) + : Location.forGrpcInsecure(address.getHostString(), address.getPort()); + FlightProducer producer = serverComponents.createFlightProducer(); try { OSFlightServer server = buildAndStartServer(serverLocation, producer); serverComponents.setServer(server); logger.info("Arrow Flight server started. Listening at {}", serverLocation); + return true; } catch (Exception e) { String errorMsg = "Failed to start Arrow Flight server at " + serverLocation; logger.error(errorMsg, e); - throw new RuntimeException(errorMsg, e); + return false; } } - private Location createServerLocation(DiscoveryNode localNode) { - String host = localNode.getAddress().getAddress(); - int port = Integer.parseInt(localNode.getAttributes().get(TRANSPORT_STREAM_PORT)); - return ServerConfig.getLocation(host, port); - } - private OSFlightServer buildAndStartServer(Location location, FlightProducer producer) throws IOException { OSFlightServer server = OSFlightServer.builder( serverComponents.getAllocator(), @@ -347,8 +362,8 @@ public SslContextProvider getSslContextProvider() { } } - private static class NetworkResources implements AutoCloseable { - private static final Logger logger = LogManager.getLogger(NetworkResources.class); + private class NetworkResources implements AutoCloseable { + private final Logger logger = LogManager.getLogger(NetworkResources.class); private EventLoopGroup bossEventLoopGroup; private EventLoopGroup workerEventLoopGroup; @@ -379,7 +394,8 @@ private void initializeClientManager(ServerComponents components) { components.getClusterService(), components.getSslContextProvider(), workerEventLoopGroup, - clientExecutor + clientExecutor, + client ); components.setClientManager(clientManager); } diff --git a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightStreamPluginImpl.java b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightStreamPluginImpl.java index 1e466a644514b..df40daa40fb98 100644 --- a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightStreamPluginImpl.java +++ b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightStreamPluginImpl.java @@ -9,14 +9,21 @@ package org.opensearch.arrow.flight.bootstrap; import org.opensearch.arrow.flight.BaseFlightStreamPlugin; +import org.opensearch.arrow.flight.api.FlightServerInfoAction; +import org.opensearch.arrow.flight.api.NodesFlightInfoAction; +import org.opensearch.arrow.flight.api.TransportNodesFlightInfoAction; import org.opensearch.arrow.spi.StreamManager; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.IndexScopedSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsFilter; import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.indices.breaker.CircuitBreakerService; @@ -25,6 +32,8 @@ import org.opensearch.env.NodeEnvironment; import org.opensearch.plugins.SecureTransportSettingsProvider; import org.opensearch.repositories.RepositoriesService; +import org.opensearch.rest.RestController; +import org.opensearch.rest.RestHandler; import org.opensearch.script.ScriptService; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ExecutorBuilder; @@ -32,6 +41,8 @@ import org.opensearch.transport.Transport; import org.opensearch.watcher.ResourceWatcherService; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -82,7 +93,7 @@ public Collection createComponents( IndexNameExpressionResolver indexNameExpressionResolver, Supplier repositoriesServiceSupplier ) { - flightService.initialize(clusterService, threadPool); + flightService.initialize(clusterService, threadPool, client); return List.of(flightService); } @@ -113,6 +124,20 @@ public Map> getSecureTransports( return Collections.emptyMap(); } + @Override + public Map> getAuxTransports( + Settings settings, + ThreadPool threadPool, + CircuitBreakerService circuitBreakerService, + NetworkService networkService, + ClusterSettings clusterSettings, + Tracer tracer + ) { + FlightServerTransport flightServerTransport = new FlightServerTransport(settings, networkService, flightService::startFlightServer); + flightService.setFlightTransport(flightServerTransport); + return Collections.singletonMap(FlightServerTransport.AUX_TRANSPORT_TYPES_KEY, () -> flightServerTransport); + } + /** * Called when a node is started. Starts the FlightService * @param localNode local Node. @@ -122,6 +147,24 @@ public void onNodeStarted(DiscoveryNode localNode) { flightService.onNodeStart(localNode); } + @Override + public List getRestHandlers( + Settings settings, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster + ) { + return List.of(new FlightServerInfoAction()); + } + + @Override + public List> getActions() { + return List.of(new ActionHandler<>(NodesFlightInfoAction.INSTANCE, TransportNodesFlightInfoAction.class)); + } + /** * Gets the StreamManager instance for managing flight streams. */ @@ -144,6 +187,17 @@ public List> getExecutorBuilders(Settings settings) { */ @Override public List> getSettings() { - return ServerConfig.getSettings(); + return new ArrayList<>( + Arrays.asList( + FlightServerTransport.SETTING_FLIGHT_PORTS, + FlightServerTransport.SETTING_FLIGHT_HOST, + FlightServerTransport.SETTING_FLIGHT_BIND_HOST, + FlightServerTransport.SETTING_FLIGHT_PUBLISH_HOST + ) + ) { + { + addAll(ServerConfig.getSettings()); + } + }; } } diff --git a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/ServerConfig.java b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/ServerConfig.java index db792a686145f..8e18831cdaef9 100644 --- a/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/ServerConfig.java +++ b/modules/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/ServerConfig.java @@ -41,17 +41,6 @@ public class ServerConfig { */ public ServerConfig() {} - /** - * Setting for the transport stream port. - */ - public static final Setting STREAM_PORT = Setting.intSetting( - "node.attr.transport.stream.port", - 9880, - 1024, - 65535, - Setting.Property.NodeScope - ); - static final Setting ARROW_ALLOCATION_MANAGER_TYPE = Setting.simpleString( "arrow.allocation.manager.type", "Netty", diff --git a/modules/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/bootstrap/FlightClientManagerTests.java b/modules/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/bootstrap/FlightClientManagerTests.java index 59dfad0878fd2..47d8a33ae2279 100644 --- a/modules/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/bootstrap/FlightClientManagerTests.java +++ b/modules/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/bootstrap/FlightClientManagerTests.java @@ -12,7 +12,12 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.opensearch.Version; +import org.opensearch.arrow.flight.api.NodeFlightInfo; +import org.opensearch.arrow.flight.api.NodesFlightInfoAction; +import org.opensearch.arrow.flight.api.NodesFlightInfoRequest; +import org.opensearch.arrow.flight.api.NodesFlightInfoResponse; import org.opensearch.arrow.flight.bootstrap.tls.SslContextProvider; +import org.opensearch.client.Client; import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; @@ -22,6 +27,8 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.transport.BoundTransportAddress; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.test.FeatureFlagSetter; import org.opensearch.test.OpenSearchTestCase; @@ -29,11 +36,19 @@ import org.junit.BeforeClass; import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import io.grpc.netty.GrpcSslContexts; @@ -42,7 +57,12 @@ import io.netty.handler.ssl.SslContextBuilder; import io.netty.util.NettyRuntime; +import static org.opensearch.arrow.flight.bootstrap.FlightClientManager.LOCATION_TIMEOUT_MS; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.when; public class FlightClientManagerTests extends OpenSearchTestCase { @@ -53,8 +73,10 @@ public class FlightClientManagerTests extends OpenSearchTestCase { private static final AtomicInteger port = new AtomicInteger(0); private ClusterService clusterService; + private Client client; private ClusterState state; private FlightClientManager clientManager; + private ScheduledExecutorService locationUpdaterExecutor; @BeforeClass public static void setupClass() throws Exception { @@ -67,21 +89,62 @@ public static void setupClass() throws Exception { @Override public void setUp() throws Exception { super.setUp(); + locationUpdaterExecutor = Executors.newScheduledThreadPool(1); + FeatureFlagSetter.set(FeatureFlags.ARROW_STREAMS_SETTING.getKey()); clusterService = mock(ClusterService.class); + client = mock(Client.class); state = getDefaultState(); when(clusterService.state()).thenReturn(state); + + mockFlightInfoResponse(state.nodes(), 0); + SslContextProvider sslContextProvider = mock(SslContextProvider.class); - // Create a proper gRPC client SSL context with ALPN and HTTP/2 support SslContext clientSslContext = GrpcSslContexts.configure(SslContextBuilder.forClient()).build(); when(sslContextProvider.isSslEnabled()).thenReturn(true); when(sslContextProvider.getClientSslContext()).thenReturn(clientSslContext); - clientManager = new FlightClientManager(allocator, clusterService, sslContextProvider, elg, executorService); + + clientManager = new FlightClientManager(allocator, clusterService, sslContextProvider, elg, executorService, client); + ClusterChangedEvent event = new ClusterChangedEvent("test", state, ClusterState.EMPTY_STATE); + clientManager.clusterChanged(event); + clientManager.updateFlightClients(); } + private void mockFlightInfoResponse(DiscoveryNodes nodes, int sleepDuration) { + doAnswer(invocation -> { + locationUpdaterExecutor.schedule(() -> { + try { + NodesFlightInfoRequest request = invocation.getArgument(1); + ActionListener listener = invocation.getArgument(2); + + List nodeInfos = new ArrayList<>(); + for (DiscoveryNode node : nodes) { + if (request.nodesIds().length == 0 || Arrays.asList(request.nodesIds()).contains(node.getId())) { + int flightPort = getBaseStreamPort() + port.addAndGet(2); + TransportAddress address = new TransportAddress( + InetAddress.getByName(node.getAddress().getAddress()), + flightPort + ); + BoundTransportAddress boundAddress = new BoundTransportAddress(new TransportAddress[] { address }, address); + NodeFlightInfo nodeInfo = new NodeFlightInfo(node, boundAddress); + nodeInfos.add(nodeInfo); + } + } + NodesFlightInfoResponse response = new NodesFlightInfoResponse(ClusterName.DEFAULT, nodeInfos, Collections.emptyList()); + listener.onResponse(response); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + }, sleepDuration, TimeUnit.MILLISECONDS); + return null; + }).when(client).execute(eq(NodesFlightInfoAction.INSTANCE), any(NodesFlightInfoRequest.class), any(ActionListener.class)); + + } + @Override public void tearDown() throws Exception { + locationUpdaterExecutor.shutdown(); super.tearDown(); clientManager.close(); } @@ -105,7 +168,6 @@ private ClusterState getDefaultState() throws Exception { private DiscoveryNode createNode(String nodeId, String host, int port) throws Exception { TransportAddress address = new TransportAddress(InetAddress.getByName(host), port); Map attributes = new HashMap<>(); - attributes.put("transport.stream.port", String.valueOf(port)); attributes.put("arrow.streams.enabled", "true"); Set roles = Collections.singleton(DiscoveryNodeRole.DATA_ROLE); return new DiscoveryNode(nodeId, address, attributes, roles, Version.CURRENT); @@ -145,12 +207,12 @@ public void testClusterChangedWithNodesChanged() throws Exception { DiscoveryNodes newNodes = newNodesBuilder.build(); ClusterState newState = ClusterState.builder(new ClusterName("test")).nodes(newNodes).build(); - + mockFlightInfoResponse(newNodes, 0); when(clusterService.state()).thenReturn(newState); - + clientManager.clusterChanged(new ClusterChangedEvent("test", newState, state)); clientManager.updateFlightClients(); - for (DiscoveryNode node : state.nodes()) { + for (DiscoveryNode node : newState.nodes()) { assertNotNull(clientManager.getFlightClient(node.getId())); } } @@ -169,26 +231,6 @@ public void testGetLocalNodeId() throws Exception { assertEquals("Local node ID should match", "local_node", clientManager.getLocalNodeId()); } - public void testNodeWithoutStreamPort() throws Exception { - DiscoveryNode invalidNode = new DiscoveryNode( - "invalid_node", - new TransportAddress(InetAddress.getByName("127.0.0.4"), getBasePort() + port.addAndGet(1)), - Map.of("arrow.streams.enabled", "true"), - Collections.singleton(DiscoveryNodeRole.DATA_ROLE), - Version.CURRENT - ); - - DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); - nodesBuilder.add(invalidNode); - nodesBuilder.localNodeId("local_node"); - DiscoveryNodes nodes = nodesBuilder.build(); - ClusterState invalidState = ClusterState.builder(new ClusterName("test")).nodes(nodes).build(); - - when(clusterService.state()).thenReturn(invalidState); - - expectThrows(NumberFormatException.class, () -> { clientManager.getFlightClient(invalidNode.getId()); }); - } - public void testCloseWithActiveClients() throws Exception { for (DiscoveryNode node : state.nodes()) { OSFlightClient client = clientManager.getFlightClient(node.getId()); @@ -201,7 +243,6 @@ public void testCloseWithActiveClients() throws Exception { public void testIncompatibleNodeVersion() throws Exception { Map attributes = new HashMap<>(); - attributes.put("transport.stream.port", String.valueOf(getBasePort() + port.addAndGet(1))); attributes.put("arrow.streams.enabled", "true"); DiscoveryNode oldVersionNode = new DiscoveryNode( "old_version_node", @@ -219,14 +260,162 @@ public void testIncompatibleNodeVersion() throws Exception { ClusterState oldVersionState = ClusterState.builder(new ClusterName("test")).nodes(nodes).build(); when(clusterService.state()).thenReturn(oldVersionState); + mockFlightInfoResponse(nodes, 0); assertNull(clientManager.getFlightClient(oldVersionNode.getId())); } + public void testGetFlightClientLocationTimeout() throws Exception { + reset(client); + + String nodeId = "test_node"; + DiscoveryNode testNode = createNode(nodeId, "127.0.0.1", getBasePort() + port.addAndGet(2)); + + // Update cluster state with the test node + DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); + nodesBuilder.add(testNode); + nodesBuilder.localNodeId(nodeId); + ClusterState newState = ClusterState.builder(new ClusterName("test")).nodes(nodesBuilder.build()).build(); + when(clusterService.state()).thenReturn(newState); + // Mock a delayed response that will cause timeout + mockFlightInfoResponse(newState.nodes(), LOCATION_TIMEOUT_MS + 100); + + ClusterChangedEvent event = new ClusterChangedEvent("test", newState, ClusterState.EMPTY_STATE); + clientManager.clusterChanged(event); + + IllegalStateException exception = expectThrows(IllegalStateException.class, () -> { clientManager.getFlightClient(nodeId); }); + assertTrue(exception.getMessage().contains("Timeout waiting for Flight server location")); + } + + public void testGetFlightClientLocationInterrupted() throws Exception { + reset(client); + + String nodeId = "test_node"; + DiscoveryNode testNode = createNode(nodeId, "127.0.0.1", getBasePort() + port.addAndGet(2)); + + // Update cluster state with the test node + DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); + nodesBuilder.add(testNode); + nodesBuilder.localNodeId(nodeId); + ClusterState newState = ClusterState.builder(new ClusterName("test")).nodes(nodesBuilder.build()).build(); + + when(clusterService.state()).thenReturn(newState); + + // Mock an interrupted response + doAnswer(invocation -> { + Thread currentThread = Thread.currentThread(); + locationUpdaterExecutor.schedule(currentThread::interrupt, 100, TimeUnit.MILLISECONDS); + return null; + }).when(client).execute(eq(NodesFlightInfoAction.INSTANCE), any(NodesFlightInfoRequest.class), any(ActionListener.class)); + + ClusterChangedEvent event = new ClusterChangedEvent("test", newState, ClusterState.EMPTY_STATE); + clientManager.clusterChanged(event); + + IllegalStateException exception = expectThrows(IllegalStateException.class, () -> { clientManager.getFlightClient(nodeId); }); + assertTrue(exception.getMessage().contains("Interrupted while waiting for Flight server location")); + assertTrue(Thread.interrupted()); + } + + public void testGetFlightClientLocationExecutionError() throws Exception { + reset(client); + + String nodeId = "test_node"; + DiscoveryNode testNode = createNode(nodeId, "127.0.0.1", getBasePort() + port.addAndGet(2)); + + // Update cluster state with the test node + DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); + nodesBuilder.add(testNode); + nodesBuilder.localNodeId(nodeId); + ClusterState newState = ClusterState.builder(new ClusterName("test")).nodes(nodesBuilder.build()).build(); + + when(clusterService.state()).thenReturn(newState); + + // Mock failure + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(2); + listener.onFailure(new RuntimeException("Test execution error")); + return null; + }).when(client).execute(eq(NodesFlightInfoAction.INSTANCE), any(NodesFlightInfoRequest.class), any(ActionListener.class)); + + ClusterChangedEvent event = new ClusterChangedEvent("test", newState, ClusterState.EMPTY_STATE); + clientManager.clusterChanged(event); + + IllegalStateException exception = expectThrows(IllegalStateException.class, () -> { clientManager.getFlightClient(nodeId); }); + assertTrue(exception.getMessage().contains("Error getting Flight server location")); + assertTrue(exception.getCause() instanceof RuntimeException); + assertEquals("Test execution error", exception.getCause().getMessage()); + } + + public void testFailedClusterUpdateButSuccessfulDirectRequest() throws Exception { + reset(client); + + String nodeId = "test_node"; + DiscoveryNode testNode = createNode(nodeId, "127.0.0.1", getBasePort() + port.addAndGet(2)); + + // Update cluster state with the test node + DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); + nodesBuilder.add(testNode); + nodesBuilder.localNodeId(nodeId); + ClusterState newState = ClusterState.builder(new ClusterName("test")).nodes(nodesBuilder.build()).build(); + + when(clusterService.state()).thenReturn(newState); + + // First mock call fails during cluster update + AtomicBoolean firstCall = new AtomicBoolean(true); + doAnswer(invocation -> { + locationUpdaterExecutor.schedule(() -> { + ActionListener listener = invocation.getArgument(2); + if (firstCall.getAndSet(false)) { + // Fail on first call (during cluster update) + listener.onFailure(new RuntimeException("Failed during cluster update")); + } else { + // Succeed on second call (direct request) + try { + NodesFlightInfoRequest request = invocation.getArgument(1); + List nodeInfos = new ArrayList<>(); + for (DiscoveryNode node : newState.nodes()) { + if (request.nodesIds().length == 0 || Arrays.asList(request.nodesIds()).contains(node.getId())) { + int flightPort = getBaseStreamPort() + port.addAndGet(2); + TransportAddress address = new TransportAddress( + InetAddress.getByName(node.getAddress().getAddress()), + flightPort + ); + BoundTransportAddress boundAddress = new BoundTransportAddress(new TransportAddress[] { address }, address); + NodeFlightInfo nodeInfo = new NodeFlightInfo(node, boundAddress); + nodeInfos.add(nodeInfo); + } + } + NodesFlightInfoResponse response = new NodesFlightInfoResponse( + ClusterName.DEFAULT, + nodeInfos, + Collections.emptyList() + ); + listener.onResponse(response); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } + }, 0, TimeUnit.MICROSECONDS); + return null; + }).when(client).execute(eq(NodesFlightInfoAction.INSTANCE), any(NodesFlightInfoRequest.class), any(ActionListener.class)); + + ClusterChangedEvent event = new ClusterChangedEvent("test", newState, ClusterState.EMPTY_STATE); + clientManager.clusterChanged(event); + + // Verify that the client can still be created successfully on direct request + OSFlightClient flightClient = clientManager.getFlightClient(nodeId); + assertFalse("first call should be invoked", firstCall.get()); + assertNotNull("Flight client should be created successfully on direct request", flightClient); + } + private void validateNodes() { for (DiscoveryNode node : state.nodes()) { OSFlightClient client = clientManager.getFlightClient(node.getId()); assertNotNull("Flight client should be created for existing node", client); } } + + protected static int getBaseStreamPort() { + return generateBasePort(9401); + } } diff --git a/modules/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/bootstrap/FlightServiceTests.java b/modules/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/bootstrap/FlightServiceTests.java index 197aa8f75112d..e1346e2d6a255 100644 --- a/modules/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/bootstrap/FlightServiceTests.java +++ b/modules/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/bootstrap/FlightServiceTests.java @@ -11,6 +11,7 @@ import org.opensearch.arrow.flight.bootstrap.tls.DefaultSslContextProvider; import org.opensearch.arrow.flight.bootstrap.tls.DisabledSslContextProvider; import org.opensearch.arrow.flight.bootstrap.tls.SslContextProvider; +import org.opensearch.client.Client; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.node.DiscoveryNode; @@ -74,9 +75,9 @@ public void testInitializeWithSslEnabled() throws Exception { try (FlightService sslService = new FlightService(sslSettings)) { sslService.setSecureTransportSettingsProvider(secureTransportSettingsProvider); - sslService.initialize(clusterService, threadPool); - expectThrows(RuntimeException.class, () -> sslService.onNodeStart(localNode)); - + sslService.initialize(clusterService, threadPool, mock(Client.class)); + sslService.setFlightTransport(mock(FlightServerTransport.class)); + sslService.start(); SslContextProvider sslContextProvider = sslService.getSslContextProvider(); assertNotNull("SSL context provider should not be null", sslContextProvider); assertTrue("SSL context provider should be DefaultSslContextProvider", sslContextProvider instanceof DefaultSslContextProvider); @@ -93,7 +94,8 @@ public void testInitializeWithSslDisabled() throws Exception { .build(); try (FlightService noSslService = new FlightService(noSslSettings)) { - noSslService.initialize(clusterService, threadPool); + noSslService.initialize(clusterService, threadPool, mock(Client.class)); + noSslService.setFlightTransport(mock(FlightServerTransport.class)); noSslService.start(); noSslService.onNodeStart(localNode); // Verify SSL is properly disabled @@ -109,8 +111,8 @@ public void testInitializeWithSslDisabled() throws Exception { public void testStartAndStop() throws Exception { try (FlightService testService = new FlightService(Settings.EMPTY)) { - testService.initialize(clusterService, threadPool); - + testService.initialize(clusterService, threadPool, mock(Client.class)); + testService.setFlightTransport(mock(FlightServerTransport.class)); testService.start(); testService.onNodeStart(localNode); testService.stop(); @@ -126,26 +128,29 @@ public void testInitializeWithoutSecureTransportSettingsProvider() { try (FlightService sslService = new FlightService(sslSettings)) { // Should throw exception when initializing without provider expectThrows(RuntimeException.class, () -> { - sslService.initialize(clusterService, threadPool); - sslService.onNodeStart(localNode); + sslService.initialize(clusterService, threadPool, mock(Client.class)); + sslService.setFlightTransport(mock(FlightServerTransport.class)); + sslService.start(); }); } } public void testServerStartupFailure() { Settings invalidSettings = Settings.builder() - .put("node.attr.transport.stream.port", "-1") // Invalid port + .put(FlightServerTransport.SETTING_FLIGHT_PUBLISH_PORT.getKey(), "-1") // Invalid port .build(); try (FlightService invalidService = new FlightService(invalidSettings)) { - expectThrows(RuntimeException.class, () -> { invalidService.onNodeStart(localNode); }); + invalidService.initialize(clusterService, threadPool, mock(Client.class)); + invalidService.setFlightTransport(mock(FlightServerTransport.class)); + expectThrows(RuntimeException.class, () -> { invalidService.doStart(); }); } } public void testLifecycleStateTransitions() throws Exception { // Find new port for this test try (FlightService testService = new FlightService(Settings.EMPTY)) { - testService.initialize(clusterService, threadPool); - + testService.initialize(clusterService, threadPool, mock(Client.class)); + testService.setFlightTransport(mock(FlightServerTransport.class)); // Test all state transitions testService.start(); testService.onNodeStart(localNode); @@ -173,4 +178,8 @@ private DiscoveryNode createNode(int port) throws Exception { Set roles = Collections.singleton(DiscoveryNodeRole.DATA_ROLE); return new DiscoveryNode("local_node", address, attributes, roles, Version.CURRENT); } + + protected static int getBaseStreamPort() { + return generateBasePort(9401); + } } diff --git a/modules/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/bootstrap/ServerConfigTests.java b/modules/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/bootstrap/ServerConfigTests.java index ec7b05a30eb49..9a833daaa6b9b 100644 --- a/modules/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/bootstrap/ServerConfigTests.java +++ b/modules/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/bootstrap/ServerConfigTests.java @@ -12,6 +12,8 @@ import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ScalingExecutorBuilder; +import static org.opensearch.arrow.flight.bootstrap.FlightServerTransport.SETTING_FLIGHT_PUBLISH_PORT; + public class ServerConfigTests extends OpenSearchTestCase { private Settings settings; @@ -69,7 +71,7 @@ public void testDefaultSettings() { ServerConfig.init(defaultSettings); // Verify default values - assertEquals(9880, ServerConfig.STREAM_PORT.get(defaultSettings).intValue()); + assertEquals(9401, SETTING_FLIGHT_PUBLISH_PORT.get(defaultSettings).intValue()); assertEquals("Netty", ServerConfig.ARROW_ALLOCATION_MANAGER_TYPE.get(defaultSettings)); assertFalse(ServerConfig.ARROW_ENABLE_NULL_CHECK_FOR_GET.get(defaultSettings)); assertTrue(ServerConfig.ARROW_ENABLE_UNSAFE_MEMORY_ACCESS.get(defaultSettings)); diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index 4be45aed70023..4f0462f0b5cdd 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -129,7 +129,7 @@ public class FeatureFlags { ); public static final String ARROW_STREAMS = "opensearch.experimental.feature.arrow.streams.enabled"; - public static final Setting ARROW_STREAMS_SETTING = Setting.boolSetting(ARROW_STREAMS, false, Property.NodeScope); + public static final Setting ARROW_STREAMS_SETTING = Setting.boolSetting(ARROW_STREAMS, true, Property.NodeScope); private static final List> ALL_FEATURE_FLAG_SETTINGS = List.of( REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING, diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index 4ee441a2688ad..501aa40d3286a 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -165,7 +165,6 @@ import static org.opensearch.test.NodeRoles.onlyRoles; import static org.opensearch.test.NodeRoles.removeRoles; import static org.opensearch.test.OpenSearchTestCase.assertBusy; -import static org.opensearch.test.OpenSearchTestCase.getBaseStreamPort; import static org.opensearch.test.OpenSearchTestCase.randomBoolean; import static org.opensearch.test.OpenSearchTestCase.randomFrom; import static org.hamcrest.Matchers.equalTo; @@ -238,8 +237,6 @@ public final class InternalTestCluster extends TestCluster { static final int DEFAULT_MIN_NUM_CLIENT_NODES = 0; static final int DEFAULT_MAX_NUM_CLIENT_NODES = 1; - private static final AtomicInteger FLIGHT_PORT_COUNTER = new AtomicInteger(0); - /* Sorted map to make traverse order reproducible. * The map of nodes is never mutated so individual reads are safe without synchronization. * Updates are intended to follow a copy-on-write approach. */ @@ -758,7 +755,6 @@ private Settings getNodeSettings( final Settings.Builder updatedSettings = Settings.builder(); updatedSettings.put(Environment.PATH_HOME_SETTING.getKey(), baseDir); - updatedSettings.put("node.attr.transport.stream.port", getBaseStreamPort() + FLIGHT_PORT_COUNTER.getAndIncrement()); if (numDataPaths > 1) { updatedSettings.putList( Environment.PATH_DATA_SETTING.getKey(), diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java index c6d215b443545..24b2c80f8af4c 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java @@ -1768,7 +1768,7 @@ public static String getPortRange() { return getBasePort() + "-" + (getBasePort() + 99); // upper bound is inclusive } - private static int generateBasePort(int start) { + protected static int generateBasePort(int start) { // some tests use MockTransportService to do network based testing. Yet, we run tests in multiple JVMs that means // concurrent tests could claim port that another JVM just released and if that test tries to simulate a disconnect it might // be smart enough to re-connect depending on what is tested. To reduce the risk, since this is very hard to debug we use @@ -1795,10 +1795,6 @@ private static int generateBasePort(int start) { return start + (startAt * 100); } - protected static int getBaseStreamPort() { - return generateBasePort(9880); - } - protected static int getBasePort() { return generateBasePort(10300); }