Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
Merge branch 'cleanup'
Browse files Browse the repository at this point in the history
  • Loading branch information
CPKevin2002 committed May 2, 2024
2 parents 7bef080 + 5dafdf5 commit 3dcceb6
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 334 deletions.
2 changes: 0 additions & 2 deletions src/bin/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ async fn main() {

const CONFIG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/executors.toml");
const CATALOG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/test_data");
const LOG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/executor_logs");
const POLL_INTERVAL: Duration = Duration::from_millis(100);

// creates server, executors, and the frontend
Expand Down Expand Up @@ -165,7 +164,6 @@ pub async fn run_single_query(
drop(frontend_lock);
return Ok(());
}
unreachable!();
}

async fn interactive_mode() {
Expand Down
3 changes: 1 addition & 2 deletions src/executor_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,13 @@
use crate::composable_database::scheduler_api_client::SchedulerApiClient;
use crate::composable_database::QueryStatus::InProgress;
use crate::composable_database::{NotifyTaskStateArgs, NotifyTaskStateRet, QueryStatus, TaskId};
use crate::composable_database::{NotifyTaskStateArgs, NotifyTaskStateRet, QueryStatus};
use crate::frontend::JobInfo;
use crate::intermediate_results::{insert_results, rewrite_query, TaskKey};
use crate::mock_catalog::load_catalog;
use crate::mock_executor::MockExecutor;
use chrono::Utc;
use datafusion::execution::context::SessionContext;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::bytes::physical_plan_from_bytes;
use std::path::Path;
use std::path::PathBuf;
Expand Down
4 changes: 2 additions & 2 deletions src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl MockFrontend {
let existing_value = self.jobs.insert(
query_id,
JobInfo {
query_id: query_id,
query_id,
submitted_at: Utc::now(),
sql_string: sql_string.to_string(),
result: None,
Expand Down Expand Up @@ -280,7 +280,7 @@ impl MockFrontend {
// eprintln!("Polling!");
assert!(self.scheduler_api_client.is_some());

let mut client = self.scheduler_api_client.as_mut().unwrap();
let client = self.scheduler_api_client.as_mut().unwrap();

let keys: Vec<u64> = self.jobs.keys().cloned().collect();
for query_id in keys {
Expand Down
12 changes: 5 additions & 7 deletions src/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,18 @@ use crate::server::SchedulerService;
use datafusion::arrow::array::RecordBatch;
use datafusion::error::DataFusionError;
use datafusion::logical_expr::{col, Expr};
use datafusion::prelude::{concat, SessionContext};
use serde::{Deserialize, Serialize};
use datafusion::prelude::SessionContext;
use std::sync::Arc;
use tokio::sync::Mutex;
use tonic::transport::Server;

/// State held by the integration-test harness.
///
/// NOTE(review): this listing interleaves pre- and post-change lines of a
/// diff, so not every field below necessarily exists in the final struct —
/// confirm against the checked-out file.
pub struct IntegrationTest {
// Path to the catalog directory, as passed to the constructor.
catalog_path: String,
// NOTE(review): the constructor hunk in the same change no longer appears to
// populate this field — presumably it was removed; verify.
config_path: String,
// DataFusion session context, shared through an `Arc`.
ctx: Arc<SessionContext>,
// Parsed configuration; presumably loaded from `config_path` — TODO confirm.
config: Config,
// Mock frontend behind an `Arc<Mutex<..>>`; the only public field.
pub frontend: Arc<Mutex<MockFrontend>>,
}

const CONFIG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/executors.toml");
pub const CATALOG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/test_data");
const LOG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/executor_logs");

Expand All @@ -37,7 +34,6 @@ impl IntegrationTest {
ctx,
config,
catalog_path,
config_path,
frontend: Arc::new(Mutex::new(frontend)),
}
}
Expand Down Expand Up @@ -195,20 +191,22 @@ impl IntegrationTest {
mod tests {
use crate::integration_test::IntegrationTest;
use crate::parser::ExecutionPlanParser;
// use crate::CATALOG_PATH;
use super::*;
use datafusion::arrow::array::{Int32Array, RecordBatch};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::fs;

const CONFIG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/executors.toml");

async fn initialize_integration_test() -> IntegrationTest {
let catalog_path = concat!(env!("CARGO_MANIFEST_DIR"), "/test_data");
let config_path = concat!(env!("CARGO_MANIFEST_DIR"), "/executors.toml");
let config_path = CONFIG_PATH;
IntegrationTest::new(catalog_path.to_string(), config_path.to_string()).await
}

#[allow(dead_code)]
pub async fn get_all_tpch_queries_test() -> Vec<String> {
let parser = ExecutionPlanParser::new(CATALOG_PATH).await;
let mut res = Vec::new();
Expand Down
2 changes: 1 addition & 1 deletion src/mock_optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use crate::mock_catalog::load_catalog;
use datafusion::error::DataFusionError;
use datafusion::execution::context::{SessionContext, SessionState};
use datafusion::execution::context::SessionContext;
use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;
Expand Down
4 changes: 1 addition & 3 deletions src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,14 @@
use crate::mock_catalog::load_catalog;
use datafusion::{
arrow::{
array::{RecordBatch, RecordBatchReader},
array::RecordBatch,
ipc::{reader::FileReader, writer::FileWriter},
},
error::{DataFusionError, Result},
execution::context::SessionContext,
physical_plan::ExecutionPlan,
physical_planner::PhysicalPlanner,
};
use datafusion_proto::bytes::{physical_plan_from_bytes, physical_plan_to_bytes};
use futures::TryFutureExt;
use sqlparser::{dialect::GenericDialect, parser::Parser};
use std::{fmt, io::Cursor, sync::Arc};
use tokio::{fs::File, io::AsyncReadExt};
Expand Down
27 changes: 17 additions & 10 deletions src/query_graph.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
#![allow(dead_code)]
use crate::composable_database::{QueryStatus, TaskId};
use crate::task::{Task, TaskStatus};
use crate::composable_database::QueryStatus;
use crate::task::Task;
use crate::task_queue::TaskQueue;
use datafusion::arrow::datatypes::Schema;
use datafusion::common::JoinSide;
use datafusion::physical_plan::aggregates::AggregateExec;
use datafusion::physical_plan::joins::{
CrossJoinExec, HashJoinExec, NestedLoopJoinExec, SortMergeJoinExec, SymmetricHashJoinExec,
};
use datafusion::physical_plan::limit::GlobalLimitExec;
// use datafusion::physical_plan::joins::{
// use datafusion::physical_plan::aggregates::AggregateExec;
// CrossJoinExec, HashJoinExec, NestedLoopJoinExec, SortMergeJoinExec, SymmetricHashJoinExec,
// };
// use datafusion::physical_plan::limit::GlobalLimitExec;
// use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

// TODO Change to Waiting, Ready, Running(vec[taskid]), Finished(vec[locations?])
#[derive(Clone, Debug, Default)]
Expand Down Expand Up @@ -48,10 +48,11 @@ pub struct QueryGraph {
tid_counter: AtomicU64, // TODO: add mutex to stages and make elements pointers to avoid copying
pub stages: Vec<QueryStage>, // Can be a vec since all stages in a query are enumerated from 0.
task_queue: TaskQueue, // Ready tasks in this graph
pub time: Duration,
}

impl QueryGraph {
pub async fn new(query_id: u64, plan: Arc<dyn ExecutionPlan>) -> Self {
pub fn new(query_id: u64, plan: Arc<dyn ExecutionPlan>) -> Self {
// Build stages.
let mut builder = GraphBuilder::new();
let stages = builder.build(plan.clone());
Expand All @@ -63,6 +64,7 @@ impl QueryGraph {
tid_counter: AtomicU64::new(0),
stages,
task_queue: TaskQueue::new(),
time: Duration::new(0, 0),
};

// Build tasks for leaf stages.
Expand Down Expand Up @@ -121,6 +123,7 @@ impl QueryGraph {
let outputs = stage.outputs.clone();

if outputs.is_empty() {
println!("QueryGraph::update_stage_status: Query {} is done.", self.query_id);
self.status = QueryStatus::Done;
return Ok(());
}
Expand Down Expand Up @@ -161,6 +164,10 @@ impl QueryGraph {
}

// fn build_tasks(&mut self)
/// Returns a clone of the `plan` stored for the stage at index `stage_id`.
///
/// # Panics
///
/// Panics if `stage_id` is not a valid index into `self.stages`.
pub fn get_plan(&self, stage_id: u64) -> Arc<dyn ExecutionPlan> {
    // Tail expression instead of binding to a local and returning it
    // (clippy::let_and_return); behavior is unchanged.
    self.stages[stage_id as usize].plan.clone()
}
}

#[derive(Clone, Debug)]
Expand Down
Loading

0 comments on commit 3dcceb6

Please sign in to comment.