From e6df5270a71c0b06a70ac22bf7c16c9661009d2b Mon Sep 17 00:00:00 2001
From: Benjamin Benoist
Date: Wed, 14 Feb 2024 11:08:08 +0000
Subject: [PATCH] Add incomplete events (close #)

Before this change, any error in the enriching workflow would short-circuit
processing and emit a bad row. After this change, if incomplete events are
enabled, enrichment runs to completion with whatever data is available,
accumulating errors as it goes. Errors get attached in derived_contexts.

There are now 3 main steps:

- Mapping and validating the input. This includes mapping fields of
  payload_data to the atomic event (e.g. tr_tt to tr_total while converting
  from string to number) and validating the contexts and unstruct event.
  Everything that goes wrong gets wrapped up in a SchemaViolations bad row.

- Running the enrichments. Everything that goes wrong gets wrapped up in an
  EnrichmentFailures bad row.

- Validating the output. This includes validating the enrichments contexts
  and the atomic field lengths. Everything that goes wrong gets wrapped up
  in a SchemaViolations bad row.
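Each event now flows through the pipeline as an Ior[BadRow, EnrichedEvent]:
Right for a fully enriched event, Left for a bad row only, and Both for a
bad row paired with an incomplete event that still gets emitted. As a rough
sketch of the resulting split (the helper name below is hypothetical; the
real logic is the fold added to Enrich.sinkChunk):

    import cats.data.Ior

    // Split a chunk of results into bad rows, good events and incomplete events.
    def partitionResults[B, E](
      results: List[Ior[B, E]]
    ): (List[B], List[E], List[E]) =
      results.foldLeft((List.empty[B], List.empty[E], List.empty[E])) {
        case ((bad, good, inc), Ior.Right(e))    => (bad, e :: good, inc)
        case ((bad, good, inc), Ior.Left(br))    => (br :: bad, good, inc)
        case ((bad, good, inc), Ior.Both(br, e)) => (br :: bad, good, e :: inc)
      }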
---
 config/config.file.extended.hocon | 8 +
 config/config.kafka.extended.hocon | 19 +
 config/config.kinesis.extended.hocon | 49 +-
 config/config.nsq.extended.hocon | 21 +
 config/config.pubsub.extended.hocon | 25 +
 .../snowplow/enrich/common/fs2/Enrich.scala | 45 +-
 .../enrich/common/fs2/Environment.scala | 5 +
 .../snowplow/enrich/common/fs2/Run.scala | 4 +
 .../enrich/common/fs2/config/ConfigFile.scala | 29 +-
 .../enrich/common/fs2/config/io.scala | 3 +-
 .../snowplow/enrich/common/fs2/package.scala | 6 +-
 .../enrich/common/fs2/EnrichSpec.scala | 35 +-
 .../common/fs2/EventGenEtlPipelineSpec.scala | 15 +-
 .../common/fs2/blackbox/BlackBoxTesting.scala | 7 +-
 .../blackbox/adapters/Tp2AdapterSpec.scala | 5 +-
 .../common/fs2/config/ParsedConfigsSpec.scala | 3 +-
 .../common/fs2/test/TestEnvironment.scala | 2 +
 .../common/EtlPipeline.scala | 24 +-
 .../AtomicFieldsLengthValidator.scala | 25 +-
 .../enrichments/EnrichmentManager.scala | 184 +++--
 .../common/utils/IgluUtils.scala | 111 +--
 .../common/utils/JsonUtils.scala | 19 +-
 .../EtlPipelineSpec.scala | 29 +-
 .../SpecHelpers.scala | 23 +-
 .../enrichments/EnrichmentManagerSpec.scala | 712 ++++++++++++++----
 .../pii/PiiPseudonymizerEnrichmentSpec.scala | 24 +-
 .../utils/IgluUtilsSpec.scala | 404 ++++++----
 .../utils/ValidateAndReformatJsonSpec.scala | 51 --
 .../kafka/src/main/resources/application.conf | 15 +
 .../AzureAuthenticationCallbackHandler.scala | 2 +
 .../Main.scala | 1 +
 .../snowplow/enrich/kafka/ConfigSpec.scala | 17 +-
 .../resources/enrich/enrich-localstack.hocon | 7 +
 .../snowplow/enrich/kinesis/Containers.scala | 3 +-
 .../enrich/kinesis/EnrichKinesisSpec.scala | 8 +-
 .../enrich/kinesis/KinesisConfig.scala | 7 +-
 .../snowplow/enrich/kinesis/utils.scala | 23 +-
 .../src/main/resources/application.conf | 17 +
 .../snowplow/enrich/kinesis/Main.scala | 1 +
 .../snowplow/enrich/kinesis/ConfigSpec.scala | 15 +-
 .../snowplow/enrich/nsq/Main.scala | 1 +
 .../snowplow/enrich/nsq/ConfigSpec.scala | 15 +-
 .../src/main/resources/application.conf | 12 +
 .../snowplow/enrich/pubsub/Main.scala | 1 +
 .../snowplow/enrich/pubsub/ConfigSpec.scala | 13 +-
 45 files changed, 1462 insertions(+), 583 deletions(-)
 delete mode 100644 modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/ValidateAndReformatJsonSpec.scala

diff --git a/config/config.file.extended.hocon b/config/config.file.extended.hocon
index b87a9ffd2..171a893fd 100644
--- a/config/config.file.extended.hocon
+++ b/config/config.file.extended.hocon
@@ -31,6 +31,14 @@
     "file": "/var/bad"
     "maxBytes": 1000000
   }
+
+  # Incomplete events output
+  "incomplete": {
+    # Local FS supported for testing purposes
+    "type": "FileSystem"
+    "file": "/var/incomplete"
+    "maxBytes": 1000000
+  }
 }

 # Optional. Concurrency of the app
diff --git a/config/config.kafka.extended.hocon b/config/config.kafka.extended.hocon
index 78965106f..8beafa1cc 100644
--- a/config/config.kafka.extended.hocon
+++ b/config/config.kafka.extended.hocon
@@ -89,6 +89,25 @@
       "acks": "all"
     }
   }
+
+  # Optional. Incomplete events output.
+  # If set, an incomplete enriched event holding the errors in derived_contexts will get emitted on top of a bad row
+  "incomplete": {
+    "type": "Kafka"
+
+    # Name of the Kafka topic to write to
+    "topicName": "incomplete"
+
+    # A list of host:port pairs to use for establishing the initial connection to the Kafka cluster
+    # This list should be in the form host1:port1,host2:port2,...
+    "bootstrapServers": "localhost:9092"
+
+    # Optional, Kafka producer configuration
+    # See https://kafka.apache.org/documentation/#producerconfigs for all properties
+    "producerConf": {
+      "acks": "all"
+    }
+  }
 }

 # Optional. Concurrency of the app
diff --git a/config/config.kinesis.extended.hocon b/config/config.kinesis.extended.hocon
index 46aa49adc..6568096d4 100644
--- a/config/config.kinesis.extended.hocon
+++ b/config/config.kinesis.extended.hocon
@@ -94,7 +94,7 @@
     # Otherwise, the partition key will be a random UUID.
     # "partitionKey": "user_id"

-    # Optional. Policy to retry if writing to kinesis fails with unexepected errors
+    # Optional. Policy to retry if writing to kinesis fails with unexpected errors
     "backoffPolicy": {
       "minBackoff": 100 milliseconds
       "maxBackoff": 10 seconds
@@ -144,7 +144,7 @@
     # Otherwise, the partition key will be a random UUID.
     # "partitionKey": "user_id"

-    # Optional. Policy to retry if writing to kinesis fails with unexepcted errors
+    # Optional. Policy to retry if writing to kinesis fails with unexpected errors
     "backoffPolicy": {
       "minBackoff": 100 milliseconds
       "maxBackoff": 10 seconds
@@ -186,7 +186,50 @@
     # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
     "region": "eu-central-1"

-    # Optional. Policy to retry if writing to kinesis fails with unexepcted errors
+    # Optional. Policy to retry if writing to kinesis fails with unexpected errors
     "backoffPolicy": {
       "minBackoff": 100 milliseconds
       "maxBackoff": 10 seconds
       "maxRetries": 10
     }
+
+    # Optional. Policy to retry if writing to kinesis exceeds the provisioned throughput.
+    "throttledBackoffPolicy": {
+      "minBackoff": 100 milliseconds
+      "maxBackoff": 1 second
+    }
+
+    # Optional. Limits the number of events in a single PutRecords request.
+    # Several requests are made in parallel
+    # Maximum allowed: 500
+    "recordLimit": 500
+
+    # Optional. Limits the number of bytes in a single PutRecords request,
+    # including records and partition keys.
+    # Several requests are made in parallel
+    # Maximum allowed: 5 MB
+    "byteLimit": 5242880
+
+    # Optional. Use a custom Kinesis endpoint.
+    # Can be used for instance to work locally with localstack
+    # "customEndpoint": "https://localhost:4566"
+  }
+
+  # Optional. Incomplete events output.
+  # If set, an incomplete enriched event holding the errors in derived_contexts will get emitted on top of a bad row
+  "incomplete": {
+    "type": "Kinesis"
+
+    # Name of the Kinesis stream to write to
+    "streamName": "incomplete"
+
+    # Optional. Region where the Kinesis stream is located
+    # This field is optional if it can be resolved with AWS region provider chain.
+    # It checks places like env variables, system properties, AWS profile file.
+    # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
+    "region": "eu-central-1"
+
+    # Optional. Policy to retry if writing to kinesis fails with unexpected errors
+    "backoffPolicy": {
+      "minBackoff": 100 milliseconds
+      "maxBackoff": 10 seconds
diff --git a/config/config.nsq.extended.hocon b/config/config.nsq.extended.hocon
index 07532f0f5..6e5aae5bd 100644
--- a/config/config.nsq.extended.hocon
+++ b/config/config.nsq.extended.hocon
@@ -95,6 +95,27 @@
       "maxRetries": 10
     }
   }
+
+  # Incomplete events output
+  "incomplete": {
+    "type": "Nsq"
+
+    # Name of the NSQ topic that will receive the incomplete events
+    "topic": "incomplete"
+
+    # The host name of nsqd application
+    "nsqdHost": "127.0.0.1"
+
+    # The port number of nsqd application
+    "nsqdPort": 4150
+
+    # Optional. Policy to retry if writing to NSQ fails
+    "backoffPolicy": {
+      "minBackoff": 100 milliseconds
+      "maxBackoff": 10 seconds
+      "maxRetries": 10
+    }
+  }
 }

 # Optional. Concurrency of the app
diff --git a/config/config.pubsub.extended.hocon b/config/config.pubsub.extended.hocon
index 43388b12b..32425f39d 100644
--- a/config/config.pubsub.extended.hocon
+++ b/config/config.pubsub.extended.hocon
@@ -114,6 +114,31 @@
     # Note the PubSub maximum is 10MB
     "maxBatchBytes": 8000000
   }
+
+  # Optional. Incomplete events output.
+  # If set, an incomplete enriched event holding the errors in derived_contexts will get emitted on top of a bad row
+  "incomplete": {
+    "type": "PubSub"
+
+    # Name of the PubSub topic that will receive the incomplete events
+    "topic": "projects/test-project/topics/incomplete"
+
+    # Optional. Delay threshold to use for batching.
+    # After this amount of time has elapsed,
+    # before maxBatchSize and maxBatchBytes have been reached,
+    # messages from the buffer will be sent.
+    "delayThreshold": 200 milliseconds
+
+    # Optional. Maximum number of messages sent within a batch.
+    # When the buffer reaches this number of messages they are sent.
+    # PubSub maximum: 1000
+    "maxBatchSize": 1000
+
+    # Optional. Maximum number of bytes sent within a batch.
+    # When the buffer reaches this size messages are sent.
+    # Note the PubSub maximum is 10MB
+    "maxBatchBytes": 8000000
+  }
 }

 # Optional.
Concurrency of the app diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala index 89753726b..33ea660ad 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala @@ -16,7 +16,7 @@ import java.util.Base64 import org.joda.time.DateTime -import cats.data.{NonEmptyList, ValidatedNel} +import cats.data.{Ior, NonEmptyList, ValidatedNel} import cats.{Monad, Parallel} import cats.implicits._ @@ -72,7 +72,8 @@ object Enrich { env.featureFlags, env.metrics.invalidCount, env.registryLookup, - env.atomicFields + env.atomicFields, + env.sinkIncomplete.isDefined ) val enriched = @@ -119,7 +120,8 @@ object Enrich { featureFlags: FeatureFlags, invalidCount: F[Unit], registryLookup: RegistryLookup[F], - atomicFields: AtomicFields + atomicFields: AtomicFields, + emitIncomplete: Boolean )( row: Array[Byte] ): F[Result] = { @@ -140,7 +142,8 @@ object Enrich { FeatureFlags.toCommon(featureFlags), invalidCount, registryLookup, - atomicFields + atomicFields, + emitIncomplete ) } yield (enriched, collectorTstamp) @@ -170,7 +173,7 @@ object Enrich { case None => Sync[F].unit } - } yield (List(badRow.invalid), collectorTstamp) + } yield (List(Ior.left(badRow)), collectorTstamp) /** Build a `generic_error` bad row for unhandled runtime errors */ def genericBadRow( @@ -189,17 +192,29 @@ object Enrich { chunk: List[Result], env: Environment[F, A] ): F[Unit] = { - val (bad, enriched) = + val (bad, enriched, incomplete) = chunk .flatMap(_._1) - .map(_.toEither) - .separate + .foldLeft((List.empty[BadRow], List.empty[EnrichedEvent], List.empty[EnrichedEvent])) { + case (previous, item) => + val (bad, enriched, incomplete) = previous + item match { + case Ior.Right(e) => (bad, e :: enriched, incomplete) + case Ior.Left(br) => (br :: bad, enriched, incomplete) + case Ior.Both(br, i) => (br :: bad, enriched, i :: incomplete) + } + } val (moreBad, good) = enriched.map { e => serializeEnriched(e, env.processor, env.streamsSettings.maxRecordSize) .map(bytes => (e, AttributedData(bytes, env.goodPartitionKey(e), env.goodAttributes(e)))) }.separate + val (incompleteTooBig, incompleteBytes) = incomplete.map { e => + serializeEnriched(e, env.processor, env.streamsSettings.maxRecordSize) + .map(bytes => AttributedData(bytes, env.goodPartitionKey(e), env.goodAttributes(e))) + }.separate + val allBad = (bad ++ moreBad).map(badRowResize(env, _)) List( @@ -214,7 +229,10 @@ object Enrich { env.processor, env.streamsSettings.maxRecordSize ) *> env.metrics.enrichLatency(chunk.headOption.flatMap(_._2)), - sinkBad(allBad, env.sinkBad, env.metrics.badCount) + sinkBad(allBad, env.sinkBad, env.metrics.badCount), + if (incompleteTooBig.nonEmpty) Logger[F].warn(s"${incompleteTooBig.size} incomplete events discarded because they are too big") + else Sync[F].unit, + sinkIncomplete(incompleteBytes, env.sinkIncomplete) ).parSequence_ } @@ -272,6 +290,15 @@ object Enrich { Sync[F].unit } + def sinkIncomplete[F[_]: Sync]( + incomplete: List[AttributedData[Array[Byte]]], + maybeSink: Option[AttributedByteSink[F]] + ): F[Unit] = + maybeSink match { + case Some(sink) => sink(incomplete) + case None => Sync[F].unit + } + def serializeEnriched( enriched: EnrichedEvent, processor: Processor, diff --git 
a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala index 648ee6c2f..6eaac5d6f 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala @@ -78,6 +78,7 @@ import com.snowplowanalytics.snowplow.enrich.common.fs2.io.experimental.Metadata * @param sinkGood function that sinks enriched event * @param sinkPii function that sinks pii event * @param sinkBad function that sinks an event that failed validation or enrichment + * @param sinkIncomplete function that sinks incomplete events * @param checkpoint function that checkpoints input stream records * @param getPayload function that extracts the collector payload bytes from a record * @param sentry optional sentry client @@ -111,6 +112,7 @@ final case class Environment[F[_], A]( sinkGood: AttributedByteSink[F], sinkPii: Option[AttributedByteSink[F]], sinkBad: ByteSink[F], + sinkIncomplete: Option[AttributedByteSink[F]], checkpoint: List[A] => F[Unit], getPayload: A => Array[Byte], sentry: Option[SentryClient], @@ -187,6 +189,7 @@ object Environment { sinkGood: Resource[F, AttributedByteSink[F]], sinkPii: Option[Resource[F, AttributedByteSink[F]]], sinkBad: Resource[F, ByteSink[F]], + sinkIncomplete: Option[Resource[F, AttributedByteSink[F]]], clients: Resource[F, List[Client[F]]], checkpoint: List[A] => F[Unit], getPayload: A => Array[Byte], @@ -204,6 +207,7 @@ object Environment { good <- sinkGood bad <- sinkBad pii <- sinkPii.sequence + incomplete <- sinkIncomplete.sequence http4s <- Clients.mkHttp() clts <- clients.map(Clients.init(http4s, _)) igluClient <- IgluCirceClient.parseDefault[F](parsedConfigs.igluJson).resource @@ -231,6 +235,7 @@ object Environment { good, pii, bad, + incomplete, checkpoint, getPayload, sentry, diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala index 746c582af..af246637d 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala @@ -57,6 +57,7 @@ object Run { mkSinkGood: Output => Resource[F, AttributedByteSink[F]], mkSinkPii: Output => Resource[F, AttributedByteSink[F]], mkSinkBad: Output => Resource[F, ByteSink[F]], + mkSinkIncomplete: Output => Resource[F, AttributedByteSink[F]], checkpoint: List[A] => F[Unit], mkClients: BlobStorageClients => List[Resource[F, Client[F]]], getPayload: A => Array[Byte], @@ -89,6 +90,7 @@ object Run { case _ => mkSinkBad(file.output.bad) } + sinkIncomplete = file.output.incomplete.map(out => initAttributedSink(out, mkSinkIncomplete)) clients = mkClients(file.blobStorage).sequence exit <- file.input match { case p: Input.FileSystem => @@ -100,6 +102,7 @@ object Run { sinkGood, sinkPii, sinkBad, + sinkIncomplete, clients, _ => Sync[F].unit, identity, @@ -130,6 +133,7 @@ object Run { sinkGood, sinkPii, sinkBad, + sinkIncomplete, clients, checkpointing, getPayload, diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala 
b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala index c2cd740a9..6ffb12f61 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala @@ -59,20 +59,23 @@ object ConfigFile { implicit val configFileDecoder: Decoder[ConfigFile] = deriveConfiguredDecoder[ConfigFile].emap { - case ConfigFile(_, _, _, Some(aup), _, _, _, _, _, _, _, _, _) if aup._1 <= 0L => + case c: ConfigFile if c.assetsUpdatePeriod.exists(_.length <= 0L) => "assetsUpdatePeriod in config file cannot be less than 0".asLeft // TODO: use newtype - // Remove pii output if streamName and region empty - case c @ ConfigFile(_, Outputs(good, Some(output: Output.Kinesis), bad), _, _, _, _, _, _, _, _, _, _, _) - if output.streamName.isEmpty => - c.copy(output = Outputs(good, None, bad)).asRight - // Remove pii output if topic empty - case c @ ConfigFile(_, Outputs(good, Some(Output.PubSub(t, _, _, _, _, _)), bad), _, _, _, _, _, _, _, _, _, _, _) if t.isEmpty => - c.copy(output = Outputs(good, None, bad)).asRight - // Remove pii output if topic empty - case c @ ConfigFile(_, Outputs(good, Some(Output.Kafka(topicName, _, _, _, _)), bad), _, _, _, _, _, _, _, _, _, _, _) - if topicName.isEmpty => - c.copy(output = Outputs(good, None, bad)).asRight - case other => other.asRight + case c: ConfigFile => + val Outputs(good, pii, bad, incomplete) = c.output + val piiCleaned = pii match { + case Some(ki: Output.Kinesis) if ki.streamName.isEmpty => None + case Some(p: Output.PubSub) if p.topic.isEmpty => None + case Some(ka: Output.Kafka) if ka.topicName.isEmpty => None + case _ => pii + } + val incompleteCleaned = incomplete match { + case Some(ki: Output.Kinesis) if ki.streamName.isEmpty => None + case Some(p: Output.PubSub) if p.topic.isEmpty => None + case Some(ka: Output.Kafka) if ka.topicName.isEmpty => None + case _ => incomplete + } + c.copy(output = Outputs(good, piiCleaned, bad, incompleteCleaned)).asRight } /* Defines where to look for default values if they are not in the provided file diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala index e43c6dd20..633781548 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala @@ -209,7 +209,8 @@ object io { case class Outputs( good: Output, pii: Option[Output], - bad: Output + bad: Output, + incomplete: Option[Output] ) object Outputs { implicit val outputsDecoder: Decoder[Outputs] = deriveConfiguredDecoder[Outputs] diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/package.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/package.scala index a6a0a2337..40cd78efe 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/package.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/package.scala @@ -10,7 +10,7 @@ */ package com.snowplowanalytics.snowplow.enrich.common -import cats.data.{EitherT, Validated, ValidatedNel} +import cats.data.{EitherT, Ior, ValidatedNel} import 
com.snowplowanalytics.snowplow.badrows.BadRow @@ -25,8 +25,8 @@ package object fs2 { type ByteSink[F[_]] = List[Array[Byte]] => F[Unit] type AttributedByteSink[F[_]] = List[AttributedData[Array[Byte]]] => F[Unit] - /** Enrichment result, containing list of (valid and invalid) results as well as the collector timestamp */ - type Result = (List[Validated[BadRow, EnrichedEvent]], Option[Long]) + type Enriched = Ior[BadRow, EnrichedEvent] + type Result = (List[Enriched], Option[Long]) /** Function to transform an origin raw payload into good and/or bad rows */ type Enrich[F[_]] = Array[Byte] => F[Result] diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/EnrichSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/EnrichSpec.scala index 85c06a0e9..b285145a2 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/EnrichSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/EnrichSpec.scala @@ -15,7 +15,7 @@ import java.util.{Base64, UUID} import scala.concurrent.duration._ -import cats.data.{NonEmptyList, Validated} +import cats.data.{Ior, NonEmptyList, Validated} import cats.implicits._ import cats.effect.IO @@ -77,13 +77,14 @@ class EnrichSpec extends Specification with CatsEffect with ScalaCheck { EnrichSpec.featureFlags, IO.unit, SpecHelpers.registryLookup, - AtomicFields.from(valueLimits = Map.empty) + AtomicFields.from(valueLimits = Map.empty), + SpecHelpers.emitIncomplete )( EnrichSpec.payload ) .map(normalizeResult) .map { - case List(Validated.Valid(event)) => event must beEqualTo(expected) + case List(Ior.Right(event)) => event must beEqualTo(expected) case other => ko(s"Expected one valid event, got $other") } } @@ -107,13 +108,14 @@ class EnrichSpec extends Specification with CatsEffect with ScalaCheck { EnrichSpec.featureFlags, IO.unit, SpecHelpers.registryLookup, - AtomicFields.from(valueLimits = Map.empty) + AtomicFields.from(valueLimits = Map.empty), + SpecHelpers.emitIncomplete )( payload ) .map(normalizeResult) .map { - case List(Validated.Valid(e)) => e.event must beSome("page_view") + case List(Ior.Right(e)) => e.event must beSome("page_view") case other => ko(s"Expected one valid event, got $other") } } @@ -145,13 +147,14 @@ class EnrichSpec extends Specification with CatsEffect with ScalaCheck { EnrichSpec.featureFlags.copy(tryBase64Decoding = true), IO.unit, SpecHelpers.registryLookup, - AtomicFields.from(valueLimits = Map.empty) + AtomicFields.from(valueLimits = Map.empty), + SpecHelpers.emitIncomplete )( Base64.getEncoder.encode(EnrichSpec.payload) ) .map(normalizeResult) .map { - case List(Validated.Valid(event)) => event must beEqualTo(expected) + case List(Ior.Right(event)) => event must beEqualTo(expected) case other => ko(s"Expected one valid event, got $other") } } @@ -169,13 +172,14 @@ class EnrichSpec extends Specification with CatsEffect with ScalaCheck { EnrichSpec.featureFlags, IO.unit, SpecHelpers.registryLookup, - AtomicFields.from(valueLimits = Map.empty) + AtomicFields.from(valueLimits = Map.empty), + SpecHelpers.emitIncomplete )( Base64.getEncoder.encode(EnrichSpec.payload) ) .map(normalizeResult) .map { - case List(Validated.Invalid(badRow)) => println(badRow); ok + case List(Ior.Left(_)) => ok case other => ko(s"Expected one bad row, got $other") } } @@ -449,16 +453,16 @@ class EnrichSpec extends Specification with CatsEffect with ScalaCheck { def sinkGood( environment: 
Environment[IO, Array[Byte]], enriched: EnrichedEvent - ): IO[Unit] = sinkOne(environment, Validated.Valid(enriched)) + ): IO[Unit] = sinkOne(environment, Ior.Right(enriched)) def sinkBad( environment: Environment[IO, Array[Byte]], badRow: BadRow - ): IO[Unit] = sinkOne(environment, Validated.Invalid(badRow)) + ): IO[Unit] = sinkOne(environment, Ior.Left(badRow)) def sinkOne( environment: Environment[IO, Array[Byte]], - event: Validated[BadRow, EnrichedEvent] + event: Ior[BadRow, EnrichedEvent] ): IO[Unit] = Enrich.sinkChunk(List((List(event), None)), environment) } @@ -491,10 +495,11 @@ object EnrichSpec { Validated.Invalid(badRow) } - def normalizeResult(payload: Result): List[Validated[BadRow, Event]] = + def normalizeResult(payload: Result): List[Ior[BadRow, Event]] = payload._1.map { - case Validated.Valid(a) => normalize(ConversionUtils.tabSeparatedEnrichedEvent(a)) - case Validated.Invalid(e) => e.invalid + case Ior.Right(enriched) => normalize(ConversionUtils.tabSeparatedEnrichedEvent(enriched)).toIor + case Ior.Left(err) => Ior.Left(err) + case Ior.Both(_, enriched) => normalize(ConversionUtils.tabSeparatedEnrichedEvent(enriched)).toIor } val minimalEvent = Event diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/EventGenEtlPipelineSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/EventGenEtlPipelineSpec.scala index f9632d9d5..55fee9718 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/EventGenEtlPipelineSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/EventGenEtlPipelineSpec.scala @@ -10,7 +10,7 @@ */ package com.snowplowanalytics.snowplow.enrich.common.fs2 -import cats.data.{Validated, ValidatedNel} +import cats.data.{Ior, ValidatedNel} import cats.effect.testing.specs2.CatsEffect import cats.effect.IO import cats.effect.unsafe.implicits.global @@ -43,10 +43,12 @@ import org.specs2.specification.core.{Fragment, Fragments} import java.time.Instant import scala.util.{Random, Try} + import com.snowplowanalytics.snowplow.enrich.common.enrichments.AtomicFields -class EventGenEtlPipelineSpec extends Specification with CatsEffect { +import com.snowplowanalytics.snowplow.enrich.common.SpecHelpers +class EventGenEtlPipelineSpec extends Specification with CatsEffect { case class ContextMatcher(v: String) implicit val cmDecoder: Decoder[ContextMatcher] = Decoder.decodeString.emapTry { str => @@ -198,7 +200,7 @@ class EventGenEtlPipelineSpec extends Specification with CatsEffect { val dateTime = DateTime.now() val process = Processor("EventGenEtlPipelineSpec", "v1") - def processEvents(e: CollectorPayload): IO[List[Validated[BadRow, EnrichedEvent]]] = + def processEvents(e: CollectorPayload): IO[List[Ior[BadRow, EnrichedEvent]]] = EtlPipeline.processEvents[IO]( adapterRegistry, enrichmentReg, @@ -209,7 +211,8 @@ class EventGenEtlPipelineSpec extends Specification with CatsEffect { EtlPipeline.FeatureFlags(acceptInvalid = false, legacyEnrichmentOrder = false), IO.unit, SpecHelpers.registryLookup, - AtomicFields.from(Map.empty) + AtomicFields.from(Map.empty), + SpecHelpers.emitIncomplete ) def rethrowBadRows[A]: Pipe[IO, ValidatedNel[BadRow, A], A] = @@ -223,8 +226,8 @@ class EventGenEtlPipelineSpec extends Specification with CatsEffect { ).toEither ).rethrow[IO, A] - def rethrowBadRow[A]: Pipe[IO, Validated[BadRow, A], A] = - (in: Stream[IO, Validated[BadRow, A]]) => + def rethrowBadRow[A]: Pipe[IO, 
Ior[BadRow, A], A] = + (in: Stream[IO, Ior[BadRow, A]]) => in .map(_.leftMap(br => new Exception(br.compact)).toEither) .rethrow[IO, A] diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/BlackBoxTesting.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/BlackBoxTesting.scala index b20a225ca..ee658f36d 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/BlackBoxTesting.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/BlackBoxTesting.scala @@ -19,7 +19,7 @@ import cats.effect.kernel.Resource import cats.effect.testing.specs2.CatsEffect -import cats.data.Validated +import cats.data.Ior import cats.data.Validated.{Invalid, Valid} import io.circe.Json @@ -104,12 +104,13 @@ object BlackBoxTesting extends Specification with CatsEffect { featureFlags, IO.unit, SpecHelpers.registryLookup, - AtomicFields.from(valueLimits = Map.empty) + AtomicFields.from(valueLimits = Map.empty), + SpecHelpers.emitIncomplete )( input ) .map { - case (List(Validated.Valid(enriched)), _) => checkEnriched(enriched, expected) + case (List(Ior.Right(enriched)), _) => checkEnriched(enriched, expected) case other => ko(s"there should be one enriched event but got $other") } } diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/adapters/Tp2AdapterSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/adapters/Tp2AdapterSpec.scala index 07698aba5..0aa68adab 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/adapters/Tp2AdapterSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/adapters/Tp2AdapterSpec.scala @@ -43,12 +43,13 @@ class Tp2AdapterSpec extends Specification with CatsEffect { EnrichSpec.featureFlags, IO.unit, SpecHelpers.registryLookup, - AtomicFields.from(valueLimits = Map.empty) + AtomicFields.from(valueLimits = Map.empty), + SpecHelpers.emitIncomplete )( input ) .map { - case (l, _) if l.forall(_.isValid) => l must haveSize(10) + case (l, _) if l.forall(_.isRight) => l must haveSize(10) case other => ko(s"there should be 10 enriched events, got $other") } } diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala index 7e5f770c6..effde6e90 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala @@ -71,7 +71,8 @@ class ParsedConfigsSpec extends Specification with CatsEffect { 1000, 10000000, io.GcpUserAgent("Snowplow OSS") - ) + ), + None ), io.Concurrency(10000, 64), Some(7.days), diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/TestEnvironment.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/TestEnvironment.scala index 53cf12cf9..515aec603 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/TestEnvironment.scala +++ 
b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/TestEnvironment.scala @@ -126,6 +126,7 @@ object TestEnvironment extends CatsEffect { goodRef <- Resource.eval(Ref.of[IO, Vector[AttributedData[Array[Byte]]]](Vector.empty)) piiRef <- Resource.eval(Ref.of[IO, Vector[AttributedData[Array[Byte]]]](Vector.empty)) badRef <- Resource.eval(Ref.of[IO, Vector[Array[Byte]]](Vector.empty)) + incompleteRef <- Resource.eval(Ref.of[IO, Vector[AttributedData[Array[Byte]]]](Vector.empty)) igluClient <- Resource.eval(SpecHelpers.createIgluClient(List(embeddedRegistry))) environment = Environment[IO, Array[Byte]]( igluClient, @@ -141,6 +142,7 @@ object TestEnvironment extends CatsEffect { g => goodRef.update(_ ++ g), Some(p => piiRef.update(_ ++ p)), b => badRef.update(_ ++ b), + Some(i => incompleteRef.update(_ ++ i)), _ => IO.unit, identity, None, diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/EtlPipeline.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/EtlPipeline.scala index 736cd661e..6ac39c633 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/EtlPipeline.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/EtlPipeline.scala @@ -10,7 +10,7 @@ */ package com.snowplowanalytics.snowplow.enrich.common -import cats.data.{Validated, ValidatedNel} +import cats.data.{Ior, Validated, ValidatedNel} import cats.effect.kernel.Sync import cats.implicits._ @@ -40,10 +40,6 @@ object EtlPipeline { case class FeatureFlags(acceptInvalid: Boolean, legacyEnrichmentOrder: Boolean) /** - * A helper method to take a ValidatedMaybeCanonicalInput and transform it into a List (possibly - * empty) of ValidatedCanonicalOutputs. - * We have to do some unboxing because enrichEvent expects a raw CanonicalInput as its argument, - * not a MaybeCanonicalInput. * @param adapterRegistry Contains all of the events adapters * @param enrichmentRegistry Contains configuration for all enrichments to apply * @param client Our Iglu client, for schema lookups and validation @@ -52,8 +48,6 @@ object EtlPipeline { * @param input The ValidatedMaybeCanonicalInput * @param featureFlags The feature flags available in the current version of Enrich * @param invalidCount Function to increment the count of invalid events - * @return the ValidatedMaybeCanonicalOutput. 
Thanks to flatMap, will include any validation - * errors contained within the ValidatedMaybeCanonicalInput */ def processEvents[F[_]: Sync]( adapterRegistry: AdapterRegistry[F], @@ -65,8 +59,9 @@ object EtlPipeline { featureFlags: FeatureFlags, invalidCount: F[Unit], registryLookup: RegistryLookup[F], - atomicFields: AtomicFields - ): F[List[Validated[BadRow, EnrichedEvent]]] = + atomicFields: AtomicFields, + emitIncomplete: Boolean + ): F[List[Ior[BadRow, EnrichedEvent]]] = input match { case Validated.Valid(Some(payload)) => adapterRegistry @@ -84,16 +79,17 @@ object EtlPipeline { featureFlags, invalidCount, registryLookup, - atomicFields + atomicFields, + emitIncomplete ) - .toValidated + .value } case Validated.Invalid(badRow) => - Sync[F].pure(List(badRow.invalid[EnrichedEvent])) + Sync[F].pure(List(Ior.left(badRow))) } case Validated.Invalid(badRows) => - Sync[F].pure(badRows.map(_.invalid[EnrichedEvent])).map(_.toList) + Sync[F].pure(badRows.toList.map(br => Ior.left(br))) case Validated.Valid(None) => - Sync[F].pure(List.empty[Validated[BadRow, EnrichedEvent]]) + Sync[F].pure(Nil) } } diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala index 4eb514a49..c14a7a6d2 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala @@ -14,8 +14,7 @@ import org.slf4j.LoggerFactory import cats.Monad import cats.data.Validated.{Invalid, Valid} -import cats.data.NonEmptyList - +import cats.data.{Ior, IorT, NonEmptyList} import cats.implicits._ import com.snowplowanalytics.iglu.client.validator.ValidatorReport @@ -38,16 +37,18 @@ object AtomicFieldsLengthValidator { acceptInvalid: Boolean, invalidCount: F[Unit], atomicFields: AtomicFields - ): F[Either[FailureDetails.SchemaViolation, Unit]] = - atomicFields.value - .map(field => validateField(event, field).toValidatedNel) - .combineAll match { - case Invalid(errors) if acceptInvalid => - handleAcceptableErrors(invalidCount, event, errors) *> Monad[F].pure(Right(())) - case Invalid(errors) => - Monad[F].pure(AtomicFields.errorsToSchemaViolation(errors).asLeft) - case Valid(()) => - Monad[F].pure(Right(())) + ): IorT[F, FailureDetails.SchemaViolation, Unit] = + IorT { + atomicFields.value + .map(validateField(event, _).toValidatedNel) + .combineAll match { + case Invalid(errors) if acceptInvalid => + handleAcceptableErrors(invalidCount, event, errors) *> Monad[F].pure(Ior.Right(())) + case Invalid(errors) => + Monad[F].pure(Ior.Both(AtomicFields.errorsToSchemaViolation(errors), ())) + case Valid(()) => + Monad[F].pure(Ior.Right(())) + } } private def validateField( diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala index d8b0bd4fb..54be85e71 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala @@ -15,10 +15,10 @@ import java.net.URI import java.time.Instant import 
org.joda.time.DateTime import io.circe.Json -import cats.{Applicative, Monad} -import cats.data.{EitherT, NonEmptyList, OptionT, StateT} -import cats.effect.kernel.{Clock, Sync} +import cats.{Applicative, Functor, Monad} +import cats.data.{EitherT, Ior, IorT, NonEmptyList, OptionT, StateT} import cats.implicits._ +import cats.effect.kernel.{Clock, Sync} import com.snowplowanalytics.refererparser._ @@ -26,7 +26,7 @@ import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.client.validator.ValidatorReport -import com.snowplowanalytics.iglu.core.SelfDescribingData +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.snowplow.badrows._ @@ -43,8 +43,6 @@ import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.sqlquer import com.snowplowanalytics.snowplow.enrich.common.enrichments.web.{PageEnrichments => WPE} import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent import com.snowplowanalytics.snowplow.enrich.common.utils.{IgluUtils, ConversionUtils => CU} -import _root_.com.snowplowanalytics.iglu.core.SchemaKey -import com.snowplowanalytics.iglu.core.SchemaVer object EnrichmentManager { @@ -57,7 +55,9 @@ object EnrichmentManager { * @param raw Canonical input event to enrich * @param featureFlags The feature flags available in the current version of Enrich * @param invalidCount Function to increment the count of invalid events - * @return Enriched event or bad row if a problem occured + * @return Right(EnrichedEvent) if everything went well. + * Left(BadRow) if something went wrong and incomplete events are not enabled. + * Both(BadRow, EnrichedEvent) if something went wrong but incomplete events are enabled. */ def enrichEvent[F[_]: Sync]( registry: EnrichmentRegistry[F], @@ -68,10 +68,11 @@ object EnrichmentManager { featureFlags: EtlPipeline.FeatureFlags, invalidCount: F[Unit], registryLookup: RegistryLookup[F], - atomicFields: AtomicFields - ): EitherT[F, BadRow, EnrichedEvent] = - for { - enriched <- EitherT.rightT[F, BadRow](new EnrichedEvent) + atomicFields: AtomicFields, + emitIncomplete: Boolean + ): IorT[F, BadRow, EnrichedEvent] = { + val iorT: IorT[F, NonEmptyList[BadRow], EnrichedEvent] = for { + enriched <- IorT.pure[F, NonEmptyList[BadRow]](new EnrichedEvent) extractResult <- mapAndValidateInput( raw, enriched, @@ -80,9 +81,13 @@ object EnrichmentManager { client, registryLookup ) + .leftMap(NonEmptyList.one) + .possiblyExitingEarly(emitIncomplete) + // Next 2 lines remove the invalid contexts and the invalid unstructured event from the event. + // This should be done after the bad row was created and only if emitIncomplete is enabled. 
_ = { - ME.formatUnstructEvent(extractResult.unstructEvent).foreach(e => enriched.unstruct_event = e) - ME.formatContexts(extractResult.contexts).foreach(c => enriched.contexts = c) + enriched.contexts = ME.formatContexts(extractResult.contexts).orNull + enriched.unstruct_event = ME.formatUnstructEvent(extractResult.unstructEvent).orNull } enrichmentsContexts <- runEnrichments( registry, @@ -93,11 +98,13 @@ object EnrichmentManager { extractResult.unstructEvent, featureFlags.legacyEnrichmentOrder ) - _ = ME.formatContexts(enrichmentsContexts ::: extractResult.validationInfoContexts).foreach(c => enriched.derived_contexts = c) + .leftMap(NonEmptyList.one) + .possiblyExitingEarly(emitIncomplete) _ <- validateEnriched( enriched, raw, enrichmentsContexts, + extractResult.validationInfoContexts, client, processor, registryLookup, @@ -105,8 +112,13 @@ object EnrichmentManager { invalidCount, atomicFields ) + .leftMap(NonEmptyList.one) + .possiblyExitingEarly(emitIncomplete) } yield enriched + iorT.leftMap(_.head) + } + private def mapAndValidateInput[F[_]: Sync]( raw: RawEvent, enrichedEvent: EnrichedEvent, @@ -114,27 +126,28 @@ object EnrichmentManager { processor: Processor, client: IgluCirceClient[F], registryLookup: RegistryLookup[F] - ): EitherT[F, BadRow, IgluUtils.EventExtractResult] = - EitherT { - for { - setup <- setupEnrichedEvent[F](raw, enrichedEvent, etlTstamp, processor).map(_.toValidatedNel) - iglu <- IgluUtils.extractAndValidateInputJsons(enrichedEvent, client, registryLookup) - } yield (iglu <* setup).leftMap { violations => - buildSchemaViolationsBadRow( - violations, - EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent), - RawEvent.toRawEvent(raw), - processor - ) - }.toEither + ): IorT[F, BadRow, IgluUtils.EventExtractResult] = { + val iorT = for { + _ <- setupEnrichedEvent[F](raw, enrichedEvent, etlTstamp, processor) + .leftMap(NonEmptyList.one) + extract <- IgluUtils.extractAndValidateInputJsons(enrichedEvent, client, registryLookup) + } yield extract + + iorT.leftMap { violations => + buildSchemaViolationsBadRow( + violations, + EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent), + RawEvent.toRawEvent(raw), + processor + ) } + } /** - * Run all the enrichments and aggregate the errors if any + * Run all the enrichments * @param enriched /!\ MUTABLE enriched event, mutated IN-PLACE /!\ - * @return List of contexts to attach to the enriched event if all the enrichments went well - * or [[BadRow.EnrichmentFailures]] if something wrong happened - * with at least one enrichment + * @return All the contexts produced by the enrichments are in the Right. + * All the errors are aggregated in the bad row in the Left. 
*/ private def runEnrichments[F[_]: Monad]( registry: EnrichmentRegistry[F], @@ -144,22 +157,25 @@ object EnrichmentManager { inputContexts: List[SelfDescribingData[Json]], unstructEvent: Option[SelfDescribingData[Json]], legacyOrder: Boolean - ): EitherT[F, BadRow, List[SelfDescribingData[Json]]] = - EitherT { + ): IorT[F, BadRow, List[SelfDescribingData[Json]]] = + IorT { accState(registry, raw, inputContexts, unstructEvent, legacyOrder) .runS(Accumulation(enriched, Nil, Nil)) .map { case Accumulation(_, failures, contexts) => failures.toNel match { case Some(nel) => - buildEnrichmentFailuresBadRow( - nel, - EnrichedEvent.toPartiallyEnrichedEvent(enriched), - RawEvent.toRawEvent(raw), - processor - ).asLeft + Ior.both( + buildEnrichmentFailuresBadRow( + nel, + EnrichedEvent.toPartiallyEnrichedEvent(enriched), + RawEvent.toRawEvent(raw), + processor + ), + contexts + ) case None => - contexts.asRight + Ior.right(contexts) } } } @@ -168,26 +184,31 @@ object EnrichmentManager { enriched: EnrichedEvent, raw: RawEvent, enrichmentsContexts: List[SelfDescribingData[Json]], + validationInfoContexts: List[SelfDescribingData[Json]], client: IgluCirceClient[F], processor: Processor, registryLookup: RegistryLookup[F], acceptInvalid: Boolean, invalidCount: F[Unit], atomicFields: AtomicFields - ): EitherT[F, BadRow, Unit] = - EitherT { - for { - atomic <- AtomicFieldsLengthValidator.validate[F](enriched, acceptInvalid, invalidCount, atomicFields).map(_.toValidatedNel) - contexts <- IgluUtils.validateEnrichmentsContexts[F](client, enrichmentsContexts, registryLookup) - } yield (atomic |+| contexts).void.leftMap { violations => - buildSchemaViolationsBadRow( - violations, - EnrichedEvent.toPartiallyEnrichedEvent(enriched), - RawEvent.toRawEvent(raw), - processor - ) - }.toEither + ): IorT[F, BadRow, Unit] = { + val iorT = for { + validContexts <- IgluUtils.validateEnrichmentsContexts[F](client, enrichmentsContexts, registryLookup) + _ = ME.formatContexts(validContexts ::: validationInfoContexts).foreach(enriched.derived_contexts = _) + _ <- AtomicFieldsLengthValidator + .validate[F](enriched, acceptInvalid, invalidCount, atomicFields) + .leftMap(NonEmptyList.one) + } yield () + + iorT.leftMap { violations => + buildSchemaViolationsBadRow( + violations, + EnrichedEvent.toPartiallyEnrichedEvent(enriched), + RawEvent.toRawEvent(raw), + processor + ) } + } private[enrichments] case class Accumulation( event: EnrichedEvent, @@ -315,27 +336,29 @@ object EnrichmentManager { e: EnrichedEvent, etlTstamp: DateTime, processor: Processor - ): F[Either[FailureDetails.SchemaViolation, Unit]] = - Sync[F].delay { - e.event_id = EE.generateEventId() // May be updated later if we have an `eid` parameter - e.v_collector = raw.source.name // May be updated later if we have a `cv` parameter - e.v_etl = ME.etlVersion(processor) - e.etl_tstamp = EE.toTimestamp(etlTstamp) - e.network_userid = raw.context.userId.map(_.toString).orNull // May be updated later by 'nuid' - e.user_ipaddress = ME - .extractIp("user_ipaddress", raw.context.ipAddress.orNull) - .toOption - .orNull // May be updated later by 'ip' - // May be updated later if we have a `ua` parameter - setUseragent(e, raw.context.useragent) - // Validate that the collectorTstamp exists and is Redshift-compatible - val collectorTstamp = setCollectorTstamp(e, raw.context.timestamp).toValidatedNel - // Map/validate/transform input fields to enriched event fields - val transformed = Transform.transform(raw, e) - - (collectorTstamp |+| transformed) - 
.leftMap(AtomicFields.errorsToSchemaViolation) - .toEither + ): IorT[F, FailureDetails.SchemaViolation, Unit] = + IorT { + Sync[F].delay { + e.event_id = EE.generateEventId() // May be updated later if we have an `eid` parameter + e.v_collector = raw.source.name // May be updated later if we have a `cv` parameter + e.v_etl = ME.etlVersion(processor) + e.etl_tstamp = EE.toTimestamp(etlTstamp) + e.network_userid = raw.context.userId.map(_.toString).orNull // May be updated later by 'nuid' + e.user_ipaddress = ME + .extractIp("user_ipaddress", raw.context.ipAddress.orNull) + .toOption + .orNull // May be updated later by 'ip' + // May be updated later if we have a `ua` parameter + setUseragent(e, raw.context.useragent) + // Validate that the collectorTstamp exists and is Redshift-compatible + val collectorTstamp = setCollectorTstamp(e, raw.context.timestamp).toValidatedNel + // Map/validate/transform input fields to enriched event fields + val transformed = Transform.transform(raw, e) + + (collectorTstamp |+| transformed).void.toIor + .leftMap(AtomicFields.errorsToSchemaViolation) + .putRight(()) + } } def setCollectorTstamp(event: EnrichedEvent, timestamp: Option[DateTime]): Either[ValidatorReport, Unit] = @@ -848,4 +871,17 @@ object EnrichmentManager { Payload.EnrichmentPayload(pee, re) ) + private implicit class IorTOps[F[_], A, B](val iorT: IorT[F, A, B]) extends AnyVal { + + /** If the incomplete events feature is disabled, then convert a Both to a Left, so we don't waste time with next steps */ + def possiblyExitingEarly(emitIncomplete: Boolean)(implicit F: Functor[F]): IorT[F, A, B] = + if (emitIncomplete) iorT + else + IorT { + iorT.value.map { + case Ior.Both(bad, _) => Ior.Left(bad) + case other => other + } + } + } } diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala index 791174120..8483b2cca 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala @@ -11,7 +11,7 @@ package com.snowplowanalytics.snowplow.enrich.common.utils import cats.Monad -import cats.data.{EitherT, NonEmptyList, Validated, ValidatedNel} +import cats.data.{EitherT, Ior, IorT, NonEmptyList} import cats.effect.Clock import cats.implicits._ @@ -44,27 +44,25 @@ object IgluUtils { * @param client Iglu client used to validate the SDJs * @param raw Raw input event, used only to put in the bad row in case of problem * @param processor Meta data to put in the bad row - * @return Extracted unstructured event and input contexts if any and if everything valid, - * `BadRow.SchemaViolations` if something went wrong. For instance if the - * unstructured event is invalid and has a context that is invalid, - * the bad row will contain the 2 associated `FailureDetails.SchemaViolation`s + * @return Every SDJ that is invalid is in the Left part of the Ior + * while everything that is valid is in the Right part. 
*/ def extractAndValidateInputJsons[F[_]: Monad: Clock]( enriched: EnrichedEvent, client: IgluCirceClient[F], registryLookup: RegistryLookup[F] - ): F[ValidatedNel[FailureDetails.SchemaViolation, EventExtractResult]] = + ): IorT[F, NonEmptyList[FailureDetails.SchemaViolation], EventExtractResult] = for { - contexts <- IgluUtils.extractAndValidateInputContexts(enriched, client, registryLookup) - unstruct <- IgluUtils - .extractAndValidateUnstructEvent(enriched, client, registryLookup) - .map(_.toValidatedNel) - } yield (contexts, unstruct) - .mapN { (c, ue) => - val validationInfoContexts = (c.flatMap(_.validationInfo) ::: ue.flatMap(_.validationInfo).toList).distinct - .map(_.toSdj) - EventExtractResult(contexts = c.map(_.sdj), unstructEvent = ue.map(_.sdj), validationInfoContexts = validationInfoContexts) - } + contexts <- extractAndValidateInputContexts(enriched, client, registryLookup) + unstruct <- extractAndValidateUnstructEvent(enriched, client, registryLookup) + } yield { + val validationInfoContexts = (contexts.flatMap(_.validationInfo) ::: unstruct.flatMap(_.validationInfo).toList).distinct + .map(_.toSdj) + EventExtractResult(contexts = contexts.map(_.sdj), + unstructEvent = unstruct.map(_.sdj), + validationInfoContexts = validationInfoContexts + ) + } /** * Extract unstructured event from event and validate against its schema @@ -81,18 +79,21 @@ object IgluUtils { registryLookup: RegistryLookup[F], field: String = "ue_properties", criterion: SchemaCriterion = SchemaCriterion("com.snowplowanalytics.snowplow", "unstruct_event", "jsonschema", 1, 0) - ): F[Validated[FailureDetails.SchemaViolation, Option[SdjExtractResult]]] = - (Option(enriched.unstruct_event) match { + ): IorT[F, NonEmptyList[FailureDetails.SchemaViolation], Option[SdjExtractResult]] = + Option(enriched.unstruct_event) match { case Some(rawUnstructEvent) => - for { + val iorT = for { // Validate input Json string and extract unstructured event unstruct <- extractInputData(rawUnstructEvent, field, criterion, client, registryLookup) + .leftMap(NonEmptyList.one) + .toIor // Parse Json unstructured event as SelfDescribingData[Json] unstructSDJ <- parseAndValidateSDJ(unstruct, client, registryLookup) } yield unstructSDJ.some + iorT.recoverWith { case errors => IorT.fromIor[F](Ior.Both(errors, None)) } case None => - EitherT.rightT[F, FailureDetails.SchemaViolation](none[SdjExtractResult]) - }).toValidated + IorT.rightT[F, NonEmptyList[FailureDetails.SchemaViolation]](none[SdjExtractResult]) + } /** * Extract list of custom contexts from event and validate each against its schema @@ -101,7 +102,7 @@ object IgluUtils { * @param field Name of the field containing the contexts, to put in the bad row * in case of failure * @param criterion Expected schema for the JSON containing the contexts - * @return List will all contexts provided that they are all valid + * @return All valid contexts are in the Right while all errors are in the Left */ private[common] def extractAndValidateInputContexts[F[_]: Monad: Clock]( enriched: EnrichedEvent, @@ -109,27 +110,28 @@ object IgluUtils { registryLookup: RegistryLookup[F], field: String = "contexts", criterion: SchemaCriterion = SchemaCriterion("com.snowplowanalytics.snowplow", "contexts", "jsonschema", 1, 0) - ): F[ValidatedNel[FailureDetails.SchemaViolation, List[SdjExtractResult]]] = - (Option(enriched.contexts) match { + ): IorT[F, NonEmptyList[FailureDetails.SchemaViolation], List[SdjExtractResult]] = + Option(enriched.contexts) match { case Some(rawContexts) => - for { + val iorT 
= for { // Validate input Json string and extract contexts contexts <- extractInputData(rawContexts, field, criterion, client, registryLookup) .map(_.asArray.get.toList) // .get OK because SDJ wrapping the contexts valid .leftMap(NonEmptyList.one) + .toIor // Parse and validate each SDJ and merge the errors - contextsSDJ <- EitherT( - contexts - .map(parseAndValidateSDJ(_, client, registryLookup).toValidatedNel) - .sequence - .map(_.sequence.toEither) - ) - } yield contextsSDJ + contextsSdj <- contexts + .traverse( + parseAndValidateSDJ(_, client, registryLookup) + .map(sdj => List(sdj)) + .recoverWith { case errors => IorT.fromIor[F](Ior.Both(errors, Nil)) } + ) + .map(_.flatten) + } yield contextsSdj + iorT.recoverWith { case errors => IorT.fromIor[F](Ior.Both(errors, Nil)) } case None => - EitherT.rightT[F, NonEmptyList[FailureDetails.SchemaViolation]]( - List.empty[SdjExtractResult] - ) - }).toValidated + IorT.rightT[F, NonEmptyList[FailureDetails.SchemaViolation]](Nil) + } /** * Validate each context added by the enrichments against its schema @@ -138,13 +140,13 @@ object IgluUtils { * @param raw Input event to put in the bad row if at least one context is invalid * @param processor Meta data for the bad row * @param enriched Partially enriched event to put in the bad row - * @return Unit if all the contexts are valid + * @return All valid contexts are in the Right while all errors are in the Left */ private[common] def validateEnrichmentsContexts[F[_]: Monad: Clock]( client: IgluCirceClient[F], sdjs: List[SelfDescribingData[Json]], registryLookup: RegistryLookup[F] - ): F[ValidatedNel[FailureDetails.SchemaViolation, Unit]] = + ): IorT[F, NonEmptyList[FailureDetails.SchemaViolation], List[SelfDescribingData[Json]]] = checkList(client, sdjs, registryLookup) .leftMap( _.map { @@ -153,7 +155,6 @@ object IgluUtils { f } ) - .toValidated /** Used to extract .data for input custom contexts and input unstructured event */ private def extractInputData[F[_]: Monad: Clock]( @@ -208,30 +209,34 @@ object IgluUtils { .leftMap((sdj.schema, _)) } - /** Check a list of SDJs and merge the Iglu errors */ + /** + * Check a list of SDJs. 
+ * @return All valid SDJs are in the Right while all errors are in the Left + */ private def checkList[F[_]: Monad: Clock]( client: IgluCirceClient[F], sdjs: List[SelfDescribingData[Json]], registryLookup: RegistryLookup[F] - ): EitherT[F, NonEmptyList[(SchemaKey, ClientError)], Unit] = - EitherT { - sdjs - .map(check(client, _, registryLookup).toValidatedNel) - .sequence - .map(_.sequence_.toEither) - } + ): IorT[F, NonEmptyList[(SchemaKey, ClientError)], List[SelfDescribingData[Json]]] = + sdjs.map { sdj => + check(client, sdj, registryLookup) + .map(_ => List(sdj)) + .leftMap(NonEmptyList.one) + .toIor + .recoverWith { case errors => IorT.fromIor[F](Ior.Both(errors, Nil)) } + }.foldA /** Parse a Json as a SDJ and check that it's valid */ private def parseAndValidateSDJ[F[_]: Monad: Clock]( json: Json, client: IgluCirceClient[F], registryLookup: RegistryLookup[F] - ): EitherT[F, FailureDetails.SchemaViolation, SdjExtractResult] = + ): IorT[F, NonEmptyList[FailureDetails.SchemaViolation], SdjExtractResult] = for { - sdj <- SelfDescribingData - .parse(json) - .leftMap(FailureDetails.SchemaViolation.NotIglu(json, _)) - .toEitherT[F] + sdj <- IorT + .fromEither[F](SelfDescribingData.parse(json)) + .leftMap[FailureDetails.SchemaViolation](FailureDetails.SchemaViolation.NotIglu(json, _)) + .leftMap(NonEmptyList.one) supersedingSchema <- check(client, sdj, registryLookup) .leftMap { case (schemaKey, clientError) => @@ -239,6 +244,8 @@ object IgluUtils { .IgluError(schemaKey, clientError): FailureDetails.SchemaViolation } + .leftMap(NonEmptyList.one) + .toIor validationInfo = supersedingSchema.map(s => ValidationInfo(sdj.schema, s)) sdjUpdated = replaceSchemaVersion(sdj, validationInfo) } yield SdjExtractResult(sdjUpdated, validationInfo) diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/JsonUtils.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/JsonUtils.scala index cf3851ffa..41540948c 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/JsonUtils.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/JsonUtils.scala @@ -31,20 +31,13 @@ object JsonUtils { private val JsonSchemaDateTimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(DateTimeZone.UTC) - /** Validates a String as correct JSON. */ val extractUnencJson: (String, String) => Either[ValidatorReport, String] = - (field, str) => - validateAndReformatJson(str) - .leftMap { e => - ValidatorReport(e, Some(field), Nil, Option(str)) - } + (_, str) => Right(str) - /** Decodes a Base64 (URL safe)-encoded String then validates it as correct JSON. */ val extractBase64EncJson: (String, String) => Either[ValidatorReport, String] = (field, str) => ConversionUtils .decodeBase64Url(str) - .flatMap(validateAndReformatJson) .leftMap { e => ValidatorReport(e, Some(field), Nil, Option(str)) } @@ -123,16 +116,6 @@ object JsonUtils { (key, v) } - /** - * Validates and reformats a JSON: - * 1. Checks the JSON is valid - * 2. 
Reformats, including removing unnecessary whitespace - * @param str the String hopefully containing JSON - * @return either an error String or the reformatted JSON String - */ - private[utils] def validateAndReformatJson(str: String): Either[String, String] = - extractJson(str).map(_.noSpaces) - /** * Converts a JSON string into an Either[String, Json] * @param instance The JSON string to parse diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/EtlPipelineSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/EtlPipelineSpec.scala index 612236d3e..b4baad14f 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/EtlPipelineSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/EtlPipelineSpec.scala @@ -10,8 +10,8 @@ */ package com.snowplowanalytics.snowplow.enrich.common -import cats.data.Validated import cats.syntax.validated._ +import cats.data.Ior import cats.effect.IO import cats.effect.testing.specs2.CatsEffect @@ -36,7 +36,6 @@ import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry import com.snowplowanalytics.snowplow.enrich.common.adapters.registry.RemoteAdapter import com.snowplowanalytics.snowplow.enrich.common.enrichments.{AtomicFields, EnrichmentRegistry} import com.snowplowanalytics.snowplow.enrich.common.loaders.{CollectorPayload, ThriftLoader} -import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent import com.snowplowanalytics.snowplow.enrich.common.SpecHelpers._ @@ -73,11 +72,12 @@ class EtlPipelineSpec extends Specification with ValidatedMatchers with CatsEffe AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - AtomicFields.from(Map.empty) + AtomicFields.from(Map.empty), + emitIncomplete ) } yield output must be like { - case a :: b :: c :: d :: Nil => - (a must beValid).and(b must beInvalid).and(c must beInvalid).and(d must beInvalid) + case Ior.Right(_) :: Ior.Left(_) :: Ior.Left(_) :: Ior.Left(_) :: Nil => ok + case other => ko(s"[$other] is not a list with 1 enriched event and 3 bad rows") } def e2 = @@ -100,11 +100,12 @@ AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - AtomicFields.from(Map.empty) + AtomicFields.from(Map.empty), + emitIncomplete ) - } yield output must beLike { - case Validated.Valid(_: EnrichedEvent) :: Nil => ok - case res => ko(s"[$res] doesn't contain one enriched event") + } yield output must be like { + case Ior.Right(_) :: Nil => ok + case other => ko(s"[$other] is not a list with 1 enriched event") } def e3 = @@ -122,11 +123,12 @@ AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - AtomicFields.from(Map.empty) + AtomicFields.from(Map.empty), + emitIncomplete ) } yield output must be like { - case Validated.Invalid(_: BadRow.CPFormatViolation) :: Nil => ok - case other => ko(s"One invalid CPFormatViolation expected, got ${other}") + case Ior.Left(_: BadRow.CPFormatViolation) :: Nil => ok + case other => ko(s"[$other] is not a CPFormatViolation bad row") } def e4 = @@ -144,7 +146,8 @@ AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - AtomicFields.from(Map.empty) + AtomicFields.from(Map.empty), + emitIncomplete ) } yield output must beEqualTo(Nil) } diff --git
a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/SpecHelpers.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/SpecHelpers.scala index ae018a13c..6bf35e859 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/SpecHelpers.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/SpecHelpers.scala @@ -37,7 +37,7 @@ import com.snowplowanalytics.iglu.client.{IgluCirceClient, Resolver} import com.snowplowanalytics.iglu.client.resolver.registries.Registry import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLookup -import com.snowplowanalytics.iglu.core.SelfDescribingData +import com.snowplowanalytics.iglu.core.{SchemaKey, SelfDescribingData} import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.lrumap.CreateLruMap._ @@ -129,6 +129,25 @@ object SpecHelpers extends CatsEffect { .flatMap(SelfDescribingData.parse[Json]) .leftMap(err => s"Can't parse Json [$rawJson] as SelfDescribingData, error: [$err]") + def listContextsSchemas(rawContexts: String): List[SchemaKey] = + jsonStringToSDJ(rawContexts) + .map(_.data.asArray.get.toList) + .flatMap(contexts => contexts.traverse(c => SelfDescribingData.parse[Json](c).map(_.schema))) match { + case Left(err) => + throw new IllegalArgumentException(s"Couldn't list contexts schemas. Error: [$err]") + case Right(schemas) => schemas + } + + def getUnstructSchema(rawUnstruct: String): SchemaKey = + jsonStringToSDJ(rawUnstruct) + .map(_.data) + .flatMap(SelfDescribingData.parse[Json]) + .map(_.schema) match { + case Left(err) => + throw new IllegalArgumentException(s"Couldn't get unstruct event schema. Error: [$err]") + case Right(schema) => schema + } + implicit class MapOps[A, B](underlying: Map[A, B]) { def toOpt: Map[A, Option[B]] = underlying.map { case (a, b) => (a, Option(b)) } } @@ -148,6 +167,8 @@ object SpecHelpers extends CatsEffect { def createIgluClient(registries: List[Registry]): IO[IgluCirceClient[IO]] = IgluCirceClient.fromResolver[IO](Resolver(registries, None), cacheSize = 0) + val emitIncomplete = false + val callrailSchemas = CallrailSchemas( call_complete = "iglu:com.callrail/call_complete/jsonschema/1-0-2" ) diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala index 7ea29cacb..fd1aff96c 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala @@ -19,7 +19,7 @@ import cats.effect.IO import cats.effect.testing.specs2.CatsEffect import cats.implicits._ -import cats.data.NonEmptyList +import cats.data.{Ior, NonEmptyList} import io.circe.Json import io.circe.literal._ @@ -89,13 +89,13 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - atomicFieldLimits + atomicFieldLimits, + emitIncomplete ) - enriched.value - .map(_ must beLeft.like { - case _: BadRow.SchemaViolations => ok - case br => ko(s"bad row [$br] is not SchemaViolations") - }) + enriched.value map { + case Ior.Left(_: BadRow.SchemaViolations) => ok + case other => ko(s"[$other] is not a SchemaViolations bad row") + } } "return a
SchemaViolations bad row if the input unstructured event is invalid" >> { @@ -127,13 +127,13 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - atomicFieldLimits + atomicFieldLimits, + emitIncomplete ) - enriched.value - .map(_ must beLeft.like { - case _: BadRow.SchemaViolations => ok - case br => ko(s"bad row [$br] is not SchemaViolations") - }) + enriched.value map { + case Ior.Left(_: BadRow.SchemaViolations) => ok + case other => ko(s"[$other] is not a SchemaViolations bad row") + } } "return a SchemaViolations bad row that contains 1 ValidationError for the atomic field and 1 ValidationError for the unstruct event" >> { @@ -167,29 +167,32 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - atomicFieldLimits + atomicFieldLimits, + emitIncomplete ) .value - .map(_ must beLeft.like { - case BadRow.SchemaViolations( - _, - Failure.SchemaViolations(_, - NonEmptyList(FailureDetails.SchemaViolation.IgluError(schemaKey1, clientError1), - List(FailureDetails.SchemaViolation.IgluError(schemaKey2, clientError2)) - ) - ), - _ + .map { + case Ior.Left( + BadRow.SchemaViolations( + _, + Failure.SchemaViolations(_, + NonEmptyList(FailureDetails.SchemaViolation.IgluError(schemaKey1, clientError1), + List(FailureDetails.SchemaViolation.IgluError(schemaKey2, clientError2)) + ) + ), + _ + ) ) => - schemaKey1 must beEqualTo(emailSentSchema) - clientError1.toString must contain( + schemaKey1 must beEqualTo(AtomicFields.atomicSchema) + clientError1.toString must contain("tr_tt") + clientError1.toString must contain("Cannot be converted to java.math.BigDecimal") + schemaKey2 must beEqualTo(emailSentSchema) + clientError2.toString must contain( "unallowedAdditionalField: is not defined in the schema and the schema does not allow additional properties" ) - schemaKey2 must beEqualTo(AtomicFields.atomicSchema) - clientError2.toString must contain("tr_tt") - clientError2.toString must contain("Cannot be converted to java.math.BigDecimal") case other => - ko(s"[$other] is not a SchemaViolations bad row with 2 IgluError") - }) + ko(s"[$other] is not a SchemaViolations bad row with 2 expected IgluError") + } } "return an EnrichmentFailures bad row if one of the enrichments (JS enrichment here) fails" >> { @@ -232,11 +235,12 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - atomicFieldLimits + atomicFieldLimits, + emitIncomplete ) - enriched.value - .map(_ must beLeft.like { - case BadRow.EnrichmentFailures( + enriched.value map { + case Ior.Left( + BadRow.EnrichmentFailures( _, Failure.EnrichmentFailures( _, NonEmptyList( FailureDetails.EnrichmentFailure( _, _: FailureDetails.EnrichmentFailureMessage.Simple ), Nil ) ), _ - ) => - ok - case br => - ko( - s"bad row [$br] is not an EnrichmentFailures containing one EnrichmentFailureMessage.Simple" - ) - }) + ) + ) => + ok + case other => ko(s"[$other] is not an EnrichmentFailures bad row with one EnrichmentFailureMessage.Simple") + } } "return a SchemaViolations bad row containing one IgluError if one of the contexts added by the enrichments is invalid" >> { @@ -302,11 +304,12 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - atomicFieldLimits + atomicFieldLimits, + 
emitIncomplete ) - enriched.value - .map(_ must beLeft.like { - case BadRow.SchemaViolations( + enriched.value map { + case Ior.Left( + BadRow.SchemaViolations( _, Failure.SchemaViolations( _, @@ -315,11 +318,12 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE Nil ) ), - payload - ) if payload.enriched.derived_contexts.isDefined => - ok - case br => ko(s"[$br] is not a SchemaViolations bad row containing one IgluError and with derived_contexts defined") - }) + _ + ) + ) => + ok + case other => ko(s"[$other] is not a SchemaViolations bad row with one IgluError") + } } "emit an EnrichedEvent if everything goes well" >> { @@ -365,9 +369,13 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - atomicFieldLimits + atomicFieldLimits, + emitIncomplete ) - enriched.value.map(_ must beRight) + enriched.value.map { + case Ior.Right(_) => ok + case other => ko(s"[$other] is not an enriched event") + } } "emit an EnrichedEvent if a PII value that needs to be hashed is an empty string" >> { @@ -431,9 +439,13 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - atomicFieldLimits + atomicFieldLimits, + emitIncomplete ) - enriched.value.map(_ must beRight) + enriched.value.map { + case Ior.Right(_) => ok + case other => ko(s"[$other] is not an enriched event") + } } "emit an EnrichedEvent if a PII value that needs to be hashed is null" >> { @@ -497,9 +509,13 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - atomicFieldLimits + atomicFieldLimits, + emitIncomplete ) - enriched.value.map(_ must beRight) + enriched.value.map { + case Ior.Right(_) => ok + case other => ko(s"[$other] is not an enriched event") + } } "fail to emit an EnrichedEvent if a PII value that needs to be hashed is an empty object" >> { @@ -563,9 +579,13 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - atomicFieldLimits + atomicFieldLimits, + emitIncomplete ) - enriched.value.map(_ must beLeft) + enriched.value.map { + case Ior.Left(_) => ok + case other => ko(s"[$other] is not a bad row") + } } "fail to emit an EnrichedEvent if a context PII value that needs to be hashed is an empty object" >> { @@ -629,9 +649,13 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - atomicFieldLimits + atomicFieldLimits, + emitIncomplete ) - enriched.value.map(_ must beLeft) + enriched.value.map { + case Ior.Left(_) => ok + case other => ko(s"[$other] is not a bad row") + } } "fail to emit an EnrichedEvent if a PII value needs to be hashed in both co and ue and is invalid in one of them" >> { @@ -701,9 +725,13 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - atomicFieldLimits + atomicFieldLimits, + emitIncomplete ) - enriched.value.map(_ must beLeft) + enriched.value.map { + case Ior.Left(_) => ok + case other => ko(s"[$other] is not a bad row") + } } "emit an EnrichedEvent for valid integer fields" >> { @@ -729,9 +757,13 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE AcceptInvalid.featureFlags, 
IO.unit, SpecHelpers.registryLookup, - atomicFieldLimits + atomicFieldLimits, + emitIncomplete ) - enriched.value.map(_ must beRight) + enriched.value.map { + case Ior.Right(_) => ok + case other => ko(s"[$other] is not an enriched event") + } } } } @@ -759,9 +791,13 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - atomicFieldLimits + atomicFieldLimits, + emitIncomplete ) - enriched.value.map(_ must beRight) + enriched.value.map { + case Ior.Right(_) => ok + case other => ko(s"[$other] is not an enriched event") + } } } } @@ -798,11 +834,13 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - atomicFieldLimits + atomicFieldLimits, + emitIncomplete ) - enriched.value.map(_ must beRight { ee: EnrichedEvent => - ee.se_value.toString must_== expected - }) + enriched.value.map { + case Ior.Right(enriched) => enriched.se_value.toString must_== expected + case other => ko(s"[$other] is not an enriched event") + } } } @@ -825,12 +863,14 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - atomicFieldLimits + atomicFieldLimits, + emitIncomplete ) - enriched.value.map { e => - val res1 = e.map(_.useragent) must beRight(qs_ua) - val res2 = e.map(_.derived_contexts) must beRight((_: String).contains("\"agentName\":\"Firefox\"")) - res1 and res2 + enriched.value.map { + case Ior.Right(enriched) => + enriched.useragent must_== qs_ua + enriched.derived_contexts must contain("\"agentName\":\"Firefox\"") + case other => ko(s"[$other] is not an enriched event") } } @@ -851,10 +891,12 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - atomicFieldLimits + atomicFieldLimits, + emitIncomplete ) - enriched.value.map { e => - e.map(_.useragent) must beRight("header-useragent") + enriched.value.map { + case Ior.Right(enriched) => enriched.useragent must_== "header-useragent" + case other => ko(s"[$other] is not an enriched event") } } @@ -876,10 +918,12 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - atomicFieldLimits + atomicFieldLimits, + emitIncomplete ) - enriched.value.map { e => - e.map(_.useragent) must beRight(ua) + enriched.value.map { + case Ior.Right(enriched) => enriched.useragent must_== ua + case other => ko(s"[$other] is not an enriched event") } } @@ -902,12 +946,14 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - atomicFieldLimits + atomicFieldLimits, + emitIncomplete ) - enriched.value.map { e => - val res1 = e.map(_.useragent) must beRight(qs_ua) - val res2 = e.map(_.derived_contexts) must beRight((_: String).contains("\"agentName\":\"%1$S\"")) - res1 and res2 + enriched.value.map { + case Ior.Right(enriched) => + enriched.useragent must_== qs_ua + enriched.derived_contexts must contain("\"agentName\":\"%1$S\"") + case other => ko(s"[$other] is not an enriched event") } } @@ -948,10 +994,12 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - atomicFieldLimits + atomicFieldLimits, + emitIncomplete ) - 
enriched.value.map { e => - e.map(_.app_id) must beRight("moo") + enriched.value.map { + case Ior.Right(enriched) => enriched.app_id must_== "moo" + case other => ko(s"[$other] is not an enriched event") } } @@ -998,11 +1046,14 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - atomicFieldLimits + atomicFieldLimits, + emitIncomplete ) - enriched.value.map { e => - (e.map(_.app_id) must beRight("test_app_id")) and - (e.map(_.platform) must beRight("test_platform")) + enriched.value.map { + case Ior.Right(enriched) => + enriched.app_id must_== "test_app_id" + enriched.platform must_== "test_platform" + case other => ko(s"[$other] is not an enriched event") } } @@ -1152,20 +1203,387 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - atomicFieldLimits + atomicFieldLimits, + emitIncomplete ) - enriched.value.map(_ must beRight.like { - case e: EnrichedEvent => - val p = EnrichedEvent.toPartiallyEnrichedEvent(e) + enriched.value.map { + case Ior.Right(enriched) => + val p = EnrichedEvent.toPartiallyEnrichedEvent(enriched) val contextsJson = jparse(p.contexts.get).toOption.get val derivedContextsJson = jparse(p.derived_contexts.get).toOption.get val ueJson = jparse(p.unstruct_event.get).toOption.get (contextsJson must beEqualTo(expectedContexts)) and (derivedContextsJson must beEqualTo(expectedDerivedContexts)) and (ueJson must beEqualTo(expectedUnstructEvent)) - case _ => ko - }) + case other => ko(s"[$other] is not an enriched event") + } + } + + "remove the invalid unstructured event and enrich the event if emitIncomplete is set to true" >> { + val script = + s""" + function process(event) { + return [ ${emailSent} ]; + }""" + val schemaKey = SchemaKey( + "com.snowplowanalytics.snowplow", + "javascript_script_config", + "jsonschema", + SchemaVer.Full(1, 0, 0) + ) + val enrichmentReg = EnrichmentRegistry[IO]( + javascriptScript = List( + JavascriptScriptEnrichment(schemaKey, script) + ) + ) + + val parameters = Map( + "e" -> "pp", + "tv" -> "js-0.13.1", + "p" -> "web", + "co" -> + s""" + { + "schema": "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0", + "data": [ + $clientSession + ] + } + """, + "ue_pr" -> + """ + { + "schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0", + "data":{ + "schema":"iglu:com.acme/email_sent/jsonschema/1-0-0", + "data": { + "emailAddress": "hello@world.com", + "emailAddress2": "foo@bar.org", + "unallowedAdditionalField": "foo@bar.org" + } + } + }""" + ).toOpt + val rawEvent = RawEvent(api, parameters, None, source, context) + val enriched = EnrichmentManager.enrichEvent[IO]( + enrichmentReg, + client, + processor, + timestamp, + rawEvent, + AcceptInvalid.featureFlags, + IO.unit, + SpecHelpers.registryLookup, + atomicFieldLimits, + emitIncomplete = true + ) + enriched.value.map { + case Ior.Both(_: BadRow.SchemaViolations, enriched) + if Option(enriched.unstruct_event).isEmpty && + SpecHelpers.listContextsSchemas(enriched.contexts) == List(clientSessionSchema) && + SpecHelpers.listContextsSchemas(enriched.derived_contexts).contains(emailSentSchema) => + ok + case other => ko(s"[$other] is not a SchemaViolations bad row and an enriched event without the unstructured event") + } + } + + "remove the invalid context and enrich the event if emitIncomplete is set to true" >> { + val script = + s""" + function process(event) { + return [ 
${emailSent} ]; + }""" + val schemaKey = SchemaKey( + "com.snowplowanalytics.snowplow", + "javascript_script_config", + "jsonschema", + SchemaVer.Full(1, 0, 0) + ) + val enrichmentReg = EnrichmentRegistry[IO]( + javascriptScript = List( + JavascriptScriptEnrichment(schemaKey, script) + ) + ) + + val parameters = Map( + "e" -> "pp", + "tv" -> "js-0.13.1", + "p" -> "web", + "co" -> + """ + { + "schema": "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0", + "data": [ + { + "schema":"iglu:com.acme/email_sent/jsonschema/1-0-0", + "data": { + "foo": "hello@world.com", + "emailAddress2": "foo@bar.org" + } + } + ] + } + """, + "ue_pr" -> + s""" + { + "schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0", + "data": $clientSession + }""" + ).toOpt + val rawEvent = RawEvent(api, parameters, None, source, context) + val enriched = EnrichmentManager.enrichEvent[IO]( + enrichmentReg, + client, + processor, + timestamp, + rawEvent, + AcceptInvalid.featureFlags, + IO.unit, + SpecHelpers.registryLookup, + atomicFieldLimits, + emitIncomplete = true + ) + enriched.value.map { + case Ior.Both(_: BadRow.SchemaViolations, enriched) + if Option(enriched.contexts).isEmpty && + SpecHelpers.getUnstructSchema(enriched.unstruct_event) == clientSessionSchema && + SpecHelpers.listContextsSchemas(enriched.derived_contexts).contains(emailSentSchema) => + ok + case other => ko(s"[$other] is not a SchemaViolations bad row and an enriched event with no input contexts") + } + } + + "remove one invalid context (out of 2) and enrich the event if emitIncomplete is set to true" >> { + val script = + s""" + function process(event) { + return [ ${emailSent} ]; + }""" + val schemaKey = SchemaKey( + "com.snowplowanalytics.snowplow", + "javascript_script_config", + "jsonschema", + SchemaVer.Full(1, 0, 0) + ) + val enrichmentReg = EnrichmentRegistry[IO]( + javascriptScript = List( + JavascriptScriptEnrichment(schemaKey, script) + ) + ) + + val parameters = Map( + "e" -> "pp", + "tv" -> "js-0.13.1", + "p" -> "web", + "co" -> + s""" + { + "schema": "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0", + "data": [ + { + "schema":"iglu:com.acme/email_sent/jsonschema/1-0-0", + "data": { + "foo": "hello@world.com", + "emailAddress2": "foo@bar.org" + } + }, + $clientSession + ] + } + """, + "ue_pr" -> + s""" + { + "schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0", + "data": $clientSession + }""" + ).toOpt + val rawEvent = RawEvent(api, parameters, None, source, context) + val enriched = EnrichmentManager.enrichEvent[IO]( + enrichmentReg, + client, + processor, + timestamp, + rawEvent, + AcceptInvalid.featureFlags, + IO.unit, + SpecHelpers.registryLookup, + atomicFieldLimits, + emitIncomplete = true + ) + enriched.value.map { + case Ior.Both(_: BadRow.SchemaViolations, enriched) + if SpecHelpers.getUnstructSchema(enriched.unstruct_event) == clientSessionSchema && + SpecHelpers.listContextsSchemas(enriched.contexts) == List(clientSessionSchema) && + SpecHelpers.listContextsSchemas(enriched.derived_contexts).contains(emailSentSchema) => + ok + case other => ko(s"[$other] is not a SchemaViolations bad row and an enriched event with 1 input context") + } + } + + "return the enriched event after an enrichment exception if emitIncomplete is set to true" >> { + val script = + s""" + function process(event) { + throw "Javascript exception"; + return [ $emailSent ]; + }""" + val schemaKey = SchemaKey( + "com.snowplowanalytics.snowplow", + "javascript_script_config", + 
"jsonschema", + SchemaVer.Full(1, 0, 0) + ) + val enrichmentReg = EnrichmentRegistry[IO]( + yauaa = Some(YauaaEnrichment(None)), + javascriptScript = List( + JavascriptScriptEnrichment(schemaKey, script) + ) + ) + + val parameters = Map( + "e" -> "pp", + "tv" -> "js-0.13.1", + "p" -> "web", + "ue_pr" -> + s""" + { + "schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0", + "data": $clientSession + }""" + ).toOpt + val rawEvent = RawEvent(api, parameters, None, source, context) + val enriched = EnrichmentManager.enrichEvent[IO]( + enrichmentReg, + client, + processor, + timestamp, + rawEvent, + AcceptInvalid.featureFlags, + IO.unit, + SpecHelpers.registryLookup, + atomicFieldLimits, + emitIncomplete = true + ) + enriched.value.map { + case Ior.Both(_: BadRow.EnrichmentFailures, enriched) + if SpecHelpers.getUnstructSchema(enriched.unstruct_event) == clientSessionSchema && + !SpecHelpers.listContextsSchemas(enriched.derived_contexts).contains(emailSentSchema) => + ok + case other => ko(s"[$other] is not an EnrichmentFailures bad row and an enriched event") + } + } + + "return a SchemaViolations bad row in the Left in case of both a schema violation and an enrichment failure if emitIncomplete is set to true" >> { + val script = + s""" + function process(event) { + throw "Javascript exception"; + return [ $emailSent ]; + }""" + val schemaKey = SchemaKey( + "com.snowplowanalytics.snowplow", + "javascript_script_config", + "jsonschema", + SchemaVer.Full(1, 0, 0) + ) + val enrichmentReg = EnrichmentRegistry[IO]( + javascriptScript = List( + JavascriptScriptEnrichment(schemaKey, script) + ) + ) + + val parameters = Map( + "e" -> "pp", + "tv" -> "js-0.13.1", + "p" -> "web", + "tr_tt" -> "foo", + "ue_pr" -> + s""" + { + "schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0", + "data": $clientSession + }""" + ).toOpt + val rawEvent = RawEvent(api, parameters, None, source, context) + val enriched = EnrichmentManager.enrichEvent[IO]( + enrichmentReg, + client, + processor, + timestamp, + rawEvent, + AcceptInvalid.featureFlags, + IO.unit, + SpecHelpers.registryLookup, + atomicFieldLimits, + emitIncomplete = true + ) + enriched.value.map { + case Ior.Both(_: BadRow.SchemaViolations, _) => ok + case other => ko(s"[$other] doesn't have a SchemaViolations bad row in the Left") + } + } + + "remove an invalid enrichment context and return the enriched event if emitIncomplete is set to true" >> { + val script = + s""" + function process(event) { + return [ + { + "schema":"iglu:com.acme/email_sent/jsonschema/1-0-0", + "data": { + "foo": "hello@world.com", + "emailAddress2": "foo@bar.org" + } + } + ]; + }""" + val schemaKey = SchemaKey( + "com.snowplowanalytics.snowplow", + "javascript_script_config", + "jsonschema", + SchemaVer.Full(1, 0, 0) + ) + val enrichmentReg = EnrichmentRegistry[IO]( + yauaa = Some(YauaaEnrichment(None)), + javascriptScript = List( + JavascriptScriptEnrichment(schemaKey, script) + ) + ) + + val parameters = Map( + "e" -> "pp", + "tv" -> "js-0.13.1", + "p" -> "web", + "ue_pr" -> + s""" + { + "schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0", + "data": $clientSession + }""" + ).toOpt + val rawEvent = RawEvent(api, parameters, None, source, context) + val enriched = EnrichmentManager.enrichEvent[IO]( + enrichmentReg, + client, + processor, + timestamp, + rawEvent, + AcceptInvalid.featureFlags, + IO.unit, + SpecHelpers.registryLookup, + atomicFieldLimits, + emitIncomplete = true + ) + enriched.value.map { + case 
Ior.Both(_: BadRow.SchemaViolations, enriched) + if SpecHelpers.getUnstructSchema(enriched.unstruct_event) == clientSessionSchema && + !SpecHelpers.listContextsSchemas(enriched.derived_contexts).contains(emailSentSchema) => + ok + case other => ko(s"[$other] is not a SchemaViolations bad row and an enriched event without the faulty enrichment context") + } } } @@ -1630,7 +2048,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE "validateEnriched" should { "create a SchemaViolations bad row if an atomic field is oversized" >> { - val result = EnrichmentManager + EnrichmentManager .enrichEvent[IO]( enrichmentReg, client, @@ -1640,26 +2058,28 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE featureFlags = AcceptInvalid.featureFlags.copy(acceptInvalid = false), IO.unit, SpecHelpers.registryLookup, - atomicFieldLimits + atomicFieldLimits, + emitIncomplete ) - - result.value - .map(_ must beLeft.like { - case BadRow.SchemaViolations( - _, - Failure.SchemaViolations(_, NonEmptyList(FailureDetails.SchemaViolation.IgluError(schemaKey, clientError), Nil)), - _ + .value + .map { + case Ior.Left( + BadRow.SchemaViolations( + _, + Failure.SchemaViolations(_, NonEmptyList(FailureDetails.SchemaViolation.IgluError(schemaKey, clientError), Nil)), + _ + ) ) => schemaKey must beEqualTo(AtomicFields.atomicSchema) clientError.toString must contain("v_tracker") clientError.toString must contain("Field is longer than maximum allowed size") case other => ko(s"[$other] is not a SchemaViolations bad row with one IgluError") - }) + } } "not create a bad row if an atomic field is oversized and acceptInvalid is set to true" >> { - val result = EnrichmentManager + EnrichmentManager .enrichEvent[IO]( enrichmentReg, client, @@ -1669,11 +2089,14 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE featureFlags = AcceptInvalid.featureFlags.copy(acceptInvalid = true), IO.unit, SpecHelpers.registryLookup, - atomicFieldLimits + atomicFieldLimits, + emitIncomplete ) .value - - result.map(_ must beRight[EnrichedEvent]) + .map { + case Ior.Right(_) => ok + case other => ko(s"[$other] is not an enriched event") + } } "return a SchemaViolations bad row containing both the atomic field length error and the invalid enrichment context error" >> { @@ -1716,27 +2139,30 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - atomicFieldLimits + atomicFieldLimits, + emitIncomplete ) .value - .map(_ must beLeft.like { - case BadRow.SchemaViolations( - _, - Failure.SchemaViolations(_, - NonEmptyList(FailureDetails.SchemaViolation.IgluError(schemaKey1, clientError1), - List(FailureDetails.SchemaViolation.IgluError(schemaKey2, clientError2)) - ) - ), - _ + .map { + case Ior.Left( + BadRow.SchemaViolations( + _, + Failure.SchemaViolations(_, + NonEmptyList(FailureDetails.SchemaViolation.IgluError(schemaKey1, clientError1), + List(FailureDetails.SchemaViolation.IgluError(schemaKey2, clientError2)) + ) + ), + _ + ) ) => - schemaKey1 must beEqualTo(AtomicFields.atomicSchema) - clientError1.toString must contain("v_tracker") - clientError1.toString must contain("Field is longer than maximum allowed size") - schemaKey2 must beEqualTo(emailSentSchema) - clientError2.toString must contain("emailAddress2: is missing but it is required") + schemaKey1 must beEqualTo(emailSentSchema) + clientError1.toString must contain("emailAddress2: is missing but it is 
required") + schemaKey2 must beEqualTo(AtomicFields.atomicSchema) + clientError2.toString must contain("v_tracker") + clientError2.toString must contain("Field is longer than maximum allowed size") case other => ko(s"[$other] is not a SchemaViolations bad row with 2 IgluError") - }) + } } } } @@ -1813,4 +2239,32 @@ object EnrichmentManagerSpec { "jsonschema", SchemaVer.Full(1, 0, 0) ) + + val emailSent = s"""{ + "schema": "${emailSentSchema.toSchemaUri}", + "data": { + "emailAddress": "hello@world.com", + "emailAddress2": "foo@bar.org" + } + }""" + + val clientSessionSchema = + SchemaKey( + "com.snowplowanalytics.snowplow", + "client_session", + "jsonschema", + SchemaVer.Full(1, 0, 1) + ) + + val clientSession = s"""{ + "schema": "${clientSessionSchema.toSchemaUri}", + "data": { + "sessionIndex": 1, + "storageMechanism": "LOCAL_STORAGE", + "firstEventId": "5c33fccf-6be5-4ce6-afb1-e34026a3ca75", + "sessionId": "21c2a0dd-892d-42d1-b156-3a9d4e147eef", + "previousSessionId": null, + "userId": "20d631b8-7837-49df-a73e-6da73154e6fd" + } + }""" } diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/pii/PiiPseudonymizerEnrichmentSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/pii/PiiPseudonymizerEnrichmentSpec.scala index 53a0894be..25148a1cc 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/pii/PiiPseudonymizerEnrichmentSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/pii/PiiPseudonymizerEnrichmentSpec.scala @@ -10,7 +10,6 @@ */ package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.pii -import cats.data.Validated import cats.syntax.option._ import cats.syntax.validated._ @@ -63,7 +62,7 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidatedMatcher removeAddedFields should remove fields added by PII enrichment $e9 """ - def commonSetup(enrichmentReg: EnrichmentRegistry[IO]): IO[List[Validated[BadRow, EnrichedEvent]]] = { + def commonSetup(enrichmentReg: EnrichmentRegistry[IO]): IO[List[Either[BadRow, EnrichedEvent]]] = { val context = CollectorPayload.Context( Some(DateTime.parse("2017-07-14T03:39:39.000+00:00")), @@ -182,9 +181,10 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidatedMatcher AcceptInvalid.featureFlags, IO.unit, SpecHelpers.registryLookup, - AtomicFields.from(Map.empty) + AtomicFields.from(Map.empty), + emitIncomplete ) - } yield result + } yield result.map(_.toEither) } private val ipEnrichment = { @@ -319,7 +319,7 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidatedMatcher actual.map { output => val size = output.size must_== 1 - val validOut = output.head must beValid.like { + val validOut = output.head must beRight.like { case enrichedEvent => (enrichedEvent.app_id must_== expected.app_id) and (enrichedEvent.geo_city must_== expected.geo_city) and @@ -414,7 +414,7 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidatedMatcher actual.map { output => val size = output.size must_== 1 - val validOut = output.head must beValid.like { + val validOut = output.head must beRight.like { case enrichedEvent => val contextJ = parse(enrichedEvent.contexts).toOption.get.hcursor val contextJFirstElement = contextJ.downField("data").downArray @@ -523,7 +523,7 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidatedMatcher actual.map 
{ output => val size = output.size must_== 1 - val validOut = output.head must beValid.like { + val validOut = output.head must beRight.like { case enrichedEvent => val contextJ = parse(enrichedEvent.contexts).toOption.get.hcursor.downField("data") val firstElem = contextJ.downArray.downField("data") @@ -574,7 +574,7 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidatedMatcher actual.map { output => val size = output.size must_== 1 - val validOut = output.head must beValid.like { + val validOut = output.head must beRight.like { case enrichedEvent => val contextJ = parse(enrichedEvent.contexts).toOption.get.hcursor.downField("data") val firstElem = contextJ.downArray.downField("data") @@ -628,7 +628,7 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidatedMatcher actual.map { output => val size = output.size must_== 1 - val validOut = output.head must beValid.like { + val validOut = output.head must beRight.like { case enrichedEvent => val contextJ = parse(enrichedEvent.contexts).toOption.get.hcursor.downField("data") val firstElem = contextJ.downArray.downField("data") @@ -682,7 +682,7 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidatedMatcher actual.map { output => val size = output.size must_== 1 - val validOut = output.head must beValid.like { + val validOut = output.head must beRight.like { case enrichedEvent => val contextJ = parse(enrichedEvent.contexts).toOption.get.hcursor.downField("data") val firstElem = contextJ.downArray.downField("data") @@ -746,7 +746,7 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidatedMatcher actual.map { output => val size = output.size must_== 1 - val validOut = output.head must beValid.like { + val validOut = output.head must beRight.like { case enrichedEvent => val contextJ = parse(enrichedEvent.contexts).toOption.get.hcursor.downField("data") val firstElem = contextJ.downArray.downField("data") @@ -801,7 +801,7 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidatedMatcher actual.map { output => val size = output.size must_== 1 - val validOut = output.head must beValid.like { + val validOut = output.head must beRight.like { case enrichedEvent => val context = parse(enrichedEvent.contexts).toOption.get.hcursor.downField("data").downArray val data = context.downField("data") diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala index 8f0596eba..607b3a5c3 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala @@ -17,7 +17,7 @@ import cats.effect.testing.specs2.CatsEffect import io.circe.parser.parse -import cats.data.NonEmptyList +import cats.data.{Ior, NonEmptyList} import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} @@ -135,7 +135,11 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect "return None if unstruct_event field is empty" >> { IgluUtils .extractAndValidateUnstructEvent(new EnrichedEvent, SpecHelpers.client, SpecHelpers.registryLookup) - .map(_ must beValid(None)) + .value + .map { + case Ior.Right(None) => ok + case other => ko(s"[$other] is not a success with None") + } } "return a SchemaViolation.NotJson if unstruct_event does not contain a properly formatted JSON 
string" >> { @@ -144,10 +148,11 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect IgluUtils .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) - .map(_ must beInvalid.like { - case _: FailureDetails.SchemaViolation.NotJson => ok - case err => ko(s"[$err] is not NotJson") - }) + .value + .map { + case Ior.Both(NonEmptyList(_: FailureDetails.SchemaViolation.NotJson, _), None) => ok + case other => ko(s"[$other] is not an error with NotJson") + } } "return a SchemaViolation.NotIglu if unstruct_event contains a properly formatted JSON string that is not self-describing" >> { @@ -156,10 +161,11 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect IgluUtils .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) - .map(_ must beInvalid.like { - case _: FailureDetails.SchemaViolation.NotIglu => ok - case err => ko(s"[$err] is not NotIglu") - }) + .value + .map { + case Ior.Both(NonEmptyList(_: FailureDetails.SchemaViolation.NotIglu, _), None) => ok + case other => ko(s"[$other] is not an error with NotIglu") + } } "return a SchemaViolation.CriterionMismatch if unstruct_event contains a self-describing JSON but not with the expected schema for unstructured events" >> { @@ -168,10 +174,11 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect IgluUtils .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) - .map(_ must beInvalid.like { - case _: FailureDetails.SchemaViolation.CriterionMismatch => ok - case err => ko(s"[$err] is not CriterionMismatch") - }) + .value + .map { + case Ior.Both(NonEmptyList(_: FailureDetails.SchemaViolation.CriterionMismatch, _), None) => ok + case other => ko(s"[$other] is not an error with CriterionMismatch") + } } "return a SchemaViolation.NotJson if the JSON in .data is not a JSON" >> { @@ -180,10 +187,11 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect IgluUtils .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) - .map(_ must beInvalid.like { - case _: FailureDetails.SchemaViolation.NotJson => ok - case err => ko(s"[$err] is not NotJson") - }) + .value + .map { + case Ior.Both(NonEmptyList(_: FailureDetails.SchemaViolation.NotJson, _), None) => ok + case other => ko(s"[$other] is not an error with NotJson") + } } "return a SchemaViolation.IgluError containing a ValidationError if the JSON in .data is not self-describing" >> { @@ -192,12 +200,13 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect IgluUtils .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) - .map(_ must beInvalid.like { - case FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)) => ok - case ie: FailureDetails.SchemaViolation.IgluError => + .value + .map { + case Ior.Both(NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), _), None) => ok + case Ior.Both(NonEmptyList(ie: FailureDetails.SchemaViolation.IgluError, _), None) => ko(s"IgluError [$ie] is not ValidationError") - case err => ko(s"[$err] is not IgluError") - }) + case other => ko(s"[$other] is not an error with IgluError") + } } "return a SchemaViolation.IgluError containing a ValidationError if the JSON in .data is not a valid SDJ" >> { @@ -206,12 +215,13 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect IgluUtils 
.extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) - .map(_ must beInvalid.like { - case FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)) => ok - case ie: FailureDetails.SchemaViolation.IgluError => + .value + .map { + case Ior.Both(NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), _), None) => ok + case Ior.Both(NonEmptyList(ie: FailureDetails.SchemaViolation.IgluError, _), None) => ko(s"IgluError [$ie] is not ValidationError") - case err => ko(s"[$err] is not IgluError") - }) + case other => ko(s"[$other] is not an error with IgluError") + } } "return a SchemaViolation.IgluError containing a ResolutionError if the schema of the SDJ in .data can't be resolved" >> { @@ -220,12 +230,13 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect IgluUtils .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) - .map(_ must beInvalid.like { - case FailureDetails.SchemaViolation.IgluError(_, ResolutionError(_)) => ok - case ie: FailureDetails.SchemaViolation.IgluError => - ko(s"IgluError [$ie] is not ResolutionError") - case err => ko(s"[$err] is not IgluError") - }) + .value + .map { + case Ior.Both(NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ResolutionError(_)), _), None) => ok + case Ior.Both(NonEmptyList(ie: FailureDetails.SchemaViolation.IgluError, _), None) => + ko(s"IgluError [$ie] is not a ResolutionError") + case other => ko(s"[$other] is not an error with IgluError") + } } "return the extracted unstructured event if .data is a valid SDJ" >> { @@ -234,14 +245,15 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect IgluUtils .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) - .map(_ must beValid.like { - case Some(IgluUtils.SdjExtractResult(sdj, None)) if sdj.schema == emailSentSchema => ok - case Some(s) => + .value + .map { + case Ior.Right(Some(IgluUtils.SdjExtractResult(sdj, None))) if sdj.schema == emailSentSchema => ok + case Ior.Right(Some(s)) => ko( s"unstructured event's schema [${s.sdj.schema}] does not match expected schema [${emailSentSchema}]" ) - case None => ko("no unstructured event was extracted") - }) + case other => ko(s"no unstructured event was extracted [$other]") + } } "return the extracted unstructured event when schema is superseded by another schema" >> { @@ -255,26 +267,32 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect IgluUtils .extractAndValidateUnstructEvent(input1, SpecHelpers.client, SpecHelpers.registryLookup) - .map(_ must beValid.like { - case Some(IgluUtils.SdjExtractResult(sdj, Some(`expectedValidationInfo`))) if sdj.schema == supersedingExampleSchema101 => ok - case Some(s) => + .value + .map { + case Ior.Right(Some(IgluUtils.SdjExtractResult(sdj, Some(`expectedValidationInfo`)))) + if sdj.schema == supersedingExampleSchema101 => + ok + case Ior.Right(Some(s)) => ko( s"unstructured event's schema [${s.sdj.schema}] does not match expected schema [${supersedingExampleSchema101}]" ) - case None => ko("no unstructured event was extracted") - }) + case other => ko(s"no unstructured event was extracted [$other]") + } // input2 wouldn't be validated with 1-0-0. It would be validated with 1-0-1 only. 
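// (Illustrative note: as in IgluUtils.parseAndValidateSDJ, the SDJ is rewritten to the superseding schema version and a ValidationInfo(sdj.schema, supersedingVersion) records the substitution, which is what `expectedValidationInfo` matches on.)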
IgluUtils .extractAndValidateUnstructEvent(input2, SpecHelpers.client, SpecHelpers.registryLookup) - .map(_ must beValid.like { - case Some(IgluUtils.SdjExtractResult(sdj, Some(`expectedValidationInfo`))) if sdj.schema == supersedingExampleSchema101 => ok - case Some(s) => + .value + .map { + case Ior.Right(Some(IgluUtils.SdjExtractResult(sdj, Some(`expectedValidationInfo`)))) + if sdj.schema == supersedingExampleSchema101 => + ok + case Ior.Right(Some(s)) => ko( s"unstructured event's schema [${s.sdj.schema}] does not match expected schema [${supersedingExampleSchema101}]" ) - case None => ko("no unstructured event was extracted") - }) + case other => ko(s"no unstructured event was extracted [$other]") + } } } @@ -282,7 +300,11 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect "return Nil if contexts field is empty" >> { IgluUtils .extractAndValidateInputContexts(new EnrichedEvent, SpecHelpers.client, SpecHelpers.registryLookup) - .map(_ must beValid(Nil)) + .value + .map { + case Ior.Right(Nil) => ok + case other => ko(s"[$other] is not a success with an empty list") + } } "return a SchemaViolation.NotJson if .contexts does not contain a properly formatted JSON string" >> { @@ -291,10 +313,11 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect IgluUtils .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) - .map(_ must beInvalid.like { - case NonEmptyList(_: FailureDetails.SchemaViolation.NotJson, Nil) => ok - case err => ko(s"[$err] is not one NotJson") - }) + .value + .map { + case Ior.Both(NonEmptyList(_: FailureDetails.SchemaViolation.NotJson, Nil), Nil) => ok + case other => ko(s"[$other] is not an error with NotJson") + } } "return a SchemaViolation.NotIglu if .contexts contains a properly formatted JSON string that is not self-describing" >> { @@ -303,10 +326,11 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect IgluUtils .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) - .map(_ must beInvalid.like { - case NonEmptyList(_: FailureDetails.SchemaViolation.NotIglu, Nil) => ok - case err => ko(s"[$err] is not one NotIglu") - }) + .value + .map { + case Ior.Both(NonEmptyList(_: FailureDetails.SchemaViolation.NotIglu, Nil), Nil) => ok + case other => ko(s"[$other] is not an error with NotIglu") + } } "return a SchemaViolation.CriterionMismatch if .contexts contains a self-describing JSON but not with the right schema" >> { @@ -315,10 +339,11 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect IgluUtils .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) - .map(_ must beInvalid.like { - case NonEmptyList(_: FailureDetails.SchemaViolation.CriterionMismatch, Nil) => ok - case err => ko(s"[$err] is not one CriterionMismatch") - }) + .value + .map { + case Ior.Both(NonEmptyList(_: FailureDetails.SchemaViolation.CriterionMismatch, Nil), Nil) => ok + case other => ko(s"[$other] is not an error with CriterionMismatch") + } } "return a SchemaViolation.IgluError containing a ValidationError if .data does not contain an array of JSON objects" >> { @@ -329,13 +354,14 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect IgluUtils .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) - .map(_ must beInvalid.like { - case NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), 
Nil) => + .value + .map { + case Ior.Both(NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), Nil), Nil) => ok - case NonEmptyList(ie: FailureDetails.SchemaViolation.IgluError, Nil) => + case Ior.Both(NonEmptyList(ie: FailureDetails.SchemaViolation.IgluError, Nil), Nil) => ko(s"IgluError [$ie] is not ValidationError") - case err => ko(s"[$err] is not one IgluError") - }) + case other => ko(s"[$other] is not an error with IgluError") + } } "return a SchemaViolation.IgluError containing a ValidationError if .data contains one invalid context" >> { @@ -344,13 +370,14 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect IgluUtils .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) - .map(_ must beInvalid.like { - case NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), Nil) => + .value + .map { + case Ior.Both(NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), Nil), Nil) => ok - case NonEmptyList(ie: FailureDetails.SchemaViolation.IgluError, Nil) => + case Ior.Both(NonEmptyList(ie: FailureDetails.SchemaViolation.IgluError, Nil), Nil) => ko(s"IgluError [$ie] is not ValidationError") - case err => ko(s"[$err] is not one IgluError") - }) + case other => ko(s"[$other] is not an error with IgluError") + } } "return a SchemaViolation.IgluError containing a ResolutionError if .data contains one context whose schema can't be resolved" >> { @@ -359,13 +386,14 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect IgluUtils .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) - .map(_ must beInvalid.like { - case NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ResolutionError(_)), Nil) => + .value + .map { + case Ior.Both(NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ResolutionError(_)), Nil), Nil) => ok - case NonEmptyList(ie: FailureDetails.SchemaViolation.IgluError, Nil) => + case Ior.Both(NonEmptyList(ie: FailureDetails.SchemaViolation.IgluError, Nil), Nil) => ko(s"IgluError [$ie] is not ResolutionError") - case err => ko(s"[$err] is not one IgluError") - }) + case other => ko(s"[$other] is not an error with IgluError") + } } "return 2 expected failures for 2 invalid contexts" >> { @@ -374,26 +402,32 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect IgluUtils .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) - .map(_ must beInvalid.like { - case NonEmptyList( - FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), - List(FailureDetails.SchemaViolation.IgluError(_, ResolutionError(_))) + .value + .map { + case Ior.Both(NonEmptyList( + FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), + List(FailureDetails.SchemaViolation.IgluError(_, ResolutionError(_))) + ), + Nil ) => ok - case errs => ko(s"[$errs] is not one ValidationError and one ResolutionError") - }) + case other => ko(s"[$other] is not one ValidationError and one ResolutionError") + } } - "return an expected failure if one context is valid and the other invalid" >> { + "return an expected failure and an expected SDJ if one context is invalid and one is valid" >> { val input = new EnrichedEvent input.setContexts(buildInputContexts(List(emailSent1, noSchema))) IgluUtils .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) - .map(_ must beInvalid.like { - case 
NonEmptyList(_: FailureDetails.SchemaViolation.IgluError, Nil) => ok - case err => ko(s"[$err] is not one IgluError") - }) + .value + .map { + case Ior.Both(NonEmptyList(_: FailureDetails.SchemaViolation.IgluError, Nil), List(extract)) + if extract.sdj.schema == emailSentSchema => + ok + case other => ko(s"[$other] is not one IgluError and one SDJ with schema $emailSentSchema") + } } "return the extracted SDJs for 2 valid input contexts" >> { @@ -402,12 +436,13 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect IgluUtils .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) - .map(_ must beValid.like { - case sdjs if sdjs.size == 2 && sdjs.forall(i => i.sdj.schema == emailSentSchema && i.validationInfo.isEmpty) => + .value + .map { + case Ior.Right(sdjs) if sdjs.size == 2 && sdjs.forall(i => i.sdj.schema == emailSentSchema && i.validationInfo.isEmpty) => ok - case res => - ko(s"[$res] are not 2 SDJs with expected schema [${emailSentSchema.toSchemaUri}]") - }) + case other => + ko(s"[$other] is not 2 SDJs with expected schema [${emailSentSchema.toSchemaUri}]") + } } "return the extracted SDJ for an input that has a required property set to null if the schema explicitly allows it" >> { @@ -416,12 +451,13 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect IgluUtils .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) - .map(_ must beValid.like { - case sdj if sdj.size == 1 && sdj.forall(_.sdj.schema == clientSessionSchema) => + .value + .map { + case Ior.Right(sdjs) if sdjs.size == 1 && sdjs.forall(_.sdj.schema == clientSessionSchema) => ok - case _ => - ko("$.previousSessionId: is missing but it is required") - }) + case other => + ko(s"[$other] is not 1 SDJ with expected schema [${clientSessionSchema.toSchemaUri}]") + } } "return the extracted context when schema is superseded by another schema" >> { @@ -430,12 +466,13 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect IgluUtils .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) - .map(_ must beValid.like { - case sdj if sdj.size == 2 && sdj.forall(_.sdj.schema == supersedingExampleSchema101) => + .value + .map { + case Ior.Right(sdjs) if sdjs.size == 2 && sdjs.forall(_.sdj.schema == supersedingExampleSchema101) => ok - case _ => - ko("Failed to extract context when schema is superseded by another schema") - }) + case other => + ko(s"[$other] is not 2 SDJs with expected schema [${supersedingExampleSchema101.toSchemaUri}]") + } } } @@ -447,10 +484,11 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect IgluUtils .validateEnrichmentsContexts(SpecHelpers.client, contexts, SpecHelpers.registryLookup) - .map(_.toEither must beLeft.like { - case NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), Nil) => ok + .value + .map { + case Ior.Both(NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), Nil), Nil) => ok case other => ko(s"[$other] is not one ValidationError") - }) + } } "return two expected SchemaViolation for two invalid contexts" >> { @@ -461,13 +499,16 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect IgluUtils .validateEnrichmentsContexts(SpecHelpers.client, contexts, SpecHelpers.registryLookup) - .map(_.toEither must beLeft.like { - case NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, 
_)), - List(FailureDetails.SchemaViolation.IgluError(_, ResolutionError(_))) + .value + .map { + case Ior.Both(NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), + List(FailureDetails.SchemaViolation.IgluError(_, ResolutionError(_))) + ), + Nil ) => ok case other => ko(s"[$other] is not one ValidationError and one ResolutionError") - }) + } } "return one expected SchemaViolation for one invalid context and one valid" >> { @@ -478,13 +519,20 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect IgluUtils .validateEnrichmentsContexts(SpecHelpers.client, contexts, SpecHelpers.registryLookup) - .map(_.toEither must beLeft.like { - case NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), Nil) => ok - case other => ko(s"[$other] is not one ValidationError") - }) + .value + .map { + case Ior.Both(NonEmptyList( + FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), + Nil + ), + List(sdj) + ) if sdj.schema == emailSentSchema => + ok + case other => ko(s"[$other] is not one ValidationError and one SDJ with schema $emailSentSchema") + } } - "not return any error for two valid contexts" >> { + "return 2 valid contexts" >> { val contexts = List( SpecHelpers.jsonStringToSDJ(emailSent1).right.get, SpecHelpers.jsonStringToSDJ(emailSent2).right.get @@ -492,7 +540,11 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect IgluUtils .validateEnrichmentsContexts(SpecHelpers.client, contexts, SpecHelpers.registryLookup) - .map(_.toEither must beRight) + .value + .map { + case Ior.Right(List(sdj1, sdj2)) if sdj1.schema == emailSentSchema && sdj2.schema == emailSentSchema => ok + case other => ko(s"[$other] doesn't contain 2 valid contexts with schema $emailSentSchema") + } } } @@ -507,10 +559,18 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect SpecHelpers.client, SpecHelpers.registryLookup ) - .map(_.toEither must beLeft.like { - case errors if errors.size == 1 => ok - case other => ko(s"[$other] is not one SchemaViolation") - }) + .value + .map { + case Ior.Both( + NonEmptyList( + _: FailureDetails.SchemaViolation, + Nil + ), + IgluUtils.EventExtractResult(Nil, None, Nil) + ) => + ok + case other => ko(s"[$other] isn't an error with SchemaViolation") + } } "return one SchemaViolation if the input event contains one invalid context" >> { @@ -523,10 +583,18 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect SpecHelpers.client, SpecHelpers.registryLookup ) - .map(_.toEither must beLeft.like { - case errors if errors.size == 1 => ok - case other => ko(s"[$other] is not one SchemaViolation") - }) + .value + .map { + case Ior.Both( + NonEmptyList( + _: FailureDetails.SchemaViolation, + Nil + ), + IgluUtils.EventExtractResult(Nil, None, Nil) + ) => + ok + case other => ko(s"[$other] isn't an error with SchemaViolation") + } } "return two SchemaViolation if the input event contains an invalid unstructured event and one invalid context" >> { @@ -540,10 +608,18 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect SpecHelpers.client, SpecHelpers.registryLookup ) - .map(_.toEither must beLeft.like { - case errors if errors.size == 2 => ok - case other => ko(s"[$other] is not two SchemaViolation") - }) + .value + .map { + case Ior.Both( + NonEmptyList( + _: FailureDetails.SchemaViolation, + List(_: FailureDetails.SchemaViolation) + ), + IgluUtils.EventExtractResult(Nil, None, Nil) + ) => + ok + 
case other => ko(s"[$other] isn't 2 errors with SchemaViolation")
+        }
     }
 
     "return the extracted unstructured event and the extracted input contexts if they are all valid" >> {
@@ -557,17 +633,68 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect
           SpecHelpers.client,
           SpecHelpers.registryLookup
         )
-        .map(_.toEither must beRight.like {
-          case IgluUtils.EventExtractResult(contexts, Some(unstructEvent), validationInfos)
+        .value
+        .map {
+          case Ior.Right(IgluUtils.EventExtractResult(contexts, Some(unstructEvent), validationInfos))
               if contexts.size == 2
                 && validationInfos.isEmpty
                 && (unstructEvent :: contexts).forall(_.schema == emailSentSchema) =>
             ok
-          case res =>
+          case other =>
+            ko(
+              s"[$other] doesn't contain the two contexts and the unstructured event"
+            )
+        }
+    }
+
+    "return the SchemaViolation of the invalid context in the Left and the extracted unstructured event in the Right" >> {
+      val input = new EnrichedEvent
+      input.setUnstruct_event(buildUnstruct(emailSent1))
+      input.setContexts(buildInputContexts(List(invalidEmailSent)))
+
+      IgluUtils
+        .extractAndValidateInputJsons(
+          input,
+          SpecHelpers.client,
+          SpecHelpers.registryLookup
+        )
+        .value
+        .map {
+          case Ior.Both(
+                NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), _),
+                extract
+              ) if extract.contexts.isEmpty && extract.unstructEvent.isDefined && extract.unstructEvent.get.schema == emailSentSchema =>
+            ok
+          case other =>
+            ko(
+              s"[$other] isn't one ValidationError and an unstructured event with schema $emailSentSchema"
+            )
+        }
+    }
+
+    "return the SchemaViolation of the invalid unstructured event in the Left and the valid context in the Right" >> {
+      val input = new EnrichedEvent
+      input.setUnstruct_event(buildUnstruct(invalidEmailSent))
+      input.setContexts(buildInputContexts(List(emailSent1)))
+
+      IgluUtils
+        .extractAndValidateInputJsons(
+          input,
+          SpecHelpers.client,
+          SpecHelpers.registryLookup
+        )
+        .value
+        .map {
+          case Ior.Both(
+                NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), _),
+                extract
+              ) if extract.contexts.size == 1 && extract.contexts.head.schema == emailSentSchema && extract.unstructEvent.isEmpty =>
+            ok
+          case other =>
             ko(
-              s"[$res] is not a list with two extracted contexts and an option with the extracted unstructured event"
+              s"[$other] isn't one ValidationError and one context with schema $emailSentSchema"
             )
-        })
+        }
     }
 
     "return the extracted unstructured event and the extracted input contexts when schema is superseded by another schema" >> {
@@ -588,19 +715,20 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect
           SpecHelpers.client,
           SpecHelpers.registryLookup
         )
-        .map(_.toEither must beRight.like {
-          case IgluUtils.EventExtractResult(contexts, Some(unstructEvent), List(validationInfo))
+        .value
+        .map {
+          case Ior.Right(IgluUtils.EventExtractResult(contexts, Some(unstructEvent), List(validationInfo)))
               if contexts.size == 2
                 && unstructEvent.schema == supersedingExampleSchema101
                 && contexts.count(_.schema == supersedingExampleSchema101) == 2
                 && validationInfo.schema == IgluUtils.ValidationInfo.schemaKey
                 && validationInfo.data == expectedValidationInfoContext =>
             ok
-          case res =>
+          case other =>
             ko(
-              s"[$res] is not a list with two extracted contexts and an option with the extracted unstructured event"
+              s"[$other] doesn't contain the two contexts and the unstructured event with the superseded schema"
            )
-        })
+        }
     }
   }
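
The rewritten assertions above all scrutinise a cats.data.Ior rather than a
Validated: Ior.Left carries only failures, Ior.Right only successes, and
Ior.Both carries accumulated failures alongside whatever could still be
extracted, which is what lets an incomplete event keep the valid parts of a
payload. A minimal, self-contained sketch of that shape (toy types, not the
real IgluUtils signatures, which are effectful and fail with
FailureDetails.SchemaViolation):

    import cats.data.{Ior, NonEmptyList}
    import cats.syntax.list._

    object IorSketch {
      type Violation = String // stand-in for FailureDetails.SchemaViolation
      type Context   = String // stand-in for a validated self-describing JSON

      // Validate a whole batch, keeping every success and every failure:
      // Left = nothing valid, Right = everything valid, Both = partial success.
      def validateAll(inputs: List[(Context, Boolean)]): Ior[NonEmptyList[Violation], List[Context]] = {
        val (bad, good) = inputs.partitionMap {
          case (ctx, true)  => Right(ctx)
          case (ctx, false) => Left(s"$ctx failed validation")
        }
        bad.toNel match {
          case None                           => Ior.Right(good)
          case Some(failures) if good.isEmpty => Ior.Left(failures)
          case Some(failures)                 => Ior.Both(failures, good)
        }
      }
    }

Matching on Ior.Both(NonEmptyList(violation, Nil), valid), as the specs do,
therefore asserts both halves at once: exactly one failure was accumulated
and the valid items were still returned.

diff --git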
a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/ValidateAndReformatJsonSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/ValidateAndReformatJsonSpec.scala deleted file mode 100644 index f5cd921ec..000000000 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/ValidateAndReformatJsonSpec.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (c) 2012-present Snowplow Analytics Ltd. - * All rights reserved. - * - * This software is made available by Snowplow Analytics, Ltd., - * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 - * located at https://docs.snowplow.io/limited-use-license-1.0 - * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION - * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. - */ -package com.snowplowanalytics.snowplow.enrich.common.utils - -import org.specs2.Specification -import org.specs2.matcher.DataTables - -class ValidateAndReformatJsonSpec extends Specification with DataTables { - def is = s2""" - extracting and reformatting (where necessary) valid JSONs with work $e1 - extracting invalid JSONs should fail $e2 - """ - - def e1 = - "SPEC NAME" || "INPUT STR" | "EXPECTED" | - "Empty JSON" !! "{}" ! "{}" | - "Simple JSON #1" !! """{"key":"value"}""" ! """{"key":"value"}""" | - "Simple JSON #2" !! """[1,2,3]""" ! """[1,2,3]""" | - "Reformatted JSON #1" !! """{ "key" : 23 }""" ! """{"key":23}""" | - "Reformatted JSON #2" !! """[1.00, 2.00, 3.00, 4.00]""" ! """[1.00,2.00,3.00,4.00]""" | - "Reformatted JSON #3" !! """ - { - "a": 23 - }""" ! """{"a":23}""" |> { (_, str, expected) => - JsonUtils.validateAndReformatJson(str) must beRight(expected) - } - - def err1 = s"invalid json: exhausted input" - def err2: (String, Int, Int) => String = - (got, line, col) => s"invalid json: expected json value got '$got' (line $line, column $col)" - def err3: (String, Int, Int) => String = - (got, line, col) => s"""invalid json: expected " got '$got' (line $line, column $col)""" - - def e2 = - "SPEC NAME" || "INPUT STR" | "EXPECTED" | - "Empty string" !! "" ! err1 | - "Double colons" !! """{"a"::2}""" ! err2(":2}", 1, 6) | - "Random noise" !! "^45fj_" ! err2("^45fj_", 1, 1) | - "Bad key" !! """{9:"a"}""" ! 
err3("""9:"a"}""", 1, 2) |> { (_, str, expected) => - JsonUtils.validateAndReformatJson(str) must beLeft(expected) - } - -} diff --git a/modules/kafka/src/main/resources/application.conf b/modules/kafka/src/main/resources/application.conf index 5c5897eda..639e1b15f 100644 --- a/modules/kafka/src/main/resources/application.conf +++ b/modules/kafka/src/main/resources/application.conf @@ -49,6 +49,21 @@ "partitionKey": "" "headers": [] } + + "incomplete": { + "type": "Kafka" + # we need all the fields to exist to have defaults + "topicName": "" + "bootstrapServers": "" + "producerConf": { + "acks": "all" + "security.protocol": "SASL_SSL" + "sasl.mechanism": "OAUTHBEARER" + "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;" + } + "partitionKey": "" + "headers": [] + } } "concurrency" : { diff --git a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/AzureAuthenticationCallbackHandler.scala b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/AzureAuthenticationCallbackHandler.scala index 34bc3eb8c..c9783f6b7 100644 --- a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/AzureAuthenticationCallbackHandler.scala +++ b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/AzureAuthenticationCallbackHandler.scala @@ -44,6 +44,8 @@ class BadSinkAuthHandler extends AzureAuthenticationCallbackHandler class PiiSinkAuthHandler extends AzureAuthenticationCallbackHandler +class IncompleteSinkAuthHandler extends AzureAuthenticationCallbackHandler + class AzureAuthenticationCallbackHandler extends AuthenticateCallbackHandler { val credentials = new DefaultAzureCredentialBuilder().build() diff --git a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Main.scala b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Main.scala index 0a9ef88e0..9ae70af42 100644 --- a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Main.scala +++ b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Main.scala @@ -61,6 +61,7 @@ object Main extends IOApp { out => Sink.initAttributed(out, classOf[GoodSinkAuthHandler].getName), out => Sink.initAttributed(out, classOf[PiiSinkAuthHandler].getName), out => Sink.init(out, classOf[BadSinkAuthHandler].getName), + out => Sink.initAttributed(out, classOf[IncompleteSinkAuthHandler].getName), checkpoint, createBlobStorageClient, _.record.value, diff --git a/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/enrich/kafka/ConfigSpec.scala b/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/enrich/kafka/ConfigSpec.scala index 054c5916c..85b5e2cf7 100644 --- a/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/enrich/kafka/ConfigSpec.scala +++ b/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/enrich/kafka/ConfigSpec.scala @@ -90,6 +90,20 @@ class ConfigSpec extends Specification with CatsEffect { "sasl.mechanism" -> "OAUTHBEARER", "sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;" ) + ), + Some( + io.Output.Kafka( + "incomplete", + "localhost:9092", + "", + Set(), + Map( + "acks" -> "all", + "security.protocol" -> "SASL_SSL", + "sasl.mechanism" -> "OAUTHBEARER", + "sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;" + ) + ) ) ), io.Concurrency(256, 1), @@ -200,7 +214,8 @@ class ConfigSpec extends Specification with CatsEffect { 
"sasl.mechanism" -> "OAUTHBEARER", "sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;" ) - ) + ), + None ), io.Concurrency(256, 1), None, diff --git a/modules/kinesis/src/it/resources/enrich/enrich-localstack.hocon b/modules/kinesis/src/it/resources/enrich/enrich-localstack.hocon index 569eca940..bf441a8f0 100644 --- a/modules/kinesis/src/it/resources/enrich/enrich-localstack.hocon +++ b/modules/kinesis/src/it/resources/enrich/enrich-localstack.hocon @@ -27,6 +27,13 @@ "region": ${REGION} "customEndpoint": ${LOCALSTACK_ENDPOINT} } + + "incomplete": { + "type": "Kinesis" + "streamName": ${STREAM_INCOMPLETE} + "region": ${REGION} + "customEndpoint": ${LOCALSTACK_ENDPOINT} + } } "monitoring": { diff --git a/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Containers.scala b/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Containers.scala index 55b9ec6d7..b4cc0ef82 100644 --- a/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Containers.scala +++ b/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Containers.scala @@ -86,6 +86,7 @@ object Containers extends CatsEffect { "STREAM_RAW" -> streams.raw, "STREAM_ENRICHED" -> streams.enriched, "STREAM_BAD" -> streams.bad, + "STREAM_INCOMPLETE" -> streams.incomplete, "LOCALSTACK_ENDPOINT" -> s"http://$localstackAlias:$localstackPort" ), fileSystemBind = Seq( @@ -225,7 +226,7 @@ object Containers extends CatsEffect { region: String, streams: KinesisConfig.Streams ): Unit = - List(streams.raw, streams.enriched, streams.bad).foreach { stream => + List(streams.raw, streams.enriched, streams.bad, streams.incomplete).foreach { stream => localstack.execInContainer( "aws", s"--endpoint-url=http://127.0.0.1:$port", diff --git a/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/EnrichKinesisSpec.scala b/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/EnrichKinesisSpec.scala index 67fca4aca..46306f4d2 100644 --- a/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/EnrichKinesisSpec.scala +++ b/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/EnrichKinesisSpec.scala @@ -42,7 +42,7 @@ class EnrichKinesisSpec extends Specification with AfterAll with CatsEffect { } } - "emit the correct number of enriched events and bad rows" in { + "emit the correct number of enriched events, bad rows and incomplete events" in { import utils._ val testName = "count" @@ -66,10 +66,11 @@ class EnrichKinesisSpec extends Specification with AfterAll with CatsEffect { resources.use { enrich => for { output <- enrich(input).compile.toList - (good, bad) = parseOutput(output, testName) + (good, bad, incomplete) = parseOutput(output, testName) } yield { good.size.toLong must beEqualTo(nbGood) bad.size.toLong must beEqualTo(nbBad) + incomplete.size.toLong must beEqualTo(nbBad) } } } @@ -108,13 +109,14 @@ class EnrichKinesisSpec extends Specification with AfterAll with CatsEffect { resources.use { enrich => for { output <- enrich(input).compile.toList - (good, bad) = parseOutput(output, testName) + (good, bad, incomplete) = parseOutput(output, testName) } yield { good.size.toLong must beEqualTo(nbGood) good.map { enriched => enriched.derived_contexts.data.map(_.schema) must containTheSameElementsAs(enrichmentsContexts) } bad.size.toLong must beEqualTo(0l) + incomplete.size.toLong must beEqualTo(0l) } } } diff --git 
a/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/KinesisConfig.scala b/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/KinesisConfig.scala index cf65222a7..91d279af9 100644 --- a/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/KinesisConfig.scala +++ b/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/KinesisConfig.scala @@ -59,6 +59,9 @@ object KinesisConfig { Some(URI.create(getEndpoint(localstackPort))) ) + def incompleteStreamConfig(localstackPort: Int, streamName: String) = + enrichedStreamConfig(localstackPort, streamName) + val monitoring = Monitoring( None, MetricsReporters(None, None, false) @@ -67,8 +70,8 @@ object KinesisConfig { private def getEndpoint(localstackPort: Int): String = s"http://$endpoint:$localstackPort" - case class Streams(raw: String, enriched: String, bad: String) + case class Streams(raw: String, enriched: String, bad: String, incomplete: String) def getStreams(uuid: String): Streams = - Streams(s"raw-$uuid", s"enriched-$uuid", s"bad-1-$uuid") + Streams(s"raw-$uuid", s"enriched-$uuid", s"bad-1-$uuid", s"incomplete-$uuid") } diff --git a/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/utils.scala b/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/utils.scala index ad3b63508..c6c169d67 100644 --- a/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/utils.scala +++ b/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/utils.scala @@ -34,6 +34,7 @@ object utils extends CatsEffect { object OutputRow { final case class Good(event: Event) extends OutputRow final case class Bad(badRow: BadRow) extends OutputRow + final case class Incomplete(incomplete: Event) extends OutputRow } def mkEnrichPipe( @@ -46,9 +47,12 @@ object utils extends CatsEffect { } yield { val enriched = asGood(outputStream(KinesisConfig.enrichedStreamConfig(localstackPort, streams.enriched))) val bad = asBad(outputStream(KinesisConfig.badStreamConfig(localstackPort, streams.bad))) + val incomplete = asIncomplete(outputStream(KinesisConfig.incompleteStreamConfig(localstackPort, streams.incomplete))) collectorPayloads => - enriched.merge(bad) + enriched + .merge(bad) + .merge(incomplete) .interruptAfter(3.minutes) .concurrently(collectorPayloads.evalMap(bytes => rawSink(List(bytes)))) } @@ -81,13 +85,26 @@ object utils extends CatsEffect { } } - def parseOutput(output: List[OutputRow], testName: String): (List[Event], List[BadRow]) = { + private def asIncomplete(source: Stream[IO, Array[Byte]]): Stream[IO, OutputRow.Incomplete] = + source.map { bytes => + OutputRow.Incomplete { + val s = new String(bytes) + Event.parse(s) match { + case Validated.Valid(e) => e + case Validated.Invalid(e) => + throw new RuntimeException(s"Can't parse incomplete event [$s]. 
Error: $e") + } + } + } + + def parseOutput(output: List[OutputRow], testName: String): (List[Event], List[BadRow], List[Event]) = { val good = output.collect { case OutputRow.Good(e) => e} println(s"[$testName] Bad rows:") val bad = output.collect { case OutputRow.Bad(b) => println(s"[$testName] ${b.compact}") b } - (good, bad) + val incomplete = output.collect { case OutputRow.Incomplete(i) => i} + (good, bad, incomplete) } } diff --git a/modules/kinesis/src/main/resources/application.conf b/modules/kinesis/src/main/resources/application.conf index a75006bde..ed99b2dd0 100644 --- a/modules/kinesis/src/main/resources/application.conf +++ b/modules/kinesis/src/main/resources/application.conf @@ -64,6 +64,23 @@ "recordLimit": 500 "byteLimit": 5242880 } + + "incomplete": { + "type": "Kinesis" + # we need all the fields to exist to have defaults + "streamName": "" + "backoffPolicy": { + "minBackoff": 100 milliseconds + "maxBackoff": 10 seconds + "maxRetries": 10 + } + "throttledBackoffPolicy": { + "minBackoff": 100 milliseconds + "maxBackoff": 1 second + } + "recordLimit": 500 + "byteLimit": 5242880 + } } "concurrency" : { diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Main.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Main.scala index 16a16ec19..159dbb9ca 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Main.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Main.scala @@ -59,6 +59,7 @@ object Main extends IOApp { out => Sink.initAttributed(out), out => Sink.initAttributed(out), out => Sink.init(out), + out => Sink.initAttributed(out), checkpoint[IO], _ => List(S3Client.mk[IO]), getPayload, diff --git a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/enrich/kinesis/ConfigSpec.scala b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/enrich/kinesis/ConfigSpec.scala index c7ad3136b..77af46d98 100644 --- a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/enrich/kinesis/ConfigSpec.scala +++ b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/enrich/kinesis/ConfigSpec.scala @@ -79,6 +79,18 @@ class ConfigSpec extends Specification with CatsEffect { 500, 5242880, None + ), + Some( + io.Output.Kinesis( + "incomplete", + Some("eu-central-1"), + None, + io.BackoffPolicy(100.millis, 10.seconds, Some(10)), + io.BackoffPolicy(100.millis, 1.second, None), + 500, + 5242880, + None + ) ) ), io.Concurrency(256, 1), @@ -173,7 +185,8 @@ class ConfigSpec extends Specification with CatsEffect { 500, 5242880, None - ) + ), + None ), io.Concurrency(256, 1), None, diff --git a/modules/nsq/src/main/scala/com/snowplowanalytics/snowplow/enrich/nsq/Main.scala b/modules/nsq/src/main/scala/com/snowplowanalytics/snowplow/enrich/nsq/Main.scala index 93fd912f9..a479c458e 100644 --- a/modules/nsq/src/main/scala/com/snowplowanalytics/snowplow/enrich/nsq/Main.scala +++ b/modules/nsq/src/main/scala/com/snowplowanalytics/snowplow/enrich/nsq/Main.scala @@ -58,6 +58,7 @@ object Main extends IOApp { out => Sink.initAttributed(out), out => Sink.initAttributed(out), out => Sink.init(out), + out => Sink.initAttributed(out), checkpoint, createBlobStorageClient, _.data, diff --git a/modules/nsq/src/test/scala/com/snowplowanalytics/snowplow/enrich/nsq/ConfigSpec.scala b/modules/nsq/src/test/scala/com/snowplowanalytics/snowplow/enrich/nsq/ConfigSpec.scala index f0ef5d1ba..442e15205 100644 --- 
a/modules/nsq/src/test/scala/com/snowplowanalytics/snowplow/enrich/nsq/ConfigSpec.scala +++ b/modules/nsq/src/test/scala/com/snowplowanalytics/snowplow/enrich/nsq/ConfigSpec.scala @@ -81,6 +81,18 @@ class ConfigSpec extends Specification with CatsEffect { maxBackoff = 10.seconds, maxRetries = Some(10) ) + ), + Some( + io.Output.Nsq( + "incomplete", + "127.0.0.1", + 4150, + BackoffPolicy( + minBackoff = 100.milliseconds, + maxBackoff = 10.seconds, + maxRetries = Some(10) + ) + ) ) ), io.Concurrency(256, 3), @@ -187,7 +199,8 @@ class ConfigSpec extends Specification with CatsEffect { maxBackoff = 10.seconds, maxRetries = Some(10) ) - ) + ), + None ), io.Concurrency(256, 3), None, diff --git a/modules/pubsub/src/main/resources/application.conf b/modules/pubsub/src/main/resources/application.conf index bdb750caf..45dd83119 100644 --- a/modules/pubsub/src/main/resources/application.conf +++ b/modules/pubsub/src/main/resources/application.conf @@ -42,6 +42,18 @@ "productName": "Snowplow OSS" } } + + "incomplete": { + "type": "PubSub" + # we need all the fields to exist to have defaults + "topic": "" + "delayThreshold": 200 milliseconds + "maxBatchSize": 1000 + "maxBatchBytes": 8000000, + "gcpUserAgent": { + "productName": "Snowplow OSS" + } + } } "concurrency" : { diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Main.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Main.scala index daee27114..cb0bf67fb 100644 --- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Main.scala +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Main.scala @@ -60,6 +60,7 @@ object Main extends IOApp { out => Sink.initAttributed(out), out => Sink.initAttributed(out), out => Sink.init(out), + out => Sink.initAttributed(out), checkpoint, _ => List(Resource.eval(GcsClient.mk[IO])), _.value, diff --git a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/enrich/pubsub/ConfigSpec.scala b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/enrich/pubsub/ConfigSpec.scala index 804348310..8765d91bc 100644 --- a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/enrich/pubsub/ConfigSpec.scala +++ b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/enrich/pubsub/ConfigSpec.scala @@ -69,6 +69,16 @@ class ConfigSpec extends Specification with CatsEffect { 1000, 8000000, io.GcpUserAgent("Snowplow OSS") + ), + Some( + io.Output.PubSub( + "projects/test-project/topics/incomplete", + None, + 200.milliseconds, + 1000, + 8000000, + io.GcpUserAgent("Snowplow OSS") + ) ) ), io.Concurrency(256, 3), @@ -155,7 +165,8 @@ class ConfigSpec extends Specification with CatsEffect { 1000, 8000000, io.GcpUserAgent("Snowplow OSS") - ) + ), + None ), io.Concurrency(256, 3), None,