Skip to content

Commit

Permalink
Address review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Aug 20, 2024
1 parent 9a09d56 commit 651fa94
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@
*/
package com.snowplowanalytics.snowplow.loaders.transform

import cats.data.{EitherT, NonEmptyVector}
import cats.data.{EitherT, NonEmptyList}
import cats.effect.Sync
import cats.implicits._
import com.snowplowanalytics.iglu.client.{ClientError, Resolver}
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.client.resolver.Resolver.SchemaResolutionError
Expand All @@ -20,11 +19,11 @@ import com.snowplowanalytics.snowplow.badrows.FailureDetails

private[transform] object SchemaProvider {

// Note schema order of the returned list is not guaranteed
// Note schema order of the returned list is guaranteed
def fetchSchemasWithSameModel[F[_]: Sync: RegistryLookup](
resolver: Resolver[F],
schemaKey: SchemaKey
): EitherT[F, FailureDetails.LoaderIgluError, NonEmptyVector[SelfDescribingSchema[Schema]]] =
): EitherT[F, FailureDetails.LoaderIgluError, NonEmptyList[SelfDescribingSchema[Schema]]] =
for {
jsons <- EitherT(resolver.lookupSchemasUntil(schemaKey))
.leftMap { case SchemaResolutionError(schemaKey, error) => resolverBadRow(schemaKey)(error) }
Expand All @@ -33,7 +32,7 @@ private[transform] object SchemaProvider {
.fromOption[F](Schema.parse(json.schema), parseSchemaBadRow(json.self.schemaKey))
.map(schema => SelfDescribingSchema(SchemaMap(json.self.schemaKey), schema))
}
} yield NonEmptyVector(schemas.head, schemas.tail.toVector)
} yield schemas

private def resolverBadRow(schemaKey: SchemaKey)(e: ClientError.ResolutionError): FailureDetails.LoaderIgluError =
FailureDetails.LoaderIgluError.IgluError(schemaKey, e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
*/
package com.snowplowanalytics.snowplow.loaders.transform

import cats.data.NonEmptyVector
import cats.data.{NonEmptyList, NonEmptyVector}
import cats.implicits._
import io.circe.syntax._
import com.snowplowanalytics.iglu.core.{SchemaKey, SelfDescribingSchema}
Expand Down Expand Up @@ -51,23 +51,19 @@ object TypedTabledEntity {
* @param subVersions
* Sub-versions (e.g. '*-0-0') that were present in the batch of events.
* @param schemas
* Iglu schemas pre-fetched from Iglu Server
* Iglu schemas pre-fetched from Iglu Server ordered by key
*/
private[transform] def build(
tabledEntity: TabledEntity,
subVersions: Set[SchemaSubVersion],
schemas: NonEmptyVector[SelfDescribingSchema[Schema]]
schemas: NonEmptyList[SelfDescribingSchema[Schema]]
): Option[TypedTabledEntity] =
// Schemas need to be ordered by key to merge in correct order.
schemas.sorted.toVector
.flatMap { case sds =>
fieldFromSchema(tabledEntity, sds.schema).map((_, sds))
}
.toNev
.map { nev =>
val (rootField, rootSchema) = nev.head
schemas
.traverse(sds => fieldFromSchema(tabledEntity, sds.schema).map((_, sds)))
.map { nel =>
val (rootField, rootSchema) = nel.head
val tte = TypedTabledEntity(tabledEntity, rootField, Set(keyToSubVersion(rootSchema.self.schemaKey)), Nil)
nev.tail
nel.tail
.foldLeft(tte) { case (columnGroup, (field, selfDescribingSchema)) =>
val schemaKey = selfDescribingSchema.self.schemaKey
val subversion = keyToSubVersion(schemaKey)
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ object Dependencies {
// Snowplow
val schemaDdl = "0.23.0"
val badrows = "2.3.0"
val igluClient = "3.2.0-M4"
val igluClient = "3.2.0-M6"
val tracker = "2.0.0"
val analyticsSdk = "3.2.1"

Expand Down

0 comments on commit 651fa94

Please sign in to comment.