Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement streaming with state changes #270

Draft
wants to merge 12 commits into
base: master
Choose a base branch
from
69 changes: 56 additions & 13 deletions actors/src/main/scala/zio/actors/Actor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,59 @@ package zio.actors

import zio.actors.Actor.PendingMessage
import zio.clock.Clock
import zio.stream.ZStream
import zio.{ Supervisor => _, _ }

import scala.language.implicitConversions

object Actor {

private[actors] type PendingMessage[F[_], A] = (F[A], Promise[Throwable, A])
trait ActorResponse[R, S, +A] {
type ResponseType

def materialize[AA >: A](
state: Ref[S],
promise: Promise[Throwable, AA],
supervisor: Supervisor[R]
): URIO[R with Clock, Unit]
}

object ActorResponse {
implicit def oneTime[R, S, A](response: RIO[R, (S, A)]): ActorResponse[R, S, A] =
new ActorResponse[R, S, A] {
override type ResponseType = RIO[R, (S, A)]

override def materialize[AA >: A](
state: Ref[S],
promise: Promise[Throwable, AA],
supervisor: Supervisor[R]
): URIO[R with Clock, Unit] = {
val completer = ((s: S, a: A) => (state.set(s) *> promise.succeed(a)).ignore).tupled
response.foldM(
e =>
supervisor
.supervise[R, (S, A)](response, e)
.foldM(_ => promise.fail(e).ignore, completer),
completer
)
}
}
implicit def stream[R, S, E, I](response: ZStream[R, E, (S, I)]): ActorResponse[R, S, ZStream[R, E, I]] =
new ActorResponse[R, S, ZStream[R, E, I]] {
override type ResponseType = ZStream[R, E, (S, I)]
Copy link
Contributor

@mtsokol mtsokol Oct 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that ResponseType here should be ZStream[R, Throwable, (S, I)].
As actors are location transparent and interaction can involve remote communication our interaction patterns ! and ? result in Task[...] (so a ZIO that can fail with Throwable as it might involve write/readFromWire) although in actor's behavior there could be no failures that can occur.

Notice that here https://github.com/zio/zio-actors/pull/270/files#diff-c1a93d1001aa018540bc59b83dd7a09dR134 stream elements are created from Task[Any] so resulting stream that is returned will be Stream[Any, Throwable, Any].

That is - a stream returned in our RemoteSpec will perform readFromWire to get elements (until receives StreamEnd) so every time it might fail with communication error but here it test the signature is ZStream[Any, Nothing, String] which seems that pulling elements out of stream can't fail.

I propose to remove E from implicit def stream[R, S, E, I] and fix returned ZStream signature to ZStream[R, Throwable, I].

This might seem unclear but if we notice where communication occurs it should be clear - when we do ? and the response it oneTime the call is synchronous and we wait for the response.
When the response is a stream on the caller site we only create a recipe with ? -> the communication occurs when we do .runCollect and this is a counterpart of getting response out of ? with oneTime.

Please correct me if I got this wrong.


[EDIT]

Problem

I think that support for streaming responses will require overhaul for remoting (due to the fact that we decide when we pull responses out of stream, so perform remote communication) as it breaks current design.

I introduced simple change to our test case:

_              <- actorRef ? Str("ZIO-Actor response... ")
resultStream   <- actorRef ? Strs(messages)
result         <- actorRef ? Str("ZIO-Actor response... ")
resultStream2  <- actorRef ? Strs(messages2)

And it fails - first we do the first ask -> remote actor system writes to the wire - we read it.
Then we ask for a stream response and get the stream - the remote actor system writes all responses to the wire but we do not read them (!) but only define recipe for reading them. Then we ask for another oneTime result but we read a bit of those stream responses instead and it fails. This should definitely be covered in a test case like this (intertwine oneTime and stream interactions)

Solution?

Off the top of my head - we can perform runCollect in sendEnvelope but it misses the point of #268 what you described as your goal.

You could just make it fully akka-like and pass ActorRef in a message you send as a part of your protocol (so you know where send messages back) - and perform all communication with ! (tell) and drop stream responses.

Otherwise it would require rewriting internals (which are really basic now) to support it that way.

WDYT?

Sorry for writing about this issue now - only now I had some time to think about it deeper.

Copy link
Contributor Author

@nightscape nightscape Oct 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding ZStream being a pull-based stream, but the responses being pushed over the wire:
Would it help to introduce a Queue at the receiver side instead of doing a runCollect?
That way the remote actor can push messages at any time, but from the client side we pull from the Queue.

I had also thought about the Akka way of passing an ActorRef and sending responses via !, but the resulting API would be quite a bit less elegant...

Copy link
Contributor

@mtsokol mtsokol Oct 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now I think the issue on response producer site is:

(stream.map(StreamMsg) ++ ZStream(StreamEnd)).foreach(e =>
  writeToWire(worker, e)
)

If we immediately write stream elements to receiver like this then they need to be read right away (e.g. with runCollect), otherwise remoting can be easily broken (but it makes no sense as we are blocked until receive the whole stream).

I don't know a solution for this right now (maybe another abstraction layer for remote communication for atomic and stream responses where stream response elements will be stored on response producer site and only send over the wire when requested (but it would introduce other issues I think), or (also with that additional abstraction layer) response producer will send it like this but on the other side we need to manage reading ? responses and those out-of-order stream responses, or maybe something different)

Yeah, including sender ActorRef in protocol would make it less elegant but making stream responses that comply with current remoting feels to require major design change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to have a daemon on the receiver side that permanently tries to readFromWire and pushes into a Queue from which both ZStreams and answers from ? can be read?
StreamMsg would then need some kind of "conversation identifier" so that when multiple streams are running at the same time their messages can be distinguished.

If we can come up with a working solution, would you see the overall goal as worthwhile or do you want to have a simple, no-bells-and-whistles library?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, sounds great but I lack in knowledge and experience to elaborate on such solution (what are the pros and cons). Do you know a project where such solution is used?
As this is a design discussion also asking @mijicd @softinio. What do you think about it?


override def materialize[AA >: ZStream[R, E, I]](
state: Ref[S],
promise: Promise[Throwable, AA],
supervisor: Supervisor[R]
): URIO[R with Clock, Unit] = {
// TODO: Supervision
nightscape marked this conversation as resolved.
Show resolved Hide resolved
val outputStream = response.mapM { case (s, a) => state.set(s) *> UIO(a) }
promise.succeed(outputStream).ignore
}
Comment on lines +47 to +55
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mtsokol @softinio @mijicd any ideas how Supervision could/should work in the streaming case?

}
}

/**
* Description of actor behavior (can act as FSM)
Expand All @@ -26,7 +74,9 @@ object Actor {
* @tparam A - domain of return entities
* @return effectful result
*/
def receive[A](state: S, msg: F[A], context: Context): RIO[R, (S, A)]
def receive[A](state: S, msg: F[A], context: Context): RIO[R, (S, A)] = ???
def receiveS[A](state: S, msg: F[A], context: Context): ActorResponse[R, S, A] =
ActorResponse.oneTime(receive(state, msg, context))

/* INTERNAL API */

Expand All @@ -41,24 +91,17 @@ object Actor {
for {
s <- state.get
(fa, promise) = msg
receiver = receive(s, fa, context)
completer = ((s: S, a: A) => state.set(s) *> promise.succeed(a)).tupled
_ <- receiver.foldM(
e =>
supervisor
.supervise(receiver, e)
.foldM(_ => promise.fail(e), completer),
completer
)
receiver = receiveS(s, fa, context)
_ <- receiver.materialize(state, promise, supervisor)
} yield ()

for {
state <- Ref.make(initial)
queue <- Queue.bounded[PendingMessage[F, _]](mailboxSize)
_ <- (for {
t <- queue.take
_ <- process(t, state)
} yield ()).forever.fork
t <- queue.take
_ <- process(t, state)
} yield ()).forever.fork
} yield new Actor[F](queue)(optOutActorSystem)
}
}
Expand Down
27 changes: 21 additions & 6 deletions actors/src/main/scala/zio/actors/ActorRef.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import java.io.{ IOException, ObjectInputStream, ObjectOutputStream, ObjectStrea

import zio.nio.core.{ InetAddress, InetSocketAddress, SocketAddress }
import zio.nio.core.channels.AsynchronousSocketChannel
import zio.{ IO, Runtime, Task, UIO }
import zio.stream.ZStream
import zio.{ IO, Runtime, Schedule, Task, UIO }

import scala.reflect.runtime.universe._

/**
* Reference to actor that might reside on local JVM instance or be available via remote communication
Expand All @@ -21,7 +24,7 @@ sealed trait ActorRef[-F[+_]] extends Serializable {
* @tparam A return type
* @return effectful response
*/
def ?[A](fa: F[A]): Task[A]
def ?[A: TypeTag](fa: F[A]): Task[A]

/**
* Send message to an actor as `fire-and-forget` -
Expand Down Expand Up @@ -86,7 +89,7 @@ private[actors] final class ActorRefLocal[-F[+_]](
private val actorName: String,
actor: Actor[F]
) extends ActorRefSerial[F](actorName) {
override def ?[A](fa: F[A]): Task[A] = actor ? fa
override def ?[A: TypeTag](fa: F[A]): Task[A] = actor ? fa

override def !(fa: F[_]): Task[Unit] = actor ! fa

Expand All @@ -111,23 +114,35 @@ private[actors] final class ActorRefRemote[-F[+_]](
) extends ActorRefSerial[F](actorName) {
import ActorSystemUtils._

override def ?[A](fa: F[A]): Task[A] = sendEnvelope(Command.Ask(fa))
override def ?[A: TypeTag](fa: F[A]): Task[A] = sendEnvelope(Command.Ask(fa))

override def !(fa: F[_]): Task[Unit] = sendEnvelope[Unit](Command.Tell(fa))

override val stop: Task[List[_]] = sendEnvelope(Command.Stop)

private def sendEnvelope[A](command: Command): Task[A] =
private def sendEnvelope[A](command: Command)(implicit typeTag: TypeTag[A]): Task[A] = {
val baseClassNames = typeTag.tpe.baseClasses.map(_.name.decodedName.toString)
val isZStream = baseClassNames.contains("ZStream")
nightscape marked this conversation as resolved.
Show resolved Hide resolved
for {
client <- AsynchronousSocketChannel()
response <- for {
_ <- client.connect(address)
actorPath <- path
_ <- writeToWire(client, new Envelope(command, actorPath))
response <- readFromWire(client)
response <- if (isZStream)
Task(
ZStream
.fromEffect(readFromWire(client))
.repeat(Schedule.forever)
.takeUntil(_ == StreamEnd)
.collect { case StreamMsg(m) => m }
).either
else
readFromWire(client)
} yield response.asInstanceOf[Either[Throwable, A]]
result <- IO.fromEither(response)
} yield result
}

@throws[IOException]
private def writeObject(out: ObjectOutputStream): Unit =
Expand Down
15 changes: 11 additions & 4 deletions actors/src/main/scala/zio/actors/ActorSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import zio.actors.ActorsConfig._
import zio.clock.Clock
import zio.nio.core.{ Buffer, InetAddress, SocketAddress }
import zio.nio.core.channels.{ AsynchronousServerSocketChannel, AsynchronousSocketChannel }
import zio.stream.ZStream

import scala.io.Source

Expand Down Expand Up @@ -173,9 +174,9 @@ final class ActorSystem private[actors] (
} yield actorRef
else
for {
address <- InetAddress
.byName(addr.value)
.flatMap(iAddr => SocketAddress.inetSocketAddress(iAddr, port.value))
address <- InetAddress
.byName(addr.value)
.flatMap(iAddr => SocketAddress.inetSocketAddress(iAddr, port.value))
} yield new ActorRefRemote[F](path, address)
} yield actorRef

Expand Down Expand Up @@ -237,7 +238,9 @@ final class ActorSystem private[actors] (
Any @unchecked
]
) =>
stream.foreach(writeToWire(worker, _))
(stream.map(StreamMsg) ++ ZStream(StreamEnd)).foreach(e =>
writeToWire(worker, e)
)
case _ => writeToWire(worker, response)
}
} yield ()
Expand Down Expand Up @@ -330,3 +333,7 @@ private[actors] object ActorSystemUtils {
_ <- socket.write(Chunk.fromArray(bytes))
} yield ()
}

sealed trait StreamProtocol
case class StreamMsg(obj: Any) extends StreamProtocol
case object StreamEnd extends StreamProtocol
2 changes: 1 addition & 1 deletion actors/src/main/scala/zio/actors/Supervisor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package zio.actors
import zio.clock.Clock
import zio.{ IO, RIO, Schedule, URIO, ZIO }

private[actors] trait Supervisor[-R] {
trait Supervisor[-R] {
def supervise[R0 <: R, A](zio: RIO[R0, A], error: Throwable): ZIO[R0 with Clock, Unit, A]
}

Expand Down
37 changes: 21 additions & 16 deletions actors/src/test/scala/zio/actors/ActorsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package zio.actors

import java.util.concurrent.atomic.AtomicBoolean

import zio.actors.Actor.Stateful
import zio.stream.Stream
import zio.actors.Supervisor
import zio.actors.Actor.ActorResponse._
import zio.actors.Actor.{ ActorResponse, Stateful }
import zio.stream.{ Stream, ZStream }
import zio.{ Chunk, IO, Ref, Schedule, Task, UIO }
import zio.test._
import zio.test.Assertion._
Expand All @@ -13,7 +15,7 @@ object CounterUtils {
case object Reset extends Message[Unit]
case object Increase extends Message[Unit]
case object Get extends Message[Int]
case class IncreaseUpTo(upper: Int) extends Message[Stream[Nothing, Int]]
case class IncreaseUpTo(upper: Int) extends Message[ZStream[Any, Nothing, Int]]
}

object TickUtils {
Expand All @@ -33,16 +35,19 @@ object ActorsSpec extends DefaultRunnableSpec {
import CounterUtils._

val handler: Stateful[Any, Int, Message] = new Stateful[Any, Int, Message] {
override def receive[A](
override def receiveS[A](
state: Int,
msg: Message[A],
context: Context
): UIO[(Int, A)] =
): ActorResponse[Any, Int, A] =
msg match {
case Reset => UIO((0, ()))
case Increase => UIO((state + 1, ()))
case Get => UIO((state, state))
case IncreaseUpTo(upper) => UIO((upper, Stream.fromIterable(state until upper)))
case Reset => oneTime(UIO((0, ())))
case Increase => oneTime(UIO((state + 1, ())))
case Get => oneTime(UIO((state, state)))
case IncreaseUpTo(upper) =>
stream[Any, Int, Nothing, Int](
Stream.fromIterable((state to upper).map(i => (i, i))).asInstanceOf[Stream[Nothing, (Int, Int)]]
) //
}
}

Expand All @@ -55,11 +60,11 @@ object ActorsSpec extends DefaultRunnableSpec {
_ <- actor ! Reset
c2 <- actor ? Get
c3 <- actor ? IncreaseUpTo(20)
vals <- c3.runCollect
vals <- c3.mapM(i => (actor ? Get).map(i2 => (i, i2))).runCollect
c4 <- actor ? Get
} yield assert(c1)(equalTo(2)) &&
assert(c2)(equalTo(0)) &&
assert(vals)(equalTo(Chunk.apply(0 until 20: _*))) &&
assert(vals)(equalTo(Chunk.apply((0 to 20).map(i => (i, i)): _*))) &&
assert(c4)(equalTo(20))
},
testM("Error recovery by retrying") {
Expand Down Expand Up @@ -141,10 +146,10 @@ object ActorsSpec extends DefaultRunnableSpec {
}
for {
system <- ActorSystem("test1")
actor <- system.make("actor1", Supervisor.none, (), handler)
_ <- actor ! Letter
_ <- actor ? Letter
dump <- actor.stop
actor <- system.make("actor1", Supervisor.none, (), handler)
_ <- actor ! Letter
_ <- actor ? Letter
dump <- actor.stop
} yield assert(dump)(
isSubtype[List[_]](anything) &&
hasField[List[_], Int]("size", _.size, equalTo(0))
Expand All @@ -164,7 +169,7 @@ object ActorsSpec extends DefaultRunnableSpec {
}
}
for {
system <- ActorSystem("test5")
system <- ActorSystem("test5")
_ <- system.make("actor1-1", Supervisor.none, (), handler)
actor <- system.select[Message]("zio://[email protected]:0000/actor1-1")
_ <- actor ! Tick
Expand Down
Loading