-
Notifications
You must be signed in to change notification settings - Fork 55
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement streaming with state changes #270
base: master
Are you sure you want to change the base?
Conversation
9e45776
to
e430361
Compare
I'm also just writing a |
@softinio @mtsokol @mijicd I could need some help 😅 |
@nightscape Hi! Thanks for reminder! |
Hi @mtsokol, thanks for the offer! |
Hi @mtsokol, did you already find some time to have a look at this? |
@nightscape Sorry for the delay (started new semester this week). Here are snippets reproducing it:
After changing it to I only focused on this issue so I'll deliver rest of the review. [EDIT] |
Great find! That actually looks like a bug in takeWhile. |
@nightscape I got this test passing but there's still one issue (with deserialization I guess?). The whole stream is producing Strings. If I augment producer site with: sealed trait StreamProtocol
case class StreamMsg(obj: Any) extends StreamProtocol
case object StreamEnd extends StreamProtocol And |
85e752f
to
97ed0eb
Compare
And I guess this should fix the |
From my side everything except supervision of streams is looking good. |
override def materialize[AA >: ZStream[R, E, I]]( | ||
state: Ref[S], | ||
promise: Promise[Throwable, AA], | ||
supervisor: Supervisor[R] | ||
): URIO[R with Clock, Unit] = { | ||
// TODO: Supervision | ||
val outputStream = response.mapM { case (s, a) => state.set(s) *> UIO(a) } | ||
promise.succeed(outputStream).ignore | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} | ||
implicit def stream[R, S, E, I](response: ZStream[R, E, (S, I)]): ActorResponse[R, S, ZStream[R, E, I]] = | ||
new ActorResponse[R, S, ZStream[R, E, I]] { | ||
override type ResponseType = ZStream[R, E, (S, I)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that ResponseType
here should be ZStream[R, Throwable, (S, I)]
.
As actors are location transparent and interaction can involve remote communication our interaction patterns !
and ?
result in Task[...]
(so a ZIO that can fail with Throwable
as it might involve write/readFromWire
) although in actor's behavior there could be no failures that can occur.
Notice that here https://github.com/zio/zio-actors/pull/270/files#diff-c1a93d1001aa018540bc59b83dd7a09dR134 stream elements are created from Task[Any]
so resulting stream that is returned will be Stream[Any, Throwable, Any]
.
That is - a stream returned in our RemoteSpec
will perform readFromWire
to get elements (until receives StreamEnd
) so every time it might fail with communication error but here it test the signature is ZStream[Any, Nothing, String]
which seems that pulling elements out of stream can't fail.
I propose to remove E
from implicit def stream[R, S, E, I]
and fix returned ZStream
signature to ZStream[R, Throwable, I]
.
This might seem unclear but if we notice where communication occurs it should be clear - when we do ?
and the response it oneTime
the call is synchronous and we wait for the response.
When the response is a stream on the caller site we only create a recipe with ?
-> the communication occurs when we do .runCollect
and this is a counterpart of getting response out of ?
with oneTime
.
Please correct me if I got this wrong.
[EDIT]
Problem
I think that support for streaming responses will require overhaul for remoting (due to the fact that we decide when we pull responses out of stream, so perform remote communication) as it breaks current design.
I introduced simple change to our test case:
_ <- actorRef ? Str("ZIO-Actor response... ")
resultStream <- actorRef ? Strs(messages)
result <- actorRef ? Str("ZIO-Actor response... ")
resultStream2 <- actorRef ? Strs(messages2)
And it fails - first we do the first ask -> remote actor system writes to the wire - we read it.
Then we ask for a stream response and get the stream - the remote actor system writes all responses to the wire but we do not read them (!) but only define recipe for reading them. Then we ask for another oneTime result
but we read a bit of those stream responses instead and it fails. This should definitely be covered in a test case like this (intertwine oneTime and stream interactions)
Solution?
Off the top of my head - we can perform runCollect
in sendEnvelope
but it misses the point of #268 what you described as your goal.
You could just make it fully akka-like and pass ActorRef
in a message you send as a part of your protocol (so you know where send messages back) - and perform all communication with !
(tell) and drop stream responses.
Otherwise it would require rewriting internals (which are really basic now) to support it that way.
WDYT?
Sorry for writing about this issue now - only now I had some time to think about it deeper.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding ZStream
being a pull-based stream, but the responses being pushed over the wire:
Would it help to introduce a Queue
at the receiver side instead of doing a runCollect
?
That way the remote actor can push messages at any time, but from the client side we pull from the Queue
.
I had also thought about the Akka way of passing an ActorRef
and sending responses via !
, but the resulting API would be quite a bit less elegant...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now I think the issue on response producer site is:
(stream.map(StreamMsg) ++ ZStream(StreamEnd)).foreach(e =>
writeToWire(worker, e)
)
If we immediately write stream elements to receiver like this then they need to be read right away (e.g. with runCollect), otherwise remoting can be easily broken (but it makes no sense as we are blocked until receive the whole stream).
I don't know a solution for this right now (maybe another abstraction layer for remote communication for atomic and stream responses where stream response elements will be stored on response producer site and only send over the wire when requested (but it would introduce other issues I think), or (also with that additional abstraction layer) response producer will send it like this but on the other side we need to manage reading ?
responses and those out-of-order stream responses, or maybe something different)
Yeah, including sender ActorRef in protocol would make it less elegant but making stream responses that comply with current remoting feels to require major design change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be possible to have a daemon on the receiver side that permanently tries to readFromWire
and pushes into a Queue
from which both ZStream
s and answers from ?
can be read?
StreamMsg
would then need some kind of "conversation identifier" so that when multiple streams are running at the same time their messages can be distinguished.
If we can come up with a working solution, would you see the overall goal as worthwhile or do you want to have a simple, no-bells-and-whistles library?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a draft of a "correct" streaming API where the state changes with the emitted stream elements.
@softinio @mtsokol @mijicd could you have a look if this looks plausible?
I'm not sure what would be the correct behaviour of any element in the stream failing. Should the supervisor be called in that case or would we just continue streaming?
In order to maintain the current
Stateful
API, one could introduce an additional methodreceiveS
(or any name you prefer) and do something like the following:That way users could implement/override the new
receiveS
if they need the additional functionality orreceive
if they don't.The downside of this is that IDEs will not know that one of the two methods has to be overriden because both contain an implementation.
I'd be happy for any suggestions 😃