Skip to content

Commit

Permalink
Fix explain source and sink type (#2374)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

1. Add source and sink type in explain
2. Fix physical match source type from empty to table

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)

---------

Signed-off-by: Jin Hai <[email protected]>
  • Loading branch information
JinHai-CN authored Dec 16, 2024
1 parent 995ee04 commit 11879c1
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 18 deletions.
6 changes: 3 additions & 3 deletions src/common/utility/exception.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ void UnrecoverableError(const String &message, const char *file_name, u32 line)
// String error_msg = cleanup_tracer->GetCleanupInfo();
// LOG_ERROR(std::move(error_msg));
// }
String location_message = fmt::format("{}@{}:{}", message, infinity::TrimPath(file_name), line);
if (IS_LOGGER_INITIALIZED()) {
LOG_CRITICAL(message);
PrintStacktrace(location_message);
}
Logger::Flush();
PrintStacktrace(message);
throw UnrecoverableException(fmt::format("{}@{}:{}", message, infinity::TrimPath(file_name), line));
throw UnrecoverableException(location_message);
}

#else
Expand Down
16 changes: 8 additions & 8 deletions src/executor/explain_physical_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1227,10 +1227,10 @@ void ExplainPhysicalPlan::Explain(const PhysicalShow *show_node, SharedPtr<Vecto
case ShowStmtType::kIndexChunk: {
String show_str;
if (intent_size != 0) {
show_str = String(intent_size - 2, ' ');
show_str += "-> SHOW INDEX CHUNK";
show_str = String(intent_size - 2, ' ');
show_str += "-> SHOW INDEX CHUNK";
} else {
show_str = "SHOW INDEX CHUNK";
show_str = "SHOW INDEX CHUNK";
}
show_str += "(";
show_str += std::to_string(show_node->node_id());
Expand Down Expand Up @@ -1673,10 +1673,10 @@ void ExplainPhysicalPlan::Explain(const PhysicalShow *show_node, SharedPtr<Vecto
case ShowStmtType::kDeltaLogs: {
String show_str;
if (intent_size != 0) {
show_str = String(intent_size - 2, ' ');
show_str += "-> SHOW DELTA LOGS ";
show_str = String(intent_size - 2, ' ');
show_str += "-> SHOW DELTA LOGS ";
} else {
show_str = "SHOW DELTA LOGS ";
show_str = "SHOW DELTA LOGS ";
}
show_str += "(";
show_str += std::to_string(show_node->node_id());
Expand Down Expand Up @@ -2180,7 +2180,7 @@ void ExplainPhysicalPlan::Explain(const PhysicalSource *source_node,
} else {
explain_header_str = "SOURCE ";
}
explain_header_str += "(" + std::to_string(source_node->node_id()) + ")";
explain_header_str += "(" + std::to_string(source_node->node_id()) + ") " + ToString(source_node->source_type());
result->emplace_back(MakeShared<String>(explain_header_str));
}

Expand All @@ -2191,7 +2191,7 @@ void ExplainPhysicalPlan::Explain(const PhysicalSink *sink_node, SharedPtr<Vecto
} else {
explain_header_str = "SINK ";
}
explain_header_str += "(" + std::to_string(sink_node->node_id()) + ")";
explain_header_str += "(" + std::to_string(sink_node->node_id()) + ") " + ToString(sink_node->sink_type());
result->emplace_back(MakeShared<String>(explain_header_str));
}

Expand Down
11 changes: 10 additions & 1 deletion src/executor/fragment_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ void FragmentBuilder::BuildFragments(PhysicalOperator *phys_op, PlanFragment *cu
case PhysicalOperatorType::kInsert:
case PhysicalOperatorType::kImport:
case PhysicalOperatorType::kExport:
case PhysicalOperatorType::kMatch:
case PhysicalOperatorType::kReadCache: {
current_fragment_ptr->AddOperator(phys_op);
if (phys_op->left() != nullptr or phys_op->right() != nullptr) {
Expand Down Expand Up @@ -261,6 +260,16 @@ void FragmentBuilder::BuildFragments(PhysicalOperator *phys_op, PlanFragment *cu
String error_message = fmt::format("Not support {}.", phys_op->GetName());
UnrecoverableError(error_message);
}
case PhysicalOperatorType::kMatch: {
current_fragment_ptr->AddOperator(phys_op);
if (phys_op->left() != nullptr or phys_op->right() != nullptr) {
String error_message = fmt::format("{} shouldn't have child.", phys_op->GetName());
UnrecoverableError(error_message);
}
current_fragment_ptr->SetFragmentType(FragmentType::kSerialMaterialize);
current_fragment_ptr->SetSourceNode(query_context_ptr_, SourceType::kTable, phys_op->GetOutputNames(), phys_op->GetOutputTypes());
return;
}
case PhysicalOperatorType::kMatchSparseScan:
case PhysicalOperatorType::kMatchTensorScan:
case PhysicalOperatorType::kKnnScan: {
Expand Down
17 changes: 17 additions & 0 deletions src/executor/operator/physical_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,23 @@ import column_def;

namespace infinity {

String ToString(SinkType sink_type) {
switch (sink_type) {
case SinkType::kInvalid: {
return "Invalid";
}
case SinkType::kLocalQueue: {
return "LocalQueue";
}
case SinkType::kRemote: {
return "Remote";
}
case SinkType::kResult: {
return "Result";
}
}
}

void PhysicalSink::Init() {}

bool PhysicalSink::Execute(QueryContext *, OperatorState *) { return true; }
Expand Down
2 changes: 2 additions & 0 deletions src/executor/operator/physical_sink.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ export enum class SinkType {
kResult,
};

export String ToString(SinkType sink_type);

export class PhysicalSink final : public PhysicalOperator {
public:
explicit PhysicalSink(u64 id, SinkType sink_type, SharedPtr<Vector<String>> names, SharedPtr<Vector<SharedPtr<DataType>>> types, SharedPtr<Vector<LoadMeta>> load_metas)
Expand Down
20 changes: 20 additions & 0 deletions src/executor/operator/physical_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,26 @@ import logger;

namespace infinity {

String ToString(SourceType source_type) {
switch (source_type) {
case SourceType::kInvalid: {
return "Invalid";
}
case SourceType::kTable: {
return "Table";
}
case SourceType::kLocalQueue: {
return "LocalQueue";
}
case SourceType::kEmpty: {
return "Empty";
}
case SourceType::kRemote: {
return "Remote";
}
}
}

void PhysicalSource::Init() {}

bool PhysicalSource::Execute(QueryContext *, OperatorState *) { return true; }
Expand Down
2 changes: 2 additions & 0 deletions src/executor/operator/physical_source.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ export enum class SourceType {
kRemote,
};

export String ToString(SourceType source_type);

export class PhysicalSource final : public PhysicalOperator {
public:
explicit PhysicalSource(u64 id,
Expand Down
12 changes: 6 additions & 6 deletions test/sql/dql/knn/tensor/fusion_rerank_maxsim.slt
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ query I
EXPLAIN FRAGMENT SELECT title, SCORE() FROM sqllogic_fusion_rerank_maxsim SEARCH MATCH TEXT ('body', 'off', 'topn=4'), MATCH TENSOR (t, [1.0, 0.0, 0.0, 0.0], 'float', 'maxsim', 'topn=2'), FUSION('rrf'), FUSION('match_tensor', 'column_name=t;search_tensor=[[0.0, -10.0, 0.0, 0.7], [9.2, 45.6, -55.8, 3.5]];tensor_data_type=float;match_method=MaxSim;topn=2');
----
FRAGMENT (1)
-> SINK (9)
-> SINK (9) Result
-> PROJECT (6)
- table index: #4
- expressions: [title (#0), SCORE (#1)]
Expand All @@ -119,22 +119,22 @@ FRAGMENT (1)
-> FUSION (4)
- fusion: #FUSION('rrf', '')
- output columns: [__score, __rowid]
-> SOURCE (10): FRAGMENT #2, FRAGMENT #3
-> SOURCE (10) LocalQueue: FRAGMENT #2, FRAGMENT #3
(empty)
FRAGMENT (2)
-> SINK (11)
-> SINK (11) LocalQueue
-> MATCH (2)
- table name: sqllogic_fusion_rerank_maxsim(default_db.sqllogic_fusion_rerank_maxsim)
- table index: #1
- match expression: MATCH TEXT ('body', 'off', 'topn=4')
- index filter: None
- leftover filter: None
- output columns: [__score, __rowid]
-> SOURCE (12)
-> SOURCE (12) Table
(empty)
(empty)
FRAGMENT (3)
-> SINK (13)
-> SINK (13) LocalQueue
-> MatchTensorScan (3)
- table name: sqllogic_fusion_rerank_maxsim(default_db.sqllogic_fusion_rerank_maxsim)
- table index: #1
Expand All @@ -143,7 +143,7 @@ FRAGMENT (3)
- index filter: None
- leftover filter: None
- output columns: [__score, __rowid]
-> SOURCE (14)
-> SOURCE (14) Table
(empty)
(empty)

Expand Down

0 comments on commit 11879c1

Please sign in to comment.