From 5e8a0ca4d3f1d835358009ac3ae084affd4d19e7 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 9 Jan 2025 04:38:19 +0700 Subject: [PATCH 1/3] Fix query batch size detection failure --- .../ColorFruitTagger.cs | 55 ++++++ .../Query/Issue502SpecsBase.cs | 187 ++++++++++++++++++ .../TagTable/SqlServerIssue502Specs.cs | 23 +++ .../TagTable/SqlServerQueryThrottleSpecs.cs | 2 +- .../Query/SqlReadJournal.cs | 20 +- 5 files changed, 277 insertions(+), 10 deletions(-) create mode 100644 src/Akka.Persistence.Sql.Tests/ColorFruitTagger.cs create mode 100644 src/Akka.Persistence.Sql.Tests/Query/Issue502SpecsBase.cs create mode 100644 src/Akka.Persistence.Sql.Tests/Query/SqlServer/TagTable/SqlServerIssue502Specs.cs diff --git a/src/Akka.Persistence.Sql.Tests/ColorFruitTagger.cs b/src/Akka.Persistence.Sql.Tests/ColorFruitTagger.cs new file mode 100644 index 00000000..757039b2 --- /dev/null +++ b/src/Akka.Persistence.Sql.Tests/ColorFruitTagger.cs @@ -0,0 +1,55 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2023 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System.Collections.Immutable; +using System.Linq; +using Akka.Persistence.Journal; + +namespace Akka.Persistence.Sql.Tests +{ + public class ColorFruitTagger : IEventAdapter + { + public static IImmutableSet Colors { get; } = ImmutableHashSet.Create("green", "black", "blue"); + public static IImmutableSet Fruits { get; } = ImmutableHashSet.Create("apple", "banana"); + + public string Manifest(object evt) => string.Empty; + + public object ToJournal(object evt) + { + if (evt is not string s) + return evt; + + var colorTags = Colors.Aggregate( + ImmutableHashSet.Empty, + (acc, color) => s.Contains(color) + ? acc.Add(color) + : acc); + var fruitTags = Fruits.Aggregate( + ImmutableHashSet.Empty, + (acc, color) => s.Contains(color) + ? acc.Add(color) + : acc); + var tags = colorTags.Union(fruitTags); + return tags.IsEmpty + ? evt + : new Tagged(evt, tags); + } + + public IEventSequence FromJournal(object evt, string manifest) + { + if (evt is not string s) + return EventSequence.Single(evt); + + if (s.Contains("invalid")) + return EventSequence.Empty; + + if (s.Contains("duplicated")) + return EventSequence.Create(evt + "-1", evt + "-2"); + + return EventSequence.Single(evt); + } + } +} diff --git a/src/Akka.Persistence.Sql.Tests/Query/Issue502SpecsBase.cs b/src/Akka.Persistence.Sql.Tests/Query/Issue502SpecsBase.cs new file mode 100644 index 00000000..1f0d4960 --- /dev/null +++ b/src/Akka.Persistence.Sql.Tests/Query/Issue502SpecsBase.cs @@ -0,0 +1,187 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2023 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.Collections.Immutable; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Configuration; +using Akka.Persistence.Journal; +using Akka.Persistence.Query; +using Akka.Persistence.Sql.Config; +using Akka.Persistence.Sql.Db; +using Akka.Persistence.Sql.Journal.Dao; +using Akka.Persistence.Sql.Query; +using Akka.Persistence.Sql.Query.Dao; +using Akka.Persistence.Sql.Tests.Common.Containers; +using Akka.Persistence.TCK; +using Akka.Streams; +using Akka.Streams.TestKit; +using Akka.TestKit; +using Akka.TestKit.Extensions; +using Akka.Util; +using FluentAssertions; +using FluentAssertions.Extensions; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.Sql.Tests.Query +{ + public class Issue502SpecsBase : PluginSpec where T : ITestContainer + { + private readonly TestProbe _senderProbe; + private readonly ActorMaterializer _materializer; + + protected Issue502SpecsBase(TagMode tagMode, ITestOutputHelper output, string name, T fixture) + : base(FromConfig(Config(tagMode, fixture)), name, output) + { + // Force start read journal + _ = Journal; + + _senderProbe = CreateTestProbe(); + _materializer = Sys.Materializer(); + } + + protected IActorRef Journal => Extension.JournalFor(null); + + protected SqlReadJournal ReadJournal => PersistenceQuery.Get(Sys).ReadJournalFor("akka.persistence.query.journal.sql"); + + protected override bool SupportsSerialization => true; + + private static Configuration.Config Config(TagMode tagMode, T fixture) + { + if (!fixture.InitializeDbAsync().Wait(10.Seconds())) + throw new Exception("Failed to clean up database in 10 seconds"); + + return ConfigurationFactory.ParseString( + $$""" + akka { + loglevel = INFO + persistence { + journal { + plugin = "akka.persistence.journal.sql" + auto-start-journals = [ "akka.persistence.journal.sql" ] + sql { + event-adapters { + color-tagger = "Akka.Persistence.Sql.Tests.ColorFruitTagger, Akka.Persistence.Sql.Tests" + } + event-adapter-bindings = { + "System.String" = color-tagger + } + provider-name = "{{fixture.ProviderName}}" + tag-write-mode = "{{tagMode}}" + connection-string = "{{fixture.ConnectionString}}" + } + } + query.journal.sql { + provider-name = "{{fixture.ProviderName}}" + connection-string = "{{fixture.ConnectionString}}" + tag-read-mode = "{{tagMode}}" + refresh-interval = 1s + + # what is referred as "batchSize" in code + max-buffer-size = 3 + } + } + } + akka.test.single-expect-default = 10s + """) + .WithFallback(SqlPersistence.DefaultConfiguration); + } + + [Fact (DisplayName = "A full query batch with one element adapted to EventSequence.Empty should still run to the end")] + public async Task MissingSequenceTest() + { + var a = Sys.ActorOf(Query.TestActor.Props("a")); + var b = Sys.ActorOf(Query.TestActor.Props("b")); + + a.Tell("hello"); + await ExpectMsgAsync("hello-done"); + b.Tell("a black car"); + await ExpectMsgAsync("a black car-done"); + a.Tell("something else"); + await ExpectMsgAsync("something else-done"); + a.Tell("a green banana"); + await ExpectMsgAsync("a green banana-done"); + a.Tell("an invalid apple"); // will be missing on query + await ExpectMsgAsync("an invalid apple-done"); + b.Tell("a green leaf"); + await ExpectMsgAsync("a green leaf-done"); + b.Tell("a repeated green leaf"); + await ExpectMsgAsync("a repeated green leaf-done"); + b.Tell("a repeated green leaf"); + await ExpectMsgAsync("a repeated green leaf-done"); + + var reader = ReadJournal; + var probe = reader.CurrentAllEvents(Offset.NoOffset()) + .RunWith(this.SinkProbe(), _materializer); + + await probe.ExpectSubscriptionAsync().ShouldCompleteWithin(1.Seconds()); + await probe.RequestAsync(10); + + await ValidateRepresentation(probe, "a", 1L, "hello"); + await ValidateRepresentation(probe, "b", 1L, "a black car"); + await ValidateRepresentation(probe, "a", 2L, "something else"); + await ValidateRepresentation(probe, "a", 3L, "a green banana"); + await ValidateRepresentation(probe, "b", 2L, "a green leaf"); + await ValidateRepresentation(probe, "b", 3L, "a repeated green leaf"); + await ValidateRepresentation(probe, "b", 4L, "a repeated green leaf"); + await probe.ExpectCompleteAsync(); + } + + [Fact (DisplayName = "A full query batch with one element adapted to EventSequence with multiple entries should still run to the end")] + public async Task DuplicatedSequenceTest() + { + var a = Sys.ActorOf(Query.TestActor.Props("a")); + var b = Sys.ActorOf(Query.TestActor.Props("b")); + + a.Tell("hello"); + await ExpectMsgAsync("hello-done"); + b.Tell("a black car"); + await ExpectMsgAsync("a black car-done"); + a.Tell("something else"); + await ExpectMsgAsync("something else-done"); + a.Tell("a green banana"); + await ExpectMsgAsync("a green banana-done"); + a.Tell("a duplicated apple"); // will emit 2 events + await ExpectMsgAsync("a duplicated apple-done"); + b.Tell("a green leaf"); + await ExpectMsgAsync("a green leaf-done"); + b.Tell("a repeated green leaf"); + await ExpectMsgAsync("a repeated green leaf-done"); + b.Tell("a repeated green leaf"); + await ExpectMsgAsync("a repeated green leaf-done"); + + var reader = ReadJournal; + var probe = reader.CurrentAllEvents(Offset.NoOffset()) + .RunWith(this.SinkProbe(), _materializer); + + await probe.ExpectSubscriptionAsync().ShouldCompleteWithin(1.Seconds()); + await probe.RequestAsync(10); + + await ValidateRepresentation(probe, "a", 1L, "hello"); + await ValidateRepresentation(probe, "b", 1L, "a black car"); + await ValidateRepresentation(probe, "a", 2L, "something else"); + await ValidateRepresentation(probe, "a", 3L, "a green banana"); + await ValidateRepresentation(probe, "a", 4L, "a duplicated apple-1"); + await ValidateRepresentation(probe, "a", 4L, "a duplicated apple-2"); + await ValidateRepresentation(probe, "b", 2L, "a green leaf"); + await ValidateRepresentation(probe, "b", 3L, "a repeated green leaf"); + await ValidateRepresentation(probe, "b", 4L, "a repeated green leaf"); + await probe.ExpectCompleteAsync(); + } + + private static async Task ValidateRepresentation(TestSubscriber.Probe p, string persistenceId, long sequenceNr, object payload) + { + var next = await p.ExpectNextAsync(3.Seconds()); + next.PersistenceId.Should().Be(persistenceId); + next.SequenceNr.Should().Be(sequenceNr); + next.Event.Should().Be(payload); + } + } +} diff --git a/src/Akka.Persistence.Sql.Tests/Query/SqlServer/TagTable/SqlServerIssue502Specs.cs b/src/Akka.Persistence.Sql.Tests/Query/SqlServer/TagTable/SqlServerIssue502Specs.cs new file mode 100644 index 00000000..046f7779 --- /dev/null +++ b/src/Akka.Persistence.Sql.Tests/Query/SqlServer/TagTable/SqlServerIssue502Specs.cs @@ -0,0 +1,23 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2023 .NET Foundation +// +// ----------------------------------------------------------------------- + +using Akka.Persistence.Sql.Config; +using Akka.Persistence.Sql.Tests.Common.Containers; +using Akka.Persistence.Sql.Tests.SqlServer; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.Sql.Tests.Query.SqlServer.TagTable +{ + [Collection(nameof(SqlServerPersistenceSpec))] + public class SqlServerIssue502Specs: Issue502SpecsBase + { + public SqlServerIssue502Specs(ITestOutputHelper output, SqlServerContainer fixture) + : base(TagMode.TagTable, output, nameof(SqlServerQueryThrottleSpecs), fixture) + { + } + } +} diff --git a/src/Akka.Persistence.Sql.Tests/Query/SqlServer/TagTable/SqlServerQueryThrottleSpecs.cs b/src/Akka.Persistence.Sql.Tests/Query/SqlServer/TagTable/SqlServerQueryThrottleSpecs.cs index ca52f50b..6542040e 100644 --- a/src/Akka.Persistence.Sql.Tests/Query/SqlServer/TagTable/SqlServerQueryThrottleSpecs.cs +++ b/src/Akka.Persistence.Sql.Tests/Query/SqlServer/TagTable/SqlServerQueryThrottleSpecs.cs @@ -16,7 +16,7 @@ namespace Akka.Persistence.Sql.Tests.Query.SqlServer.TagTable; public class SqlServerQueryThrottleSpecs: QueryThrottleSpecsBase { public SqlServerQueryThrottleSpecs(ITestOutputHelper output, SqlServerContainer fixture) - : base(TagMode.TagTable, output, nameof(Csv.SqlServerAllEventsSpec), fixture) + : base(TagMode.TagTable, output, nameof(SqlServerQueryThrottleSpecs), fixture) { } } diff --git a/src/Akka.Persistence.Sql/Query/SqlReadJournal.cs b/src/Akka.Persistence.Sql/Query/SqlReadJournal.cs index 6387e20f..6f353791 100644 --- a/src/Akka.Persistence.Sql/Query/SqlReadJournal.cs +++ b/src/Akka.Persistence.Sql/Query/SqlReadJournal.cs @@ -219,15 +219,15 @@ private Source EventsByPersistenceIdSource( timestamp: r.representation.Timestamp, tags: r.tags)); - private Source CurrentJournalEvents(long offset, long max, MaxOrderingId latestOrdering) + private Source CurrentJournalEvents(long offset, long max, MaxOrderingId latestOrdering) { if (latestOrdering.Max < offset) - return Source.Empty(); + return Source.Empty(); return _readJournalDao .Events(offset, latestOrdering.Max, max) .SelectAsync(1, r => Task.FromResult(r.Get())) - .SelectMany( + .Select( a => { var (representation, tags, ordering) = a; @@ -240,7 +240,8 @@ private Source CurrentJournalEvents(long offset, long ma sequenceNr: r.SequenceNr, @event: r.Payload, timestamp: r.Timestamp, - tags: tags)); + tags: tags)) + .ToArray(); }); } @@ -367,9 +368,10 @@ private Source Events(long offset, long? terminateAfterO askTimeout); var xs = await CurrentJournalEvents(uf.offset, batchSize, queryUntil) - .RunWith(Sink.Seq(), _mat); + .RunWith(Sink.Seq(), _mat); var hasMoreEvents = xs.Count == batchSize; + var envelopes = xs.SelectMany(x => x).ToImmutableList(); var nextControl = FlowControlEnum.Unknown; if (terminateAfterOffset.HasValue) @@ -377,7 +379,7 @@ private Source Events(long offset, long? terminateAfterO if (!hasMoreEvents && terminateAfterOffset.Value <= queryUntil.Max) nextControl = FlowControlEnum.Stop; - if (xs.Exists(r => r.Offset is Sequence s && s.Value >= terminateAfterOffset.Value)) + if (envelopes.Exists(r => r.Offset is Sequence s && s.Value >= terminateAfterOffset.Value)) nextControl = FlowControlEnum.Stop; } @@ -388,13 +390,13 @@ private Source Events(long offset, long? terminateAfterO : FlowControlEnum.ContinueDelayed; } - var nextStartingOffset = xs.Count == 0 + var nextStartingOffset = envelopes.Count == 0 ? Math.Max(uf.offset, queryUntil.Max) - : xs.Select(r => r.Offset as Sequence) + : envelopes.Select(r => r.Offset as Sequence) .Max(t => t?.Value ?? long.MinValue); return Option<((long nextStartingOffset, FlowControlEnum nextControl), IImmutableListxs)>.Create( - ((nextStartingOffset, nextControl), xs)); + ((nextStartingOffset, nextControl), envelopes)); } return uf.flowControl switch From 27b1a0621c8d8d28237c2a1787ba63a6914ac2a3 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 9 Jan 2025 04:47:08 +0700 Subject: [PATCH 2/3] Revert bad code --- src/Akka.Persistence.Sql/Query/SqlReadJournal.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Akka.Persistence.Sql/Query/SqlReadJournal.cs b/src/Akka.Persistence.Sql/Query/SqlReadJournal.cs index 6f353791..7fc97e18 100644 --- a/src/Akka.Persistence.Sql/Query/SqlReadJournal.cs +++ b/src/Akka.Persistence.Sql/Query/SqlReadJournal.cs @@ -390,7 +390,7 @@ private Source Events(long offset, long? terminateAfterO : FlowControlEnum.ContinueDelayed; } - var nextStartingOffset = envelopes.Count == 0 + var nextStartingOffset = xs.Count == 0 ? Math.Max(uf.offset, queryUntil.Max) : envelopes.Select(r => r.Offset as Sequence) .Max(t => t?.Value ?? long.MinValue); From 4ece6d0e9ef4060d5b96e0f62e6a96c72ca2cbc4 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Wed, 15 Jan 2025 22:01:30 +0700 Subject: [PATCH 3/3] Fix missing abstract keyword --- src/Akka.Persistence.Sql.Tests/Query/Issue502SpecsBase.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Akka.Persistence.Sql.Tests/Query/Issue502SpecsBase.cs b/src/Akka.Persistence.Sql.Tests/Query/Issue502SpecsBase.cs index 1f0d4960..7256d473 100644 --- a/src/Akka.Persistence.Sql.Tests/Query/Issue502SpecsBase.cs +++ b/src/Akka.Persistence.Sql.Tests/Query/Issue502SpecsBase.cs @@ -32,7 +32,7 @@ namespace Akka.Persistence.Sql.Tests.Query { - public class Issue502SpecsBase : PluginSpec where T : ITestContainer + public abstract class Issue502SpecsBase : PluginSpec where T : ITestContainer { private readonly TestProbe _senderProbe; private readonly ActorMaterializer _materializer;