diff --git a/actors/src/main/scala/zio/actors/Actor.scala b/actors/src/main/scala/zio/actors/Actor.scala index e2a5d70f..4a9d1fd9 100644 --- a/actors/src/main/scala/zio/actors/Actor.scala +++ b/actors/src/main/scala/zio/actors/Actor.scala @@ -2,11 +2,59 @@ package zio.actors import zio.actors.Actor.PendingMessage import zio.clock.Clock +import zio.stream.ZStream import zio.{ Supervisor => _, _ } +import scala.language.implicitConversions + object Actor { private[actors] type PendingMessage[F[_], A] = (F[A], Promise[Throwable, A]) + trait ActorResponse[R, S, +A] { + type ResponseType + + def materialize[AA >: A]( + state: Ref[S], + promise: Promise[Throwable, AA], + supervisor: Supervisor[R] + ): URIO[R with Clock, Unit] + } + + object ActorResponse { + implicit def oneTime[R, S, A](response: RIO[R, (S, A)]): ActorResponse[R, S, A] = + new ActorResponse[R, S, A] { + override type ResponseType = RIO[R, (S, A)] + + override def materialize[AA >: A]( + state: Ref[S], + promise: Promise[Throwable, AA], + supervisor: Supervisor[R] + ): URIO[R with Clock, Unit] = { + val completer = ((s: S, a: A) => (state.set(s) *> promise.succeed(a)).ignore).tupled + response.foldM( + e => + supervisor + .supervise[R, (S, A)](response, e) + .foldM(_ => promise.fail(e).ignore, completer), + completer + ) + } + } + implicit def stream[R, S, E, I](response: ZStream[R, E, (S, I)]): ActorResponse[R, S, ZStream[R, E, I]] = + new ActorResponse[R, S, ZStream[R, E, I]] { + override type ResponseType = ZStream[R, E, (S, I)] + + override def materialize[AA >: ZStream[R, E, I]]( + state: Ref[S], + promise: Promise[Throwable, AA], + supervisor: Supervisor[R] + ): URIO[R with Clock, Unit] = { + // TODO: Supervision + val outputStream = response.mapM { case (s, a) => state.set(s) *> UIO(a) } + promise.succeed(outputStream).ignore + } + } + } /** * Description of actor behavior (can act as FSM) @@ -26,7 +74,9 @@ object Actor { * @tparam A - domain of return entities * @return effectful result */ - def receive[A](state: S, msg: F[A], context: Context): RIO[R, (S, A)] + def receive[A](state: S, msg: F[A], context: Context): RIO[R, (S, A)] = ??? + def receiveS[A](state: S, msg: F[A], context: Context): ActorResponse[R, S, A] = + ActorResponse.oneTime(receive(state, msg, context)) /* INTERNAL API */ @@ -41,15 +91,8 @@ object Actor { for { s <- state.get (fa, promise) = msg - receiver = receive(s, fa, context) - completer = ((s: S, a: A) => state.set(s) *> promise.succeed(a)).tupled - _ <- receiver.foldM( - e => - supervisor - .supervise(receiver, e) - .foldM(_ => promise.fail(e), completer), - completer - ) + receiver = receiveS(s, fa, context) + _ <- receiver.materialize(state, promise, supervisor) } yield () for { diff --git a/actors/src/main/scala/zio/actors/ActorRef.scala b/actors/src/main/scala/zio/actors/ActorRef.scala index f5288642..694d6a4f 100644 --- a/actors/src/main/scala/zio/actors/ActorRef.scala +++ b/actors/src/main/scala/zio/actors/ActorRef.scala @@ -4,7 +4,10 @@ import java.io.{ IOException, ObjectInputStream, ObjectOutputStream, ObjectStrea import zio.nio.core.{ InetAddress, InetSocketAddress, SocketAddress } import zio.nio.core.channels.AsynchronousSocketChannel -import zio.{ IO, Runtime, Task, UIO } +import zio.stream.ZStream +import zio.{ IO, Runtime, Schedule, Task, UIO } + +import scala.reflect.ClassTag /** * Reference to actor that might reside on local JVM instance or be available via remote communication @@ -21,7 +24,7 @@ sealed trait ActorRef[-F[+_]] extends Serializable { * @tparam A return type * @return effectful response */ - def ?[A](fa: F[A]): Task[A] + def ?[A: ClassTag](fa: F[A]): Task[A] /** * Send message to an actor as `fire-and-forget` - @@ -86,7 +89,7 @@ private[actors] final class ActorRefLocal[-F[+_]]( private val actorName: String, actor: Actor[F] ) extends ActorRefSerial[F](actorName) { - override def ?[A](fa: F[A]): Task[A] = actor ? fa + override def ?[A: ClassTag](fa: F[A]): Task[A] = actor ? fa override def !(fa: F[_]): Task[Unit] = actor ! fa @@ -111,23 +114,34 @@ private[actors] final class ActorRefRemote[-F[+_]]( ) extends ActorRefSerial[F](actorName) { import ActorSystemUtils._ - override def ?[A](fa: F[A]): Task[A] = sendEnvelope(Command.Ask(fa)) + override def ?[A: ClassTag](fa: F[A]): Task[A] = sendEnvelope(Command.Ask(fa)) override def !(fa: F[_]): Task[Unit] = sendEnvelope[Unit](Command.Tell(fa)) override val stop: Task[List[_]] = sendEnvelope(Command.Stop) - private def sendEnvelope[A](command: Command): Task[A] = + private def sendEnvelope[A](command: Command)(implicit classTag: ClassTag[A]): Task[A] = { + val isZStream = classTag.runtimeClass.isAssignableFrom(classOf[ZStream[_, _, _]]) for { client <- AsynchronousSocketChannel() response <- for { _ <- client.connect(address) actorPath <- path _ <- writeToWire(client, new Envelope(command, actorPath)) - response <- readFromWire(client) + response <- if (isZStream) + Task( + ZStream + .fromEffect(readFromWire(client)) + .repeat(Schedule.forever) + .takeUntil(_ == StreamEnd) + .collect { case StreamMsg(m) => m } + ).either + else + readFromWire(client) } yield response.asInstanceOf[Either[Throwable, A]] result <- IO.fromEither(response) } yield result + } @throws[IOException] private def writeObject(out: ObjectOutputStream): Unit = diff --git a/actors/src/main/scala/zio/actors/ActorSystem.scala b/actors/src/main/scala/zio/actors/ActorSystem.scala index ab218bb0..b1625067 100644 --- a/actors/src/main/scala/zio/actors/ActorSystem.scala +++ b/actors/src/main/scala/zio/actors/ActorSystem.scala @@ -10,6 +10,7 @@ import zio.actors.ActorsConfig._ import zio.clock.Clock import zio.nio.core.{ Buffer, InetAddress, SocketAddress } import zio.nio.core.channels.{ AsynchronousServerSocketChannel, AsynchronousSocketChannel } +import zio.stream.ZStream import scala.io.Source @@ -237,7 +238,9 @@ final class ActorSystem private[actors] ( Any @unchecked ] ) => - stream.foreach(writeToWire(worker, _)) + (stream.map(StreamMsg) ++ ZStream(StreamEnd)).foreach(e => + writeToWire(worker, e) + ) case _ => writeToWire(worker, response) } } yield () @@ -330,3 +333,7 @@ private[actors] object ActorSystemUtils { _ <- socket.write(Chunk.fromArray(bytes)) } yield () } + +sealed trait StreamProtocol +case class StreamMsg(obj: Any) extends StreamProtocol +case object StreamEnd extends StreamProtocol diff --git a/actors/src/main/scala/zio/actors/Supervisor.scala b/actors/src/main/scala/zio/actors/Supervisor.scala index cd94ddef..8e22cea1 100644 --- a/actors/src/main/scala/zio/actors/Supervisor.scala +++ b/actors/src/main/scala/zio/actors/Supervisor.scala @@ -3,7 +3,7 @@ package zio.actors import zio.clock.Clock import zio.{ IO, RIO, Schedule, URIO, ZIO } -private[actors] trait Supervisor[-R] { +trait Supervisor[-R] { def supervise[R0 <: R, A](zio: RIO[R0, A], error: Throwable): ZIO[R0 with Clock, Unit, A] } diff --git a/actors/src/test/scala/zio/actors/ActorsSpec.scala b/actors/src/test/scala/zio/actors/ActorsSpec.scala index 72f46ce8..7ae04874 100644 --- a/actors/src/test/scala/zio/actors/ActorsSpec.scala +++ b/actors/src/test/scala/zio/actors/ActorsSpec.scala @@ -2,8 +2,8 @@ package zio.actors import java.util.concurrent.atomic.AtomicBoolean -import zio.actors.Actor.Stateful -import zio.stream.Stream +import zio.actors.Actor.{ ActorResponse, Stateful } +import zio.stream.{ Stream, ZStream } import zio.{ Chunk, IO, Ref, Schedule, Task, UIO } import zio.test._ import zio.test.Assertion._ @@ -13,7 +13,7 @@ object CounterUtils { case object Reset extends Message[Unit] case object Increase extends Message[Unit] case object Get extends Message[Int] - case class IncreaseUpTo(upper: Int) extends Message[Stream[Nothing, Int]] + case class IncreaseUpTo(upper: Int) extends Message[ZStream[Any, Nothing, Int]] } object TickUtils { @@ -33,16 +33,17 @@ object ActorsSpec extends DefaultRunnableSpec { import CounterUtils._ val handler: Stateful[Any, Int, Message] = new Stateful[Any, Int, Message] { - override def receive[A]( + override def receiveS[A]( state: Int, msg: Message[A], context: Context - ): UIO[(Int, A)] = + ): ActorResponse[Any, Int, A] = msg match { case Reset => UIO((0, ())) case Increase => UIO((state + 1, ())) case Get => UIO((state, state)) - case IncreaseUpTo(upper) => UIO((upper, Stream.fromIterable(state until upper))) + case IncreaseUpTo(upper) => + Stream.fromIterable((state to upper).map(i => (i, i))) } } @@ -55,11 +56,11 @@ object ActorsSpec extends DefaultRunnableSpec { _ <- actor ! Reset c2 <- actor ? Get c3 <- actor ? IncreaseUpTo(20) - vals <- c3.runCollect + vals <- c3.mapM(i => (actor ? Get).map(i2 => (i, i2))).runCollect c4 <- actor ? Get } yield assert(c1)(equalTo(2)) && assert(c2)(equalTo(0)) && - assert(vals)(equalTo(Chunk.apply(0 until 20: _*))) && + assert(vals)(equalTo(Chunk.apply((0 to 20).map(i => (i, i)): _*))) && assert(c4)(equalTo(20)) }, testM("Error recovery by retrying") { diff --git a/actors/src/test/scala/zio/actors/RemoteSpec.scala b/actors/src/test/scala/zio/actors/RemoteSpec.scala index c21026fe..6e3c6c85 100644 --- a/actors/src/test/scala/zio/actors/RemoteSpec.scala +++ b/actors/src/test/scala/zio/actors/RemoteSpec.scala @@ -3,31 +3,34 @@ package zio.actors import java.io.File import java.net.ConnectException -import zio.actors.Actor.Stateful -import zio.{ clock, console, IO } -import zio.test.DefaultRunnableSpec -import zio.test._ -import zio.test.Assertion._ +import zio.actors.Actor.{ ActorResponse, Stateful } +import zio.actors.SpecUtils._ import zio.duration._ +import zio.stream.ZStream +import zio.test.Assertion._ import zio.test.environment.TestConsole -import SpecUtils._ +import zio.test.{ DefaultRunnableSpec, _ } +import zio.{ clock, console, Chunk, IO } object SpecUtils { sealed trait Message[+A] - case class Str(value: String) extends Message[String] + case class Str(value: String) extends Message[String] + case class Strs(values: Seq[String]) extends Message[ZStream[Any, Nothing, String]] sealed trait MyErrorDomain extends Throwable case object DomainError extends MyErrorDomain val handlerMessageTrait = new Stateful[Any, Int, Message] { - override def receive[A]( + override def receiveS[A]( state: Int, msg: Message[A], context: Context - ): IO[MyErrorDomain, (Int, A)] = + ): ActorResponse[Any, Int, A] = msg match { - case Str(value) => + case Str(value) => IO.effectTotal((state + 1, value + "received plus " + state + 1)) + case Strs(values) => + ZStream(values: _*).zipWithIndex.map { case (v, i) => (state + i.toInt, v) } } } @@ -87,6 +90,8 @@ object RemoteSpec extends DefaultRunnableSpec { suite("RemoteSpec")( suite("Remote communication suite")( testM("Remote test send message") { + val messages = (1 until 10).map(i => s"ZIO-Actor response... $i").toList + val messages2 = (11 until 20).map(i => s"ZIO-Actor response... $i").toList for { actorSystemOne <- ActorSystem("testSystem11", configFile) _ <- actorSystemOne.make("actorOne", Supervisor.none, 0, handlerMessageTrait) @@ -95,7 +100,14 @@ object RemoteSpec extends DefaultRunnableSpec { "zio://testSystem11@127.0.0.1:9665/actorOne" ) result <- actorRef ? Str("ZIO-Actor response... ") - } yield assert(result)(equalTo("ZIO-Actor response... received plus 01")) + resultStream <- actorRef ? Strs(messages) + resultStream2 <- actorRef ? Strs(messages2) + streamVals2 <- resultStream2.runCollect + streamVals <- resultStream.runCollect + } yield assert(result)(equalTo("ZIO-Actor response... received plus 01")) && + assert(streamVals)(equalTo(Chunk(messages: _*))) && + assert(streamVals2)(equalTo(Chunk(messages2: _*))) + }, testM("ActorRef serialization case") { for { @@ -111,7 +123,7 @@ object RemoteSpec extends DefaultRunnableSpec { _ <- one ! GameInit(remoteActor) - _ <- clock.sleep(2.seconds) + _ <- clock.sleep(3.seconds) outputVector <- TestConsole.output } yield assert(outputVector.size)(equalTo(3)) &&