Skip to content

Commit

Permalink
Make metadata pod lookups more resilient to short lived processes
Browse files Browse the repository at this point in the history
Signed-off-by: Dom Del Nano <[email protected]>
  • Loading branch information
ddelnano committed Jan 16, 2025
1 parent 534b892 commit 1458137
Show file tree
Hide file tree
Showing 9 changed files with 516 additions and 14 deletions.
1 change: 1 addition & 0 deletions src/carnot/funcs/metadata/metadata_ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ void RegisterMetadataOpsOrDie(px::carnot::udf::Registry* registry) {
registry->RegisterOrDie<HasServiceNameUDF>("has_service_name");
registry->RegisterOrDie<HasValueUDF>("has_value");
registry->RegisterOrDie<IPToPodIDUDF>("ip_to_pod_id");
registry->RegisterOrDie<IPToPodIDAtTimePEMExecUDF>("_ip_to_pod_id_pem_exec");
registry->RegisterOrDie<IPToPodIDAtTimeUDF>("ip_to_pod_id");
registry->RegisterOrDie<PodIDToPodNameUDF>("pod_id_to_pod_name");
registry->RegisterOrDie<PodIDToPodLabelsUDF>("pod_id_to_pod_labels");
Expand Down
19 changes: 19 additions & 0 deletions src/carnot/funcs/metadata/metadata_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -2977,6 +2977,25 @@ class IPToPodIDUDF : public ScalarUDF {
static udfspb::UDFSourceExecutor Executor() { return udfspb::UDFSourceExecutor::UDF_KELVIN; }
};

/**
* This UDF is a compiler internal function. It should only be used when the IP address is
* guaranteed to be a local address since this function is forced to run on PEMs. In cases
* where the IP could be a remote address, then it is more correct to have the function run on
* Kelvin (IPToPodIDUDF or IPToPodIDAtTimeUDF).
*/
class IPToPodIDAtTimePEMExecUDF : public ScalarUDF {
public:
/**
* @brief Gets the pod id of pod with given pod_ip and time
*/
StringValue Exec(FunctionContext* ctx, StringValue pod_ip, Time64NSValue time) {
auto md = GetMetadataState(ctx);
return md->k8s_metadata_state().PodIDByIPAtTime(pod_ip, time.val);
}

static udfspb::UDFSourceExecutor Executor() { return udfspb::UDFSourceExecutor::UDF_PEM; }
};

class IPToPodIDAtTimeUDF : public ScalarUDF {
public:
/**
Expand Down
130 changes: 126 additions & 4 deletions src/carnot/planner/compiler/analyzer/convert_metadata_rule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,58 @@ namespace carnot {
namespace planner {
namespace compiler {

namespace {
std::string GetUniquePodNameCol(std::shared_ptr<TableType> parent_type,
absl::flat_hash_set<std::string>* used_column_names) {
auto col_name_counter = 0;
do {
auto new_col = absl::StrCat("pod_name_", col_name_counter++);
if (!used_column_names->contains(new_col) && !parent_type->HasColumn(new_col)) {
used_column_names->insert(new_col);
return new_col;
}
} while (true);
}
} // namespace

Status ConvertMetadataRule::AddPodNameConversionMapsWithFallback(
IR* graph, IRNode* container, ExpressionIR* metadata_expr, ExpressionIR* fallback_expr,
const std::pair<std::string, std::string>& col_names) const {
if (Match(container, Func())) {
for (int64_t parent_id : graph->dag().ParentsOf(container->id())) {
PX_RETURN_IF_ERROR(AddPodNameConversionMapsWithFallback(
graph, graph->Get(parent_id), metadata_expr, fallback_expr, col_names));
}
} else if (Match(container, Operator())) {
auto container_op = static_cast<OperatorIR*>(container);
for (auto parent_op : container_op->parents()) {
auto metadata_col_expr = ColumnExpression(col_names.first, metadata_expr);
auto fallback_col_expr = ColumnExpression(col_names.second, fallback_expr);
PX_ASSIGN_OR_RETURN(
auto md_map_ir,
graph->CreateNode<MapIR>(container->ast(), parent_op,
std::vector<ColumnExpression>{metadata_col_expr}, true));
PX_ASSIGN_OR_RETURN(
auto fallback_md_map_ir,
graph->CreateNode<MapIR>(container->ast(), static_cast<OperatorIR*>(md_map_ir),
std::vector<ColumnExpression>{fallback_col_expr}, true));
PX_RETURN_IF_ERROR(container_op->ReplaceParent(parent_op, fallback_md_map_ir));

for (auto child : parent_op->Children()) {
if (child == md_map_ir) {
continue;
}
PX_RETURN_IF_ERROR(child->ReplaceParent(parent_op, md_map_ir));
}

PX_RETURN_IF_ERROR(PropagateTypeChangesFromNode(graph, md_map_ir, compiler_state_));
PX_RETURN_IF_ERROR(PropagateTypeChangesFromNode(graph, fallback_md_map_ir, compiler_state_));
}
}

return Status::OK();
}

Status ConvertMetadataRule::UpdateMetadataContainer(IRNode* container, MetadataIR* metadata,
ExpressionIR* metadata_expr) const {
if (Match(container, Func())) {
Expand Down Expand Up @@ -70,6 +122,12 @@ StatusOr<std::string> ConvertMetadataRule::FindKeyColumn(std::shared_ptr<TableTy
absl::StrJoin(parent_type->ColumnNames(), ","));
}

bool CheckBackupConversionAvailable(std::shared_ptr<TableType> parent_type,
const std::string& func_name) {
return parent_type->HasColumn("time_") && parent_type->HasColumn("local_addr") &&
func_name == "upid_to_pod_name";
}

StatusOr<bool> ConvertMetadataRule::Apply(IRNode* ir_node) {
if (!Match(ir_node, Metadata())) {
return false;
Expand All @@ -85,8 +143,9 @@ StatusOr<bool> ConvertMetadataRule::Apply(IRNode* ir_node) {
PX_ASSIGN_OR_RETURN(auto parent, metadata->ReferencedOperator());
PX_ASSIGN_OR_RETURN(auto containing_ops, metadata->ContainingOperators());

auto resolved_table_type = parent->resolved_table_type();
PX_ASSIGN_OR_RETURN(std::string key_column_name,
FindKeyColumn(parent->resolved_table_type(), md_property, ir_node));
FindKeyColumn(resolved_table_type, md_property, ir_node));

PX_ASSIGN_OR_RETURN(ColumnIR * key_column,
graph->CreateNode<ColumnIR>(ir_node->ast(), key_column_name, parent_op_idx));
Expand All @@ -96,19 +155,82 @@ StatusOr<bool> ConvertMetadataRule::Apply(IRNode* ir_node) {
FuncIR * conversion_func,
graph->CreateNode<FuncIR>(ir_node->ast(), FuncIR::Op{FuncIR::Opcode::non_op, "", func_name},
std::vector<ExpressionIR*>{key_column}));
FuncIR* orig_conversion_func = conversion_func;
ExpressionIR* conversion_expr = static_cast<ExpressionIR*>(conversion_func);

// TODO(ddelnano): Until the short lived process issue (gh#1638) is resolved, add a fallback
// conversion function that uses local_addr for pod lookups when the upid based default
// (upid_to_pod_name) fails. This turns the `df.ctx["pod"]` lookup into the following pseudo code:
//
// fallback = px.pod_id_to_pod_name(px.ip_to_pod_id(df.ctx["local_addr"]))
// df.pod = px.select(px.upid_to_pod_name(df.upid) == "", fallback, px.upid_to_pod_name(df.upid))
FuncIR* backup_conversion_func = nullptr;
auto backup_conversion_available = CheckBackupConversionAvailable(resolved_table_type, func_name);
std::pair<std::string, std::string> col_names;
if (backup_conversion_available) {
absl::flat_hash_set<std::string> used_column_names;
col_names = std::make_pair(GetUniquePodNameCol(resolved_table_type, &used_column_names),
GetUniquePodNameCol(resolved_table_type, &used_column_names));
PX_ASSIGN_OR_RETURN(ColumnIR * local_addr_col,
graph->CreateNode<ColumnIR>(ir_node->ast(), "local_addr", parent_op_idx));
PX_ASSIGN_OR_RETURN(ColumnIR * time_col,
graph->CreateNode<ColumnIR>(ir_node->ast(), "time_", parent_op_idx));
PX_ASSIGN_OR_RETURN(
ColumnIR * md_expr_col,
graph->CreateNode<ColumnIR>(ir_node->ast(), col_names.first, parent_op_idx));
PX_ASSIGN_OR_RETURN(
FuncIR * ip_conversion_func,
graph->CreateNode<FuncIR>(ir_node->ast(),
FuncIR::Op{FuncIR::Opcode::non_op, "", "_ip_to_pod_id_pem_exec"},
std::vector<ExpressionIR*>{local_addr_col, time_col}));

// This doesn't need to have a "pem exec" equivalent function as long as the metadata
// annotation is set as done below
PX_ASSIGN_OR_RETURN(
backup_conversion_func,
graph->CreateNode<FuncIR>(
ir_node->ast(), FuncIR::Op{FuncIR::Opcode::non_op, "", "pod_id_to_pod_name"},
std::vector<ExpressionIR*>{static_cast<ExpressionIR*>(ip_conversion_func)}));

backup_conversion_func->set_annotations(ExpressionIR::Annotations(md_type));

PX_ASSIGN_OR_RETURN(ExpressionIR * empty_string,
graph->CreateNode<StringIR>(ir_node->ast(), ""));
PX_ASSIGN_OR_RETURN(
FuncIR * select_expr,
graph->CreateNode<FuncIR>(
ir_node->ast(), FuncIR::Op{FuncIR::Opcode::eq, "==", "equal"},
std::vector<ExpressionIR*>{static_cast<ExpressionIR*>(md_expr_col), empty_string}));
PX_ASSIGN_OR_RETURN(auto duplicate_md_expr_col, graph->CopyNode<ColumnIR>(md_expr_col));
PX_ASSIGN_OR_RETURN(
FuncIR * select_func,
graph->CreateNode<FuncIR>(
ir_node->ast(), FuncIR::Op{FuncIR::Opcode::non_op, "", "select"},
std::vector<ExpressionIR*>{static_cast<ExpressionIR*>(select_expr),
backup_conversion_func, duplicate_md_expr_col}));

conversion_func = select_func;
PX_ASSIGN_OR_RETURN(conversion_expr, graph->CreateNode<ColumnIR>(
ir_node->ast(), col_names.second, parent_op_idx));
}

for (int64_t parent_id : graph->dag().ParentsOf(metadata->id())) {
// For each container node of the metadata expression, update it to point to the
// new conversion func instead.
PX_RETURN_IF_ERROR(UpdateMetadataContainer(graph->Get(parent_id), metadata, conversion_func));
auto container = graph->Get(parent_id);
PX_RETURN_IF_ERROR(UpdateMetadataContainer(container, metadata, conversion_expr));
if (backup_conversion_available) {
PX_RETURN_IF_ERROR(AddPodNameConversionMapsWithFallback(
graph, container, orig_conversion_func, conversion_func, col_names));
}
}

// Propagate type changes from the new conversion_func.
PX_RETURN_IF_ERROR(PropagateTypeChangesFromNode(graph, conversion_func, compiler_state_));

DCHECK_EQ(conversion_func->EvaluatedDataType(), column_type)
<< "Expected the parent key column type and metadata property type to match.";
conversion_func->set_annotations(ExpressionIR::Annotations(md_type));

orig_conversion_func->set_annotations(ExpressionIR::Annotations(md_type));
return true;
}

Expand Down
26 changes: 26 additions & 0 deletions src/carnot/planner/compiler/analyzer/convert_metadata_rule.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <memory>
#include <string>
#include <utility>

#include "src/carnot/planner/compiler_state/compiler_state.h"
#include "src/carnot/planner/rules/rules.h"
Expand All @@ -46,6 +47,31 @@ class ConvertMetadataRule : public Rule {
ExpressionIR* metadata_expr) const;
StatusOr<std::string> FindKeyColumn(std::shared_ptr<TableType> parent_type,
MetadataProperty* property, IRNode* node_for_error) const;

/**
*
* This function aids in applying a fallback to a metadata conversion expression.
* It works by adding two conversion maps to the graph. The first map contains a column expression
* that contains the metadata expression result. The second map contains a column expression that
* contains the fallback expression result. It is intended to be used as a short term workaround
* for gh#1638 where pod name lookups fail for short lived processes. This fallback expression
* allows for an alternative mechanism if the primary lookup fails. See the example below:
*
* Before applying the rule:
*
* MemorySource -> MapIR (containing MetadataIR)
*
* After applying the rule:
*
* MemorySource
* -> MapIR (col_names[0]: metadata_expr)
* -> MapIR (col_names[1]: fallback_expr)
* -> MapIR (col_names[1] & existing cols)
*/

Status AddPodNameConversionMapsWithFallback(
IR* graph, IRNode* container, ExpressionIR* metadata_expr, ExpressionIR* fallback_expr,
const std::pair<std::string, std::string>& col_names) const;
};

} // namespace compiler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ using table_store::schema::Relation;

using ConvertMetadataRuleTest = RulesTest;

TEST_F(ConvertMetadataRuleTest, multichild) {
TEST_F(ConvertMetadataRuleTest, multichild_without_fallback_func) {
auto relation = Relation(cpu_relation);
MetadataType conversion_column = MetadataType::UPID;
std::string conversion_column_str = MetadataProperty::GetMetadataString(conversion_column);
Expand Down Expand Up @@ -114,6 +114,95 @@ TEST_F(ConvertMetadataRuleTest, missing_conversion_column) {
skip_check_stray_nodes_ = true;
}

TEST_F(ConvertMetadataRuleTest, multichild_with_fallback_func) {
auto relation = Relation(http_events_relation);
MetadataType conversion_column = MetadataType::UPID;
std::string conversion_column_str = MetadataProperty::GetMetadataString(conversion_column);
relation.AddColumn(types::DataType::UINT128, conversion_column_str);
compiler_state_->relation_map()->emplace("table", relation);

auto metadata_name = "pod_name";
MetadataProperty* property = md_handler->GetProperty(metadata_name).ValueOrDie();
MetadataIR* metadata_ir = MakeMetadataIR(metadata_name, /* parent_op_idx */ 0);
metadata_ir->set_property(property);

auto src = MakeMemSource(relation);
auto map = MakeMap(src, {{"md", metadata_ir}});

ResolveTypesRule type_rule(compiler_state_.get());
ASSERT_OK(type_rule.Execute(graph.get()));

ConvertMetadataRule rule(compiler_state_.get());
auto result = rule.Execute(graph.get());
ASSERT_OK(result);
EXPECT_TRUE(result.ValueOrDie());

EXPECT_EQ(0, graph->FindNodesThatMatch(Metadata()).size());

EXPECT_EQ(1, src->Children().size());
auto md_map = static_cast<MapIR*>(src->Children()[0]);
EXPECT_NE(md_map, map);

FuncIR* upid_to_pod_name = nullptr;
for (auto col_expr : md_map->col_exprs()) {
if (col_expr.name == "pod_name_0") {
EXPECT_MATCH(col_expr.node, Func());
upid_to_pod_name = static_cast<FuncIR*>(col_expr.node);
}
}
EXPECT_NE(upid_to_pod_name, nullptr);
EXPECT_EQ(absl::Substitute("upid_to_$0", metadata_name), upid_to_pod_name->func_name());
EXPECT_EQ(1, upid_to_pod_name->all_args().size());
auto input_col = upid_to_pod_name->all_args()[0];
EXPECT_MATCH(input_col, ColumnNode("upid"));
EXPECT_MATCH(upid_to_pod_name, ResolvedExpression());
EXPECT_MATCH(input_col, ResolvedExpression());

EXPECT_EQ(1, md_map->Children().size());
auto fallback_map = static_cast<MapIR*>(md_map->Children()[0]);
FuncIR* fallback_func_select = nullptr;
for (auto col_expr : fallback_map->col_exprs()) {
if (col_expr.name == "pod_name_1") {
EXPECT_MATCH(col_expr.node, Func());
fallback_func_select = static_cast<FuncIR*>(col_expr.node);
}
}

EXPECT_NE(fallback_func_select, nullptr);
EXPECT_EQ("select", fallback_func_select->func_name());
EXPECT_EQ(3, fallback_func_select->all_args().size());

auto orig_func_check = fallback_func_select->all_args()[0];
EXPECT_MATCH(orig_func_check, Func());
auto equals_func = static_cast<FuncIR*>(orig_func_check);
EXPECT_EQ("equal", equals_func->func_name());
EXPECT_EQ(2, equals_func->all_args().size());
EXPECT_MATCH(equals_func->all_args()[0], ColumnNode("pod_name_0"));
EXPECT_MATCH(equals_func->all_args()[1], String(""));
EXPECT_MATCH(orig_func_check, ResolvedExpression());

EXPECT_MATCH(fallback_func_select->all_args()[1], Func());
auto fallback_func = static_cast<FuncIR*>(fallback_func_select->all_args()[1]);
EXPECT_EQ("pod_id_to_pod_name", fallback_func->func_name());
EXPECT_EQ(1, fallback_func->all_args().size());
EXPECT_MATCH(fallback_func->all_args()[0], Func());
EXPECT_MATCH(fallback_func, ResolvedExpression());

auto ip_func = static_cast<FuncIR*>(fallback_func->all_args()[0]);
EXPECT_EQ("_ip_to_pod_id_pem_exec", ip_func->func_name());
EXPECT_EQ(2, ip_func->all_args().size());
EXPECT_MATCH(ip_func->all_args()[0], ColumnNode("local_addr"));
EXPECT_MATCH(ip_func->all_args()[1], ColumnNode("time_"));
EXPECT_MATCH(ip_func, ResolvedExpression());

// Check that the semantic type of the conversion func is propagated properly.
auto type_or_s = map->resolved_table_type()->GetColumnType("md");
ASSERT_OK(type_or_s);
auto type = std::static_pointer_cast<ValueType>(type_or_s.ConsumeValueOrDie());
EXPECT_EQ(types::STRING, type->data_type());
EXPECT_EQ(types::ST_POD_NAME, type->semantic_type());
}

} // namespace compiler
} // namespace planner
} // namespace carnot
Expand Down
8 changes: 8 additions & 0 deletions src/carnot/planner/compiler/test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -860,10 +860,15 @@ class RulesTest : public OperatorTests {
std::vector<types::DataType>({types::DataType::INT64, types::DataType::FLOAT64,
types::DataType::FLOAT64, types::DataType::FLOAT64}),
std::vector<std::string>({"count", "cpu0", "cpu1", "cpu2"}));
http_events_relation = table_store::schema::Relation(
std::vector<types::DataType>(
{types::DataType::TIME64NS, types::DataType::STRING, types::DataType::INT64}),
std::vector<std::string>({"time_", "local_addr", "local_port"}));
semantic_rel =
Relation({types::INT64, types::FLOAT64, types::STRING}, {"bytes", "cpu", "str_col"},
{types::ST_BYTES, types::ST_PERCENT, types::ST_NONE});
rel_map->emplace("cpu", cpu_relation);
rel_map->emplace("http_events", http_events_relation);
rel_map->emplace("semantic_table", semantic_rel);

compiler_state_ = std::make_unique<CompilerState>(
Expand Down Expand Up @@ -935,6 +940,7 @@ class RulesTest : public OperatorTests {
std::unique_ptr<RegistryInfo> info_;
int64_t time_now = 1552607213931245000;
table_store::schema::Relation cpu_relation;
table_store::schema::Relation http_events_relation;
table_store::schema::Relation semantic_rel;
std::unique_ptr<MetadataHandler> md_handler;
};
Expand Down Expand Up @@ -1110,6 +1116,8 @@ class ASTVisitorTest : public OperatorTests {
Relation http_events_relation;
http_events_relation.AddColumn(types::TIME64NS, "time_");
http_events_relation.AddColumn(types::UINT128, "upid");
http_events_relation.AddColumn(types::STRING, "local_addr");
http_events_relation.AddColumn(types::INT64, "local_port");
http_events_relation.AddColumn(types::STRING, "remote_addr");
http_events_relation.AddColumn(types::INT64, "remote_port");
http_events_relation.AddColumn(types::INT64, "major_version");
Expand Down
Loading

0 comments on commit 1458137

Please sign in to comment.