Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(execution): NativeExecutor refactor #3689

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1688,10 +1688,7 @@ class LogicalPlanBuilder:
def repr_mermaid(self, options: MermaidOptions) -> str: ...

class NativeExecutor:
@staticmethod
def from_logical_plan_builder(
logical_plan_builder: LogicalPlanBuilder,
) -> NativeExecutor: ...
def __init__(self) -> None: ...
def run(
self, psets: dict[str, list[PartitionT]], cfg: PyDaftExecutionConfig, results_buffer_size: int | None
) -> Iterator[PyMicroPartition]: ...
Expand Down
14 changes: 5 additions & 9 deletions daft/execution/native_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
from daft.daft import (
NativeExecutor as _NativeExecutor,
)
from daft.daft import PyDaftExecutionConfig
from daft.table import MicroPartition

if TYPE_CHECKING:
from daft.daft import PyDaftExecutionConfig
from daft.logical.builder import LogicalPlanBuilder
from daft.runners.partitioning import (
LocalMaterializedResult,
Expand All @@ -18,16 +18,12 @@


class NativeExecutor:
def __init__(self, executor: _NativeExecutor):
self._executor = executor

@classmethod
def from_logical_plan_builder(cls, builder: LogicalPlanBuilder) -> NativeExecutor:
executor = _NativeExecutor.from_logical_plan_builder(builder._builder)
return cls(executor)
def __init__(self):
self._executor = _NativeExecutor()

def run(
self,
builder: LogicalPlanBuilder,
psets: dict[str, list[MaterializedResult[PartitionT]]],
daft_execution_config: PyDaftExecutionConfig,
results_buffer_size: int | None,
Expand All @@ -39,5 +35,5 @@ def run(
}
return (
LocalMaterializedResult(MicroPartition._from_pymicropartition(part))
for part in self._executor.run(psets_mp, daft_execution_config, results_buffer_size)
for part in self._executor.run(builder._builder, psets_mp, daft_execution_config, results_buffer_size)
)
3 changes: 2 additions & 1 deletion daft/runners/native_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ def run_iter(

# Optimize the logical plan.
builder = builder.optimize()
executor = NativeExecutor.from_logical_plan_builder(builder)
executor = NativeExecutor()
universalmind303 marked this conversation as resolved.
Show resolved Hide resolved
results_gen = executor.run(
builder,
{k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()},
daft_execution_config,
results_buffer_size,
Expand Down
3 changes: 2 additions & 1 deletion daft/runners/pyrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,9 @@ def run_iter(
if daft_execution_config.enable_native_executor:
logger.info("Using native executor")

executor = NativeExecutor.from_logical_plan_builder(builder)
executor = NativeExecutor()
results_gen = executor.run(
builder,
{k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()},
daft_execution_config,
results_buffer_size,
Expand Down
7 changes: 5 additions & 2 deletions src/common/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,16 @@ impl<T: Send + 'static> Future for RuntimeTask<T> {
}

pub struct Runtime {
runtime: tokio::runtime::Runtime,
pub runtime: Arc<tokio::runtime::Runtime>,
pool_type: PoolType,
}

impl Runtime {
pub(crate) fn new(runtime: tokio::runtime::Runtime, pool_type: PoolType) -> RuntimeRef {
Arc::new(Self { runtime, pool_type })
Arc::new(Self {
runtime: Arc::new(runtime),
pool_type,
})
}

async fn execute_task<F>(future: F, pool_type: PoolType) -> DaftResult<F::Output>
Expand Down
1 change: 1 addition & 0 deletions src/daft-connect/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ tokio = {version = "1.40.0", features = ["full"]}
tonic = "0.12.3"
tracing = {workspace = true}
uuid = {version = "1.10.0", features = ["v4"]}
common-runtime.workspace = true

[features]
default = ["python"]
Expand Down
114 changes: 63 additions & 51 deletions src/daft-connect/src/execute.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{future::ready, sync::Arc};

use common_daft_config::DaftExecutionConfig;
use common_error::{DaftError, DaftResult};
use common_error::DaftResult;
use common_file_formats::FileFormat;
use daft_dsl::LiteralValue;
use daft_local_execution::NativeExecutor;
Expand All @@ -12,7 +12,7 @@ use daft_table::Table;
use eyre::bail;
use futures::{
stream::{self, BoxStream},
StreamExt, TryFutureExt, TryStreamExt,
StreamExt, TryStreamExt,
};
use itertools::Itertools;
use pyo3::Python;
Expand Down Expand Up @@ -62,17 +62,20 @@ impl Session {

Runner::Native => {
let this = self.clone();
let result_stream = tokio::task::spawn_blocking(move || {
let plan = lp.optimize()?;
let cfg = Arc::new(DaftExecutionConfig::default());
let native_executor = NativeExecutor::from_logical_plan_builder(&plan)?;
let results = native_executor.run(&*this.psets, cfg, None)?;
let it = results.into_iter();
Ok::<_, DaftError>(it.collect_vec())
})
.await??;

let plan = lp.optimize()?;

let cfg = Arc::new(DaftExecutionConfig::default());
let native_executor =
NativeExecutor::default().with_runtime(self.compute_runtime.runtime.clone());

let results = native_executor.run(&plan, &*this.psets, cfg, None)?;
let it = results.into_iter();
universalmind303 marked this conversation as resolved.
Show resolved Hide resolved
let result_stream = it.collect_vec();
Ok(Box::pin(stream::iter(result_stream)))

// todo: use into_stream once it works as expected
// Ok(results.into_stream().boxed())
}
}
}
Expand All @@ -90,8 +93,7 @@ impl Session {
let (tx, rx) = tokio::sync::mpsc::channel::<eyre::Result<ExecutePlanResponse>>(1);

let this = self.clone();

tokio::spawn(async move {
self.compute_runtime.spawn(async move {
let execution_fut = async {
let translator = translation::SparkAnalyzer::new(&this);
match command.rel_type {
Expand Down Expand Up @@ -143,7 +145,7 @@ impl Session {
pub async fn execute_write_operation(
&self,
operation: WriteOperation,
response_builder: ResponseBuilder<ExecutePlanResponse>,
res: ResponseBuilder<ExecutePlanResponse>,
) -> Result<ExecuteStream, Status> {
fn check_write_operation(write_op: &WriteOperation) -> Result<(), Status> {
if !write_op.sort_column_names.is_empty() {
Expand Down Expand Up @@ -178,60 +180,70 @@ impl Session {
}
}

let finished = response_builder.result_complete_response();
let finished = res.result_complete_response();

let (tx, rx) = tokio::sync::mpsc::channel::<eyre::Result<ExecutePlanResponse>>(1);

let this = self.clone();

let result = async move {
check_write_operation(&operation)?;
tokio::spawn(async move {
let result = async {
check_write_operation(&operation)?;

let WriteOperation {
input,
source,
save_type,
..
} = operation;
let WriteOperation {
input,
source,
save_type,
..
} = operation;

let input = input.required("input")?;
let source = source.required("source")?;
let input = input.required("input")?;
let source = source.required("source")?;

let file_format: FileFormat = source.parse()?;
let file_format: FileFormat = source.parse()?;

let Some(save_type) = save_type else {
bail!("Save type is required");
};
let Some(save_type) = save_type else {
bail!("Save type is required");
};

let path = match save_type {
SaveType::Path(path) => path,
SaveType::Table(_) => {
return not_yet_implemented!("write to table").map_err(|e| e.into())
}
};
let path = match save_type {
SaveType::Path(path) => path,
SaveType::Table(_) => {
return not_yet_implemented!("write to table").map_err(|e| e.into())
}
};

let translator = translation::SparkAnalyzer::new(&this);
let translator = translation::SparkAnalyzer::new(&this);

let plan = translator.to_logical_plan(input).await?;
let plan = translator.to_logical_plan(input).await?;

let plan = plan.table_write(&path, file_format, None, None, None)?;
let plan = plan.table_write(&path, file_format, None, None, None)?;

let mut result_stream = this.run_query(plan).await?;
let mut result_stream = this.run_query(plan).await?;

// this is so we make sure the operation is actually done
// before we return
//
// an example where this is important is if we write to a parquet file
// and then read immediately after, we need to wait for the write to finish
while let Some(_result) = result_stream.next().await {}
// this is so we make sure the operation is actually done
// before we return
//
// an example where this is important is if we write to a parquet file
// and then read immediately after, we need to wait for the write to finish
while let Some(_result) = result_stream.next().await {}

Ok(())
};
Ok(())
};

let result = result.map_err(|e| {
Status::internal(textwrap::wrap(&format!("Error in Daft server: {e}"), 120).join("\n"))
if let Err(e) = result.await {
let _ = tx.send(Err(e)).await;
}
});
let stream = ReceiverStream::new(rx);

let future = result.and_then(|()| ready(Ok(finished)));
let stream = futures::stream::once(future);
let stream = stream
.map_err(|e| {
Status::internal(
textwrap::wrap(&format!("Error in Daft server: {e}"), 120).join("\n"),
)
})
.chain(stream::once(ready(Ok(finished))));

Ok(Box::pin(stream))
}
Expand Down
4 changes: 2 additions & 2 deletions src/daft-connect/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ pub fn start(addr: &str) -> eyre::Result<ConnectionHandle> {
shutdown_signal: Some(shutdown_signal),
port,
};
let runtime = common_runtime::get_io_runtime(true);

std::thread::spawn(move || {
let runtime = tokio::runtime::Runtime::new().unwrap();
let result = runtime.block_on(async {
let result = runtime.runtime.block_on(async {
let incoming = {
let listener = tokio::net::TcpListener::from_std(listener)
.wrap_err("Failed to create TcpListener from std::net::TcpListener")?;
Expand Down
3 changes: 3 additions & 0 deletions src/daft-connect/src/session.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{collections::BTreeMap, sync::Arc};

use common_runtime::RuntimeRef;
use daft_micropartition::partitioning::InMemoryPartitionSetCache;
use uuid::Uuid;

Expand All @@ -15,6 +16,7 @@ pub struct Session {
/// MicroPartitionSet associated with this session
/// this will be filled up as the user runs queries
pub(crate) psets: Arc<InMemoryPartitionSetCache>,
pub(crate) compute_runtime: RuntimeRef,
}

impl Session {
Expand All @@ -34,6 +36,7 @@ impl Session {
id,
server_side_session_id,
psets: Arc::new(InMemoryPartitionSetCache::empty()),
compute_runtime: common_runtime::get_compute_runtime(),
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/daft-local-execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use common_runtime::{RuntimeRef, RuntimeTask};
use lazy_static::lazy_static;
use progress_bar::{OperatorProgressBar, ProgressBarColor, ProgressBarManager};
use resource_manager::MemoryManager;
pub use run::{run_local, ExecutionEngineResult, NativeExecutor};
pub use run::{ExecutionEngineResult, NativeExecutor};
use runtime_stats::{RuntimeStatsContext, TimedFuture};
use snafu::{futures::TryFutureExt, ResultExt, Snafu};
use tracing::Instrument;
Expand Down Expand Up @@ -124,15 +124,15 @@ pub(crate) struct ExecutionRuntimeContext {
worker_set: TaskSet<crate::Result<()>>,
default_morsel_size: usize,
memory_manager: Arc<MemoryManager>,
progress_bar_manager: Option<Box<dyn ProgressBarManager>>,
progress_bar_manager: Option<Arc<dyn ProgressBarManager>>,
}

impl ExecutionRuntimeContext {
#[must_use]
pub fn new(
default_morsel_size: usize,
memory_manager: Arc<MemoryManager>,
progress_bar_manager: Option<Box<dyn ProgressBarManager>>,
progress_bar_manager: Option<Arc<dyn ProgressBarManager>>,
) -> Self {
Self {
worker_set: TaskSet::new(),
Expand Down
13 changes: 7 additions & 6 deletions src/daft-local-execution/src/progress_bar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub trait ProgressBar: Send + Sync {
fn close(&self) -> DaftResult<()>;
}

pub trait ProgressBarManager {
pub trait ProgressBarManager: std::fmt::Debug + Send + Sync {
fn make_new_bar(
&self,
color: ProgressBarColor,
Expand Down Expand Up @@ -128,6 +128,7 @@ impl ProgressBar for IndicatifProgressBar {
}
}

#[derive(Debug)]
struct IndicatifProgressBarManager {
multi_progress: indicatif::MultiProgress,
}
Expand Down Expand Up @@ -168,19 +169,19 @@ impl ProgressBarManager for IndicatifProgressBarManager {
}
}

pub fn make_progress_bar_manager() -> Box<dyn ProgressBarManager> {
pub fn make_progress_bar_manager() -> Arc<dyn ProgressBarManager> {
#[cfg(feature = "python")]
{
if python::in_notebook() {
Box::new(python::TqdmProgressBarManager::new())
Arc::new(python::TqdmProgressBarManager::new())
} else {
Box::new(IndicatifProgressBarManager::new())
Arc::new(IndicatifProgressBarManager::new())
}
}

#[cfg(not(feature = "python"))]
{
Box::new(IndicatifProgressBarManager::new())
Arc::new(IndicatifProgressBarManager::new())
}
}

Expand Down Expand Up @@ -215,7 +216,7 @@ mod python {
}
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct TqdmProgressBarManager {
inner: Arc<PyObject>,
}
Expand Down
Loading
Loading