Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DedicatedExecutor to FlightSQL Server #247

Merged
2 changes: 0 additions & 2 deletions f.csv
matthewmturner marked this conversation as resolved.
Show resolved Hide resolved

This file was deleted.

333 changes: 333 additions & 0 deletions src/execution/dedicated_executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,333 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great -- thank you @matthewmturner

Once we get this sorted out I definitely think we should contemplate merging it back upstream in DataFusion (with documentation). I can totally help with that

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good

sync::{Arc, OnceLock, RwLock},
time::Duration,
};

use futures::{
future::{BoxFuture, Shared},
FutureExt,
};
use log::warn;
use tokio::{
runtime::Handle,
sync::{oneshot::error::RecvError, Notify},
};

const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60 * 5);

/// Manages a separate tokio runtime (thread pool) for executing tasks.
///
/// A `DedicatedExecutor` runs futures (and any `tasks` that are
/// `tokio::task::spawned` by them) on a separate tokio Executor
///
/// # Background
///
/// Tokio has the notion of the "current" runtime, which runs the current future
/// and any tasks spawned by it. Typically, this is the runtime created by
/// `tokio::main` and is used for the main application logic and I/O handling
///
/// For CPU bound work, such as DataFusion plan execution, it is important to
/// run on a separate thread pool to avoid blocking the I/O handling for extended
/// periods of time in order to avoid long poll latencies (which decreases the
/// throughput of small requests under concurrent load).
///
/// # IO Scheduling
///
/// I/O, such as network calls, should not be performed on the runtime managed
/// by [`DedicatedExecutor`]. As tokio is a cooperative scheduler, long-running
/// CPU tasks will not be preempted and can therefore starve servicing of other
/// tasks. This manifests in long poll-latencies, where a task is ready to run
/// but isn't being scheduled to run. For CPU-bound work this isn't a problem as
/// there is no external party waiting on a response, however, for I/O tasks,
/// long poll latencies can prevent timely servicing of IO, which can have a
/// significant detrimental effect.
///
/// # Details
///
/// The worker thread priority is set to low so that such tasks do
/// not starve other more important tasks (such as answering health checks)
///
/// Follows the example from stack overflow and spawns a new
/// thread to install a Tokio runtime "context"
/// <https://stackoverflow.com/questions/62536566>
///
/// # Trouble Shooting:
///
/// ## "No IO runtime registered. Call `register_io_runtime`/`register_current_runtime_for_io` in current thread!
///
/// This means that IO was attempted on a tokio runtime that was not registered
/// for IO. One solution is to run the task using [DedicatedExecutor::spawn].
///
/// ## "Cannot drop a runtime in a context where blocking is not allowed"`
///
/// If you try to use this structure from an async context you see something like
/// thread 'plan::stringset::tests::test_builder_plan' panicked at 'Cannot
/// drop a runtime in a context where blocking is not allowed. This
/// happens when a runtime is dropped from within an asynchronous
/// context.', .../tokio-1.4.0/src/runtime/blocking/shutdown.rs:51:21
struct DedicatedExecutor {
state: Arc<RwLock<State>>,
}

/// [`DedicatedExecutor`] for testing purposes.
static TESTING_EXECUTOR: OnceLock<DedicatedExecutor> = OnceLock::new();

impl DedicatedExecutor {
/// Creates a new `DedicatedExecutor` with a dedicated tokio
/// executor that is separate from the threadpool created via
/// `[tokio::main]` or similar.
///
/// See the documentation on [`DedicatedExecutor`] for more details.
///
/// If [`DedicatedExecutor::new`] is called from an existing tokio runtime,
/// it will assume that the existing runtime should be used for I/O, and is
/// thus set, via [`register_io_runtime`] by all threads spawned by the
/// executor. This will allow scheduling IO outside the context of
/// [`DedicatedExecutor`] using [`spawn_io`].
pub fn new(name: &str, runtime_builder: tokio::runtime::Builder) -> Self {
Self::new_inner(name, runtime_builder, false)
}

fn new_inner(name: &str, runtime_builder: tokio::runtime::Builder, testing: bool) -> Self {
let name = name.to_owned();

let notify_shutdown = Arc::new(Notify::new());
let notify_shutdown_captured = Arc::clone(&notify_shutdown);

let (tx_shutdown, rx_shutdown) = tokio::sync::oneshot::channel();
let (tx_handle, rx_handle) = std::sync::mpsc::channel();

let io_handle = tokio::runtime::Handle::try_current().ok();
let thread = std::thread::Builder::new()
.name(format!("{name} driver"))
.spawn(move || {
// also register the IO runtime for the current thread, since it might be used as well (esp. for the
// current thread RT)
register_io_runtime(io_handle.clone());
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear to me what this means in practice. Maybe will become more obvious once I start plugging this in to the rest of the app.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It basically means that the IO (e.g. for object store) will be done on the "current" tokio run time(aka the implicit one that is created by #[tokio::main]

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that part i was clear on - i meant more under what circumstances its used in actual app code. no real concern right now - just memorializing my thoughts as i work on this.


let mut runtime_builder = runtime_builder;
let runtime = runtime_builder
.on_thread_start(move || register_io_runtime(io_handle.clone()))
.build()
.expect("Creating tokio runtime");
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to allow configuring the number of threads - but i guess that could be done by the caller since this takes tokio::runtime::Builder


runtime.block_on(async move {
// Enable the "notified" receiver BEFORE sending the runtime handle back to the constructor thread
// (i.e .the one that runs `new`) to avoid the potential (but unlikely) race that the shutdown is
// started right after the constructor finishes and the new runtime calls
// `notify_shutdown_captured.notified().await`.
//
// Tokio provides an API for that by calling `enable` on the `notified` future (this requires
// pinning though).
let shutdown = notify_shutdown_captured.notified();
let mut shutdown = std::pin::pin!(shutdown);
shutdown.as_mut().enable();

if tx_handle.send(Handle::current()).is_err() {
return;
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we want a log or something here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think typically when a send handle (tx) fails to send it means the receiving side has hung up (no one is there to get the message), which can happen during normal shutdown

shutdown.await;
});

runtime.shutdown_timeout(SHUTDOWN_TIMEOUT);

// send shutdown "done" signal
tx_shutdown.send(()).ok();
})
.expect("executor setup");

let handle = rx_handle.recv().expect("driver started");

let state = State {
handle: Some(handle),
start_shutdown: notify_shutdown,
completed_shutdown: rx_shutdown.map_err(Arc::new).boxed().shared(),
thread: Some(thread),
};

Self {
state: Arc::new(RwLock::new(state)),
testing,
}
}

/// Create new executor for testing purposes.
///
/// Internal state may be shared with other tests.
pub fn new_testing() -> Self {
TESTING_EXECUTOR
.get_or_init(|| {
let mut runtime_builder = tokio::runtime::Builder::new_current_thread();

// only enable `time` but NOT the IO integration since IO shouldn't run on the DataFusion runtime
// See:
// - https://github.com/influxdata/influxdb_iox/issues/10803
// - https://github.com/influxdata/influxdb_iox/pull/11030
runtime_builder.enable_time();

Self::new_inner(
"testing",
runtime_builder,
Arc::new(Registry::default()),
true,
)
})
.clone()
}

/// Runs the specified [`Future`] (and any tasks it spawns) on the thread
/// pool managed by this `DedicatedExecutor`.
///
/// # Notes
///
/// UNLIKE [`tokio::task::spawn`], the returned future is **cancelled** when
/// it is dropped. Thus, you need ensure the returned future lives until it
/// completes (call `await`) or you wish to cancel it.
///
/// Currently all tasks are added to the tokio executor immediately and
/// compete for the threadpool's resources.
pub fn spawn<T>(&self, task: T) -> impl Future<Output = Result<T::Output, JobError>>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let handle = {
let state = self.state.read();
state.handle.clone()
};

let Some(handle) = handle else {
return futures::future::err(JobError::WorkerGone).boxed();
};

// use JoinSet implement "cancel on drop"
let mut join_set = JoinSet::new();
join_set.spawn_on(task, &handle);
async move {
join_set
.join_next()
.await
.expect("just spawned task")
.map_err(|e| match e.try_into_panic() {
Ok(e) => {
let s = if let Some(s) = e.downcast_ref::<String>() {
s.clone()
} else if let Some(s) = e.downcast_ref::<&str>() {
s.to_string()
} else {
"unknown internal error".to_string()
};

JobError::Panic { msg: s }
}
Err(_) => JobError::WorkerGone,
})
}
.boxed()
}

/// signals shutdown of this executor and any Clones
pub fn shutdown(&self) {
if self.testing {
return;
}

// hang up the channel which will cause the dedicated thread
// to quit
let mut state = self.state.write();
state.handle = None;
state.start_shutdown.notify_one();
}

/// Stops all subsequent task executions, and waits for the worker
/// thread to complete. Note this will shutdown all clones of this
/// `DedicatedExecutor` as well.
///
/// Only the first all to `join` will actually wait for the
/// executing thread to complete. All other calls to join will
/// complete immediately.
///
/// # Panic / Drop
/// [`DedicatedExecutor`] implements shutdown on [`Drop`]. You should just use this behavior and NOT call
/// [`join`](Self::join) manually during [`Drop`] or panics because this might lead to another panic, see
/// <https://github.com/rust-lang/futures-rs/issues/2575>.
pub async fn join(&self) {
if self.testing {
return;
}

self.shutdown();

// get handle mutex is held
let handle = {
let state = self.state.read();
state.completed_shutdown.clone()
};

// wait for completion while not holding the mutex to avoid
// deadlocks
handle.await.expect("Thread died?")
}
}

/// Runs futures (and any `tasks` that are `tokio::task::spawned` by
/// them) on a separate tokio Executor.
///
/// The state is only used by the "outer" API, not by the newly created runtime. The new runtime waits for
/// [`start_shutdown`](Self::start_shutdown) and signals the completion via
/// [`completed_shutdown`](Self::completed_shutdown) (for which is owns the sender side).
struct State {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might be able to use a tokio JoinSet https://docs.rs/tokio/latest/tokio/task/struct.JoinSet.html instead of this now (I think this code predates JoinSet)

/// Runtime handle.
///
/// This is `None` when the executor is shutting down.
handle: Option<Handle>,

/// If notified, the executor tokio runtime will begin to shutdown.
///
/// We could implement this by checking `handle.is_none()` in regular intervals but requires regular wake-ups and
/// locking of the state. Just using a proper async signal is nicer.
start_shutdown: Arc<Notify>,

/// Receiver side indicating that shutdown is complete.
completed_shutdown: Shared<BoxFuture<'static, Result<(), Arc<RecvError>>>>,

/// The inner thread that can be used to join during drop.
thread: Option<std::thread::JoinHandle<()>>,
}

// IMPORTANT: Implement `Drop` for `State`, NOT for `DedicatedExecutor`, because the executor can be cloned and clones
// share their inner state.
impl Drop for State {
fn drop(&mut self) {
if self.handle.is_some() {
warn!("DedicatedExecutor dropped without calling shutdown()");
self.handle = None;
self.start_shutdown.notify_one();
}

// do NOT poll the shared future if we are panicking due to https://github.com/rust-lang/futures-rs/issues/2575
if !std::thread::panicking() && self.completed_shutdown.clone().now_or_never().is_none() {
warn!("DedicatedExecutor dropped without waiting for worker termination",);
}

// join thread but don't care about the results
self.thread.take().expect("not dropped yet").join().ok();
}
}
1 change: 1 addition & 0 deletions src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

pub mod dedicated_executor;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will probably end up including this in flightsql feature

#[cfg(feature = "flightsql")]
pub mod flightsql;
#[cfg(feature = "flightsql")]
Expand Down
Loading