Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
Merge branch 'consolidate-ds' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
Makoto-Tomokiyo committed May 2, 2024
2 parents 72d54a9 + 4cc7277 commit d6da98e
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ test_data/
**/*.tbl

executor_logs/*

job_summary.json
62 changes: 62 additions & 0 deletions src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,4 +361,66 @@ mod tests {
assert!(queue.lock().await.size() == 0);
println!("Finished {:?} tasks.", nplans);
}

// Test correctness of stride scheduling algorithm.
#[tokio::test]
async fn test_stride() {
let test_sql_dir = concat!(env!("CARGO_MANIFEST_DIR"), "/test_sql/");
let catalog_path = concat!(env!("CARGO_MANIFEST_DIR"), "/test_data/");
let mut queue = Box::new(Queue::new(Arc::new(Notify::new())));
let parser = ExecutionPlanParser::new(catalog_path).await;
println!("test_queue_conc: Testing files in {}", test_sql_dir);

// Generate list of all tpch plans
let mut physical_plans = Vec::new();
for file in fs::read_dir(test_sql_dir).unwrap() {
let path_buf = file.unwrap().path();
let path = path_buf.to_str().unwrap();
physical_plans.extend(parser.get_execution_plan_from_file(&path).await.unwrap());
}

println!("Got {} plans.", physical_plans.len());
let mut graphs = Vec::new();

// Generate list of plans with at least `rounds` stages
let rounds = 5;
let mut qid = 0;
for plan in physical_plans {
let graph = QueryGraph::new(qid, Arc::clone(&plan)).await;
if graph.stages.len() >= rounds {
graphs.push(graph);
qid += 1;
}
}
let ngraphs = graphs.len();

// Add a bunch of queries with staggered submission time
let start_enqueue = SystemTime::now();
for graph in graphs {
queue
.add_query(graph.query_id, Arc::new(Mutex::new(graph)))
.await;
sleep(Duration::from_millis(10)).await;
}
let enq_time = SystemTime::now().duration_since(start_enqueue).unwrap();

// Ensure correct order of queue
for rnd in 0..rounds {
for i in 0..ngraphs {
let task = queue.next_task().await.unwrap();
// Queries should be processed in order
assert_eq!(task.query_id, i as u64);
// "process" for at least as long as (max_pass - min_pass)
sleep(enq_time).await;
// Return task; update query's pass
queue.remove_task(task, StageStatus::Finished(0)).await;
println!("(Round {}) Query {}/{} ok.", rnd + 1, i + 1, ngraphs);
}
}
// println!("Queued {} queries.", qid.lock().await.load(Ordering::SeqCst));
// make sure no more tasks remain
assert!(Arc::clone(&queue).lock().await.next_task().await.is_none());
assert!(queue.lock().await.size() == 0);
println!("Finished {:?} tasks.", nplans);
}
}

0 comments on commit d6da98e

Please sign in to comment.