diff --git a/.vsts-ci.yml b/.vsts-ci.yml index daf5bee..bb8d17a 100644 --- a/.vsts-ci.yml +++ b/.vsts-ci.yml @@ -1,10 +1,15 @@ pool: - vmImage: "ubuntu-16.04" + vmImage: "windows-2019" variables: buildConfiguration: "Release" steps: + - task: DotNetCoreInstaller@0 + inputs: + packageType: 'sdk' + version: 3.0.100 + - task: DotNetCoreCLI@2 inputs: command: "restore" diff --git a/.vsts-release.yml b/.vsts-release.yml index 632302a..4c899f9 100644 --- a/.vsts-release.yml +++ b/.vsts-release.yml @@ -1,10 +1,15 @@ pool: - vmImage: "ubuntu-16.04" + vmImage: "windows-2019" variables: buildConfiguration: "Release" steps: + - task: DotNetCoreInstaller@0 + inputs: + packageType: 'sdk' + version: 3.0.100 + - task: DotNetCoreCLI@2 inputs: command: "restore" @@ -43,6 +48,15 @@ steps: versioningScheme: byEnvVar versionEnvVar: Version + - task: DotNetCoreCLI@2 + inputs: + command: pack + packagesToPack: "src/Orleans.Streaming.CosmosDB/*.csproj" + packDirectory: "$(build.artifactStagingDirectory)" + configuration: "$(buildConfiguration)" + versioningScheme: byEnvVar + versionEnvVar: Version + - task: NuGetCommand@2 inputs: command: push diff --git a/Orleans.CosmosDB.sln b/Orleans.CosmosDB.sln index c737e1a..245b8f1 100644 --- a/Orleans.CosmosDB.sln +++ b/Orleans.CosmosDB.sln @@ -25,6 +25,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Persistence.CosmosD EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Reminders.CosmosDB", "src\Orleans.Reminders.CosmosDB\Orleans.Reminders.CosmosDB.csproj", "{E82D600B-2C44-4458-AB68-BCC25DE16631}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Orleans.Streaming.CosmosDB", "src\Orleans.Streaming.CosmosDB\Orleans.Streaming.CosmosDB.csproj", "{97AC434A-A072-44E4-B8F3-CCFD87A94F08}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -83,6 +85,18 @@ Global {E82D600B-2C44-4458-AB68-BCC25DE16631}.Release|x64.Build.0 = Release|Any CPU {E82D600B-2C44-4458-AB68-BCC25DE16631}.Release|x86.ActiveCfg = Release|Any CPU {E82D600B-2C44-4458-AB68-BCC25DE16631}.Release|x86.Build.0 = Release|Any CPU + {97AC434A-A072-44E4-B8F3-CCFD87A94F08}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {97AC434A-A072-44E4-B8F3-CCFD87A94F08}.Debug|Any CPU.Build.0 = Debug|Any CPU + {97AC434A-A072-44E4-B8F3-CCFD87A94F08}.Debug|x64.ActiveCfg = Debug|Any CPU + {97AC434A-A072-44E4-B8F3-CCFD87A94F08}.Debug|x64.Build.0 = Debug|Any CPU + {97AC434A-A072-44E4-B8F3-CCFD87A94F08}.Debug|x86.ActiveCfg = Debug|Any CPU + {97AC434A-A072-44E4-B8F3-CCFD87A94F08}.Debug|x86.Build.0 = Debug|Any CPU + {97AC434A-A072-44E4-B8F3-CCFD87A94F08}.Release|Any CPU.ActiveCfg = Release|Any CPU + {97AC434A-A072-44E4-B8F3-CCFD87A94F08}.Release|Any CPU.Build.0 = Release|Any CPU + {97AC434A-A072-44E4-B8F3-CCFD87A94F08}.Release|x64.ActiveCfg = Release|Any CPU + {97AC434A-A072-44E4-B8F3-CCFD87A94F08}.Release|x64.Build.0 = Release|Any CPU + {97AC434A-A072-44E4-B8F3-CCFD87A94F08}.Release|x86.ActiveCfg = Release|Any CPU + {97AC434A-A072-44E4-B8F3-CCFD87A94F08}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -92,6 +106,7 @@ Global {E388AE16-3DE3-4181-AF2F-B880A78584AB} = {12BE367B-569F-4C2E-AC15-876709C119D1} {1262090F-EDB2-47F1-AC69-48195CDFFBE6} = {12BE367B-569F-4C2E-AC15-876709C119D1} {E82D600B-2C44-4458-AB68-BCC25DE16631} = {12BE367B-569F-4C2E-AC15-876709C119D1} + {97AC434A-A072-44E4-B8F3-CCFD87A94F08} = {12BE367B-569F-4C2E-AC15-876709C119D1} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {9740C707-08EC-41C2-A199-A861D618296C} diff --git a/README.md b/README.md index 600a11c..5f0c1b1 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,8 @@ From Package Manager: > PS> Install-Package Orleans.Reminders.CosmosDB -prerelease +> PS> Install-Package Orleans.Streaming.CosmosDB -prerelease + .Net CLI: > \# dotnet add package Orleans.Clustering.CosmosDB -prerelease @@ -36,6 +38,8 @@ From Package Manager: > \# dotnet add package Orleans.Reminders.CosmosDB -prerelease +> \# dotnet add package Orleans.Streaming.CosmosDB -prerelease + Paket: > \# paket add Orleans.Clustering.CosmosDB -prerelease @@ -44,6 +48,8 @@ Paket: > \# paket add Orleans.Reminders.CosmosDB -prerelease +> \# paket add Orleans.Streaming.CosmosDB -prerelease + # Configuration It is not mandatory to use all the providers at once. Just pick the one you are interested in from the samples and you should be good as they don't depend on each other. @@ -127,5 +133,36 @@ The change will not affect existing systems. Configuring a custom `IPartitionKey ### Indexing The current indexing fork relies on CosmosDB stored procedures for lookup. As stored procedures must be executed against a specific partition, the use of custom partition key builders is not compatible with the Orleans indexing fork. +## Stream Provider + +To use the Stream Provider you need to register it on your `ISiloBuilder`: + +```csharp +.AddCosmosDBStreaming(config => + config.AddStream("", configure => + { + // The information on FeedCollectionInfo property is related to the database that will be monitored by the change feed + configure.FeedCollectionInfo = new DocumentCollectionInfo + { + Uri = new Uri(""), + MasterKey = "" , + DatabaseName = "", + CollectionName = "" + }; + + // The information on LeaseCollectionInfo is related to the CosmosDB Change Feed lease collection + configure.LeaseCollectionInfo = new DocumentCollectionInfo + { + Uri = new Uri(""), + MasterKey = "" , + DatabaseName = "", + CollectionName = "" + }; + }, typeof(PartitionKeyBasedStreamMapper))) + +``` + +Then on your grain, you need to implement `IAsyncObserver` in order to receive the document that has changed and published thru Cosmos DB Change Feed. + # Contributions PRs and feedback are **very** welcome! diff --git a/global.json b/global.json new file mode 100644 index 0000000..b1ddc83 --- /dev/null +++ b/global.json @@ -0,0 +1,5 @@ +{ + "sdk": { + "version": "3.0.100" + } +} \ No newline at end of file diff --git a/src/Orleans.Clustering.CosmosDB/ClusteringExtensions.cs b/src/Orleans.Clustering.CosmosDB/ClusteringExtensions.cs index 3060f1b..5d42099 100644 --- a/src/Orleans.Clustering.CosmosDB/ClusteringExtensions.cs +++ b/src/Orleans.Clustering.CosmosDB/ClusteringExtensions.cs @@ -31,6 +31,27 @@ public static ISiloHostBuilder UseCosmosDBMembership(this ISiloHostBuilder build }); } + public static ISiloBuilder UseCosmosDBMembership(this ISiloBuilder builder, + Action configureOptions) + { + return builder.ConfigureServices(services => services.UseCosmosDBMembership(configureOptions)); + } + + public static ISiloBuilder UseCosmosDBMembership(this ISiloBuilder builder, + Action> configureOptions) + { + return builder.ConfigureServices(services => services.UseCosmosDBMembership(configureOptions)); + } + + public static ISiloBuilder UseCosmosDBMembership(this ISiloBuilder builder) + { + return builder.ConfigureServices(services => + { + services.AddOptions(); + services.AddSingleton(); + }); + } + public static IClientBuilder UseCosmosDBGatewayListProvider(this IClientBuilder builder, Action configureOptions) { return builder.ConfigureServices(services => services.UseCosmosDBGatewayListProvider(configureOptions)); diff --git a/src/Orleans.Clustering.CosmosDB/Orleans.Clustering.CosmosDB.csproj b/src/Orleans.Clustering.CosmosDB/Orleans.Clustering.CosmosDB.csproj index 421a0e6..0130b90 100644 --- a/src/Orleans.Clustering.CosmosDB/Orleans.Clustering.CosmosDB.csproj +++ b/src/Orleans.Clustering.CosmosDB/Orleans.Clustering.CosmosDB.csproj @@ -62,13 +62,9 @@ - - - - - - - + + + diff --git a/src/Orleans.Persistence.CosmosDB/Orleans.Persistence.CosmosDB.csproj b/src/Orleans.Persistence.CosmosDB/Orleans.Persistence.CosmosDB.csproj index 800b78e..bad81cd 100644 --- a/src/Orleans.Persistence.CosmosDB/Orleans.Persistence.CosmosDB.csproj +++ b/src/Orleans.Persistence.CosmosDB/Orleans.Persistence.CosmosDB.csproj @@ -38,10 +38,10 @@ - - - - + + + + \ No newline at end of file diff --git a/src/Orleans.Persistence.CosmosDB/StorageExtensions.cs b/src/Orleans.Persistence.CosmosDB/StorageExtensions.cs index 616a444..86ee951 100644 --- a/src/Orleans.Persistence.CosmosDB/StorageExtensions.cs +++ b/src/Orleans.Persistence.CosmosDB/StorageExtensions.cs @@ -14,6 +14,123 @@ namespace Orleans.Hosting { public static class StorageExtensions { + /// + /// Configure silo to use Azure CosmosDB storage as the default grain storage using a custom Partition Key Provider. + /// + public static ISiloBuilder AddCosmosDBGrainStorageAsDefault(this ISiloBuilder builder, Action configureOptions) where TPartitionKeyProvider : class, IPartitionKeyProvider + { + return builder.AddCosmosDBGrainStorage(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, configureOptions); + } + + /// + /// Configure silo to use Azure CosmosDB storage for grain storage using a custom Partition Key Provider. + /// + public static ISiloBuilder AddCosmosDBGrainStorage(this ISiloBuilder builder, string name, Action configureOptions) where TPartitionKeyProvider : class, IPartitionKeyProvider + { + return builder.ConfigureServices(services => + { + services.TryAddSingleton(); + services.AddCosmosDBGrainStorage(name, configureOptions); + }); + } + + /// + /// Configure silo to use Azure CosmosDB storage as the default grain storage using a custom Partition Key Provider. + /// + public static ISiloBuilder AddCosmosDBGrainStorageAsDefault(this ISiloBuilder builder, Action configureOptions, Type customPartitionKeyProviderType) + { + return builder.AddCosmosDBGrainStorage(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, configureOptions, customPartitionKeyProviderType); + } + + /// + /// Configure silo to use Azure CosmosDB storage for grain storage using a custom Partition Key Provider. + /// + public static ISiloBuilder AddCosmosDBGrainStorage(this ISiloBuilder builder, string name, Action configureOptions, Type customPartitionKeyProviderType) + { + return builder.ConfigureServices(services => + { + if (customPartitionKeyProviderType != null) + { + services.TryAddSingleton(typeof(IPartitionKeyProvider), customPartitionKeyProviderType); + } + services.AddCosmosDBGrainStorage(name, configureOptions); + }); + } + + /// + /// Configure silo to use Azure CosmosDB storage as the default grain storage. + /// + public static ISiloBuilder AddCosmosDBGrainStorageAsDefault(this ISiloBuilder builder, Action configureOptions) + { + return builder.AddCosmosDBGrainStorage(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, configureOptions); + } + + /// + /// Configure silo to use Azure CosmosDB storage for grain storage. + /// + public static ISiloBuilder AddCosmosDBGrainStorage(this ISiloBuilder builder, string name, Action configureOptions) + { + return builder.ConfigureServices(services => services.AddCosmosDBGrainStorage(name, configureOptions)); + } + + /// + /// Configure silo to use Azure CosmosDB storage as the default grain storage using a custom Partition Key Provider. + /// + public static ISiloBuilder AddCosmosDBGrainStorageAsDefault(this ISiloBuilder builder, Action> configureOptions = null) where TPartitionKeyProvider : class, IPartitionKeyProvider + { + return builder.AddCosmosDBGrainStorage(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, configureOptions); + } + + /// + /// Configure silo to use Azure CosmosDB storage for grain storage using a custom Partition Key Provider. + /// + public static ISiloBuilder AddCosmosDBGrainStorage(this ISiloBuilder builder, string name, Action> configureOptions = null) where TPartitionKeyProvider : class, IPartitionKeyProvider + { + return builder.ConfigureServices(services => + { + services.TryAddSingleton(); + services.AddCosmosDBGrainStorage(name, configureOptions); + }); + } + + /// + /// Configure silo to use Azure CosmosDB storage as the default grain storage using a custom Partition Key Provider. + /// + public static ISiloBuilder AddCosmosDBGrainStorageAsDefault(this ISiloBuilder builder, Type customPartitionKeyProviderType, Action> configureOptions = null) + { + return builder.AddCosmosDBGrainStorage(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, customPartitionKeyProviderType, configureOptions); + } + + /// + /// Configure silo to use Azure CosmosDB storage for grain storage using a custom Partition Key Provider. + /// + public static ISiloBuilder AddCosmosDBGrainStorage(this ISiloBuilder builder, string name, Type customPartitionKeyProviderType, Action> configureOptions = null) + { + return builder.ConfigureServices(services => + { + if (customPartitionKeyProviderType != null) + { + services.TryAddSingleton(typeof(IPartitionKeyProvider), customPartitionKeyProviderType); + } + services.AddCosmosDBGrainStorage(name, configureOptions); + }); + } + + /// + /// Configure silo to use Azure CosmosDB storage as the default grain storage. + /// + public static ISiloBuilder AddCosmosDBGrainStorageAsDefault(this ISiloBuilder builder, Action> configureOptions = null) + { + return builder.AddCosmosDBGrainStorage(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, configureOptions); + } + + /// + /// Configure silo to use Azure CosmosDB storage for grain storage. + /// + public static ISiloBuilder AddCosmosDBGrainStorage(this ISiloBuilder builder, string name, Action> configureOptions = null) + { + return builder.ConfigureServices(services => services.AddCosmosDBGrainStorage(name, configureOptions)); + } /// /// Configure silo to use Azure CosmosDB storage as the default grain storage using a custom Partition Key Provider. diff --git a/src/Orleans.Reminders.CosmosDB/Orleans.Reminders.CosmosDB.csproj b/src/Orleans.Reminders.CosmosDB/Orleans.Reminders.CosmosDB.csproj index 1d4ab68..fbb768d 100644 --- a/src/Orleans.Reminders.CosmosDB/Orleans.Reminders.CosmosDB.csproj +++ b/src/Orleans.Reminders.CosmosDB/Orleans.Reminders.CosmosDB.csproj @@ -58,8 +58,8 @@ - - + + @@ -74,8 +74,4 @@ - - - - diff --git a/src/Orleans.Reminders.CosmosDB/ReminderExtensions.cs b/src/Orleans.Reminders.CosmosDB/ReminderExtensions.cs index c4072d2..6717881 100644 --- a/src/Orleans.Reminders.CosmosDB/ReminderExtensions.cs +++ b/src/Orleans.Reminders.CosmosDB/ReminderExtensions.cs @@ -1,49 +1,67 @@ -using Microsoft.Extensions.DependencyInjection; -using Orleans.Configuration; -using Orleans.Hosting; -using Orleans.Reminders.CosmosDB; -using System; - -namespace Orleans.Hosting -{ - public static class ReminderExtensions - { - /// - /// Adds reminder storage backed by Azure CosmosDB. - /// - /// - /// The builder. - /// - /// - /// The delegate used to configure the reminder store. - /// - /// - /// The provided , for chaining. - /// - public static ISiloHostBuilder UseCosmosDBReminderService(this ISiloHostBuilder builder, Action configure) - { - builder.ConfigureServices(services => services.UseCosmosDBReminderService(configure)); - return builder; - } - - /// - /// Adds reminder storage backed by Azure CosmosDB. - /// - /// - /// The service collection. - /// - /// - /// The delegate used to configure the reminder store. - /// - /// - /// The provided , for chaining. - /// - public static IServiceCollection UseCosmosDBReminderService(this IServiceCollection services, Action configure) - { - services.AddSingleton(); - services.Configure(configure); - services.ConfigureFormatter(); - return services; - } - } -} +using Microsoft.Extensions.DependencyInjection; +using Orleans.Configuration; +using Orleans.Hosting; +using Orleans.Reminders.CosmosDB; +using System; + +namespace Orleans.Hosting +{ + public static class ReminderExtensions + { + /// + /// Adds reminder storage backed by Azure CosmosDB. + /// + /// + /// The builder. + /// + /// + /// The delegate used to configure the reminder store. + /// + /// + /// The provided , for chaining. + /// + public static ISiloHostBuilder UseCosmosDBReminderService(this ISiloHostBuilder builder, Action configure) + { + builder.ConfigureServices(services => services.UseCosmosDBReminderService(configure)); + return builder; + } + + /// + /// Adds reminder storage backed by Azure CosmosDB. + /// + /// + /// The builder. + /// + /// + /// The delegate used to configure the reminder store. + /// + /// + /// The provided , for chaining. + /// + public static ISiloBuilder UseCosmosDBReminderService(this ISiloBuilder builder, Action configure) + { + builder.ConfigureServices(services => services.UseCosmosDBReminderService(configure)); + return builder; + } + + /// + /// Adds reminder storage backed by Azure CosmosDB. + /// + /// + /// The service collection. + /// + /// + /// The delegate used to configure the reminder store. + /// + /// + /// The provided , for chaining. + /// + public static IServiceCollection UseCosmosDBReminderService(this IServiceCollection services, Action configure) + { + services.AddSingleton(); + services.Configure(configure); + services.ConfigureFormatter(); + return services; + } + } +} diff --git a/src/Orleans.Streaming.CosmosDB/ChangeFeed/BatchRegistry.cs b/src/Orleans.Streaming.CosmosDB/ChangeFeed/BatchRegistry.cs new file mode 100644 index 0000000..6581653 --- /dev/null +++ b/src/Orleans.Streaming.CosmosDB/ChangeFeed/BatchRegistry.cs @@ -0,0 +1,29 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing; + +namespace Orleans.Streaming.CosmosDB +{ + /// + /// A wrapper around the processing Task. + /// + public class BatchRegistry + { + private readonly Task _batch; + private readonly IChangeFeedObserverContext _context; + + public bool IsCompleted => this._batch.IsCompleted && !this._batch.IsFaulted; + + public BatchRegistry(Task batch, IChangeFeedObserverContext context) + { + this._batch = batch ?? throw new ArgumentNullException(nameof(batch)); + this._context = context ?? throw new ArgumentNullException(nameof(context)); + } + + /// + /// Tell the CosmosDB underlying Change Feed context to checkpoint. + /// + /// + public Task CheckpointNow() => this._context.CheckpointAsync(); + } +} \ No newline at end of file diff --git a/src/Orleans.Streaming.CosmosDB/ChangeFeed/ChangeFeedCheckpointer.cs b/src/Orleans.Streaming.CosmosDB/ChangeFeed/ChangeFeedCheckpointer.cs new file mode 100644 index 0000000..483c9a1 --- /dev/null +++ b/src/Orleans.Streaming.CosmosDB/ChangeFeed/ChangeFeedCheckpointer.cs @@ -0,0 +1,92 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing; +using Microsoft.Extensions.Logging; + +namespace Orleans.Streaming.CosmosDB +{ + /// + /// This class describes a time-based checkpoint. + /// So, whatever is set to 'CheckpointInterval' defined on the provider options, will be the time the checkpointer will await to checkpoint successful batches. + /// + internal class ChangeFeedCheckpointer + { + private readonly TimeSpan _interval; + private readonly ILogger _logger; + private readonly ConcurrentQueue _batches; + + public ChangeFeedCheckpointer(ILoggerFactory loggerFactory, CosmosDBStreamOptions options) + { + this._interval = options.CheckpointInterval; + this._logger = loggerFactory.CreateLogger(); + this._batches = new ConcurrentQueue(); + } + + /// + /// Start the checkpointer loop that checks for completed batch tasks periodically in order to perform the checkpoints. + /// + /// The cancellation token. + public async Task RunAsync(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + await CheckpointLastCompletedBatchAsync(); + + try + { + await Task.Delay(this._interval, cancellationToken); + } + catch (TaskCanceledException) { } + } + + await CheckpointLastCompletedBatchAsync(); + } + + /// + /// Try to checkpoint all the completed checkpoint batch tasks. + /// It is called periodically by the main loop. + /// + private async Task CheckpointLastCompletedBatchAsync() + { + try + { + BatchRegistry lastCompletedRegistry = null; + + while (this._batches.TryPeek(out var batchTask)) + { + if (batchTask.IsCompleted) + { + this._batches.TryDequeue(out batchTask); + lastCompletedRegistry = batchTask; + } + else + { + break; + } + } + + if (lastCompletedRegistry != null) + { + this._logger.LogInformation("Checkpointing..."); + await lastCompletedRegistry.CheckpointNow(); + } + } + catch (Exception ex) + { + this._logger.LogError(ex, "Error occured while executing checkpoint."); + } + } + + /// + /// Add a processing batch task to be checkpointed later on. + /// + /// The batck task + /// The Change Feed context + public void AddBatch(Task task, IChangeFeedObserverContext context) + { + this._batches.Enqueue(new BatchRegistry(task, context)); + } + } +} \ No newline at end of file diff --git a/src/Orleans.Streaming.CosmosDB/ChangeFeed/ChangeFeedDispatcher.cs b/src/Orleans.Streaming.CosmosDB/ChangeFeed/ChangeFeedDispatcher.cs new file mode 100644 index 0000000..15f75d5 --- /dev/null +++ b/src/Orleans.Streaming.CosmosDB/ChangeFeed/ChangeFeedDispatcher.cs @@ -0,0 +1,89 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Azure.Documents; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; +using Orleans.Concurrency; +using Orleans.Streams; + +namespace Orleans.Streaming.CosmosDB +{ + internal interface IChangeFeedDispatcher : IGrainWithStringKey + { + Task Dispatch(Immutable> batch); + } + + /// + /// This grain dispatch messages to the subscribers. + /// This stateless worker poll is identified by {ProviderName}-{PartitionRagenId}. + /// That way, we can scale better the dispatching process based on the ranges a given processor have. + /// This grain exist to close the gap on GrainServices feature that doesn't allow it to directly talk to Streams. + /// + [StatelessWorker] + internal class ChangeFeedDispatcher : Grain, IChangeFeedDispatcher + { + private const char KEY_SEPARATOR = '-'; + private readonly ILogger _logger; + private readonly TimeSpan _latencyAnnouncementWindow; + private readonly CosmosDBStreamOptions _options; + private IStreamProvider _streamProvider; + private TimeSpan _consumeLatency; + private DateTime? _lastLatencyOutput; + + public ChangeFeedDispatcher(ILoggerFactory loggerFactory, IOptions options) + { + this._logger = loggerFactory.CreateLogger(); + this._options = options.Value; + } + + public override Task OnActivateAsync() + { + // The stream provider is registered by a name, and the first part of this stateless worker poll key, is that name. + this._streamProvider = this.GetStreamProvider(this.GetPrimaryKeyString().Split(KEY_SEPARATOR)[0]); + return Task.CompletedTask; + } + + public async Task Dispatch(Immutable> batch) + { + var consumeLatency = DateTime.UtcNow - batch.Value.First().Document.Timestamp; + + if (_lastLatencyOutput is null || DateTime.UtcNow - _lastLatencyOutput >= _latencyAnnouncementWindow) + { + if (consumeLatency > this._options.MaxLatencyThreshold) + { + _logger.LogWarning($"Cosmos consume latency estimated at '{consumeLatency}' exceeded the maximum allowed of '{this._options.MaxLatencyThreshold}'."); + _lastLatencyOutput = DateTime.UtcNow; + } + this._logger.LogInformation($"Cosmos consume latency estimated at '{consumeLatency}'."); + } + + foreach (var change in batch.Value) + { + if (change.StreamId == Guid.Empty || + string.IsNullOrWhiteSpace(change.StreamNamespace)) + { + this._logger.LogWarning($"Invalid stream identity. The document will be skipped: '{JsonConvert.SerializeObject(change.Document)}'"); + continue; + } + + var stream = this._streamProvider.GetStream(change.StreamId, change.StreamNamespace); + try + { + await stream.OnNextAsync(change.Document); + } + catch (Exception exc) + { + this._logger.LogError(exc, $"Failure dispatching message from change feed. DocumentId: {change.Document.Id} | Error: {exc.Message}."); + } + + if (this._logger.IsEnabled(LogLevel.Trace)) + { + this._logger.LogWarning($"Dispatched document: {JsonConvert.SerializeObject(change)}"); + } + } + } + } +} \ No newline at end of file diff --git a/src/Orleans.Streaming.CosmosDB/ChangeFeed/ChangeFeedProcessor.cs b/src/Orleans.Streaming.CosmosDB/ChangeFeed/ChangeFeedProcessor.cs new file mode 100644 index 0000000..eaca0a8 --- /dev/null +++ b/src/Orleans.Streaming.CosmosDB/ChangeFeed/ChangeFeedProcessor.cs @@ -0,0 +1,172 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing; +using Microsoft.Azure.Documents.ChangeFeedProcessor.PartitionManagement; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; +using Orleans.Concurrency; + +namespace Orleans.Streaming.CosmosDB +{ + internal interface IProcessor + { + string Name { get; } + Task Start(); + Task Stop(); + } + + /// + /// Wraps CosmosDB IChangeFeedProcessor instance. + /// One of this type is created per stream registered. + /// It is responsible for monitoring and dispatching of changes in a given CosmosDB document Collection + /// + internal class ChangeFeedProcessor : IProcessor, IChangeFeedObserver, IChangeFeedObserverFactory + { + private readonly TaskScheduler _scheduler; + private readonly IGrainFactory _grainFactory; + private readonly ILogger _logger; + private readonly string _name; + private readonly string _siloName; + private readonly CosmosDBStreamOptions _options; + private readonly IStreamMapper _mapper; + private readonly CancellationTokenSource _ctsClose; + private readonly ChangeFeedCheckpointer _checkpointer; + private IChangeFeedProcessor _processor; + private Task _checkpointerTask; + + public string Name => this._name; + + private ChangeFeedProcessor(string name, string siloName, + CosmosDBStreamOptions options, IServiceProvider serviceProvider, + TaskScheduler scheduler, IStreamMapper mapper) + { + var loggerFactory = serviceProvider.GetRequiredService(); + this._name = name; + this._siloName = siloName; + this._scheduler = scheduler; + this._grainFactory = serviceProvider.GetRequiredService(); + this._logger = loggerFactory.CreateLogger($"{nameof(ChangeFeedProcessor)}-{name}"); + this._mapper = mapper; + this._options = options; + this._ctsClose = new CancellationTokenSource(); + this._checkpointer = new ChangeFeedCheckpointer(loggerFactory, this._options); + } + + /// + /// Factory method to create a new IProcessor instances + /// + /// Name of the stream provider + /// This Silo name + /// Configuration for both the monitored and lease collections + /// DI container + /// Orleans Task Scheduler + /// The IStreamMapper implementation + /// IProcessor implementation + public static IProcessor Create(string name, string siloName, CosmosDBStreamOptions options, IServiceProvider serviceProvider, TaskScheduler scheduler, IStreamMapper mapper) + { + return new ChangeFeedProcessor(name, siloName, options, serviceProvider, scheduler, mapper); + } + + /// + /// Invoked whenever the change feed receive change events. + /// The documents here are ordered by change date and are delivered on batches from a single partition. + /// + /// Change feed context + /// The batch of changed documents + /// Cancellation token + public Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList docs, CancellationToken cancellationToken) + { + if (this._logger.IsEnabled(LogLevel.Trace)) + { + this._logger.LogInformation($"Received {docs.Count} doc(s) from collection '{this._options.FeedCollectionInfo.CollectionName}'."); + } + + var grain = this._grainFactory.GetGrain($"{this._name}-{context.PartitionKeyRangeId}"); + var processTask = Task.Factory.StartNew( + async () => await grain.Dispatch(await this.BuildBatch(docs)), + cancellationToken, + TaskCreationOptions.None, + this._scheduler + ); + + this._checkpointer.AddBatch(processTask, context); + + return Task.CompletedTask; + } + + /// + /// Build a batch of messages to be dispatched to the subscribers + /// + /// The back of messages to be dispatched + private async Task>> BuildBatch(IReadOnlyList docs) + { + var batch = new List<(Guid StreamId, string StreamNamespace, Document Document)>(); + foreach (var d in docs) + { + var streamIdentity = await this._mapper.GetStreamIdentity(d); + if (streamIdentity == default || + streamIdentity.StreamId == Guid.Empty || + string.IsNullOrWhiteSpace(streamIdentity.StreamNamespace)) + { + this._logger.LogWarning($"Invalid stream identity. The document will be skipped: '{JsonConvert.SerializeObject(d)}'"); + continue; + } + batch.Add((streamIdentity.StreamId, streamIdentity.StreamNamespace, d)); + } + return ((IReadOnlyList<(Guid StreamId, string StreamNamespace, Document Document)>)batch.AsReadOnly()).AsImmutable(); + } + + /// + /// Begin monitoring the CosmosDB Change Feed + /// + public async Task Start() + { + this._processor = await new Microsoft.Azure.Documents.ChangeFeedProcessor.ChangeFeedProcessorBuilder() + .WithHostName(this._siloName) + .WithProcessorOptions(new Microsoft.Azure.Documents.ChangeFeedProcessor.ChangeFeedProcessorOptions + { + CheckpointFrequency = new Microsoft.Azure.Documents.ChangeFeedProcessor.CheckpointFrequency + { + ExplicitCheckpoint = true + } +#if DEBUG + , + LeasePrefix = "DEBUG" +#endif + }) + .WithFeedCollection(this._options.FeedCollectionInfo) + .WithLeaseCollection(this._options.LeaseCollectionInfo) + .WithObserverFactory(this) + .BuildAsync(); + await this._processor.StartAsync(); + } + + /// + /// Stops monitoring the CosmosDB Change Feed + /// + public async Task Stop() + { + await this._processor.StopAsync(); + } + + public IChangeFeedObserver CreateObserver() => this; + + public Task OpenAsync(IChangeFeedObserverContext context) + { + this._logger.LogInformation($"Starting to monitor CosmosDB document collection '{this._options.FeedCollectionInfo.CollectionName}'. Silo: '{this._siloName}' | Partition key range: '{context.PartitionKeyRangeId}'"); + this._checkpointerTask = this._checkpointer.RunAsync(this._ctsClose.Token); + return Task.CompletedTask; + } + + public async Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason) + { + this._logger.LogInformation($"Stopping to monitor CosmosDB document collection '{this._options.FeedCollectionInfo.CollectionName}'. Silo: '{this._siloName}' | Partition key range: '{context.PartitionKeyRangeId}' | Reason: '{reason}'"); + this._ctsClose.Cancel(); + await this._checkpointerTask; + } + } +} \ No newline at end of file diff --git a/src/Orleans.Streaming.CosmosDB/ChangeFeed/ChangeFeedService.cs b/src/Orleans.Streaming.CosmosDB/ChangeFeed/ChangeFeedService.cs new file mode 100644 index 0000000..d6f4789 --- /dev/null +++ b/src/Orleans.Streaming.CosmosDB/ChangeFeed/ChangeFeedService.cs @@ -0,0 +1,79 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using Orleans.Core; +using Orleans.Runtime; +using Orleans.Services; + +namespace Orleans.Streaming.CosmosDB +{ + internal interface IChangeFeedService : IGrainService { } + + /// + /// This GrainService hold a list of IProcessors and is the bridge between CosmosDB ChangeFeed and Orleans Streaming system. + /// + internal class ChangeFeedService : GrainService, IChangeFeedService + { + private readonly ILogger _logger; + private readonly IReadOnlyList _processors; + + public ChangeFeedService( + CosmosDBStreamConfigurator configurator, + IServiceProvider serviceProvider, + IGrainIdentity id, + IOptions siloOptions, + Silo silo, + ILoggerFactory loggerFactory) + : base(id, silo, loggerFactory) + { + this._logger = loggerFactory.CreateLogger(); + + var processor = new List(); + foreach (var setting in configurator.Settings) + { + processor.Add(ChangeFeedProcessor.Create( + setting.Key, + siloOptions.Value.SiloName, + setting.Value.Options, + serviceProvider, + TaskScheduler.Current, + (IStreamMapper)ActivatorUtilities.CreateInstance(serviceProvider, setting.Value.MapperType) + )); + } + + this._processors = processor; + } + + public override async Task Start() + { + this._logger.LogInformation($"Starting CosmosDB Change Feed stream provider registered processor(s)."); + foreach (var proc in this._processors) + { + this._logger.LogInformation($"Starting processor: {proc.Name}..."); + await proc.Start(); + this._logger.LogInformation($"Processor: {proc.Name} started."); + } + this._logger.LogInformation($"{this._processors.Count} processor(s) started."); + + await base.Start(); + } + + public override async Task Stop() + { + this._logger.LogInformation($"Stopping CosmosDB Change Feed stream provider registered processor(s)."); + foreach (var proc in this._processors) + { + this._logger.LogInformation($"Stopping processor: {proc.Name}..."); + await proc.Stop(); + this._logger.LogInformation($"Processor: {proc.Name} stopped."); + } + this._logger.LogInformation($"{this._processors.Count} processor(s) stopped."); + + await base.Stop(); + } + } +} \ No newline at end of file diff --git a/src/Orleans.Streaming.CosmosDB/ChangeFeed/IStreamMapper.cs b/src/Orleans.Streaming.CosmosDB/ChangeFeed/IStreamMapper.cs new file mode 100644 index 0000000..0c16bbe --- /dev/null +++ b/src/Orleans.Streaming.CosmosDB/ChangeFeed/IStreamMapper.cs @@ -0,0 +1,20 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Azure.Documents; + +namespace Orleans.Streaming.CosmosDB +{ + /// + /// Types must implement this interface to map a CosmosDB Document to a particular stream. + /// Custom implementations of this interface can use any arbitrary logic to define the stream identity based on the the document. + /// + public interface IStreamMapper + { + /// + /// Map a document to a stream Identity + /// + /// The document + /// Tuple containing the Stream Identity (StreamId, StreamNamespace) + ValueTask<(Guid StreamId, string StreamNamespace)> GetStreamIdentity(Document document); + } +} \ No newline at end of file diff --git a/src/Orleans.Streaming.CosmosDB/ChangeFeed/IdBasedStreamMapper.cs b/src/Orleans.Streaming.CosmosDB/ChangeFeed/IdBasedStreamMapper.cs new file mode 100644 index 0000000..57b1f66 --- /dev/null +++ b/src/Orleans.Streaming.CosmosDB/ChangeFeed/IdBasedStreamMapper.cs @@ -0,0 +1,43 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Azure.Documents; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace Orleans.Streaming.CosmosDB +{ + /// + /// Default implementation of Stream Identity Mapper. + /// This implementation uses the Document.Id as the stream ID and the configurable CosmosDBStreamOptions.DefaultStreamNamespace as the namespace. + /// To use this mapper, the Document.Id must be set as CosmosDB default Id field, a Guid. + /// + internal class IdBasedStreamMapper : IStreamMapper + { + private readonly CosmosDBStreamOptions _options; + private readonly ILogger _logger; + + public IdBasedStreamMapper(IOptions options, ILoggerFactory loggerFactory) + { + this._options = options.Value; + this._logger = loggerFactory.CreateLogger(); + } + + /// + /// For a given document, get the Document.Id as the stream Id, and the CosmosDBStreamOptions.DefaultStreamNamespace as the namespace. + /// + /// The CosmosDB document + /// Tuple containing the Stream Identity (StreamId, StreamNamespace) + public ValueTask<(Guid StreamId, string StreamNamespace)> GetStreamIdentity(Document document) + { + if (Guid.TryParse(document.Id, out Guid id)) + { + return new ValueTask<(Guid, string)>((Guid.Parse(document.Id), this._options.DefaultStreamNamespace)); + } + else + { + this._logger.LogError($"Unable to get stream id from the document. Document.Id = '{document.Id}'. It must be a Guid in order to be used by IdBasedStreamMapper. The document will be skipped."); + return default; + } + } + } +} \ No newline at end of file diff --git a/src/Orleans.Streaming.CosmosDB/Hosting/HostingExtensions.cs b/src/Orleans.Streaming.CosmosDB/Hosting/HostingExtensions.cs new file mode 100644 index 0000000..2069d4d --- /dev/null +++ b/src/Orleans.Streaming.CosmosDB/Hosting/HostingExtensions.cs @@ -0,0 +1,56 @@ +using System; +using Microsoft.Extensions.DependencyInjection; +using Orleans; +using Orleans.Hosting; + +namespace Orleans.Streaming.CosmosDB +{ + public static class HostingExtensions + { + /// + /// Register CosmosDB Change feed stream provider to Orleans runtime. + /// + /// Configuration builder delegate + public static ISiloHostBuilder AddCosmosDBStreaming(this ISiloHostBuilder siloBuilder, Action configure) + { + if (configure == null) throw new ArgumentNullException(nameof(configure)); + + var configurator = new CosmosDBStreamConfigurator(); + configure.Invoke(configurator); + + foreach (var config in configurator.Settings) + { + siloBuilder.AddSimpleMessageStreamProvider(config.Key); + } + + return siloBuilder + .Configure(configure) + .AddGrainService() + .ConfigureServices(services => services.AddSingleton(configurator)) + .ConfigureApplicationParts(parts => parts.AddApplicationPart(typeof(ChangeFeedService).Assembly).WithReferences()); + } + + /// + /// Register CosmosDB Change feed stream provider to Orleans runtime. + /// + /// Configuration builder delegate + public static ISiloBuilder AddCosmosDBStreaming(this ISiloBuilder siloBuilder, Action configure) + { + if (configure == null) throw new ArgumentNullException(nameof(configure)); + + var configurator = new CosmosDBStreamConfigurator(); + configure.Invoke(configurator); + + foreach (var config in configurator.Settings) + { + siloBuilder.AddSimpleMessageStreamProvider(config.Key); + } + + return siloBuilder + .Configure(configure) + .AddGrainService() + .ConfigureServices(services => services.AddSingleton(configurator)) + .ConfigureApplicationParts(parts => parts.AddApplicationPart(typeof(ChangeFeedService).Assembly).WithReferences()); + } + } +} \ No newline at end of file diff --git a/src/Orleans.Streaming.CosmosDB/Options/CosmosDBStreamConfigurator.cs b/src/Orleans.Streaming.CosmosDB/Options/CosmosDBStreamConfigurator.cs new file mode 100644 index 0000000..4947332 --- /dev/null +++ b/src/Orleans.Streaming.CosmosDB/Options/CosmosDBStreamConfigurator.cs @@ -0,0 +1,45 @@ +using System; +using System.Collections.Generic; +using System.Linq; + +namespace Orleans.Streaming.CosmosDB +{ + /// + /// Configure CosmosDB stream provider + /// + public class CosmosDBStreamConfigurator + { + internal Dictionary Settings; + + internal CosmosDBStreamConfigurator() + { + this.Settings = new Dictionary(); + } + + /// + /// Register CosmosDB Change Feed to the stream provider + /// + /// Provider Name + /// Options configuration delegate + /// (optional) A custom IStreamMapper implementation type. + public CosmosDBStreamConfigurator AddStream(string name, Action configure, Type mapperType = null) + { + if (configure == null) throw new ArgumentNullException(nameof(configure)); + + if (mapperType == null) + { + mapperType = typeof(IdBasedStreamMapper); + } + else if (!mapperType.GetInterfaces().Contains(typeof(IStreamMapper))) + { + throw new InvalidOperationException("Mapper must implement 'IStreamMapper' interface'"); + } + + var options = new CosmosDBStreamOptions(); + configure.Invoke(options); + this.Settings[name] = (options, mapperType); + + return this; + } + } +} \ No newline at end of file diff --git a/src/Orleans.Streaming.CosmosDB/Options/CosmosDBStreamOptions.cs b/src/Orleans.Streaming.CosmosDB/Options/CosmosDBStreamOptions.cs new file mode 100644 index 0000000..07e6c47 --- /dev/null +++ b/src/Orleans.Streaming.CosmosDB/Options/CosmosDBStreamOptions.cs @@ -0,0 +1,35 @@ +using System; +using Microsoft.Azure.Documents.ChangeFeedProcessor; + +namespace Orleans.Streaming.CosmosDB +{ + public class CosmosDBStreamOptions + { + /// + /// The connection settings for the CosmosDB Document Collection tha will be monitored. + /// + public DocumentCollectionInfo FeedCollectionInfo { get; set; } = new DocumentCollectionInfo(); + + /// + /// The connection settings for the CosmosDB Change Feed Lease Collection. + /// + public DocumentCollectionInfo LeaseCollectionInfo { get; set; } = new DocumentCollectionInfo(); + + /// + /// The desired interval between checkpoints with the CosmosDB Change Feed. + /// + public TimeSpan CheckpointInterval { get; set; } = TimeSpan.FromSeconds(5); + + /// + /// Maximum amount of time to process stream documents. + /// If this amount is exceed, an alert will be writen to the output logs. + /// Default to 30 seconds. + /// + public TimeSpan MaxLatencyThreshold { get; set; } = TimeSpan.FromSeconds(30); + + /// + /// If no custom implementation type of IStreamMapper is provided, this property will be used to define the Stream namespace. + /// + public string DefaultStreamNamespace { get; set; } = "Documents"; + } +} \ No newline at end of file diff --git a/src/Orleans.Streaming.CosmosDB/Orleans.Streaming.CosmosDB.csproj b/src/Orleans.Streaming.CosmosDB/Orleans.Streaming.CosmosDB.csproj new file mode 100644 index 0000000..fc1f487 --- /dev/null +++ b/src/Orleans.Streaming.CosmosDB/Orleans.Streaming.CosmosDB.csproj @@ -0,0 +1,35 @@ + + + + netstandard2.0 + latest + + + + Orleans.Streaming.CosmosDB + Microsoft Orleans Streaming provider for Azure CosmosDB + Microsoft Orleans streaming provider backed by Azure CosmosDB Change Feed + Gutemberg Ribeiro + Orleans Azure CosmosDB + https://github.com/dotnet/Orleans#license + https://github.com/OrleansContrib/Orleans.CosmosDB + https://raw.githubusercontent.com/dotnet/orleans/gh-pages/assets/logo_128.png + Orleans Cloud-Computing Actor-Model Actors Distributed-Systems Azure CosmosDB C# .NET + + https://github.com/OrleansContrib/Orleans.CosmosDB + git + true + true + 1.1.0-rc1 + + + + + + + + + + + + diff --git a/test/Orleans.CosmosDB.Tests/IndexTests.cs b/test/Orleans.CosmosDB.Tests/IndexTests.cs index b677b0d..cd457e2 100644 --- a/test/Orleans.CosmosDB.Tests/IndexTests.cs +++ b/test/Orleans.CosmosDB.Tests/IndexTests.cs @@ -1,141 +1,141 @@ -using System; -using Orleans.CosmosDB.Tests.Grains; -using Orleans.Hosting; -using Orleans.Persistence.CosmosDB; -using Orleans.Runtime; -using Orleans.Storage; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using Xunit; - -// For Index coverage CreateDocumentQuery -using Microsoft.Azure.Documents.Client; -using Microsoft.Azure.Documents.Linq; -using Microsoft.Azure.Documents; -using static Orleans.CosmosDB.Tests.IndexTests; - -namespace Orleans.CosmosDB.Tests -{ - - - public class IndexTests : IClassFixture - { - private const string StorageDbName = "OrleansStorageTest"; - private StorageIndexFixture _fixture; - - public class StorageIndexFixture : OrleansFixture - { - internal string AccountEndpoint; - internal string AccountKey; - - protected override ISiloHostBuilder PreBuild(ISiloHostBuilder builder) - { - OrleansFixture.GetAccountInfo(out this.AccountEndpoint, out this.AccountKey); - - return builder - .AddCosmosDBGrainStorage(OrleansFixture.TEST_STORAGE, opt => - { - opt.AccountEndpoint = this.AccountEndpoint; - opt.AccountKey = this.AccountKey; - opt.ConnectionMode = ConnectionMode.Gateway; - opt.DropDatabaseOnInit = true; - opt.AutoUpdateStoredProcedures = true; - opt.CanCreateResources = true; - opt.DB = StorageDbName; - opt.StateFieldsToIndex.Add("NftIndexedInt"); - opt.StateFieldsToIndex.Add("UserState.FtIndexedString"); - }); - } - } - - public IndexTests(StorageIndexFixture fixture) => this._fixture = fixture; - - private async Task AssertAllTasksCompletedSuccessfullyAsync(IEnumerable tasks) - { - await Task.WhenAll(tasks); - foreach (var t in tasks) - { - Assert.True(t.IsCompletedSuccessfully); - } - } - - [Fact] - public async Task Index_Test() - { - var tasks = new List(); - - const int mod = 10; - const int max = 100; - int nftValue(int value) => value % mod; - string ftValue(int value) => $"FtIndex {value}"; - string nonValue(int value) => $"NonIndex {value}"; - int parseIntValue(string data) => int.Parse(data.Substring(data.LastIndexOf(" ") + 1)); - - for (int i = 0; i < max; i++) - { - var grain = this._fixture.Client.GetGrain(i) as ITestIndexedPropertiesGrain; - - // NftIndexedInt has multiple entities per key value. - await grain.SetNftIndexedIntAsync(nftValue(i)); - - // FtIndexedString has a single entity per key value (as does NonIndexedString). - await grain.SetFtIndexedStringAsync(ftValue(i)); - await grain.SetNonIndexedStringAsync(nonValue(i)); - tasks.Add(grain.WriteAsync()); - } - - await AssertAllTasksCompletedSuccessfullyAsync(tasks); - - var storage = this._fixture.Silo.Services.GetServiceByName(OrleansFixture.TEST_STORAGE) as CosmosDBGrainStorage; - string grainTypeName() => typeof(TestIndexedPropertiesGrain).FullName; - - // Use the Client GrainReferenceConverter here to obtain a grain in OutsideClientRuntime. - var grainReferenceConverter = (IGrainReferenceConverter)this._fixture.Client.ServiceProvider.GetService(typeof(IGrainReferenceConverter)); - ITestIndexedPropertiesGrain castToClientSpace(GrainReference grainRef) - => grainReferenceConverter.GetGrainFromKeyString(grainRef.ToKeyString()).Cast() as ITestIndexedPropertiesGrain; - - // One entity per key value. - for (int i = 0; i < max; i++) - { - var grains = await storage.LookupAsync(grainTypeName(), "UserState.FtIndexedString", ftValue(i)); - Assert.Single(grains); - var grain = castToClientSpace(grains[0]); - Assert.Equal(nftValue(i), await grain.GetNftIndexedIntAsync()); - Assert.Equal(ftValue(i), await grain.GetFtIndexedStringAsync()); - Assert.Equal(nonValue(i), await grain.GetNonIndexedStringAsync()); - } - - // Multiple entities per key value. - for (int i = 0; i < mod; i++) - { - var grains = await storage.LookupAsync(grainTypeName(), "NftIndexedInt", nftValue(i)); - Assert.Equal(max / mod, grains.Count); - foreach (var grain in grains.Select(g => castToClientSpace(g))) { - Assert.Equal(i, await grain.GetNftIndexedIntAsync()); - Assert.True(parseIntValue(await grain.GetFtIndexedStringAsync()) % mod == i); - Assert.True(parseIntValue(await grain.GetNonIndexedStringAsync()) % mod == i); - } - } - - // Verify index usage. This will return max / mod + 1 items. - IDocumentQuery query = storage._dbClient.CreateDocumentQuery( - UriFactory.CreateDocumentCollectionUri(StorageDbName, CosmosDBStorageOptions.ORLEANS_STORAGE_COLLECTION), - $"SELECT * FROM c WHERE c.GrainType = \"{grainTypeName()}\"" + - $" AND (c.State.UserState.FtIndexedString = \"{ftValue(42)}\" OR c.State.NftIndexedInt = 5)", - new FeedOptions - { - PopulateQueryMetrics = true, - MaxItemCount = -1, - MaxDegreeOfParallelism = -1, - EnableCrossPartitionQuery = true - }).AsDocumentQuery(); - FeedResponse result = await query.ExecuteNextAsync(); - - // This should return a dictionary containing a single QueryMetrics item. - IReadOnlyDictionary metrics = result.QueryMetrics; - Assert.Single(metrics); - Assert.Equal((max / mod + 1) * (1.0 / max), metrics["0"].IndexHitRatio); // IndexHitDocumentCount is not public - } - } -} +using System; +using Orleans.CosmosDB.Tests.Grains; +using Orleans.Hosting; +using Orleans.Persistence.CosmosDB; +using Orleans.Runtime; +using Orleans.Storage; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Xunit; + +// For Index coverage CreateDocumentQuery +using Microsoft.Azure.Documents.Client; +using Microsoft.Azure.Documents.Linq; +using Microsoft.Azure.Documents; +using static Orleans.CosmosDB.Tests.IndexTests; + +namespace Orleans.CosmosDB.Tests +{ + + + public class IndexTests : IClassFixture + { + private const string StorageDbName = "OrleansStorageTest"; + private StorageIndexFixture _fixture; + + public class StorageIndexFixture : OrleansFixture + { + internal string AccountEndpoint; + internal string AccountKey; + + protected override ISiloBuilder PreBuild(ISiloBuilder builder) + { + OrleansFixture.GetAccountInfo(out this.AccountEndpoint, out this.AccountKey); + + return builder + .AddCosmosDBGrainStorage(OrleansFixture.TEST_STORAGE, opt => + { + opt.AccountEndpoint = this.AccountEndpoint; + opt.AccountKey = this.AccountKey; + opt.ConnectionMode = ConnectionMode.Gateway; + opt.DropDatabaseOnInit = true; + opt.AutoUpdateStoredProcedures = true; + opt.CanCreateResources = true; + opt.DB = StorageDbName; + opt.StateFieldsToIndex.Add("NftIndexedInt"); + opt.StateFieldsToIndex.Add("UserState.FtIndexedString"); + }); + } + } + + public IndexTests(StorageIndexFixture fixture) => this._fixture = fixture; + + private async Task AssertAllTasksCompletedSuccessfullyAsync(IEnumerable tasks) + { + await Task.WhenAll(tasks); + foreach (var t in tasks) + { + Assert.True(t.IsCompletedSuccessfully); + } + } + + [Fact] + public async Task Index_Test() + { + var tasks = new List(); + + const int mod = 10; + const int max = 100; + int nftValue(int value) => value % mod; + string ftValue(int value) => $"FtIndex {value}"; + string nonValue(int value) => $"NonIndex {value}"; + int parseIntValue(string data) => int.Parse(data.Substring(data.LastIndexOf(" ") + 1)); + + for (int i = 0; i < max; i++) + { + var grain = this._fixture.Client.GetGrain(i) as ITestIndexedPropertiesGrain; + + // NftIndexedInt has multiple entities per key value. + await grain.SetNftIndexedIntAsync(nftValue(i)); + + // FtIndexedString has a single entity per key value (as does NonIndexedString). + await grain.SetFtIndexedStringAsync(ftValue(i)); + await grain.SetNonIndexedStringAsync(nonValue(i)); + tasks.Add(grain.WriteAsync()); + } + + await AssertAllTasksCompletedSuccessfullyAsync(tasks); + + var storage = this._fixture.Host.Services.GetServiceByName(OrleansFixture.TEST_STORAGE) as CosmosDBGrainStorage; + string grainTypeName() => typeof(TestIndexedPropertiesGrain).FullName; + + // Use the Client GrainReferenceConverter here to obtain a grain in OutsideClientRuntime. + var grainReferenceConverter = (IGrainReferenceConverter)this._fixture.Client.ServiceProvider.GetService(typeof(IGrainReferenceConverter)); + ITestIndexedPropertiesGrain castToClientSpace(GrainReference grainRef) + => grainReferenceConverter.GetGrainFromKeyString(grainRef.ToKeyString()).Cast() as ITestIndexedPropertiesGrain; + + // One entity per key value. + for (int i = 0; i < max; i++) + { + var grains = await storage.LookupAsync(grainTypeName(), "UserState.FtIndexedString", ftValue(i)); + Assert.Single(grains); + var grain = castToClientSpace(grains[0]); + Assert.Equal(nftValue(i), await grain.GetNftIndexedIntAsync()); + Assert.Equal(ftValue(i), await grain.GetFtIndexedStringAsync()); + Assert.Equal(nonValue(i), await grain.GetNonIndexedStringAsync()); + } + + // Multiple entities per key value. + for (int i = 0; i < mod; i++) + { + var grains = await storage.LookupAsync(grainTypeName(), "NftIndexedInt", nftValue(i)); + Assert.Equal(max / mod, grains.Count); + foreach (var grain in grains.Select(g => castToClientSpace(g))) { + Assert.Equal(i, await grain.GetNftIndexedIntAsync()); + Assert.True(parseIntValue(await grain.GetFtIndexedStringAsync()) % mod == i); + Assert.True(parseIntValue(await grain.GetNonIndexedStringAsync()) % mod == i); + } + } + + // Verify index usage. This will return max / mod + 1 items. + IDocumentQuery query = storage._dbClient.CreateDocumentQuery( + UriFactory.CreateDocumentCollectionUri(StorageDbName, CosmosDBStorageOptions.ORLEANS_STORAGE_COLLECTION), + $"SELECT * FROM c WHERE c.GrainType = \"{grainTypeName()}\"" + + $" AND (c.State.UserState.FtIndexedString = \"{ftValue(42)}\" OR c.State.NftIndexedInt = 5)", + new FeedOptions + { + PopulateQueryMetrics = true, + MaxItemCount = -1, + MaxDegreeOfParallelism = -1, + EnableCrossPartitionQuery = true + }).AsDocumentQuery(); + FeedResponse result = await query.ExecuteNextAsync(); + + // This should return a dictionary containing a single QueryMetrics item. + IReadOnlyDictionary metrics = result.QueryMetrics; + Assert.Single(metrics); + Assert.Equal((max / mod + 1) * (1.0 / max), metrics["0"].IndexHitRatio); // IndexHitDocumentCount is not public + } + } +} diff --git a/test/Orleans.CosmosDB.Tests/Orleans.CosmosDB.Tests.csproj b/test/Orleans.CosmosDB.Tests/Orleans.CosmosDB.Tests.csproj index 369315d..99076f3 100644 --- a/test/Orleans.CosmosDB.Tests/Orleans.CosmosDB.Tests.csproj +++ b/test/Orleans.CosmosDB.Tests/Orleans.CosmosDB.Tests.csproj @@ -1,22 +1,20 @@  - netcoreapp2.0 + netcoreapp3.0 latest false - - - - - - - - - - + + + + + + + + diff --git a/test/Orleans.CosmosDB.Tests/OrleansFixture.cs b/test/Orleans.CosmosDB.Tests/OrleansFixture.cs index 6156564..f463245 100644 --- a/test/Orleans.CosmosDB.Tests/OrleansFixture.cs +++ b/test/Orleans.CosmosDB.Tests/OrleansFixture.cs @@ -1,19 +1,17 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; using Newtonsoft.Json.Linq; using Orleans.Configuration; using Orleans.Hosting; -using Orleans.Runtime; -using Orleans.Runtime.Configuration; using System; -using System.Collections.Generic; using System.IO; -using System.Net; using System.Threading; namespace Orleans.CosmosDB.Tests { public class OrleansFixture : IDisposable { - public ISiloHost Silo { get; } + public IHost Host { get; } public IClusterClient Client { get; } private const string ClusterId = "TESTCLUSTER"; @@ -29,45 +27,28 @@ public OrleansFixture() var portInc = Interlocked.Increment(ref portUniquifier); var siloPort = EndpointOptions.DEFAULT_SILO_PORT + portInc; var gatewayPort = EndpointOptions.DEFAULT_GATEWAY_PORT + portInc; - ClusterConfiguration clusterConfig = ClusterConfiguration.LocalhostPrimarySilo(siloPort, gatewayPort); - var configDefaults = clusterConfig.Defaults; - var siloAddress = clusterConfig.PrimaryNode.Address; - - var builder = new SiloHostBuilder(); - var silo = PreBuild(builder) - .Configure(options => + var silo = new HostBuilder() + .UseOrleans(b => { - options.ClusterId = ClusterId; - options.ServiceId = serviceId; - }) - .UseDevelopmentClustering(options => options.PrimarySiloEndpoint = clusterConfig.PrimaryNode) - .ConfigureEndpoints(siloAddress, siloPort, gatewayPort) - //.UseConfiguration(clusterConfig) - .ConfigureApplicationParts(pm => pm.AddApplicationPart(typeof(PersistenceTests).Assembly)) - .Build(); - silo.StartAsync().Wait(); - this.Silo = silo; - - //clientConfig.FallbackSerializationProvider = typeof(ILBasedSerializer).GetTypeInfo(); + b.UseLocalhostClustering(); + b.ConfigureEndpoints(siloPort, gatewayPort); + b.Configure(options => + { + options.ClusterId = ClusterId; + options.ServiceId = serviceId; + }); + b.ConfigureApplicationParts(pm => pm.AddApplicationPart(typeof(PersistenceTests).Assembly)); + PreBuild(b); + }).Build(); - ClientConfiguration clientConfig = ClientConfiguration.LocalhostSilo(); - var client = new ClientBuilder() - //.UseConfiguration(clientConfig) - .Configure(options => - { - options.ClusterId = ClusterId; - options.ServiceId = serviceId; - }) - .UseStaticClustering(options => options.Gateways = new List { new IPEndPoint(siloAddress, gatewayPort).ToGatewayUri() }) - .ConfigureApplicationParts(pm => pm.AddApplicationPart(typeof(PersistenceTests).Assembly)) - .Build(); + silo.StartAsync().Wait(); - client.Connect().Wait(); + this.Host = silo; - this.Client = client; + this.Client = this.Host.Services.GetRequiredService(); } - protected virtual ISiloHostBuilder PreBuild(ISiloHostBuilder builder) { return builder; } + protected virtual ISiloBuilder PreBuild(ISiloBuilder builder) { return builder; } public const string TEST_STORAGE = "TEST_STORAGE_PROVIDER"; @@ -109,7 +90,7 @@ private static bool GetFileInCurrentOrParentDir(string fileName, out string foun public void Dispose() { this.Client.Close().Wait(); - this.Silo.StopAsync().Wait(); + this.Host.StopAsync().Wait(); } } } diff --git a/test/Orleans.CosmosDB.Tests/PersistenceTests.cs b/test/Orleans.CosmosDB.Tests/PersistenceTests.cs index 6ede1d9..855ed4a 100644 --- a/test/Orleans.CosmosDB.Tests/PersistenceTests.cs +++ b/test/Orleans.CosmosDB.Tests/PersistenceTests.cs @@ -40,7 +40,7 @@ public class StorageFixture : OrleansFixture internal string AccountEndpoint; internal string AccountKey; - protected override ISiloHostBuilder PreBuild(ISiloHostBuilder builder) + protected override ISiloBuilder PreBuild(ISiloBuilder builder) { OrleansFixture.GetAccountInfo(out this.AccountEndpoint, out this.AccountKey); @@ -94,7 +94,7 @@ public async Task Custom_Partition_Test() await grain.Write("Test Partition"); await grain.Deactivate(); - var storage = this._fixture.Silo.Services.GetServiceByName(OrleansFixture.TEST_STORAGE) as CosmosDBGrainStorage; + var storage = this._fixture.Host.Services.GetServiceByName(OrleansFixture.TEST_STORAGE) as CosmosDBGrainStorage; IDocumentQuery query = storage._dbClient.CreateDocumentQuery( UriFactory.CreateDocumentCollectionUri(StorageDbName, CosmosDBStorageOptions.ORLEANS_STORAGE_COLLECTION), $"SELECT * FROM c WHERE c.PartitionKey = \"" + guid.ToString() + "\"", diff --git a/test/Orleans.CosmosDB.Tests/ReminderTests.cs b/test/Orleans.CosmosDB.Tests/ReminderTests.cs index 6cf9f05..fdf9a3f 100644 --- a/test/Orleans.CosmosDB.Tests/ReminderTests.cs +++ b/test/Orleans.CosmosDB.Tests/ReminderTests.cs @@ -1,69 +1,69 @@ -using Orleans.CosmosDB.Tests.Grains; -using Orleans.Hosting; -using System.Net; -using System.Threading.Tasks; -using Xunit; -using Xunit.Abstractions; -using static Orleans.CosmosDB.Tests.ReminderTests; - -namespace Orleans.CosmosDB.Tests -{ - public class ReminderTests : IClassFixture +using Orleans.CosmosDB.Tests.Grains; +using Orleans.Hosting; +using System.Net; +using System.Threading.Tasks; +using Xunit; +using Xunit.Abstractions; +using static Orleans.CosmosDB.Tests.ReminderTests; + +namespace Orleans.CosmosDB.Tests +{ + public class ReminderTests : IClassFixture { - public class ReminderFixture : OrleansFixture + public class ReminderFixture : OrleansFixture { private const string DatabaseName = "OrleansRemindersTest"; - protected override ISiloHostBuilder PreBuild(ISiloHostBuilder builder) - { + protected override ISiloBuilder PreBuild(ISiloBuilder builder) + { OrleansFixture.GetAccountInfo(out var accountEndpoint, out var accountKey); - //siloConfig.Globals.ReminderServiceType = GlobalConfiguration.ReminderServiceProviderType.Custom; - //siloConfig.Globals.ReminderTableAssembly = "Orleans.Reminders.CosmosDB"; - //siloConfig.AddAzureCosmosDBPersistence(TEST_STORAGE); - return builder //.UseDevelopmentClustering(opt => opt.PrimarySiloEndpoint = new IPEndPoint(IPAddress.Any, 10000))//.UseConfiguration(siloConfig) - .AddCosmosDBGrainStorage(OrleansFixture.TEST_STORAGE, opt => - { - opt.AccountEndpoint = accountEndpoint; - opt.AccountKey = accountKey; - opt.ConnectionMode = Microsoft.Azure.Documents.Client.ConnectionMode.Gateway; - opt.DropDatabaseOnInit = true; - opt.CanCreateResources = true; + //siloConfig.Globals.ReminderServiceType = GlobalConfiguration.ReminderServiceProviderType.Custom; + //siloConfig.Globals.ReminderTableAssembly = "Orleans.Reminders.CosmosDB"; + //siloConfig.AddAzureCosmosDBPersistence(TEST_STORAGE); + return builder //.UseDevelopmentClustering(opt => opt.PrimarySiloEndpoint = new IPEndPoint(IPAddress.Any, 10000))//.UseConfiguration(siloConfig) + .AddCosmosDBGrainStorage(OrleansFixture.TEST_STORAGE, opt => + { + opt.AccountEndpoint = accountEndpoint; + opt.AccountKey = accountKey; + opt.ConnectionMode = Microsoft.Azure.Documents.Client.ConnectionMode.Gateway; + opt.DropDatabaseOnInit = true; + opt.CanCreateResources = true; + opt.AutoUpdateStoredProcedures = true; + opt.InitStage = ServiceLifecycleStage.RuntimeStorageServices; + opt.DB = DatabaseName; + }) + .UseCosmosDBReminderService(opt => + { + opt.AccountEndpoint = accountEndpoint; + opt.AccountKey = accountKey; + opt.ConnectionMode = Microsoft.Azure.Documents.Client.ConnectionMode.Gateway; + opt.CanCreateResources = true; opt.AutoUpdateStoredProcedures = true; - opt.InitStage = ServiceLifecycleStage.RuntimeStorageServices; - opt.DB = DatabaseName; - }) - .UseCosmosDBReminderService(opt => - { - opt.AccountEndpoint = accountEndpoint; - opt.AccountKey = accountKey; - opt.ConnectionMode = Microsoft.Azure.Documents.Client.ConnectionMode.Gateway; - opt.CanCreateResources = true; - opt.AutoUpdateStoredProcedures = true; - opt.DB = DatabaseName; + opt.DB = DatabaseName; }); - } - } - - private ReminderFixture _fixture; - - public ReminderTests(ReminderFixture fixture) - { - this._fixture = fixture; - } - - [Fact] - public async Task CreateReminderTest() - { - var grain = _fixture.Client.GetGrain(0); + } + } + + private ReminderFixture _fixture; + + public ReminderTests(ReminderFixture fixture) + { + this._fixture = fixture; + } + + [Fact] + public async Task CreateReminderTest() + { + var grain = _fixture.Client.GetGrain(0); var test = "grain"; var reminder = await grain.RegisterReminder(test); Assert.NotNull(reminder); - Assert.True(await grain.ReminderExist(test)); - await Task.Delay((int)TestGrain.ReminderWaitTime.TotalMilliseconds); - Assert.True(await grain.ReminderTicked()); - await grain.DismissReminder(test); - Assert.False(await grain.ReminderExist(test)); + Assert.True(await grain.ReminderExist(test)); + await Task.Delay((int)TestGrain.ReminderWaitTime.TotalMilliseconds); + Assert.True(await grain.ReminderTicked()); + await grain.DismissReminder(test); + Assert.False(await grain.ReminderExist(test)); } - } -} + } +} diff --git a/test/Orleans.CosmosDB.Tests/ThroughputConfigurationTests.cs b/test/Orleans.CosmosDB.Tests/ThroughputConfigurationTests.cs index 6a66b5f..e0f1999 100644 --- a/test/Orleans.CosmosDB.Tests/ThroughputConfigurationTests.cs +++ b/test/Orleans.CosmosDB.Tests/ThroughputConfigurationTests.cs @@ -1,106 +1,106 @@ -using System; -using Orleans.CosmosDB.Tests.Grains; -using Orleans.Hosting; -using Orleans.Persistence.CosmosDB; -using Orleans.Runtime; -using Orleans.Storage; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using Xunit; -using static Orleans.CosmosDB.Tests.ThroughputConfigurationTests; - -// For Index coverage CreateDocumentQuery -using Microsoft.Azure.Documents.Client; -using Microsoft.Azure.Documents; - -namespace Orleans.CosmosDB.Tests -{ - public class ThroughputConfigurationTests : IClassFixture - { - //private const string StorageDbName = "OrleansStorageTest"; - private const string DatabaseName = "DatabaseRUTest"; - private StorageFixture _fixture; - - public class StorageFixture : OrleansFixture - { - internal string AccountEndpoint; - internal string AccountKey; - - protected override ISiloHostBuilder PreBuild(ISiloHostBuilder builder) - { - OrleansFixture.GetAccountInfo(out this.AccountEndpoint, out this.AccountKey); - - return builder - .AddCosmosDBGrainStorage(OrleansFixture.TEST_STORAGE, opt => - { - opt.AccountEndpoint = this.AccountEndpoint; - opt.AccountKey = this.AccountKey; - opt.ConnectionMode = ConnectionMode.Gateway; - opt.DropDatabaseOnInit = true; - opt.AutoUpdateStoredProcedures = true; - opt.CanCreateResources = true; - opt.DB = DatabaseName; - opt.DatabaseThroughput = 1000; - opt.CollectionThroughput = 0; - opt.Collection = "RUTest"; - }) - .AddCosmosDBGrainStorage("Second", opt => - { - opt.AccountEndpoint = this.AccountEndpoint; - opt.AccountKey = this.AccountKey; - opt.ConnectionMode = ConnectionMode.Gateway; - opt.DropDatabaseOnInit = true; - opt.AutoUpdateStoredProcedures = true; - opt.CanCreateResources = true; - opt.DB = DatabaseName; - opt.DatabaseThroughput = 1000; - opt.CollectionThroughput = 500; - opt.Collection = "RUTest2"; - }); - - } - } - - public ThroughputConfigurationTests(StorageFixture fixture) => this._fixture = fixture; - - [Fact] - public async Task VerifyDbThroughput() - { - var storage = this._fixture.Silo.Services.GetServiceByName(OrleansFixture.TEST_STORAGE) as CosmosDBGrainStorage; - var dbClient = storage._dbClient; - var offers = dbClient.CreateOfferQuery().ToList(); - - //Database has offer - var database = (Database)(await dbClient.ReadDatabaseAsync(UriFactory.CreateDatabaseUri(DatabaseName))); - var offerDatabase = (OfferV2)offers.Single(o => o.ResourceLink == database.SelfLink); - Assert.Equal(1000, offerDatabase.Content.OfferThroughput); - } - - [Fact] - public async Task VerifyCollectionWithoutOffer() - { - var storage = this._fixture.Silo.Services.GetServiceByName(OrleansFixture.TEST_STORAGE) as CosmosDBGrainStorage; - var dbClient = storage._dbClient; - var offers = dbClient.CreateOfferQuery().ToList(); - - //Collection RUTest does not - var collection1 = (DocumentCollection)(await dbClient.ReadDocumentCollectionAsync(UriFactory.CreateDocumentCollectionUri(DatabaseName, "RUTest"))); - var offerCollection1 = offers.FirstOrDefault(o => o.ResourceLink == collection1.SelfLink); - Assert.Null(offerCollection1); - } - - [Fact] - public async Task VerifiyCollectionWithOfferInDbWithOffer() - { - var storage = this._fixture.Silo.Services.GetServiceByName(OrleansFixture.TEST_STORAGE) as CosmosDBGrainStorage; - var dbClient = storage._dbClient; - var offers = dbClient.CreateOfferQuery().ToList(); - - //Collection RUTest2 has offer - var collection2 = (DocumentCollection)(await dbClient.ReadDocumentCollectionAsync(UriFactory.CreateDocumentCollectionUri(DatabaseName, "RUTest2"))); - var offerCollection2 = (OfferV2)offers.Single(o => o.ResourceLink == collection2.SelfLink); - Assert.Equal(500, offerCollection2.Content.OfferThroughput); - } - } -} +using System; +using Orleans.CosmosDB.Tests.Grains; +using Orleans.Hosting; +using Orleans.Persistence.CosmosDB; +using Orleans.Runtime; +using Orleans.Storage; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Xunit; +using static Orleans.CosmosDB.Tests.ThroughputConfigurationTests; + +// For Index coverage CreateDocumentQuery +using Microsoft.Azure.Documents.Client; +using Microsoft.Azure.Documents; + +namespace Orleans.CosmosDB.Tests +{ + public class ThroughputConfigurationTests : IClassFixture + { + //private const string StorageDbName = "OrleansStorageTest"; + private const string DatabaseName = "DatabaseRUTest"; + private StorageFixture _fixture; + + public class StorageFixture : OrleansFixture + { + internal string AccountEndpoint; + internal string AccountKey; + + protected override ISiloBuilder PreBuild(ISiloBuilder builder) + { + OrleansFixture.GetAccountInfo(out this.AccountEndpoint, out this.AccountKey); + + return builder + .AddCosmosDBGrainStorage(OrleansFixture.TEST_STORAGE, opt => + { + opt.AccountEndpoint = this.AccountEndpoint; + opt.AccountKey = this.AccountKey; + opt.ConnectionMode = ConnectionMode.Gateway; + opt.DropDatabaseOnInit = true; + opt.AutoUpdateStoredProcedures = true; + opt.CanCreateResources = true; + opt.DB = DatabaseName; + opt.DatabaseThroughput = 1000; + opt.CollectionThroughput = 0; + opt.Collection = "RUTest"; + }) + .AddCosmosDBGrainStorage("Second", opt => + { + opt.AccountEndpoint = this.AccountEndpoint; + opt.AccountKey = this.AccountKey; + opt.ConnectionMode = ConnectionMode.Gateway; + opt.DropDatabaseOnInit = true; + opt.AutoUpdateStoredProcedures = true; + opt.CanCreateResources = true; + opt.DB = DatabaseName; + opt.DatabaseThroughput = 1000; + opt.CollectionThroughput = 500; + opt.Collection = "RUTest2"; + }); + + } + } + + public ThroughputConfigurationTests(StorageFixture fixture) => this._fixture = fixture; + + [Fact] + public async Task VerifyDbThroughput() + { + var storage = this._fixture.Host.Services.GetServiceByName(OrleansFixture.TEST_STORAGE) as CosmosDBGrainStorage; + var dbClient = storage._dbClient; + var offers = dbClient.CreateOfferQuery().ToList(); + + //Database has offer + var database = (Database)(await dbClient.ReadDatabaseAsync(UriFactory.CreateDatabaseUri(DatabaseName))); + var offerDatabase = (OfferV2)offers.Single(o => o.ResourceLink == database.SelfLink); + Assert.Equal(1000, offerDatabase.Content.OfferThroughput); + } + + [Fact] + public async Task VerifyCollectionWithoutOffer() + { + var storage = this._fixture.Host.Services.GetServiceByName(OrleansFixture.TEST_STORAGE) as CosmosDBGrainStorage; + var dbClient = storage._dbClient; + var offers = dbClient.CreateOfferQuery().ToList(); + + //Collection RUTest does not + var collection1 = (DocumentCollection)(await dbClient.ReadDocumentCollectionAsync(UriFactory.CreateDocumentCollectionUri(DatabaseName, "RUTest"))); + var offerCollection1 = offers.FirstOrDefault(o => o.ResourceLink == collection1.SelfLink); + Assert.Null(offerCollection1); + } + + [Fact] + public async Task VerifiyCollectionWithOfferInDbWithOffer() + { + var storage = this._fixture.Host.Services.GetServiceByName(OrleansFixture.TEST_STORAGE) as CosmosDBGrainStorage; + var dbClient = storage._dbClient; + var offers = dbClient.CreateOfferQuery().ToList(); + + //Collection RUTest2 has offer + var collection2 = (DocumentCollection)(await dbClient.ReadDocumentCollectionAsync(UriFactory.CreateDocumentCollectionUri(DatabaseName, "RUTest2"))); + var offerCollection2 = (OfferV2)offers.Single(o => o.ResourceLink == collection2.SelfLink); + Assert.Equal(500, offerCollection2.Content.OfferThroughput); + } + } +}