From 309685bb4d93974e61d8e06d08c887e05410c246 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Wed, 15 Jan 2025 13:13:19 -0800 Subject: [PATCH] Make scrapercontroller more generic and move closer to the scraperreceiver Signed-off-by: Bogdan Drutu --- .../{scrapercontroller.go => controller.go} | 157 +++++++++++------- ...rcontroller_test.go => controller_test.go} | 14 +- scraper/scraperhelper/obs_metrics.go | 16 +- scraper/scraperhelper/obs_metrics_test.go | 13 +- 4 files changed, 120 insertions(+), 80 deletions(-) rename scraper/scraperhelper/{scrapercontroller.go => controller.go} (51%) rename scraper/scraperhelper/{scrapercontroller_test.go => controller_test.go} (96%) diff --git a/scraper/scraperhelper/scrapercontroller.go b/scraper/scraperhelper/controller.go similarity index 51% rename from scraper/scraperhelper/scrapercontroller.go rename to scraper/scraperhelper/controller.go index b41ccb83efd..374682fae7c 100644 --- a/scraper/scraperhelper/scrapercontroller.go +++ b/scraper/scraperhelper/controller.go @@ -20,14 +20,17 @@ import ( "go.opentelemetry.io/collector/scraper/scrapererror" ) -// ScraperControllerOption apply changes to internal options. -type ScraperControllerOption interface { - apply(*controller) +// Deprecated: [v0.118.0] use ControllerOption. +type ScraperControllerOption = ControllerOption + +// ControllerOption apply changes to internal options. +type ControllerOption interface { + apply(*controllerOptions) } -type scraperControllerOptionFunc func(*controller) +type optionFunc func(*controllerOptions) -func (of scraperControllerOptionFunc) apply(e *controller) { +func (of optionFunc) apply(e *controllerOptions) { of(e) } @@ -36,33 +39,38 @@ func (of scraperControllerOptionFunc) apply(e *controller) { // // Observability information will be reported, and the scraped metrics // will be passed to the next consumer. 
-func AddScraper(t component.Type, scraper scraper.Metrics) ScraperControllerOption { - return scraperControllerOptionFunc(func(o *controller) { - o.scrapers = append(o.scrapers, scraperWithID{ - Metrics: scraper, - id: component.NewID(t), - }) +func AddScraper(t component.Type, sc scraper.Metrics) ControllerOption { + return optionFunc(func(o *controllerOptions) { + o.scrapers = append(o.scrapers, scraper.NewFactory(t, nil, + scraper.WithMetrics(func(ctx context.Context, set scraper.Settings, config component.Config) (scraper.Metrics, error) { + return sc, nil + }, component.StabilityLevelAlpha))) }) } // WithTickerChannel allows you to override the scraper controller's ticker // channel to specify when scrape is called. This is only expected to be // used by tests. -func WithTickerChannel(tickerCh <-chan time.Time) ScraperControllerOption { - return scraperControllerOptionFunc(func(o *controller) { +func WithTickerChannel(tickerCh <-chan time.Time) ControllerOption { + return optionFunc(func(o *controllerOptions) { o.tickerCh = tickerCh }) } -type controller struct { +type controllerOptions struct { + tickerCh <-chan time.Time + scrapers []scraper.Factory +} + +type createScraperFunc[T any] func(context.Context, scraper.Settings, component.Config) (T, error) + +type controller[T component.Component] struct { collectionInterval time.Duration initialDelay time.Duration timeout time.Duration - nextConsumer consumer.Metrics - - scrapers []scraperWithID - obsScrapers []scraper.Metrics + scrapers []T + scrape func() tickerCh <-chan time.Time done chan struct{} @@ -71,61 +79,60 @@ type controller struct { obsrecv *receiverhelper.ObsReport } -type scraperWithID struct { - scraper.Metrics - id component.ID -} - -// NewScraperControllerReceiver creates a Receiver with the configured options, that can control multiple scrapers. -func NewScraperControllerReceiver( +// newController creates a Receiver with the configured options, that can control multiple scrapers. 
+func newController[T component.Component]( cfg *ControllerConfig, - set receiver.Settings, - nextConsumer consumer.Metrics, - options ...ScraperControllerOption, -) (component.Component, error) { + rSet receiver.Settings, + csf func(scraper.Factory) createScraperFunc[T], + wrapObs func(T, component.ID, component.ID, component.TelemetrySettings) (T, error), + options []ControllerOption, +) (*controller[T], error) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ - ReceiverID: set.ID, + ReceiverID: rSet.ID, Transport: "", - ReceiverCreateSettings: set, + ReceiverCreateSettings: rSet, }) if err != nil { return nil, err } - sc := &controller{ + co := controllerOptions{} + for _, op := range options { + op.apply(&co) + } + + cs := &controller[T]{ collectionInterval: cfg.CollectionInterval, initialDelay: cfg.InitialDelay, timeout: cfg.Timeout, - nextConsumer: nextConsumer, done: make(chan struct{}), obsrecv: obsrecv, } - for _, op := range options { - op.apply(sc) - } - - sc.obsScrapers = make([]scraper.Metrics, len(sc.scrapers)) - for i := range sc.scrapers { - telSet := set.TelemetrySettings - telSet.Logger = telSet.Logger.With(zap.String("scraper", sc.scrapers[i].id.String())) - var obsScrp scraper.ScrapeMetricsFunc - obsScrp, err = newObsMetrics(sc.scrapers[i].ScrapeMetrics, set.ID, sc.scrapers[i].id, telSet) + cs.scrapers = make([]T, len(co.scrapers)) + for i := range co.scrapers { + set := scraper.Settings{ + ID: component.NewID(co.scrapers[i].Type()), + TelemetrySettings: rSet.TelemetrySettings, + BuildInfo: rSet.BuildInfo, + } + set.TelemetrySettings.Logger = set.TelemetrySettings.Logger.With(zap.String("scraper", set.ID.String())) + cs.scrapers[i], err = csf(co.scrapers[i])(context.Background(), set, nil) if err != nil { return nil, err } - sc.obsScrapers[i], err = scraper.NewMetrics(obsScrp, scraper.WithStart(sc.scrapers[i].Start), scraper.WithShutdown(sc.scrapers[i].Shutdown)) + cs.scrapers[i], err = wrapObs(cs.scrapers[i], 
rSet.ID, set.ID, set.TelemetrySettings) if err != nil { return nil, err } } - return sc, nil + return cs, nil } // Start the receiver, invoked during service start. -func (sc *controller) Start(ctx context.Context, host component.Host) error { - for _, scrp := range sc.obsScrapers { +func (sc *controller[K]) Start(ctx context.Context, host component.Host) error { + for _, scrp := range sc.scrapers { if err := scrp.Start(ctx, host); err != nil { return err } @@ -136,12 +143,12 @@ func (sc *controller) Start(ctx context.Context, host component.Host) error { } // Shutdown the receiver, invoked during service shutdown. -func (sc *controller) Shutdown(ctx context.Context) error { +func (sc *controller[K]) Shutdown(ctx context.Context) error { // Signal the goroutine to stop. close(sc.done) sc.wg.Wait() var errs error - for _, scrp := range sc.obsScrapers { + for _, scrp := range sc.scrapers { errs = multierr.Append(errs, scrp.Shutdown(ctx)) } @@ -150,7 +157,7 @@ func (sc *controller) Shutdown(ctx context.Context) error { // startScraping initiates a ticker that calls Scrape based on the configured // collection interval. -func (sc *controller) startScraping() { +func (sc *controller[K]) startScraping() { sc.wg.Add(1) go func() { defer sc.wg.Done() @@ -171,11 +178,11 @@ func (sc *controller) startScraping() { // Call scrape method on initialization to ensure // that scrapers start from when the component starts // instead of waiting for the full duration to start. - sc.scrapeMetricsAndReport() + sc.scrape() for { select { case <-sc.tickerCh: - sc.scrapeMetricsAndReport() + sc.scrape() case <-sc.done: return } @@ -183,16 +190,42 @@ func (sc *controller) startScraping() { }() } -// scrapeMetricsAndReport calls the Scrape function for each of the configured -// Scrapers, records observability information, and passes the scraped metrics -// to the next component. 
-func (sc *controller) scrapeMetricsAndReport() { - ctx, done := withScrapeContext(sc.timeout) +// Deprecated: [v0.118.0] Use NewMetricsController. +func NewScraperControllerReceiver( + cfg *ControllerConfig, + set receiver.Settings, + nextConsumer consumer.Metrics, + options ...ControllerOption, +) (component.Component, error) { + return NewMetricsController(cfg, set, nextConsumer, options...) +} + +// NewMetricsController creates a receiver.Metrics with the configured options, that can control multiple scrapers. +func NewMetricsController(cfg *ControllerConfig, + set receiver.Settings, + nextConsumer consumer.Metrics, + options ...ControllerOption, +) (receiver.Metrics, error) { + c, err := newController[scraper.Metrics](cfg, set, + func(factory scraper.Factory) createScraperFunc[scraper.Metrics] { + return factory.CreateMetrics + }, wrapObsMetrics, options) + if err != nil { + return nil, err + } + c.scrape = func() { + scrapeMetrics(c, nextConsumer) + } + return c, nil +} + +func scrapeMetrics(c *controller[scraper.Metrics], nextConsumer consumer.Metrics) { + ctx, done := withScrapeContext(c.timeout) defer done() metrics := pmetric.NewMetrics() - for i := range sc.obsScrapers { - md, err := sc.obsScrapers[i].ScrapeMetrics(ctx) + for i := range c.scrapers { + md, err := c.scrapers[i].ScrapeMetrics(ctx) if err != nil && !scrapererror.IsPartialScrapeError(err) { continue } @@ -200,9 +233,9 @@ func (sc *controller) scrapeMetricsAndReport() { } dataPointCount := metrics.DataPointCount() - ctx = sc.obsrecv.StartMetricsOp(ctx) - err := sc.nextConsumer.ConsumeMetrics(ctx, metrics) - sc.obsrecv.EndMetricsOp(ctx, "", dataPointCount, err) + ctx = c.obsrecv.StartMetricsOp(ctx) + err := nextConsumer.ConsumeMetrics(ctx, metrics) + c.obsrecv.EndMetricsOp(ctx, "", dataPointCount, err) } // withScrapeContext will return a context that has no deadline if timeout is 0 diff --git a/scraper/scraperhelper/scrapercontroller_test.go b/scraper/scraperhelper/controller_test.go 
similarity index 96% rename from scraper/scraperhelper/scrapercontroller_test.go rename to scraper/scraperhelper/controller_test.go index 5f6ec4ebbca..26d5cd3f9f9 100644 --- a/scraper/scraperhelper/scrapercontroller_test.go +++ b/scraper/scraperhelper/controller_test.go @@ -138,7 +138,7 @@ func TestScrapeController(t *testing.T) { cfg = test.scraperControllerSettings } - mr, err := NewScraperControllerReceiver(cfg, receiver.Settings{ID: receiverID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, sink, options...) + mr, err := NewMetricsController(cfg, receiver.Settings{ID: receiverID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, sink, options...) require.NoError(t, err) err = mr.Start(context.Background(), componenttest.NewNopHost()) @@ -194,8 +194,8 @@ func TestScrapeController(t *testing.T) { } } -func configureMetricOptions(t *testing.T, test metricsTestCase, initializeChs []chan bool, scrapeMetricsChs []chan int, closeChs []chan bool) []ScraperControllerOption { - var metricOptions []ScraperControllerOption +func configureMetricOptions(t *testing.T, test metricsTestCase, initializeChs []chan bool, scrapeMetricsChs []chan int, closeChs []chan bool) []ControllerOption { + var metricOptions []ControllerOption for i := 0; i < test.scrapers; i++ { var scraperOptions []scraper.Option @@ -317,7 +317,7 @@ func TestSingleScrapePerInterval(t *testing.T) { scp, err := scraper.NewMetrics(tsm.scrape) require.NoError(t, err) - recv, err := NewScraperControllerReceiver( + recv, err := NewMetricsController( cfg, receivertest.NewNopSettings(), new(consumertest.MetricsSink), @@ -359,7 +359,7 @@ func TestScrapeControllerStartsOnInit(t *testing.T) { scp, err := scraper.NewMetrics(tsm.scrape) require.NoError(t, err, "Must not error when creating scraper") - r, err := NewScraperControllerReceiver( + r, err := NewMetricsController( &ControllerConfig{ CollectionInterval: time.Hour, InitialDelay: 0, @@ 
-398,7 +398,7 @@ func TestScrapeControllerInitialDelay(t *testing.T) { }) require.NoError(t, err, "Must not error when creating scraper") - r, err := NewScraperControllerReceiver( + r, err := NewMetricsController( &cfg, receivertest.NewNopSettings(), new(consumertest.MetricsSink), @@ -428,7 +428,7 @@ func TestShutdownBeforeScrapeCanStart(t *testing.T) { }) require.NoError(t, err, "Must not error when creating scraper") - r, err := NewScraperControllerReceiver( + r, err := NewMetricsController( &cfg, receivertest.NewNopSettings(), new(consumertest.MetricsSink), diff --git a/scraper/scraperhelper/obs_metrics.go b/scraper/scraperhelper/obs_metrics.go index 8723215da2a..2c31051539d 100644 --- a/scraper/scraperhelper/obs_metrics.go +++ b/scraper/scraperhelper/obs_metrics.go @@ -37,28 +37,28 @@ const ( formatKey = "format" ) -func newObsMetrics(delegate scraper.ScrapeMetricsFunc, receiverID component.ID, scraperID component.ID, telSettings component.TelemetrySettings) (scraper.ScrapeMetricsFunc, error) { - telemetryBuilder, errBuilder := metadata.NewTelemetryBuilder(telSettings) +func wrapObsMetrics(sc scraper.Metrics, receiverID component.ID, scraperID component.ID, set component.TelemetrySettings) (scraper.Metrics, error) { + telemetryBuilder, errBuilder := metadata.NewTelemetryBuilder(set) if errBuilder != nil { return nil, errBuilder } - tracer := metadata.Tracer(telSettings) + tracer := metadata.Tracer(set) spanName := scraperKey + spanNameSep + scraperID.String() + spanNameSep + "ScrapeMetrics" otelAttrs := metric.WithAttributeSet(attribute.NewSet( attribute.String(receiverKey, receiverID.String()), attribute.String(scraperKey, scraperID.String()), )) - return func(ctx context.Context) (pmetric.Metrics, error) { + scraperFuncs := func(ctx context.Context) (pmetric.Metrics, error) { ctx, span := tracer.Start(ctx, spanName) defer span.End() - md, err := delegate(ctx) + md, err := sc.ScrapeMetrics(ctx) numScrapedMetrics := 0 numErroredMetrics := 0 if err != nil { - 
telSettings.Logger.Error("Error scraping metrics", zap.Error(err)) + set.Logger.Error("Error scraping metrics", zap.Error(err)) var partialErr scrapererror.PartialScrapeError if errors.As(err, &partialErr) { numErroredMetrics = partialErr.Failed @@ -85,5 +85,7 @@ func newObsMetrics(delegate scraper.ScrapeMetricsFunc, receiverID component.ID, } return md, err - }, nil + } + + return scraper.NewMetrics(scraperFuncs, scraper.WithStart(sc.Start), scraper.WithShutdown(sc.Shutdown)) } diff --git a/scraper/scraperhelper/obs_metrics_test.go b/scraper/scraperhelper/obs_metrics_test.go index 9471e6495e0..7216b5a61df 100644 --- a/scraper/scraperhelper/obs_metrics_test.go +++ b/scraper/scraperhelper/obs_metrics_test.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/testdata" + "go.opentelemetry.io/collector/scraper" "go.opentelemetry.io/collector/scraper/scrapererror" "go.opentelemetry.io/collector/scraper/scraperhelper/internal/metadatatest" ) @@ -55,9 +56,11 @@ func TestScrapeMetricsDataOp(t *testing.T) { {items: 15, err: nil}, } for i := range params { - sf, err := newObsMetrics(func(context.Context) (pmetric.Metrics, error) { + sm, err := scraper.NewMetrics(func(context.Context) (pmetric.Metrics, error) { return testdata.GenerateMetrics(params[i].items), params[i].err - }, receiverID, scraperID, tel) + }) + require.NoError(t, err) + sf, err := wrapObsMetrics(sm, receiverID, scraperID, tel) require.NoError(t, err) _, err = sf.ScrapeMetrics(parentCtx) require.ErrorIs(t, err, params[i].err) @@ -100,9 +103,11 @@ func TestCheckScraperMetrics(t *testing.T) { tt := metadatatest.SetupTelemetry() t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - sf, err := newObsMetrics(func(context.Context) (pmetric.Metrics, error) { + sm, err := scraper.NewMetrics(func(context.Context) (pmetric.Metrics, error) { return testdata.GenerateMetrics(7), nil - }, 
receiverID, scraperID, tt.NewTelemetrySettings()) + }) + require.NoError(t, err) + sf, err := wrapObsMetrics(sm, receiverID, scraperID, tt.NewTelemetrySettings()) require.NoError(t, err) _, err = sf.ScrapeMetrics(context.Background()) require.NoError(t, err)