Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix missing implicit on port decoder #19

Merged
merged 5 commits into from
Nov 11, 2023
Merged

Conversation

istreeter
Copy link
Contributor

@istreeter istreeter commented Nov 8, 2023

A miscellany of fixes and enhancements

Fix missing implicit on port decoder

This should have been part of #17 but I missed it. It doesn't make a huge difference, but it's slightly neater when importing the decoder to use in applications.

Pubsub source should nack, not ack, pending messages during shutdown

This was a quite a bad bug if it had gone unnoticed! During clean shutdown of the pubsub source, I was ack'ing input events that had not yet been published. Unforgiveable.

Change Source and Sink traits to use Chunk instead of List

I am trying to cut down the number of times we copy large data structures. Before this change, we were converting a Chunk to a List before giving it to the application for processing. That involved a copy.

Once the application receives the events it can just as easily call .traverse on a Chunk instead of a List.

This might be controversial because it is changing the API interface of this library. And especially as I haven't measured the performance impact on our apps. I have integrated this change into the lake loader and snowflake loader and I am happy with how the code looks in the applications.

Add syntax for traversing lists

This is also about cutting the number of times we copy large data structures. Very frequently in snowplow apps we need to traverse a collection of events, perform some operation, and separate successes from failures. We have a lot of lines like this:

events.traverse { event =>
  transformEvent(event)
}.map { results =>
  results.separate
}

If you dig into what's happening underneath, the .traverse starts with a copy (see here -- it was a surprise to me) and the results.separate also involves a copy.

This commit adds some alternative syntax so we can run these common snowplow operations without copying so much.

Fixes to the pubsub and kafka sinks

The pubsub sink had a parTraverse, but it didn't need to be parallel because each item returned a future which already runs in parallel.

The kafka sink had a traverse where it should have been parTraverse so we checkpoint each shard in parallel.

@istreeter istreeter force-pushed the fix/implicit-port-decoder branch from 7851c78 to 56e30c1 Compare November 10, 2023 12:43
Copy link
Contributor

@benjben benjben left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great stuff !!

s"$str-transformed"
}
}
.unsafeRunSync()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use this to remove the .unsafeRunSync()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just tried this.... in my opinion after re-writing it to use the CatsEffect mixin, this spec lost some clarity. I like the mixin, and I use it several places already in these common libraries. But I don't feel we need to use it everywhere.

package com.snowplowanalytics.snowplow.runtime

package object syntax {
object foldable extends FoldableExtensionSyntax
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then if we just import com.snowplowanalytics.snowplow.runtime.syntax.foldable we can use the additional functions right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import com.snowplowanalytics.snowplow.runtime.syntax.foldable._

with underscore at the end. I copied this syntax object by doing what cats does.

* }
* }}}
*/
def traverseUnordered[F[_]: Monad, A, B](items: M[A])(f: A => F[B]): F[List[B]] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're missing M.empty to be able to return F[M[B]] ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not completely sure what you're asking -- is it about why I return F[List[B]] instead of F[M[B]]? I did it this way because constructing a List like this is very fast. And because most of the time in our applications it is convenient to receive a List as the return type.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it about why I return F[List[B]] instead of F[M[B]]?

Yes it was. Thanks for the answer.

Comment on lines +22 to +24
* This is similar to a cats Traverse. But it is more efficient than cats Traverse because it does
* not attempt to keep the order of the original list in the final result.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get why this would be quicker without preserving the order, we just need to iterate over all the elements once and doing it in the same order sounds like the fastest way. And are we not still preserving the order with this function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you have a List and you do list.traverse { i => ???} then the first thing cats does underneath is to copy the original List into a buffer. Whereas this new method does a single traversal.

And are we not still preserving the order with this function?

See the unit tests! It's because we iterate in a forwards direction, and then do _ :: acc which prepends (not appends) to the accumulated list.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah I get it now ! That's a great idea !

* }
* }}}
*/
def traverseSeparateUnordered[F[_]: Monad, A, B, C](items: M[A])(f: A => F[Either[B, C]]): F[(List[B], List[C])] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very useful!

@istreeter istreeter force-pushed the fix/implicit-port-decoder branch from 56e30c1 to 913a387 Compare November 11, 2023 18:10
@istreeter istreeter merged commit 913a387 into develop Nov 11, 2023
1 check passed
@istreeter istreeter deleted the fix/implicit-port-decoder branch November 11, 2023 18:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants