Upgrade schema-ddl to 0.25.0-M1 #1361

Merged · 1 commit · Sep 25, 2024
@@ -50,7 +50,7 @@ private[parquet] object Codecs {
     case FieldValue.DateValue(value) =>
       as[Date](value)
     case FieldValue.ArrayValue(values) =>
-      as[List[FieldValue]](values)
+      as[Vector[FieldValue]](values)
     case FieldValue.StructValue(values) =>
       values
         .foldLeft[RowParquetRecord](RowParquetRecord()) { case (acc, NamedValue(name, value)) =>
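The List-to-Vector swap in this hunk tracks the upstream change in schema-ddl 0.25.0-M1, where FieldValue.ArrayValue appears to now carry a Vector[FieldValue] rather than a List. Since the codec is resolved against the concrete collection type, the match arm has to name Vector. A minimal sketch of the idea, using simplified stand-in types rather than the real parquet4s/schema-ddl API:

    // Illustrative stand-ins only, not the parquet4s API. The point is that the
    // payload type in the ArrayValue case is now Vector, so code handling it
    // must be written against Vector.
    sealed trait FieldValue
    object FieldValue {
      final case class StringValue(value: String)             extends FieldValue
      final case class ArrayValue(values: Vector[FieldValue]) extends FieldValue // Vector, not List
    }

    def render(fv: FieldValue): String = fv match {
      case FieldValue.StringValue(s) => s
      case FieldValue.ArrayValue(vs) => vs.map(render).mkString("[", ", ", "]")
    }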
@@ -70,6 +70,6 @@ private[parquet] object ParquetSchema {
       val listElement = asParquetType(element).withRequired(elementNullability.required)
       SchemaDef.list(listElement)
     case Type.Struct(subFields) =>
-      SchemaDef.group(subFields.map(asParquetField): _*)
+      SchemaDef.group(subFields.map(asParquetField).toVector: _*)
   }
 }
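The added .toVector is needed because Type.Struct fields moved from List to cats' NonEmptyVector in this schema-ddl version (see the NonEmptyVector.of calls in the spec below), and NonEmptyVector does not extend Seq, so it cannot be splatted into a varargs parameter directly. A quick sketch of the compiler constraint, assuming cats-core on the classpath:

    import cats.data.NonEmptyVector

    object VarargsSketch {
      def group(xs: Int*): Int = xs.sum

      val nev: NonEmptyVector[Int] = NonEmptyVector.of(1, 2, 3)

      // group(nev: _*)                      // would not compile: NonEmptyVector is not a Seq
      val ok: Int = group(nev.toVector: _*)  // Vector is a Seq, so the splat works
    }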
@@ -79,9 +79,11 @@ object NonAtomicFieldsProvider {
     val schemasSorted = schemas.sorted

     // Schemas need to be ordered by key to merge in correct order.
-    schemasSorted
+    schemasSorted.toList
       // `types` are all of the same family, so it does not matter which element is passed to fieldFromSchema
-      .map(schemaWithKey => TypedField(fieldFromSchema(types.head)(schemaWithKey.schema), types.head, Set(schemaWithKey.schemaKey)))
+      .flatMap { schemaWithKey =>
+        fieldFromSchema(types.head)(schemaWithKey.schema).map(TypedField(_, types.head, Set(schemaWithKey.schemaKey)))
+      }
       // Accumulating vector would contain base column as first element and broken migrations in others
       .foldLeft(Vector.empty[TypedField])((endFields, typedField) =>
         endFields.headOption match {
@@ -137,14 +139,14 @@
       )
     )

-  private def fieldFromSchema(`type`: WideRow.Type)(schema: Schema): Field = {
+  private def fieldFromSchema(`type`: WideRow.Type)(schema: Schema): Option[Field] = {
     val fieldName = SnowplowEvent.transformSchema(`type`.snowplowEntity.toSdkProperty, `type`.schemaKey)

-    Field.normalize(`type`.snowplowEntity match {
+    (`type`.snowplowEntity match {
       case LoaderMessage.SnowplowEntity.SelfDescribingEvent =>
         Field.build(fieldName, schema, enforceValuePresence = false)
       case LoaderMessage.SnowplowEntity.Context =>
         Field.buildRepeated(fieldName, schema, enforceItemPresence = true, Type.Nullability.Nullable)
-    })
+    }).map(Field.normalize)
   }
 }
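In 0.25.0-M1, Field.build and Field.buildRepeated evidently can yield no field at all, which is why fieldFromSchema now returns Option[Field] and why the call site in the first hunk switches from .map to .flatMap: schemas that produce no field are silently skipped rather than failing the build. A minimal sketch of that control flow, with hypothetical stand-ins for Field and the schema type:

    // Hypothetical stand-ins; the real Field/Schema types come from schema-ddl.
    final case class Field(name: String)

    def fieldFromSchema(schema: Map[String, String]): Option[Field] =
      schema.get("columnName").map(Field(_)) // None when the schema yields no column

    val schemas = List(
      Map("columnName" -> "a_field"),
      Map.empty[String, String],             // produces no field
      Map("columnName" -> "b_field")
    )

    // flatMap drops the None cases instead of failing:
    val fields: List[Field] = schemas.flatMap(fieldFromSchema)
    // fields == List(Field("a_field"), Field("b_field"))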
@@ -0,0 +1,14 @@
+{
+  "$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
+  "self": {
+    "vendor": "com.snowplowanalytics.snowplow",
+    "name": "digit_schema",
+    "format": "jsonschema",
+    "version": "1-0-0"
+  },
+  "type": "object",
+  "properties": {
+    "1field": {"type": "string"}
+  },
+  "required": ["1field"]
+}
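This new fixture exercises the name normalization under test: "1field" is not a legal identifier in most SQL engines or Parquet tooling, so, judging by the digitSchema100 expectation in the spec below, normalization prefixes it with an underscore while keeping the original JSON key in accessors for value extraction. A rough sketch of the assumed rule (inferred from the test expectations, not schema-ddl's actual implementation):

    // Assumed behavior, inferred from the test below.
    final case class NormalizedField(name: String, accessors: Set[String])

    def normalizeName(jsonKey: String): NormalizedField = {
      val name = if (jsonKey.headOption.exists(_.isDigit)) s"_$jsonKey" else jsonKey
      NormalizedField(name, accessors = Set(jsonKey)) // original key kept for lookup
    }

    normalizeName("1field") // NormalizedField("_1field", Set("1field"))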
@@ -11,6 +11,7 @@
 package com.snowplowanalytics.snowplow.rdbloader.common

 import cats.Id
+import cats.data.NonEmptyVector
 import com.snowplowanalytics.iglu.client.Client
 import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLookup._
 import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}
@@ -31,6 +32,35 @@ class ParquetFieldsProviderSpec extends Specification with Tables {
   private val resolver = embeddedIgluClient.resolver

   "Parquet non-atomic fields provider" should {
+    "prefix field with underscore if it starts with a number" >> {
+      "for contexts" in {
+        val ctx = WideRow.Type(
+          SchemaKey(vendor = "com.snowplowanalytics.snowplow", name = "digit_schema", format = "jsonschema", SchemaVer.Full(1, 0, 0)),
+          Context
+        )
+        val result = NonAtomicFieldsProvider.build(resolver, List(ctx)).value.right.get
+
+        result.value.size mustEqual 1
+        result.value.head.field mustEqual nullableArrayWithRequiredElement(
+          name = "contexts_com_snowplowanalytics_snowplow_digit_schema_1",
+          elementType = DdlTypes.digitSchema100
+        )
+      }
+      "for unstruct" in {
+        val ctx = WideRow.Type(
+          SchemaKey(vendor = "com.snowplowanalytics.snowplow", name = "digit_schema", format = "jsonschema", SchemaVer.Full(1, 0, 0)),
+          SelfDescribingEvent
+        )
+        val result = NonAtomicFieldsProvider.build(resolver, List(ctx)).value.right.get
+
+        result.value.size mustEqual 1
+        result.value.head.field mustEqual Field(
+          name = "unstruct_event_com_snowplowanalytics_snowplow_digit_schema_1",
+          fieldType = DdlTypes.digitSchema100,
+          nullability = Nullable
+        )
+      }
+    }
     "produce only one field from latest type when versions are compatible" >> {
       "for contexts" in {

@@ -218,17 +248,23 @@ object ParquetFieldsProviderSpec {

   object DdlTypes {

+    val digitSchema100 = Type.Struct(
+      fields = NonEmptyVector.of(
+        Field("_1field", Type.String, Required).copy(accessors = Set("1field"))
+      )
+    )
+
     val schema100 = Type.Struct(
-      fields = List(
+      fields = NonEmptyVector.of(
         Field(
           "a_field",
           Type.Struct(
-            List(
+            NonEmptyVector.of(
               Field("b_field", Type.String, Nullable),
               Field(
                 "c_field",
                 Type.Struct(
-                  List(
+                  NonEmptyVector.one(
                     Field("d_field", Type.String, Nullable)
                   )
                 ),
@@ -247,7 +283,7 @@ object ParquetFieldsProviderSpec {
"i_field",
Type.Array(
Type.Struct(
List(
NonEmptyVector.of(
Field("c_field", Type.Long, Nullable),
Field("d_field", Type.String, Nullable)
)
@@ -259,16 +295,16 @@
         )
       )
     val schema101 = Type.Struct(
-      fields = List(
+      fields = NonEmptyVector.of(
         Field(
           "a_field",
           Type.Struct(
-            List(
+            NonEmptyVector.of(
               Field("b_field", Type.String, Nullable),
               Field(
                 "c_field",
                 Type.Struct(
-                  List(
+                  NonEmptyVector.of(
                     Field("d_field", Type.String, Nullable),
                     Field("e_field", Type.String, Nullable)
                   )
@@ -289,7 +325,7 @@
"i_field",
Type.Array(
Type.Struct(
List(
NonEmptyVector.of(
Field("c_field", Type.Long, Nullable),
Field("d_field", Type.String, Nullable)
)
@@ -301,16 +337,16 @@
         )
       )
     val schema110 = Type.Struct(
-      fields = List(
+      fields = NonEmptyVector.of(
         Field(
           "a_field",
           Type.Struct(
-            List(
+            NonEmptyVector.of(
               Field("b_field", Type.String, Nullable),
               Field(
                 "c_field",
                 Type.Struct(
-                  List(
+                  NonEmptyVector.of(
                     Field("d_field", Type.String, Nullable),
                     Field("e_field", Type.String, Nullable)
                   )
@@ -333,7 +369,7 @@
"i_field",
Type.Array(
Type.Struct(
List(
NonEmptyVector.of(
Field("c_field", Type.Long, Nullable),
Field("d_field", Type.String, Nullable)
)
@@ -346,22 +382,22 @@
     )

     val schema200 = Type.Struct(
-      fields = List(
+      fields = NonEmptyVector.of(
         Field("a_field", Type.String, Required),
         Field("e_field", Type.String, Required),
         Field("f_field", Type.Long, Required)
       )
     )

-    val brokenSchema100 = Type.Struct(fields = List(Field("b_field", Type.Long, Nullable)))
-    val brokenSchema101 = Type.Struct(fields = List(Field("b_field", Type.String, Nullable)))
+    val brokenSchema100 = Type.Struct(fields = NonEmptyVector.of(Field("b_field", Type.Long, Nullable)))
+    val brokenSchema101 = Type.Struct(fields = NonEmptyVector.of(Field("b_field", Type.String, Nullable)))
     val brokenSchema110 = Type.Struct(fields =
-      List(
+      NonEmptyVector.of(
         Field("a_field", Type.Long, Nullable),
         Field("b_field", Type.Long, Nullable)
       )
     )
-    val brokenSchema111 = Type.Struct(fields = List(Field("a_field", Type.String, Nullable)))
+    val brokenSchema111 = Type.Struct(fields = NonEmptyVector.of(Field("a_field", Type.String, Nullable)))

     val context100 = getBrokenType(SchemaVer.Full(1, 0, 0), Context)
     val context101 = getBrokenType(SchemaVer.Full(1, 0, 1), Context) // breaking
@@ -14,6 +14,8 @@ import com.snowplowanalytics.iglu.schemaddl.parquet.{Field, Type}
 import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.fields.AllFields
 import org.apache.spark.sql.types._

+import scala.collection.JavaConverters._
+
 object SparkSchema {

   def build(allFields: AllFields): StructType =
@@ -37,7 +39,7 @@
       case Type.Decimal(precision, scale) => DecimalType(Type.DecimalPrecision.toInt(precision), scale)
       case Type.Date => DateType
       case Type.Timestamp => TimestampType
-      case Type.Struct(fields) => StructType(fields.map(asSparkField))
+      case Type.Struct(fields) => StructType(fields.map(asSparkField).toVector.asJava)
       case Type.Array(element, elNullability) => ArrayType(fieldType(element), elNullability.nullable)
       case Type.Json => StringType // Spark does not support the `Json` parquet logical type.
   }
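Because the struct's fields are now a NonEmptyVector rather than a Seq, they must be converted before being handed to Spark: StructType's companion accepts a Seq, an Array, or a java.util.List of StructField, and the diff above goes through .toVector.asJava for the java.util.List overload. A small sketch of the conversion, assuming spark-sql and cats-core on the classpath:

    import cats.data.NonEmptyVector
    import org.apache.spark.sql.types._
    import scala.collection.JavaConverters._

    val sparkFields: NonEmptyVector[StructField] =
      NonEmptyVector.of(StructField("a_field", StringType), StructField("b_field", LongType))

    // The java.util.List overload, as used in the diff above:
    val viaJava: StructType = StructType(sparkFields.toVector.asJava)
    // The Seq overload works equally well once we have a Vector:
    val viaSeq: StructType = StructType(sparkFields.toVector)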
project/Dependencies.scala (3 additions, 3 deletions)
@@ -39,13 +39,13 @@ object Dependencies {
     val monocle = "2.0.3"
     val catsRetry = "3.1.0"
     val log4cats = "2.5.0"
-    val http4s = "0.23.17"
-    val http4sBlaze = "0.23.14" // this dep fell out of sync with http4s-core versioning - 0.23.14 is the last 0.X release.
+    val http4s = "0.23.18"
+    val http4sBlaze = "0.23.16" // this dep fell out of sync with http4s-core versioning - 0.23.16 is the last 0.X release.
     val scalaTracker = "2.0.0"

     val spark = "3.3.1"
     val eventsManifest = "0.4.0"
-    val schemaDdl = "0.22.1"
+    val schemaDdl = "0.25.0-M1"
     val jacksonModule = "2.17.2" // Override incompatible version in spark runtime
     val jacksonDatabind = "2.17.2"
     val jacksonMapper = "1.9.14-atlassian-6" // Fix CVE