Make scrapercontroller more generic and move closer to the scraperreceiver

Signed-off-by: Bogdan Drutu <[email protected]>
bogdandrutu committed Jan 16, 2025
1 parent c56efd3 commit 309685b
Showing 4 changed files with 120 additions and 80 deletions.
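
This commit makes the scraper controller generic over the component type it controls and adds NewMetricsController as the new constructor, deprecating NewScraperControllerReceiver and turning ScraperControllerOption into a deprecated alias of ControllerOption. As a rough usage sketch of the new API (not part of the commit itself; the scrape function, the "example" component type, and the collection interval are illustrative assumptions):

package example

import (
    "context"
    "time"

    "go.opentelemetry.io/collector/component"
    "go.opentelemetry.io/collector/consumer"
    "go.opentelemetry.io/collector/pdata/pmetric"
    "go.opentelemetry.io/collector/receiver"
    "go.opentelemetry.io/collector/scraper"
    "go.opentelemetry.io/collector/scraper/scraperhelper"
)

func newExampleReceiver(set receiver.Settings, next consumer.Metrics) (receiver.Metrics, error) {
    // A trivial scraper that returns an empty batch; a real scraper collects metrics here.
    scrp, err := scraper.NewMetrics(func(context.Context) (pmetric.Metrics, error) {
        return pmetric.NewMetrics(), nil
    })
    if err != nil {
        return nil, err
    }

    cfg := &scraperhelper.ControllerConfig{CollectionInterval: time.Minute}

    // NewMetricsController replaces the now-deprecated NewScraperControllerReceiver.
    return scraperhelper.NewMetricsController(cfg, set, next,
        scraperhelper.AddScraper(component.MustNewType("example"), scrp),
    )
}
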
@@ -20,14 +20,17 @@ import (
"go.opentelemetry.io/collector/scraper/scrapererror"
)

// ScraperControllerOption apply changes to internal options.
type ScraperControllerOption interface {
apply(*controller)
// Deprecated: [v0.118.0] use ControllerOption.
type ScraperControllerOption = ControllerOption

// ControllerOption apply changes to internal options.
type ControllerOption interface {
apply(*controllerOptions)
}

type scraperControllerOptionFunc func(*controller)
type optionFunc func(*controllerOptions)

func (of scraperControllerOptionFunc) apply(e *controller) {
func (of optionFunc) apply(e *controllerOptions) {
of(e)
}

@@ -36,33 +39,38 @@ func (of scraperControllerOptionFunc) apply(e *controller) {
//
// Observability information will be reported, and the scraped metrics
// will be passed to the next consumer.
func AddScraper(t component.Type, scraper scraper.Metrics) ScraperControllerOption {
return scraperControllerOptionFunc(func(o *controller) {
o.scrapers = append(o.scrapers, scraperWithID{
Metrics: scraper,
id: component.NewID(t),
})
func AddScraper(t component.Type, sc scraper.Metrics) ControllerOption {
return optionFunc(func(o *controllerOptions) {
o.scrapers = append(o.scrapers, scraper.NewFactory(t, nil,
scraper.WithMetrics(func(ctx context.Context, set scraper.Settings, config component.Config) (scraper.Metrics, error) {
return sc, nil
}, component.StabilityLevelAlpha)))
})
}

// WithTickerChannel allows you to override the scraper controller's ticker
// channel to specify when scrape is called. This is only expected to be
// used by tests.
func WithTickerChannel(tickerCh <-chan time.Time) ScraperControllerOption {
return scraperControllerOptionFunc(func(o *controller) {
func WithTickerChannel(tickerCh <-chan time.Time) ControllerOption {
return optionFunc(func(o *controllerOptions) {
o.tickerCh = tickerCh
})
}

type controller struct {
type controllerOptions struct {
tickerCh <-chan time.Time
scrapers []scraper.Factory
}

type createScraperFunc[T any] func(context.Context, scraper.Settings, component.Config) (T, error)

type controller[T component.Component] struct {
collectionInterval time.Duration
initialDelay time.Duration
timeout time.Duration
nextConsumer consumer.Metrics

scrapers []scraperWithID
obsScrapers []scraper.Metrics

scrapers []T
scrape func()
tickerCh <-chan time.Time

done chan struct{}
@@ -71,61 +79,60 @@ type controller struct {
obsrecv *receiverhelper.ObsReport
}

type scraperWithID struct {
scraper.Metrics
id component.ID
}

// NewScraperControllerReceiver creates a Receiver with the configured options, that can control multiple scrapers.
func NewScraperControllerReceiver(
// newController creates a Receiver with the configured options, that can control multiple scrapers.
func newController[T component.Component](
cfg *ControllerConfig,
set receiver.Settings,
nextConsumer consumer.Metrics,
options ...ScraperControllerOption,
) (component.Component, error) {
rSet receiver.Settings,
csf func(scraper.Factory) createScraperFunc[T],
wrapObs func(T, component.ID, component.ID, component.TelemetrySettings) (T, error),
options []ControllerOption,
) (*controller[T], error) {
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: set.ID,
ReceiverID: rSet.ID,
Transport: "",
ReceiverCreateSettings: set,
ReceiverCreateSettings: rSet,
})
if err != nil {
return nil, err
}

sc := &controller{
co := controllerOptions{}
for _, op := range options {
op.apply(&co)
}

cs := &controller[T]{
collectionInterval: cfg.CollectionInterval,
initialDelay: cfg.InitialDelay,
timeout: cfg.Timeout,
nextConsumer: nextConsumer,
done: make(chan struct{}),
obsrecv: obsrecv,
}

for _, op := range options {
op.apply(sc)
}

sc.obsScrapers = make([]scraper.Metrics, len(sc.scrapers))
for i := range sc.scrapers {
telSet := set.TelemetrySettings
telSet.Logger = telSet.Logger.With(zap.String("scraper", sc.scrapers[i].id.String()))
var obsScrp scraper.ScrapeMetricsFunc
obsScrp, err = newObsMetrics(sc.scrapers[i].ScrapeMetrics, set.ID, sc.scrapers[i].id, telSet)
cs.scrapers = make([]T, len(co.scrapers))
for i := range co.scrapers {
set := scraper.Settings{
ID: component.NewID(co.scrapers[i].Type()),
TelemetrySettings: rSet.TelemetrySettings,
BuildInfo: rSet.BuildInfo,
}
set.TelemetrySettings.Logger = set.TelemetrySettings.Logger.With(zap.String("scraper", set.ID.String()))
cs.scrapers[i], err = csf(co.scrapers[i])(context.Background(), set, nil)
if err != nil {
return nil, err
}
sc.obsScrapers[i], err = scraper.NewMetrics(obsScrp, scraper.WithStart(sc.scrapers[i].Start), scraper.WithShutdown(sc.scrapers[i].Shutdown))
cs.scrapers[i], err = wrapObs(cs.scrapers[i], rSet.ID, set.ID, set.TelemetrySettings)
if err != nil {
return nil, err
}
}

return sc, nil
return cs, nil
}

// Start the receiver, invoked during service start.
func (sc *controller) Start(ctx context.Context, host component.Host) error {
for _, scrp := range sc.obsScrapers {
func (sc *controller[K]) Start(ctx context.Context, host component.Host) error {
for _, scrp := range sc.scrapers {
if err := scrp.Start(ctx, host); err != nil {
return err
}
@@ -136,12 +143,12 @@ func (sc *controller) Start(ctx context.Context, host component.Host) error {
}

// Shutdown the receiver, invoked during service shutdown.
func (sc *controller) Shutdown(ctx context.Context) error {
func (sc *controller[K]) Shutdown(ctx context.Context) error {
// Signal the goroutine to stop.
close(sc.done)
sc.wg.Wait()
var errs error
for _, scrp := range sc.obsScrapers {
for _, scrp := range sc.scrapers {
errs = multierr.Append(errs, scrp.Shutdown(ctx))
}

@@ -150,7 +157,7 @@ func (sc *controller) Shutdown(ctx context.Context) error {

// startScraping initiates a ticker that calls Scrape based on the configured
// collection interval.
func (sc *controller) startScraping() {
func (sc *controller[K]) startScraping() {
sc.wg.Add(1)
go func() {
defer sc.wg.Done()
@@ -171,38 +178,64 @@ func (sc *controller) startScraping() {
// Call scrape method on initialization to ensure
// that scrapers start from when the component starts
// instead of waiting for the full duration to start.
sc.scrapeMetricsAndReport()
sc.scrape()
for {
select {
case <-sc.tickerCh:
sc.scrapeMetricsAndReport()
sc.scrape()
case <-sc.done:
return
}
}
}()
}

// scrapeMetricsAndReport calls the Scrape function for each of the configured
// Scrapers, records observability information, and passes the scraped metrics
// to the next component.
func (sc *controller) scrapeMetricsAndReport() {
ctx, done := withScrapeContext(sc.timeout)
// Deprecated: [v0.118.0] Use NewMetricsController.
func NewScraperControllerReceiver(
cfg *ControllerConfig,
set receiver.Settings,
nextConsumer consumer.Metrics,
options ...ControllerOption,
) (component.Component, error) {
return NewMetricsController(cfg, set, nextConsumer, options...)
}

// NewMetricsController creates a receiver.Metrics with the configured options, that can control multiple scrapers.
func NewMetricsController(cfg *ControllerConfig,
set receiver.Settings,
nextConsumer consumer.Metrics,
options ...ControllerOption,
) (receiver.Metrics, error) {
c, err := newController[scraper.Metrics](cfg, set,
func(factory scraper.Factory) createScraperFunc[scraper.Metrics] {
return factory.CreateMetrics
}, wrapObsMetrics, options)
if err != nil {
return nil, err
}
c.scrape = func() {
scrapeMetrics(c, nextConsumer)
}
return c, nil
}

func scrapeMetrics(c *controller[scraper.Metrics], nextConsumer consumer.Metrics) {
ctx, done := withScrapeContext(c.timeout)
defer done()

metrics := pmetric.NewMetrics()
for i := range sc.obsScrapers {
md, err := sc.obsScrapers[i].ScrapeMetrics(ctx)
for i := range c.scrapers {
md, err := c.scrapers[i].ScrapeMetrics(ctx)
if err != nil && !scrapererror.IsPartialScrapeError(err) {
continue
}
md.ResourceMetrics().MoveAndAppendTo(metrics.ResourceMetrics())
}

dataPointCount := metrics.DataPointCount()
ctx = sc.obsrecv.StartMetricsOp(ctx)
err := sc.nextConsumer.ConsumeMetrics(ctx, metrics)
sc.obsrecv.EndMetricsOp(ctx, "", dataPointCount, err)
ctx = c.obsrecv.StartMetricsOp(ctx)
err := nextConsumer.ConsumeMetrics(ctx, metrics)
c.obsrecv.EndMetricsOp(ctx, "", dataPointCount, err)
}

// withScrapeContext will return a context that has no deadline if timeout is 0
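WithTickerChannel, declared in the controller file above, lets a test drive scrape timing explicitly instead of waiting on the configured interval; the test diff below only switches its call sites to NewMetricsController. A minimal sketch of the pattern (the package name, test name, and "example" scraper type are illustrative assumptions):

package scraperhelpertest

import (
    "context"
    "testing"
    "time"

    "github.com/stretchr/testify/require"

    "go.opentelemetry.io/collector/component"
    "go.opentelemetry.io/collector/component/componenttest"
    "go.opentelemetry.io/collector/consumer/consumertest"
    "go.opentelemetry.io/collector/pdata/pmetric"
    "go.opentelemetry.io/collector/receiver/receivertest"
    "go.opentelemetry.io/collector/scraper"
    "go.opentelemetry.io/collector/scraper/scraperhelper"
)

func TestDriveScrapesManually(t *testing.T) {
    scrp, err := scraper.NewMetrics(func(context.Context) (pmetric.Metrics, error) {
        return pmetric.NewMetrics(), nil
    })
    require.NoError(t, err)

    tickerCh := make(chan time.Time)
    recv, err := scraperhelper.NewMetricsController(
        &scraperhelper.ControllerConfig{CollectionInterval: time.Hour},
        receivertest.NewNopSettings(),
        new(consumertest.MetricsSink),
        scraperhelper.AddScraper(component.MustNewType("example"), scrp),
        scraperhelper.WithTickerChannel(tickerCh),
    )
    require.NoError(t, err)

    require.NoError(t, recv.Start(context.Background(), componenttest.NewNopHost()))
    tickerCh <- time.Now() // triggers exactly one scrape beyond the one done at startup
    require.NoError(t, recv.Shutdown(context.Background()))
}
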
@@ -138,7 +138,7 @@ func TestScrapeController(t *testing.T) {
cfg = test.scraperControllerSettings
}

mr, err := NewScraperControllerReceiver(cfg, receiver.Settings{ID: receiverID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, sink, options...)
mr, err := NewMetricsController(cfg, receiver.Settings{ID: receiverID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, sink, options...)
require.NoError(t, err)

err = mr.Start(context.Background(), componenttest.NewNopHost())
@@ -194,8 +194,8 @@ func TestScrapeController(t *testing.T) {
}
}

func configureMetricOptions(t *testing.T, test metricsTestCase, initializeChs []chan bool, scrapeMetricsChs []chan int, closeChs []chan bool) []ScraperControllerOption {
var metricOptions []ScraperControllerOption
func configureMetricOptions(t *testing.T, test metricsTestCase, initializeChs []chan bool, scrapeMetricsChs []chan int, closeChs []chan bool) []ControllerOption {
var metricOptions []ControllerOption

for i := 0; i < test.scrapers; i++ {
var scraperOptions []scraper.Option
@@ -317,7 +317,7 @@ func TestSingleScrapePerInterval(t *testing.T) {
scp, err := scraper.NewMetrics(tsm.scrape)
require.NoError(t, err)

recv, err := NewScraperControllerReceiver(
recv, err := NewMetricsController(
cfg,
receivertest.NewNopSettings(),
new(consumertest.MetricsSink),
@@ -359,7 +359,7 @@ func TestScrapeControllerStartsOnInit(t *testing.T) {
scp, err := scraper.NewMetrics(tsm.scrape)
require.NoError(t, err, "Must not error when creating scraper")

r, err := NewScraperControllerReceiver(
r, err := NewMetricsController(
&ControllerConfig{
CollectionInterval: time.Hour,
InitialDelay: 0,
@@ -398,7 +398,7 @@ func TestScrapeControllerInitialDelay(t *testing.T) {
})
require.NoError(t, err, "Must not error when creating scraper")

r, err := NewScraperControllerReceiver(
r, err := NewMetricsController(
&cfg,
receivertest.NewNopSettings(),
new(consumertest.MetricsSink),
@@ -428,7 +428,7 @@ func TestShutdownBeforeScrapeCanStart(t *testing.T) {
})
require.NoError(t, err, "Must not error when creating scraper")

r, err := NewScraperControllerReceiver(
r, err := NewMetricsController(
&cfg,
receivertest.NewNopSettings(),
new(consumertest.MetricsSink),
16 changes: 9 additions & 7 deletions scraper/scraperhelper/obs_metrics.go
@@ -37,28 +37,28 @@ const (
formatKey = "format"
)

func newObsMetrics(delegate scraper.ScrapeMetricsFunc, receiverID component.ID, scraperID component.ID, telSettings component.TelemetrySettings) (scraper.ScrapeMetricsFunc, error) {
telemetryBuilder, errBuilder := metadata.NewTelemetryBuilder(telSettings)
func wrapObsMetrics(sc scraper.Metrics, receiverID component.ID, scraperID component.ID, set component.TelemetrySettings) (scraper.Metrics, error) {
telemetryBuilder, errBuilder := metadata.NewTelemetryBuilder(set)
if errBuilder != nil {
return nil, errBuilder
}

tracer := metadata.Tracer(telSettings)
tracer := metadata.Tracer(set)
spanName := scraperKey + spanNameSep + scraperID.String() + spanNameSep + "ScrapeMetrics"
otelAttrs := metric.WithAttributeSet(attribute.NewSet(
attribute.String(receiverKey, receiverID.String()),
attribute.String(scraperKey, scraperID.String()),
))

return func(ctx context.Context) (pmetric.Metrics, error) {
scraperFuncs := func(ctx context.Context) (pmetric.Metrics, error) {
ctx, span := tracer.Start(ctx, spanName)
defer span.End()

md, err := delegate(ctx)
md, err := sc.ScrapeMetrics(ctx)
numScrapedMetrics := 0
numErroredMetrics := 0
if err != nil {
telSettings.Logger.Error("Error scraping metrics", zap.Error(err))
set.Logger.Error("Error scraping metrics", zap.Error(err))
var partialErr scrapererror.PartialScrapeError
if errors.As(err, &partialErr) {
numErroredMetrics = partialErr.Failed
@@ -85,5 +85,7 @@ }
}

return md, err
}, nil
}

return scraper.NewMetrics(scraperFuncs, scraper.WithStart(sc.Start), scraper.WithShutdown(sc.Shutdown))
}
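
The rewritten wrapper above relies on scraper.NewMetrics accepting lifecycle hooks, so the instrumented scraper keeps delegating Start and Shutdown to the scraper it wraps while only the scrape path is decorated. A minimal sketch of that composition, with original standing in for the wrapped scraper.Metrics and the telemetry recording elided:

// Sketch only: replace the scrape function while preserving Start/Shutdown delegation.
instrumented, err := scraper.NewMetrics(
    func(ctx context.Context) (pmetric.Metrics, error) {
        // record spans and counters about the scrape here, then delegate
        return original.ScrapeMetrics(ctx)
    },
    scraper.WithStart(original.Start),       // Start still reaches the wrapped scraper
    scraper.WithShutdown(original.Shutdown), // Shutdown still reaches the wrapped scraper
)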
