Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
universalmind303 committed Jan 15, 2025
1 parent 5702720 commit f122145
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 54 deletions.
9 changes: 5 additions & 4 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1688,10 +1688,11 @@ class LogicalPlanBuilder:
def repr_mermaid(self, options: MermaidOptions) -> str: ...

class NativeExecutor:
@staticmethod
def from_logical_plan_builder(
logical_plan_builder: LogicalPlanBuilder,
) -> NativeExecutor: ...
def __init__(self) -> None: ...
# @staticmethod
# def from_logical_plan_builder(
# logical_plan_builder: LogicalPlanBuilder,
# ) -> NativeExecutor: ...
def run(
self, psets: dict[str, list[PartitionT]], cfg: PyDaftExecutionConfig, results_buffer_size: int | None
) -> Iterator[PyMicroPartition]: ...
Expand Down
14 changes: 5 additions & 9 deletions daft/execution/native_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
from daft.daft import (
NativeExecutor as _NativeExecutor,
)
from daft.daft import PyDaftExecutionConfig
from daft.table import MicroPartition

if TYPE_CHECKING:
from daft.daft import PyDaftExecutionConfig
from daft.logical.builder import LogicalPlanBuilder
from daft.runners.partitioning import (
LocalMaterializedResult,
Expand All @@ -18,16 +18,12 @@


class NativeExecutor:
def __init__(self, executor: _NativeExecutor):
self._executor = executor

@classmethod
def from_logical_plan_builder(cls, builder: LogicalPlanBuilder) -> NativeExecutor:
executor = _NativeExecutor.from_logical_plan_builder(builder._builder)
return cls(executor)
def __init__(self):
self._executor = _NativeExecutor()

def run(
self,
builder: LogicalPlanBuilder,
psets: dict[str, list[MaterializedResult[PartitionT]]],
daft_execution_config: PyDaftExecutionConfig,
results_buffer_size: int | None,
Expand All @@ -39,5 +35,5 @@ def run(
}
return (
LocalMaterializedResult(MicroPartition._from_pymicropartition(part))
for part in self._executor.run(psets_mp, daft_execution_config, results_buffer_size)
for part in self._executor.run(builder._builder, psets_mp, daft_execution_config, results_buffer_size)
)
3 changes: 2 additions & 1 deletion daft/runners/native_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ def run_iter(

# Optimize the logical plan.
builder = builder.optimize()
executor = NativeExecutor.from_logical_plan_builder(builder)
executor = NativeExecutor()
results_gen = executor.run(
builder,
{k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()},
daft_execution_config,
results_buffer_size,
Expand Down
5 changes: 3 additions & 2 deletions src/daft-connect/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ impl Session {
let result_stream = tokio::task::spawn_blocking(move || {
let plan = lp.optimize()?;
let cfg = Arc::new(DaftExecutionConfig::default());
let native_executor = NativeExecutor::from_logical_plan_builder(&plan)?;
let results = native_executor.run(&*this.psets, cfg, None)?;
let native_executor = NativeExecutor::default();

let results = native_executor.run(&plan, &*this.psets, cfg, None)?;
let it = results.into_iter();
Ok::<_, DaftError>(it.collect_vec())
})
Expand Down
4 changes: 2 additions & 2 deletions src/daft-local-execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,15 @@ pub(crate) struct ExecutionRuntimeContext {
worker_set: TaskSet<crate::Result<()>>,
default_morsel_size: usize,
memory_manager: Arc<MemoryManager>,
progress_bar_manager: Option<Box<dyn ProgressBarManager>>,
progress_bar_manager: Option<Arc<dyn ProgressBarManager>>,
}

impl ExecutionRuntimeContext {
#[must_use]
pub fn new(
default_morsel_size: usize,
memory_manager: Arc<MemoryManager>,
progress_bar_manager: Option<Box<dyn ProgressBarManager>>,
progress_bar_manager: Option<Arc<dyn ProgressBarManager>>,
) -> Self {
Self {
worker_set: TaskSet::new(),
Expand Down
13 changes: 7 additions & 6 deletions src/daft-local-execution/src/progress_bar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub trait ProgressBar: Send + Sync {
fn close(&self) -> DaftResult<()>;
}

pub trait ProgressBarManager {
pub trait ProgressBarManager: std::fmt::Debug + Send + Sync {
fn make_new_bar(
&self,
color: ProgressBarColor,
Expand Down Expand Up @@ -128,6 +128,7 @@ impl ProgressBar for IndicatifProgressBar {
}
}

#[derive(Debug)]
struct IndicatifProgressBarManager {
multi_progress: indicatif::MultiProgress,
}
Expand Down Expand Up @@ -168,19 +169,19 @@ impl ProgressBarManager for IndicatifProgressBarManager {
}
}

pub fn make_progress_bar_manager() -> Box<dyn ProgressBarManager> {
pub fn make_progress_bar_manager() -> Arc<dyn ProgressBarManager> {
#[cfg(feature = "python")]
{
if python::in_notebook() {
Box::new(python::TqdmProgressBarManager::new())
Arc::new(python::TqdmProgressBarManager::new())
} else {
Box::new(IndicatifProgressBarManager::new())
Arc::new(IndicatifProgressBarManager::new())
}
}

#[cfg(not(feature = "python"))]
{
Box::new(IndicatifProgressBarManager::new())
Arc::new(IndicatifProgressBarManager::new())
}
}

Expand Down Expand Up @@ -215,7 +216,7 @@ mod python {
}
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct TqdmProgressBarManager {
inner: Arc<PyObject>,
}
Expand Down
147 changes: 117 additions & 30 deletions src/daft-local-execution/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use {
use crate::{
channel::{create_channel, Receiver},
pipeline::{physical_plan_to_pipeline, viz_pipeline},
progress_bar::make_progress_bar_manager,
progress_bar::{make_progress_bar_manager, ProgressBarManager},
resource_manager::get_or_init_memory_manager,
Error, ExecutionRuntimeContext,
};
Expand Down Expand Up @@ -65,22 +65,18 @@ pub struct PyNativeExecutor {
#[cfg(feature = "python")]
#[pymethods]
impl PyNativeExecutor {
#[staticmethod]
pub fn from_logical_plan_builder(
logical_plan_builder: &PyLogicalPlanBuilder,
py: Python,
) -> PyResult<Self> {
py.allow_threads(|| {
Ok(Self {
executor: NativeExecutor::from_logical_plan_builder(&logical_plan_builder.builder)?,
})
})
#[new]
pub fn new() -> Self {
Self {
executor: NativeExecutor::new(),
}
}

#[pyo3(signature = (psets, cfg, results_buffer_size=None))]
#[pyo3(signature = (logical_plan_builder, psets, cfg, results_buffer_size=None))]
pub fn run<'a>(
&self,
py: Python<'a>,
logical_plan_builder: &PyLogicalPlanBuilder,
psets: HashMap<String, Vec<PyMicroPartition>>,
cfg: PyDaftExecutionConfig,
results_buffer_size: Option<usize>,
Expand All @@ -103,7 +99,12 @@ impl PyNativeExecutor {
let psets = InMemoryPartitionSetCache::new(&native_psets);
let out = py.allow_threads(|| {
self.executor
.run(&psets, cfg.config, results_buffer_size)
.run(
&logical_plan_builder.builder,
&psets,
cfg.config,
results_buffer_size,
)
.map(|res| res.into_iter())
})?;
let iter = Box::new(out.map(|part| {
Expand All @@ -119,37 +120,123 @@ impl PyNativeExecutor {
}
}

#[derive(Debug, Clone)]
pub struct NativeExecutor {
local_physical_plan: Arc<LocalPhysicalPlan>,
cancel: CancellationToken,
runtime: Option<Arc<tokio::runtime::Runtime>>,
pb_manager: Option<Arc<dyn ProgressBarManager>>,
}

impl Default for NativeExecutor {
fn default() -> Self {
Self {
cancel: CancellationToken::new(),
runtime: None,
pb_manager: None,
}
}
}
impl NativeExecutor {
pub fn from_logical_plan_builder(
logical_plan_builder: &LogicalPlanBuilder,
) -> DaftResult<Self> {
let logical_plan = logical_plan_builder.build();
let local_physical_plan = translate(&logical_plan)?;
pub fn new() -> Self {
Self::default()
}

Ok(Self {
local_physical_plan,
cancel: CancellationToken::new(),
})
pub fn with_runtime(mut self, runtime: tokio::runtime::Runtime) -> Self {
self.runtime = Some(Arc::new(runtime));
self
}

pub fn with_progress_bar_manager(mut self, pb_manager: Arc<dyn ProgressBarManager>) -> Self {
self.pb_manager = Some(pb_manager);
self
}

pub fn run(
&self,
logical_plan_builder: &LogicalPlanBuilder,
psets: &(impl PartitionSetCache<MicroPartitionRef, Arc<MicroPartitionSet>> + ?Sized),
cfg: Arc<DaftExecutionConfig>,
results_buffer_size: Option<usize>,
) -> DaftResult<ExecutionEngineResult> {
run_local(
&self.local_physical_plan,
psets,
cfg,
results_buffer_size,
self.cancel.clone(),
)
let logical_plan = logical_plan_builder.build();
let physical_plan = translate(&logical_plan)?;
refresh_chrome_trace();
let cancel = self.cancel.clone();
let pipeline = physical_plan_to_pipeline(&physical_plan, psets, &cfg)?;
let (tx, rx) = create_channel(results_buffer_size.unwrap_or(1));
let rt = self.runtime.clone();

let handle = std::thread::spawn(move || {
let pb_manager = should_enable_progress_bar().then(make_progress_bar_manager);
let runtime = rt.unwrap_or_else(|| {
Arc::new(
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create tokio runtime"),
)
});
let execution_task = async {
let memory_manager = get_or_init_memory_manager();
let mut runtime_handle = ExecutionRuntimeContext::new(
cfg.default_morsel_size,
memory_manager.clone(),
pb_manager,
);
let receiver = pipeline.start(true, &mut runtime_handle)?;

while let Some(val) = receiver.recv().await {
if tx.send(val).await.is_err() {
break;
}
}

while let Some(result) = runtime_handle.join_next().await {
match result {
Ok(Err(e)) => {
runtime_handle.shutdown().await;
return DaftResult::Err(e.into());
}
Err(e) => {
runtime_handle.shutdown().await;
return DaftResult::Err(Error::JoinError { source: e }.into());
}
_ => {}
}
}
if should_enable_explain_analyze() {
let curr_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis();
let file_name = format!("explain-analyze-{curr_ms}-mermaid.md");
let mut file = File::create(file_name)?;
writeln!(file, "```mermaid\n{}\n```", viz_pipeline(pipeline.as_ref()))?;
}
Ok(())
};

let local_set = tokio::task::LocalSet::new();
local_set.block_on(&runtime, async {
tokio::select! {
biased;
() = cancel.cancelled() => {
log::info!("Execution engine cancelled");
Ok(())
}
_ = tokio::signal::ctrl_c() => {
log::info!("Received Ctrl-C, shutting down execution engine");
Ok(())
}
result = execution_task => result,
}
})
});

Ok(ExecutionEngineResult {
handle,
receiver: rx,
})
}
}

Expand Down

0 comments on commit f122145

Please sign in to comment.