
Add ingest/processed/bytes metric #17581

Open · neha-ellur wants to merge 10 commits into master
Conversation

@neha-ellur commented Dec 17, 2024

A new metric, ingest/processed/bytes, has been introduced to track the total number of bytes processed during ingestion, covering native batch ingestion, streaming ingestion, and multi-stage query (MSQ) ingestion tasks. This metric provides a unified view of data processing across the different ingestion pathways.

Key changed/added classes in this PR

This metric was added in four key ingestion task classes (a condensed sketch of the emission pattern follows the list):

  • IndexTask: a sequential ingestion task. The processed bytes are retrieved from the RowIngestionMetersTotals object (buildSegmentsMeters) and emitted directly after segment publication.
  • ParallelIndexSupervisorTask: a task that supervises parallel ingestion. Processed bytes are aggregated from the subtasks' ingestion metrics (RowIngestionMetersTotals).
  • SeekableStreamIndexTaskRunner: a runner for ingestion tasks that consume data from seekable streams (e.g., Kafka). The processed bytes are calculated from the size of the data buffers (e.g., ByteEntity buffers) for each record, and the metric is emitted per processed record.
  • ControllerImpl (MSQ): the ingest/processed/bytes metric is emitted by aggregating bytes processed across all stages and workers during MSQ task execution: counters are fetched from the CounterSnapshotsTree, bytes from all input channels are summed for each worker and stage using stream-based aggregation, and the aggregated total is emitted as ingest/processed/bytes for the entire MSQ task.
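In each of these classes the emission itself follows the same basic pattern. A condensed sketch, adapted from the IndexTask change later in this PR (toolbox, task, and rowIngestionMeters are assumed to be in scope in the enclosing task):

  // Build the metric event with the standard task dimensions and emit the byte count.
  final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
  IndexTaskUtils.setTaskDimensions(metricBuilder, task);
  toolbox.getEmitter().emit(
      metricBuilder.setMetric("ingest/processed/bytes", rowIngestionMeters.getProcessedBytes())
  );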

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@github-actions bot added the labels Area - Batch Ingestion and Area - MSQ (for multi-stage queries: https://github.com/apache/druid/issues/12262) on Dec 18, 2024
@kfaraz self-requested a review on December 18, 2024
@apache deleted a comment from neha-ellur on Dec 18, 2024
@@ -329,6 +331,27 @@ public void run(final QueryListener queryListener) throws Exception
}
// Call onQueryComplete after Closer is fully closed, ensuring no controller-related processing is ongoing.
queryListener.onQueryComplete(reportPayload);

long totalProcessedBytes = reportPayload.getCounters().copyMap().values().stream()
@cryptoe (Contributor) · Jan 7, 2025:
This seems like the wrong place to put this logic.
ingest/processed/bytes seems like an ingestion-only metric, no?
If that is the case, we should emit the metric only if the query is an ingestion query.

You could probably expose a method here https://github.com/apache/druid/blob/9bebe7f1e5ab0f40efbff620769d0413c943683c/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java#L517 that emits summary metrics, and pass it the task report and the query.
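A hypothetical shape for that hook (the method name, parameters, and the isIngestion check are illustrative, not the actual API):

  // Illustrative only: a summary-metrics hook on ControllerImpl.
  private void emitSummaryMetrics(final MSQTaskReportPayload reportPayload, final MSQSpec querySpec)
  {
    // Only emit the ingestion-only metric for INSERT/REPLACE queries.
    if (MSQControllerTask.isIngestion(querySpec)) {
      // computeTotalProcessedBytes is a hypothetical helper; see the refactor sketch further down.
      emitMetric("ingest/processed/bytes", computeTotalProcessedBytes(reportPayload.getCounters()));
    }
  }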

@neha-ellur (Author) replied:

Moved the logic

@cryptoe (Contributor) replied:
I think the place it has been moved to is correct.
Rather than ingest in the metric name, can we rename the metric to input/processed/bytes or something, since we would want that metric for MSQ SELECT queries as well?

Also, the MSQ code might need to be adjusted so that only leaf nodes contribute to this metric, no? Otherwise an equivalent batch ingest with range partitioning will show fewer processed bytes, since the shuffle stage's input is not being counted. A simple test should be sufficient to rule this out.

Try a query like replace bar all using select * from extern(http) partitioned by day clustered by col1, and an equivalent range-partitioning spec for batch ingestion over the same http input source.
cc @kfaraz

@neha-ellur (Author) · Jan 11, 2025:

@cryptoe This metric will be used in the billing console and should be named ingest/processed/bytes.
Regarding the MSQ change to count only leaf nodes, where would that go? Also, any pointers to existing tests would be helpful; this is my first time in this area of the code.

@cryptoe (Contributor) commented Jan 7, 2025:

Also there are some static check failures which need to be looked at.

@neha-ellur (Author) commented Jan 9, 2025:

> Also there are some static check failures which need to be looked at.

@cryptoe fixed

Comment on lines 661 to 664
long bytesProcessed = 0;
// Sum the remaining bytes across all buffers for this record.
for (ByteEntity entity : record.getData()) {
  bytesProcessed += entity.getBuffer().remaining();
}
Contributor:

Can we just reuse the rowIngestionMeters.getProcessedBytes() instead of computing the value explicitly here?
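A sketch of the suggested simplification, assuming the runner already holds the rowIngestionMeters reference:

  // The meters already accumulate processed bytes, so there is no need to re-sum the buffers.
  final long bytesProcessed = rowIngestionMeters.getProcessedBytes();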

@neha-ellur (Author) replied:

Updated

Comment on lines 542 to 556
long totalProcessedBytes = msqTaskReportPayload.getCounters() != null
    ? msqTaskReportPayload.getCounters().copyMap().values().stream().mapToLong(
        integerCounterSnapshotsMap -> integerCounterSnapshotsMap.values().stream()
            .mapToLong(counterSnapshots -> {
              Map<String, QueryCounterSnapshot> workerCounters = counterSnapshots.getMap();
              return workerCounters.entrySet().stream().mapToLong(
                  channel -> {
                    if (channel.getKey().startsWith("input")) {
                      ChannelCounters.Snapshot snapshot = (ChannelCounters.Snapshot) channel.getValue();
                      return snapshot.getBytes() == null ? 0L : Arrays.stream(snapshot.getBytes()).sum();
                    }
                    return 0L;
                  }).sum();
            }).sum()).sum()
    : 0;
Contributor:

This seems a little difficult to read.
Can we clean up this logic a little, maybe by returning early when msqTaskReportPayload.getCounters() is null?
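One possible cleanup along those lines, sketched here using only the types visible in the snippet above (the helper name and @Nullable annotation are illustrative):

  // Sketch: the same aggregation, restructured with an early return and plain loops.
  private static long computeTotalProcessedBytes(@Nullable final CounterSnapshotsTree counters)
  {
    if (counters == null) {
      return 0L;
    }
    long totalBytes = 0L;
    for (Map<Integer, CounterSnapshots> workersForStage : counters.copyMap().values()) {
      for (CounterSnapshots workerSnapshots : workersForStage.values()) {
        for (Map.Entry<String, QueryCounterSnapshot> channel : workerSnapshots.getMap().entrySet()) {
          // Only input channels contribute to processed bytes.
          if (channel.getKey().startsWith("input")) {
            final ChannelCounters.Snapshot snapshot = (ChannelCounters.Snapshot) channel.getValue();
            if (snapshot.getBytes() != null) {
              totalBytes += Arrays.stream(snapshot.getBytes()).sum();
            }
          }
        }
      }
    }
    return totalBytes;
  }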

@neha-ellur (Author) replied:

Refactored the code.

final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);
toolbox.getEmitter().emit(
    metricBuilder.setMetric("ingest/processed/bytes", rowIngestionMeters.getProcessedBytes())
);
@kfaraz (Contributor) · Jan 15, 2025:

Shouldn't this metric be emitted once per task, probably at the end of the run method?
The other task types seem to do that.

@kfaraz (Contributor) commented Jan 15, 2025:

@neha-ellur, just found PR #14582.
I wonder if the changes here are even needed, since the ingest/input/bytes metric already contains the processed bytes for index, index_kafka, and some other task types.

We probably just need to wire up things for MSQ tasks.

Labels: Area - Batch Ingestion · Area - Ingestion · Area - MSQ (for multi-stage queries: https://github.com/apache/druid/issues/12262)