Skip to content

Commit

Permalink
Merge pull request #9 from rubrikinc/metrics
Browse files Browse the repository at this point in the history
Metrics
  • Loading branch information
Athishpranav2003 authored Jul 12, 2024
2 parents 6c9db61 + 00dcd16 commit 00e8711
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 8 deletions.
2 changes: 1 addition & 1 deletion Gemfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
source 'https://rubygems.org'

gemspec
gemspec
14 changes: 9 additions & 5 deletions fluent-plugin-quota-throttle.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ Gem::Specification.new do |spec|
spec.require_paths = ["lib"]

spec.add_development_dependency "bundler"
spec.add_development_dependency "minitest"
spec.add_development_dependency "test-unit"
spec.add_development_dependency "rake"
spec.add_development_dependency "mutex_m"
spec.add_development_dependency "minitest" , "~> 5.14"
spec.add_development_dependency "test-unit" , "~> 3.6"
spec.add_development_dependency "rake" , "~> 13.0"
spec.add_development_dependency "mutex_m" , "~> 0.1"
spec.add_development_dependency "webrick" , "~> 1.8"
spec.add_development_dependency "csv" , "~> 3.3"
spec.add_development_dependency "base64" , "~> 0.2"

spec.add_runtime_dependency "fluentd"
spec.add_runtime_dependency "fluentd" , "~> 1.9"
spec.add_runtime_dependency "fluent-plugin-prometheus", " = 2.1.0"
end
58 changes: 56 additions & 2 deletions lib/fluent/plugin/filter_quota_throttle.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'fluent/plugin/filter'
require 'fluent/plugin/prometheus'
require_relative 'config_parser'
require_relative 'matcher'
require_relative 'rate_limiter'
Expand All @@ -11,16 +12,24 @@ module Fluent::Plugin
# QuotaThrottleFilter class is derived from the Filter class and is responsible for filtering records based on quotas
class QuotaThrottleFilter < Filter
Fluent::Plugin.register_filter('quota_throttle', self)
include Fluent::Plugin::PrometheusLabelParser
include Fluent::Plugin::Prometheus
attr_reader :registry

desc "Path for the quota config file"
config_param :path, :string, :default => nil

desc "Delay in seconds between warnings for the same group when the quota is breached"
config_param :warning_delay, :time, :default => 60

desc "Enable prometheus metrics"
config_param :enable_metrics, :bool, :default => false

def initialize
@reemit_tag_suffix = "secondary"
super
@reemit_tag_prefix = "secondary"
@registry = ::Prometheus::Client.registry
@placeholder_expander_builder = Fluent::Plugin::Prometheus.placeholder_expander(log)
end

# Configures the plugin
Expand All @@ -32,15 +41,30 @@ def configure(conf)
if @warning_delay < 0
@config = ConfigParser::Configuration.new(@path)
@match_helper = Matcher::MatchHelper.new(@config.quotas, @config.default_quota)
if @enable_metrics
@base_labels = parse_labels_elements(conf)
end
end

def start
super
@bucket_store = RateLimiter::BucketStore.new
if @enable_metrics
@metrics = {
quota_input: get_counter(:fluentd_quota_throttle_input, "Number of records entering quota throttle plugin"),
quota_exceeded: get_counter(:fluentd_quota_throttle_exceeded, "Number of records exceeded the quota"),
}
end
end

def shutdown
super
if @enable_metrics
log.info "Clearing Counters"
@metrics.each do |name, metric|
@registry.unregister(name)
end
end
log.info "Shutting down"
end

Expand All @@ -55,9 +79,17 @@ def filter(tag, time, record)
quota = @match_helper.get_quota(record)
group = quota.group_by.map { |key| record.dig(*key) }
bucket = @bucket_store.get_bucket(group, quota)
labels = {}
if @enable_metrics
labels = get_labels(record)
@metrics[:quota_input].increment(by: 1, labels: labels.merge({quota: quota.name}))
end
if bucket.allow
record
else
if @enable_metrics
@metrics[:quota_exceeded].increment(by: 1, labels: labels.merge({quota: quota.name}))
end
quota_breached(tag, time, record, bucket, quota)
nil
end
Expand All @@ -80,9 +112,31 @@ def quota_breached(tag, timestamp, record, bucket, quota)
log.debug "Dropping record"
when "reemit"
log.debug "Reemitting record"
new_tag = "#{@reemit_tag_preffix}.#{tag}"
new_tag = "#{@reemit_tag_prefix}.#{tag}"
router.emit(new_tag, timestamp, record)
end
end

def get_labels(record)
placeholders = stringify_keys(record)
expander = @placeholder_expander_builder.build(placeholders)
labels = {}
@base_labels.each do |key, value|
if value.is_a?(String)
labels[key] = expander.expand(value)
elsif value.respond_to?(:call)
labels[key] = value.call(record)
end
end
labels
end

def get_counter(name, docstring)
if @registry.exist?(name)
@registry.get(name)
else
@registry.counter(name, docstring: docstring, labels: @base_labels.keys + ["quota"].map(&:to_sym))
end
end
end
end
47 changes: 47 additions & 0 deletions test/fluent/plugin/filter_quota_throttle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ def setup
CONFIG = %[
path test/config_files/filter_plugin_test.yml
warning_delay 2m
enable_metrics false
]

def create_driver(conf = CONFIG)
Expand All @@ -35,4 +36,50 @@ def test_filter
events = d.filtered_records
assert_equal 23, events.length
end

# Due to the way the Driver is implemented, both metrics tests cannot be same time because the registry of metrics is not cleared between tests
# def test_metrics_without_labels
# modified_config = CONFIG.sub("enable_metrics false", "enable_metrics true")
# d = create_driver(modified_config)
# d.run(default_tag: 'test') do
# 10.times do
# d.feed("group1" => { "a" => "value1" , "b" => "value2" })
# d.feed("group1" => { "a" => "value2" , "b" => "value3" })
# d.feed("group1" => { "a" => "value2" , "b" => "value2" })
# d.feed("group1" => { "a" => "value3" , "b" => "value2" })
# end
# end
# assert_equal 10, d.instance.registry.get(:fluentd_quota_throttle_input).get(labels: {quota: 'quota1'})
# assert_equal 4, d.instance.registry.get(:fluentd_quota_throttle_exceeded).get(labels: {quota: 'quota1'})
# assert_equal 10, d.instance.registry.get(:fluentd_quota_throttle_input).get(labels: {quota: 'quota2'})
# assert_equal 3, d.instance.registry.get(:fluentd_quota_throttle_exceeded).get(labels: {quota: 'quota2'})
# assert_equal 20, d.instance.registry.get(:fluentd_quota_throttle_input).get(labels: {quota: 'default'})
# assert_equal 10, d.instance.registry.get(:fluentd_quota_throttle_exceeded).get(labels: {quota: 'default'})
# end
def test_metrics_with_labels
labels = %[
<labels>
source $.group1.a
dummy d1
</labels>
]
modified_config = CONFIG.sub("enable_metrics false", "enable_metrics true" + labels)
d = create_driver(modified_config)
d.run(default_tag: 'test') do
10.times do
d.feed("group1" => { "a" => "value1" , "b" => "value2" })
d.feed("group1" => { "a" => "value2" , "b" => "value3" })
d.feed("group1" => { "a" => "value2" , "b" => "value2" })
d.feed("group1" => { "a" => "value3" , "b" => "value2" })
end
end
assert_equal 10, d.instance.registry.get(:fluentd_quota_throttle_input).get(labels: {source: "value1", quota: 'quota1', dummy: 'd1'})
assert_equal 4, d.instance.registry.get(:fluentd_quota_throttle_exceeded).get(labels: {source: "value1", quota: 'quota1', dummy: 'd1'})
assert_equal 10, d.instance.registry.get(:fluentd_quota_throttle_input).get(labels: {source: "value2", quota: 'quota2', dummy: 'd1'})
assert_equal 3, d.instance.registry.get(:fluentd_quota_throttle_exceeded).get(labels: {source: "value2", quota: 'quota2', dummy: 'd1'})
assert_equal 10, d.instance.registry.get(:fluentd_quota_throttle_input).get(labels: {source: "value2", quota: 'default', dummy: 'd1'})
assert_equal 5, d.instance.registry.get(:fluentd_quota_throttle_exceeded).get(labels: {source: "value2", quota: 'default', dummy: 'd1'})
assert_equal 10, d.instance.registry.get(:fluentd_quota_throttle_input).get(labels: {source: "value3", quota: 'default', dummy: 'd1'})
assert_equal 5, d.instance.registry.get(:fluentd_quota_throttle_exceeded).get(labels: {source: "value3", quota: 'default', dummy: 'd1'})
end
end

0 comments on commit 00e8711

Please sign in to comment.