Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

SocketChannel is never released #205

Open
ubourdon opened this issue May 26, 2020 · 0 comments
Open

SocketChannel is never released #205

ubourdon opened this issue May 26, 2020 · 0 comments

Comments

@ubourdon
Copy link

From the example given in https://zio.github.io/zio-nio/docs/essentials/essentials_sockets#creating-sockets

val server = AsynchronousServerSocketChannel()
  .mapM { socket =>
    for {
      _ <- SocketAddress.inetSocketAddress("127.0.0.1", 1337) >>= socket.bind
      _ <- socket.accept.preallocate.flatMap(_..ensuring(putStrLn("Connection closed")).use(channel => doWork(channel).catchAll(ex => putStrLn(ex.getMessage))).fork).forever.fork
    } yield ()
  }.useForever

def doWork(channel: AsynchronousSocketChannel): ZIO[Console with Clock, Throwable, Unit] = {
  val process =
    for {
      chunk <- channel.read(3)
      str = chunk.toArray.map(_.toChar).mkString
      _ <- putStrLn(s"received: [$str] [${chunk.length}]")
    } yield ()

  process.whenM(channel.isOpen).forever
}

If you run this code and make a curl on it, the socket is never closed by the server.

If i add .ensuring(putStrLn("Connection closed")) before use, i see the log happened in console, but the socket remains not closed, the curl request don't end, & a lsof cmd on curl process & server process, show that the socket stay opened.

The real code i use for my test is :

import zio._
import zio.clock.Clock
import zio.duration._
import zio.nio.core.SocketAddress
import zio.nio.channels._
import zio.stream._
import zio.console._

object Main extends App {
	override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, Int] = {
		server.orDie.as(0)
	}

	val server = AsynchronousServerSocketChannel()
		.mapM { socket =>
			for {
				_ <- SocketAddress.inetSocketAddress("127.0.0.1", 8080) >>= socket.bind
				_ <- socket.accept.preallocate
                                      .flatMap {
                                          _.ensuring(putStrLn("Connection closed"))
                                            .use { channel =>
                                                doWork(channel).catchAll(ex => putStrLn(ex.getMessage))
                                             }
                                             .forkDaemon
                                     }
                                    .forever
			} yield ()
		}
    	.use(_ => putStrLn("coucou"))

	def doWork(channel: AsynchronousSocketChannel): ZIO[Console with Clock, Throwable, Unit] = {
		val readRequest: ZIO[Any, Nothing, String] = ZStream
			.fromEffectOption {                          
				channel.read(1024).orElse(ZIO.fail(None))
			}
			.flattenChunks
    		.transduce(ZTransducer.utf8Decode)
			.run(Sink.foldLeft("")(_ + (_: String)))

		val res = List(
			"HTTP/1.1 200 OK",
			"Location: http://localhost:8080",
			"Content-Type: text/html; charset=UTF-8",
			"Date: Tue, 26 May 2020 17:02:42 GMT",
			""
		).mkString("\n")

		for {
			request <- readRequest
			_ <- putStrLn(request)
			response = s"$res"
			_ <- channel.write(Chunk.fromArray(response.toCharArray.map { _.toByte }))
		} yield ()
	}
}
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant