Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Kinesis Sink #12

Merged
merged 2 commits into from
Nov 14, 2023
Merged

Add Kinesis Sink #12

merged 2 commits into from
Nov 14, 2023

Conversation

colmsnowplow
Copy link
Contributor

This PR adds kinesis sink. Integration tests are not included here - I'll work on those in a separate branch off this one.

@istreeter assigned you as you have a dependency on this, but I think review can come from anyone so will ask the team.

@@ -113,7 +114,7 @@ object KinesisSource {
private def kinesisStream[F[_]: Async](config: KinesisSourceConfig): Stream[F, LowLevelEvents[Map[String, KinesisMetadata[F]]]] = {
val resources =
for {
region <- Resource.eval(KinesisSourceConfig.getRuntimeRegion)
region <- Resource.eval(com.snowplowanalytics.snowplow.kinesis.Util.getRuntimeRegion)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can replace this with (new DefaultAwsRegionProviderChain).getRegion, and rip out the getRuntimeRegion function entirely - I don't really understand why it was in a function in the first place.

Are the two functionally equivalent, or is there some meaningful difference I'm missing?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it does not need to be its own function. I like your suggestion to replace it with new DefaultAwsRegionProviderChain.getRegion (I think you don't even need those parantheses).

Copy link
Contributor Author

@colmsnowplow colmsnowplow Oct 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason my editor required the brackets - not sure if they're required or it's just my editor, but I assume it's a style choice regardless so doesn't feel worth spending time on it.

Note that something does change in a potentially meaningful way, in that rather than being part of the effects-y execution flow, these are now just evaluated as values outside of that scope (within the scope of the same function though). Integration tests still pass and I don't think there's any real difference there just not 100% sure of my understanding.

@@ -20,6 +20,7 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient

import com.snowplowanalytics.snowplow.sources.EventProcessingConfig
import com.snowplowanalytics.snowplow.sources.EventProcessingConfig.NoWindowing
import com.snowplowanalytics.snowplow.kinesis._
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to import com.snowplowanalytics.snowplow.kinesis.Util and call Util.getRuntimeRegion, but I got an error:

object Util is not a member of package com.snowplowanalytics.snowplow.kinesis

I don't understand why - to me it seems like it should be the same as importing any other object. Would be interested to hear why if someone can explain.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's because your Util.scala file contains two separate lines both declaring:

package com.snowplowanalytics.snowplow.kinesis

See line 1 and line 10.

It means the fully qualified class name of your Util object is actually com.snowplowanalytics.snowplow.kinesis.com.snowplowanalytics.snowplow.kinesis.Util.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤦 It's cos I didn't scroll my editor up far enough... Thanks!

Copy link
Contributor

@istreeter istreeter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good @colmsnowplow.

What are your thoughts on adding an integration test? No worries if you think that's too much for the current PR. You already know I am keen to get this merged without too much extra delay!

Comment on lines 51 to 55
val withRegion = KinesisClient.builder().region(region)
customEndpoint match {
case Some(endpoint) => withRegion.endpointOverride(endpoint).build()
case None => withRegion.build()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So an alternative way of writing this is....

val builder = KinesisClient.builder().region(region)
customEndpoint.foreach(e => builder.endpointOverride(e))
builder.build()

Some people in the team write it the way you did it... I think because in functional programming we pretend that everything is immutable.

But IMO, if we accept that builder is mutable whether we like it or not... then my alternative suggestion looks simpler and is exactly how the builder was intended to be used when someone designed it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I like that better too.

Comment on lines +64 to +65
Resource.make(make) { producer =>
Sync[F].blocking {
producer.close()
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing wrong with this, but I think you can also do....

Resource.fromAutoCloseable(make)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copied the pattern from pubsub target. Would lean on not changing it as it feels like both should be the same, but if there's some reason to use esource.fromAutoCloseable(make) for both happy to make that change too.


final case class RequestLimits(recordLimit: Int, bytesLimit: Int)

object Retries {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this object be private?

Anything which is not private ends up being part of the public API of this library. And if we are conscientious library maintainers then the API should be stable between versions, i.e. it restricts us from ever removing this Retries object in future when we want to do a code refactor.

}

private def getRecordSize(record: PutRecordsRequestEntry) =
record.data.asByteArray().length + record.partitionKey().getBytes(UTF_8).length
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was curious so I looked up sdkbytes in the aws java core repo.

If you call .asByteArray().length then it copes the data into a new array. This is relatively expensive -- we should try to avoid ever copying lots of bytes like this.

If you call .asByteBuffer().limit then it does not copy the bytes. Similarly .asByteArrayUnsafe().length is cheap.


private def toKinesisRecords(records: List[Sinkable]): List[PutRecordsRequestEntry] =
records.map { r =>
val data = SdkBytes.fromByteArray(r.bytes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's very tempting to use SdkBytes.fromByteArrayUnsafe(r.bytes) here, because it saves doing an extra copy of the data in the byte array. See source code here.

It's called "unsafe" because byte arrays are mutable, so the aws sdk needs to trust us that we're not going to modify the shared byte array. But if we promise not to modify it, then the unsafe method will be faster.


case class KinesisSinkConfig(
streamName: String,
partitionKey: Option[String],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this partitionKey ever used?

Copy link
Contributor Author

@colmsnowplow colmsnowplow Oct 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I think, no? https://github.com/snowplow-incubator/common-streams/pull/12/files#r1373364675

(Couldn't link to the line of code so commented)

} yield ()
}

final case class RequestLimits(recordLimit: Int, bytesLimit: Int)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class could be private because it's never used outside of our private methods.

@colmsnowplow
Copy link
Contributor Author

Thanks @istreeter I'll take a look at the specific comments in a bit, just to address the top line question first:

What are your thoughts on adding an integration test? No worries if you think that's too much for the current PR. You already know I am keen to get this merged without too much extra delay!

Yes keen to implement it but doing so in a separate branch as I'm aware of blockers. I'll prioritise addressing feedback first, and from there it's up to you really - happy to get this merged first or wait for the integration tests.

val data = SdkBytes.fromByteArray(r.bytes)
val prre = PutRecordsRequestEntry
.builder()
.partitionKey(r.partitionKey.getOrElse(UUID.randomUUID.toString()))
Copy link
Contributor Author

@colmsnowplow colmsnowplow Oct 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partition key is used here I think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is the partition key from the Sinkable. It is not the partition key from the KinesisSinkConfig.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah! OK I follow now. I guess we don't need to configure it this way then, removing.

@@ -33,12 +34,13 @@ class KinesisSourceSpec
override val Timeout: FiniteDuration = 3.minutes

/** Resources which are shared across tests */
override val resource: Resource[IO, (LocalStackContainer, KinesisAsyncClient, String => KinesisSourceConfig)] =
override val resource: Resource[IO, (LocalStackContainer, KinesisAsyncClient, String => KinesisSourceConfig)] = {
val region = (new DefaultAwsRegionProviderChain).getRegion
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should keep it effects-y

for {
  region <- Resource.eval(IO.blocking((new DefaultAwsRegionProviderChain).getRegion))
  localstack <- Localstack.resource(region, KINESIS_INITIALIZE_STREAMS)

And now I've got to explain why :)

Reason 1: thread pools

If this app is running on a EC2 instance (or EKS or ECS) then the AWS SDK discovers the region by making a HTTP request to a AWS metadata server. This blocks the current thread until we get the response from the server.

This bit of code we're looking at runs on the cats-effect thread pool, which is a relatively small fixed size pool. So while we're waiting for the response it blocks other cats-effect things to run concurrently.

If this bit of code was in a data-processing part of the the app, then it would be a big problem because it could hurt throughput. Luckily this code only runs once at startup, and the lookup is fairly quick, so it would not hurt us too much.

So it is a weak reason. But still it is best practice to never block the cats-effect thread pool.

Reason 2: Exceptions

If the AWS sdk cannot discover the region, then this line is going to raise an exception. With cats-effect stuff, the exception can happen either....

  1. When you create the resource
  2. When you use the resource

Scala developers using your library will expect case 2 to happen. They would be surprised if it's case 1. They might write their code to do some special error-handling around use'ing the resource, but then their error handling does not work properly because the exception happens in the wrong place.

My Reason 2 is stronger than my Reason 1. Maybe neither reason is very strong. But I do think we should write the library in way that does not cause surprises for scala devs who are used to certain conventions on how stuff works.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just realised I left this comment on the integration test, but the comment was meant more for the production code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense! Thanks so much for the explanation that helps a lot. :)

I had an idea of something like point 1 but indeed I figured it wasn't much of a big deal since this gets executed once on startup. Point 2 I didn't consider at all. But still even on just point 1 I see the argument that it should be the other way.

I've made the changes to source and integration tests. The one in sink, I believe was already the way we want it because it just saves the effects-y bit in the make value, then after calls that as a Resource. But I might be wrong about that.

@colmsnowplow colmsnowplow force-pushed the kinesis-sink branch 4 times, most recently from 0fbd58e to d2b5092 Compare November 14, 2023 10:38
@colmsnowplow colmsnowplow merged commit 11153c4 into develop Nov 14, 2023
1 check passed
@colmsnowplow colmsnowplow deleted the kinesis-sink branch November 14, 2023 12:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants