Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operator: Concat #85

Open
carmeleve opened this issue May 19, 2021 · 1 comment
Open

Operator: Concat #85

carmeleve opened this issue May 19, 2021 · 1 comment
Labels
needs design Needs some design considerations

Comments

@carmeleve
Copy link

Link to Rx.NET implementation

@bartdesmet bartdesmet added the needs design Needs some design considerations label May 19, 2021
@NickDarvey
Copy link

I needed this operator so I had a go at implementing it, specifically the overload ISubscribable<ISubscribable<'a>> -> ISubscribable<'a> (which isn't present in AsyncRx.NET).

I would have thought this operator existed in Reactor's version of Rx originally, so if there's a better implementation lying around @bartdesmet, I'll happily take that.

Otherwise, here's my attempt. Excuse the fsharp, but if this operator is wanted, I'd like to reimplement in csharp and contribute.

Implementation and tests

You can run this with dotnet fsi this.fsx.

#r "nuget: Reaqtive.Linq, 1.0.0-beta.24"
#r "nuget: Reaqtive.TestingFramework, 1.0.0-beta.24"
#r "nuget: Expecto"

open System
open System.Threading

open Reaqtive

module Subscribable =
  let concat (subscribable : ISubscribable<ISubscribable<'a>>) : ISubscribable<'a> = {
    new SubscribableBase<'a> () with
      override parent.SubscribeCore observer =
        let mutable context = Unchecked.defaultof<IOperatorContext>
        let mutable active = false
        let queue = System.Collections.Concurrent.ConcurrentQueue ()
        let subscriptions = new SerialSubscription ()
        let observe (thisDisposable : IDisposable) subscribe = {
          new IObserver<'a> with
            member i.OnNext value =
              observer.OnNext value
            member i.OnError error =
              observer.OnError error
              thisDisposable.Dispose ()
            member i.OnCompleted () =
              match queue.TryDequeue () with
              | false, _ ->
                // TOASK: lock around these ops?
                active <- false
                //// TOASK: Are we supposed to dispose after completions?
                thisDisposable.Dispose ()
              | true, next ->
                subscribe next
        }
        let rec subscribe (s : ISubscribable<_>) =
          // TOASK: lock around these ops?
          active <- true
          // TODO: Extract this as a type to work around this error and be able to use SubscribeInner
          // > Error FS0491 The member or object constructor 'SubscribeInner' is not accessible. Private members may only be accessed from within the declaring type. Protected members may only be accessed from an extending type and cannot be accessed from inner lambda expressions.	LocalIndex	C:\Users\nickd\source\repos\local-index-prototype\LocalIndex\ext\Reaqtive.Linq.fs	55	Active
          // (It can't be done inside an object expression.)
          // let subscriptions = this.SubscribeInner (s, { new IObserver<_> with ... })
          let subscription = new SingleAssignmentSubscription ()
          subscription.Subscription <- s.Subscribe (observe subscription subscribe)
          SubscriptionInitializeVisitor.Subscribe subscription
          SubscriptionInitializeVisitor.SetContext (subscription, context)
          subscriptions.Subscription <- subscription
          SubscriptionInitializeVisitor.Start subscription
        {
          new HigherOrderInputStatefulOperator<_, 'a>(parent, observer) with
            override _.Name = "rcx:Concat"
            override _.Version = Version (1,0,0,0)

            override this.SetContext ctx =
              base.SetContext ctx
              context <- ctx

            override this.SaveStateCore writer =
              base.SaveStateCore writer
              writer.Write active
              if active then base.SaveInner (subscriptions.Subscription, writer)

            override this.LoadStateCore reader =
              base.LoadStateCore reader
              active <- reader.Read<bool> ()
              if active then 
                let subscription = new SingleAssignmentSubscription ()
                subscription.Subscription <- base.LoadInner (reader, observe subscription subscribe)
                subscriptions.Subscription <- subscription

            override this.OnSubscribe () =
              let subscription = subscribable.Subscribe ({
                new IObserver<ISubscribable<'a>> with
                  member child.OnNext value =
                    if Volatile.Read &active
                    then queue.Enqueue value
                    else subscribe value
                  member child.OnError error =
                    observer.OnError error
                    this.Dispose ()
                  member child.OnCompleted () =
                    observer.OnCompleted ()
              })

              [ subscription; subscriptions ]
        }
    }


module Tests =
  open Expecto
  open type Reaqtive.Testing.ReactiveTest
  open Reaqtive.TestingFramework

  type Env () as this =
    inherit TestBase ()
    do this.TestInitialize ()
    member this.Scheduler = base.Scheduler
    interface IDisposable with
      member this.Dispose () =
          this.TestCleanup ()

  module TestableSubscribable =
    let inline asSubscribable (observable : Mocks.ITestableSubscribable<_>) =
      observable :> ISubscribable<_>


  let tests = testList "Subscribable.concat" [
    test "overlapped" {
      use env = new Env ()

      let xs = env.Scheduler.CreateHotObservable [|
        OnNext (100, 1)
        OnNext (120, 2)
        OnNext (160, 3)
        OnCompleted 220
      |]

      let ys = env.Scheduler.CreateHotObservable [|
        OnNext (140, 4)
        OnNext (240, 5)
        OnCompleted 260
      |]

      let work = env.Scheduler.CreateHotObservable [|
        OnNext(090, TestableSubscribable.asSubscribable xs)
        OnNext(120, TestableSubscribable.asSubscribable ys)
        OnCompleted 400
      |]

      let results = env.Scheduler.Start(
        create = (fun () -> Subscribable.concat work),
        created = 0, subscribed = 10, disposed = 500)

      Expect.sequenceEqual results.Messages [
        OnNext (100, 1)
        OnNext (120, 2)
        OnNext (160, 3)
        OnNext (240, 5)
        OnCompleted 400
      ] "results includes notifications after completion of prior subscribable"

      Expect.sequenceEqual xs.Subscriptions [
        Subscribe (090, 220)
      ] "`xs` should be subscribed when its emitted by outer till its completion"

      Expect.sequenceEqual ys.Subscriptions [
        Subscribe (220, 260)
      ] "`ys` should be subscribed on completion of `xs` till its completion"
    }

    test "non-overlapped" {
      use env = new Env ()

      let work = env.Scheduler.CreateHotObservable [|
        OnNext(090, TestableSubscribable.asSubscribable <| env.Scheduler.CreateHotObservable [|
          OnNext (100, 1)
          OnNext (120, 2)
          OnNext (160, 3)
          OnCompleted 180
        |])
        OnNext(190, TestableSubscribable.asSubscribable <| env.Scheduler.CreateHotObservable [|
          OnNext (200, 4)
          OnNext (220, 5)
          OnCompleted 280
        |])
        OnCompleted 400
      |]

      let results = env.Scheduler.Start(
        create = (fun () -> Subscribable.concat work),
        created = 0, subscribed = 10, disposed = 500)

      Expect.sequenceEqual results.Messages [
        OnNext (100, 1)
        OnNext (120, 2)
        OnNext (160, 3)
        OnNext (200, 4)
        OnNext (220, 5)
        OnCompleted 400
      ] "results includes all notifications"
    }

    test "inner error" {
      use env = new Env ()
      let err = ArgumentException ()
      let work = env.Scheduler.CreateHotObservable [|
        OnNext(100, TestableSubscribable.asSubscribable <| env.Scheduler.CreateHotObservable [|
          OnNext (120, 1)
          OnNext (140, 2)
          OnCompleted 190
        |])
        OnNext(200, TestableSubscribable.asSubscribable <| env.Scheduler.CreateHotObservable [|
          OnNext (220, 3)
          OnError (240, err)
          OnCompleted 290
        |])
        OnNext(300, TestableSubscribable.asSubscribable <| env.Scheduler.CreateHotObservable [|
          OnNext (320, 4)
          OnCompleted 390
        |])
      |]

      let results = env.Scheduler.Start(
        create = (fun () -> Subscribable.concat work),
        created = 0, subscribed = 10, disposed = 500)

      Expect.sequenceEqual results.Messages [
        OnNext (120, 1)
        OnNext (140, 2)
        OnNext (220, 3)
        OnError (240, err)
      ] "includes terminal error"
    }

    test "outer error" {
      use env = new Env ()
      let err = ArgumentException ()
      let work = env.Scheduler.CreateHotObservable [|
        OnNext(100, TestableSubscribable.asSubscribable <| env.Scheduler.CreateHotObservable [|
          OnNext (120, 1)
          OnNext (140, 2)
          OnCompleted 190
        |])
        OnError (200, err)
        OnNext(300, TestableSubscribable.asSubscribable <| env.Scheduler.CreateHotObservable [|
          OnNext (320, 4)
          OnCompleted 390
        |])
      |]

      let results = env.Scheduler.Start(
        create = (fun () -> Subscribable.concat work),
        created = 0, subscribed = 10, disposed = 500)

      Expect.sequenceEqual results.Messages [
        OnNext (120, 1)
        OnNext (140, 2)
        OnError (200, err)
      ] "includes terminal error"
    }

    test "early disposal" {
      use env = new Env ()

      let xs = env.Scheduler.CreateHotObservable [|
        OnNext (120, 1)
        OnNext (140, 2)
        OnCompleted 190
      |]

      let ys = env.Scheduler.CreateHotObservable [|
        OnNext (220, 3)
        OnNext (240, 4)
        OnCompleted 290
      |]

      let work = env.Scheduler.CreateHotObservable [|
        OnNext(100, TestableSubscribable.asSubscribable xs)
        OnNext(200, TestableSubscribable.asSubscribable ys)
        OnCompleted 400
      |]

      let results = env.Scheduler.Start(
        create = (fun () -> Subscribable.concat work),
        created = 0, subscribed = 10, disposed = 230)

      Expect.sequenceEqual results.Messages [
        OnNext (120, 1)
        OnNext (140, 2)
        OnNext (220, 3)
      ] "results includes notifications until disposal"

      Expect.sequenceEqual xs.Subscriptions [
        Subscribe (100, 190)
      ] "`xs` should be subscribed when its emitted by outer till its completion"

      Expect.sequenceEqual ys.Subscriptions [
        Subscribe (200, 230)
      ] "`ys` should be subscribed when its emitted by outer till outer disposal"

      Expect.sequenceEqual work.Subscriptions [
        Subscribe (10, 230)
      ] "`work` should be disposed when the scheduler is disposed"
    }

    test "late subscription" {
      use env = new Env ()

      let xs = env.Scheduler.CreateHotObservable [|
        OnNext (120, 1)
        OnNext (140, 2)
        OnCompleted 190
      |]

      let ys = env.Scheduler.CreateHotObservable [|
        OnNext (220, 3)
        OnNext (240, 4)
        OnCompleted 290
      |]

      let work = env.Scheduler.CreateHotObservable [|
        OnNext (100, TestableSubscribable.asSubscribable xs)
        OnNext (200, TestableSubscribable.asSubscribable ys)
        OnCompleted 400
      |]
    
      let results = env.Scheduler.Start(
        create = (fun () -> Subscribable.concat work),
        created = 0, subscribed = 160, disposed = 500)

      Expect.sequenceEqual results.Messages [
        OnNext (220, 3)
        OnNext (240, 4)
        OnCompleted 400
      ] "results includes notifications after subscription"
    }

    test "concurrent inners" {
      use env = new Env ()

      let xs = env.Scheduler.CreateHotObservable [|
        OnNext (120, 1)
        OnNext (140, 2)
        OnCompleted 190
      |]

      let ys = env.Scheduler.CreateHotObservable [|
        OnNext (120, 3)
        OnNext (140, 4)
        OnCompleted 190
      |]

      let zs = env.Scheduler.CreateHotObservable [|
        OnNext (120, 5)
        OnNext (140, 6)
        OnCompleted 190
      |]

      let work = env.Scheduler.CreateHotObservable [|
        OnNext (100, TestableSubscribable.asSubscribable xs)
        OnNext (100, TestableSubscribable.asSubscribable ys)
        OnNext (100, TestableSubscribable.asSubscribable zs)
        OnCompleted 400
      |]
    
      let results = env.Scheduler.Start(
        create = (fun () -> Subscribable.concat work),
        created = 0, subscribed = 10, disposed = 500)

      Expect.sequenceEqual results.Messages [
        OnNext (120, 1)
        OnNext (140, 2)
        OnCompleted 400
      ] "results includes notifications from first subscribable"
    }

    test "idk tail recursive" {
      use env = new Env ()

      let size = 20_000

      let xs x = env.Scheduler.CreateHotObservable [|
        OnCompleted (x + 1L)
      |]

      let work = env.Scheduler.CreateHotObservable [|
        for i in 1..size do
          OnNext (i, TestableSubscribable.asSubscribable (xs i))
        OnCompleted (int64 <| size + 1)
      |]
    
      let results = env.Scheduler.Start(
        create = (fun () -> Subscribable.concat work),
        created = 0, subscribed = 0, disposed = (int64 <| size + 2))

      Expect.sequenceEqual results.Messages [
        OnCompleted (int64 <| size + 1)
      ] "does not stack overflow"
    }
  ]

Expecto.Tests.runTests Expecto.Tests.defaultConfig Tests.tests

I have two questions for the maintainers before I could contribute this operator:

  1. Are there any more tests I need to validate my operator maintains Reaqtive's invariants?
  2. I'm guessing this, like the Switch operator, will need some locks around some of the operations in callbacks. Are there any tests I can to help guide me here?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
needs design Needs some design considerations
Projects
None yet
Development

No branches or pull requests

3 participants