From bce3cee45cec068527c00eb00c9ba48b75db2952 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Wed, 15 Jan 2025 13:13:19 -0800 Subject: [PATCH] Make scrapercontroler more generic and move closer to the scraperreceiver Signed-off-by: Bogdan Drutu --- scraper/scraperhelper/controller.go | 262 ++++++++++++++++++ ...rcontroller_test.go => controller_test.go} | 14 +- scraper/scraperhelper/obs_metrics.go | 16 +- scraper/scraperhelper/obs_metrics_test.go | 13 +- scraper/scraperhelper/scrapercontroller.go | 216 --------------- 5 files changed, 287 insertions(+), 234 deletions(-) create mode 100644 scraper/scraperhelper/controller.go rename scraper/scraperhelper/{scrapercontroller_test.go => controller_test.go} (96%) delete mode 100644 scraper/scraperhelper/scrapercontroller.go diff --git a/scraper/scraperhelper/controller.go b/scraper/scraperhelper/controller.go new file mode 100644 index 00000000000..8e4dd9d531f --- /dev/null +++ b/scraper/scraperhelper/controller.go @@ -0,0 +1,262 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package scraperhelper // import "go.opentelemetry.io/collector/scraper/scraperhelper" + +import ( + "context" + "sync" + "time" + + "go.uber.org/multierr" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.opentelemetry.io/collector/scraper" + "go.opentelemetry.io/collector/scraper/scrapererror" +) + +// Deprecated: [v0.118.0] use ControllerOption. +type ScraperControllerOption = ControllerOption + +// ControllerOption apply changes to internal options. +type ControllerOption interface { + apply(*controllerOptions) +} + +type optionFunc func(*controllerOptions) + +func (of optionFunc) apply(e *controllerOptions) { + of(e) +} + +// AddScraper configures the scraper.Metrics to be called with the specified options, +// and at the specified collection interval. +// +// Observability information will be reported, and the scraped metrics +// will be passed to the next consumer. +func AddScraper(t component.Type, sc scraper.Metrics) ControllerOption { + f := scraper.NewFactory(t, nil, + scraper.WithMetrics(func(ctx context.Context, set scraper.Settings, config component.Config) (scraper.Metrics, error) { + return sc, nil + }, component.StabilityLevelAlpha)) + return AddFactoryWithConfig(f, nil) +} + +// AddFactoryWithConfig configures the scraper.Factory and associated config that +// will be used to create a new scraper. The created scraper will be called with +// the specified options, and at the specified collection interval. +// +// Observability information will be reported, and the scraped metrics +// will be passed to the next consumer. +func AddFactoryWithConfig(f scraper.Factory, cfg component.Config) ControllerOption { + return optionFunc(func(o *controllerOptions) { + o.factoriesWithConfig = append(o.factoriesWithConfig, factoryWithConfig{f: f, cfg: cfg}) + }) +} + +// WithTickerChannel allows you to override the scraper controller's ticker +// channel to specify when scrape is called. This is only expected to be +// used by tests. +func WithTickerChannel(tickerCh <-chan time.Time) ControllerOption { + return optionFunc(func(o *controllerOptions) { + o.tickerCh = tickerCh + }) +} + +type factoryWithConfig struct { + f scraper.Factory + cfg component.Config +} + +type controllerOptions struct { + tickerCh <-chan time.Time + factoriesWithConfig []factoryWithConfig +} + +type controller[T component.Component] struct { + collectionInterval time.Duration + initialDelay time.Duration + timeout time.Duration + + scrapers []T + scrapeFunc func(*controller[T]) + tickerCh <-chan time.Time + + done chan struct{} + wg sync.WaitGroup + + obsrecv *receiverhelper.ObsReport +} + +func newController[T component.Component]( + cfg *ControllerConfig, + rSet receiver.Settings, + scrapers []T, + scrapeFunc func(*controller[T]), + tickerCh <-chan time.Time, +) (*controller[T], error) { + obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ + ReceiverID: rSet.ID, + Transport: "", + ReceiverCreateSettings: rSet, + }) + if err != nil { + return nil, err + } + + cs := &controller[T]{ + collectionInterval: cfg.CollectionInterval, + initialDelay: cfg.InitialDelay, + timeout: cfg.Timeout, + scrapers: scrapers, + scrapeFunc: scrapeFunc, + done: make(chan struct{}), + tickerCh: tickerCh, + obsrecv: obsrecv, + } + + return cs, nil +} + +// Start the receiver, invoked during service start. +func (sc *controller[T]) Start(ctx context.Context, host component.Host) error { + for _, scrp := range sc.scrapers { + if err := scrp.Start(ctx, host); err != nil { + return err + } + } + + sc.startScraping() + return nil +} + +// Shutdown the receiver, invoked during service shutdown. +func (sc *controller[T]) Shutdown(ctx context.Context) error { + // Signal the goroutine to stop. + close(sc.done) + sc.wg.Wait() + var errs error + for _, scrp := range sc.scrapers { + errs = multierr.Append(errs, scrp.Shutdown(ctx)) + } + + return errs +} + +// startScraping initiates a ticker that calls Scrape based on the configured +// collection interval. +func (sc *controller[T]) startScraping() { + sc.wg.Add(1) + go func() { + defer sc.wg.Done() + if sc.initialDelay > 0 { + select { + case <-time.After(sc.initialDelay): + case <-sc.done: + return + } + } + + if sc.tickerCh == nil { + ticker := time.NewTicker(sc.collectionInterval) + defer ticker.Stop() + + sc.tickerCh = ticker.C + } + // Call scrape method during initialization to ensure + // that scrapers start from when the component starts + // instead of waiting for the full duration to start. + sc.scrapeFunc(sc) + for { + select { + case <-sc.tickerCh: + sc.scrapeFunc(sc) + case <-sc.done: + return + } + } + }() +} + +// Deprecated: [v0.118.0] Use NewMetricsController. +func NewScraperControllerReceiver( + cfg *ControllerConfig, + set receiver.Settings, + nextConsumer consumer.Metrics, + options ...ControllerOption, +) (component.Component, error) { + return NewMetricsController(cfg, set, nextConsumer, options...) +} + +// NewMetricsController creates a receiver.Metrics with the configured options, that can control multiple scraper.Metrics. +func NewMetricsController(cfg *ControllerConfig, + rSet receiver.Settings, + nextConsumer consumer.Metrics, + options ...ControllerOption, +) (receiver.Metrics, error) { + co := getOptions(options) + scrapers := make([]scraper.Metrics, 0, len(co.factoriesWithConfig)) + for _, fwc := range co.factoriesWithConfig { + set := getSettings(fwc.f.Type(), rSet) + s, err := fwc.f.CreateMetrics(context.Background(), set, fwc.cfg) + if err != nil { + return nil, err + } + s, err = wrapObsMetrics(s, rSet.ID, set.ID, set.TelemetrySettings) + if err != nil { + return nil, err + } + scrapers = append(scrapers, s) + } + return newController[scraper.Metrics]( + cfg, rSet, scrapers, func(c *controller[scraper.Metrics]) { scrapeMetrics(c, nextConsumer) }, co.tickerCh) +} + +func scrapeMetrics(c *controller[scraper.Metrics], nextConsumer consumer.Metrics) { + ctx, done := withScrapeContext(c.timeout) + defer done() + + metrics := pmetric.NewMetrics() + for i := range c.scrapers { + md, err := c.scrapers[i].ScrapeMetrics(ctx) + if err != nil && !scrapererror.IsPartialScrapeError(err) { + continue + } + md.ResourceMetrics().MoveAndAppendTo(metrics.ResourceMetrics()) + } + + dataPointCount := metrics.DataPointCount() + ctx = c.obsrecv.StartMetricsOp(ctx) + err := nextConsumer.ConsumeMetrics(ctx, metrics) + c.obsrecv.EndMetricsOp(ctx, "", dataPointCount, err) +} + +func getOptions(options []ControllerOption) controllerOptions { + co := controllerOptions{} + for _, op := range options { + op.apply(&co) + } + return co +} + +func getSettings(sType component.Type, rSet receiver.Settings) scraper.Settings { + return scraper.Settings{ + ID: component.NewID(sType), + TelemetrySettings: rSet.TelemetrySettings, + BuildInfo: rSet.BuildInfo, + } +} + +// withScrapeContext will return a context that has no deadline if timeout is 0 +// which implies no explicit timeout had occurred, otherwise, a context +// with a deadline of the provided timeout is returned. +func withScrapeContext(timeout time.Duration) (context.Context, context.CancelFunc) { + if timeout == 0 { + return context.WithCancel(context.Background()) + } + return context.WithTimeout(context.Background(), timeout) +} diff --git a/scraper/scraperhelper/scrapercontroller_test.go b/scraper/scraperhelper/controller_test.go similarity index 96% rename from scraper/scraperhelper/scrapercontroller_test.go rename to scraper/scraperhelper/controller_test.go index 5f6ec4ebbca..26d5cd3f9f9 100644 --- a/scraper/scraperhelper/scrapercontroller_test.go +++ b/scraper/scraperhelper/controller_test.go @@ -138,7 +138,7 @@ func TestScrapeController(t *testing.T) { cfg = test.scraperControllerSettings } - mr, err := NewScraperControllerReceiver(cfg, receiver.Settings{ID: receiverID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, sink, options...) + mr, err := NewMetricsController(cfg, receiver.Settings{ID: receiverID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, sink, options...) require.NoError(t, err) err = mr.Start(context.Background(), componenttest.NewNopHost()) @@ -194,8 +194,8 @@ func TestScrapeController(t *testing.T) { } } -func configureMetricOptions(t *testing.T, test metricsTestCase, initializeChs []chan bool, scrapeMetricsChs []chan int, closeChs []chan bool) []ScraperControllerOption { - var metricOptions []ScraperControllerOption +func configureMetricOptions(t *testing.T, test metricsTestCase, initializeChs []chan bool, scrapeMetricsChs []chan int, closeChs []chan bool) []ControllerOption { + var metricOptions []ControllerOption for i := 0; i < test.scrapers; i++ { var scraperOptions []scraper.Option @@ -317,7 +317,7 @@ func TestSingleScrapePerInterval(t *testing.T) { scp, err := scraper.NewMetrics(tsm.scrape) require.NoError(t, err) - recv, err := NewScraperControllerReceiver( + recv, err := NewMetricsController( cfg, receivertest.NewNopSettings(), new(consumertest.MetricsSink), @@ -359,7 +359,7 @@ func TestScrapeControllerStartsOnInit(t *testing.T) { scp, err := scraper.NewMetrics(tsm.scrape) require.NoError(t, err, "Must not error when creating scraper") - r, err := NewScraperControllerReceiver( + r, err := NewMetricsController( &ControllerConfig{ CollectionInterval: time.Hour, InitialDelay: 0, @@ -398,7 +398,7 @@ func TestScrapeControllerInitialDelay(t *testing.T) { }) require.NoError(t, err, "Must not error when creating scraper") - r, err := NewScraperControllerReceiver( + r, err := NewMetricsController( &cfg, receivertest.NewNopSettings(), new(consumertest.MetricsSink), @@ -428,7 +428,7 @@ func TestShutdownBeforeScrapeCanStart(t *testing.T) { }) require.NoError(t, err, "Must not error when creating scraper") - r, err := NewScraperControllerReceiver( + r, err := NewMetricsController( &cfg, receivertest.NewNopSettings(), new(consumertest.MetricsSink), diff --git a/scraper/scraperhelper/obs_metrics.go b/scraper/scraperhelper/obs_metrics.go index 8723215da2a..2c31051539d 100644 --- a/scraper/scraperhelper/obs_metrics.go +++ b/scraper/scraperhelper/obs_metrics.go @@ -37,28 +37,28 @@ const ( formatKey = "format" ) -func newObsMetrics(delegate scraper.ScrapeMetricsFunc, receiverID component.ID, scraperID component.ID, telSettings component.TelemetrySettings) (scraper.ScrapeMetricsFunc, error) { - telemetryBuilder, errBuilder := metadata.NewTelemetryBuilder(telSettings) +func wrapObsMetrics(sc scraper.Metrics, receiverID component.ID, scraperID component.ID, set component.TelemetrySettings) (scraper.Metrics, error) { + telemetryBuilder, errBuilder := metadata.NewTelemetryBuilder(set) if errBuilder != nil { return nil, errBuilder } - tracer := metadata.Tracer(telSettings) + tracer := metadata.Tracer(set) spanName := scraperKey + spanNameSep + scraperID.String() + spanNameSep + "ScrapeMetrics" otelAttrs := metric.WithAttributeSet(attribute.NewSet( attribute.String(receiverKey, receiverID.String()), attribute.String(scraperKey, scraperID.String()), )) - return func(ctx context.Context) (pmetric.Metrics, error) { + scraperFuncs := func(ctx context.Context) (pmetric.Metrics, error) { ctx, span := tracer.Start(ctx, spanName) defer span.End() - md, err := delegate(ctx) + md, err := sc.ScrapeMetrics(ctx) numScrapedMetrics := 0 numErroredMetrics := 0 if err != nil { - telSettings.Logger.Error("Error scraping metrics", zap.Error(err)) + set.Logger.Error("Error scraping metrics", zap.Error(err)) var partialErr scrapererror.PartialScrapeError if errors.As(err, &partialErr) { numErroredMetrics = partialErr.Failed @@ -85,5 +85,7 @@ func newObsMetrics(delegate scraper.ScrapeMetricsFunc, receiverID component.ID, } return md, err - }, nil + } + + return scraper.NewMetrics(scraperFuncs, scraper.WithStart(sc.Start), scraper.WithShutdown(sc.Shutdown)) } diff --git a/scraper/scraperhelper/obs_metrics_test.go b/scraper/scraperhelper/obs_metrics_test.go index 9471e6495e0..7216b5a61df 100644 --- a/scraper/scraperhelper/obs_metrics_test.go +++ b/scraper/scraperhelper/obs_metrics_test.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/testdata" + "go.opentelemetry.io/collector/scraper" "go.opentelemetry.io/collector/scraper/scrapererror" "go.opentelemetry.io/collector/scraper/scraperhelper/internal/metadatatest" ) @@ -55,9 +56,11 @@ func TestScrapeMetricsDataOp(t *testing.T) { {items: 15, err: nil}, } for i := range params { - sf, err := newObsMetrics(func(context.Context) (pmetric.Metrics, error) { + sm, err := scraper.NewMetrics(func(context.Context) (pmetric.Metrics, error) { return testdata.GenerateMetrics(params[i].items), params[i].err - }, receiverID, scraperID, tel) + }) + require.NoError(t, err) + sf, err := wrapObsMetrics(sm, receiverID, scraperID, tel) require.NoError(t, err) _, err = sf.ScrapeMetrics(parentCtx) require.ErrorIs(t, err, params[i].err) @@ -100,9 +103,11 @@ func TestCheckScraperMetrics(t *testing.T) { tt := metadatatest.SetupTelemetry() t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - sf, err := newObsMetrics(func(context.Context) (pmetric.Metrics, error) { + sm, err := scraper.NewMetrics(func(context.Context) (pmetric.Metrics, error) { return testdata.GenerateMetrics(7), nil - }, receiverID, scraperID, tt.NewTelemetrySettings()) + }) + require.NoError(t, err) + sf, err := wrapObsMetrics(sm, receiverID, scraperID, tt.NewTelemetrySettings()) require.NoError(t, err) _, err = sf.ScrapeMetrics(context.Background()) require.NoError(t, err) diff --git a/scraper/scraperhelper/scrapercontroller.go b/scraper/scraperhelper/scrapercontroller.go deleted file mode 100644 index b41ccb83efd..00000000000 --- a/scraper/scraperhelper/scrapercontroller.go +++ /dev/null @@ -1,216 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package scraperhelper // import "go.opentelemetry.io/collector/scraper/scraperhelper" - -import ( - "context" - "sync" - "time" - - "go.uber.org/multierr" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" - "go.opentelemetry.io/collector/receiver/receiverhelper" - "go.opentelemetry.io/collector/scraper" - "go.opentelemetry.io/collector/scraper/scrapererror" -) - -// ScraperControllerOption apply changes to internal options. -type ScraperControllerOption interface { - apply(*controller) -} - -type scraperControllerOptionFunc func(*controller) - -func (of scraperControllerOptionFunc) apply(e *controller) { - of(e) -} - -// AddScraper configures the provided scrape function to be called -// with the specified options, and at the specified collection interval. -// -// Observability information will be reported, and the scraped metrics -// will be passed to the next consumer. -func AddScraper(t component.Type, scraper scraper.Metrics) ScraperControllerOption { - return scraperControllerOptionFunc(func(o *controller) { - o.scrapers = append(o.scrapers, scraperWithID{ - Metrics: scraper, - id: component.NewID(t), - }) - }) -} - -// WithTickerChannel allows you to override the scraper controller's ticker -// channel to specify when scrape is called. This is only expected to be -// used by tests. -func WithTickerChannel(tickerCh <-chan time.Time) ScraperControllerOption { - return scraperControllerOptionFunc(func(o *controller) { - o.tickerCh = tickerCh - }) -} - -type controller struct { - collectionInterval time.Duration - initialDelay time.Duration - timeout time.Duration - nextConsumer consumer.Metrics - - scrapers []scraperWithID - obsScrapers []scraper.Metrics - - tickerCh <-chan time.Time - - done chan struct{} - wg sync.WaitGroup - - obsrecv *receiverhelper.ObsReport -} - -type scraperWithID struct { - scraper.Metrics - id component.ID -} - -// NewScraperControllerReceiver creates a Receiver with the configured options, that can control multiple scrapers. -func NewScraperControllerReceiver( - cfg *ControllerConfig, - set receiver.Settings, - nextConsumer consumer.Metrics, - options ...ScraperControllerOption, -) (component.Component, error) { - obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ - ReceiverID: set.ID, - Transport: "", - ReceiverCreateSettings: set, - }) - if err != nil { - return nil, err - } - - sc := &controller{ - collectionInterval: cfg.CollectionInterval, - initialDelay: cfg.InitialDelay, - timeout: cfg.Timeout, - nextConsumer: nextConsumer, - done: make(chan struct{}), - obsrecv: obsrecv, - } - - for _, op := range options { - op.apply(sc) - } - - sc.obsScrapers = make([]scraper.Metrics, len(sc.scrapers)) - for i := range sc.scrapers { - telSet := set.TelemetrySettings - telSet.Logger = telSet.Logger.With(zap.String("scraper", sc.scrapers[i].id.String())) - var obsScrp scraper.ScrapeMetricsFunc - obsScrp, err = newObsMetrics(sc.scrapers[i].ScrapeMetrics, set.ID, sc.scrapers[i].id, telSet) - if err != nil { - return nil, err - } - sc.obsScrapers[i], err = scraper.NewMetrics(obsScrp, scraper.WithStart(sc.scrapers[i].Start), scraper.WithShutdown(sc.scrapers[i].Shutdown)) - if err != nil { - return nil, err - } - } - - return sc, nil -} - -// Start the receiver, invoked during service start. -func (sc *controller) Start(ctx context.Context, host component.Host) error { - for _, scrp := range sc.obsScrapers { - if err := scrp.Start(ctx, host); err != nil { - return err - } - } - - sc.startScraping() - return nil -} - -// Shutdown the receiver, invoked during service shutdown. -func (sc *controller) Shutdown(ctx context.Context) error { - // Signal the goroutine to stop. - close(sc.done) - sc.wg.Wait() - var errs error - for _, scrp := range sc.obsScrapers { - errs = multierr.Append(errs, scrp.Shutdown(ctx)) - } - - return errs -} - -// startScraping initiates a ticker that calls Scrape based on the configured -// collection interval. -func (sc *controller) startScraping() { - sc.wg.Add(1) - go func() { - defer sc.wg.Done() - if sc.initialDelay > 0 { - select { - case <-time.After(sc.initialDelay): - case <-sc.done: - return - } - } - - if sc.tickerCh == nil { - ticker := time.NewTicker(sc.collectionInterval) - defer ticker.Stop() - - sc.tickerCh = ticker.C - } - // Call scrape method on initialization to ensure - // that scrapers start from when the component starts - // instead of waiting for the full duration to start. - sc.scrapeMetricsAndReport() - for { - select { - case <-sc.tickerCh: - sc.scrapeMetricsAndReport() - case <-sc.done: - return - } - } - }() -} - -// scrapeMetricsAndReport calls the Scrape function for each of the configured -// Scrapers, records observability information, and passes the scraped metrics -// to the next component. -func (sc *controller) scrapeMetricsAndReport() { - ctx, done := withScrapeContext(sc.timeout) - defer done() - - metrics := pmetric.NewMetrics() - for i := range sc.obsScrapers { - md, err := sc.obsScrapers[i].ScrapeMetrics(ctx) - if err != nil && !scrapererror.IsPartialScrapeError(err) { - continue - } - md.ResourceMetrics().MoveAndAppendTo(metrics.ResourceMetrics()) - } - - dataPointCount := metrics.DataPointCount() - ctx = sc.obsrecv.StartMetricsOp(ctx) - err := sc.nextConsumer.ConsumeMetrics(ctx, metrics) - sc.obsrecv.EndMetricsOp(ctx, "", dataPointCount, err) -} - -// withScrapeContext will return a context that has no deadline if timeout is 0 -// which implies no explicit timeout had occurred, otherwise, a context -// with a deadline of the provided timeout is returned. -func withScrapeContext(timeout time.Duration) (context.Context, context.CancelFunc) { - if timeout == 0 { - return context.WithCancel(context.Background()) - } - return context.WithTimeout(context.Background(), timeout) -}