[Z Design Doc] QBit supports Metrics, KPI gathering for runtime stats and reactive stats for Microservices
For some background on why this is important for microservices, see Reactive Microservices Monitoring.
# QBit supports Metrics, KPI gathering
QBit supports collecting metrics for microservices. The QBit runtime statistics system can be queried, it can be clustered, and it can replicate to other statistics systems. The core interfaces for the QBit runtime stats system are in the `io.advantageous.qbit.service.stats` package. The main interface for collecting stats is `StatsCollector`.
```java
package io.advantageous.qbit.service.stats;

import io.advantageous.qbit.client.ClientProxy;

/**
 * Collects stats.
 * This collects key performance indicators: timings, counts and levels/gauges.
 * Created by rick on 6/6/15.
 */
public interface StatsCollector extends ClientProxy {

    /**
     * Increment a counter by 1.
     * This is a shortcut for recordCount(name, 1);
     * @param name name of the metric, KPI, stat.
     */
    default void increment(String name) {
    }

    /**
     * Record a count.
     * Used to record things like how many users used the site.
     *
     * @param name name of the metric, KPI, stat
     * @param count count to record.
     */
    default void recordCount(String name, long count) {
    }

    /**
     * Record a level. Some systems call this a gauge.
     * This is used to record things like the count of current threads or
     * free system memory or free disk, etc.
     * @param name name of the gauge or level
     * @param level level
     */
    default void recordLevel(String name, long level) {
    }

    /**
     * This is used to record timings.
     * This would be things like how long it took this service to call
     * a remote service.
     * @param name name of the timing
     * @param duration duration
     */
    default void recordTiming(String name, long duration) {
    }
}
```
You will probably never use a `StatsCollector` directly. Use a `StatsCollectorBuffer` instead, which buffers metric calls to reduce IO and reporting to the stats engine.

Another important concept in this package is the `ServiceStatsListener`. The `ServiceStatsListener` gets registered on your behalf if you use the `ManagedServiceBuilder`. It intercepts queue calls for the `ServiceQueue`, and since all services and endpoints end up using the `ServiceQueue`, it can track stats for every service using the following keys:
```java
startBatchCountKey = serviceName + ".startBatchCount";
receiveCountKey = serviceName + ".receiveCount";
receiveTimeKey = serviceName + ".callTimeSample";
this.queueRequestSizeKey = serviceName + ".queueRequestSize";
this.queueResponseSizeKey = serviceName + ".queueResponseSize";
```
- `${serviceName}.startBatchCount` tracks how many times a batch has been sent. This can tell you how well your batching is set up.
- `${serviceName}.receiveCount` is how many times the service has been called.
- `${serviceName}.callTimeSample` is how long the methods of this service take (if enabled, call times are sampled).
- `${serviceName}.queueRequestSize` tracks how large the request queue is. A value greater than 0 indicates that calls are waiting to be handled; if it keeps rising, the service could be down. (There is also a health check that detects a blocked queue and marks the service unhealthy.)
- `${serviceName}.queueResponseSize` tracks how large the response queue is getting. A growing value indicates that responses are not getting drained.
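The queue-size rule of thumb above can be expressed as a check over recent samples of `${serviceName}.queueRequestSize`. This is a minimal sketch: `QueueBacklog`, its methods, and the idea of passing in an array of recent samples are assumptions for illustration, and it is separate from QBit's own blocked-queue health check.

```java
/** Hypothetical helper; not part of QBit. */
class QueueBacklog {

    /** Builds the request-queue key the same way the key list above does. */
    static String requestSizeKey(String serviceName) {
        return serviceName + ".queueRequestSize";
    }

    /**
     * Returns true when every recent sample is above zero and the samples
     * never decrease while rising overall, i.e. calls keep piling up.
     */
    static boolean isGrowing(long[] recentSamples) {
        if (recentSamples.length < 2) {
            return false; // not enough data to see a trend
        }
        for (int i = 0; i < recentSamples.length; i++) {
            if (recentSamples[i] <= 0) {
                return false; // the queue drained at some point
            }
            if (i > 0 && recentSamples[i] < recentSamples[i - 1]) {
                return false; // backlog shrank, so the service is making progress
            }
        }
        return recentSamples[recentSamples.length - 1] > recentSamples[0];
    }
}
```

A flat but non-zero series (for example `4, 4, 4`) is deliberately not flagged here; whether that should count as a problem depends on your traffic.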
All of the classes covered so far are in QBit core. This means that stats and KPI gathering are built into the QBit system: they are an integral part of microservices, so they are an integral part of QBit.
## StatService and StatsD
The `StatService` is in the QBit admin package. The `StatService` interface lets you both record stats, KPIs, and metrics for microservices and query them. The `StatService` can also efficiently replicate KPIs (key performance indicators) to replicators.

Let's look at the `StatService` interface and its comments.
```java
package io.advantageous.qbit.metrics;

import io.advantageous.qbit.reactive.Callback;
import io.advantageous.qbit.service.stats.Stats;
import io.advantageous.qbit.service.stats.StatsCollector;

/**
 * The StatService collects stats, and allows stats to be queried.
 * This collects key performance indicators: timings, counts and levels/gauges.
 * It also allows internal or external clients to query this system.
 *
 * Created by rick on 6/6/15.
 */
public interface StatService extends StatsCollector {

    /**
     * Get the last n seconds of stats (up to two minutes of stats are
     * typically kept in memory).
     *
     * The `Stats` object has the mean, median, etc.
     *
     * ```java
     *
     * private final float mean;
     * private final float stdDev;
     * private final float variance;
     * private final long sum;
     * private final long max;
     * private final long min;
     * private final long median;
     * ```
     * @param callback callback to get Stats
     * @param name name of metric, KPI, etc.
     * @param secondCount secondCount
     */
    default void statsForLastSeconds(Callback<Stats> callback, String name,
                                     int secondCount) {
    }

    /**
     * Gets the average of a level over the last n seconds.
     *
     * @param callback callback
     * @param name name of metric, KPI, etc.
     * @param secondCount secondCount
     */
    default void averageLastLevel(Callback<Long> callback, String name,
                                  int secondCount) {
    }

    /**
     * Gets count of the current minute.
     *
     * @param callback callback
     * @param name name of metric
     */
    default void currentMinuteCount(Callback<Long> callback, String name) {
    }

    /**
     * Gets count of the current second.
     *
     * @param callback callback
     * @param name name of metric
     */
    default void currentSecondCount(Callback<Long> callback, String name) {
    }

    /**
     * Gets count of the last recorded full second.
     *
     * @param callback callback
     * @param name name of metric
     */
    default void lastSecondCount(Callback<Long> callback, String name) {
    }

    /**
     * Gets count of the last recorded ten full seconds.
     *
     * @param callback callback
     * @param name name of metric
     */
    default void lastTenSecondCount(Callback<Long> callback, String name) {
    }

    /**
     * Gets count of the last recorded five full seconds.
     *
     * @param callback callback
     * @param name name of metric
     */
    default void lastFiveSecondCount(Callback<Long> callback, String name) {
    }

    /**
     * Gets count of the last recorded N full seconds.
     *
     * @param callback callback
     * @param name name of metric
     * @param secondCount number of seconds (N)
     */
    default void lastNSecondsCount(Callback<Long> callback, String name,
                                   int secondCount) {
    }

    /**
     * Gets count of the last recorded N full seconds.
     * This is more exact if the count overlaps two minutes.
     *
     * @param callback callback
     * @param name name of metric
     * @param secondCount number of seconds (N)
     */
    default void lastNSecondsCountExact(Callback<Long> callback, String name,
                                        int secondCount) {
    }

    /**
     * Gets count of the last recorded ten full seconds.
     * This is more exact if the count overlaps two minutes.
     *
     * @param callback callback
     * @param name name of metric
     */
    default void lastTenSecondCountExact(Callback<Long> callback, String name) {
    }

    /**
     * Gets count of the last recorded five full seconds.
     * This is more exact if the count overlaps two minutes.
     *
     * @param callback callback
     * @param name name of metric
     */
    default void lastFiveSecondCountExact(Callback<Long> callback, String name) {
    }

    /**
     * Records a count with an explicit timestamp.
     * @param name name of metric
     * @param count count
     * @param timestamp timestamp
     */
    default void recordWithTime(String name, int count, long timestamp) {
    }

    /**
     * Bulk record.
     * @param timestamp timestamp
     * @param names names of metrics
     * @param counts counts of metrics
     */
    default void recordAll(long timestamp, String[] names, long[] counts) {
    }

    /**
     * Bulk record.
     * @param names names of metrics
     * @param counts counts of metrics
     * @param times times, one per metric
     */
    default void recordAllWithTimes(String[] names,
                                    long[] counts, long[] times) {
    }
}
```
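The count queries above (`currentSecondCount`, `lastNSecondsCount`, and so on) imply that counts are kept per second for roughly two minutes. One common way to hold that kind of data is a ring buffer indexed by epoch second. The sketch below assumes that design; it is not QBit's implementation, and `SecondBuckets` with its methods is hypothetical.

```java
import java.util.Arrays;

/**
 * Hypothetical ring buffer of per-second counts (not QBit's implementation).
 * Each slot remembers which epoch second it belongs to, so stale slots left
 * over from a previous lap around the ring are ignored.
 */
class SecondBuckets {
    private final long[] counts;
    private final long[] secondOfSlot;

    SecondBuckets(int capacitySeconds) {
        counts = new long[capacitySeconds];
        secondOfSlot = new long[capacitySeconds];
        Arrays.fill(secondOfSlot, -1L); // no second recorded yet
    }

    void record(long epochSecond, long count) {
        int slot = (int) (epochSecond % counts.length);
        if (secondOfSlot[slot] != epochSecond) {
            // The slot held an older second; reuse it for this one.
            secondOfSlot[slot] = epochSecond;
            counts[slot] = 0;
        }
        counts[slot] += count;
    }

    /** Sum of the n full seconds before nowSecond (nowSecond itself is excluded). */
    long lastNSecondsCount(long nowSecond, int n) {
        if (n >= counts.length) {
            // Keep the in-progress second from sharing a slot with the window.
            throw new IllegalArgumentException("window must be smaller than the buffer");
        }
        long total = 0;
        for (long s = nowSecond - n; s < nowSecond; s++) {
            if (s < 0) {
                continue;
            }
            int slot = (int) (s % counts.length);
            if (secondOfSlot[slot] == s) {
                total += counts[slot];
            }
        }
        return total;
    }
}
```

A 120-slot buffer would cover the two minutes mentioned in the `statsForLastSeconds` comment. Excluding the current second mirrors the "full seconds" wording in the Javadoc.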
You can query the metrics system to make your services reactive. For example, you could query the current requests per second to a service and dynamically change the size of buffering to increase throughput.

QBit does not just monitor metrics; it makes them queryable so that your microservices can react to them.
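As a sketch of that idea, the code below maps an observed requests-per-second value to a batch size and feeds it from a callback-style query shaped like `StatService.lastSecondCount(Callback<Long>, String)`. The `RpsQuery` interface, the `BatchSizeTuner` class, the thresholds, and the batch sizes are all assumptions for illustration, not QBit defaults.

```java
import java.util.function.Consumer;

/** Hypothetical stand-in for a callback-style StatService query. */
interface RpsQuery {
    void lastSecondCount(Consumer<Long> callback, String name);
}

/** Hypothetical tuner; the thresholds below are illustrative only. */
class BatchSizeTuner {
    private volatile int batchSize = 10;

    int batchSize() {
        return batchSize;
    }

    /** Larger batches amortize IO under heavy traffic; small ones keep latency low. */
    static int batchSizeFor(long requestsPerSecond) {
        if (requestsPerSecond >= 10_000) {
            return 1_000;
        }
        if (requestsPerSecond >= 1_000) {
            return 100;
        }
        return 10;
    }

    /** Asks for the last full second's count and adjusts the batch size when it arrives. */
    void refresh(RpsQuery query, String metricName) {
        query.lastSecondCount(rps -> batchSize = batchSizeFor(rps), metricName);
    }
}
```

A natural metric to query is the service's `${serviceName}.receiveCount`, calling `refresh` periodically (for example, from a timer).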
The `StatService` system that comes with QBit can replicate changes to other systems via the `StatReplicator`.
```java
/**
 * Stat Replicator.
 * This is used to replicate stats to another system.
 * created by rhightower on 1/28/15.
 */
public interface StatReplicator extends RemoteTCPClientProxy, ServiceFlushable, Stoppable {

    void replicateCount(String name, long count, long time);

    void replicateLevel(String name, long level, long time);

    void replicateTiming(String name, long timing, long time);
}
```
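To illustrate how one stream of stats can reach several back ends, here is a minimal fan-out sketch. It uses a trimmed-down `Replicator` interface with the same three `replicate*` methods as `StatReplicator` but none of QBit's parent interfaces (`RemoteTCPClientProxy`, `ServiceFlushable`, `Stoppable`); both `Replicator` and `FanOutReplicator` are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Trimmed-down, hypothetical version of StatReplicator's methods. */
interface Replicator {
    void replicateCount(String name, long count, long time);

    void replicateLevel(String name, long level, long time);

    void replicateTiming(String name, long timing, long time);
}

/** Forwards every stat to all registered replicators, in registration order. */
class FanOutReplicator implements Replicator {
    // Copy-on-write so replicators can be added while stats are being sent.
    private final List<Replicator> targets = new CopyOnWriteArrayList<>();

    void add(Replicator target) {
        targets.add(target);
    }

    @Override
    public void replicateCount(String name, long count, long time) {
        targets.forEach(t -> t.replicateCount(name, count, time));
    }

    @Override
    public void replicateLevel(String name, long level, long time) {
        targets.forEach(t -> t.replicateLevel(name, level, time));
    }

    @Override
    public void replicateTiming(String name, long timing, long time) {
        targets.forEach(t -> t.replicateTiming(name, timing, time));
    }
}
```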
The QBit Admin package has two built-in collectors. The `StatsDReplicator` (notice the StatsD) implements `StatReplicator` and replicates via UDP to a StatsD-compatible server (e.g., StatsD or Statsite, which can in turn forward to back ends such as Graphite). StatsD is a wire protocol for sending stats over UDP; the `StatsDReplicator` implements this protocol and sends stats to a given host and port.

The other built-in collector is the `LocalStatsCollector`, which exposes stats over a REST endpoint (`/__stats/instance`). It keeps collecting stats until some other system queries `/__stats/instance`; the endpoint then delivers a JSON version of the stats and resets them. Both the `StatsDReplicator` and the `LocalStatsCollector` have builders, but you typically get them for free by using the `ManagedServiceBuilder`. We use `LocalStatsCollector` for Heroku-like environments.
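For reference, the StatsD plain-text format puts one metric per line as `name:value|type`, where the type is `c` for counters, `g` for gauges, and `ms` for timings. The sketch below formats QBit-style counts, levels, and timings that way and sends them in one UDP datagram. It is not the `StatsDReplicator` source; the `StatsDLines` class is hypothetical, and the host and port you pass in are up to your setup (8125 is the usual StatsD default).

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Hypothetical StatsD formatter and sender; not QBit's StatsDReplicator. */
class StatsDLines {

    /** Counter: StatsD adds these up per flush interval. */
    static String count(String name, long count) {
        return name + ":" + count + "|c";
    }

    /** Gauge (QBit "level"). StatsD treats a leading '+' or '-' as a relative change. */
    static String level(String name, long level) {
        return name + ":" + level + "|g";
    }

    /** Timer in milliseconds. */
    static String timing(String name, long millis) {
        return name + ":" + millis + "|ms";
    }

    /** Joins metrics with newlines and sends them as one UDP packet (fire and forget). */
    static void send(String host, int port, List<String> lines) throws IOException {
        byte[] payload = String.join("\n", lines).getBytes(StandardCharsets.UTF_8);
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.send(new DatagramPacket(payload, payload.length,
                    InetAddress.getByName(host), port));
        }
    }
}
```

Because UDP is connectionless, `send` succeeds even when nothing is listening; lost stats are the trade-off for never blocking the service on its metrics.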
You can configure StatsD via the `ManagedServiceBuilder`:
```java
if (config.isStatsD()) {
    managedServiceBuilder.setEnableStatsD(true);
    managedServiceBuilder.getStatsDReplicatorBuilder()
            .setHost(config.getStatsDHost());
    if (config.getStatsDPort() != -1) {
        managedServiceBuilder.getStatsDReplicatorBuilder()
                .setPort(config.getStatsDPort());
    }
}
```
If you are using the JSON config file, you set up StatsD as follows:
```json
{
  "statsD" : true,
  "statsDHost" : "lab99.myhost.com"
}
```
You can also send your own stats, not just the ones sent by the default stats gathering. Assuming you have a service called `TodoService`, create a stats collector and pass it to the service:
```java
/** Create a stats collector. */
final StatsCollector statsCollector = managedServiceBuilder
        .getStatServiceBuilder().buildStatsCollector();

final TodoService todoService = new TodoService(statsCollector,
        ReactorBuilder.reactorBuilder().build(),
        taskRepo,
        Timer.timer());

/** Add the todo service to the managedServiceBuilder. */
managedServiceBuilder.addEndpointService(todoService);
```
The `TodoService` constructor stores the collector and registers it with the reactor:

```java
public TodoService(final StatsCollector statsCollector,
                   final Reactor reactor,
                   final TaskRepo taskRepo,
                   final Timer timer) {
    this.statsCollector = statsCollector;
    this.timer = timer;
    this.taskRepo = taskRepo;
    this.reactor = reactor;
    this.reactor.addServiceToFlush(statsCollector);
}
```
Calling `reactor.addServiceToFlush` with the `statsCollector` ensures that whenever the service queue managing the `TodoService` is idle or full, any buffered stats are flushed. The `statsCollector` is the one that does the buffering mentioned earlier.
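The buffering itself can be modeled without QBit: sum counts locally and send one aggregated call per metric name when a flush happens. `MetricSink` and `BufferedCounts` below are hypothetical names, not QBit's `StatsCollectorBuffer`; the sketch assumes that counts recorded between flushes may be merged.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical downstream sink; stands in for the real stats engine. */
interface MetricSink {
    void recordCount(String name, long count);
}

/**
 * Hypothetical buffering collector: counts are summed locally and only
 * sent to the sink when flush() is called. Not thread-safe; it assumes
 * a single service thread, as with a QBit service queue.
 */
class BufferedCounts {
    private final MetricSink sink;
    private final Map<String, Long> pending = new HashMap<>();

    BufferedCounts(MetricSink sink) {
        this.sink = sink;
    }

    void increment(String name) {
        recordCount(name, 1);
    }

    void recordCount(String name, long count) {
        // Merge into the local buffer instead of calling the sink right away.
        pending.merge(name, count, Long::sum);
    }

    /** Sends one aggregated call per metric name, then clears the buffer. */
    void flush() {
        pending.forEach(sink::recordCount);
        pending.clear();
    }
}
```

With this model, a thousand `increment` calls between two flushes become a single downstream `recordCount` call with the value 1000, which is where the IO savings come from.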
The `reactor` does not flush automatically unless it is told to. For now, you always drive the reactor from the following queue callback (no magic):
```java
/** Process Reactor stuff. */
@QueueCallback({QueueCallbackType.LIMIT, QueueCallbackType.EMPTY})
public void process() {
    reactor.process();
    time = timer.time();
}
```
`reactor.process()` flushes all calls made to the `statsCollector`, which then sends the stats to the actual `StatService`, where they are replicated to all registered replicators.
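The effect of calling `process()` from a `LIMIT`/`EMPTY` queue callback can be mimicked with a plain `BlockingQueue`: handle messages in batches and flush whenever the batch limit is reached or the queue runs dry. This is a simplified model of the pattern, not how `ServiceQueue` is implemented; `DrainLoop` and its `flusher` hook are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

/** Hypothetical model of LIMIT/EMPTY queue callbacks driving a flush. */
class DrainLoop {
    private final BlockingQueue<String> queue;
    private final int batchLimit;
    private final Consumer<String> handler;
    private final Runnable flusher; // plays the role of reactor.process() flushing stats

    DrainLoop(BlockingQueue<String> queue, int batchLimit,
              Consumer<String> handler, Runnable flusher) {
        this.queue = queue;
        this.batchLimit = batchLimit;
        this.handler = handler;
        this.flusher = flusher;
    }

    /** Handles everything currently queued and returns how many flushes ran. */
    int drainOnce() {
        int flushes = 0;
        int inBatch = 0;
        String message;
        while ((message = queue.poll()) != null) {
            handler.accept(message);
            inBatch++;
            if (inBatch == batchLimit) {
                // LIMIT: the batch is full, so flush before taking more work.
                flusher.run();
                flushes++;
                inBatch = 0;
            }
        }
        if (inBatch > 0) {
            // EMPTY: the queue ran dry, so flush the partial batch.
            flusher.run();
            flushes++;
        }
        return flushes;
    }
}
```

Flushing on both conditions bounds how stale buffered stats can get under load (LIMIT) while still pushing them out promptly when traffic stops (EMPTY).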
Inside the service methods, you record stats as needed. Here `loadTodo` counts each call to the repo:

```java
/**
 * Load TODOs from TodoRepo.
 */
@RequestMapping(value = "/todo", summary = "Load TODOs",
        ...)
public void loadTodo(final Callback<Boolean> callback) {
    final Set<TodoCategory> categories = new HashSet<>(this.categories);

    /* If there are no categories or if service is paused, then return right away. */
    if (categories.size() > 0 && !stop) {
        loadFromTodoRepoCache++;
        statsCollector.recordCount("Todo.repo.call.count", 1);
    } else {
        logger.warn("Service can't load categories count {} or stopped {}",
                categories.size(), stop);
        return;
    }
    ...
```
Notice the use of `statsCollector.recordCount("Todo.repo.call.count", 1)`. Since this just increments the count by one, we can call `statsCollector.increment("Todo.repo.call.count")` instead:
```java
/**
 * Load TODOs from TodoRepo.
 */
@RequestMapping(value = "/todo", summary = "Load TODOs",
        ...)
public void loadTodo(final Callback<Boolean> callback) {
    final Set<TodoCategory> categories = new HashSet<>(this.categories);

    /* If there are no categories or if service is paused, then return right away. */
    if (categories.size() > 0 && !stop) {
        loadFromTodoRepoCache++;
        statsCollector.increment("Todo.repo.call.count");
    } else {
        logger.warn("Service can't load categories count {} or stopped {}",
                categories.size(), stop);
        return;
    }
    ...
```
Now let's look at recording a timing.
```java
/**
 * Load TODOs from TodoRepo.
 */
@RequestMapping(value = "/todo", summary = "Load TODOs",
        ...)
public void loadTodo(final Callback<Boolean> callback) {
    final Set<TodoCategory> categories = new HashSet<>(this.categories);

    /* If there are no categories or if service is paused, then return right away. */
    ...

    final long startTime = timer.time();

    /* For each TodoCategory call TodoRepo to load the todo items. */
    categories.forEach(category -> {
        final Callback<List<Todo>> todoCacheCallback =
                createLoadFromCacheCallback(count, errorCount, category);
        taskRepo.loadTodosFromCache(todoCacheCallback, category);
    });

    /* Wait until all of the callbacks are done. */
    reactor.coordinatorBuilder()
            /* If the success count is equal to the
               number of categories, we are done. */
            .setCoordinator(() -> {
                if (logger.isDebugEnabled()) {
                    logger.debug("COUNT " + count.get());
                }
                return count.get() == categories.size();
            })
            /* Set the timeout to be seconds times two since
               we are calling two services. */
            .setTimeoutDuration(config.getTimeoutMakingRemoteCallInSeconds() * 2)
            .setTimeoutTimeUnit(TimeUnit.SECONDS)
            /* If there were no errors, record the timing and return success. */
            .setFinishedHandler(() -> {
                statsCollector
                        .recordTiming("Todo.loadCache.time",
                                timer.time() - startTime);
                callback.returnThis(true);
            })
            /* Set the timeout handler to return no
               success and log that there was a timeout. */
            .setTimeOutHandler(() -> {
                logger.error("Timeout while loading todo items");
                callback.returnThis(false);
            }).build();
    ...
```
This records a start time with `startTime = timer.time()` and then makes a series of async calls. When all of the async calls return, it records how long the whole process took using `statsCollector.recordTiming`.
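Outside of QBit, the same "start the clock, fire the calls, record one timing when all complete" idea can be shown with the JDK's `CompletableFuture.allOf` in place of the reactor's coordinator. `TimedFanOut.loadAllTimed` and the `LongConsumer` used to record the timing are hypothetical; unlike the reactor version above, this sketch has no timeout handling.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/** Hypothetical illustration using the JDK instead of the QBit reactor. */
class TimedFanOut {

    /**
     * Starts every load, then records one timing (in ms) once all of them finish.
     * recordTiming stands in for statsCollector.recordTiming("Todo.loadCache.time", ...).
     */
    static <T> CompletableFuture<List<T>> loadAllTimed(List<Supplier<T>> loads,
                                                       LongConsumer recordTiming) {
        final long startTime = System.nanoTime();
        List<CompletableFuture<T>> futures = loads.stream()
                .map(load -> CompletableFuture.supplyAsync(load))
                .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    // Every load has completed successfully at this point.
                    recordTiming.accept((System.nanoTime() - startTime) / 1_000_000);
                    return futures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList());
                });
    }
}
```

If any load fails, `allOf` completes exceptionally and no timing is recorded, which matches the reactor example only recording on the success path.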
To really understand call coordination with the QBit `reactor`, you first need to understand how QBit coordinates async calls. You can learn more in the QBit Reactive Microservices Tutorial on handling async calls with the reactor.
Here is a simpler example that times a call to Cassandra.
```java
public void executeAsyncCassandraCall(final Callback<ResultSet> callback,
                                      final Statement stmt) {
    /* Start the clock before issuing the call so the timing covers the whole call. */
    final long startTime = timer.time();
    final ResultSetFuture future = this.session.executeAsync(stmt);

    Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
            statsCollector
                    .recordTiming("Cassandra.load.time",
                            timer.time() - startTime);
            callback.accept(result);
        }

        @Override
        public void onFailure(Throwable t) {
            statsCollector
                    .recordTiming("Cassandra.load.error.time",
                            timer.time() - startTime);
            callback.onError(t);
        }
    }, MoreExecutors.directExecutor()); /* Newer Guava versions require an explicit executor. */
}
```
Notice that we capture `final long startTime = timer.time()` and record one of two timings: how long the successful call took or how long the failed call took.
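The success/error split generalizes to any async call. The helper below assumes a `CompletableFuture` instead of the Guava `ResultSetFuture` used above; `AsyncTimer.timed`, the `TimingRecorder` interface, and the `.time` / `.error.time` suffix convention are hypothetical, chosen to mirror `Cassandra.load.time` and `Cassandra.load.error.time`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical stand-in for StatsCollector.recordTiming. */
interface TimingRecorder {
    void recordTiming(String name, long durationMillis);
}

class AsyncTimer {

    /**
     * Runs the call and records "<prefix>.time" on success or
     * "<prefix>.error.time" on failure. The returned future completes
     * the same way as the original call.
     */
    static <T> CompletableFuture<T> timed(String prefix,
                                          TimingRecorder stats,
                                          Supplier<CompletableFuture<T>> call) {
        final long startTime = System.currentTimeMillis(); // start before issuing the call
        return call.get().whenComplete((result, error) -> {
            long elapsed = System.currentTimeMillis() - startTime;
            String name = (error == null) ? prefix + ".time" : prefix + ".error.time";
            stats.recordTiming(name, elapsed);
        });
    }
}
```

Keeping separate names for success and failure prevents fast failures (for example, an immediate connection refusal) from making the success timings look better than they are.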
To record a level, use `recordLevel`:

```java
statsCollector.recordLevel("Todo.categories.size",
        categories.size());
```
Remember that a level is a gauge: how large is my cache, how many outstanding items are in my queue, etc.
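The difference matters when values are aggregated. Counts recorded in the same window add up, while a level is a point-in-time reading, so summing levels is meaningless; keeping the latest reading or averaging readings (as `averageLastLevel` suggests) is what makes sense. `WindowAggregate` is a hypothetical illustration of that rule, not QBit code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical aggregate for one metric in one time window. */
class WindowAggregate {
    private long countTotal;                            // counts are summed
    private final List<Long> levels = new ArrayList<>(); // levels are kept as readings

    void recordCount(long count) {
        countTotal += count;
    }

    void recordLevel(long level) {
        levels.add(level);
    }

    long count() {
        return countTotal;
    }

    /** Most recent gauge reading, or 0 if none was recorded. */
    long lastLevel() {
        return levels.isEmpty() ? 0 : levels.get(levels.size() - 1);
    }

    /** Average of the gauge readings in this window (rounded), or 0 if none. */
    long averageLevel() {
        return levels.isEmpty() ? 0
                : Math.round(levels.stream().mapToLong(Long::longValue).average().getAsDouble());
    }
}
```

For example, three cache-size readings of 10, 20, and 30 give a last level of 30 and an average of 20, whereas summing them to 60 would describe no real cache state.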