From eeede7809a0b2363ff68d2a3cc8b595a7a68085c Mon Sep 17 00:00:00 2001 From: Robert Roeser Date: Mon, 13 Jan 2025 12:52:58 -0800 Subject: [PATCH] clean-up serial executor on ParallelConcurrencyController Summary: Moves the serial executor logic into parallel concurrency controller so we don't have duplicate versions. Using a serial executor is controlled by an enum where the existing behavior is default. Updates the stress test tool to use the new version. SEParallelConcurrencyController will be deleted in the following diffs. Reviewed By: ot Differential Revision: D66832942 fbshipit-source-id: 185a71ded3222a1385c48776fce800cc74226df5 --- .../stresstest/server/StressTestServer.cpp | 12 ++-- .../server/ParallelConcurrencyController.cpp | 62 ++++++++++++++++--- .../server/ParallelConcurrencyController.h | 41 ++++++++++-- 3 files changed, 96 insertions(+), 19 deletions(-) diff --git a/thrift/conformance/stresstest/server/StressTestServer.cpp b/thrift/conformance/stresstest/server/StressTestServer.cpp index 45ff9904d80..1e3dd932264 100644 --- a/thrift/conformance/stresstest/server/StressTestServer.cpp +++ b/thrift/conformance/stresstest/server/StressTestServer.cpp @@ -25,7 +25,6 @@ #include #include #include -#include #include DEFINE_int32(port, 5000, "Server port"); @@ -211,13 +210,14 @@ std::shared_ptr createStressTestServer( *requestPile.get(), *executor.get()); } } else if (FLAGS_se_parallel_concurrency_controller) { - LOG(INFO) << "SEParallelConcurrencyController enabled"; - executor = std::make_shared( - t, folly::CPUThreadPoolExecutor::makeThrottledLifoSemQueue()); + LOG(INFO) << "ParallelConcurrencyController with SerialExecutor enabled"; + executor = std::make_shared(t); RoundRobinRequestPile::Options options; requestPile = std::make_unique(std::move(options)); - concurrencyController = std::make_unique( - *requestPile.get(), *executor.get()); + concurrencyController = std::make_unique( + *requestPile.get(), + *executor.get(), + 
ParallelConcurrencyController::RequestExecutionMode::Serial); } server->resourcePoolSet().setResourcePool( diff --git a/thrift/lib/cpp2/server/ParallelConcurrencyController.cpp b/thrift/lib/cpp2/server/ParallelConcurrencyController.cpp index 7e7085bf7e7..2ee6dd3b67f 100644 --- a/thrift/lib/cpp2/server/ParallelConcurrencyController.cpp +++ b/thrift/lib/cpp2/server/ParallelConcurrencyController.cpp @@ -14,11 +14,35 @@ * limitations under the License. */ +#include #include #include namespace apache::thrift { +namespace { +template +void scheduleWithExecutor(Executor& executor, folly::Func&& task) { + if (executor.getNumPriorities() > 1) { + // By default we have 2 prios, external requests should go to + // lower priority queue to yield to the internal ones + executor.addWithPriority(std::move(task), folly::Executor::LO_PRI); + } else { + executor.add(std::move(task)); + } +} + +std::string describeRequestExecutionMode( + ParallelConcurrencyController::RequestExecutionMode requestExecutionMode) { + switch (requestExecutionMode) { + case ParallelConcurrencyController::RequestExecutionMode::Serial: + return "Serial"; + case ParallelConcurrencyController::RequestExecutionMode::Parallel: + return "Parallel"; + } +} +} // namespace + void ParallelConcurrencyControllerBase::setExecutionLimitRequests( uint64_t limit) { executionLimit_.store(limit); @@ -112,14 +136,33 @@ bool ParallelConcurrencyControllerBase::trySchedule(bool onEnqueued) { } } +void ParallelConcurrencyController::scheduleWithSerialExecutor() { + auto keepAlive = folly::SmallSerialExecutor::create( + folly::Executor::getKeepAliveToken(executor_)); + auto& executor = *keepAlive.get(); + scheduleWithExecutor(executor, [this, ka = std::move(keepAlive)]() mutable { + auto req = pile_.dequeue(); + if (req) { + apache::thrift::detail::ServerRequestHelper::setExecutor( + req.value(), std::move(ka)); + } + executeRequest(std::move(req)); + }); +} + +void ParallelConcurrencyController::scheduleWithoutSerialExecutor() { 
+ scheduleWithExecutor( + executor_, [this]() { executeRequest(pile_.dequeue()); }); +} + void ParallelConcurrencyController::scheduleOnExecutor() { - if (executor_.getNumPriorities() > 1) { - // By default we have 2 prios, external requests should go to - // lower priority queue to yield to the internal ones - executor_.addWithPriority( - [this]() { executeRequest(pile_.dequeue()); }, folly::Executor::LO_PRI); - } else { - executor_.add([this]() { executeRequest(pile_.dequeue()); }); + switch (requestExecutionMode_) { + case RequestExecutionMode::Serial: + scheduleWithSerialExecutor(); + break; + case RequestExecutionMode::Parallel: + scheduleWithoutSerialExecutor(); + break; } } @@ -162,8 +205,9 @@ void ParallelConcurrencyControllerBase::stop() {} std::string ParallelConcurrencyController::describe() const { return fmt::format( - "{{ParallelConcurrencyController executionLimit={}}}", - executionLimit_.load()); + "{{ParallelConcurrencyController executionLimit={}, requestExecutionMode={}}}", + executionLimit_.load(), + describeRequestExecutionMode(requestExecutionMode_)); } serverdbginfo::ConcurrencyControllerDbgInfo diff --git a/thrift/lib/cpp2/server/ParallelConcurrencyController.h b/thrift/lib/cpp2/server/ParallelConcurrencyController.h index 9982bbe8133..e0ba57e0d63 100644 --- a/thrift/lib/cpp2/server/ParallelConcurrencyController.h +++ b/thrift/lib/cpp2/server/ParallelConcurrencyController.h @@ -31,8 +31,23 @@ namespace apache::thrift { class ParallelConcurrencyControllerBase : public ConcurrencyControllerBase { public: - explicit ParallelConcurrencyControllerBase(RequestPileInterface& pile) - : pile_(pile) {} + enum class RequestExecutionMode : uint8_t { + // Requests are executed in the order they are enqueued, but coroutines are + // reenqueued to the CPU Executor directly allowing any thread to execute + // them. This is the default mode. 
+    Parallel,
+
+    // Requests are executed in the order they are enqueued, but coroutines are
+    // reenqueued to the CPU Executor through a SerialExecutor bound to the
+    // request. This lowers contention on the Executor when using coroutines.
+    Serial,
+  };
+
+  explicit ParallelConcurrencyControllerBase(
+      RequestPileInterface& pile,
+      RequestExecutionMode requestExecutionMode =
+          RequestExecutionMode::Parallel)
+      : requestExecutionMode_(requestExecutionMode), pile_(pile) {}
 
   void setExecutionLimitRequests(uint64_t limit) final;
 
@@ -61,6 +76,8 @@ class ParallelConcurrencyControllerBase : public ConcurrencyControllerBase {
   }
 
  protected:
+  const RequestExecutionMode requestExecutionMode_;
+
   struct Counters {
     constexpr Counters() noexcept = default;
     // Number of requests that are being executed
@@ -91,8 +108,22 @@ class ParallelConcurrencyController : public ParallelConcurrencyControllerBase {
  public:
-  ParallelConcurrencyController(RequestPileInterface& pile, folly::Executor& ex)
-      : ParallelConcurrencyControllerBase(pile), executor_(ex) {}
+  using RequestExecutionMode =
+      ParallelConcurrencyControllerBase::RequestExecutionMode;
+
+  /**
+   *
+   * @param requestExecutionMode: If set to RequestExecutionMode::Serial, the
+   * requests will be executed using a folly::SerialExecutor. Consider using
+   * this if your code uses coroutines.
+ */ + ParallelConcurrencyController( + RequestPileInterface& pile, + folly::Executor& ex, + RequestExecutionMode requestExecutionMode = + RequestExecutionMode::Parallel) + : ParallelConcurrencyControllerBase(pile, requestExecutionMode), + executor_(ex) {} std::string describe() const override; serverdbginfo::ConcurrencyControllerDbgInfo getDbgInfo() const override; @@ -100,6 +131,8 @@ class ParallelConcurrencyController : public ParallelConcurrencyControllerBase { private: folly::Executor& executor_; + void scheduleWithSerialExecutor(); + void scheduleWithoutSerialExecutor(); void scheduleOnExecutor() override; };