Skip to content

Commit

Permalink
Transformer Stream: don't use hard-coded window timestamps in the tests
Browse files Browse the repository at this point in the history
Some of the tests are failing since windows don't have expected timestamps.
In order to solve this problem, this commit reads window timestamps from
shredded message instead of using hard-coded values.
  • Loading branch information
spenes committed Sep 27, 2024
1 parent 74acfcf commit 31cab3e
Show file tree
Hide file tree
Showing 16 changed files with 128 additions and 111 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
"data": {
"base": "output_path_placeholder/run=1970-01-01-10-30-00/",
"base": "output_path_placeholder",
"typesInfo": {
"transformation": "SHREDDED",
"types": [
Expand Down Expand Up @@ -113,8 +113,8 @@
]
},
"timestamps": {
"jobStarted": "1970-01-01T10:30:00Z",
"jobCompleted": "1970-01-01T10:30:00Z",
"jobStarted": "job_started_placeholder",
"jobCompleted": "job_completed_placeholder",
"min": "2021-10-13T20:21:47.595072674Z",
"max": "2021-10-15T00:51:57.521746512Z"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
"data": {
"base": "output_path_placeholder/run=1970-01-01-10-30-00/",
"base": "output_path_placeholder",
"typesInfo": {
"transformation": "WIDEROW",
"fileFormat": "JSON",
Expand Down Expand Up @@ -93,8 +93,8 @@
]
},
"timestamps": {
"jobStarted": "1970-01-01T10:30:00Z",
"jobCompleted": "1970-01-01T10:30:00Z",
"jobStarted": "job_started_placeholder",
"jobCompleted": "job_completed_placeholder",
"min": "2021-10-13T20:21:47.595072674Z",
"max": "2021-10-15T00:51:57.521746512Z"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"schema":"iglu:com.snowplowanalytics.snowplow.badrows/loader_iglu_error/jsonschema/2-0-0","data":{"processor":{"artifact":"snowplow-transformer-kinesis","version":"version_placeholder"},"failure":[{"schemaCriterion":"iglu:org.schema/some_unknown_name/jsonschema/1-*-*","error":{"error":"ResolutionError","lookupHistory":[{"repository":"Iglu Client Embedded","errors":[{"error":"NotFound"}],"attempts":1,"lastAttempt":"1970-01-01T10:30:00Z"}]}}],"payload":{"app_id":"snowplowweb","platform":"web","etl_tstamp":"2014-06-01T14:04:11.639Z","collector_tstamp":"2014-05-29T18:16:35Z","dvce_created_tstamp":"2014-05-29T18:04:11.639Z","event":"page_view","event_id":"2b1b25a4-c0df-4859-8201-cf21492ad61b","txn_id":836413,"name_tracker":"clojure","v_tracker":"js-2.0.0-M2","v_collector":"clj-0.6.0-tom-0.0.4","v_etl":"hadoop-0.5.0-common-0.4.0","user_id":null,"user_ipaddress":"216.207.42.134","user_fingerprint":"3499345421","domain_userid":"3b1d1a375044eede","domain_sessionidx":3,"network_userid":"2bad2a4e-aae4-4bea-8acd-399e7fe0366a","geo_country":"US","geo_region":"CA","geo_city":"South San Francisco","geo_zipcode":null,"geo_latitude":37.654694,"geo_longitude":-122.4077,"geo_region_name":null,"ip_isp":null,"ip_organization":null,"ip_domain":null,"ip_netspeed":null,"page_url":"http://snowplowanalytics.com/blog/2013/02/08/writing-hive-udfs-and-serdes/","page_title":"Writing Hive UDFs - a tutorial","page_referrer":null,"page_urlscheme":"http","page_urlhost":"snowplowanalytics.com","page_urlport":80,"page_urlpath":"/blog/2013/02/08/writing-hive-udfs-and-serdes/","page_urlquery":null,"page_urlfragment":null,"refr_urlscheme":null,"refr_urlhost":null,"refr_urlport":null,"refr_urlpath":null,"refr_urlquery":null,"refr_urlfragment":null,"refr_medium":null,"refr_source":null,"refr_term":null,"mkt_medium":null,"mkt_source":null,"mkt_term":null,"mkt_content":null,"mkt_campaign":null,"contexts":{},"se_category":null,"se_action":null,"se_label":null,"se_property":null,"se_value":null,"unstruct_event":null,"tr_orderid":null,"tr_affiliation":null,"tr_total":null,"tr_tax":null,"tr_shipping":null,"tr_city":null,"tr_state":null,"tr_country":null,"ti_orderid":null,"ti_sku":null,"ti_name":null,"ti_category":null,"ti_price":null,"ti_quantity":null,"pp_xoffset_min":null,"pp_xoffset_max":null,"pp_yoffset_min":null,"pp_yoffset_max":null,"useragent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/537.75.14","br_name":"Safari","br_family":"Safari","br_version":null,"br_type":"Browser","br_renderengine":"WEBKIT","br_lang":"en-us","br_features_pdf":false,"br_features_flash":false,"br_features_java":false,"br_features_director":false,"br_features_quicktime":false,"br_features_realplayer":false,"br_features_windowsmedia":false,"br_features_gears":false,"br_features_silverlight":false,"br_cookies":true,"br_colordepth":"24","br_viewwidth":1440,"br_viewheight":1845,"os_name":"Mac OS","os_family":"Mac OS","os_manufacturer":"Apple Inc.","os_timezone":"America/Los_Angeles","dvce_type":"Computer","dvce_ismobile":false,"dvce_screenwidth":1440,"dvce_screenheight":900,"doc_charset":"UTF-8","doc_width":1440,"doc_height":6015,"tr_currency":null,"tr_total_base":null,"tr_tax_base":null,"tr_shipping_base":null,"ti_currency":null,"ti_price_base":null,"base_currency":null,"geo_timezone":null,"mkt_clickid":null,"mkt_network":null,"etl_tags":null,"dvce_sent_tstamp":null,"refr_domain_userid":null,"refr_dvce_tstamp":null,"derived_contexts":{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:org.schema/some_unknown_name/jsonschema/1-0-0","data":{"datePublished":"2014-07-23T00:00:00Z","author":"Jonathan Almeida","inLanguage":"en-US","genre":"blog","breadcrumb":["blog","releases"],"keywords":["snowplow","analytics","java","jvm","tracker"]}}]},"domain_sessionid":null,"derived_tstamp":null,"event_vendor":null,"event_name":null,"event_format":null,"event_version":null,"event_fingerprint":null,"true_tstamp":null}}}
{"schema":"iglu:com.snowplowanalytics.snowplow.badrows/loader_iglu_error/jsonschema/2-0-0","data":{"processor":{"artifact":"snowplow-transformer-kinesis","version":"version_placeholder"},"failure":[{"schemaCriterion":"iglu:org.schema/some_unknown_name/jsonschema/1-*-*","error":{"error":"ResolutionError","lookupHistory":[{"repository":"Iglu Client Embedded","errors":[{"error":"NotFound"}],"attempts":1,"lastAttempt":""}]}}],"payload":{"app_id":"snowplowweb","platform":"web","etl_tstamp":"2014-06-01T14:04:11.639Z","collector_tstamp":"2014-05-29T18:16:35Z","dvce_created_tstamp":"2014-05-29T18:04:11.639Z","event":"page_view","event_id":"2b1b25a4-c0df-4859-8201-cf21492ad61b","txn_id":836413,"name_tracker":"clojure","v_tracker":"js-2.0.0-M2","v_collector":"clj-0.6.0-tom-0.0.4","v_etl":"hadoop-0.5.0-common-0.4.0","user_id":null,"user_ipaddress":"216.207.42.134","user_fingerprint":"3499345421","domain_userid":"3b1d1a375044eede","domain_sessionidx":3,"network_userid":"2bad2a4e-aae4-4bea-8acd-399e7fe0366a","geo_country":"US","geo_region":"CA","geo_city":"South San Francisco","geo_zipcode":null,"geo_latitude":37.654694,"geo_longitude":-122.4077,"geo_region_name":null,"ip_isp":null,"ip_organization":null,"ip_domain":null,"ip_netspeed":null,"page_url":"http://snowplowanalytics.com/blog/2013/02/08/writing-hive-udfs-and-serdes/","page_title":"Writing Hive UDFs - a tutorial","page_referrer":null,"page_urlscheme":"http","page_urlhost":"snowplowanalytics.com","page_urlport":80,"page_urlpath":"/blog/2013/02/08/writing-hive-udfs-and-serdes/","page_urlquery":null,"page_urlfragment":null,"refr_urlscheme":null,"refr_urlhost":null,"refr_urlport":null,"refr_urlpath":null,"refr_urlquery":null,"refr_urlfragment":null,"refr_medium":null,"refr_source":null,"refr_term":null,"mkt_medium":null,"mkt_source":null,"mkt_term":null,"mkt_content":null,"mkt_campaign":null,"contexts":{},"se_category":null,"se_action":null,"se_label":null,"se_property":null,"se_value":null,"unstruct_event":null,"tr_orderid":null,"tr_affiliation":null,"tr_total":null,"tr_tax":null,"tr_shipping":null,"tr_city":null,"tr_state":null,"tr_country":null,"ti_orderid":null,"ti_sku":null,"ti_name":null,"ti_category":null,"ti_price":null,"ti_quantity":null,"pp_xoffset_min":null,"pp_xoffset_max":null,"pp_yoffset_min":null,"pp_yoffset_max":null,"useragent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/537.75.14","br_name":"Safari","br_family":"Safari","br_version":null,"br_type":"Browser","br_renderengine":"WEBKIT","br_lang":"en-us","br_features_pdf":false,"br_features_flash":false,"br_features_java":false,"br_features_director":false,"br_features_quicktime":false,"br_features_realplayer":false,"br_features_windowsmedia":false,"br_features_gears":false,"br_features_silverlight":false,"br_cookies":true,"br_colordepth":"24","br_viewwidth":1440,"br_viewheight":1845,"os_name":"Mac OS","os_family":"Mac OS","os_manufacturer":"Apple Inc.","os_timezone":"America/Los_Angeles","dvce_type":"Computer","dvce_ismobile":false,"dvce_screenwidth":1440,"dvce_screenheight":900,"doc_charset":"UTF-8","doc_width":1440,"doc_height":6015,"tr_currency":null,"tr_total_base":null,"tr_tax_base":null,"tr_shipping_base":null,"ti_currency":null,"ti_price_base":null,"base_currency":null,"geo_timezone":null,"mkt_clickid":null,"mkt_network":null,"etl_tags":null,"dvce_sent_tstamp":null,"refr_domain_userid":null,"refr_dvce_tstamp":null,"derived_contexts":{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:org.schema/some_unknown_name/jsonschema/1-0-0","data":{"datePublished":"2014-07-23T00:00:00Z","author":"Jonathan Almeida","inLanguage":"en-US","genre":"blog","breadcrumb":["blog","releases"],"keywords":["snowplow","analytics","java","jvm","tracker"]}}]},"domain_sessionid":null,"derived_tstamp":null,"event_vendor":null,"event_name":null,"event_format":null,"event_version":null,"event_fingerprint":null,"true_tstamp":null}}}
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
"data": {
"base": "output_path_placeholder/run=1970-01-01-10-30-00/",
"base": "output_path_placeholder",
"typesInfo": {
"transformation": "SHREDDED",
"types": []
},
"timestamps": {
"jobStarted": "1970-01-01T10:30:00Z",
"jobCompleted": "1970-01-01T10:30:00Z",
"jobStarted": "job_started_placeholder",
"jobCompleted": "job_completed_placeholder",
"min": "2014-05-29T18:16:35Z",
"max": "2014-05-29T18:16:35Z"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
"data": {
"base": "output_path_placeholder/run=1970-01-01-10-30-00/",
"base": "output_path_placeholder",
"typesInfo": {
"transformation": "WIDEROW",
"fileFormat": "PARQUET",
Expand Down Expand Up @@ -93,8 +93,8 @@
]
},
"timestamps": {
"jobStarted": "1970-01-01T10:30:00Z",
"jobCompleted": "1970-01-01T10:30:00Z",
"jobStarted": "job_started_placeholder",
"jobCompleted": "job_completed_placeholder",
"min": "2021-09-17T09:05:28.590000001Z",
"max": "2021-10-15T09:06:27.101185600Z"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
"data": {
"base": "output_path_placeholder/run=1970-01-01-10-30-00/",
"base": "output_path_placeholder",
"typesInfo": {
"transformation": "WIDEROW",
"fileFormat": "PARQUET",
Expand Down Expand Up @@ -105,8 +105,8 @@
]
},
"timestamps": {
"jobStarted": "1970-01-01T10:30:00Z",
"jobCompleted": "1970-01-01T10:30:00Z",
"jobStarted": "job_started_placeholder",
"jobCompleted": "job_completed_placeholder",
"min": "2022-02-01T22:14:21.648Z",
"max": "2022-02-02T01:01:01.648Z"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
"data": {
"base": "output_path_placeholder/run=1970-01-01-10-30-00/",
"base": "output_path_placeholder",
"typesInfo": {
"transformation": "WIDEROW",
"fileFormat": "PARQUET",
Expand Down Expand Up @@ -101,8 +101,8 @@
]
},
"timestamps": {
"jobStarted": "1970-01-01T10:30:00Z",
"jobCompleted": "1970-01-01T10:30:00Z",
"jobStarted": "job_started_placeholder",
"jobCompleted": "job_completed_placeholder",
"min": "2022-02-01T22:14:21.648Z",
"max": "2022-02-02T01:01:01.648Z"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
"data": {
"base": "output_path_placeholder/run=1970-01-01-10-30-00/",
"base": "output_path_placeholder",
"typesInfo": {
"transformation": "WIDEROW",
"fileFormat": "PARQUET",
Expand All @@ -21,8 +21,8 @@
]
},
"timestamps": {
"jobStarted": "1970-01-01T10:30:00Z",
"jobCompleted": "1970-01-01T10:30:00Z",
"jobStarted": "job_started_placeholder",
"jobCompleted": "job_completed_placeholder",
"min": "2022-02-01T22:32:41.069Z",
"max": "2022-02-02T01:01:01.648Z"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
"data": {
"base": "output_path_placeholder/run=1970-01-01-10-30-00/",
"base": "output_path_placeholder",
"typesInfo": {
"transformation": "WIDEROW",
"fileFormat": "JSON",
Expand Down Expand Up @@ -93,8 +93,8 @@
]
},
"timestamps": {
"jobStarted": "1970-01-01T10:30:00Z",
"jobCompleted": "1970-01-01T10:31:00Z",
"jobStarted": "job_started_placeholder",
"jobCompleted": "job_completed_placeholder",
"min": "2021-10-13T20:21:47.595072674Z",
"max": "2021-10-15T00:51:57.521746512Z"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.proce
import cats.effect.{IO, Resource}
import cats.effect.kernel.Ref

import io.circe.optics.JsonPath._
import io.circe.parser.{parse => parseCirce}

import com.snowplowanalytics.snowplow.rdbloader.generated.BuildInfo
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.AppId
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.FileUtils
Expand Down Expand Up @@ -58,11 +61,15 @@ trait BaseProcessingSpec extends Specification {
}
.reduce(_ and _)

protected def readMessageFromResource(resource: String, outputRootDirectory: Path) =
protected def readMessageFromResource(resource: String, outputRootDirectory: Path): IO[String] = ???

protected def readMessageFromResource(resource: String, completionMessageVars: BaseProcessingSpec.CompletionMessageVars) =
readLinesFromResource(resource)
.map(_.mkString)
.map(
_.replace("output_path_placeholder", outputRootDirectory.toNioPath.toUri.toString.replaceAll("/+$", ""))
_.replace("output_path_placeholder", completionMessageVars.base.toNioPath.toUri.toString)
.replace("job_started_placeholder", completionMessageVars.jobStarted)
.replace("job_completed_placeholder", completionMessageVars.jobCompleted)
.replace("version_placeholder", BuildInfo.version)
.replace(" ", "")
)
Expand All @@ -86,6 +93,15 @@ trait BaseProcessingSpec extends Specification {
new String(encoder.encode(config.app.replace("file:/", "s3:/").getBytes))
)
}

def extractCompletionMessageVars(processingOutput: BaseProcessingSpec.ProcessingOutput): BaseProcessingSpec.CompletionMessageVars = {
val message = processingOutput.completionMessages.head
val json = parseCirce(message).toOption.get
val base = root.data.base.string.getOption(json).get.stripPrefix("file://")
val jobStarted = root.data.timestamps.jobStarted.string.getOption(json).get
val jobCompleted = root.data.timestamps.jobCompleted.string.getOption(json).get
BaseProcessingSpec.CompletionMessageVars(Path(base), jobStarted, jobCompleted)
}
}

object BaseProcessingSpec {
Expand All @@ -96,5 +112,13 @@ object BaseProcessingSpec {
badrowsFromQueue: Vector[String],
checkpointed: Int
)
final case class CompletionMessageVars(
base: Path,
jobStarted: String,
jobCompleted: String
) {
def goodPath: Path = Path(s"$base/output=good")
def badPath: Path = Path(s"$base/output=bad")
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing

import cats.effect.unsafe.implicits.global
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.AppId
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.QueueBadSinkSpec._
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.BaseProcessingSpec.TransformerConfig
import fs2.io.file.Path
Expand Down Expand Up @@ -39,12 +38,12 @@ class QueueBadSinkSpec extends BaseProcessingSpec {
inputEventsPath = "/processing-spec/1/input/events"
)

val config = TransformerConfig(configFromPath(outputDirectory), igluConfig)
val badDirectory = outputDirectory.resolve(s"run=1970-01-01-10-30-00-${AppId.appId}/output=bad")
val config = TransformerConfig(configFromPath(outputDirectory), igluConfig)

for {
output <- process(inputStream, config)
badDirectoryExists <- pathExists(badDirectory)
compVars = extractCompletionMessageVars(output)
badDirectoryExists <- pathExists(compVars.badPath)
expectedBadRows <- readLinesFromResource("/processing-spec/1/output/bad")
} yield {
val actualBadRows = output.badrowsFromQueue.toList
Expand Down Expand Up @@ -98,7 +97,18 @@ object QueueBadSinkSpec {
| "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-0",
| "data": {
| "cacheSize": 500,
| "repositories": []
| "repositories": [
| {
| "name": "Iglu Central",
| "priority": 1,
| "vendorPrefixes": [],
| "connection": {
| "http": {
| "uri": "http://iglucentral.com"
| }
| }
| }
| ]
| }
|}""".stripMargin
}
Loading

0 comments on commit 31cab3e

Please sign in to comment.