diff --git a/modules/loaders-common/src/main/scala/com/snowplowanalytics/snowplow/loaders/transform/SchemaProvider.scala b/modules/loaders-common/src/main/scala/com/snowplowanalytics/snowplow/loaders/transform/SchemaProvider.scala index e6e2e21..fae5ed0 100644 --- a/modules/loaders-common/src/main/scala/com/snowplowanalytics/snowplow/loaders/transform/SchemaProvider.scala +++ b/modules/loaders-common/src/main/scala/com/snowplowanalytics/snowplow/loaders/transform/SchemaProvider.scala @@ -7,9 +7,8 @@ */ package com.snowplowanalytics.snowplow.loaders.transform -import cats.data.{EitherT, NonEmptyVector} +import cats.data.{EitherT, NonEmptyList} import cats.effect.Sync -import cats.implicits._ import com.snowplowanalytics.iglu.client.{ClientError, Resolver} import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.client.resolver.Resolver.SchemaResolutionError @@ -20,11 +19,11 @@ import com.snowplowanalytics.snowplow.badrows.FailureDetails private[transform] object SchemaProvider { - // Note schema order of the returned list is not guaranteed + // Note schema order of the returned list is guaranteed def fetchSchemasWithSameModel[F[_]: Sync: RegistryLookup]( resolver: Resolver[F], schemaKey: SchemaKey - ): EitherT[F, FailureDetails.LoaderIgluError, NonEmptyVector[SelfDescribingSchema[Schema]]] = + ): EitherT[F, FailureDetails.LoaderIgluError, NonEmptyList[SelfDescribingSchema[Schema]]] = for { jsons <- EitherT(resolver.lookupSchemasUntil(schemaKey)) .leftMap { case SchemaResolutionError(schemaKey, error) => resolverBadRow(schemaKey)(error) } @@ -33,7 +32,7 @@ private[transform] object SchemaProvider { .fromOption[F](Schema.parse(json.schema), parseSchemaBadRow(json.self.schemaKey)) .map(schema => SelfDescribingSchema(SchemaMap(json.self.schemaKey), schema)) } - } yield NonEmptyVector(schemas.head, schemas.tail.toVector) + } yield schemas private def resolverBadRow(schemaKey: SchemaKey)(e: ClientError.ResolutionError): FailureDetails.LoaderIgluError = FailureDetails.LoaderIgluError.IgluError(schemaKey, e) diff --git a/modules/loaders-common/src/main/scala/com/snowplowanalytics/snowplow/loaders/transform/TypedTabledEntity.scala b/modules/loaders-common/src/main/scala/com/snowplowanalytics/snowplow/loaders/transform/TypedTabledEntity.scala index a2cd9bc..4f2c043 100644 --- a/modules/loaders-common/src/main/scala/com/snowplowanalytics/snowplow/loaders/transform/TypedTabledEntity.scala +++ b/modules/loaders-common/src/main/scala/com/snowplowanalytics/snowplow/loaders/transform/TypedTabledEntity.scala @@ -7,7 +7,7 @@ */ package com.snowplowanalytics.snowplow.loaders.transform -import cats.data.NonEmptyVector +import cats.data.{NonEmptyList, NonEmptyVector} import cats.implicits._ import io.circe.syntax._ import com.snowplowanalytics.iglu.core.{SchemaKey, SelfDescribingSchema} @@ -51,23 +51,19 @@ object TypedTabledEntity { * @param subVersions * Sub-versions (e.g. '*-0-0') that were present in the batch of events. * @param schemas - * Iglu schemas pre-fetched from Iglu Server + * Iglu schemas pre-fetched from Iglu Server ordered by key */ private[transform] def build( tabledEntity: TabledEntity, subVersions: Set[SchemaSubVersion], - schemas: NonEmptyVector[SelfDescribingSchema[Schema]] + schemas: NonEmptyList[SelfDescribingSchema[Schema]] ): Option[TypedTabledEntity] = - // Schemas need to be ordered by key to merge in correct order. - schemas.sorted.toVector - .flatMap { case sds => - fieldFromSchema(tabledEntity, sds.schema).map((_, sds)) - } - .toNev - .map { nev => - val (rootField, rootSchema) = nev.head + schemas + .traverse(sds => fieldFromSchema(tabledEntity, sds.schema).map((_, sds))) + .map { nel => + val (rootField, rootSchema) = nel.head val tte = TypedTabledEntity(tabledEntity, rootField, Set(keyToSubVersion(rootSchema.self.schemaKey)), Nil) - nev.tail + nel.tail .foldLeft(tte) { case (columnGroup, (field, selfDescribingSchema)) => val schemaKey = selfDescribingSchema.self.schemaKey val subversion = keyToSubVersion(schemaKey) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 7123a6f..784ec27 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -40,7 +40,7 @@ object Dependencies { // Snowplow val schemaDdl = "0.23.0" val badrows = "2.3.0" - val igluClient = "3.2.0-M4" + val igluClient = "3.2.0-M6" val tracker = "2.0.0" val analyticsSdk = "3.2.1"