-
Notifications
You must be signed in to change notification settings - Fork 140
[Documents] Adapting QBit microservice lib queues to Reactive Streams
At the core of QBit, microservices lib, is the queue. The QBit microservice lib centers around queues and micro-batching. A queue is a stream of messages. Micro-batching is employed to improve throughput by reducing thread handoffs, and improving IO efficiency. You can implement back pressure in QBit. However, reactive streams, has a very clean interface for implementing back pressure. Increasingly APIs for new SQL databases, data grids, messaging, are using some sort of streaming API.
If you are using Apache Spark, or Kafka or Apache Storm or Cassandra, then you are likely already familiar with streaming APIs. Java 8 ships with a streaming API and Java 9 improves on the concepts with Flow. Streams are coming to you one way or another if you are a Java programmer.
There are even attempts to make a base level stream API for compatibility sakes, called Reactive Streams.
Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.
Realizing the importance of streaming standards and the ability to integrate into a larger ecosystem, QBit now supports reactive streams and will support it more as time goes on.
QBit endeavors to make more and more things look like QBit queues which is a convenient interface for dealing with IO, interprocess and inter-thread communication with speeds up to 100M TPS to 200M TPS.
We have added two classes to adapt a QBit queue to a stream. Later there will be classes to adapte a stream to a queue. The two classes are: QueueToStreamRoundRobin
and QueueToStreamUnicast
. The overhead of backpressure support is around 20 to 30%. There are more QBit centric ways of implementing back pressure that will have less overhead, but in the grand scheme of thing this overhead is quite small. QueueToStreamRoundRobin
and QueueToStreamUnicast
have been clocked between 70M TPS and 80M TPS with the advantage of easily handling resource consumption management so that a fast data source does not overwhelm the stream destination. QueueToStreamRoundRobin
allows many reactive stream destinations from the same QBit queue.
Let's see these two in action:
public class Trade {
final String name;
final long amount;
private Trade(String name, long amount) {
this.name = name;
this.amount = amount;
}
...
}
final Queue<Trade> queue =
QueueBuilder
.queueBuilder()
.build();
/* Adapt the queue to a stream. */
final QueueToStreamUnicast<Trade> stream =
new QueueToStreamUnicast<>(queue);
stream.subscribe(new Subscriber<Trade>() {
private Subscription subscription;
private int count;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
s.request(10);
}
@Override
public void onNext(Trade trade) {
//Do something useful with the trade
count++;
if (count > 9) {
count = 0;
subscription.request(10);
} else {
count++;
}
}
@Override
public void onError(Throwable t) {
error.set(t);
}
@Override
public void onComplete() {
/* shut down. */
}
});
/* Send some sample trades. */
final SendQueue<Trade> tradeSendQueue = queue.sendQueue();
for (int index = 0; index < 20; index++) {
tradeSendQueue.send(new Trade("TESLA", 100L + index));
}
tradeSendQueue.flushSends();
For the multicast version, here is a simple unit test showing how it works.
package io.advantageous.qbit.stream;
import io.advantageous.boon.core.Sys;
import io.advantageous.qbit.queue.Queue;
import io.advantageous.qbit.queue.QueueBuilder;
import io.advantageous.qbit.queue.SendQueue;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class QueueToStreamMulticast {
private class Trade {
final String name;
final long amount;
private Trade(String name, long amount) {
this.name = name;
this.amount = amount;
}
}
@Test
public void test() throws InterruptedException {
final Queue<Trade> queue = QueueBuilder.queueBuilder().build();
final QueueToStreamRoundRobin<Trade> stream = new QueueToStreamRoundRobin<>(queue);
final CopyOnWriteArrayList<Trade> trades = new CopyOnWriteArrayList<>();
final AtomicBoolean stop = new AtomicBoolean();
final AtomicReference<Throwable> error = new AtomicReference<>();
final AtomicReference<Subscription> subscription = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch stopLatch = new CountDownLatch(1);
stream.subscribe(new Subscriber<Trade>() {
@Override
public void onSubscribe(Subscription s) {
s.request(10);
subscription.set(s);
}
@Override
public void onNext(Trade trade) {
trades.add(trade);
if (trades.size()==10) {
latch.countDown();
}
}
@Override
public void onError(Throwable t) {
error.set(t);
}
@Override
public void onComplete() {
stop.set(true);
stopLatch.countDown();
}
});
final SendQueue<Trade> tradeSendQueue = queue.sendQueue();
for (int index = 0; index < 100; index++) {
tradeSendQueue.send(new Trade("TESLA", 100L + index));
}
tradeSendQueue.flushSends();
Sys.sleep(100);
latch.await(10, TimeUnit.SECONDS);
assertEquals(10, trades.size());
assertEquals(false, stop.get());
assertNotNull(subscription.get());
subscription.get().cancel();
stopLatch.await(10, TimeUnit.SECONDS);
assertEquals(true, stop.get());
}
@Test
public void test2Subscribe() throws InterruptedException {
final Queue<Trade> queue = QueueBuilder.queueBuilder().setBatchSize(5).build();
final QueueToStreamRoundRobin<Trade> stream = new QueueToStreamRoundRobin<>(queue);
final CopyOnWriteArrayList<Trade> trades = new CopyOnWriteArrayList<>();
final AtomicBoolean stop = new AtomicBoolean();
final AtomicReference<Throwable> error = new AtomicReference<>();
final AtomicReference<Subscription> subscription = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch stopLatch = new CountDownLatch(1);
stream.subscribe(new Subscriber<Trade>() {
@Override
public void onSubscribe(Subscription s) {
s.request(10);
subscription.set(s);
}
@Override
public void onNext(Trade trade) {
trades.add(trade);
if (trades.size()==20) {
latch.countDown();
}
}
@Override
public void onError(Throwable t) {
error.set(t);
}
@Override
public void onComplete() {
stop.set(true);
stopLatch.countDown();
}
});
stream.subscribe(new Subscriber<Trade>() {
@Override
public void onSubscribe(Subscription s) {
s.request(10);
subscription.set(s);
}
@Override
public void onNext(Trade trade) {
trades.add(trade);
if (trades.size()==20) {
latch.countDown();
}
}
@Override
public void onError(Throwable t) {
error.set(t);
}
@Override
public void onComplete() {
stop.set(true);
stopLatch.countDown();
}
});
final SendQueue<Trade> tradeSendQueue = queue.sendQueue();
for (int index = 0; index < 40; index++) {
tradeSendQueue.send(new Trade("TESLA", 100L + index));
}
tradeSendQueue.flushSends();
Sys.sleep(100);
latch.await(10, TimeUnit.SECONDS);
assertEquals(20, trades.size());
assertEquals(false, stop.get());
assertNotNull(subscription.get());
subscription.get().cancel();
stopLatch.await(10, TimeUnit.SECONDS);
assertEquals(true, stop.get());
}
}
QBit has support for a Queue API that works over Kafka, JMS, WebSocket and in-memory queues. You can now use reactive streams with these queues and the performance is very good.
QBit Website What is Microservices Architecture?
QBit Java Micorservices lib tutorials
The Java microservice lib. QBit is a reactive programming lib for building microservices - JSON, HTTP, WebSocket, and REST. QBit uses reactive programming to build elastic REST, and WebSockets based cloud friendly, web services. SOA evolved for mobile and cloud. ServiceDiscovery, Health, reactive StatService, events, Java idiomatic reactive programming for Microservices.
Reactive Programming, Java Microservices, Rick Hightower
Java Microservices Architecture
[Microservice Service Discovery with Consul] (http://www.mammatustech.com/Microservice-Service-Discovery-with-Consul)
Microservices Service Discovery Tutorial with Consul
[Reactive Microservices] (http://www.mammatustech.com/reactive-microservices)
[High Speed Microservices] (http://www.mammatustech.com/high-speed-microservices)
Reactive Microservices Tutorial, using the Reactor
QBit is mentioned in the Restlet blog
All code is written using JetBrains Idea - the best IDE ever!
Kafka training, Kafka consulting, Cassandra training, Cassandra consulting, Spark training, Spark consulting
Tutorials
- QBit tutorials
- Microservices Intro
- Microservice KPI Monitoring
- Microservice Batteries Included
- RESTful APIs
- QBit and Reakt Promises
- Resourceful REST
- Microservices Reactor
- Working with JSON maps and lists
__
Docs
Getting Started
- First REST Microservice
- REST Microservice Part 2
- ServiceQueue
- ServiceBundle
- ServiceEndpointServer
- REST with URI Params
- Simple Single Page App
Basics
- What is QBit?
- Detailed Overview of QBit
- High level overview
- Low-level HTTP and WebSocket
- Low level WebSocket
- HttpClient
- HTTP Request filter
- HTTP Proxy
- Queues and flushing
- Local Proxies
- ServiceQueue remote and local
- ManagedServiceBuilder, consul, StatsD, Swagger support
- Working with Service Pools
- Callback Builders
- Error Handling
- Health System
- Stats System
- Reactor callback coordination
- Early Service Examples
Concepts
REST
Callbacks and Reactor
Event Bus
Advanced
Integration
- Using QBit in Vert.x
- Reactor-Integrating with Cassandra
- Using QBit with Spring Boot
- SolrJ and service pools
- Swagger support
- MDC Support
- Reactive Streams
- Mesos, Docker, Heroku
- DNS SRV
QBit case studies
QBit 2 Roadmap
-- Related Projects
- QBit Reactive Microservices
- Reakt Reactive Java
- Reakt Guava Bridge
- QBit Extensions
- Reactive Microservices
Kafka training, Kafka consulting, Cassandra training, Cassandra consulting, Spark training, Spark consulting