Skip to content

Commit

Permalink
Add proper handling of deleted return values for query procedures and…
Browse files Browse the repository at this point in the history
… functions ran in analytical mode (#400)

Co-authored-by: Ante Pušić <[email protected]>
  • Loading branch information
Darych and antepusic authored Dec 7, 2023
1 parent ca4e739 commit 085bd27
Show file tree
Hide file tree
Showing 56 changed files with 399 additions and 140 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ env:
MEMGRAPH_PORT: 7687
NEO4J_PORT: 7688
NEO4J_CONTAINER: "neo4j_test"
OFFICIAL: "true"
OFFICIAL: "false"

on: [pull_request, workflow_dispatch]

Expand All @@ -16,7 +16,7 @@ jobs:
runs-on: ubuntu-latest
continue-on-error: True
env:
MEMGRAPH_VERSION: 2.12.1
MEMGRAPH_VERSION: 2.13
strategy:
matrix:
architecture: ["amd64", "arm64"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ constexpr char const *kArgumentThreads = "threads";

void InsertBCRecord(mgp_graph *graph, mgp_result *result, mgp_memory *memory, const double betweenness_centrality,
const int node_id) {
auto *node = mg_utility::GetNodeForInsertion(node_id, graph, memory);
if (!node) return;

auto *record = mgp::result_new_record(result);
if (record == nullptr) throw mg_exception::NotEnoughMemoryException();

mg_utility::InsertNodeValueResult(graph, record, kFieldNode, node_id, memory);
mg_utility::InsertNodeValueResult(record, kFieldNode, node, memory);
mg_utility::InsertDoubleValueResult(record, kFieldBCScore, betweenness_centrality, memory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@ bool ConnectedVia(std::uint64_t node_id, std::pair<std::uint64_t, std::uint64_t>

void InsertOnlineBCRecord(mgp_graph *graph, mgp_result *result, mgp_memory *memory, const std::uint64_t node_id,
double bc_score) {
auto *node = mg_utility::GetNodeForInsertion(node_id, graph, memory);
if (!node) return;

auto *record = mgp::result_new_record(result);
if (record == nullptr) throw mg_exception::NotEnoughMemoryException();

mg_utility::InsertNodeValueResult(graph, record, kFieldNode, node_id, memory);
mg_utility::InsertNodeValueResult(record, kFieldNode, node, memory);
mg_utility::InsertDoubleValueResult(record, kFieldBCScore, bc_score, memory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@ const char *fieldNodeTo = "node_to";

void InsertBiconnectedComponentRecord(mgp_graph *graph, mgp_result *result, mgp_memory *memory, const int bcc_id,
const int edge_id, const int node_from_id, const int node_to_id) {
auto *node_from = mg_utility::GetNodeForInsertion(node_from_id, graph, memory);
auto *node_to = mg_utility::GetNodeForInsertion(node_to_id, graph, memory);
if (!node_from || !node_to) return;

auto *record = mgp::result_new_record(result);
if (record == nullptr) throw mg_exception::NotEnoughMemoryException();

mg_utility::InsertIntValueResult(record, fieldBiconnectedComponentID, bcc_id, memory);
// TODO: Implement edge getting function
// mg_utility::InsertIntValueResult(record, fieldEdgeID, edge_id, memory);
mg_utility::InsertNodeValueResult(graph, record, fieldNodeFrom, node_from_id, memory);
mg_utility::InsertNodeValueResult(graph, record, fieldNodeTo, node_to_id, memory);
mg_utility::InsertNodeValueResult(record, fieldNodeFrom, node_from, memory);
mg_utility::InsertNodeValueResult(record, fieldNodeTo, node_to, memory);
}

void GetBiconnectedComponents(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
Expand Down
9 changes: 7 additions & 2 deletions cpp/bridges_module/bridges_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@ const char *k_field_node_to = "node_to";

void InsertBridgeRecord(mgp_graph *graph, mgp_result *result, mgp_memory *memory, const std::uint64_t node_from_id,
const std::uint64_t node_to_id) {
auto *node_from = mg_utility::GetNodeForInsertion(node_from_id, graph, memory);
auto *node_to = mg_utility::GetNodeForInsertion(node_to_id, graph, memory);
if (!node_from || !node_to) return;

auto *record = mgp::result_new_record(result);
if (record == nullptr) throw mg_exception::NotEnoughMemoryException();

mg_utility::InsertNodeValueResult(graph, record, k_field_node_from, node_from_id, memory);
mg_utility::InsertNodeValueResult(graph, record, k_field_node_to, node_to_id, memory);
mg_utility::InsertNodeValueResult(record, k_field_node_from, node_from, memory);
mg_utility::InsertNodeValueResult(record, k_field_node_to, node_to, memory);
}

void GetBridges(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
Expand Down
36 changes: 18 additions & 18 deletions cpp/collections_module/algorithm/collections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ void Collections::SetResult(mgp::Result &result, const mgp::Value &value) {
}

void Collections::SumLongs(mgp_list *args, mgp_func_context *ctx, mgp_func_result *res, mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};;
mgp::MemoryDispatcherGuard guard{memory};
const auto arguments = mgp::List(args);
auto result = mgp::Result(res);

Expand All @@ -67,7 +67,7 @@ void Collections::SumLongs(mgp_list *args, mgp_func_context *ctx, mgp_func_resul
}

void Collections::Avg(mgp_list *args, mgp_func_context *ctx, mgp_func_result *res, mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};;
mgp::MemoryDispatcherGuard guard{memory};
const auto arguments = mgp::List(args);
auto result = mgp::Result(res);

Expand All @@ -94,7 +94,7 @@ void Collections::Avg(mgp_list *args, mgp_func_context *ctx, mgp_func_result *re
}

void Collections::ContainsAll(mgp_list *args, mgp_func_context *ctx, mgp_func_result *res, mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};;
mgp::MemoryDispatcherGuard guard{memory};
const auto arguments = mgp::List(args);
auto result = mgp::Result(res);

Expand All @@ -104,7 +104,7 @@ void Collections::ContainsAll(mgp_list *args, mgp_func_context *ctx, mgp_func_re

const auto list2{arguments[1].ValueList()};

std::unordered_set<mgp::Value> values(list2.begin(), list2.end());;
std::unordered_set<mgp::Value> values(list2.begin(), list2.end());

result.SetValue(std::all_of(values.begin(), values.end(), [&set](const auto &x) { return set.contains(x); }));

Expand All @@ -115,7 +115,7 @@ void Collections::ContainsAll(mgp_list *args, mgp_func_context *ctx, mgp_func_re
}

void Collections::Intersection(mgp_list *args, mgp_func_context *ctx, mgp_func_result *res, mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};;
mgp::MemoryDispatcherGuard guard{memory};
const auto arguments = mgp::List(args);
auto result = mgp::Result(res);

Expand Down Expand Up @@ -147,7 +147,7 @@ void Collections::Intersection(mgp_list *args, mgp_func_context *ctx, mgp_func_r
}

void Collections::RemoveAll(mgp_list *args, mgp_func_context *ctx, mgp_func_result *res, mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};;
mgp::MemoryDispatcherGuard guard{memory};
const auto arguments = mgp::List(args);
auto result = mgp::Result(res);

Expand Down Expand Up @@ -181,7 +181,7 @@ void Collections::RemoveAll(mgp_list *args, mgp_func_context *ctx, mgp_func_resu
}

void Collections::Sum(mgp_list *args, mgp_func_context *ctx, mgp_func_result *res, mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};;
mgp::MemoryDispatcherGuard guard{memory};
const auto arguments = mgp::List(args);
auto result = mgp::Result(res);
try {
Expand All @@ -204,7 +204,7 @@ void Collections::Sum(mgp_list *args, mgp_func_context *ctx, mgp_func_result *re
}

void Collections::Union(mgp_list *args, mgp_func_context *ctx, mgp_func_result *res, mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};;
mgp::MemoryDispatcherGuard guard{memory};
const auto arguments = mgp::List(args);
auto result = mgp::Result(res);
try {
Expand Down Expand Up @@ -249,7 +249,7 @@ void Collections::Union(mgp_list *args, mgp_func_context *ctx, mgp_func_result *
}

void Collections::Sort(mgp_list *args, mgp_func_context *ctx, mgp_func_result *res, mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};;
mgp::MemoryDispatcherGuard guard{memory};
const auto arguments = mgp::List(args);
auto result = mgp::Result(res);
try {
Expand All @@ -271,7 +271,7 @@ void Collections::Sort(mgp_list *args, mgp_func_context *ctx, mgp_func_result *r
}

void Collections::ContainsSorted(mgp_list *args, mgp_func_context *ctx, mgp_func_result *res, mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};;
mgp::MemoryDispatcherGuard guard{memory};
const auto arguments = mgp::List(args);
auto result = mgp::Result(res);
try {
Expand Down Expand Up @@ -304,7 +304,7 @@ void Collections::ContainsSorted(mgp_list *args, mgp_func_context *ctx, mgp_func
}

void Collections::Max(mgp_list *args, mgp_func_context *ctx, mgp_func_result *res, mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};;
mgp::MemoryDispatcherGuard guard{memory};
const auto arguments = mgp::List(args);
auto result = mgp::Result(res);
try {
Expand All @@ -331,7 +331,7 @@ void Collections::Max(mgp_list *args, mgp_func_context *ctx, mgp_func_result *re
}

void Collections::Split(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};;
mgp::MemoryDispatcherGuard guard{memory};

const auto arguments = mgp::List(args);
const auto record_factory = mgp::RecordFactory(result);
Expand Down Expand Up @@ -370,7 +370,7 @@ void Collections::Split(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *r
}

void Collections::Pairs(mgp_list *args, mgp_func_context *ctx, mgp_func_result *res, mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};;
mgp::MemoryDispatcherGuard guard{memory};
const auto arguments = mgp::List(args);
auto result = mgp::Result(res);
try {
Expand Down Expand Up @@ -401,7 +401,7 @@ void Collections::Pairs(mgp_list *args, mgp_func_context *ctx, mgp_func_result *
}

void Collections::Contains(mgp_list *args, mgp_func_context *ctx, mgp_func_result *res, mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};;
mgp::MemoryDispatcherGuard guard{memory};
const auto arguments = mgp::List(args);
auto result = mgp::Result(res);
try {
Expand Down Expand Up @@ -429,7 +429,7 @@ void Collections::Contains(mgp_list *args, mgp_func_context *ctx, mgp_func_resul
}

void Collections::UnionAll(mgp_list *args, mgp_func_context *ctx, mgp_func_result *res, mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};;
mgp::MemoryDispatcherGuard guard{memory};
auto arguments = mgp::List(args);
auto result = mgp::Result(res);
try {
Expand All @@ -448,7 +448,7 @@ void Collections::UnionAll(mgp_list *args, mgp_func_context *ctx, mgp_func_resul
}

void Collections::Min(mgp_list *args, mgp_func_context *ctx, mgp_func_result *res, mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};;
mgp::MemoryDispatcherGuard guard{memory};
const auto arguments = mgp::List(args);
auto result = mgp::Result(res);
try {
Expand Down Expand Up @@ -486,7 +486,7 @@ void Collections::Min(mgp_list *args, mgp_func_context *ctx, mgp_func_result *re
}

void Collections::ToSet(mgp_list *args, mgp_func_context *ctx, mgp_func_result *res, mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};;
mgp::MemoryDispatcherGuard guard{memory};
auto arguments = mgp::List(args);
auto result = mgp::Result(res);
try {
Expand All @@ -506,7 +506,7 @@ void Collections::ToSet(mgp_list *args, mgp_func_context *ctx, mgp_func_result *
}

void Collections::Partition(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};;
mgp::MemoryDispatcherGuard guard{memory};
auto arguments = mgp::List(args);
const auto record_factory = mgp::RecordFactory(result);
try {
Expand Down
2 changes: 1 addition & 1 deletion cpp/collections_module/collections_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *memory) {
try {
mgp::MemoryDispatcherGuard guard{memory};;
mgp::MemoryDispatcherGuard guard{memory};

mgp::AddFunction(Collections::SumLongs, Collections::kProcedureSumLongs,
{mgp::Parameter(Collections::kSumLongsArg1, {mgp::Type::List, mgp::Type::Any})}, module, memory);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <cstdint>
#include <mg_exceptions.hpp>
#include <mg_utils.hpp>

Expand All @@ -24,10 +25,13 @@ const double kDefaultWeight = 1.0;

void InsertLouvainRecord(mgp_graph *graph, mgp_result *result, mgp_memory *memory, const std::uint64_t node_id,
const std::uint64_t community) {
auto *vertex = mg_utility::GetNodeForInsertion(node_id, graph, memory);
if (!vertex) return;

mgp_result_record *record = mgp::result_new_record(result);
if (record == nullptr) throw mg_exception::NotEnoughMemoryException();

mg_utility::InsertNodeValueResult(graph, record, kFieldNode, node_id, memory);
mg_utility::InsertNodeValueResult(record, kFieldNode, vertex, memory);
mg_utility::InsertIntValueResult(record, kFieldCommunity, community, memory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,24 @@ auto saved_directedness = kDefaultDirected;
auto saved_weightedness = kDefaultWeighted;
std::string saved_weight_property = kDefaultWeightProperty.data();

void WriteResults(const std::uint64_t node_id, const std::int64_t label, const mgp::Graph &graph,
const mgp::RecordFactory &record_factory) {
// As IN_MEMORY_ANALYTICAL doesn’t offer ACID guarantees, check if the graph elements in the result exist
try {
// If so, throw an exception:
const auto node = graph.GetNodeById(mgp::Id::FromUint(node_id));

auto record = record_factory.NewRecord();
record.Insert(kFieldNode.data(), node);
record.Insert(kFieldCommunityId.data(), label);
} catch (const std::exception &) {
// If no node has been found, it’s enough to not write the result
}
}

void Set(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
try {
mgp::MemoryDispatcherGuard guard{memory};;
mgp::MemoryDispatcherGuard guard{memory};

const auto graph = mgp::Graph(memgraph_graph);
const auto arguments = mgp::List(args);
Expand All @@ -78,9 +93,7 @@ void Set(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memo
::initialized = true;

for (const auto [node_id, label] : labels) {
auto record = record_factory.NewRecord();
record.Insert(kFieldNode.data(), graph.GetNodeById(mgp::Id::FromUint(node_id)));
record.Insert(kFieldCommunityId.data(), label);
WriteResults(node_id, label, graph, record_factory);
}
} catch (const std::exception &e) {
return;
Expand All @@ -90,7 +103,7 @@ void Set(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memo

void Get(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
try {
mgp::MemoryDispatcherGuard guard{memory};;
mgp::MemoryDispatcherGuard guard{memory};

const auto graph = mgp::Graph(memgraph_graph);
const auto record_factory = mgp::RecordFactory(result);
Expand All @@ -101,11 +114,11 @@ void Get(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memo
// Previously-computed communities may contain nodes since deleted
try {
// If so, throw an exception:
const auto maybe_node = graph.GetNodeById(mgp::Id::FromUint(node_id));
const auto node = graph.GetNodeById(mgp::Id::FromUint(node_id));

// Otherwise:
auto record = record_factory.NewRecord();
record.Insert(kFieldNode.data(), graph.GetNodeById(mgp::Id::FromUint(node_id)));
record.Insert(kFieldNode.data(), node);
record.Insert(kFieldCommunityId.data(), label);
} catch (const std::exception &e) {
continue;
Expand All @@ -119,7 +132,7 @@ void Get(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memo

void Update(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
try {
mgp::MemoryDispatcherGuard guard{memory};;
mgp::MemoryDispatcherGuard guard{memory};

const auto graph = mgp::Graph(memgraph_graph);
const auto arguments = mgp::List(args);
Expand Down Expand Up @@ -167,17 +180,13 @@ void Update(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_m
algorithm.UpdateLabels(graph, modified_nodes, modified_relationships, deleted_nodes, deleted_relationships);

for (const auto [node_id, label] : labels) {
auto record = record_factory.NewRecord();
record.Insert(kFieldNode.data(), graph.GetNodeById(mgp::Id::FromUint(node_id)));
record.Insert(kFieldCommunityId.data(), label);
WriteResults(node_id, label, graph, record_factory);
}
} else {
const auto labels = algorithm.SetLabels(graph);

for (const auto [node_id, label] : labels) {
auto record = record_factory.NewRecord();
record.Insert(kFieldNode.data(), graph.GetNodeById(mgp::Id::FromUint(node_id)));
record.Insert(kFieldCommunityId.data(), label);
WriteResults(node_id, label, graph, record_factory);
}
}

Expand All @@ -189,7 +198,7 @@ void Update(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_m

void Reset(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
try {
mgp::MemoryDispatcherGuard guard{memory};;
mgp::MemoryDispatcherGuard guard{memory};

const auto record_factory = mgp::RecordFactory(result);

Expand All @@ -203,7 +212,7 @@ void Reset(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_me
auto record = record_factory.NewRecord();
record.Insert(kFieldMessage.data(), "The algorithm has been successfully reset!");
} catch (const std::exception &e) {
mgp::MemoryDispatcherGuard guard{memory};;
mgp::MemoryDispatcherGuard guard{memory};

const auto record_factory = mgp::RecordFactory(result);
auto record = record_factory.NewRecord();
Expand All @@ -216,7 +225,7 @@ void Reset(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_me

extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *memory) {
try {
mgp::MemoryDispatcherGuard guard{memory};;
mgp::MemoryDispatcherGuard guard{memory};

const auto node_list = std::make_pair(mgp::Type::List, mgp::Type::Node);
const auto relationship_list = std::make_pair(mgp::Type::List, mgp::Type::Relationship);
Expand Down
Loading

0 comments on commit 085bd27

Please sign in to comment.