diff --git a/config/transformer/aws/transformer.batch.config.reference.hocon b/config/transformer/aws/transformer.batch.config.reference.hocon
index f15a31f96..b866a6a36 100644
--- a/config/transformer/aws/transformer.batch.config.reference.hocon
+++ b/config/transformer/aws/transformer.batch.config.reference.hocon
@@ -166,6 +166,10 @@
     # When enabled, event's atomic fields are truncated (based on the length limits from the atomic JSON schema) before transformation.
     # Optional, default "false".
     "truncateAtomicFields": false
+
+    # Whether to use the legacy directory structure (vendor/name/format/model) for transformed events.
+    # This should be enabled when upgrading from an older version of the loader.
+    "legacyPartitioning": false
   }

   # Observability and reporting options
diff --git a/config/transformer/aws/transformer.kinesis.config.reference.hocon b/config/transformer/aws/transformer.kinesis.config.reference.hocon
index ba1043a91..09a36a62c 100644
--- a/config/transformer/aws/transformer.kinesis.config.reference.hocon
+++ b/config/transformer/aws/transformer.kinesis.config.reference.hocon
@@ -234,6 +234,10 @@
     # When enabled, event's atomic fields are truncated (based on the length limits from the atomic JSON schema) before transformation.
     # Optional, default "false".
-    "truncateAtomicFields": false
+    "truncateAtomicFields": false
+
+    # Whether to use the legacy directory structure (vendor/name/format/model) for transformed events.
+    # This should be enabled when upgrading from an older version of the loader.
+    "legacyPartitioning": false
   }
 }
diff --git a/config/transformer/azure/transformer.kafka.config.reference.hocon b/config/transformer/azure/transformer.kafka.config.reference.hocon
index 96a92f4e6..9fcb16786 100644
--- a/config/transformer/azure/transformer.kafka.config.reference.hocon
+++ b/config/transformer/azure/transformer.kafka.config.reference.hocon
@@ -150,5 +150,9 @@
     # When enabled, event's atomic fields are truncated (based on the length limits from the atomic JSON schema) before transformation.
     # Optional, default "false".
     "truncateAtomicFields": false
+
+    # Whether to use the legacy directory structure (vendor/name/format/model) for transformed events.
+    # This should be enabled when upgrading from an older version of the loader.
+    "legacyPartitioning": false
   }
 }
diff --git a/config/transformer/gcp/transformer.pubsub.config.reference.hocon b/config/transformer/gcp/transformer.pubsub.config.reference.hocon
index 3819e532f..106493f71 100644
--- a/config/transformer/gcp/transformer.pubsub.config.reference.hocon
+++ b/config/transformer/gcp/transformer.pubsub.config.reference.hocon
@@ -143,5 +143,9 @@
     # When enabled, event's atomic fields are truncated (based on the length limits from the atomic JSON schema) before transformation.
     # Optional, default "false".
     "truncateAtomicFields": false
+
+    # Whether to use the legacy directory structure (vendor/name/format/model) for transformed events.
+    # This should be enabled when upgrading from an older version of the loader.
+    "legacyPartitioning": false
   }
 }
diff --git a/modules/common-transformer-stream/src/main/resources/application.conf b/modules/common-transformer-stream/src/main/resources/application.conf
index 15a70dae7..c8a474e20 100644
--- a/modules/common-transformer-stream/src/main/resources/application.conf
+++ b/modules/common-transformer-stream/src/main/resources/application.conf
@@ -24,7 +24,8 @@
   "featureFlags": {
     "legacyMessageFormat": false,
     "enableMaxRecordsPerFile": true,
-    "truncateAtomicFields": false
+    "truncateAtomicFields": false,
+    "legacyPartitioning": false
   }

   "monitoring": {
diff --git a/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Processing.scala b/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Processing.scala
index fb705a3a9..79f2f6f9f 100644
--- a/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Processing.scala
+++ b/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Processing.scala
@@ -100,7 +100,7 @@ object Processing {
     source
       .through(transform(transformer, config.validations, processor))
       .through(incrementMetrics(resources.metrics))
-      .through(handleTransformResult(transformer))
+      .through(handleTransformResult(transformer, config.featureFlags.legacyPartitioning))
       .through(windowing)

     val sink: Pipe[F, Record[Window, List[(SinkPath, Transformed.Data)], State[C]], Unit] =
@@ -226,14 +226,15 @@
    * to where it should sink. Processes in batches for efficiency.
    */
   def handleTransformResult[F[_], C: Checkpointer[F, *]](
-    transformer: Transformer[F]
+    transformer: Transformer[F],
+    legacyPartitioning: Boolean
   ): Pipe[F, TransformationResults[C], SerializationResults[C]] =
     _.map { case (items, checkpointer) =>
       val state = State.fromEvents(items).withCheckpointer(checkpointer)
       val mapped = items.flatMap(
         _.fold(
-          bad => transformer.badTransform(bad).split :: Nil,
-          success => success.output.map(_.split)
+          bad => transformer.badTransform(bad).split(legacyPartitioning) :: Nil,
+          success => success.output.map(_.split(legacyPartitioning))
         )
       )
       (mapped, state)
@@ -255,11 +256,15 @@
     }

   implicit class TransformedOps(t: Transformed) {
-    def getPath: SinkPath = t match {
+    def getPath(legacyPartitioning: Boolean): SinkPath = t match {
       case p: Transformed.Shredded =>
-        val suffix = Some(
-          s"vendor=${p.vendor}/name=${p.name}/format=${p.format.path.toLowerCase}/model=${p.model}/revision=${p.revision}/addition=${p.addition}"
-        )
+        val suffix =
+          if (legacyPartitioning)
+            Some(s"vendor=${p.vendor}/name=${p.name}/format=${p.format.path.toLowerCase}/model=${p.model}/")
+          else
+            Some(
+              s"vendor=${p.vendor}/name=${p.name}/format=${p.format.path.toLowerCase}/model=${p.model}/revision=${p.revision}/addition=${p.addition}"
+            )
         val pathType = if (p.isGood) SinkPath.PathType.Good else SinkPath.PathType.Bad
         SinkPath(suffix, pathType)
       case p: Transformed.WideRow =>
@@ -269,6 +274,6 @@
       case _: Transformed.Parquet => SinkPath(None, SinkPath.PathType.Good)
     }

-    def split: (SinkPath, Transformed.Data) = (getPath, t.data)
+    def split(legacyPartitioning: Boolean): (SinkPath, Transformed.Data) = (getPath(legacyPartitioning), t.data)
   }
 }
diff --git a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShredTsvProcessingSpec.scala b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShredTsvProcessingSpec.scala
index 066fcd358..4dea51c22 100644
--- a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShredTsvProcessingSpec.scala
+++ b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShredTsvProcessingSpec.scala
@@ -24,7 +24,7 @@ class ShredTsvProcessingSpec extends BaseProcessingSpec {
             inputEventsPath = "/processing-spec/1/input/events"
           )

-          val config = TransformerConfig(appConfig(outputDirectory), igluConfig)
+          val config = TransformerConfig(appConfig(outputDirectory, false), igluConfig)

           for {
             output <- process(inputStream, config)
@@ -84,7 +84,7 @@ class ShredTsvProcessingSpec extends BaseProcessingSpec {
             inputEventsPath = "/processing-spec/3/input/events"
           )

-          val config = TransformerConfig(appConfig(outputDirectory), igluConfig)
+          val config = TransformerConfig(appConfig(outputDirectory, false), igluConfig)

           for {
             output <- process(inputStream, config)
@@ -114,11 +114,71 @@ class ShredTsvProcessingSpec extends BaseProcessingSpec {
         }
         .unsafeRunSync()
     }
+
+    "respect legacyPartitioning flag" in {
+      temporaryDirectory
+        .use { outputDirectory =>
+          val inputStream = InputEventsProvider.eventStream(
+            inputEventsPath = "/processing-spec/1/input/events"
+          )
+
+          val config = TransformerConfig(appConfig(outputDirectory, true), igluConfig)
+
+          for {
+            output <- process(inputStream, config)
+            actualAtomicRows <-
+              readStringRowsFrom(
+                Path(
+                  outputDirectory.toString +
+                    s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1"
+                )
+              )
+            actualOptimizelyRows <-
+              readStringRowsFrom(
+                Path(
+                  outputDirectory.toString +
+                    s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.optimizely/name=state/format=tsv/model=1"
+                )
+              )
+            actualConsentRows <-
+              readStringRowsFrom(
+                Path(
+                  outputDirectory.toString +
+                    s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=consent_document/format=tsv/model=1"
+                )
+              )
+            actualBadRows <-
+              readStringRowsFrom(
+                Path(
+                  outputDirectory.toString +
+                    s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_parsing_error/format=json/model=2"
+                )
+              )
+
+            expectedCompletionMessage <- readMessageFromResource("/processing-spec/1/output/good/tsv/completion.json", outputDirectory)
+            expectedAtomicRows <- readLinesFromResource("/processing-spec/1/output/good/tsv/com.snowplowanalytics.snowplow-atomic")
+            expectedOptimizelyRows <- readLinesFromResource("/processing-spec/1/output/good/tsv/com.optimizely-state")
+            expectedConsentRows <-
+              readLinesFromResource("/processing-spec/1/output/good/tsv/com.snowplowanalytics.snowplow-consent_document")
+            expectedBadRows <- readLinesFromResource("/processing-spec/1/output/bad")
+          } yield {
+            removeAppId(output.completionMessages.toList) must beEqualTo(Vector(expectedCompletionMessage))
+            output.checkpointed must beEqualTo(1)
+
+            assertStringRows(removeAppId(actualAtomicRows), expectedAtomicRows)
+            assertStringRows(removeAppId(actualOptimizelyRows), expectedOptimizelyRows)
+            assertStringRows(removeAppId(actualConsentRows), expectedConsentRows)
+
+            assertStringRows(removeAppId(actualBadRows), expectedBadRows)
+          }
+        }
+        .unsafeRunSync()
+    }
   }
 }

 object ShredTsvProcessingSpec {
-  private val appConfig = (outputPath: Path) => s"""|{
+  private val appConfig = (outputPath: Path, legacyPartitioning: Boolean) => s"""|{
     | "input": {
     |   "type": "pubsub"
     |   "subscription": "projects/project-id/subscriptions/subscription-id"
@@ -137,6 +197,9 @@
     |   "region": "eu-central-1"
     | }
     | "windowing": "1 minute"
+    | "featureFlags": {
+    |   "legacyPartitioning": $legacyPartitioning
+    | }
     | "formats": {
     |   "transformationType": "shred"
     |   "default": "TSV"
diff --git a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/sinks/TransformingSpec.scala b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/sinks/TransformingSpec.scala
index 8f3e158ea..0d1fb1ee6 100644
--- a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/sinks/TransformingSpec.scala
+++ b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/sinks/TransformingSpec.scala
@@ -42,11 +42,11 @@ class TransformingSpec extends Specification {
   val testFileNameMap = List(
     Transformed.Shredded
       .Tabular("com.snowplowanalytics.snowplow", "atomic", 1, 0, 0, dummyTransformedData)
-      .getPath -> "com.snowplowanalytics.snowplow-atomic",
+      .getPath(false) -> "com.snowplowanalytics.snowplow-atomic",
     Transformed.Shredded
       .Tabular("com.snowplowanalytics.snowplow", "consent_document", 1, 0, 0, dummyTransformedData)
-      .getPath -> "com.snowplowanalytics.snowplow-consent_document",
-    Transformed.Shredded.Tabular("com.optimizely", "state", 1, 0, 0, dummyTransformedData).getPath -> "com.optimizely-state"
+      .getPath(false) -> "com.snowplowanalytics.snowplow-consent_document",
+    Transformed.Shredded.Tabular("com.optimizely", "state", 1, 0, 0, dummyTransformedData).getPath(false) -> "com.optimizely-state"
   ).toMap

   val expectedTransformedMap =
@@ -149,7 +149,7 @@ object TransformingSpec {
     val eventStream = parsedEventStream(resourcePath)
       .through(Processing.transform(transformer, validations, TestProcessor))
-      .through(Processing.handleTransformResult(transformer))
+      .through(Processing.handleTransformResult(transformer, legacyPartitioning = false))

     val transformed = eventStream.compile.toList.unsafeRunSync().flatMap(_._1)
     (transformed.flatMap(_.getGood), transformed.flatMap(_.getBad))
diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/config/TransformerConfig.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/config/TransformerConfig.scala
index 1ea246bcb..a1488e5c3 100644
--- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/config/TransformerConfig.scala
+++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/config/TransformerConfig.scala
@@ -144,7 +144,8 @@ object TransformerConfig {
       legacyMessageFormat: Boolean,
       sparkCacheEnabled: Option[Boolean],
       enableMaxRecordsPerFile: Boolean,
-      truncateAtomicFields: Boolean
+      truncateAtomicFields: Boolean,
+      legacyPartitioning: Boolean
     )

   object FeatureFlags {