diff --git a/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/parquet/Codecs.scala b/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/parquet/Codecs.scala
index 294818b42..0cc594fc8 100644
--- a/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/parquet/Codecs.scala
+++ b/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/parquet/Codecs.scala
@@ -50,7 +50,7 @@ private[parquet] object Codecs {
       case FieldValue.DateValue(value) =>
         as[Date](value)
       case FieldValue.ArrayValue(values) =>
-        as[List[FieldValue]](values)
+        as[Vector[FieldValue]](values)
       case FieldValue.StructValue(values) =>
         values
           .foldLeft[RowParquetRecord](RowParquetRecord()) { case (acc, NamedValue(name, value)) =>
diff --git a/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/parquet/ParquetSchema.scala b/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/parquet/ParquetSchema.scala
index 61cfd630c..6042f8308 100644
--- a/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/parquet/ParquetSchema.scala
+++ b/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/parquet/ParquetSchema.scala
@@ -70,6 +70,6 @@ private[parquet] object ParquetSchema {
         val listElement = asParquetType(element).withRequired(elementNullability.required)
         SchemaDef.list(listElement)
       case Type.Struct(subFields) =>
-        SchemaDef.group(subFields.map(asParquetField): _*)
+        SchemaDef.group(subFields.map(asParquetField).toVector: _*)
     }
   }
diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/parquet/NonAtomicFieldsProvider.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/parquet/NonAtomicFieldsProvider.scala
index 7ea22fb6e..a80b09fdd 100644
--- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/parquet/NonAtomicFieldsProvider.scala
+++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/parquet/NonAtomicFieldsProvider.scala
@@ -79,9 +79,11 @@ object NonAtomicFieldsProvider {
       val schemasSorted = schemas.sorted

       // Schemas need to be ordered by key to merge in correct order.
-      schemasSorted
+      schemasSorted.toList
         // `types` are all of the same family, so it does not matter which element is passed to fieldFromSchema
-        .map(schemaWithKey => TypedField(fieldFromSchema(types.head)(schemaWithKey.schema), types.head, Set(schemaWithKey.schemaKey)))
+        .flatMap { schemaWithKey =>
+          fieldFromSchema(types.head)(schemaWithKey.schema).map(TypedField(_, types.head, Set(schemaWithKey.schemaKey)))
+        }
         // Accumulating vector would contain base column as first element and broken migrations in others
         .foldLeft(Vector.empty[TypedField])((endFields, typedField) =>
           endFields.headOption match {
@@ -137,14 +139,14 @@ object NonAtomicFieldsProvider {
         )
       )

-  private def fieldFromSchema(`type`: WideRow.Type)(schema: Schema): Field = {
+  private def fieldFromSchema(`type`: WideRow.Type)(schema: Schema): Option[Field] = {
     val fieldName = SnowplowEvent.transformSchema(`type`.snowplowEntity.toSdkProperty, `type`.schemaKey)

-    Field.normalize(`type`.snowplowEntity match {
+    (`type`.snowplowEntity match {
       case LoaderMessage.SnowplowEntity.SelfDescribingEvent =>
         Field.build(fieldName, schema, enforceValuePresence = false)
       case LoaderMessage.SnowplowEntity.Context =>
         Field.buildRepeated(fieldName, schema, enforceItemPresence = true, Type.Nullability.Nullable)
-    })
+    }).map(Field.normalize)
   }
 }
diff --git a/modules/common/src/test/resources/schemas/com.snowplowanalytics.snowplow/digit_schema/jsonschema/1-0-0 b/modules/common/src/test/resources/schemas/com.snowplowanalytics.snowplow/digit_schema/jsonschema/1-0-0
new file mode 100644
index 000000000..24d69ef52
--- /dev/null
+++ b/modules/common/src/test/resources/schemas/com.snowplowanalytics.snowplow/digit_schema/jsonschema/1-0-0
@@ -0,0 +1,14 @@
+{
+  "$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
+  "self": {
+    "vendor": "com.snowplowanalytics.snowplow",
+    "name": "digit_schema",
+    "format": "jsonschema",
+    "version": "1-0-0"
+  },
+  "type": "object",
+  "properties": {
+    "1field": {"type": "string"}
+  },
+  "required": ["1field"]
+}
diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/ParquetFieldsProviderSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/ParquetFieldsProviderSpec.scala
index 807590b13..73b7cbb20 100644
--- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/ParquetFieldsProviderSpec.scala
+++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/ParquetFieldsProviderSpec.scala
@@ -11,6 +11,7 @@
 package com.snowplowanalytics.snowplow.rdbloader.common

 import cats.Id
+import cats.data.NonEmptyVector
 import com.snowplowanalytics.iglu.client.Client
 import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLookup._
 import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}
@@ -31,6 +32,35 @@ class ParquetFieldsProviderSpec extends Specification with Tables {
   private val resolver = embeddedIgluClient.resolver

   "Parquet non-atomic fields provider" should {
+    "prefix field with underscore if it starts with a number" >> {
+      "for contexts" in {
+        val ctx = WideRow.Type(
+          SchemaKey(vendor = "com.snowplowanalytics.snowplow", name = "digit_schema", format = "jsonschema", SchemaVer.Full(1, 0, 0)),
+          Context
+        )
+        val result = NonAtomicFieldsProvider.build(resolver, List(ctx)).value.right.get
+
+        result.value.size mustEqual 1
+        result.value.head.field mustEqual nullableArrayWithRequiredElement(
+          name = "contexts_com_snowplowanalytics_snowplow_digit_schema_1",
"contexts_com_snowplowanalytics_snowplow_digit_schema_1", + elementType = DdlTypes.digitSchema100 + ) + } + "for unstruct" in { + val ctx = WideRow.Type( + SchemaKey(vendor = "com.snowplowanalytics.snowplow", name = "digit_schema", format = "jsonschema", SchemaVer.Full(1, 0, 0)), + SelfDescribingEvent + ) + val result = NonAtomicFieldsProvider.build(resolver, List(ctx)).value.right.get + + result.value.size mustEqual 1 + result.value.head.field mustEqual Field( + name = "unstruct_event_com_snowplowanalytics_snowplow_digit_schema_1", + fieldType = DdlTypes.digitSchema100, + nullability = Nullable + ) + } + } "produce only one field from latest type when versions are compatible" >> { "for contexts" in { @@ -218,17 +248,23 @@ object ParquetFieldsProviderSpec { object DdlTypes { + val digitSchema100 = Type.Struct( + fields = NonEmptyVector.of( + Field("_1field", Type.String, Required).copy(accessors = Set("1field")) + ) + ) + val schema100 = Type.Struct( - fields = List( + fields = NonEmptyVector.of( Field( "a_field", Type.Struct( - List( + NonEmptyVector.of( Field("b_field", Type.String, Nullable), Field( "c_field", Type.Struct( - List( + NonEmptyVector.one( Field("d_field", Type.String, Nullable) ) ), @@ -247,7 +283,7 @@ object ParquetFieldsProviderSpec { "i_field", Type.Array( Type.Struct( - List( + NonEmptyVector.of( Field("c_field", Type.Long, Nullable), Field("d_field", Type.String, Nullable) ) @@ -259,16 +295,16 @@ object ParquetFieldsProviderSpec { ) ) val schema101 = Type.Struct( - fields = List( + fields = NonEmptyVector.of( Field( "a_field", Type.Struct( - List( + NonEmptyVector.of( Field("b_field", Type.String, Nullable), Field( "c_field", Type.Struct( - List( + NonEmptyVector.of( Field("d_field", Type.String, Nullable), Field("e_field", Type.String, Nullable) ) @@ -289,7 +325,7 @@ object ParquetFieldsProviderSpec { "i_field", Type.Array( Type.Struct( - List( + NonEmptyVector.of( Field("c_field", Type.Long, Nullable), Field("d_field", Type.String, Nullable) ) @@ -301,16 +337,16 @@ object ParquetFieldsProviderSpec { ) ) val schema110 = Type.Struct( - fields = List( + fields = NonEmptyVector.of( Field( "a_field", Type.Struct( - List( + NonEmptyVector.of( Field("b_field", Type.String, Nullable), Field( "c_field", Type.Struct( - List( + NonEmptyVector.of( Field("d_field", Type.String, Nullable), Field("e_field", Type.String, Nullable) ) @@ -333,7 +369,7 @@ object ParquetFieldsProviderSpec { "i_field", Type.Array( Type.Struct( - List( + NonEmptyVector.of( Field("c_field", Type.Long, Nullable), Field("d_field", Type.String, Nullable) ) @@ -346,22 +382,22 @@ object ParquetFieldsProviderSpec { ) val schema200 = Type.Struct( - fields = List( + fields = NonEmptyVector.of( Field("a_field", Type.String, Required), Field("e_field", Type.String, Required), Field("f_field", Type.Long, Required) ) ) - val brokenSchema100 = Type.Struct(fields = List(Field("b_field", Type.Long, Nullable))) - val brokenSchema101 = Type.Struct(fields = List(Field("b_field", Type.String, Nullable))) + val brokenSchema100 = Type.Struct(fields = NonEmptyVector.of(Field("b_field", Type.Long, Nullable))) + val brokenSchema101 = Type.Struct(fields = NonEmptyVector.of(Field("b_field", Type.String, Nullable))) val brokenSchema110 = Type.Struct(fields = - List( + NonEmptyVector.of( Field("a_field", Type.Long, Nullable), Field("b_field", Type.Long, Nullable) ) ) - val brokenSchema111 = Type.Struct(fields = List(Field("a_field", Type.String, Nullable))) + val brokenSchema111 = Type.Struct(fields = 
+    val brokenSchema111 = Type.Struct(fields = NonEmptyVector.of(Field("a_field", Type.String, Nullable)))

     val context100 = getBrokenType(SchemaVer.Full(1, 0, 0), Context)
     val context101 = getBrokenType(SchemaVer.Full(1, 0, 1), Context) // breaking
diff --git a/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/SparkSchema.scala b/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/SparkSchema.scala
index 497406463..bdd68a456 100644
--- a/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/SparkSchema.scala
+++ b/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/SparkSchema.scala
@@ -14,6 +14,8 @@ import com.snowplowanalytics.iglu.schemaddl.parquet.{Field, Type}
 import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.fields.AllFields
 import org.apache.spark.sql.types._

+import scala.collection.JavaConverters._
+
 object SparkSchema {

   def build(allFields: AllFields): StructType =
@@ -37,7 +39,7 @@ object SparkSchema {
       case Type.Decimal(precision, scale) => DecimalType(Type.DecimalPrecision.toInt(precision), scale)
       case Type.Date => DateType
       case Type.Timestamp => TimestampType
-      case Type.Struct(fields) => StructType(fields.map(asSparkField))
+      case Type.Struct(fields) => StructType(fields.map(asSparkField).toVector.asJava)
       case Type.Array(element, elNullability) => ArrayType(fieldType(element), elNullability.nullable)
       case Type.Json => StringType // Spark does not support the `Json` parquet logical type.
     }
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index d6fe2f248..4fa2e8385 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -39,13 +39,13 @@ object Dependencies {
     val monocle          = "2.0.3"
     val catsRetry        = "3.1.0"
     val log4cats         = "2.5.0"
-    val http4s           = "0.23.17"
-    val http4sBlaze      = "0.23.14" // this dep fell out of sync with http4s-core versioning - 0.23.14 is the last 0.X release.
+    val http4s           = "0.23.18"
+    val http4sBlaze      = "0.23.16" // this dep fell out of sync with http4s-core versioning - 0.23.16 is the last 0.X release.
     val scalaTracker     = "2.0.0"
     val spark            = "3.3.1"
     val eventsManifest   = "0.4.0"
-    val schemaDdl        = "0.22.1"
+    val schemaDdl        = "0.25.0-M1"
     val jacksonModule    = "2.17.2" // Override incompatible version in spark runtime
     val jacksonDatabind  = "2.17.2"
     val jacksonMapper    = "1.9.14-atlassian-6" // Fix CVE
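
Note on the underlying API change (a sketch inferred from the hunks above, not part of the patch): schema-ddl 0.25.x has `Field.build` and `Field.buildRepeated` return `Option[Field]` (presumably `None` when a schema yields no representable column), and `Type.Struct` now carries a cats `NonEmptyVector` of fields. `Field.normalize` prefixes field names that start with a digit and keeps the original JSON property name in `accessors`. The snippet uses only calls visible in this diff; `SchemaDdl025Sketch`, `buildUnstructField` and `originalSchema` are hypothetical names for illustration.

import cats.data.NonEmptyVector
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.parquet.{Field, Type}
import com.snowplowanalytics.iglu.schemaddl.parquet.Type.Nullability.Required

object SchemaDdl025Sketch {

  // Field.build no longer returns a bare Field: the Option forces callers to
  // map/flatMap over the result instead of wrapping it in Field.normalize
  // directly, mirroring the fieldFromSchema change above.
  def buildUnstructField(originalSchema: Schema): Option[Field] =
    Field
      .build("unstruct_event_com_snowplowanalytics_snowplow_digit_schema_1", originalSchema, enforceValuePresence = false)
      .map(Field.normalize)

  // Type.Struct fields are a NonEmptyVector; normalization turns the JSON
  // property "1field" into the column "_1field" and records the original
  // name in the accessors set, as the new spec fixture expects.
  val digitStruct: Type.Struct =
    Type.Struct(
      fields = NonEmptyVector.of(
        Field("_1field", Type.String, Required).copy(accessors = Set("1field"))
      )
    )
}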