Skip to content

Commit

Permalink
fixup! Implement streaming with state changes
Browse files Browse the repository at this point in the history
  • Loading branch information
nightscape committed Oct 6, 2020
1 parent 7c73a46 commit 97ed0eb
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 4 deletions.
3 changes: 2 additions & 1 deletion actors/src/main/scala/zio/actors/ActorRef.scala
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ private[actors] final class ActorRefRemote[-F[+_]](
ZStream
.fromEffect(readFromWire(client))
.repeat(Schedule.forever)
.takeWhile(e => e != StreamEnd)
.takeUntil(_ == StreamEnd)
.collect { case StreamMsg(m) => m }
).either
else
readFromWire(client)
Expand Down
6 changes: 4 additions & 2 deletions actors/src/main/scala/zio/actors/ActorSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ final class ActorSystem private[actors] (
Any @unchecked
]
) =>
(stream ++ ZStream(StreamEnd)).foreach(e => writeToWire(worker, e))
(stream.map(StreamMsg) ++ ZStream(StreamEnd)).foreach(e => writeToWire(worker, e))
case _ => writeToWire(worker, response)
}
} yield ()
Expand Down Expand Up @@ -332,4 +332,6 @@ private[actors] object ActorSystemUtils {
} yield ()
}

object StreamEnd
sealed trait StreamProtocol
case class StreamMsg(obj: Any) extends StreamProtocol
case object StreamEnd extends StreamProtocol
7 changes: 6 additions & 1 deletion actors/src/test/scala/zio/actors/RemoteSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ object RemoteSpec extends DefaultRunnableSpec {
suite("Remote communication suite")(
testM("Remote test send message") {
val messages = (1 until 10).map(i => s"ZIO-Actor response... $i").toList
val messages2 = (11 until 20).map(i => s"ZIO-Actor response... $i").toList
for {
actorSystemOne <- ActorSystem("testSystem11", configFile)
_ <- actorSystemOne.make("actorOne", Supervisor.none, 0, handlerMessageTrait)
Expand All @@ -102,9 +103,13 @@ object RemoteSpec extends DefaultRunnableSpec {
)
result <- actorRef ? Str("ZIO-Actor response... ")
resultStream <- actorRef ? Strs(messages)
resultStream2 <- actorRef ? Strs(messages2)
streamVals2 <- resultStream2.runCollect
streamVals <- resultStream.runCollect
} yield assert(result)(equalTo("ZIO-Actor response... received plus 01")) &&
assert(streamVals)(equalTo(Chunk(messages: _*)))
assert(streamVals)(equalTo(Chunk(messages: _*))) &&
assert(streamVals2)(equalTo(Chunk(messages2: _*)))

},
testM("ActorRef serialization case") {
for {
Expand Down

0 comments on commit 97ed0eb

Please sign in to comment.