Skip to content

Commit

Permalink
Add shouldCompress predicate
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jan 2, 2025
1 parent 8c3c211 commit 8875e8c
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ open System.Text.Json

/// Represents the body of an Event (or its Metadata), holding the encoded form of the buffer together with an enum value identifying the encoding scheme.
/// Enables the decoding side to transparently inflate the data on loading without burdening the application layer with tracking the encoding scheme used.
type EncodedBodyT = (struct(int * JsonElement))
type EncodedBody = (struct(int * JsonElement))

module private Impl =

Expand All @@ -35,7 +35,7 @@ module private Impl =
let s = new System.IO.MemoryStream(data, writable = false)
use decompressor = new System.IO.Compression.BrotliStream(s, System.IO.Compression.CompressionMode.Decompress)
decompressor.CopyTo output
let expand post alg compressedBytes =
let private unpack post alg compressedBytes =
use output = new System.IO.MemoryStream()
compressedBytes |> alg output
output.ToArray() |> post
Expand All @@ -44,34 +44,34 @@ module private Impl =
| Encoding.Deflate, JsonValueKind.String -> data.GetBytesFromBase64() |> expand inflateTo
| Encoding.Brotli, JsonValueKind.String -> data.GetBytesFromBase64() |> expand brotliDecompressTo
| _ -> data |> direct
let decode = decode_ id (expand InteropHelpers.Utf8ToJsonElement)
let decodeUtf8 = decode_ InteropHelpers.JsonElementToUtf8 (expand ReadOnlyMemory<byte>)
let decode = decode_ id (unpack InteropHelpers.Utf8ToJsonElement)
let decodeUtf8 = decode_ InteropHelpers.JsonElementToUtf8 (unpack ReadOnlyMemory<byte>)

(* Conditional compression logic: triggered as storage layer pulls Data/Meta fields
Bodies under specified minimum size, or not meeting a required compression gain are stored directly, equivalent to if compression had not been wired in *)

let encodeUncompressed (raw: JsonElement): EncodedBodyT = Encoding.Direct, raw
let private blobToStringElement = Convert.ToBase64String >> JsonSerializer.SerializeToElement
let encodeUncompressed (raw: JsonElement): EncodedBody = Encoding.Direct, raw
let private blobToBase64StringJsonElement = Convert.ToBase64String >> JsonSerializer.SerializeToElement
let private brotliCompress (eventBody: ReadOnlyMemory<byte>): System.IO.MemoryStream =
let output = new System.IO.MemoryStream()
use compressor = new System.IO.Compression.BrotliStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true)
compressor.Write eventBody.Span
compressor.Close() // NOTE Close, not Flush; we want the output fully terminated to reduce surprises when decompressing
output
let tryCompress minSize minGain (raw: JsonElement): EncodedBodyT =
let tryCompress minSize minGain (raw: JsonElement): EncodedBody =
let utf8: ReadOnlyMemory<byte> = InteropHelpers.JsonElementToUtf8 raw
if utf8.Length < minSize then encodeUncompressed raw else

let brotli = brotliCompress utf8
if utf8.Length <= int brotli.Length + minGain then encodeUncompressed raw else
Encoding.Brotli, brotli.ToArray() |> blobToStringElement
let encodeUncompressedUtf8 (raw: ReadOnlyMemory<byte>): EncodedBodyT = Encoding.Direct, InteropHelpers.Utf8ToJsonElement raw
let tryCompressUtf8 minSize minGain (utf8: ReadOnlyMemory<byte>): EncodedBodyT =
Encoding.Brotli, brotli.ToArray() |> blobToBase64StringJsonElement
let encodeUncompressedUtf8 (raw: ReadOnlyMemory<byte>): EncodedBody = Encoding.Direct, InteropHelpers.Utf8ToJsonElement raw
let tryCompressUtf8 minSize minGain (utf8: ReadOnlyMemory<byte>): EncodedBody =
if utf8.Length < minSize then encodeUncompressedUtf8 utf8 else

let brotli = brotliCompress utf8
if utf8.Length <= int brotli.Length + minGain then encodeUncompressedUtf8 utf8 else
Encoding.Brotli, brotli.ToArray() |> blobToStringElement
Encoding.Brotli, brotli.ToArray() |> blobToBase64StringJsonElement

type [<Struct>] CompressionOptions = { minSize: int; minGain: int } with
/// Attempt to compress anything possible
Expand All @@ -82,57 +82,72 @@ type [<Struct>] CompressionOptions = { minSize: int; minGain: int } with
static member Default = { minSize = 48; minGain = 4 }

[<Extension; AbstractClass; Sealed>]
type EncodedBody private () =
type Encoding private () =

static member Uncompressed(x: JsonElement): EncodedBodyT =
static member Uncompressed(x: JsonElement): EncodedBody =
Impl.encodeUncompressed x
static member Uncompressed(x: ReadOnlyMemory<byte>): EncodedBodyT =
static member Uncompressed(x: ReadOnlyMemory<byte>): EncodedBody =
Impl.encodeUncompressedUtf8 x
static member TryCompress(options, x: JsonElement): EncodedBodyT =
static member TryCompress(options, x: JsonElement): EncodedBody =
Impl.tryCompress options.minSize options.minGain x
static member TryCompress(options, x: ReadOnlyMemory<byte>): EncodedBodyT =
static member TryCompress(options, x: ReadOnlyMemory<byte>): EncodedBody =
Impl.tryCompressUtf8 options.minSize options.minGain x
static member ToJsonElement(x: EncodedBodyT): JsonElement =
static member ToJsonElement(x: EncodedBody): JsonElement =
Impl.decode x
static member ToUtf8(x: EncodedBodyT): ReadOnlyMemory<byte> =
static member ToUtf8(x: EncodedBody): ReadOnlyMemory<byte> =
Impl.decodeUtf8 x
static member ToByteArray(x: EncodedBodyT): byte[] =
EncodedBody.ToUtf8(x).ToArray()
static member ExpandTo(ms: System.IO.Stream, x: EncodedBodyT) =
static member ToByteArray(x: EncodedBody): byte[] =
Encoding.ToUtf8(x).ToArray()
static member ExpandTo(ms: System.IO.Stream, x: EncodedBody) =
Impl.decode_ (fun el -> JsonSerializer.Serialize(ms, el)) (fun dec -> dec ms) x

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>JsonElement</c> Event Bodies to attempt to compress the data.<br/>
/// If sufficient compression, as defined by <c>options</c> is not achieved, the body is saved as-is.<br/>
/// The <c>int</c> conveys a value that must be round tripped alongside the body in order for the decoding process to correctly interpret it.</summary>
/// <summary>The body will be saved as-is under the following circumstances:<br/>
/// - the <c>shouldCompress</c> predicate is not satisfied for the event in question.<br/>
/// - sufficient compression, as defined by <c>options</c> is not achieved, the body is saved as-is.<br/>
/// The <c>int</c> produced when <c>Encode</c>ing conveys the encoding used, and must be round tripped alongside the body as a required input of a future <c>Decode</c>.</summary>
[<Extension>]
static member EncodeTryCompress<'Event, 'Context>(native: IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context>, [<Optional; DefaultParameterValue null>] ?options)
: IEventCodec<'Event, EncodedBodyT, 'Context> =
static member EncodeTryCompress<'Event, 'Context>(
native: IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context>,
[<Optional; DefaultParameterValue null>] ?shouldCompress: Func<IEventData<ReadOnlyMemory<byte>>, bool>,
[<Optional; DefaultParameterValue null>] ?options)
: IEventCodec<'Event, EncodedBody, 'Context> =
let opts = defaultArg options CompressionOptions.Default
FsCodec.Core.EventCodec.Map(native, (fun x -> EncodedBody.TryCompress(opts, x)), Func<_, _> EncodedBody.ToUtf8)
let encode = shouldCompress |> function
| None -> fun _x (d: ReadOnlyMemory<byte>) -> Encoding.TryCompress(opts, d)
| Some predicate -> fun x d -> if predicate.Invoke x then Encoding.TryCompress(opts, d) else Encoding.Uncompressed d
FsCodec.Core.EventCodec.MapEx(native, encode, Func<_, _> Encoding.ToUtf8)

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>JsonElement</c> Event Bodies to attempt to compress the data.<br/>
/// If sufficient compression, as defined by <c>options</c> is not achieved, the body is saved as-is.<br/>
/// The <c>int</c> conveys a value that must be round tripped alongside the body in order for the decoding process to correctly interpret it.</summary>
/// The body will be saved as-is under the following circumstances:<br/>
/// - the <c>shouldCompress</c> predicate is not satisfied for the event in question.<br/>
/// - sufficient compression, as defined by <c>options</c> is not achieved, the body is saved as-is.<br/>
/// The <c>int</c> produced when <c>Encode</c>ing conveys the encoding used, and must be round tripped alongside the body as a required input of a future <c>Decode</c>.</summary>
[<Extension>]
static member EncodeTryCompress<'Event, 'Context>(native: IEventCodec<'Event, JsonElement, 'Context>, [<Optional; DefaultParameterValue null>] ?options)
: IEventCodec<'Event, EncodedBodyT, 'Context> =
static member EncodeTryCompress<'Event, 'Context>(
native: IEventCodec<'Event, JsonElement, 'Context>,
[<Optional; DefaultParameterValue null>] ?shouldCompress: Func<IEventData<JsonElement>, bool>,
[<Optional; DefaultParameterValue null>] ?options)
: IEventCodec<'Event, EncodedBody, 'Context> =
let opts = defaultArg options CompressionOptions.Default
FsCodec.Core.EventCodec.Map(native, (fun x -> EncodedBody.TryCompress(opts, x)), Func<_, _> EncodedBody.ToJsonElement)
let encode = shouldCompress |> function
| None -> fun _x (d: JsonElement) -> Encoding.TryCompress(opts, d)
| Some predicate -> fun x d -> if predicate.Invoke x then Encoding.TryCompress(opts, d) else Encoding.Uncompressed d
FsCodec.Core.EventCodec.MapEx(native, encode, Func<_, _> Encoding.ToJsonElement)

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>JsonElement</c> Event Bodies to encode as per <c>EncodeTryCompress</c>, but without attempting compression.</summary>
[<Extension>]
static member EncodeUncompressed<'Event, 'Context>(native: IEventCodec<'Event, JsonElement, 'Context>)
: IEventCodec<'Event, EncodedBodyT, 'Context> =
FsCodec.Core.EventCodec.Map(native, Func<_, _> EncodedBody.Uncompressed, Func<_, _> EncodedBody.ToJsonElement)
: IEventCodec<'Event, EncodedBody, 'Context> =
FsCodec.Core.EventCodec.Map(native, Func<_, _> Encoding.Uncompressed, Func<_, _> Encoding.ToJsonElement)

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>int * JsonElement</c> Event Bodies to render and/or consume Uncompressed <c>ReadOnlyMemory&lt;byte&gt;</c>.</summary>
[<Extension>]
static member ToUtf8Codec<'Event, 'Context>(native: IEventCodec<'Event, EncodedBodyT, 'Context>)
static member ToUtf8Codec<'Event, 'Context>(native: IEventCodec<'Event, EncodedBody, 'Context>)
: IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context> =
FsCodec.Core.EventCodec.Map(native, Func<_, _> EncodedBody.ToUtf8, Func<_, _> EncodedBody.Uncompressed)
FsCodec.Core.EventCodec.Map(native, Func<_, _> Encoding.ToUtf8, Func<_, _> Encoding.Uncompressed)

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>int * JsonElement</c> Event Bodies to render and/or consume Uncompressed <c>byte[]</c>.</summary>
[<Extension>]
static member ToByteArrayCodec<'Event, 'Context>(native: IEventCodec<'Event, EncodedBodyT, 'Context>)
static member ToByteArrayCodec<'Event, 'Context>(native: IEventCodec<'Event, EncodedBody, 'Context>)
: IEventCodec<'Event, byte[], 'Context> =
FsCodec.Core.EventCodec.Map(native, Func<_, _> EncodedBody.ToByteArray, Func<_, _> EncodedBody.Uncompressed)
FsCodec.Core.EventCodec.Map(native, Func<_, _> Encoding.ToByteArray, Func<_, _> Encoding.Uncompressed)
2 changes: 1 addition & 1 deletion src/FsCodec.SystemTextJson/FsCodec.SystemTextJson.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<Compile Include="CodecJsonElement.fs" />
<Compile Include="Interop.fs" />
<Compile Include="StringIdConverter.fs" />
<Compile Include="EncodedBody.fs" />
<Compile Include="Encoding.fs" />
</ItemGroup>

<ItemGroup>
Expand Down
21 changes: 15 additions & 6 deletions src/FsCodec/FsCodec.fs
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,21 @@ type EventData<'Format>(eventType, data, meta, eventId, correlationId, causation
member _.CausationId = causationId
member _.Timestamp = timestamp

static member Map<'Mapped>(f: Func<'Format, 'Mapped>)
static member MapEx<'Mapped>(f: Func<IEventData<'Format>, 'Format, 'Mapped>)
(x: IEventData<'Format>): IEventData<'Mapped> =
{ new IEventData<'Mapped> with
member _.EventType = x.EventType
member _.Data = f.Invoke x.Data
member _.Meta = f.Invoke x.Meta
member _.Data = f.Invoke(x, x.Data)
member _.Meta = f.Invoke(x, x.Meta)
member _.EventId = x.EventId
member _.CorrelationId = x.CorrelationId
member _.CausationId = x.CausationId
member _.Timestamp = x.Timestamp }

static member Map<'Mapped>(f: Func<'Format, 'Mapped>)
(x: IEventData<'Format>): IEventData<'Mapped> =
EventData.MapEx(Func<_, _, _>(fun _x -> f.Invoke)) x

/// <summary>An Event or Unfold that's been read from a Store and hence has a defined <c>Index</c> on the Event Timeline.</summary>
[<NoComparison; NoEquality>]
type TimelineEvent<'Format>(index, eventType, data, meta, eventId, correlationId, causationId, timestamp, isUnfold, context, size) =
Expand All @@ -90,7 +94,7 @@ type TimelineEvent<'Format>(index, eventType, data, meta, eventId, correlationId
TimelineEvent(index, inner.EventType, inner.Data, inner.Meta, inner.EventId, inner.CorrelationId, inner.CausationId, inner.Timestamp, isUnfold, Option.toObj context, size) :> _

override _.ToString() = sprintf "%s %s @%i" (if isUnfold then "Unfold" else "Event") eventType index

interface ITimelineEvent<'Format> with
member _.Index = index
member _.IsUnfold = isUnfold
Expand Down Expand Up @@ -122,10 +126,10 @@ type TimelineEvent<'Format>(index, eventType, data, meta, eventId, correlationId
[<AbstractClass; Sealed>]
type EventCodec<'Event, 'Format, 'Context> private () =

static member Map<'TargetFormat>(native: IEventCodec<'Event, 'Format, 'Context>, up: Func<'Format,'TargetFormat>, down: Func<'TargetFormat, 'Format>)
static member MapEx<'TargetFormat>(native: IEventCodec<'Event, 'Format, 'Context>, up: Func<IEventData<'Format>, 'Format,'TargetFormat>, down: Func<'TargetFormat, 'Format>)
: IEventCodec<'Event, 'TargetFormat, 'Context> =

let upConvert = EventData.Map up
let upConvert = EventData.MapEx up
let downConvert = TimelineEvent.Map down

{ new IEventCodec<'Event, 'TargetFormat, 'Context> with
Expand All @@ -137,3 +141,8 @@ type EventCodec<'Event, 'Format, 'Context> private () =
member _.Decode target =
let encoded = downConvert target
native.Decode encoded }

static member Map<'TargetFormat>(native: IEventCodec<'Event, 'Format, 'Context>, up: Func<'Format,'TargetFormat>, down: Func<'TargetFormat, 'Format>)
: IEventCodec<'Event, 'TargetFormat, 'Context> =

EventCodec.MapEx(native, Func<_, _, _>(fun _x -> up.Invoke), down)
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module FsCodec.SystemTextJson.Tests.EncodedBodyTests
module FsCodec.SystemTextJson.Tests.EncodingTests

open Swensen.Unquote
open System
Expand Down Expand Up @@ -38,8 +38,8 @@ module InternalDecoding =
let explicitBrotli = struct (2, JsonSerializer.SerializeToElement "CwuAeyJ2YWx1ZSI6IkhlbGxvIFdvcmxkIn0D")

let decode useRom =
if useRom then FsCodec.SystemTextJson.EncodedBody.ToByteArray >> JsonSerializer.Deserialize
else FsCodec.SystemTextJson.EncodedBody.ToJsonElement >> JsonSerializer.Deserialize
if useRom then FsCodec.SystemTextJson.Encoding.ToByteArray >> JsonSerializer.Deserialize
else FsCodec.SystemTextJson.Encoding.ToJsonElement >> JsonSerializer.Deserialize

let [<Theory; InlineData false; InlineData true>] ``Can decode all known representations`` useRom =
test <@ decode useRom direct = inputValue @>
Expand All @@ -61,7 +61,7 @@ type JsonElement with member x.Utf8ByteCount = if x.ValueKind = JsonValueKind.Nu

module TryCompress =

let sut = FsCodec.SystemTextJson.EncodedBody.EncodeTryCompress StringUtf8.sut
let sut = FsCodec.SystemTextJson.Encoding.EncodeTryCompress StringUtf8.sut

let compressibleValue = {| value = String('x', 5000) |}

Expand All @@ -83,7 +83,7 @@ module TryCompress =

module Uncompressed =

let sut = FsCodec.SystemTextJson.EncodedBody.EncodeUncompressed StringUtf8.sut
let sut = FsCodec.SystemTextJson.Encoding.EncodeUncompressed StringUtf8.sut

// Borrow the value we just demonstrated to be compressible
let compressibleValue = TryCompress.compressibleValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<Link>SomeNullHandlingTests.fs</Link>
</Compile>
<Compile Include="StringIdTests.fs" />
<Compile Include="EncodedBodyTests.fs" />
<Compile Include="EncodingTests.fs" />
</ItemGroup>

</Project>

0 comments on commit 8875e8c

Please sign in to comment.