
BatchingExecutor BlockableBatch.run: "requirement failed" under concurrent task invocations #1708

Open
jxtps opened this issue Jan 13, 2025 · 3 comments

Comments


jxtps commented Jan 13, 2025

I'm seeing the following occasional stack trace in production:

java.lang.IllegalArgumentException: requirement failed
	at scala.Predef$.require(Predef.scala:324)
	at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:105)
	at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59)
	at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:57)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
	at java.base/java.util.concurrent.ForkJoinPool.helpJoin(ForkJoinPool.java:2076)
	at java.base/java.util.concurrent.ForkJoinTask.awaitDone(ForkJoinTask.java:423)
	at java.base/java.util.concurrent.ForkJoinTask.invokeAll(ForkJoinTask.java:750)
        ...

The ... frames are my client code, which calls ForkJoinTask.invokeAll(tasks) with an array of RecursiveActions.

This happens during heavily parallelized calls to my client code. Think a bunch of concurrent web requests all needing to issue parallel sub-tasks.
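The pattern described above could look roughly like this minimal sketch (the class name, task count, and workload are hypothetical stand-ins for the client code, not the reporter's actual implementation):

```scala
import java.util.concurrent.{ForkJoinTask, RecursiveAction}
import scala.jdk.CollectionConverters._

object InvokeAllSketch {
  // Hypothetical stand-in for one parallel sub-task issued by a web request.
  final class SubTask(val id: Int) extends RecursiveAction {
    @volatile var result: Int = 0
    override def compute(): Unit = result = id * 2 // placeholder work
  }

  // Fan out the sub-tasks and block until all of them complete,
  // mirroring the ForkJoinTask.invokeAll call in the stack trace.
  def runAll(): Int = {
    val tasks = (1 to 8).map(new SubTask(_))
    ForkJoinTask.invokeAll(tasks.asJava) // blocks until every task finishes
    tasks.map(_.result).sum
  }
}
```

With many such runAll-style calls executing concurrently (one per web request), the fork/join machinery is exercised heavily in parallel, which is the load shape under which the failure appears.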

Since this only happens under live traffic, I don't have a neatly packaged, reproducible test case immediately available.

Does pekko have multi-threaded stress tests with tasks being invoked in parallel?

This is in pekko-actor 1.0.3 for Scala 2.13 (pekko-actor_2.13-1.0.3.jar).

pjfanning (Contributor) commented Jan 14, 2025

Other contributors may have a different viewpoint on this, but I would not think of Pekko dispatchers as being for general use; they are designed more for Pekko-specific internal usage.

Are your tasks doing Pekko based work?

Were tasks lost when the requirement exception happened or were they retried?

Have you considered using a Scala ExecutionContext or a Java ForkJoinPool directly? Or a framework like ZIO, cats-effect or another general purpose async job framework?
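The ExecutionContext alternative mentioned above could look like this minimal sketch (the pool size, object name, and squaring workload are illustrative assumptions, not a recommendation from the thread):

```scala
import java.util.concurrent.ForkJoinPool
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object DirectPoolSketch {
  // A dedicated pool, entirely separate from Pekko's batching dispatcher.
  private val pool = new ForkJoinPool(4)
  private implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutorService(pool)

  // Fan a batch of inputs out as Futures and wait for all the results.
  def fanOut(inputs: Seq[Int]): Seq[Int] = {
    val futures = inputs.map(i => Future(i * i))
    Await.result(Future.sequence(futures), 10.seconds)
  }
}
```

Because this pool is not shared with Pekko, its threads never carry the dispatcher's batching thread-locals, sidestepping the interaction seen in the stack trace.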

jxtps (Author) commented Jan 14, 2025

Yes, I have had to switch to using a custom ForkJoinPool.

But regardless, if you look at the code where this happens ( https://github.com/apache/pekko/blob/main/actor/src/main/scala/org/apache/pekko/dispatch/BatchingExecutor.scala#L104 ):

    override final def run(): Unit = {
      require(_tasksLocal.get eq null)
      _tasksLocal.set(this) // Install ourselves as the current batch
      val firstInvocation = _blockContext.get eq null
      if (firstInvocation) _blockContext.set(BlockContext.current)
      BlockContext.withBlockContext(this) {
        try processBatch(this)
        catch {
          case t: Throwable =>
            resubmitUnbatched()
            throw t
        } finally {
          _tasksLocal.remove()
          if (firstInvocation) _blockContext.remove()
        }
      }
    }

What happens is that require(_tasksLocal.get eq null) fails, which makes it look like there's a race condition, or like the same thread is re-entering run() while a batch is still installed in the thread-local (the helpJoin frame in the stack trace suggests the joining thread executes other queued tasks while it waits). And regardless of intended use, that should be made impossible, right?

Or maybe I'm misunderstanding how this is supposed to work, but my code "just" calls ForkJoinTask.invokeAll, which should always be legal, no?
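One way to see why the require could trip is that ForkJoinTask.invokeAll runs some sub-tasks directly on the joining (caller) thread rather than letting it idle. This minimal sketch (all names hypothetical; the behavior observed is an OpenJDK implementation detail, not a specified guarantee) demonstrates caller-thread execution:

```scala
import java.util.concurrent.{ForkJoinPool, ForkJoinTask, RecursiveAction}
import java.util.concurrent.atomic.AtomicBoolean
import scala.jdk.CollectionConverters._

object CallerRunsSketch {
  // Returns true if invokeAll executed at least one sub-task on the very
  // thread that called it -- the "caller helps out" behavior behind the
  // ForkJoinPool.helpJoin frame in the reported stack trace.
  def ranOnCaller(): Boolean = {
    val pool = new ForkJoinPool(2)
    val sawCaller = new AtomicBoolean(false)
    val outer = new RecursiveAction {
      override def compute(): Unit = {
        val caller = Thread.currentThread()
        val subs = (1 to 16).map { _ =>
          new RecursiveAction {
            override def compute(): Unit =
              if (Thread.currentThread() eq caller) sawCaller.set(true)
          }
        }
        // While joining, the caller thread may pick up and execute queued
        // sub-tasks itself instead of blocking idle.
        ForkJoinTask.invokeAll(subs.asJava)
      }
    }
    pool.invoke(outer)
    pool.shutdown()
    sawCaller.get()
  }
}
```

If a BlockableBatch is already installed in _tasksLocal on that thread when it helps execute another batch's run(), the require(_tasksLocal.get eq null) check would fail exactly as reported.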

He-Pin (Member) commented Jan 14, 2025

Do you have a reproducer for this? The current implementation seems to be valid only for internal usage.
