Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Initial commit of actor presence test #77

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions actor-presence/actor-presence.sln
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26124.0
MinimumVisualStudioVersion = 15.0.26124.0
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "load_generator", "load_generator\load_generator.csproj", "{E8B44B23-4BC7-4B56-8C88-4A820F455875}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "presence_actor_service", "presence_actor_service\presence_actor_service.csproj", "{E93959C5-CF16-4974-AE77-CB97478642D4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "presence_interface", "presence_interface\presence_interface.csproj", "{76536772-3EBB-44BD-969E-58E8361A2F87}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Debug|x64 = Debug|x64
Debug|x86 = Debug|x86
Release|Any CPU = Release|Any CPU
Release|x64 = Release|x64
Release|x86 = Release|x86
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{E8B44B23-4BC7-4B56-8C88-4A820F455875}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E8B44B23-4BC7-4B56-8C88-4A820F455875}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E8B44B23-4BC7-4B56-8C88-4A820F455875}.Debug|x64.ActiveCfg = Debug|Any CPU
{E8B44B23-4BC7-4B56-8C88-4A820F455875}.Debug|x64.Build.0 = Debug|Any CPU
{E8B44B23-4BC7-4B56-8C88-4A820F455875}.Debug|x86.ActiveCfg = Debug|Any CPU
{E8B44B23-4BC7-4B56-8C88-4A820F455875}.Debug|x86.Build.0 = Debug|Any CPU
{E8B44B23-4BC7-4B56-8C88-4A820F455875}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E8B44B23-4BC7-4B56-8C88-4A820F455875}.Release|Any CPU.Build.0 = Release|Any CPU
{E8B44B23-4BC7-4B56-8C88-4A820F455875}.Release|x64.ActiveCfg = Release|Any CPU
{E8B44B23-4BC7-4B56-8C88-4A820F455875}.Release|x64.Build.0 = Release|Any CPU
{E8B44B23-4BC7-4B56-8C88-4A820F455875}.Release|x86.ActiveCfg = Release|Any CPU
{E8B44B23-4BC7-4B56-8C88-4A820F455875}.Release|x86.Build.0 = Release|Any CPU
{E93959C5-CF16-4974-AE77-CB97478642D4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E93959C5-CF16-4974-AE77-CB97478642D4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E93959C5-CF16-4974-AE77-CB97478642D4}.Debug|x64.ActiveCfg = Debug|Any CPU
{E93959C5-CF16-4974-AE77-CB97478642D4}.Debug|x64.Build.0 = Debug|Any CPU
{E93959C5-CF16-4974-AE77-CB97478642D4}.Debug|x86.ActiveCfg = Debug|Any CPU
{E93959C5-CF16-4974-AE77-CB97478642D4}.Debug|x86.Build.0 = Debug|Any CPU
{E93959C5-CF16-4974-AE77-CB97478642D4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E93959C5-CF16-4974-AE77-CB97478642D4}.Release|Any CPU.Build.0 = Release|Any CPU
{E93959C5-CF16-4974-AE77-CB97478642D4}.Release|x64.ActiveCfg = Release|Any CPU
{E93959C5-CF16-4974-AE77-CB97478642D4}.Release|x64.Build.0 = Release|Any CPU
{E93959C5-CF16-4974-AE77-CB97478642D4}.Release|x86.ActiveCfg = Release|Any CPU
{E93959C5-CF16-4974-AE77-CB97478642D4}.Release|x86.Build.0 = Release|Any CPU
{76536772-3EBB-44BD-969E-58E8361A2F87}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{76536772-3EBB-44BD-969E-58E8361A2F87}.Debug|Any CPU.Build.0 = Debug|Any CPU
{76536772-3EBB-44BD-969E-58E8361A2F87}.Debug|x64.ActiveCfg = Debug|Any CPU
{76536772-3EBB-44BD-969E-58E8361A2F87}.Debug|x64.Build.0 = Debug|Any CPU
{76536772-3EBB-44BD-969E-58E8361A2F87}.Debug|x86.ActiveCfg = Debug|Any CPU
{76536772-3EBB-44BD-969E-58E8361A2F87}.Debug|x86.Build.0 = Debug|Any CPU
{76536772-3EBB-44BD-969E-58E8361A2F87}.Release|Any CPU.ActiveCfg = Release|Any CPU
{76536772-3EBB-44BD-969E-58E8361A2F87}.Release|Any CPU.Build.0 = Release|Any CPU
{76536772-3EBB-44BD-969E-58E8361A2F87}.Release|x64.ActiveCfg = Release|Any CPU
{76536772-3EBB-44BD-969E-58E8361A2F87}.Release|x64.Build.0 = Release|Any CPU
{76536772-3EBB-44BD-969E-58E8361A2F87}.Release|x86.ActiveCfg = Release|Any CPU
{76536772-3EBB-44BD-969E-58E8361A2F87}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
EndGlobal
103 changes: 103 additions & 0 deletions actor-presence/load_generator/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------

namespace Dapr.Tests.Actors.PresenceTest
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

using Dapr.Actors;
using Dapr.Actors.Client;

class Program
{
private static void Main()
{
const int nGames = 10; // number of games to simulate
const int nPlayersPerGame = 4; // number of players in each game

var sendInterval = TimeSpan.FromSeconds(5); // interval for sending updates

var heartbeats = new HeartbeatData[nGames];
for (var i = 0; i < nGames; i++)
{
heartbeats[i] = new HeartbeatData();
heartbeats[i].Game = Guid.NewGuid();
for (var j = 0; j < nPlayersPerGame; j++)
{
var playerId = Guid.NewGuid();
heartbeats[i].Status.Players.Add(playerId);
}
}

var outstandingUpdates = new List<Task>();
var outstandingScoreReads = new List<Task<byte[]>>();
var iteration = 0;

while (true)
{
iteration++;
Console.WriteLine("Sending heartbeat series # {0}", iteration);

ulong high = ((ulong) iteration) << 32;
ulong low = (ulong) (iteration > 5 ? iteration - 5 : 0);
var value = high | low;
var score = BitConverter.GetBytes(value);
var presence = ActorProxy.Create<IPresenceActor>(ActorId.CreateRandom(), "PresenceActor"); // get any stateless actor
outstandingUpdates.Clear();
try
{
for (var i = 0; i < nGames; i++)
{
heartbeats[i].Status.Score = score;

var heartbeatData = JsonSerializer.SerializeToUtf8Bytes<HeartbeatData>(heartbeats[i]);
var t = presence.Heartbeat(heartbeatData);
outstandingUpdates.Add(t);
}

// Wait for all calls to finish.
// It is okay to block the thread here because it's a client program with no parallelism.
// One should never block a thread in grain code.
Console.WriteLine("Wating for the tasks to finish");
Task.WaitAll(outstandingUpdates.ToArray());
}
catch (Exception e)
{
Console.WriteLine("Error: {0}", e);
}

Console.WriteLine();
Console.WriteLine("Getting game scores: ");
outstandingScoreReads.Clear();
try
{
for (var i = 0; i < nGames; i++)
{
var t = ActorProxy.Create<IGameActor>(new ActorId(heartbeats[i].Game.ToString()), "PresenceActor").GetGameScore();
outstandingScoreReads.Add(t);
}

Task.WhenAll(outstandingScoreReads.ToArray()).Wait();

for (var i = 0; i < nGames; i++)
{
Console.WriteLine("Game: {0}, Score: {1}", heartbeats[i].Game, String.Join(",", outstandingScoreReads[i].Result.Select(b => b.ToString())));
}
}
catch (Exception e)
{
Console.WriteLine("Error: {0}", e);
}

Thread.Sleep(sendInterval);
}
}
}
}
16 changes: 16 additions & 0 deletions actor-presence/load_generator/load_generator.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Dapr.Actors" Version="0.10.0-preview01" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\presence_interface\presence_interface.csproj" />
</ItemGroup>

</Project>
170 changes: 170 additions & 0 deletions actor-presence/presence_actor_service/GameActor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------

namespace Dapr.Tests.Actors.PresenceTest
{
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Net;
using System.Threading.Tasks;

using Dapr.Actors;
using Dapr.Actors.Client;
using Dapr.Actors.Runtime;

class GameActor : Actor, IGameActor
{
private const string StateName = "state";

private byte[] ipAddressBytes;

public GameActor(ActorService actorService, ActorId actorId)
: base(actorService, actorId)
{
}

protected override async Task OnActivateAsync()
{
try
{
await this.StateManager.TryAddStateAsync(StateName, new GameState
{
Players = new HashSet<Guid>(),
Status = new GameStatus {Score = new byte[1]}
});
await base.OnActivateAsync();
}
catch (Exception e)
{
System.Console.WriteLine($"Exception in OnActivateAsync: {e}");
throw;
}
}

public async Task<string> Initialize(int scoreSizeInBytes)
{
try
{
var state = await this.StateManager.GetStateAsync<GameState>(StateName);
state.Status.Score = new byte[scoreSizeInBytes];
Random r = new Random();
r.NextBytes(state.Status.Score);
await this.StateManager.SetStateAsync(StateName, state);

return "";
}
catch (Exception e)
{
System.Console.WriteLine($"Exception in InitializeScoreBuffer: {e}");
throw;
}
}

public async Task UpdateGameStatus(GameStatus newStatus)
{
try
{
IPAddress senderIPAddress;
int senderProcessId;
long requestId;
var scoreBuffer = newStatus.Score;

if (scoreBuffer[0] == (byte)ScoreBufferContentType.NoRequestId)
{
return;
}

var ipAddressLength = BitConverter.ToInt32(scoreBuffer, 1);
GetIPAddressBytes(scoreBuffer, 1+sizeof(int), ipAddressLength);
senderIPAddress = new IPAddress(this.ipAddressBytes);
senderProcessId = BitConverter.ToInt32(scoreBuffer, 1 + sizeof(int) + ipAddressLength);
requestId = BitConverter.ToInt64(scoreBuffer, 1 + (2*sizeof(int)) + ipAddressLength);

try
{
var state = await this.StateManager.GetStateAsync<GameState>(StateName);
state.Status = newStatus;

// Check for new players that joined since last update
foreach (var player in newStatus.Players)
{
if (!state.Players.Contains(player))
{
try
{
// Here we call player grains serially, which is less efficient than a fan-out but simpler to express.
await ActorProxy.Create<IPlayerActor>(new ActorId(player.ToString()), "PlayerActor").JoinGame(this);
state.Players.Add(player);
}
catch
{
// Ignore exceptions while telling player grains to join the game.
// Since we didn't add the player to the list, this will be tried again with next update.
}
}
}

// Check for players that left the game since last update
var promises = new List<Task>();
foreach (var player in state.Players)
{
if (!newStatus.Players.Contains(player))
{
try
{
// Here we do a fan-out with multiple calls going out in parallel. We join the promisses later.
// More code to write but we get lower latency when calling multiple player grains.
promises.Add(ActorProxy.Create<IPlayerActor>(new ActorId(player.ToString()), "PlayerActor").LeaveGame(this));
state.Players.Remove(player);
}
catch
{
// Ignore exceptions while telling player grains to leave the game.
// Since we didn't remove the player from the list, this will be tried again with next update.
}
}
}

// Joining promises
await Task.WhenAll(promises);

await this.StateManager.SetStateAsync(StateName, state);
}
finally
{
System.Console.WriteLine($"Processed request. {requestId},{senderProcessId},{senderIPAddress}");
}
}
catch (Exception e)
{
System.Console.WriteLine($"Exception in UpdateGameStatus: {e}");
throw;
}
}

public async Task<byte[]> GetGameScore()
{
try
{
return (await this.StateManager.GetStateAsync<GameState>(StateName)).Status.Score;
}
catch (Exception e)
{
System.Console.WriteLine($"Exception in GetGameScore: {e}");
throw;
}
}

private void GetIPAddressBytes(byte[] scoreBuffer, int startIndex, int length)
{
if ((null == this.ipAddressBytes) || (this.ipAddressBytes.Length != length))
{
this.ipAddressBytes = new byte[length];
}
Array.Copy(scoreBuffer, startIndex, this.ipAddressBytes, 0, length);
}
}
}
18 changes: 18 additions & 0 deletions actor-presence/presence_actor_service/GameState.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------

namespace Dapr.Tests.Actors.PresenceTest
{
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;

class GameState
{
public GameStatus Status;

public HashSet<Guid> Players;
}
}
Loading