Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix query batch size detection failure #505

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions src/Akka.Persistence.Sql.Tests/ColorFruitTagger.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// -----------------------------------------------------------------------
// <copyright file="ColorFruitTagger.cs" company="Akka.NET Project">
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System.Collections.Immutable;
using System.Linq;
using Akka.Persistence.Journal;

namespace Akka.Persistence.Sql.Tests
{
public class ColorFruitTagger : IEventAdapter
{
public static IImmutableSet<string> Colors { get; } = ImmutableHashSet.Create("green", "black", "blue");
public static IImmutableSet<string> Fruits { get; } = ImmutableHashSet.Create("apple", "banana");

public string Manifest(object evt) => string.Empty;

public object ToJournal(object evt)
{
if (evt is not string s)
return evt;

var colorTags = Colors.Aggregate(
ImmutableHashSet<string>.Empty,
(acc, color) => s.Contains(color)
? acc.Add(color)
: acc);
var fruitTags = Fruits.Aggregate(
ImmutableHashSet<string>.Empty,
(acc, color) => s.Contains(color)
? acc.Add(color)
: acc);
var tags = colorTags.Union(fruitTags);
return tags.IsEmpty
? evt
: new Tagged(evt, tags);
}

public IEventSequence FromJournal(object evt, string manifest)
{
if (evt is not string s)
return EventSequence.Single(evt);

if (s.Contains("invalid"))
return EventSequence.Empty;

if (s.Contains("duplicated"))
return EventSequence.Create(evt + "-1", evt + "-2");

return EventSequence.Single(evt);
}
}
}
187 changes: 187 additions & 0 deletions src/Akka.Persistence.Sql.Tests/Query/Issue502SpecsBase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// -----------------------------------------------------------------------
// <copyright file="Issue502SpecsBase.cs" company="Akka.NET Project">
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Persistence.Journal;
using Akka.Persistence.Query;
using Akka.Persistence.Sql.Config;
using Akka.Persistence.Sql.Db;
using Akka.Persistence.Sql.Journal.Dao;
using Akka.Persistence.Sql.Query;
using Akka.Persistence.Sql.Query.Dao;
using Akka.Persistence.Sql.Tests.Common.Containers;
using Akka.Persistence.TCK;
using Akka.Streams;
using Akka.Streams.TestKit;
using Akka.TestKit;
using Akka.TestKit.Extensions;
using Akka.Util;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.Sql.Tests.Query
{
public class Issue502SpecsBase<T> : PluginSpec where T : ITestContainer
{
private readonly TestProbe _senderProbe;
private readonly ActorMaterializer _materializer;

protected Issue502SpecsBase(TagMode tagMode, ITestOutputHelper output, string name, T fixture)
: base(FromConfig(Config(tagMode, fixture)), name, output)
{
// Force start read journal
_ = Journal;

_senderProbe = CreateTestProbe();
_materializer = Sys.Materializer();
}

protected IActorRef Journal => Extension.JournalFor(null);

protected SqlReadJournal ReadJournal => PersistenceQuery.Get(Sys).ReadJournalFor<SqlReadJournal>("akka.persistence.query.journal.sql");

protected override bool SupportsSerialization => true;

private static Configuration.Config Config(TagMode tagMode, T fixture)
{
if (!fixture.InitializeDbAsync().Wait(10.Seconds()))
throw new Exception("Failed to clean up database in 10 seconds");

return ConfigurationFactory.ParseString(
$$"""
akka {
loglevel = INFO
persistence {
journal {
plugin = "akka.persistence.journal.sql"
auto-start-journals = [ "akka.persistence.journal.sql" ]
sql {
event-adapters {
color-tagger = "Akka.Persistence.Sql.Tests.ColorFruitTagger, Akka.Persistence.Sql.Tests"
}
event-adapter-bindings = {
"System.String" = color-tagger
}
provider-name = "{{fixture.ProviderName}}"
tag-write-mode = "{{tagMode}}"
connection-string = "{{fixture.ConnectionString}}"
}
}
query.journal.sql {
provider-name = "{{fixture.ProviderName}}"
connection-string = "{{fixture.ConnectionString}}"
tag-read-mode = "{{tagMode}}"
refresh-interval = 1s

# what is referred as "batchSize" in code
max-buffer-size = 3
}
}
}
akka.test.single-expect-default = 10s
""")
.WithFallback(SqlPersistence.DefaultConfiguration);
}

[Fact (DisplayName = "A full query batch with one element adapted to EventSequence.Empty should still run to the end")]
public async Task MissingSequenceTest()
{
var a = Sys.ActorOf(Query.TestActor.Props("a"));
var b = Sys.ActorOf(Query.TestActor.Props("b"));

a.Tell("hello");
await ExpectMsgAsync("hello-done");
b.Tell("a black car");
await ExpectMsgAsync("a black car-done");
a.Tell("something else");
await ExpectMsgAsync("something else-done");
a.Tell("a green banana");
await ExpectMsgAsync("a green banana-done");
a.Tell("an invalid apple"); // will be missing on query
await ExpectMsgAsync("an invalid apple-done");
b.Tell("a green leaf");
await ExpectMsgAsync("a green leaf-done");
b.Tell("a repeated green leaf");
await ExpectMsgAsync("a repeated green leaf-done");
b.Tell("a repeated green leaf");
await ExpectMsgAsync("a repeated green leaf-done");

var reader = ReadJournal;
var probe = reader.CurrentAllEvents(Offset.NoOffset())
.RunWith(this.SinkProbe<EventEnvelope>(), _materializer);

await probe.ExpectSubscriptionAsync().ShouldCompleteWithin(1.Seconds());
await probe.RequestAsync(10);

await ValidateRepresentation(probe, "a", 1L, "hello");
await ValidateRepresentation(probe, "b", 1L, "a black car");
await ValidateRepresentation(probe, "a", 2L, "something else");
await ValidateRepresentation(probe, "a", 3L, "a green banana");
await ValidateRepresentation(probe, "b", 2L, "a green leaf");
await ValidateRepresentation(probe, "b", 3L, "a repeated green leaf");
await ValidateRepresentation(probe, "b", 4L, "a repeated green leaf");
await probe.ExpectCompleteAsync();
}

[Fact (DisplayName = "A full query batch with one element adapted to EventSequence with multiple entries should still run to the end")]
public async Task DuplicatedSequenceTest()
{
var a = Sys.ActorOf(Query.TestActor.Props("a"));
var b = Sys.ActorOf(Query.TestActor.Props("b"));

a.Tell("hello");
await ExpectMsgAsync("hello-done");
b.Tell("a black car");
await ExpectMsgAsync("a black car-done");
a.Tell("something else");
await ExpectMsgAsync("something else-done");
a.Tell("a green banana");
await ExpectMsgAsync("a green banana-done");
a.Tell("a duplicated apple"); // will emit 2 events
await ExpectMsgAsync("a duplicated apple-done");
b.Tell("a green leaf");
await ExpectMsgAsync("a green leaf-done");
b.Tell("a repeated green leaf");
await ExpectMsgAsync("a repeated green leaf-done");
b.Tell("a repeated green leaf");
await ExpectMsgAsync("a repeated green leaf-done");

var reader = ReadJournal;
var probe = reader.CurrentAllEvents(Offset.NoOffset())
.RunWith(this.SinkProbe<EventEnvelope>(), _materializer);

await probe.ExpectSubscriptionAsync().ShouldCompleteWithin(1.Seconds());
await probe.RequestAsync(10);

await ValidateRepresentation(probe, "a", 1L, "hello");
await ValidateRepresentation(probe, "b", 1L, "a black car");
await ValidateRepresentation(probe, "a", 2L, "something else");
await ValidateRepresentation(probe, "a", 3L, "a green banana");
await ValidateRepresentation(probe, "a", 4L, "a duplicated apple-1");
await ValidateRepresentation(probe, "a", 4L, "a duplicated apple-2");
await ValidateRepresentation(probe, "b", 2L, "a green leaf");
await ValidateRepresentation(probe, "b", 3L, "a repeated green leaf");
await ValidateRepresentation(probe, "b", 4L, "a repeated green leaf");
await probe.ExpectCompleteAsync();
}

private static async Task ValidateRepresentation(TestSubscriber.Probe<EventEnvelope> p, string persistenceId, long sequenceNr, object payload)
{
var next = await p.ExpectNextAsync(3.Seconds());
next.PersistenceId.Should().Be(persistenceId);
next.SequenceNr.Should().Be(sequenceNr);
next.Event.Should().Be(payload);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// -----------------------------------------------------------------------
// <copyright file="SqlServerIssue502Specs.cs" company="Akka.NET Project">
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Persistence.Sql.Config;
using Akka.Persistence.Sql.Tests.Common.Containers;
using Akka.Persistence.Sql.Tests.SqlServer;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.Sql.Tests.Query.SqlServer.TagTable
{
[Collection(nameof(SqlServerPersistenceSpec))]
public class SqlServerIssue502Specs: Issue502SpecsBase<SqlServerContainer>
{
public SqlServerIssue502Specs(ITestOutputHelper output, SqlServerContainer fixture)
: base(TagMode.TagTable, output, nameof(SqlServerQueryThrottleSpecs), fixture)
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace Akka.Persistence.Sql.Tests.Query.SqlServer.TagTable;
public class SqlServerQueryThrottleSpecs: QueryThrottleSpecsBase<SqlServerContainer>
{
public SqlServerQueryThrottleSpecs(ITestOutputHelper output, SqlServerContainer fixture)
: base(TagMode.TagTable, output, nameof(Csv.SqlServerAllEventsSpec), fixture)
: base(TagMode.TagTable, output, nameof(SqlServerQueryThrottleSpecs), fixture)
{
}
}
20 changes: 11 additions & 9 deletions src/Akka.Persistence.Sql/Query/SqlReadJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -219,15 +219,15 @@ private Source<EventEnvelope, NotUsed> EventsByPersistenceIdSource(
timestamp: r.representation.Timestamp,
tags: r.tags));

private Source<EventEnvelope, NotUsed> CurrentJournalEvents(long offset, long max, MaxOrderingId latestOrdering)
private Source<EventEnvelope[], NotUsed> CurrentJournalEvents(long offset, long max, MaxOrderingId latestOrdering)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using SelectMany and return a list of EventEnvelopes, we use Select and return a list of EventEnvelope[] arrays instead. This list count will be consistently equals to the number of rows we fetched from the database.

{
if (latestOrdering.Max < offset)
return Source.Empty<EventEnvelope>();
return Source.Empty<EventEnvelope[]>();

return _readJournalDao
.Events(offset, latestOrdering.Max, max)
.SelectAsync(1, r => Task.FromResult(r.Get()))
.SelectMany(
.Select(
a =>
{
var (representation, tags, ordering) = a;
Expand All @@ -240,7 +240,8 @@ private Source<EventEnvelope, NotUsed> CurrentJournalEvents(long offset, long ma
sequenceNr: r.SequenceNr,
@event: r.Payload,
timestamp: r.Timestamp,
tags: tags));
tags: tags))
.ToArray();
});
}

Expand Down Expand Up @@ -367,17 +368,18 @@ private Source<EventEnvelope, NotUsed> Events(long offset, long? terminateAfterO
askTimeout);

var xs = await CurrentJournalEvents(uf.offset, batchSize, queryUntil)
.RunWith(Sink.Seq<EventEnvelope>(), _mat);
.RunWith(Sink.Seq<EventEnvelope[]>(), _mat);

var hasMoreEvents = xs.Count == batchSize;
var envelopes = xs.SelectMany(x => x).ToImmutableList();

var nextControl = FlowControlEnum.Unknown;
if (terminateAfterOffset.HasValue)
{
if (!hasMoreEvents && terminateAfterOffset.Value <= queryUntil.Max)
nextControl = FlowControlEnum.Stop;

if (xs.Exists(r => r.Offset is Sequence s && s.Value >= terminateAfterOffset.Value))
if (envelopes.Exists(r => r.Offset is Sequence s && s.Value >= terminateAfterOffset.Value))
nextControl = FlowControlEnum.Stop;
}

Expand All @@ -388,13 +390,13 @@ private Source<EventEnvelope, NotUsed> Events(long offset, long? terminateAfterO
: FlowControlEnum.ContinueDelayed;
}

var nextStartingOffset = xs.Count == 0
var nextStartingOffset = envelopes.Count == 0
? Math.Max(uf.offset, queryUntil.Max)
: xs.Select(r => r.Offset as Sequence)
: envelopes.Select(r => r.Offset as Sequence)
.Max(t => t?.Value ?? long.MinValue);

return Option<((long nextStartingOffset, FlowControlEnum nextControl), IImmutableList<EventEnvelope>xs)>.Create(
((nextStartingOffset, nextControl), xs));
((nextStartingOffset, nextControl), envelopes));
}

return uf.flowControl switch
Expand Down