Make atomic field limits configurable (close #850)
pondzix committed Jan 23, 2024
1 parent 56e99a3 commit 304df4e
Showing 30 changed files with 543 additions and 299 deletions.
12 changes: 12 additions & 0 deletions config/config.kafka.extended.hocon
@@ -264,4 +264,16 @@
       "pipelineId": "75a13583-5c99-40e3-81fc-541084dfc784"
     }
   }
+
+  # Optional. Configuration section for various validation-oriented settings.
+  "validation": {
+
+    # Optional. Configuration for custom maximum lengths of atomic fields (strings).
+    # Map-like structure where keys are field names and values are their maximum allowed lengths.
+    "atomicFieldsLimits": {
+      "app_id": 5
+      "mkt_clickid": 100000
+      # ...and any other 'atomic' field with a custom limit
+    }
+  }
 }
12 changes: 12 additions & 0 deletions config/config.kinesis.extended.hocon
@@ -366,4 +366,16 @@
       "pipelineId": "75a13583-5c99-40e3-81fc-541084dfc784"
     }
   }
+
+  # Optional. Configuration section for various validation-oriented settings.
+  "validation": {
+
+    # Optional. Configuration for custom maximum lengths of atomic fields (strings).
+    # Map-like structure where keys are field names and values are their maximum allowed lengths.
+    "atomicFieldsLimits": {
+      "app_id": 5
+      "mkt_clickid": 100000
+      # ...and any other 'atomic' field with a custom limit
+    }
+  }
 }
12 changes: 12 additions & 0 deletions config/config.nsq.extended.hocon
@@ -266,4 +266,16 @@
       "pipelineId": "75a13583-5c99-40e3-81fc-541084dfc784"
     }
   }
+
+  # Optional. Configuration section for various validation-oriented settings.
+  "validation": {
+
+    # Optional. Configuration for custom maximum lengths of atomic fields (strings).
+    # Map-like structure where keys are field names and values are their maximum allowed lengths.
+    "atomicFieldsLimits": {
+      "app_id": 5
+      "mkt_clickid": 100000
+      # ...and any other 'atomic' field with a custom limit
+    }
+  }
 }
12 changes: 12 additions & 0 deletions config/config.pubsub.extended.hocon
@@ -262,4 +262,16 @@
       "pipelineId": "75a13583-5c99-40e3-81fc-541084dfc784"
     }
   }
+
+  # Optional. Configuration section for various validation-oriented settings.
+  "validation": {
+
+    # Optional. Configuration for custom maximum lengths of atomic fields (strings).
+    # Map-like structure where keys are field names and values are their maximum allowed lengths.
+    "atomicFieldsLimits": {
+      "app_id": 5
+      "mkt_clickid": 100000
+      # ...and any other 'atomic' field with a custom limit
+    }
+  }
 }
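
Every runtime config gains the same optional validation block, so the semantics are worth spelling out. Below is a minimal sketch (not the actual enrich-common implementation; the Violation type and check helper are illustrative only) of how a map of per-field limits can be applied to the string fields of an event:

// Illustrative only: a map of limits, as decoded from "atomicFieldsLimits",
// applied to atomic (string) fields. The real validator in enrich-common is
// structured differently, and how a violation is handled (bad row vs. accept)
// is governed elsewhere, e.g. by feature flags.
object LengthCheckSketch {
  final case class Violation(field: String, actual: Int, max: Int)

  def check(fields: Map[String, String], limits: Map[String, Int]): List[Violation] =
    fields.toList.flatMap {
      case (name, value) =>
        limits.get(name).collect {
          case max if value.length > max => Violation(name, value.length, max)
        }
    }
}

// With "app_id" limited to 5 as in the examples above:
// LengthCheckSketch.check(Map("app_id" -> "toolong"), Map("app_id" -> 5))
// => List(Violation("app_id", 7, 5))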
88 changes: 88 additions & 0 deletions modules/common-fs2/src/main/resources/application.conf
@@ -3,4 +3,92 @@
     "accept": false
     "accept": ${?ACCEPT_LIMITED_USE_LICENSE}
   }
+  "validation": {
+    "atomicFieldsLimits": {
+      "app_id": 255
+      "platform": 255
+      "event": 128
+      "event_id": 36
+      "name_tracker": 128
+      "v_tracker": 100
+      "v_collector": 100
+      "v_etl": 100
+      "user_id": 255
+      "user_ipaddress": 128
+      "user_fingerprint": 128
+      "domain_userid": 128
+      "network_userid": 128
+      "geo_country": 2
+      "geo_region": 3
+      "geo_city": 75
+      "geo_zipcode": 15
+      "geo_region_name": 100
+      "ip_isp": 100
+      "ip_organization": 128
+      "ip_domain": 128
+      "ip_netspeed": 100
+      "page_url": 10000 # Different than the one in atomic schema (4096)
+      "page_title": 2000
+      "page_referrer": 10000 # Different than the one in atomic schema (4096)
+      "page_urlscheme": 16
+      "page_urlhost": 255
+      "page_urlpath": 3000
+      "page_urlquery": 6000
+      "page_urlfragment": 3000
+      "refr_urlscheme": 16
+      "refr_urlhost": 255
+      "refr_urlpath": 6000
+      "refr_urlquery": 6000
+      "refr_urlfragment": 3000
+      "refr_medium": 25
+      "refr_source": 50
+      "refr_term": 255
+      "mkt_clickid": 1000 # Different than the one in atomic schema (128)
+      "mkt_network": 64
+      "mkt_medium": 255
+      "mkt_source": 255
+      "mkt_term": 255
+      "mkt_content": 500
+      "mkt_campaign": 255
+      "se_category": 1000
+      "se_action": 1000
+      "se_label": 4096
+      "se_property": 1000
+      "tr_orderid": 255
+      "tr_affiliation": 255
+      "tr_city": 255
+      "tr_state": 255
+      "tr_country": 255
+      "ti_orderid": 255
+      "ti_sku": 255
+      "ti_name": 255
+      "ti_category": 255
+      "useragent": 1000
+      "br_name": 50
+      "br_family": 50
+      "br_version": 50
+      "br_type": 50
+      "br_renderengine": 50
+      "br_lang": 255
+      "br_colordepth": 12
+      "os_name": 50
+      "os_family": 50
+      "os_manufacturer": 50
+      "os_timezone": 255
+      "dvce_type": 50
+      "doc_charset": 128
+      "tr_currency": 3
+      "ti_currency": 3
+      "base_currency": 3
+      "geo_timezone": 64
+      "etl_tags": 500
+      "refr_domain_userid": 128
+      "domain_sessionid": 128
+      "event_vendor": 1000
+      "event_name": 1000
+      "event_format": 128
+      "event_version": 128
+      "event_fingerprint": 128
+    }
+  }
 }
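
These reference defaults are what make the validation block in the user-facing configs optional: HOCON resolution merges the user's file over application.conf key by key. A small standalone sketch of that precedence, using only the Typesafe Config API (the keys mirror the sections above):

import com.typesafe.config.ConfigFactory

object FallbackSketch {
  def main(args: Array[String]): Unit = {
    // Stand-ins for the reference defaults above and a user override file.
    val defaults = ConfigFactory.parseString(
      """validation.atomicFieldsLimits { app_id = 255, mkt_clickid = 1000 }"""
    )
    val user = ConfigFactory.parseString("""validation.atomicFieldsLimits.app_id = 5""")

    // The user config takes precedence; untouched keys keep their defaults.
    val resolved = user.withFallback(defaults).resolve()
    println(resolved.getInt("validation.atomicFieldsLimits.app_id"))      // 5
    println(resolved.getInt("validation.atomicFieldsLimits.mkt_clickid")) // 1000
  }
}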
@@ -41,7 +41,7 @@ import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline
 import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
 import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
 import com.snowplowanalytics.snowplow.enrich.common.loaders.{CollectorPayload, ThriftLoader}
-import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
+import com.snowplowanalytics.snowplow.enrich.common.enrichments.{AtomicFields, EnrichmentRegistry}
 import com.snowplowanalytics.snowplow.enrich.common.utils.ConversionUtils
 
 import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.FeatureFlags
@@ -63,14 +63,16 @@ object Enrich {
   def run[F[_]: Async, A](env: Environment[F, A]): Stream[F, Unit] = {
     val enrichmentsRegistry: F[EnrichmentRegistry[F]] = env.enrichments.get.map(_.registry)
     val enrich: Enrich[F] =
-      enrichWith[F](enrichmentsRegistry,
-                    env.adapterRegistry,
-                    env.igluClient,
-                    env.sentry,
-                    env.processor,
-                    env.featureFlags,
-                    env.metrics.invalidCount,
-                    env.registryLookup
+      enrichWith[F](
+        enrichmentsRegistry,
+        env.adapterRegistry,
+        env.igluClient,
+        env.sentry,
+        env.processor,
+        env.featureFlags,
+        env.metrics.invalidCount,
+        env.registryLookup,
+        env.atomicFields
       )
 
     val enriched =
@@ -116,7 +118,8 @@
     processor: Processor,
     featureFlags: FeatureFlags,
     invalidCount: F[Unit],
-    registryLookup: RegistryLookup[F]
+    registryLookup: RegistryLookup[F],
+    atomicFields: AtomicFields
   )(
     row: Array[Byte]
   ): F[Result] = {
@@ -136,7 +139,8 @@
           payload,
           FeatureFlags.toCommon(featureFlags),
           invalidCount,
-          registryLookup
+          registryLookup,
+          atomicFields
         )
       } yield (enriched, collectorTstamp)
 
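enrichWith now threads the configured limits from the Environment into the pipeline call. The diff does not show the AtomicFields definition itself; a plausible minimal model (hypothetical — the names AtomicField, limitFor, and from are illustrative, not the actual API of enrich-common's enrichments package):

// Hypothetical sketch of the AtomicFields type passed to enrichWith.
final case class AtomicField(name: String, limit: Int)

final case class AtomicFields(value: List[AtomicField]) {
  // Convenience lookup used when validating a single field.
  def limitFor(name: String): Option[Int] =
    value.collectFirst { case AtomicField(`name`, limit) => limit }
}

object AtomicFields {
  // Build from the decoded "atomicFieldsLimits" map.
  def from(limits: Map[String, Int]): AtomicFields =
    AtomicFields(limits.toList.sortBy(_._1).map { case (n, l) => AtomicField(n, l) })
}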
@@ -40,7 +40,7 @@ import com.snowplowanalytics.snowplow.badrows.Processor
 
 import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
 import com.snowplowanalytics.snowplow.enrich.common.adapters.registry.RemoteAdapter
-import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
+import com.snowplowanalytics.snowplow.enrich.common.enrichments.{AtomicFields, EnrichmentRegistry}
 import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
 import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.ApiRequestConf
 import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
@@ -126,7 +128,8 @@ final case class Environment[F[_], A](
   streamsSettings: Environment.StreamsSettings,
   region: Option[String],
   cloud: Option[Cloud],
-  featureFlags: FeatureFlags
+  featureFlags: FeatureFlags,
+  atomicFields: AtomicFields
 )
 
 object Environment {
@@ -193,7 +194,8 @@
     maxRecordSize: Int,
     cloud: Option[Cloud],
     getRegion: => Option[String],
-    featureFlags: FeatureFlags
+    featureFlags: FeatureFlags,
+    atomicFields: AtomicFields
   ): Resource[F, Environment[F, A]] = {
     val file = parsedConfigs.configFile
     for {
@@ -244,7 +246,8 @@
         StreamsSettings(file.concurrency, maxRecordSize),
         getRegionFromConfig(file).orElse(getRegion),
         cloud,
-        featureFlags
+        featureFlags,
+        atomicFields
       )
     }
 
@@ -107,7 +107,8 @@ object Run {
             maxRecordSize,
             cloud,
             getRegion,
-            file.featureFlags
+            file.featureFlags,
+            file.validation.atomicFieldsLimits
           )
           runEnvironment[F, Array[Byte]](env)
         case input =>
@@ -136,7 +137,8 @@
             maxRecordSize,
             cloud,
             getRegion,
-            file.featureFlags
+            file.featureFlags,
+            file.validation.atomicFieldsLimits
           )
           runEnvironment[F, A](env)
       }
@@ -113,7 +113,7 @@ object Telemetry {
         "moduleVersion" -> teleCfg.moduleVersion.asJson,
         "instanceId" -> teleCfg.instanceId.asJson,
         "appGeneratedId" -> java.util.UUID.randomUUID.toString.asJson,
-        "cloud" -> cloud.asJson,
+        "cloud" -> cloud.toString.toUpperCase().asJson,
         "region" -> region.asJson,
         "applicationName" -> appName.asJson,
         "applicationVersion" -> appVersion.asJson
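A side change in the same commit: the telemetry payload now reports the cloud as an uppercase string rather than whatever the implicit circe encoder produced. Assuming Cloud is a sealed trait with case objects such as Aws (the exact members are not visible in this diff), the new line boils down to:

object CloudEncodingSketch {
  sealed trait Cloud
  case object Aws extends Cloud // assumed member name, not shown in the diff

  def main(args: Array[String]): Unit = {
    val cloud: Cloud = Aws
    // The new encoder line reduces to this string conversion:
    println(cloud.toString.toUpperCase()) // AWS
  }
}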
@@ -17,9 +17,9 @@ import cats.data.EitherT
 
 import cats.effect.kernel.Sync
 
-import _root_.io.circe.{Decoder, Encoder}
+import _root_.io.circe.Decoder
 import _root_.io.circe.config.syntax._
-import _root_.io.circe.generic.extras.semiauto.{deriveConfiguredDecoder, deriveConfiguredEncoder}
+import _root_.io.circe.generic.extras.semiauto.deriveConfiguredDecoder
 
 import com.typesafe.config.{ConfigFactory, Config => TSConfig}
 
@@ -50,34 +50,30 @@ final case class ConfigFile(
   experimental: Option[Experimental],
   adaptersSchemas: AdaptersSchemas,
   blobStorage: BlobStorageClients,
-  license: License
+  license: License,
+  validation: Validation
 )
 
 object ConfigFile {
-  import AdaptersSchemasEncoderDecoders._
-
-  // Missing in circe-config
-  implicit val finiteDurationEncoder: Encoder[FiniteDuration] =
-    implicitly[Encoder[String]].contramap(_.toString)
+  import AdaptersSchemasDecoders._
 
   implicit val configFileDecoder: Decoder[ConfigFile] =
     deriveConfiguredDecoder[ConfigFile].emap {
-      case ConfigFile(_, _, _, Some(aup), _, _, _, _, _, _, _, _) if aup._1 <= 0L =>
+      case ConfigFile(_, _, _, Some(aup), _, _, _, _, _, _, _, _, _) if aup._1 <= 0L =>
        "assetsUpdatePeriod in config file cannot be less than 0".asLeft // TODO: use newtype
      // Remove pii output if streamName and region empty
-      case c @ ConfigFile(_, Outputs(good, Some(output: Output.Kinesis), bad), _, _, _, _, _, _, _, _, _, _) if output.streamName.isEmpty =>
+      case c @ ConfigFile(_, Outputs(good, Some(output: Output.Kinesis), bad), _, _, _, _, _, _, _, _, _, _, _)
+          if output.streamName.isEmpty =>
        c.copy(output = Outputs(good, None, bad)).asRight
      // Remove pii output if topic empty
-      case c @ ConfigFile(_, Outputs(good, Some(Output.PubSub(t, _, _, _, _, _)), bad), _, _, _, _, _, _, _, _, _, _) if t.isEmpty =>
+      case c @ ConfigFile(_, Outputs(good, Some(Output.PubSub(t, _, _, _, _, _)), bad), _, _, _, _, _, _, _, _, _, _, _) if t.isEmpty =>
        c.copy(output = Outputs(good, None, bad)).asRight
      // Remove pii output if topic empty
-      case c @ ConfigFile(_, Outputs(good, Some(Output.Kafka(topicName, _, _, _, _)), bad), _, _, _, _, _, _, _, _, _, _)
+      case c @ ConfigFile(_, Outputs(good, Some(Output.Kafka(topicName, _, _, _, _)), bad), _, _, _, _, _, _, _, _, _, _, _)
         if topicName.isEmpty =>
        c.copy(output = Outputs(good, None, bad)).asRight
       case other => other.asRight
     }
-  implicit val configFileEncoder: Encoder[ConfigFile] =
-    deriveConfiguredEncoder[ConfigFile]
 
   /* Defines where to look for default values if they are not in the provided file
    *
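Note that ConfigFile also loses its Encoder entirely (and with it the FiniteDuration encoder workaround), leaving config handling decode-only. The new validation member implies a Validation type with its own decoder; a hedged sketch of what decoding the section could look like (the actual Validation and AtomicFields decoders are defined elsewhere in the codebase and may differ, e.g. by decoding straight into an AtomicFields value):

import _root_.io.circe.Decoder

// Hypothetical stand-in for the new `validation` member of ConfigFile.
final case class ValidationSketch(atomicFieldsLimits: Map[String, Int])

object ValidationSketch {
  // circe provides Decoder[Map[String, Int]] out of the box, so the section
  // decodes directly from the HOCON-derived JSON object.
  implicit val validationSketchDecoder: Decoder[ValidationSketch] =
    Decoder.forProduct1("atomicFieldsLimits")(ValidationSketch.apply)
}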