Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switched to rent an array in Giraffe handler and use PooledList in AsyncVal #465

Open
wants to merge 5 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace FSharp.Data.GraphQL.Server.AspNetCore.Giraffe

open System
open System.Buffers
open System.IO
open System.Text.Json
open System.Text.Json.Serialization
Expand Down Expand Up @@ -154,10 +155,13 @@ module HttpHandlers =
else
request.EnableBuffering()
let body = request.Body
let buffer = Array.zeroCreate 1
let! bytesRead = body.ReadAsync(buffer, 0, 1)
body.Seek(0, SeekOrigin.Begin) |> ignore
return bytesRead > 0
let buffer = ArrayPool.Shared.Rent 1
try
let! bytesRead = body.ReadAsync(buffer, 0, 1)
body.Seek(0, SeekOrigin.Begin) |> ignore
return bytesRead > 0
finally
ArrayPool.Shared.Return buffer
}

/// <summary>Check if the request is an introspection query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type GraphQLWebSocketMiddleware<'Root>
let receiveMessageViaSocket (cancellationToken : CancellationToken) (serializerOptions : JsonSerializerOptions) (socket : WebSocket) = taskResult {
let buffer = ArrayPool.Shared.Rent options.ReadBufferSize
try
let completeMessage = new PooledList<byte> ()
use completeMessage = new PooledResizeArray<byte> ()
let mutable segmentResponse : WebSocketReceiveResult = null
while (not cancellationToken.IsCancellationRequested)
&& socket |> isSocketOpen
Expand Down
50 changes: 24 additions & 26 deletions src/FSharp.Data.GraphQL.Server/Execution.fs
Original file line number Diff line number Diff line change
Expand Up @@ -254,13 +254,13 @@ let private resolveField (execute: ExecuteField) (ctx: ResolveFieldContext) (par
|> AsyncVal.map(fun v -> if isNull v then None else Some v)


type ResolverResult<'T> = Result<'T * IObservable<GQLDeferredResponseContent> option * GQLProblemDetails list, GQLProblemDetails list>
type ResolverResult<'T> = Result<'T * IObservable<GQLDeferredResponseContent> voption * GQLProblemDetails list, GQLProblemDetails list>

[<RequireQualifiedAccess>]
module ResolverResult =

let data data = Ok (data, None, [])
let defered data deferred = Ok (data, Some deferred, [])
let data data = Ok (data, ValueNone, [])
let defered data deferred = Ok (data, ValueSome deferred, [])

let mapValue (f : 'T -> 'U) (r : ResolverResult<'T>) : ResolverResult<'U> =
Result.map(fun (data, deferred, errs) -> (f data, deferred, errs)) r
Expand Down Expand Up @@ -296,11 +296,11 @@ let deferResults path (res : ResolverResult<obj>) : IObservable<GQLDeferredRespo
| [] -> DeferredResult (data, formattedPath)
| _ -> DeferredErrors (data, errs, formattedPath)
|> Observable.singleton
Option.foldBack Observable.concat deferred deferredData
ValueOption.foldBack Observable.concat deferred deferredData
| Error errs -> Observable.singleton <| DeferredErrors (null, errs, formattedPath)

/// Collect together an array of results using the appropriate execution strategy.
let collectFields (strategy : ExecutionStrategy) (rs : AsyncVal<ResolverResult<KeyValuePair<string, obj>>> []) : AsyncVal<ResolverResult<KeyValuePair<string, obj> []>> = asyncVal {
let collectFields (strategy : ExecutionStrategy) (rs : AsyncVal<ResolverResult<KeyValuePair<string, obj>>> seq) : AsyncVal<ResolverResult<KeyValuePair<string, obj> []>> = asyncVal {
let! collected =
match strategy with
| Parallel -> AsyncVal.collectParallel rs
Expand All @@ -312,12 +312,12 @@ let collectFields (strategy : ExecutionStrategy) (rs : AsyncVal<ResolverResult<K
match (r, acc) with
| Ok(field, d, e), Ok(i, deferred, errs) ->
Array.set data i field
Ok(i - 1, Option.mergeWith Observable.merge deferred d, e @ errs)
Ok(i - 1, ValueOption.mergeWith Observable.merge deferred d, e @ errs)
| Error e, Ok (_, _, errs) -> Error (e @ errs)
| Ok (_, _, e), Error errs -> Error (e @ errs)
| Error e, Error errs -> Error (e @ errs)
return
Array.foldBack merge collected (Ok (data.Length - 1, None, []))
Array.foldBack merge collected (Ok (data.Length - 1, ValueNone, []))
|> ResolverResult.mapValue(fun _ -> data)
}

Expand Down Expand Up @@ -354,16 +354,15 @@ let rec private direct (returnDef : OutputDef) (ctx : ResolveFieldContext) (path
| :? System.Collections.IEnumerable as enumerable ->
enumerable
|> Seq.cast<obj>
|> Seq.toArray
|> Array.mapi resolveItem
|> Seq.mapi resolveItem
|> collectFields Parallel
|> AsyncVal.map(ResolverResult.mapValue(fun items -> KeyValuePair(name, items |> Array.map(fun d -> d.Value) |> box)))
| _ -> raise <| GQLMessageException (ErrorMessages.expectedEnumerableValue ctx.ExecutionInfo.Identifier (value.GetType()))

| Nullable (Output innerDef) ->
let innerCtx = { ctx with ExecutionInfo = { ctx.ExecutionInfo with IsNullable = true; ReturnDef = innerDef } }
executeResolvers innerCtx path parent (toOption value |> AsyncVal.wrap)
|> AsyncVal.map(Result.valueOr (fun errs -> (KeyValuePair(name, null), None, errs)) >> Ok)
|> AsyncVal.map(Result.valueOr (fun errs -> (KeyValuePair(name, null), ValueNone, errs)) >> Ok)

| Interface iDef ->
let possibleTypesFn = ctx.Schema.GetPossibleTypes
Expand Down Expand Up @@ -398,7 +397,7 @@ and deferred (ctx : ResolveFieldContext) (path : FieldPath) (parent : obj) (valu
executeResolvers ctx path parent (toOption value |> AsyncVal.wrap)
|> Observable.ofAsyncVal
|> Observable.bind(ResolverResult.mapValue(fun d -> d.Value) >> deferResults path)
ResolverResult.defered (KeyValuePair (info.Identifier, null)) deferred |> AsyncVal.wrap
ResolverResult.defered (KeyValuePair (name, null)) deferred |> AsyncVal.wrap

and private streamed (options : BufferedStreamOptions) (innerDef : OutputDef) (ctx : ResolveFieldContext) (path : FieldPath) (parent : obj) (value : obj) =
let info = ctx.ExecutionInfo
Expand All @@ -420,9 +419,9 @@ and private streamed (options : BufferedStreamOptions) (innerDef : OutputDef) (c
match r with
| Ok (item, d, e) ->
Array.set data i item.Value
(i - 1, box index :: indicies, Option.mergeWith Observable.merge deferred d, e @ errs)
(i - 1, box index :: indicies, ValueOption.mergeWith Observable.merge deferred d, e @ errs)
| Error e -> (i - 1, box index :: indicies, deferred, e @ errs)
let (_, indicies, deferred, errs) = List.foldBack merge chunk (chunk.Length - 1, [], None, [])
let (_, indicies, deferred, errs) = List.foldBack merge chunk (chunk.Length - 1, [], ValueNone, [])
deferResults (box indicies :: path) (Ok (box data, deferred, errs))

let buffer (items : IObservable<int * ResolverResult<KeyValuePair<string, obj>>>) : IObservable<GQLDeferredResponseContent> =
Expand All @@ -449,8 +448,8 @@ and private streamed (options : BufferedStreamOptions) (innerDef : OutputDef) (c
|> Array.mapi resolveItem
|> Observable.ofAsyncValSeq
|> buffer
ResolverResult.defered (KeyValuePair (info.Identifier, box [])) stream |> AsyncVal.wrap
| _ -> raise <| GQLMessageException (ErrorMessages.expectedEnumerableValue ctx.ExecutionInfo.Identifier (value.GetType()))
ResolverResult.defered (KeyValuePair (name, box [])) stream |> AsyncVal.wrap
| _ -> raise <| GQLMessageException (ErrorMessages.expectedEnumerableValue name (value.GetType()))

and private live (ctx : ResolveFieldContext) (path : FieldPath) (parent : obj) (value : obj) =
let info = ctx.ExecutionInfo
Expand Down Expand Up @@ -485,7 +484,7 @@ and private live (ctx : ResolveFieldContext) (path : FieldPath) (parent : obj) (

executeResolvers ctx path parent (value |> Some |> AsyncVal.wrap)
// TODO: Add tests for `Observable.merge deferred updates` correct order
|> AsyncVal.map(Result.map(fun (data, deferred, errs) -> (data, Some <| Option.foldBack Observable.merge deferred updates, errs)))
|> AsyncVal.map(Result.map(fun (data, deferred, errs) -> (data, ValueSome <| ValueOption.foldBack Observable.merge deferred updates, errs)))

/// Actually execute the resolvers.
and private executeResolvers (ctx : ResolveFieldContext) (path : FieldPath) (parent : obj) (value : AsyncVal<obj option>) : AsyncVal<ResolverResult<KeyValuePair<string, obj>>> =
Expand All @@ -505,8 +504,8 @@ and private executeResolvers (ctx : ResolveFieldContext) (path : FieldPath) (par
let resolveWith (ctx : ResolveFieldContext) (onSuccess : ResolveFieldContext -> FieldPath -> obj -> obj -> AsyncVal<ResolverResult<KeyValuePair<string, obj>>>) : AsyncVal<ResolverResult<KeyValuePair<string, obj>>> = asyncVal {
let! resolved = value |> AsyncVal.rescue path ctx.Schema.ParseError
match resolved with
| Error errs when ctx.ExecutionInfo.IsNullable -> return Ok (KeyValuePair(name, null), None, errs)
| Ok None when ctx.ExecutionInfo.IsNullable -> return Ok (KeyValuePair(name, null), None, [])
| Error errs when ctx.ExecutionInfo.IsNullable -> return Ok (KeyValuePair(name, null), ValueNone, errs)
| Ok None when ctx.ExecutionInfo.IsNullable -> return Ok (KeyValuePair(name, null), ValueNone, [])
| Error errs -> return Error errs
| Ok None -> return Error (nullResolverError name path ctx)
| Ok (Some v) -> return! onSuccess ctx path parent v
Expand Down Expand Up @@ -543,7 +542,6 @@ and executeObjectFields (fields : ExecutionInfo list) (objName : string) (objDef
let! res =
fields
|> Seq.map executeField
|> Seq.toArray
|> collectFields Parallel
match res with
| Error errs -> return Error errs
Expand Down Expand Up @@ -604,16 +602,16 @@ let private executeQueryOrMutation (resultSet: (string * ExecutionInfo) []) (ctx
| Ok (Error errs)
| Error errs -> Error errs
match result with
| Error errs when info.IsNullable -> return Ok (KeyValuePair(name, null), None, errs)
| Error errs when info.IsNullable -> return Ok (KeyValuePair(name, null), ValueNone, errs)
| Error errs -> return Error errs
| Ok r -> return Ok r
}

asyncVal {
let documentId = ctx.ExecutionPlan.DocumentId
match! resultSet |> Array.map executeRootOperation |> collectFields ctx.ExecutionPlan.Strategy with
| Ok (data, Some deferred, errs) -> return GQLExecutionResult.Deferred(documentId, NameValueLookup(data), errs, deferred, ctx.Metadata)
| Ok (data, None, errs) -> return GQLExecutionResult.Direct(documentId, NameValueLookup(data), errs, ctx.Metadata)
match! resultSet |> Seq.map executeRootOperation |> collectFields ctx.ExecutionPlan.Strategy with
| Ok (data, ValueSome deferred, errs) -> return GQLExecutionResult.Deferred(documentId, NameValueLookup(data), errs, deferred, ctx.Metadata)
| Ok (data, ValueNone, errs) -> return GQLExecutionResult.Direct(documentId, NameValueLookup(data), errs, ctx.Metadata)
| Error errs -> return GQLExecutionResult.RequestError(documentId, errs, ctx.Metadata)
}

Expand All @@ -635,9 +633,9 @@ let private executeSubscription (resultSet: (string * ExecutionInfo) []) (ctx: E
Path = fieldPath |> List.rev }
let onValue v = asyncVal {
match! executeResolvers fieldCtx fieldPath value (toOption v |> AsyncVal.wrap) with
| Ok (data, None, []) -> return SubscriptionResult (NameValueLookup.ofList [nameOrAlias, data.Value])
| Ok (data, None, errs) -> return SubscriptionErrors (NameValueLookup.ofList [nameOrAlias, data.Value], errs)
| Ok (_, Some _, _) -> return failwith "Deferred/Streamed/Live are not supported for subscriptions!"
| Ok (data, ValueNone, []) -> return SubscriptionResult (NameValueLookup.ofList [nameOrAlias, data.Value])
| Ok (data, ValueNone, errs) -> return SubscriptionErrors (NameValueLookup.ofList [nameOrAlias, data.Value], errs)
| Ok (_, ValueSome _, _) -> return failwith "Deferred/Streamed/Live are not supported for subscriptions!"
| Error errs -> return SubscriptionErrors (null, errs)
}
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Collections.Pooled" />
<PackageReference Include="FSharp.Control.Reactive" />
<PackageReference Include="System.Reactive" />
</ItemGroup>
Expand Down
52 changes: 30 additions & 22 deletions src/FSharp.Data.GraphQL.Shared/AsyncVal.fs
Original file line number Diff line number Diff line change
Expand Up @@ -130,49 +130,55 @@ module AsyncVal =
/// executed asynchronously, one by one with regard to their order in array.
/// Returned array maintain order of values.
/// If the array contains a Failure, then the entire array will not resolve
let collectSequential (values : AsyncVal<'T>[]) : AsyncVal<'T[]> =
if values.Length = 0 then Value [||]
elif values |> Array.exists isAsync then
let collectSequential (values : AsyncVal<'T> seq) : AsyncVal<'T[]> =
let values = new PooledResizeArray<_> (values)
let length = values.Count
if length = 0 then Value [||]
elif values.Exists isAsync then
Async (async {
let results = Array.zeroCreate values.Length
let exceptions = ResizeArray values.Length
for i = 0 to values.Length - 1 do
let results = Array.zeroCreate length
use exceptions = new PooledResizeArray<_> (length)
for i = 0 to length - 1 do
let v = values.[i]
match v with
| Value v -> results.[i] <- v
| Async a ->
let! r = a
results.[i] <- r
| Failure f -> exceptions.Add f
values.Dispose()
match exceptions.Count with
| 0 -> return results
| 1 -> return exceptions.First().Reraise ()
| _ -> return AggregateException exceptions |> raise
| _ -> return AggregateException (exceptions.AsReadOnly()) |> raise
})
else
let exceptions =
use values = values
use exceptions =
values
|> Array.choose (function
| Failure f -> Some f
| _ -> None)
match exceptions.Length with
| 0 -> Value (values |> Array.map (fun (Value v) -> v))
|> PooledResizeArray.vChoose (function
| Failure f -> ValueSome f
| _ -> ValueNone)
match exceptions.Count with
| 0 -> Value (values |> Seq.map (fun (Value v) -> v) |> Seq.toArray)
| 1 -> Failure (exceptions.First ())
| _ -> Failure (AggregateException exceptions)
| _ -> Failure (AggregateException (exceptions.AsReadOnly()))

/// Converts array of AsyncVals into AsyncVal with array results.
/// In case when are non-immediate values in provided array, they are
/// executed all in parallel, in unordered fashion. Order of values
/// inside returned array is maintained.
/// If the array contains a Failure, then the entire array will not resolve
let collectParallel (values : AsyncVal<'T>[]) : AsyncVal<'T[]> =
if values.Length = 0 then Value [||]
let collectParallel (values : AsyncVal<'T> seq) : AsyncVal<'T[]> =
use values = new PooledResizeArray<_> (values)
let length = values.Count
if length = 0 then Value [||]
else
let indexes = List<_> (0)
let continuations = List<_> (0)
let results = Array.zeroCreate values.Length
let exceptions = ResizeArray values.Length
for i = 0 to values.Length - 1 do
let indexes = new PooledResizeArray<_> (length)
let continuations = new PooledResizeArray<_> (length)
let results = Array.zeroCreate length
use exceptions = new PooledResizeArray<_> (length)
for i = 0 to length - 1 do
let value = values.[i]
match value with
| Value v -> results.[i] <- v
Expand All @@ -182,13 +188,15 @@ module AsyncVal =
| Failure f -> exceptions.Add f
match exceptions.Count with
| 1 -> AsyncVal.Failure (exceptions.First ())
| count when count > 1 -> AsyncVal.Failure (AggregateException exceptions)
| count when count > 1 -> AsyncVal.Failure (AggregateException (exceptions.AsReadOnly()))
| _ ->
if indexes.Count = 0 then Value (results)
else Async (async {
let! vals = continuations |> Async.Parallel
for i = 0 to indexes.Count - 1 do
results.[indexes.[i]] <- vals.[i]
indexes.Dispose()
continuations.Dispose()
return results
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>FSharp.Data.GraphQL.Server</_Parameter1>
</AssemblyAttribute>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>FSharp.Data.GraphQL.Server.AspNetCore</_Parameter1>
</AssemblyAttribute>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>FSharp.Data.GraphQL.Client</_Parameter1>
</AssemblyAttribute>
Expand All @@ -29,6 +32,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Collections.Pooled" />
<PackageReference Include="FParsec" />
<PackageReference Include="FSharp.SystemTextJson" />
<PackageReference Include="FsToolkit.ErrorHandling" />
Expand All @@ -44,6 +48,7 @@
<Compile Include="Helpers\Extensions.fs" />
<Compile Include="Helpers\Reflection.fs" />
<Compile Include="Helpers\MemoryCache.fs" />
<Compile Include="Helpers\PooledResizeArray.fs" />
<Compile Include="Errors.fs" />
<Compile Include="Exception.fs" />
<Compile Include="ValidationTypes.fs" />
Expand Down
14 changes: 0 additions & 14 deletions src/FSharp.Data.GraphQL.Shared/Helpers/Extensions.fs
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,6 @@ type TypeInfo with
x.GetDeclaredMethod(first + propertyName.Substring(1))
| prop, _ -> prop

module Option =

let mergeWith (f: 'T -> 'T -> 'T) (o1 : 'T option) (o2 : 'T option) : 'T option =
match (o1, o2) with
| Some a, Some b -> Some (f a b)
| Some a, _ -> Some a
| _, Some b -> Some b
| _, _ -> None

let unwrap (defaultValue : 'U) (onSome : 'T -> 'U) (o : 'T option) : 'U =
match o with
| Some t -> onSome t
| None -> defaultValue

module Skippable =

let ofList list =
Expand Down
24 changes: 24 additions & 0 deletions src/FSharp.Data.GraphQL.Shared/Helpers/ObjAndStructConversions.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,18 @@ module internal ValueOption =

let ofOption value = Option.toVOption value

let mergeWith (f: 'T -> 'T -> 'T) (o1 : 'T voption) (o2 : 'T voption) : 'T voption =
match (o1, o2) with
| ValueSome a, ValueSome b -> ValueSome (f a b)
| ValueSome a, _ -> ValueSome a
| _, ValueSome b -> ValueSome b
| _, _ -> ValueNone

let unwrap (defaultValue : 'U) (onSome : 'T -> 'U) (o : 'T voption) : 'U =
match o with
| ValueSome t -> onSome t
| ValueNone -> defaultValue
valbers marked this conversation as resolved.
Show resolved Hide resolved

module internal Option =

let toVOption voption =
Expand All @@ -18,6 +30,18 @@ module internal Option =

let ofVOption voption = voption |> ValueOption.toOption

let mergeWith (f: 'T -> 'T -> 'T) (o1 : 'T option) (o2 : 'T option) : 'T option =
match (o1, o2) with
| Some a, Some b -> Some (f a b)
| Some a, _ -> Some a
| _, Some b -> Some b
| _, _ -> None

let unwrap (defaultValue : 'U) (onSome : 'T -> 'U) (o : 'T option) : 'U =
match o with
| Some t -> onSome t
| None -> defaultValue

[<AutoOpen>]
module internal ValueTuple =

Expand Down
Loading