Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make scrapercontroler more generic and move closer to the scraperreceiver #12103

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@ import (
"go.opentelemetry.io/collector/scraper/scrapererror"
)

// ScraperControllerOption apply changes to internal options.
type ScraperControllerOption interface {
apply(*controller)
// Deprecated: [v0.118.0] use ControllerOption.
type ScraperControllerOption = ControllerOption

// ControllerOption apply changes to internal options.
type ControllerOption interface {
apply(*controllerOptions)
}

type scraperControllerOptionFunc func(*controller)
type optionFunc func(*controllerOptions)

func (of scraperControllerOptionFunc) apply(e *controller) {
func (of optionFunc) apply(e *controllerOptions) {
of(e)
}

Expand All @@ -36,33 +39,38 @@ func (of scraperControllerOptionFunc) apply(e *controller) {
//
// Observability information will be reported, and the scraped metrics
// will be passed to the next consumer.
func AddScraper(t component.Type, scraper scraper.Metrics) ScraperControllerOption {
return scraperControllerOptionFunc(func(o *controller) {
o.scrapers = append(o.scrapers, scraperWithID{
Metrics: scraper,
id: component.NewID(t),
})
func AddScraper(t component.Type, sc scraper.Metrics) ControllerOption {
return optionFunc(func(o *controllerOptions) {
o.scrapers = append(o.scrapers, scraper.NewFactory(t, nil,
scraper.WithMetrics(func(ctx context.Context, set scraper.Settings, config component.Config) (scraper.Metrics, error) {
return sc, nil
}, component.StabilityLevelAlpha)))
})
}

// WithTickerChannel allows you to override the scraper controller's ticker
// channel to specify when scrape is called. This is only expected to be
// used by tests.
func WithTickerChannel(tickerCh <-chan time.Time) ScraperControllerOption {
return scraperControllerOptionFunc(func(o *controller) {
func WithTickerChannel(tickerCh <-chan time.Time) ControllerOption {
return optionFunc(func(o *controllerOptions) {
o.tickerCh = tickerCh
})
}

type controller struct {
type controllerOptions struct {
tickerCh <-chan time.Time
scrapers []scraper.Factory
}

type createScraperFunc[T any] func(context.Context, scraper.Settings, component.Config) (T, error)

type controller[T component.Component] struct {
collectionInterval time.Duration
initialDelay time.Duration
timeout time.Duration
nextConsumer consumer.Metrics

scrapers []scraperWithID
obsScrapers []scraper.Metrics

scrapers []T
scrape func()
tickerCh <-chan time.Time

done chan struct{}
Expand All @@ -71,61 +79,60 @@ type controller struct {
obsrecv *receiverhelper.ObsReport
}

type scraperWithID struct {
scraper.Metrics
id component.ID
}

// NewScraperControllerReceiver creates a Receiver with the configured options, that can control multiple scrapers.
func NewScraperControllerReceiver(
// newController creates a Receiver with the configured options, that can control multiple scrapers.
func newController[T component.Component](
cfg *ControllerConfig,
set receiver.Settings,
nextConsumer consumer.Metrics,
options ...ScraperControllerOption,
) (component.Component, error) {
rSet receiver.Settings,
csf func(scraper.Factory) createScraperFunc[T],
wrapObs func(T, component.ID, component.ID, component.TelemetrySettings) (T, error),
options []ControllerOption,
) (*controller[T], error) {
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: set.ID,
ReceiverID: rSet.ID,
Transport: "",
ReceiverCreateSettings: set,
ReceiverCreateSettings: rSet,
})
if err != nil {
return nil, err
}

sc := &controller{
co := controllerOptions{}
for _, op := range options {
op.apply(&co)
}

cs := &controller[T]{
collectionInterval: cfg.CollectionInterval,
initialDelay: cfg.InitialDelay,
timeout: cfg.Timeout,
nextConsumer: nextConsumer,
done: make(chan struct{}),
obsrecv: obsrecv,
}

for _, op := range options {
op.apply(sc)
}

sc.obsScrapers = make([]scraper.Metrics, len(sc.scrapers))
for i := range sc.scrapers {
telSet := set.TelemetrySettings
telSet.Logger = telSet.Logger.With(zap.String("scraper", sc.scrapers[i].id.String()))
var obsScrp scraper.ScrapeMetricsFunc
obsScrp, err = newObsMetrics(sc.scrapers[i].ScrapeMetrics, set.ID, sc.scrapers[i].id, telSet)
cs.scrapers = make([]T, len(co.scrapers))
for i := range co.scrapers {
set := scraper.Settings{
ID: component.NewID(co.scrapers[i].Type()),
TelemetrySettings: rSet.TelemetrySettings,
BuildInfo: rSet.BuildInfo,
}
set.TelemetrySettings.Logger = set.TelemetrySettings.Logger.With(zap.String("scraper", set.ID.String()))
cs.scrapers[i], err = csf(co.scrapers[i])(context.Background(), set, nil)
if err != nil {
return nil, err
}
sc.obsScrapers[i], err = scraper.NewMetrics(obsScrp, scraper.WithStart(sc.scrapers[i].Start), scraper.WithShutdown(sc.scrapers[i].Shutdown))
cs.scrapers[i], err = wrapObs(cs.scrapers[i], rSet.ID, set.ID, set.TelemetrySettings)
if err != nil {
return nil, err
}
}

return sc, nil
return cs, nil
}

// Start the receiver, invoked during service start.
func (sc *controller) Start(ctx context.Context, host component.Host) error {
for _, scrp := range sc.obsScrapers {
func (sc *controller[K]) Start(ctx context.Context, host component.Host) error {
for _, scrp := range sc.scrapers {
if err := scrp.Start(ctx, host); err != nil {
return err
}
Expand All @@ -136,12 +143,12 @@ func (sc *controller) Start(ctx context.Context, host component.Host) error {
}

// Shutdown the receiver, invoked during service shutdown.
func (sc *controller) Shutdown(ctx context.Context) error {
func (sc *controller[K]) Shutdown(ctx context.Context) error {
// Signal the goroutine to stop.
close(sc.done)
sc.wg.Wait()
var errs error
for _, scrp := range sc.obsScrapers {
for _, scrp := range sc.scrapers {
errs = multierr.Append(errs, scrp.Shutdown(ctx))
}

Expand All @@ -150,7 +157,7 @@ func (sc *controller) Shutdown(ctx context.Context) error {

// startScraping initiates a ticker that calls Scrape based on the configured
// collection interval.
func (sc *controller) startScraping() {
func (sc *controller[K]) startScraping() {
sc.wg.Add(1)
go func() {
defer sc.wg.Done()
Expand All @@ -171,38 +178,64 @@ func (sc *controller) startScraping() {
// Call scrape method on initialization to ensure
// that scrapers start from when the component starts
// instead of waiting for the full duration to start.
sc.scrapeMetricsAndReport()
sc.scrape()
for {
select {
case <-sc.tickerCh:
sc.scrapeMetricsAndReport()
sc.scrape()
case <-sc.done:
return
}
}
}()
}

// scrapeMetricsAndReport calls the Scrape function for each of the configured
// Scrapers, records observability information, and passes the scraped metrics
// to the next component.
func (sc *controller) scrapeMetricsAndReport() {
ctx, done := withScrapeContext(sc.timeout)
// Deprecated: [v0.118.0] Use NewMetricsController.
func NewScraperControllerReceiver(
cfg *ControllerConfig,
set receiver.Settings,
nextConsumer consumer.Metrics,
options ...ControllerOption,
) (component.Component, error) {
return NewMetricsController(cfg, set, nextConsumer, options...)
}

// NewMetricsController creates a receiver.Metrics with the configured options, that can control multiple scrapers.
func NewMetricsController(cfg *ControllerConfig,
set receiver.Settings,
nextConsumer consumer.Metrics,
options ...ControllerOption,
) (receiver.Metrics, error) {
c, err := newController[scraper.Metrics](cfg, set,
func(factory scraper.Factory) createScraperFunc[scraper.Metrics] {
return factory.CreateMetrics
}, wrapObsMetrics, options)
if err != nil {
return nil, err
}
c.scrape = func() {
scrapeMetrics(c, nextConsumer)
}
return c, nil
}

func scrapeMetrics(c *controller[scraper.Metrics], nextConsumer consumer.Metrics) {
ctx, done := withScrapeContext(c.timeout)
defer done()

metrics := pmetric.NewMetrics()
for i := range sc.obsScrapers {
md, err := sc.obsScrapers[i].ScrapeMetrics(ctx)
for i := range c.scrapers {
md, err := c.scrapers[i].ScrapeMetrics(ctx)
if err != nil && !scrapererror.IsPartialScrapeError(err) {
continue
}
md.ResourceMetrics().MoveAndAppendTo(metrics.ResourceMetrics())
}

dataPointCount := metrics.DataPointCount()
ctx = sc.obsrecv.StartMetricsOp(ctx)
err := sc.nextConsumer.ConsumeMetrics(ctx, metrics)
sc.obsrecv.EndMetricsOp(ctx, "", dataPointCount, err)
ctx = c.obsrecv.StartMetricsOp(ctx)
err := nextConsumer.ConsumeMetrics(ctx, metrics)
c.obsrecv.EndMetricsOp(ctx, "", dataPointCount, err)
}

// withScrapeContext will return a context that has no deadline if timeout is 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestScrapeController(t *testing.T) {
cfg = test.scraperControllerSettings
}

mr, err := NewScraperControllerReceiver(cfg, receiver.Settings{ID: receiverID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, sink, options...)
mr, err := NewMetricsController(cfg, receiver.Settings{ID: receiverID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, sink, options...)
require.NoError(t, err)

err = mr.Start(context.Background(), componenttest.NewNopHost())
Expand Down Expand Up @@ -194,8 +194,8 @@ func TestScrapeController(t *testing.T) {
}
}

func configureMetricOptions(t *testing.T, test metricsTestCase, initializeChs []chan bool, scrapeMetricsChs []chan int, closeChs []chan bool) []ScraperControllerOption {
var metricOptions []ScraperControllerOption
func configureMetricOptions(t *testing.T, test metricsTestCase, initializeChs []chan bool, scrapeMetricsChs []chan int, closeChs []chan bool) []ControllerOption {
var metricOptions []ControllerOption

for i := 0; i < test.scrapers; i++ {
var scraperOptions []scraper.Option
Expand Down Expand Up @@ -317,7 +317,7 @@ func TestSingleScrapePerInterval(t *testing.T) {
scp, err := scraper.NewMetrics(tsm.scrape)
require.NoError(t, err)

recv, err := NewScraperControllerReceiver(
recv, err := NewMetricsController(
cfg,
receivertest.NewNopSettings(),
new(consumertest.MetricsSink),
Expand Down Expand Up @@ -359,7 +359,7 @@ func TestScrapeControllerStartsOnInit(t *testing.T) {
scp, err := scraper.NewMetrics(tsm.scrape)
require.NoError(t, err, "Must not error when creating scraper")

r, err := NewScraperControllerReceiver(
r, err := NewMetricsController(
&ControllerConfig{
CollectionInterval: time.Hour,
InitialDelay: 0,
Expand Down Expand Up @@ -398,7 +398,7 @@ func TestScrapeControllerInitialDelay(t *testing.T) {
})
require.NoError(t, err, "Must not error when creating scraper")

r, err := NewScraperControllerReceiver(
r, err := NewMetricsController(
&cfg,
receivertest.NewNopSettings(),
new(consumertest.MetricsSink),
Expand Down Expand Up @@ -428,7 +428,7 @@ func TestShutdownBeforeScrapeCanStart(t *testing.T) {
})
require.NoError(t, err, "Must not error when creating scraper")

r, err := NewScraperControllerReceiver(
r, err := NewMetricsController(
&cfg,
receivertest.NewNopSettings(),
new(consumertest.MetricsSink),
Expand Down
16 changes: 9 additions & 7 deletions scraper/scraperhelper/obs_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,28 +37,28 @@ const (
formatKey = "format"
)

func newObsMetrics(delegate scraper.ScrapeMetricsFunc, receiverID component.ID, scraperID component.ID, telSettings component.TelemetrySettings) (scraper.ScrapeMetricsFunc, error) {
telemetryBuilder, errBuilder := metadata.NewTelemetryBuilder(telSettings)
func wrapObsMetrics(sc scraper.Metrics, receiverID component.ID, scraperID component.ID, set component.TelemetrySettings) (scraper.Metrics, error) {
telemetryBuilder, errBuilder := metadata.NewTelemetryBuilder(set)
if errBuilder != nil {
return nil, errBuilder
}

tracer := metadata.Tracer(telSettings)
tracer := metadata.Tracer(set)
spanName := scraperKey + spanNameSep + scraperID.String() + spanNameSep + "ScrapeMetrics"
otelAttrs := metric.WithAttributeSet(attribute.NewSet(
attribute.String(receiverKey, receiverID.String()),
attribute.String(scraperKey, scraperID.String()),
))

return func(ctx context.Context) (pmetric.Metrics, error) {
scraperFuncs := func(ctx context.Context) (pmetric.Metrics, error) {
ctx, span := tracer.Start(ctx, spanName)
defer span.End()

md, err := delegate(ctx)
md, err := sc.ScrapeMetrics(ctx)
numScrapedMetrics := 0
numErroredMetrics := 0
if err != nil {
telSettings.Logger.Error("Error scraping metrics", zap.Error(err))
set.Logger.Error("Error scraping metrics", zap.Error(err))
var partialErr scrapererror.PartialScrapeError
if errors.As(err, &partialErr) {
numErroredMetrics = partialErr.Failed
Expand All @@ -85,5 +85,7 @@ func newObsMetrics(delegate scraper.ScrapeMetricsFunc, receiverID component.ID,
}

return md, err
}, nil
}

return scraper.NewMetrics(scraperFuncs, scraper.WithStart(sc.Start), scraper.WithShutdown(sc.Shutdown))
}
Loading
Loading