Skip to content

Commit

Permalink
clean-up serial executor on ParallelConcurrencyController
Browse files Browse the repository at this point in the history
Summary: Moves the serial executor logic into ParallelConcurrencyController so we don't have duplicate versions. Whether a serial executor is used is controlled by an enum, with the existing behavior as the default. Updates the stress test tool to use the new version. SEParallelConcurrencyController will be deleted in the following diffs.

Reviewed By: ot

Differential Revision: D66832942

fbshipit-source-id: 185a71ded3222a1385c48776fce800cc74226df5
  • Loading branch information
Robert Roeser authored and facebook-github-bot committed Jan 13, 2025
1 parent b8799fe commit eeede78
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 19 deletions.
12 changes: 6 additions & 6 deletions thrift/conformance/stresstest/server/StressTestServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <scripts/rroeser/src/executor/WorkStealingExecutor.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <thrift/lib/cpp2/server/ParallelConcurrencyController.h>
#include <thrift/lib/cpp2/server/SEParallelConcurrencyController.h>
#include <thrift/lib/cpp2/server/ThriftServer.h>

DEFINE_int32(port, 5000, "Server port");
Expand Down Expand Up @@ -211,13 +210,14 @@ std::shared_ptr<ThriftServer> createStressTestServer(
*requestPile.get(), *executor.get());
}
} else if (FLAGS_se_parallel_concurrency_controller) {
LOG(INFO) << "SEParallelConcurrencyController enabled";
executor = std::make_shared<folly::CPUThreadPoolExecutor>(
t, folly::CPUThreadPoolExecutor::makeThrottledLifoSemQueue());
LOG(INFO) << "ParallelConcurrencyController with SerialExecutor enabled";
executor = std::make_shared<folly::CPUThreadPoolExecutor>(t);
RoundRobinRequestPile::Options options;
requestPile = std::make_unique<RoundRobinRequestPile>(std::move(options));
concurrencyController = std::make_unique<SEParallelConcurrencyController>(
*requestPile.get(), *executor.get());
concurrencyController = std::make_unique<ParallelConcurrencyController>(
*requestPile.get(),
*executor.get(),
ParallelConcurrencyController::RequestExecutionMode::Serial);
}

server->resourcePoolSet().setResourcePool(
Expand Down
62 changes: 53 additions & 9 deletions thrift/lib/cpp2/server/ParallelConcurrencyController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,35 @@
* limitations under the License.
*/

#include <folly/executors/SerialExecutor.h>
#include <thrift/lib/cpp2/async/AsyncProcessorHelper.h>
#include <thrift/lib/cpp2/server/ParallelConcurrencyController.h>

namespace apache::thrift {

namespace {
// Enqueues `task` on `executor`. When the executor is prioritized,
// external request work goes to the low-priority queue so internal
// work can preempt it.
template <typename Executor>
void scheduleWithExecutor(Executor& executor, folly::Func&& task) {
  if (executor.getNumPriorities() > 1) {
    // By default we have 2 prios, external requests should go to
    // lower priority queue to yield to the internal ones
    executor.addWithPriority(std::move(task), folly::Executor::LO_PRI);
  } else {
    executor.add(std::move(task));
  }
}

// Human-readable name for a RequestExecutionMode (used by describe()).
std::string describeRequestExecutionMode(
    ParallelConcurrencyController::RequestExecutionMode requestExecutionMode) {
  switch (requestExecutionMode) {
    case ParallelConcurrencyController::RequestExecutionMode::Serial:
      return "Serial";
    case ParallelConcurrencyController::RequestExecutionMode::Parallel:
      return "Parallel";
  }
  // Defensive fallback: falling off the end of a value-returning function
  // is undefined behavior if the enum ever carries an out-of-range value,
  // and some compilers warn on the switch-only form.
  return "Unknown";
}
} // namespace

void ParallelConcurrencyControllerBase::setExecutionLimitRequests(
uint64_t limit) {
executionLimit_.store(limit);
Expand Down Expand Up @@ -112,14 +136,33 @@ bool ParallelConcurrencyControllerBase::trySchedule(bool onEnqueued) {
}
}

// Schedules one dequeue-and-execute task through a per-request
// folly::SmallSerialExecutor layered on top of the controller's CPU
// executor, so that work the request re-enqueues later (e.g. coroutine
// resumptions) stays serialized per request.
void ParallelConcurrencyController::scheduleWithSerialExecutor() {
  auto keepAlive = folly::SmallSerialExecutor::create(
      folly::Executor::getKeepAliveToken(executor_));
  // Take the reference before `keepAlive` is moved into the closure below;
  // the closure owns the keep-alive, so the executor outlives this call.
  auto& executor = *keepAlive.get();
  scheduleWithExecutor(executor, [this, ka = std::move(keepAlive)]() mutable {
    auto req = pile_.dequeue();
    if (req) {
      // Bind the serial executor to the request so downstream scheduling
      // goes through it rather than the shared CPU executor.
      apache::thrift::detail::ServerRequestHelper::setExecutor(
          req.value(), std::move(ka));
    }
    // Called unconditionally, mirroring the non-serial path; presumably
    // executeRequest handles an empty optional — confirm against its impl.
    executeRequest(std::move(req));
  });
}

// Schedules a plain dequeue-and-execute task directly on the shared CPU
// executor, with no per-request serialization.
void ParallelConcurrencyController::scheduleWithoutSerialExecutor() {
  auto runOne = [this] { executeRequest(pile_.dequeue()); };
  scheduleWithExecutor(executor_, std::move(runOne));
}

// Dispatches one scheduling action according to the configured
// RequestExecutionMode: through a per-request serial executor (Serial)
// or directly on the shared CPU executor (Parallel, the default).
// NOTE: the scrape of this diff concatenated the removed pre-diff body
// with the new one; this is the coherent post-commit version.
void ParallelConcurrencyController::scheduleOnExecutor() {
  switch (requestExecutionMode_) {
    case RequestExecutionMode::Serial:
      scheduleWithSerialExecutor();
      break;
    case RequestExecutionMode::Parallel:
      scheduleWithoutSerialExecutor();
      break;
  }
}

Expand Down Expand Up @@ -162,8 +205,9 @@ void ParallelConcurrencyControllerBase::stop() {}

// One-line human-readable summary of the controller's configuration.
// The doubled braces ({{ and }}) render literal braces in fmt.
std::string ParallelConcurrencyController::describe() const {
  return fmt::format(
      "{{ParallelConcurrencyController executionLimit={}, requestExecutionMode={}}}",
      executionLimit_.load(),
      describeRequestExecutionMode(requestExecutionMode_));
}

serverdbginfo::ConcurrencyControllerDbgInfo
Expand Down
41 changes: 37 additions & 4 deletions thrift/lib/cpp2/server/ParallelConcurrencyController.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,23 @@ namespace apache::thrift {

class ParallelConcurrencyControllerBase : public ConcurrencyControllerBase {
public:
explicit ParallelConcurrencyControllerBase(RequestPileInterface& pile)
: pile_(pile) {}
enum class RequestExecutionMode : uint8_t {
// Requests are executed in the order they are enqueued, but coroutines are
// reenqueued to the CPU Executor directly allowing any thread to execute
// them. This is the default mode.
Parallel,

// Requests are executed in the order they are enqueued, but coroutines are
// reenqueued to the CPU Executor through a SerialExecutor bound to the
// request. This lows contention on the Executor when using coroutines.
Serial,
};

explicit ParallelConcurrencyControllerBase(
RequestPileInterface& pile,
RequestExecutionMode requestExecutionMode =
RequestExecutionMode::Parallel)
: requestExecutionMode_(requestExecutionMode), pile_(pile) {}

void setExecutionLimitRequests(uint64_t limit) final;

Expand Down Expand Up @@ -61,6 +76,8 @@ class ParallelConcurrencyControllerBase : public ConcurrencyControllerBase {
}

protected:
const RequestExecutionMode requestExecutionMode_;

struct Counters {
constexpr Counters() noexcept = default;
// Number of requests that are being executed
Expand Down Expand Up @@ -91,15 +108,31 @@ class ParallelConcurrencyControllerBase : public ConcurrencyControllerBase {

// Concurrency controller that executes dequeued requests on a shared
// folly::Executor, optionally wrapping each request in a SerialExecutor.
class ParallelConcurrencyController : public ParallelConcurrencyControllerBase {
 public:
  using RequestExecutionMode =
      ParallelConcurrencyControllerBase::RequestExecutionMode;

  /**
   * @param requestExecutionMode: If set to RequestExecutionMode::Serial, the
   * requests will be executed using a folly::SerialExecutor. Consider using
   * this if your code uses coroutines. Defaults to Parallel (the existing
   * behavior).
   */
  ParallelConcurrencyController(
      RequestPileInterface& pile,
      folly::Executor& ex,
      RequestExecutionMode requestExecutionMode =
          RequestExecutionMode::Parallel)
      : ParallelConcurrencyControllerBase(pile, requestExecutionMode),
        executor_(ex) {}

  std::string describe() const override;

  serverdbginfo::ConcurrencyControllerDbgInfo getDbgInfo() const override;

 private:
  folly::Executor& executor_;

  // Dequeues one request and runs it via a per-request SerialExecutor.
  void scheduleWithSerialExecutor();
  // Dequeues one request and runs it directly on executor_.
  void scheduleWithoutSerialExecutor();
  // Dispatches to one of the two schedulers based on requestExecutionMode_.
  void scheduleOnExecutor() override;
};

Expand Down

0 comments on commit eeede78

Please sign in to comment.