From 12e084468afb237a9bb36edf7e6c586987f3e1e9 Mon Sep 17 00:00:00 2001 From: Oguzhan Unlu Date: Tue, 31 Oct 2023 16:49:32 +0300 Subject: [PATCH] Transformer: Support old output file hierarchy (close #1314) --- .../transformer.batch.config.reference.hocon | 4 + ...transformer.kinesis.config.reference.hocon | 6 +- .../transformer.kafka.config.reference.hocon | 4 + .../transformer.pubsub.config.reference.hocon | 4 + .../src/main/resources/application.conf | 3 +- .../stream/common/Processing.scala | 23 +++-- .../processing/ShredTsvProcessingSpec.scala | 69 ++++++++++++++- .../common/sinks/TransformingSpec.scala | 8 +- .../common/config/TransformerConfig.scala | 3 +- .../src/main/resources/application.conf | 3 +- .../transformer/batch/ShredJob.scala | 3 +- .../transformer/batch/Transformer.scala | 16 +++- .../transformer/batch/spark/Sink.scala | 19 +++- .../transformer/batch/ConfigSpec.scala | 2 +- .../transformer/batch/ShredJobSpec.scala | 16 ++-- .../good/tabular/LegacyPartitioningSpec.scala | 87 +++++++++++++++++++ 16 files changed, 240 insertions(+), 30 deletions(-) create mode 100644 modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/tabular/LegacyPartitioningSpec.scala diff --git a/config/transformer/aws/transformer.batch.config.reference.hocon b/config/transformer/aws/transformer.batch.config.reference.hocon index f15a31f96..b866a6a36 100644 --- a/config/transformer/aws/transformer.batch.config.reference.hocon +++ b/config/transformer/aws/transformer.batch.config.reference.hocon @@ -166,6 +166,10 @@ # When enabled, event's atomic fields are truncated (based on the length limits from the atomic JSON schema) before transformation. # Optional, default "false". "truncateAtomicFields": false + + # Use old directory structure, i.e. vendor/name/format/model , for transformed events or not + # This should be enabled during upgrade from older versions of the loader + "legacyPartitioning": false } # Observability and reporting options diff --git a/config/transformer/aws/transformer.kinesis.config.reference.hocon b/config/transformer/aws/transformer.kinesis.config.reference.hocon index ba1043a91..09a36a62c 100644 --- a/config/transformer/aws/transformer.kinesis.config.reference.hocon +++ b/config/transformer/aws/transformer.kinesis.config.reference.hocon @@ -234,6 +234,10 @@ # When enabled, event's atomic fields are truncated (based on the length limits from the atomic JSON schema) before transformation. # Optional, default "false". - "truncateAtomicFields": false + "truncateAtomicFields": false + + # Use old directory structure, i.e. vendor/name/format/model , for transformed events or not + # This should be enabled during upgrade from older versions of the loader + "legacyPartitioning": false } } diff --git a/config/transformer/azure/transformer.kafka.config.reference.hocon b/config/transformer/azure/transformer.kafka.config.reference.hocon index 96a92f4e6..9fcb16786 100644 --- a/config/transformer/azure/transformer.kafka.config.reference.hocon +++ b/config/transformer/azure/transformer.kafka.config.reference.hocon @@ -150,5 +150,9 @@ # When enabled, event's atomic fields are truncated (based on the length limits from the atomic JSON schema) before transformation. # Optional, default "false". "truncateAtomicFields": false + + # Use old directory structure, i.e. vendor/name/format/model , for transformed events or not + # This should be enabled during upgrade from older versions of the loader + "legacyPartitioning": false } } diff --git a/config/transformer/gcp/transformer.pubsub.config.reference.hocon b/config/transformer/gcp/transformer.pubsub.config.reference.hocon index 3819e532f..106493f71 100644 --- a/config/transformer/gcp/transformer.pubsub.config.reference.hocon +++ b/config/transformer/gcp/transformer.pubsub.config.reference.hocon @@ -143,5 +143,9 @@ # When enabled, event's atomic fields are truncated (based on the length limits from the atomic JSON schema) before transformation. # Optional, default "false". "truncateAtomicFields": false + + # Use old directory structure, i.e. vendor/name/format/model , for transformed events or not + # This should be enabled during upgrade from older versions of the loader + "legacyPartitioning": false } } diff --git a/modules/common-transformer-stream/src/main/resources/application.conf b/modules/common-transformer-stream/src/main/resources/application.conf index 15a70dae7..c8a474e20 100644 --- a/modules/common-transformer-stream/src/main/resources/application.conf +++ b/modules/common-transformer-stream/src/main/resources/application.conf @@ -24,7 +24,8 @@ "featureFlags": { "legacyMessageFormat": false, "enableMaxRecordsPerFile": true, - "truncateAtomicFields": false + "truncateAtomicFields": false, + "legacyPartitioning": false } "monitoring": { diff --git a/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Processing.scala b/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Processing.scala index fb705a3a9..79f2f6f9f 100644 --- a/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Processing.scala +++ b/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Processing.scala @@ -100,7 +100,7 @@ object Processing { source .through(transform(transformer, config.validations, processor)) .through(incrementMetrics(resources.metrics)) - .through(handleTransformResult(transformer)) + .through(handleTransformResult(transformer, config.featureFlags.legacyPartitioning)) .through(windowing) val sink: Pipe[F, Record[Window, List[(SinkPath, Transformed.Data)], State[C]], Unit] = @@ -226,14 +226,15 @@ object Processing { * to where it should sink. Processes in batches for efficiency. */ def handleTransformResult[F[_], C: Checkpointer[F, *]]( - transformer: Transformer[F] + transformer: Transformer[F], + legacyPartitioning: Boolean ): Pipe[F, TransformationResults[C], SerializationResults[C]] = _.map { case (items, checkpointer) => val state = State.fromEvents(items).withCheckpointer(checkpointer) val mapped = items.flatMap( _.fold( - bad => transformer.badTransform(bad).split :: Nil, - success => success.output.map(_.split) + bad => transformer.badTransform(bad).split(legacyPartitioning) :: Nil, + success => success.output.map(_.split(legacyPartitioning)) ) ) (mapped, state) @@ -255,11 +256,15 @@ object Processing { } implicit class TransformedOps(t: Transformed) { - def getPath: SinkPath = t match { + def getPath(legacyPartitioning: Boolean): SinkPath = t match { case p: Transformed.Shredded => - val suffix = Some( - s"vendor=${p.vendor}/name=${p.name}/format=${p.format.path.toLowerCase}/model=${p.model}/revision=${p.revision}/addition=${p.addition}" - ) + val suffix = + if (legacyPartitioning) + Some(s"vendor=${p.vendor}/name=${p.name}/format=${p.format.path.toLowerCase}/model=${p.model}/") + else + Some( + s"vendor=${p.vendor}/name=${p.name}/format=${p.format.path.toLowerCase}/model=${p.model}/revision=${p.revision}/addition=${p.addition}" + ) val pathType = if (p.isGood) SinkPath.PathType.Good else SinkPath.PathType.Bad SinkPath(suffix, pathType) case p: Transformed.WideRow => @@ -269,6 +274,6 @@ object Processing { case _: Transformed.Parquet => SinkPath(None, SinkPath.PathType.Good) } - def split: (SinkPath, Transformed.Data) = (getPath, t.data) + def split(legacyPartitioning: Boolean): (SinkPath, Transformed.Data) = (getPath(legacyPartitioning), t.data) } } diff --git a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShredTsvProcessingSpec.scala b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShredTsvProcessingSpec.scala index 066fcd358..4dea51c22 100644 --- a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShredTsvProcessingSpec.scala +++ b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShredTsvProcessingSpec.scala @@ -24,7 +24,7 @@ class ShredTsvProcessingSpec extends BaseProcessingSpec { inputEventsPath = "/processing-spec/1/input/events" ) - val config = TransformerConfig(appConfig(outputDirectory), igluConfig) + val config = TransformerConfig(appConfig(outputDirectory, false), igluConfig) for { output <- process(inputStream, config) @@ -84,7 +84,7 @@ class ShredTsvProcessingSpec extends BaseProcessingSpec { inputEventsPath = "/processing-spec/3/input/events" ) - val config = TransformerConfig(appConfig(outputDirectory), igluConfig) + val config = TransformerConfig(appConfig(outputDirectory, false), igluConfig) for { output <- process(inputStream, config) @@ -114,11 +114,71 @@ class ShredTsvProcessingSpec extends BaseProcessingSpec { } .unsafeRunSync() } + + "respect legacyPartitioning flag" in { + temporaryDirectory + .use { outputDirectory => + val inputStream = InputEventsProvider.eventStream( + inputEventsPath = "/processing-spec/1/input/events" + ) + + val config = TransformerConfig(appConfig(outputDirectory, true), igluConfig) + + for { + output <- process(inputStream, config) + actualAtomicRows <- + readStringRowsFrom( + Path( + outputDirectory.toString + + s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1" + ) + ) + actualOptimizelyRows <- + readStringRowsFrom( + Path( + outputDirectory.toString + + s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.optimizely/name=state/format=tsv/model=1" + ) + ) + actualConsentRows <- + readStringRowsFrom( + Path( + outputDirectory.toString + + s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=consent_document/format=tsv/model=1" + ) + ) + actualBadRows <- + readStringRowsFrom( + Path( + outputDirectory.toString + + s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_parsing_error/format=json/model=2" + ) + ) + + expectedCompletionMessage <- readMessageFromResource("/processing-spec/1/output/good/tsv/completion.json", outputDirectory) + expectedAtomicRows <- readLinesFromResource("/processing-spec/1/output/good/tsv/com.snowplowanalytics.snowplow-atomic") + expectedOptimizelyRows <- readLinesFromResource("/processing-spec/1/output/good/tsv/com.optimizely-state") + expectedConsentRows <- + readLinesFromResource("/processing-spec/1/output/good/tsv/com.snowplowanalytics.snowplow-consent_document") + expectedBadRows <- readLinesFromResource("/processing-spec/1/output/bad") + } yield { + removeAppId(output.completionMessages.toList) must beEqualTo(Vector(expectedCompletionMessage)) + output.checkpointed must beEqualTo(1) + + assertStringRows(removeAppId(actualAtomicRows), expectedAtomicRows) + assertStringRows(removeAppId(actualOptimizelyRows), expectedOptimizelyRows) + assertStringRows(removeAppId(actualConsentRows), expectedConsentRows) + + assertStringRows(removeAppId(actualBadRows), expectedBadRows) + } + } + .unsafeRunSync() + } } } object ShredTsvProcessingSpec { - private val appConfig = (outputPath: Path) => s"""|{ + private val appConfig = (outputPath: Path, legacyPartitioning: Boolean) => s"""|{ | "input": { | "type": "pubsub" | "subscription": "projects/project-id/subscriptions/subscription-id" @@ -137,6 +197,9 @@ object ShredTsvProcessingSpec { | "region": "eu-central-1" | } | "windowing": "1 minute" + | "featureFlags": { + | "legacyPartitioning": $legacyPartitioning + | } | "formats": { | "transformationType": "shred" | "default": "TSV" diff --git a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/sinks/TransformingSpec.scala b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/sinks/TransformingSpec.scala index 8f3e158ea..0d1fb1ee6 100644 --- a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/sinks/TransformingSpec.scala +++ b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/sinks/TransformingSpec.scala @@ -42,11 +42,11 @@ class TransformingSpec extends Specification { val testFileNameMap = List( Transformed.Shredded .Tabular("com.snowplowanalytics.snowplow", "atomic", 1, 0, 0, dummyTransformedData) - .getPath -> "com.snowplowanalytics.snowplow-atomic", + .getPath(false) -> "com.snowplowanalytics.snowplow-atomic", Transformed.Shredded .Tabular("com.snowplowanalytics.snowplow", "consent_document", 1, 0, 0, dummyTransformedData) - .getPath -> "com.snowplowanalytics.snowplow-consent_document", - Transformed.Shredded.Tabular("com.optimizely", "state", 1, 0, 0, dummyTransformedData).getPath -> "com.optimizely-state" + .getPath(false) -> "com.snowplowanalytics.snowplow-consent_document", + Transformed.Shredded.Tabular("com.optimizely", "state", 1, 0, 0, dummyTransformedData).getPath(false) -> "com.optimizely-state" ).toMap val expectedTransformedMap = @@ -149,7 +149,7 @@ object TransformingSpec { val eventStream = parsedEventStream(resourcePath) .through(Processing.transform(transformer, validations, TestProcessor)) - .through(Processing.handleTransformResult(transformer)) + .through(Processing.handleTransformResult(transformer, legacyPartitioning = false)) val transformed = eventStream.compile.toList.unsafeRunSync().flatMap(_._1) (transformed.flatMap(_.getGood), transformed.flatMap(_.getBad)) diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/config/TransformerConfig.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/config/TransformerConfig.scala index 1ea246bcb..a1488e5c3 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/config/TransformerConfig.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/config/TransformerConfig.scala @@ -144,7 +144,8 @@ object TransformerConfig { legacyMessageFormat: Boolean, sparkCacheEnabled: Option[Boolean], enableMaxRecordsPerFile: Boolean, - truncateAtomicFields: Boolean + truncateAtomicFields: Boolean, + legacyPartitioning: Boolean ) object FeatureFlags { diff --git a/modules/transformer-batch/src/main/resources/application.conf b/modules/transformer-batch/src/main/resources/application.conf index 6017ce5ef..8d515bd85 100644 --- a/modules/transformer-batch/src/main/resources/application.conf +++ b/modules/transformer-batch/src/main/resources/application.conf @@ -41,6 +41,7 @@ "featureFlags": { "legacyMessageFormat": false, "enableMaxRecordsPerFile": false, - "truncateAtomicFields": false + "truncateAtomicFields": false, + "legacyPartitioning": false } } diff --git a/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ShredJob.scala b/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ShredJob.scala index 192a76bb3..5c82e5dfa 100644 --- a/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ShredJob.scala +++ b/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ShredJob.scala @@ -294,7 +294,8 @@ object ShredJob { unshredded.foreach { folder => System.out.println(s"Batch Transformer: processing $folder") val transformer = config.formats match { - case f: TransformerConfig.Formats.Shred => Transformer.ShredTransformer(resolverConfig, f, maxRecordsPerFile = 0) + case f: TransformerConfig.Formats.Shred => + Transformer.ShredTransformer(resolverConfig, f, maxRecordsPerFile = 0, config.featureFlags.legacyPartitioning) case TransformerConfig.Formats.WideRow.JSON => Transformer.WideRowJsonTransformer() case TransformerConfig.Formats.WideRow.PARQUET => val resolver = IgluSingleton.get(resolverConfig) diff --git a/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/Transformer.scala b/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/Transformer.scala index 150d15443..bf9689a18 100644 --- a/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/Transformer.scala +++ b/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/Transformer.scala @@ -65,7 +65,8 @@ object Transformer { case class ShredTransformer( resolverConfig: ResolverConfig, formats: Formats.Shred, - maxRecordsPerFile: Long + maxRecordsPerFile: Long, + legacyPartitioning: Boolean ) extends Transformer[TypesInfo.Shredded.Type] { val typesAccumulator = new TypesAccumulator[TypesInfo.Shredded.Type] val timestampsAccumulator: TimestampsAccumulator = new TimestampsAccumulator @@ -119,7 +120,10 @@ object Transformer { outFolder: Folder, maxRecordsPerFile: Long ): Unit = - Sink.writeShredded(spark, compression, transformed.flatMap(_.shredded), outFolder) + if (legacyPartitioning) + Sink.legacyWriteShredded(spark, compression, transformed.flatMap(_.legacyShredded), outFolder) + else + Sink.writeShredded(spark, compression, transformed.flatMap(_.shredded), outFolder) def register(sc: SparkContext): Unit = { sc.register(typesAccumulator) @@ -215,6 +219,7 @@ object Transformer { type WideRowTuple = (String, String) type ShreddedTuple = (String, String, String, String, Int, Int, Int, String) + type LegacyShreddedTuple = (String, String, String, String, Int, String) private implicit class TransformedOps(t: Transformed) { def wideRow: Option[WideRowTuple] = t match { @@ -231,6 +236,13 @@ object Transformer { case _ => None } + def legacyShredded: Option[LegacyShreddedTuple] = t match { + case p: Transformed.Shredded => + val outputType = if (p.isGood) "good" else "bad" + (outputType, p.vendor, p.name, p.format.path, p.model, p.data.value).some + case _ => None + } + def parquet: Option[List[Any]] = t match { case p: Transformed.Parquet => p.data.value.map(_.value).map(extractFieldValue).some case _ => None diff --git a/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/spark/Sink.scala b/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/spark/Sink.scala index fbb16da29..10cdc87b6 100644 --- a/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/spark/Sink.scala +++ b/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/spark/Sink.scala @@ -11,13 +11,30 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrameWriter, Row, SaveMode, SparkSession} import org.apache.spark.sql.types.StructType import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression +import com.snowplowanalytics.snowplow.rdbloader.transformer.batch.Transformer.{LegacyShreddedTuple, ShreddedTuple} object Sink { + def legacyWriteShredded( + spark: SparkSession, + compression: Compression, + data: RDD[LegacyShreddedTuple], + outFolder: String + ): Unit = { + import spark.implicits._ + data + .toDF("output", "vendor", "name", "format", "model", "data") + .write + .withCompression(compression) + .partitionBy("output", "vendor", "name", "format", "model") + .mode(SaveMode.Append) + .text(outFolder) + } + def writeShredded( spark: SparkSession, compression: Compression, - data: RDD[(String, String, String, String, Int, Int, Int, String)], + data: RDD[ShreddedTuple], outFolder: String ): Unit = { import spark.implicits._ diff --git a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ConfigSpec.scala b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ConfigSpec.scala index f0530f6e7..6380cae56 100644 --- a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ConfigSpec.scala +++ b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ConfigSpec.scala @@ -155,7 +155,7 @@ object TransformerConfigSpec { Some(Duration.create("14 days").asInstanceOf[FiniteDuration]), Some(Config.RunInterval.IntervalInstant(Instant.parse("2021-12-10T18:34:52.00Z"))) ) - val exampleDefaultFeatureFlags = TransformerConfig.FeatureFlags(false, None, false, false) + val exampleDefaultFeatureFlags = TransformerConfig.FeatureFlags(false, None, false, false, false) val exampleValidations = Validations(Some(Instant.parse("2021-11-18T11:00:00.00Z"))) val emptyValidations = Validations(None) val exampleSkipSchemas = List( diff --git a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ShredJobSpec.scala b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ShredJobSpec.scala index b5005eee1..5dc1ee97f 100644 --- a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ShredJobSpec.scala +++ b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ShredJobSpec.scala @@ -62,6 +62,7 @@ object ShredJobSpec { val Version = BuildInfo.version val AtomicFolder = "vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/revision=0/addition=0" + val LegacyAtomicFolder = "vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1" sealed trait Events @@ -256,7 +257,8 @@ object ShredJobSpec { shredder: Config, tsv: Boolean, jsonSchemas: List[SchemaCriterion], - wideRow: Option[WideRow] + wideRow: Option[WideRow], + legacyPartitioning: Boolean ) = { val encoder = Base64.getUrlEncoder val format = if (tsv) "TSV" else "JSON" @@ -314,6 +316,9 @@ object ShredJobSpec { |"validations": { | "minimumTimestamp": "0000-01-02T00:00:00.00Z" |} + |"featureFlags": { + | "legacyPartitioning": $legacyPartitioning + |} |"skipSchemas": [$skipSchemas] |"monitoring": {"snowplow": null, "sentry": null} |}""".stripMargin @@ -448,7 +453,7 @@ object ShredJobSpec { Config.Monitoring(None, Config.Monitoring.Metrics(None)), deduplication, Config.RunInterval(None, None, None), - TransformerConfig.FeatureFlags(false, None, false, false), + TransformerConfig.FeatureFlags(false, None, false, false, false), skipSchemas, TransformerConfig.Validations(None) ) @@ -476,14 +481,15 @@ trait ShredJobSpec extends SparkSpec { wideRow: Option[WideRow] = None, outputDirs: Option[OutputDirs] = None, deduplication: Config.Deduplication = Config.Deduplication(Config.Deduplication.Synthetic.Broadcast(1), true), - skipSchemas: List[SchemaCriterion] = Nil + skipSchemas: List[SchemaCriterion] = Nil, + legacyPartitioning: Boolean = false ): LoaderMessage.ShreddingComplete = { val shredder = getShredder(events, outputDirs.getOrElse(dirs), deduplication, skipSchemas) val config = Array( "--iglu-config", igluConfigWithLocal, "--config", - storageConfig(shredder, tsv, jsonSchemas, wideRow) + storageConfig(shredder, tsv, jsonSchemas, wideRow, legacyPartitioning) ) val dedupeConfigCli = if (crossBatchDedupe) { @@ -509,7 +515,7 @@ trait ShredJobSpec extends SparkSpec { None } val transformer = cli.config.formats match { - case f: TransformerConfig.Formats.Shred => Transformer.ShredTransformer(resolverConfig, f, 0) + case f: TransformerConfig.Formats.Shred => Transformer.ShredTransformer(resolverConfig, f, 0, legacyPartitioning) case TransformerConfig.Formats.WideRow.JSON => Transformer.WideRowJsonTransformer() case TransformerConfig.Formats.WideRow.PARQUET => val resolver = IgluSingleton.get(resolverConfig) diff --git a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/tabular/LegacyPartitioningSpec.scala b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/tabular/LegacyPartitioningSpec.scala new file mode 100644 index 000000000..f7d9086dc --- /dev/null +++ b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/tabular/LegacyPartitioningSpec.scala @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2012-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ + +package com.snowplowanalytics.snowplow.rdbloader.transformer.batch.good.tabular + +import io.circe.literal._ + +import org.specs2.Specification +import com.snowplowanalytics.snowplow.rdbloader.transformer.batch.ShredJobSpec._ +import com.snowplowanalytics.snowplow.rdbloader.transformer.batch.ShredJobSpec + +class LegacyPartitioningSpec extends Specification with ShredJobSpec { + override def appName = "legacy-partition" + + sequential + "A job which is provided with an enabled legacy partition flag" should { + runShredJob(LegacyPartitioningSpec.lines, tsv = true, legacyPartitioning = true) + val expectedFiles = scala.collection.mutable.ArrayBuffer.empty[String] + + "transform the enriched event and store it in legacy atomic events folder" in { + val Some((lines, f)) = readPartFile(dirs.goodRows, LegacyAtomicFolder) + expectedFiles += f + lines mustEqual Seq(LegacyPartitioningSpec.expected.event) + } + "shred the page_context TSV into legacy path" in { + val Some((lines, f)) = readPartFile(dirs.goodRows, LegacyPartitioningSpec.expected.contextPath) + expectedFiles += f + lines mustEqual Seq(LegacyPartitioningSpec.expected.contextContents) + } + "shred the application_error TSV into its appropriate path" in { + val Some((lines, f)) = readPartFile(dirs.goodRows, LegacyPartitioningSpec.expected.eventPath) + expectedFiles += f + lines mustEqual Seq(TabularOutputSpec.expected.eventContents) + } + "not shred any unexpected data" in { + listFilesWithExclusions(dirs.goodRows, expectedFiles.toList) must beEmpty + } + "not write any bad rows" in { + dirs.badRows must beEmptyDir + } + } +} + +object LegacyPartitioningSpec { + val event = + json"""{ + "schema": "iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0", + "data": { + "schema": "iglu:com.snowplowanalytics.snowplow/application_error/jsonschema/1-0-2", + "data": { + "programmingLanguage": "JAVASCRIPT", + "message": "undefined is not a function", + "threadName": null, + "threadId": 14, + "stackTrace": null, + "isFatal": true, + "className": "AbstractSingletonFactoryBean", + "causeStackTrace": "trace" + } + } + }""".noSpaces + + val lines = Lines( + s"""snowplowweb web 2014-06-01 14:04:11.639 2014-05-29 18:16:35.000 2014-05-29 18:04:11.639 unstruct 2b1b25a4-c0df-4859-8201-cf21492ad61b 836413 clojure js-2.0.0-M2 clj-0.6.0-tom-0.0.4 hadoop-0.5.0-common-0.4.0 216.207.42.134 3499345421 3b1d1a375044eede 3 2bad2a4e-aae4-4bea-8acd-399e7fe0366a US CA South San Francisco 37.654694 -122.4077 http://snowplowanalytics.com/blog/2013/02/08/writing-hive-udfs-and-serdes/ Writing Hive UDFs - a tutorial http snowplowanalytics.com 80 /blog/2013/02/08/writing-hive-udfs-and-serdes/ $event Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/537.75.14 Safari Safari Browser WEBKIT en-us 0 0 0 0 0 0 0 0 0 1 24 1440 1845 Mac OS Mac OS Apple Inc. America/Los_Angeles Computer 0 1440 900 UTF-8 1440 6015 {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:org.schema/WebPage/jsonschema/1-0-0","data":{"datePublished":"2014-07-23T00:00:00Z","author":"Jonathan Almeida","inLanguage":"en-US","genre":"blog","breadcrumb":["blog","releases"],"keywords":["snowplow","analytics","java","jvm","tracker"]}}]} """ + ) + + object expected { + val contextPath = s"vendor=org.schema/name=WebPage/format=tsv/model=1" + + val contextContents = + "org.schema\tWebPage\tjsonschema\t1-0-0\t2b1b25a4-c0df-4859-8201-cf21492ad61b\t2014-05-29 18:16:35.000\tevents\t[\"events\",\"WebPage\"]\tevents\tJonathan Almeida\t[\"blog\",\"releases\"]\t\\N\t\\N\t2014-07-23T00:00:00Z\tblog\ten-US\t[\"snowplow\",\"analytics\",\"java\",\"jvm\",\"tracker\"]" + + val eventPath = s"vendor=com.snowplowanalytics.snowplow/name=application_error/format=tsv/model=1" + + val eventContents = + "com.snowplowanalytics.snowplow\tapplication_error\tjsonschema\t1-0-2\t2b1b25a4-c0df-4859-8201-cf21492ad61b\t2014-05-29 18:16:35.000\tevents\t[\"events\",\"application_error\"]\tevents\tundefined is not a function\tJAVASCRIPT\ttrace\tAbstractSingletonFactoryBean\t\\N\t\\N\t1\t\\N\t\\N\t\\N\t14\t\\N" + + // Removed three JSON columns and added 7 columns at the end + val event = + """snowplowweb web 2014-06-01 14:04:11.639 2014-05-29 18:16:35.000 2014-05-29 18:04:11.639 unstruct 2b1b25a4-c0df-4859-8201-cf21492ad61b 836413 clojure js-2.0.0-M2 clj-0.6.0-tom-0.0.4 hadoop-0.5.0-common-0.4.0 216.207.42.134 3499345421 3b1d1a375044eede 3 2bad2a4e-aae4-4bea-8acd-399e7fe0366a US CA South San Francisco 37.654694 -122.4077 http://snowplowanalytics.com/blog/2013/02/08/writing-hive-udfs-and-serdes/ Writing Hive UDFs - a tutorial http snowplowanalytics.com 80 /blog/2013/02/08/writing-hive-udfs-and-serdes/ Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/537.75.14 Safari Safari Browser WEBKIT en-us 0 0 0 0 0 0 0 0 0 1 24 1440 1845 Mac OS Mac OS Apple Inc. America/Los_Angeles Computer 0 1440 900 UTF-8 1440 6015 """ + } +}