Streaming Transformer: Support old output file hierarchy (close #1314)
oguzhanunlu committed Oct 31, 2023
1 parent 79868f8 commit 5a44c52
Showing 9 changed files with 105 additions and 19 deletions.
@@ -166,6 +166,10 @@
# When enabled, event's atomic fields are truncated (based on the length limits from the atomic JSON schema) before transformation.
# Optional, default "false".
"truncateAtomicFields": false

# Whether to use the old directory structure (vendor/name/format/model) for transformed events.
# This should be enabled when upgrading from an older version of the loader.
"legacyPartitioning": false
}

# Observability and reporting options
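
(Editor's note, for context: the flag only changes the trailing segments of the shredded output path. With a hypothetical vendor com.acme and name my_event, the two layouts produced by this change are:

    legacyPartitioning = true:   output=good/vendor=com.acme/name=my_event/format=tsv/model=1/
    legacyPartitioning = false:  output=good/vendor=com.acme/name=my_event/format=tsv/model=1/revision=0/addition=0)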
@@ -234,6 +234,10 @@

# When enabled, event's atomic fields are truncated (based on the length limits from the atomic JSON schema) before transformation.
# Optional, default "false".
"truncateAtomicFields": false
"truncateAtomicFields": false

# Whether to use the old directory structure (vendor/name/format/model) for transformed events.
# This should be enabled when upgrading from an older version of the loader.
"legacyPartitioning": false
}
}
@@ -150,5 +150,9 @@
# When enabled, event's atomic fields are truncated (based on the length limits from the atomic JSON schema) before transformation.
# Optional, default "false".
"truncateAtomicFields": false

# Whether to use the old directory structure (vendor/name/format/model) for transformed events.
# This should be enabled when upgrading from an older version of the loader.
"legacyPartitioning": false
}
}
@@ -143,5 +143,9 @@
# When enabled, event's atomic fields are truncated (based on the length limits from the atomic JSON schema) before transformation.
# Optional, default "false".
"truncateAtomicFields": false

# Whether to use the old directory structure (vendor/name/format/model) for transformed events.
# This should be enabled when upgrading from an older version of the loader.
"legacyPartitioning": false
}
}
@@ -24,7 +24,8 @@
"featureFlags": {
"legacyMessageFormat": false,
"enableMaxRecordsPerFile": true,
"truncateAtomicFields": false
"truncateAtomicFields": false,
"legacyPartitioning": false
}

"monitoring": {
@@ -100,7 +100,7 @@ object Processing {
source
.through(transform(transformer, config.validations, processor))
.through(incrementMetrics(resources.metrics))
.through(handleTransformResult(transformer))
.through(handleTransformResult(transformer, config.featureFlags.legacyPartitioning))
.through(windowing)

val sink: Pipe[F, Record[Window, List[(SinkPath, Transformed.Data)], State[C]], Unit] =
@@ -226,14 +226,15 @@
* to where it should sink. Processes in batches for efficiency.
*/
def handleTransformResult[F[_], C: Checkpointer[F, *]](
transformer: Transformer[F]
transformer: Transformer[F],
legacyPartitioning: Boolean
): Pipe[F, TransformationResults[C], SerializationResults[C]] =
_.map { case (items, checkpointer) =>
val state = State.fromEvents(items).withCheckpointer(checkpointer)
val mapped = items.flatMap(
_.fold(
bad => transformer.badTransform(bad).split :: Nil,
success => success.output.map(_.split)
bad => transformer.badTransform(bad).split(legacyPartitioning) :: Nil,
success => success.output.map(_.split(legacyPartitioning))
)
)
(mapped, state)
@@ -255,11 +256,15 @@
}

implicit class TransformedOps(t: Transformed) {
def getPath: SinkPath = t match {
def getPath(legacyPartitioning: Boolean): SinkPath = t match {
case p: Transformed.Shredded =>
val suffix = Some(
s"vendor=${p.vendor}/name=${p.name}/format=${p.format.path.toLowerCase}/model=${p.model}/revision=${p.revision}/addition=${p.addition}"
)
val suffix =
if (legacyPartitioning)
Some(s"vendor=${p.vendor}/name=${p.name}/format=${p.format.path.toLowerCase}/model=${p.model}/")
else
Some(
s"vendor=${p.vendor}/name=${p.name}/format=${p.format.path.toLowerCase}/model=${p.model}/revision=${p.revision}/addition=${p.addition}"
)
val pathType = if (p.isGood) SinkPath.PathType.Good else SinkPath.PathType.Bad
SinkPath(suffix, pathType)
case p: Transformed.WideRow =>
@@ -269,6 +274,6 @@
case _: Transformed.Parquet =>
SinkPath(None, SinkPath.PathType.Good)
}
def split: (SinkPath, Transformed.Data) = (getPath, t.data)
def split(legacyPartitioning: Boolean): (SinkPath, Transformed.Data) = (getPath(legacyPartitioning), t.data)
}
}
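
(Editor's note: a minimal sketch, not part of this commit, of how the flag changes the sink path for a shredded TSV entity. "com.acme"/"my_event" are hypothetical values, and dummyTransformedData is a stand-in, mirroring the constructor shape used in TransformingSpec further below:

    // Assumes Processing.TransformedOps is in scope.
    val t = Transformed.Shredded.Tabular("com.acme", "my_event", 1, 0, 0, dummyTransformedData)

    t.getPath(legacyPartitioning = true)
    // SinkPath(Some("vendor=com.acme/name=my_event/format=tsv/model=1/"), SinkPath.PathType.Good)

    t.getPath(legacyPartitioning = false)
    // SinkPath(Some("vendor=com.acme/name=my_event/format=tsv/model=1/revision=0/addition=0"), SinkPath.PathType.Good)
)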
@@ -24,7 +24,7 @@ class ShredTsvProcessingSpec extends BaseProcessingSpec {
inputEventsPath = "/processing-spec/1/input/events"
)

val config = TransformerConfig(appConfig(outputDirectory), igluConfig)
val config = TransformerConfig(appConfig(outputDirectory, false), igluConfig)

for {
output <- process(inputStream, config)
@@ -84,7 +84,7 @@ class ShredTsvProcessingSpec extends BaseProcessingSpec {
inputEventsPath = "/processing-spec/3/input/events"
)

val config = TransformerConfig(appConfig(outputDirectory), igluConfig)
val config = TransformerConfig(appConfig(outputDirectory, false), igluConfig)

for {
output <- process(inputStream, config)
@@ -114,11 +114,71 @@
}
.unsafeRunSync()
}

"respect legacyPartitioning flag" in {
temporaryDirectory
.use { outputDirectory =>
val inputStream = InputEventsProvider.eventStream(
inputEventsPath = "/processing-spec/1/input/events"
)

val config = TransformerConfig(appConfig(outputDirectory, true), igluConfig)

for {
output <- process(inputStream, config)
actualAtomicRows <-
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1"
)
)
actualOptimizelyRows <-
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.optimizely/name=state/format=tsv/model=1"
)
)
actualConsentRows <-
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=consent_document/format=tsv/model=1"
)
)
actualBadRows <-
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_parsing_error/format=json/model=2"
)
)

expectedCompletionMessage <- readMessageFromResource("/processing-spec/1/output/good/tsv/completion.json", outputDirectory)
expectedAtomicRows <- readLinesFromResource("/processing-spec/1/output/good/tsv/com.snowplowanalytics.snowplow-atomic")
expectedOptimizelyRows <- readLinesFromResource("/processing-spec/1/output/good/tsv/com.optimizely-state")
expectedConsentRows <-
readLinesFromResource("/processing-spec/1/output/good/tsv/com.snowplowanalytics.snowplow-consent_document")
expectedBadRows <- readLinesFromResource("/processing-spec/1/output/bad")
} yield {
removeAppId(output.completionMessages.toList) must beEqualTo(Vector(expectedCompletionMessage))
output.checkpointed must beEqualTo(1)

assertStringRows(removeAppId(actualAtomicRows), expectedAtomicRows)
assertStringRows(removeAppId(actualOptimizelyRows), expectedOptimizelyRows)
assertStringRows(removeAppId(actualConsentRows), expectedConsentRows)

assertStringRows(removeAppId(actualBadRows), expectedBadRows)
}
}
.unsafeRunSync()
}
}
}

object ShredTsvProcessingSpec {
private val appConfig = (outputPath: Path) => s"""|{
private val appConfig = (outputPath: Path, legacyPartitioning: Boolean) => s"""|{
| "input": {
| "type": "pubsub"
| "subscription": "projects/project-id/subscriptions/subscription-id"
@@ -137,6 +197,9 @@
| "region": "eu-central-1"
| }
| "windowing": "1 minute"
| "featureFlags": {
| "legacyPartitioning": $legacyPartitioning
| }
| "formats": {
| "transformationType": "shred"
| "default": "TSV"
@@ -42,11 +42,11 @@ class TransformingSpec extends Specification {
val testFileNameMap = List(
Transformed.Shredded
.Tabular("com.snowplowanalytics.snowplow", "atomic", 1, 0, 0, dummyTransformedData)
.getPath -> "com.snowplowanalytics.snowplow-atomic",
.getPath(false) -> "com.snowplowanalytics.snowplow-atomic",
Transformed.Shredded
.Tabular("com.snowplowanalytics.snowplow", "consent_document", 1, 0, 0, dummyTransformedData)
.getPath -> "com.snowplowanalytics.snowplow-consent_document",
Transformed.Shredded.Tabular("com.optimizely", "state", 1, 0, 0, dummyTransformedData).getPath -> "com.optimizely-state"
.getPath(false) -> "com.snowplowanalytics.snowplow-consent_document",
Transformed.Shredded.Tabular("com.optimizely", "state", 1, 0, 0, dummyTransformedData).getPath(false) -> "com.optimizely-state"
).toMap

val expectedTransformedMap =
@@ -149,7 +149,7 @@ object TransformingSpec {

val eventStream = parsedEventStream(resourcePath)
.through(Processing.transform(transformer, validations, TestProcessor))
.through(Processing.handleTransformResult(transformer))
.through(Processing.handleTransformResult(transformer, legacyPartitioning = false))

val transformed = eventStream.compile.toList.unsafeRunSync().flatMap(_._1)
(transformed.flatMap(_.getGood), transformed.flatMap(_.getBad))
@@ -144,7 +144,8 @@ object TransformerConfig {
legacyMessageFormat: Boolean,
sparkCacheEnabled: Option[Boolean],
enableMaxRecordsPerFile: Boolean,
truncateAtomicFields: Boolean
truncateAtomicFields: Boolean,
legacyPartitioning: Boolean
)

object FeatureFlags {
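
(Editor's note: for reference, a fully-populated FeatureFlags value after this change, assuming FeatureFlags is a case class with the parameter list shown above. The values are the illustrative defaults from the config examples earlier; sparkCacheEnabled is unrelated to this commit:

    val flags = TransformerConfig.FeatureFlags(
      legacyMessageFormat     = false,
      sparkCacheEnabled       = None,
      enableMaxRecordsPerFile = true,
      truncateAtomicFields    = false,
      legacyPartitioning      = false
    )
)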
