Skip to content

Commit

Permalink
IGNITE-22285 Implement Embedded Data Streamer with Receiver (#4103)
Browse files Browse the repository at this point in the history
Implement embedded streaming with receiver in `RecordViewImpl`, `RecordBinaryViewImpl`, `KeyValueViewImpl`, `KeyValueBinaryViewImpl`.

* Extract `StreamerReceiverJob` to `ignite-compute` module for reuse in client and embedded streamers
* `ignite-compute` module already depends on `ignite-table` module, but we need to run the receiver as a compute job:
  * Expose `StreamerReceiverRunner` interface from `ignite-table` module
  * Implement in `ignite-compute` and inject into `IgniteTablesInternal`
  * `ignite-compute` now depends on `ignite-client-common` to reuse streamer serialization logic
* Client protocol is not affected
  • Loading branch information
ptupitsyn authored Jul 18, 2024
1 parent 96095a1 commit 9b27158
Show file tree
Hide file tree
Showing 44 changed files with 612 additions and 191 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,5 @@ public interface DataStreamerReceiver<T, A, R> {
@Nullable CompletableFuture<List<R>> receive(
List<T> page,
DataStreamerReceiverContext ctx,
A arg);
@Nullable A arg);
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,5 @@ <E, V, R, A> CompletableFuture<Void> streamData(
ReceiverDescriptor<A> receiver,
@Nullable Flow.Subscriber<R> resultSubscriber,
@Nullable DataStreamerOptions options,
A receiverArg);
@Nullable A receiverArg);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ public class ReceiverDescriptor<A> {

private final List<DeploymentUnit> units;

private final @Nullable Marshaler<A, byte[]> argumentsMarshaler;
private final @Nullable Marshaler<A, byte[]> argumentMarshaller;

private ReceiverDescriptor(
String receiverClassName,
List<DeploymentUnit> units,
@Nullable Marshaler<A, byte[]> argumentsMarshaler
@Nullable Marshaler<A, byte[]> argumentMarshaller
) {
Objects.requireNonNull(receiverClassName);
Objects.requireNonNull(units);

this.receiverClassName = receiverClassName;
this.units = units;
this.argumentsMarshaler = argumentsMarshaler;
this.argumentMarshaller = argumentMarshaller;
}

/**
Expand Down Expand Up @@ -86,8 +86,8 @@ public static <A> Builder<A> builder(Class<? extends DataStreamerReceiver<?, A,
return new Builder<>(receiverClass.getName());
}

public @Nullable Marshaler<A, byte[]> argumentsMarshaler() {
return argumentsMarshaler;
public @Nullable Marshaler<A, byte[]> argumentMarshaller() {
return argumentMarshaller;
}

/**
Expand All @@ -96,7 +96,7 @@ public static <A> Builder<A> builder(Class<? extends DataStreamerReceiver<?, A,
public static class Builder<A> {
private final String receiverClassName;
private List<DeploymentUnit> units;
private @Nullable Marshaler<A, byte[]> argumentsMarshaller;
private @Nullable Marshaler<A, byte[]> argumentMarshaller;


private Builder(String receiverClassName) {
Expand Down Expand Up @@ -127,8 +127,8 @@ public Builder<A> units(DeploymentUnit... units) {
return this;
}

public Builder<A> argumentsMarshaler(@Nullable Marshaler<A, byte[]> argumentsMarshaller) {
this.argumentsMarshaller = argumentsMarshaller;
public Builder<A> argumentMarshaller(@Nullable Marshaler<A, byte[]> argumentsMarshaller) {
this.argumentMarshaller = argumentsMarshaller;
return this;
}

Expand All @@ -141,7 +141,7 @@ public ReceiverDescriptor<A> build() {
return new ReceiverDescriptor<>(
receiverClassName,
units == null ? List.of() : units,
argumentsMarshaller
argumentMarshaller
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ static Function<Integer, Object> readerForType(BinaryTupleReader binTuple, Colum
* @param builder Builder.
* @param obj Object.
*/
public static <T> void appendObject(BinaryTupleBuilder builder, T obj, @Nullable Marshaler<T, byte[]> marshaler) {
public static <T> void appendObject(BinaryTupleBuilder builder, @Nullable T obj, @Nullable Marshaler<T, byte[]> marshaler) {
if (obj == null) {
builder.appendNull(); // Type.
builder.appendNull(); // Scale.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,24 @@
import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Collection;
import java.util.List;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.marshaling.Marshaler;
import org.apache.ignite.table.ReceiverDescriptor;
import org.jetbrains.annotations.Nullable;

/**
* Streamer receiver serializer.
*
* <p>Client streamer: client -> handler -> job -> handler -> client:
* Avoid deserializing receiver payload and results on handler side and pass byte array as is.
*
* <p>Embedded streamer: node -> job -> node:
* No intermediate steps, trivial serialize/deserialize.
*/
public class StreamerReceiverSerializer {
/**
Expand All @@ -40,7 +48,7 @@ public class StreamerReceiverSerializer {
* @param receiverArgs Receiver arguments.
* @param items Items.
*/
public static <A> void serialize(ClientMessagePacker w, String receiverClassName, A receiverArgs,
public static <A> void serializeReceiverInfoOnClient(ClientMessagePacker w, String receiverClassName, A receiverArgs,
@Nullable Marshaler<A, byte[]> receiverArgsMarshaler, Collection<?> items) {
// className + arg + items size + item type + items.
int binaryTupleSize = 1 + 3 + 1 + 1 + items.size();
Expand All @@ -55,14 +63,44 @@ public static <A> void serialize(ClientMessagePacker w, String receiverClassName
w.packBinaryTuple(builder);
}

/**
* Serializes streamer receiver info.
*
* @param receiver Receiver descriptor.
* @param receiverArgs Receiver arguments.
* @param items Items.
*/
public static <A> byte[] serializeReceiverInfoWithElementCount(
ReceiverDescriptor<A> receiver,
@Nullable A receiverArgs,
Collection<?> items) {
// className + arg + items size + item type + items.
int binaryTupleSize = 1 + 3 + 1 + 1 + items.size();
var builder = new BinaryTupleBuilder(binaryTupleSize);
builder.appendString(receiver.receiverClassName());

ClientBinaryTupleUtils.appendObject(builder, receiverArgs, receiver.argumentMarshaller());

ClientBinaryTupleUtils.appendCollectionToBinaryTuple(builder, items);

ByteBuffer buf = builder.build();
int bufSize = buf.limit() - buf.position();
byte[] res = new byte[bufSize + 4];

ByteBuffer.wrap(res).order(ByteOrder.LITTLE_ENDIAN).putInt(binaryTupleSize);
buf.get(res, 4, bufSize);

return res;
}

/**
* Deserializes streamer receiver info.
*
* @param bytes Bytes.
* @param elementCount Number of elements in the binary tuple.
* @return Streamer receiver info.
*/
public static SteamerReceiverInfo deserialize(ByteBuffer bytes, int elementCount) {
public static SteamerReceiverInfo deserializeReceiverInfo(ByteBuffer bytes, int elementCount) {
var reader = new BinaryTupleReader(elementCount, bytes);

int readerIndex = 0;
Expand All @@ -72,42 +110,92 @@ public static SteamerReceiverInfo deserialize(ByteBuffer bytes, int elementCount
throw new IgniteException(PROTOCOL_ERR, "Receiver class name is null");
}

Object receiverArgs = ClientBinaryTupleUtils.readObject(reader, readerIndex);
Object receiverArg = ClientBinaryTupleUtils.readObject(reader, readerIndex);

readerIndex += 3;

List<Object> items = ClientBinaryTupleUtils.readCollectionFromBinaryTuple(reader, readerIndex);

return new SteamerReceiverInfo(receiverClassName, receiverArgs, items);
return new SteamerReceiverInfo(receiverClassName, receiverArg, items);
}

/**
* Serializes receiver results.
*
* @param w Writer.
* @param receiverResults Receiver results.
*/
public static void serializeResults(ClientMessagePacker w, @Nullable List<Object> receiverResults) {
public static byte @Nullable [] serializeReceiverJobResults(@Nullable List<Object> receiverResults) {
if (receiverResults == null || receiverResults.isEmpty()) {
w.packNil();
return;
return null;
}

int numElements = 2 + receiverResults.size();
var builder = new BinaryTupleBuilder(numElements);
ClientBinaryTupleUtils.appendCollectionToBinaryTuple(builder, receiverResults);

ByteBuffer res = builder.build();

// Resulting byte array.
int numElementsSize = 4;
byte[] resBytes = new byte[res.limit() - res.position() + numElementsSize];

// Prepend count.
ByteBuffer.wrap(resBytes).order(ByteOrder.LITTLE_ENDIAN).putInt(numElements);

// Copy binary tuple.
res.get(resBytes, numElementsSize, resBytes.length - numElementsSize);

return resBytes;
}

/**
* Deserializes receiver job results produced by {@link #serializeReceiverJobResults} method.
*
* @param results Serialized results.
* @return Deserialized results.
*/
public static <R> List<R> deserializeReceiverJobResults(byte[] results) {
if (results == null || results.length == 0) {
return List.of();
}

ByteBuffer buf = ByteBuffer.wrap(results).order(ByteOrder.LITTLE_ENDIAN);
int numElements = buf.getInt();

var reader = new BinaryTupleReader(numElements, buf.slice().order(ByteOrder.LITTLE_ENDIAN));

return ClientBinaryTupleUtils.readCollectionFromBinaryTuple(reader, 0);
}

/**
* Serializes receiver results.
*
* @param w Writer.
* @param receiverJobResults Receiver results serialized by {@link #serializeReceiverJobResults}.
*/
public static void serializeReceiverResultsForClient(ClientMessagePacker w, byte @Nullable [] receiverJobResults) {
if (receiverJobResults == null || receiverJobResults.length == 0) {
w.packNil();
return;
}

int numElementsSize = 4;
int binaryTupleSize = receiverJobResults.length - numElementsSize;

int numElements = ByteBuffer.wrap(receiverJobResults).order(ByteOrder.LITTLE_ENDIAN).getInt();

w.packInt(numElements);
w.packBinaryTuple(builder);
w.packBinaryHeader(binaryTupleSize);
w.writePayload(receiverJobResults, numElementsSize, binaryTupleSize);
}

/**
* Deserializes receiver results.
* Deserializes receiver results from {@link #serializeReceiverResultsForClient} method.
*
* @param r Reader.
* @return Receiver results.
*/
public static @Nullable <R> List<R> deserializeResults(ClientMessageUnpacker r) {
public static @Nullable <R> List<R> deserializeReceiverResultsOnClient(ClientMessageUnpacker r) {
if (r.tryUnpackNil()) {
return null;
}
Expand All @@ -124,12 +212,12 @@ public static void serializeResults(ClientMessagePacker w, @Nullable List<Object
*/
public static class SteamerReceiverInfo {
private final String className;
private final Object args;
private final @Nullable Object arg;
private final List<Object> items;

private SteamerReceiverInfo(String className, Object args, List<Object> items) {
private SteamerReceiverInfo(String className, @Nullable Object arg, List<Object> items) {
this.className = className;
this.args = args;
this.arg = arg;
this.items = items;
}

Expand All @@ -147,8 +235,8 @@ public String className() {
*
* @return Receiver args.
*/
public Object args() {
return args;
public @Nullable Object arg() {
return arg;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker i
return ClientTablePartitionPrimaryReplicasNodesGetRequest.process(in, out, igniteTables);

case ClientOp.STREAMER_WITH_RECEIVER_BATCH_SEND:
return ClientStreamerWithReceiverBatchSendRequest.process(in, out, igniteTables, compute);
return ClientStreamerWithReceiverBatchSendRequest.process(in, out, igniteTables);

default:
throw new IgniteException(PROTOCOL_ERR, "Unexpected operation code: " + opCode);
Expand Down
Loading

0 comments on commit 9b27158

Please sign in to comment.