From 0214716208101dbf46a5b5aeb5610419aeb9b82d Mon Sep 17 00:00:00 2001 From: Radek Hubner Date: Tue, 5 Dec 2023 21:49:20 +0400 Subject: [PATCH 01/11] Java API for merge operator. This is the first implementation of the merge operator API, where merge operators can be implemented in Java without any C++ code. It prefers simplicity and community feedback is appreciated. --- java/CMakeLists.txt | 5 + java/rocksjni/jni_merge_operator_v2.cc | 220 ++++++++++++++++++ java/rocksjni/jni_merge_operator_v2.h | 39 ++++ .../rocksdb/CassandraValueMergeOperator.java | 2 +- .../java/org/rocksdb/ColumnFamilyOptions.java | 2 +- .../org/rocksdb/InBuiltMergeOperator.java | 31 +++ .../main/java/org/rocksdb/MergeOperator.java | 25 +- .../java/org/rocksdb/MergeOperatorOutput.java | 51 ++++ .../java/org/rocksdb/MergeOperatorV2.java | 64 +++++ java/src/main/java/org/rocksdb/Options.java | 2 +- .../org/rocksdb/StringAppendOperator.java | 2 +- .../java/org/rocksdb/UInt64AddOperator.java | 2 +- .../java/org/rocksdb/MergeOperatorV2Test.java | 187 +++++++++++++++ 13 files changed, 609 insertions(+), 23 deletions(-) create mode 100644 java/rocksjni/jni_merge_operator_v2.cc create mode 100644 java/rocksjni/jni_merge_operator_v2.h create mode 100644 java/src/main/java/org/rocksdb/InBuiltMergeOperator.java create mode 100644 java/src/main/java/org/rocksdb/MergeOperatorOutput.java create mode 100644 java/src/main/java/org/rocksdb/MergeOperatorV2.java create mode 100644 java/src/test/java/org/rocksdb/MergeOperatorV2Test.java diff --git a/java/CMakeLists.txt b/java/CMakeLists.txt index a60847ead37..ba50fda2a83 100644 --- a/java/CMakeLists.txt +++ b/java/CMakeLists.txt @@ -46,6 +46,7 @@ set(JNI_NATIVE_SOURCES rocksjni/import_column_family_options.cc rocksjni/ingest_external_file_options.cc rocksjni/iterator.cc + rocksjni/jni_merge_operator_v2.cc rocksjni/jni_multiget_helpers.cc rocksjni/jni_perf_context.cc rocksjni/jnicallback.cc @@ -179,6 +180,8 @@ set(JAVA_MAIN_CLASSES src/main/java/org/rocksdb/HistogramData.java src/main/java/org/rocksdb/HistogramType.java src/main/java/org/rocksdb/Holder.java + src/main/java/org/rocksdb/ImportColumnFamilyOptions.java + src/main/java/org/rocksdb/InBuiltMergeOperator.java src/main/java/org/rocksdb/HyperClockCache.java src/main/java/org/rocksdb/ImportColumnFamilyOptions.java src/main/java/org/rocksdb/IndexShorteningMode.java @@ -198,6 +201,8 @@ set(JAVA_MAIN_CLASSES src/main/java/org/rocksdb/MemoryUsageType.java src/main/java/org/rocksdb/MemoryUtil.java src/main/java/org/rocksdb/MergeOperator.java + src/main/java/org/rocksdb/MergeOperatorV2.java + src/main/java/org/rocksdb/MergeOperatorOutput.java src/main/java/org/rocksdb/MutableColumnFamilyOptions.java src/main/java/org/rocksdb/MutableColumnFamilyOptionsInterface.java src/main/java/org/rocksdb/MutableDBOptions.java diff --git a/java/rocksjni/jni_merge_operator_v2.cc b/java/rocksjni/jni_merge_operator_v2.cc new file mode 100644 index 00000000000..b396a0e9b87 --- /dev/null +++ b/java/rocksjni/jni_merge_operator_v2.cc @@ -0,0 +1,220 @@ +// +// Created by rhubner on 29-Nov-23. +// + +#include "include/org_rocksdb_MergeOperatorV2.h" +#include "jni_merge_operator_v2.h" +#include "rocksjni/cplusplus_to_java_convert.h" +#include "rocksjni/portal.h" +#include + + +jlong Java_org_rocksdb_MergeOperatorV2_toCString + (JNIEnv* env, jclass, jstring operator_name) { + auto operator_name_utf = env->GetStringUTFChars(operator_name, nullptr); + if(operator_name_utf == nullptr) { + return 0; //Exception + } + auto operator_name_len = env->GetStringUTFLength(operator_name); + + char* ret_value = new char[operator_name_len + 1]; + strcpy_s(ret_value, operator_name_len + 1, operator_name_utf); + + env->ReleaseStringUTFChars(operator_name, operator_name_utf); + + return GET_CPLUSPLUS_POINTER(ret_value); +} + +jlong Java_org_rocksdb_MergeOperatorV2_newMergeOperator + (JNIEnv* env, jobject java_merge_operator, jlong _operator_name) { + + char* operator_name = reinterpret_cast(_operator_name); + + auto* jni_merge_operator = + new std::shared_ptr( + new rocksdb::JniMergeOperatorV2(env, java_merge_operator, operator_name) + ); + + return GET_CPLUSPLUS_POINTER(jni_merge_operator); +} + +void Java_org_rocksdb_MergeOperatorV2_disposeInternal + (JNIEnv *, jclass, jlong j_handle) { + auto* jni_merge_operator = reinterpret_cast*>(j_handle); + delete jni_merge_operator; +} + +namespace ROCKSDB_NAMESPACE { + +JniMergeOperatorV2::JniMergeOperatorV2(JNIEnv* env, jobject java_merge_operator, char* _operator_name) + : JniCallback(env,java_merge_operator) { + operator_name = _operator_name; + + j_merge_class = env->GetObjectClass(java_merge_operator); + if(j_merge_class == nullptr) { + return; //Exception + } + j_merge_class = static_cast(env->NewGlobalRef(j_merge_class)); + if(j_merge_class == nullptr) { + return; //Exception + } + + j_merge_internal = env->GetMethodID(j_merge_class, "mergeInternal", + "(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;[Ljava/nio/ByteBuffer;)Lorg/rocksdb/MergeOperatorOutput;"); + if(j_merge_internal == nullptr) { + return; + } + + return_value_clazz = env->FindClass("org/rocksdb/MergeOperatorOutput"); + if(return_value_clazz == nullptr) { + return ; //Exception + } + return_value_clazz = static_cast(env->NewGlobalRef(return_value_clazz)); + if(return_value_clazz == nullptr) { + return ; //Exception + } + + + return_value_method = env->GetMethodID(return_value_clazz, "getDirectValue", "()Ljava/nio/ByteBuffer;"); + if(return_value_method == nullptr) { + return ; + } + + return_status_method = env->GetMethodID(return_value_clazz, "getOpStatus", + "()I"); + if(return_status_method == nullptr) { + return ; + } + + j_byte_buffer_class = ByteBufferJni::getJClass(env); + if(j_byte_buffer_class == nullptr) { + return ; + } + j_byte_buffer_class = static_cast(env->NewGlobalRef(j_byte_buffer_class)); + if(j_byte_buffer_class == nullptr) { + return; //Exception + } + + byte_buffer_position = env->GetMethodID(j_byte_buffer_class, "position", "()I"); + if(byte_buffer_position == nullptr) { + return ; + } + + byte_buffer_remaining = env->GetMethodID(j_byte_buffer_class, "remaining", "()I"); + if(byte_buffer_remaining == nullptr) { + return ; + } + + + return ; +} + +bool JniMergeOperatorV2::FullMergeV2(const MergeOperationInput &merge_in, MergeOperationOutput *merge_out) const { + + jboolean attached_thread = JNI_FALSE; + auto env = getJniEnv(&attached_thread); + + auto j_operand_list = env->NewObjectArray(static_cast(merge_in.operand_list.size()), + j_byte_buffer_class, nullptr); + if(j_operand_list == nullptr) { + return clean_and_return_error(attached_thread, merge_out); + } + + for(int i = 0; i < merge_in.operand_list.size(); i++) { + //TODO - Setup array + auto operand = merge_in.operand_list[i]; + //auto byte_buffer = env->NewDirectByteBuffer((void *)operand.data(), operand.size()); //TODO - replace with C++ cast + auto byte_buffer = env->NewDirectByteBuffer( + const_cast(reinterpret_cast(operand.data())), + operand.size()); //TODO - replace with C++ cast + + if(byte_buffer == nullptr) { + return clean_and_return_error(attached_thread, merge_out); + } + env->SetObjectArrayElement(j_operand_list, i, byte_buffer); + } + + auto key = env->NewDirectByteBuffer( + const_cast(reinterpret_cast(merge_in.key.data())), + merge_in.key.size()); + if(key == nullptr) { + return clean_and_return_error(attached_thread, merge_out); + } + + jobject exising_value = nullptr; + if(merge_in.existing_value != nullptr) { + exising_value = env->NewDirectByteBuffer( + const_cast(reinterpret_cast(merge_in.existing_value->data())), + merge_in.existing_value->size()); + } + + + jobject result = env->CallObjectMethod(m_jcallback_obj, j_merge_internal, + key, exising_value, j_operand_list ); + if(env->ExceptionCheck() == JNI_TRUE) { + + env->ExceptionClear(); + Error(merge_in.logger, "Unable to merge, Java code throw exception"); + return clean_and_return_error(attached_thread, merge_out); + } + + if(result == nullptr) { + Error(merge_in.logger, "Unable to merge, Java code return nullptr result"); + return clean_and_return_error(attached_thread, merge_out); + } + + merge_out->op_failure_scope = javaToOpFailureScope(env->CallIntMethod(result, return_status_method)); + if(merge_out->op_failure_scope != MergeOperator::OpFailureScope::kDefault) { + releaseJniEnv(attached_thread); + return false; + } + + auto result_byte_buff = env->CallObjectMethod(result, return_value_method); + if(result_byte_buff == nullptr) { + Error(merge_in.logger, "Unable to merge, Java code return nullptr ByteBuffer"); + return clean_and_return_error(attached_thread, merge_out); + } + + auto result_byte_buff_data = env->GetDirectBufferAddress(result_byte_buff); + + + auto position = env->CallIntMethod(result_byte_buff, byte_buffer_position); + auto remaining = env->CallIntMethod(result_byte_buff, byte_buffer_remaining); + + merge_out->new_value.assign(static_cast(result_byte_buff_data) + position, remaining); + + releaseJniEnv(attached_thread); + + return true; +} + +JniMergeOperatorV2::~JniMergeOperatorV2() { + jboolean attached_thread = JNI_FALSE; + auto env = getJniEnv(&attached_thread); + env->DeleteGlobalRef(j_merge_class); + env->DeleteGlobalRef(j_byte_buffer_class); + env->DeleteGlobalRef(return_value_clazz); + delete operator_name; + releaseJniEnv(attached_thread); +} + +bool JniMergeOperatorV2::clean_and_return_error(jboolean& attached_thread, MergeOperationOutput *merge_out) const { + merge_out->op_failure_scope = MergeOperator::OpFailureScope::kOpFailureScopeMax; + releaseJniEnv(attached_thread); + return false; +} + +const MergeOperator::OpFailureScope JniMergeOperatorV2::javaToOpFailureScope(jint failure) const { + switch (failure) { + case 0: return MergeOperator::OpFailureScope::kDefault; + case 1: return MergeOperator::OpFailureScope::kTryMerge; + case 2: return MergeOperator::OpFailureScope::kMustMerge; + case 3: return MergeOperator::OpFailureScope::kOpFailureScopeMax; + default: return MergeOperator::OpFailureScope::kOpFailureScopeMax; + } +} + +const char *JniMergeOperatorV2::Name() const { + return operator_name; +} +} \ No newline at end of file diff --git a/java/rocksjni/jni_merge_operator_v2.h b/java/rocksjni/jni_merge_operator_v2.h new file mode 100644 index 00000000000..a96a4aeff81 --- /dev/null +++ b/java/rocksjni/jni_merge_operator_v2.h @@ -0,0 +1,39 @@ +// +// Created by rhubner on 29-Nov-23. +// +#ifndef ROCKSDB_JNI_MERGE_OPERATOR_V2_H +#define ROCKSDB_JNI_MERGE_OPERATOR_V2_H + +#include +#include "rocksjni/jnicallback.h" +#include "rocksdb/merge_operator.h" + +namespace ROCKSDB_NAMESPACE { + +class JniMergeOperatorV2 : public JniCallback, + public MergeOperator { + + public: + JniMergeOperatorV2(JNIEnv* env1, jobject java_merge_operator, char* operator_name); + bool FullMergeV2(const MergeOperationInput &merge_in, MergeOperationOutput *merge_out) const override; + const char* Name() const override; + ~JniMergeOperatorV2() override; + + private: + const MergeOperator::OpFailureScope javaToOpFailureScope(jint failure) const; + bool clean_and_return_error(jboolean& attached_thread, MergeOperationOutput *merge_out) const; + jclass j_merge_class; + jclass j_byte_buffer_class; + jmethodID j_merge_internal; + jclass return_value_clazz; + jmethodID return_value_method; + jmethodID return_status_method; + jmethodID byte_buffer_position; + jmethodID byte_buffer_remaining; + char* operator_name; + + +}; + +} +#endif // ROCKSDB_JNI_MERGE_OPERATOR_V2_H \ No newline at end of file diff --git a/java/src/main/java/org/rocksdb/CassandraValueMergeOperator.java b/java/src/main/java/org/rocksdb/CassandraValueMergeOperator.java index cdb82ee4347..7bcd9bd9957 100644 --- a/java/src/main/java/org/rocksdb/CassandraValueMergeOperator.java +++ b/java/src/main/java/org/rocksdb/CassandraValueMergeOperator.java @@ -9,7 +9,7 @@ * CassandraValueMergeOperator is a merge operator that merges two cassandra wide column * values. */ -public class CassandraValueMergeOperator extends MergeOperator { +public class CassandraValueMergeOperator extends InBuiltMergeOperator { public CassandraValueMergeOperator(final int gcGracePeriodInSeconds) { super(newSharedCassandraValueMergeOperator(gcGracePeriodInSeconds, 0)); } diff --git a/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java b/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java index 3af4d2a8ed6..e55ab3908b3 100644 --- a/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java +++ b/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java @@ -221,7 +221,7 @@ public ColumnFamilyOptions setMergeOperatorName(final String name) { @Override public ColumnFamilyOptions setMergeOperator( final MergeOperator mergeOperator) { - setMergeOperator(nativeHandle_, mergeOperator.nativeHandle_); + setMergeOperator(nativeHandle_, mergeOperator.nativeHandler()); return this; } diff --git a/java/src/main/java/org/rocksdb/InBuiltMergeOperator.java b/java/src/main/java/org/rocksdb/InBuiltMergeOperator.java new file mode 100644 index 00000000000..8a44e2988d0 --- /dev/null +++ b/java/src/main/java/org/rocksdb/InBuiltMergeOperator.java @@ -0,0 +1,31 @@ +// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. +// Copyright (c) 2014, Vlad Balan (vlad.gm@gmail.com). All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +package org.rocksdb; + +/** + * MergeOperator holds an operator to be applied when compacting + * two merge operands held under the same key in order to obtain a single + * value. + */ +public abstract class InBuiltMergeOperator extends RocksObject implements MergeOperator { + protected InBuiltMergeOperator(final long nativeHandle) { + super(nativeHandle); + } + + @Override + public long nativeHandler() { + return nativeHandle_; + } +} + + +// +// InBuiltMergeOperator +// interface MergeOperator +// +//interface MergeOperatorV2 extends MergeOperator +//interface MergeOperatorV3 extends MergeOperator \ No newline at end of file diff --git a/java/src/main/java/org/rocksdb/MergeOperator.java b/java/src/main/java/org/rocksdb/MergeOperator.java index c299f62210f..1c6e914de0a 100644 --- a/java/src/main/java/org/rocksdb/MergeOperator.java +++ b/java/src/main/java/org/rocksdb/MergeOperator.java @@ -1,18 +1,7 @@ -// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. -// Copyright (c) 2014, Vlad Balan (vlad.gm@gmail.com). All rights reserved. -// This source code is licensed under both the GPLv2 (found in the -// COPYING file in the root directory) and Apache 2.0 License -// (found in the LICENSE.Apache file in the root directory). - -package org.rocksdb; - -/** - * MergeOperator holds an operator to be applied when compacting - * two merge operands held under the same key in order to obtain a single - * value. - */ -public abstract class MergeOperator extends RocksObject { - protected MergeOperator(final long nativeHandle) { - super(nativeHandle); - } -} +package org.rocksdb; + +public interface MergeOperator { + + public long nativeHandler(); + +} diff --git a/java/src/main/java/org/rocksdb/MergeOperatorOutput.java b/java/src/main/java/org/rocksdb/MergeOperatorOutput.java new file mode 100644 index 00000000000..4ca46755c9e --- /dev/null +++ b/java/src/main/java/org/rocksdb/MergeOperatorOutput.java @@ -0,0 +1,51 @@ +package org.rocksdb; + + +import java.nio.ByteBuffer; + +/* + std::string& new_value; + Slice& existing_operand; + OpFailureScope op_failure_scope = OpFailureScope::kDefault; + */ +public class MergeOperatorOutput { + public enum OpFailureScope { + Default(0), + TryMerge(1), + MustMerge(2), + OpFailureScopeMax(3); + private final int status; + OpFailureScope(int status) { + this.status = status; + } + } + + private ByteBuffer directValue; + private OpFailureScope op_failure_scope = OpFailureScope.Default; + + + public MergeOperatorOutput(final ByteBuffer directValue) { + this.directValue = directValue; + } + + public MergeOperatorOutput(final ByteBuffer directValue, final OpFailureScope op_failure_scope) { + this.directValue = directValue; + this.op_failure_scope = op_failure_scope; + } + + public ByteBuffer getDirectValue() { + return directValue; + } + + public OpFailureScope getOp_failure_scope() { + return op_failure_scope; + } + + /** + * For JNI. Called from JniMergeOperatorV2 + */ + private int getOpStatus() { + return this.op_failure_scope.status; + } + +} \ No newline at end of file diff --git a/java/src/main/java/org/rocksdb/MergeOperatorV2.java b/java/src/main/java/org/rocksdb/MergeOperatorV2.java new file mode 100644 index 00000000000..b6ccb96496f --- /dev/null +++ b/java/src/main/java/org/rocksdb/MergeOperatorV2.java @@ -0,0 +1,64 @@ +package org.rocksdb; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +public abstract class MergeOperatorV2 extends RocksCallbackObject implements MergeOperator { + + private final String operatorName; + + public MergeOperatorV2(final String operatorName) { + super(new long[]{toCString(operatorName)}); + this.operatorName = operatorName; + } + + /** + * All parameters of this method are valid only during the call. If you want to keep this, you need to make deep copy/clone. + * + * + * @param key + * @param existingValue + * @param operand + * @return + */ + public abstract MergeOperatorOutput fullMergeV2(final ByteBuffer key, final ByteBuffer existingValue, final List operand); + + + final MergeOperatorOutput mergeInternal(final ByteBuffer key, final ByteBuffer existingValue, final ByteBuffer[] operand) { + List operandList = Arrays.stream(operand) + .map(ByteBuffer::asReadOnlyBuffer) + .collect(Collectors.toUnmodifiableList()); + + return fullMergeV2(key.asReadOnlyBuffer(), existingValue.asReadOnlyBuffer(), operandList); + } + + + @Override + final public long nativeHandler() { + return nativeHandle_; + } + + @Override + protected long initializeNative(long... nativeParameterHandles) { + return newMergeOperator(nativeParameterHandles[0]); + } + + @Override + protected void disposeInternal() { + disposeInternal(nativeHandle_); + } + + public String getOperatorName() { + return operatorName; + } + + private native long newMergeOperator(long operatorName); + + private static native long toCString(String operatorName); + + private static native void disposeInternal(long nativeHandle); + +} \ No newline at end of file diff --git a/java/src/main/java/org/rocksdb/Options.java b/java/src/main/java/org/rocksdb/Options.java index d3f6bdea5d9..3efc91d581c 100644 --- a/java/src/main/java/org/rocksdb/Options.java +++ b/java/src/main/java/org/rocksdb/Options.java @@ -239,7 +239,7 @@ public Options setMergeOperatorName(final String name) { @Override public Options setMergeOperator(final MergeOperator mergeOperator) { - setMergeOperator(nativeHandle_, mergeOperator.nativeHandle_); + setMergeOperator(nativeHandle_, mergeOperator.nativeHandler()); return this; } diff --git a/java/src/main/java/org/rocksdb/StringAppendOperator.java b/java/src/main/java/org/rocksdb/StringAppendOperator.java index 25b134c44af..fadd9f1a863 100644 --- a/java/src/main/java/org/rocksdb/StringAppendOperator.java +++ b/java/src/main/java/org/rocksdb/StringAppendOperator.java @@ -10,7 +10,7 @@ * StringAppendOperator is a merge operator that concatenates * two strings. */ -public class StringAppendOperator extends MergeOperator { +public class StringAppendOperator extends InBuiltMergeOperator { public StringAppendOperator() { this(','); } diff --git a/java/src/main/java/org/rocksdb/UInt64AddOperator.java b/java/src/main/java/org/rocksdb/UInt64AddOperator.java index 536ba58d835..ce15569719c 100644 --- a/java/src/main/java/org/rocksdb/UInt64AddOperator.java +++ b/java/src/main/java/org/rocksdb/UInt64AddOperator.java @@ -9,7 +9,7 @@ * Uint64AddOperator is a merge operator that accumlates a long * integer value. */ -public class UInt64AddOperator extends MergeOperator { +public class UInt64AddOperator extends InBuiltMergeOperator { public UInt64AddOperator() { super(newSharedUInt64AddOperator()); } diff --git a/java/src/test/java/org/rocksdb/MergeOperatorV2Test.java b/java/src/test/java/org/rocksdb/MergeOperatorV2Test.java new file mode 100644 index 00000000000..de65e3c1f20 --- /dev/null +++ b/java/src/test/java/org/rocksdb/MergeOperatorV2Test.java @@ -0,0 +1,187 @@ +package org.rocksdb; + +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.in; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Optional; + +public class MergeOperatorV2Test { + + @ClassRule + public static final RocksNativeLibraryResource ROCKS_NATIVE_LIBRARY_RESOURCE = + new RocksNativeLibraryResource(); + + @Rule + public TemporaryFolder dbFolder = new TemporaryFolder(); + + private static byte[] KEY = "thisIsKey".getBytes(StandardCharsets.UTF_8); + + @Test + public void testMergeOperator() throws RocksDBException { + + try( TestMergeOperator mergeOperator = new TestMergeOperator(); + Options options = new Options()) { + options.setMergeOperator(mergeOperator); + options.setCreateIfMissing(true); + + try(RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) { + db.put(KEY, "value".getBytes()); + db.merge(KEY, "value1".getBytes()); + db.merge(KEY, "value2".getBytes()); + db.merge(KEY, "value3".getBytes()); + byte[] valueFromDb = db.get(KEY); + assertThat(valueFromDb).containsExactly("10".getBytes(StandardCharsets.UTF_8)); + + } + } + } + + @Test + public void testMergeOperator2() throws RocksDBException { + + try (MergeOperatorV2 mergeOperator = new MergeOperatorV2("Second operator") { + + @Override + public MergeOperatorOutput fullMergeV2(ByteBuffer key, ByteBuffer existingValue, List operand) { + ByteBuffer b = ByteBuffer.allocateDirect(10); + b.put("xxx".getBytes(StandardCharsets.UTF_8)); + b.put(new byte[]{0,0}); + b.flip(); + b.position(3); + return new MergeOperatorOutput(b); + } + }) { + + + try (Options options = new Options()) { + + options.setMergeOperator(mergeOperator); + options.setCreateIfMissing(true); + + try (RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) { + db.put(KEY, "value".getBytes()); + db.merge(KEY, "value1".getBytes()); + db.merge(KEY, "value2".getBytes()); + db.merge(KEY, "value3".getBytes()); + byte[] valueFromDb = db.get(KEY); + assertThat(valueFromDb).containsExactly(new byte[]{0,0}); + + } + } + } + } + + @Test + public void testMergeOperator3() throws RocksDBException { + + try (MergeOperatorV2 mergeOperator = new MergeOperatorV2("Third operator") { + + @Override + public MergeOperatorOutput fullMergeV2(ByteBuffer key, ByteBuffer existingValue, List operand) { + return new MergeOperatorOutput(operand.get(1)); + } + }) { + + try (Options options = new Options()) { + + options.setMergeOperator(mergeOperator); + options.setCreateIfMissing(true); + + try (RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) { + db.put(KEY, "value".getBytes()); + db.merge(KEY, "value1".getBytes()); + db.merge(KEY, "value2".getBytes()); + db.merge(KEY, "value3".getBytes()); + byte[] valueFromDb = db.get(KEY); + assertThat(valueFromDb).containsExactly("value2".getBytes(StandardCharsets.UTF_8)); + + } + } + } + } + + @Test(expected = RocksDBException.class) + public void testMergeOperatorCrash() throws RocksDBException { + + try (MergeOperatorV2 mergeOperator = new MergeOperatorV2("CrashOperator") { + + @Override + public MergeOperatorOutput fullMergeV2(ByteBuffer key, ByteBuffer existingValue, List operand) { + return new MergeOperatorOutput(null, MergeOperatorOutput.OpFailureScope.OpFailureScopeMax); + } + }) { + + try (Options options = new Options()) { + + options.setMergeOperator(mergeOperator); + options.setCreateIfMissing(true); + + try (RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) { + db.put(KEY, "value".getBytes()); + db.merge(KEY, "value1".getBytes()); + db.merge(KEY, "value2".getBytes()); + db.merge(KEY, "value3".getBytes()); + byte[] valueFromDb = db.get(KEY); + assertThat(valueFromDb).containsExactly("value2".getBytes(StandardCharsets.UTF_8)); + + } + } + } + } + + @Test(expected = RocksDBException.class) + public void testMergeOperatorJavaException() throws RocksDBException { + + try (MergeOperatorV2 mergeOperator = new MergeOperatorV2("CrashOperator") { + + @Override + public MergeOperatorOutput fullMergeV2(ByteBuffer key, ByteBuffer existingValue, List operand) { + throw new RuntimeException("Never do this"); + } + }) { + + try (Options options = new Options()) { + options.setMergeOperator(mergeOperator); + options.setCreateIfMissing(true); + + try (RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) { + db.put(KEY, "value".getBytes()); + db.merge(KEY, "value1".getBytes()); + db.merge(KEY, "value2".getBytes()); + db.merge(KEY, "value3".getBytes()); + byte[] valueFromDb = db.get(KEY); + assertThat(valueFromDb).containsExactly("value2".getBytes(StandardCharsets.UTF_8)); + + } + } + } + } + + public static class TestMergeOperator extends MergeOperatorV2 { + + public TestMergeOperator() { + super("TestMergeOperator"); + } + + @Override + public MergeOperatorOutput fullMergeV2(ByteBuffer key, ByteBuffer existingValue, List operand) { + + ByteBuffer b = ByteBuffer.allocateDirect(10); + b.put("10".getBytes(StandardCharsets.UTF_8)); + b.flip(); + return new MergeOperatorOutput(b); + } + + } + + + +} From 043a3b1d951e413300ed0876df65d77b1570004a Mon Sep 17 00:00:00 2001 From: Radek Hubner Date: Wed, 6 Dec 2023 15:54:42 +0400 Subject: [PATCH 02/11] Fix formating and build on linux. --- java/rocksjni/jni_merge_operator_v2.cc | 15 +- java/rocksjni/jni_merge_operator_v2.h | 2 +- .../java/org/rocksdb/MergeOperatorV2Test.java | 302 ++++++++---------- src.mk | 1 + 4 files changed, 144 insertions(+), 176 deletions(-) diff --git a/java/rocksjni/jni_merge_operator_v2.cc b/java/rocksjni/jni_merge_operator_v2.cc index b396a0e9b87..9644245ed5e 100644 --- a/java/rocksjni/jni_merge_operator_v2.cc +++ b/java/rocksjni/jni_merge_operator_v2.cc @@ -2,12 +2,10 @@ // Created by rhubner on 29-Nov-23. // -#include "include/org_rocksdb_MergeOperatorV2.h" #include "jni_merge_operator_v2.h" +#include "include/org_rocksdb_MergeOperatorV2.h" #include "rocksjni/cplusplus_to_java_convert.h" #include "rocksjni/portal.h" -#include - jlong Java_org_rocksdb_MergeOperatorV2_toCString (JNIEnv* env, jclass, jstring operator_name) { @@ -18,7 +16,7 @@ jlong Java_org_rocksdb_MergeOperatorV2_toCString auto operator_name_len = env->GetStringUTFLength(operator_name); char* ret_value = new char[operator_name_len + 1]; - strcpy_s(ret_value, operator_name_len + 1, operator_name_utf); + memcpy(ret_value, operator_name_utf, operator_name_len + 1); env->ReleaseStringUTFChars(operator_name, operator_name_utf); @@ -105,7 +103,6 @@ JniMergeOperatorV2::JniMergeOperatorV2(JNIEnv* env, jobject java_merge_operator, return ; } - return ; } @@ -120,13 +117,11 @@ bool JniMergeOperatorV2::FullMergeV2(const MergeOperationInput &merge_in, MergeO return clean_and_return_error(attached_thread, merge_out); } - for(int i = 0; i < merge_in.operand_list.size(); i++) { - //TODO - Setup array + for (auto i = 0u; i < merge_in.operand_list.size(); i++) { auto operand = merge_in.operand_list[i]; - //auto byte_buffer = env->NewDirectByteBuffer((void *)operand.data(), operand.size()); //TODO - replace with C++ cast auto byte_buffer = env->NewDirectByteBuffer( const_cast(reinterpret_cast(operand.data())), - operand.size()); //TODO - replace with C++ cast + operand.size()); if(byte_buffer == nullptr) { return clean_and_return_error(attached_thread, merge_out); @@ -204,7 +199,7 @@ bool JniMergeOperatorV2::clean_and_return_error(jboolean& attached_thread, Merge return false; } -const MergeOperator::OpFailureScope JniMergeOperatorV2::javaToOpFailureScope(jint failure) const { +MergeOperator::OpFailureScope JniMergeOperatorV2::javaToOpFailureScope(jint failure) const { switch (failure) { case 0: return MergeOperator::OpFailureScope::kDefault; case 1: return MergeOperator::OpFailureScope::kTryMerge; diff --git a/java/rocksjni/jni_merge_operator_v2.h b/java/rocksjni/jni_merge_operator_v2.h index a96a4aeff81..75c07062d56 100644 --- a/java/rocksjni/jni_merge_operator_v2.h +++ b/java/rocksjni/jni_merge_operator_v2.h @@ -20,7 +20,7 @@ class JniMergeOperatorV2 : public JniCallback, ~JniMergeOperatorV2() override; private: - const MergeOperator::OpFailureScope javaToOpFailureScope(jint failure) const; + MergeOperator::OpFailureScope javaToOpFailureScope(jint failure) const; bool clean_and_return_error(jboolean& attached_thread, MergeOperationOutput *merge_out) const; jclass j_merge_class; jclass j_byte_buffer_class; diff --git a/java/src/test/java/org/rocksdb/MergeOperatorV2Test.java b/java/src/test/java/org/rocksdb/MergeOperatorV2Test.java index de65e3c1f20..e03a44118f5 100644 --- a/java/src/test/java/org/rocksdb/MergeOperatorV2Test.java +++ b/java/src/test/java/org/rocksdb/MergeOperatorV2Test.java @@ -1,187 +1,159 @@ package org.rocksdb; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.in; import java.nio.ByteBuffer; -import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.List; -import java.util.Optional; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; public class MergeOperatorV2Test { - - @ClassRule - public static final RocksNativeLibraryResource ROCKS_NATIVE_LIBRARY_RESOURCE = - new RocksNativeLibraryResource(); - - @Rule - public TemporaryFolder dbFolder = new TemporaryFolder(); - - private static byte[] KEY = "thisIsKey".getBytes(StandardCharsets.UTF_8); - - @Test - public void testMergeOperator() throws RocksDBException { - - try( TestMergeOperator mergeOperator = new TestMergeOperator(); - Options options = new Options()) { - options.setMergeOperator(mergeOperator); - options.setCreateIfMissing(true); - - try(RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) { - db.put(KEY, "value".getBytes()); - db.merge(KEY, "value1".getBytes()); - db.merge(KEY, "value2".getBytes()); - db.merge(KEY, "value3".getBytes()); - byte[] valueFromDb = db.get(KEY); - assertThat(valueFromDb).containsExactly("10".getBytes(StandardCharsets.UTF_8)); - - } - } + @ClassRule + public static final RocksNativeLibraryResource ROCKS_NATIVE_LIBRARY_RESOURCE = + new RocksNativeLibraryResource(); + + @Rule public TemporaryFolder dbFolder = new TemporaryFolder(); + + private static byte[] KEY = "thisIsKey".getBytes(StandardCharsets.UTF_8); + + @Test + public void testMergeOperator() throws RocksDBException { + try (TestMergeOperator mergeOperator = new TestMergeOperator(); + Options options = new Options()) { + options.setMergeOperator(mergeOperator); + options.setCreateIfMissing(true); + + try (RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) { + db.put(KEY, "value".getBytes()); + db.merge(KEY, "value1".getBytes()); + db.merge(KEY, "value2".getBytes()); + db.merge(KEY, "value3".getBytes()); + byte[] valueFromDb = db.get(KEY); + assertThat(valueFromDb).containsExactly("10".getBytes(StandardCharsets.UTF_8)); + } } - - @Test - public void testMergeOperator2() throws RocksDBException { - - try (MergeOperatorV2 mergeOperator = new MergeOperatorV2("Second operator") { - - @Override - public MergeOperatorOutput fullMergeV2(ByteBuffer key, ByteBuffer existingValue, List operand) { - ByteBuffer b = ByteBuffer.allocateDirect(10); - b.put("xxx".getBytes(StandardCharsets.UTF_8)); - b.put(new byte[]{0,0}); - b.flip(); - b.position(3); - return new MergeOperatorOutput(b); - } - }) { - - - try (Options options = new Options()) { - - options.setMergeOperator(mergeOperator); - options.setCreateIfMissing(true); - - try (RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) { - db.put(KEY, "value".getBytes()); - db.merge(KEY, "value1".getBytes()); - db.merge(KEY, "value2".getBytes()); - db.merge(KEY, "value3".getBytes()); - byte[] valueFromDb = db.get(KEY); - assertThat(valueFromDb).containsExactly(new byte[]{0,0}); - - } - } + } + + @Test + public void middleOfByteBuffer() throws RocksDBException { + try (MergeOperatorV2 mergeOperator = new MergeOperatorV2("Second operator") { + @Override + public MergeOperatorOutput fullMergeV2( + ByteBuffer key, ByteBuffer existingValue, List operand) { + ByteBuffer b = ByteBuffer.allocateDirect(10); + b.put("xxx".getBytes(StandardCharsets.UTF_8)); + b.put(new byte[] {0, 0}); + b.flip(); + b.position(3); + return new MergeOperatorOutput(b); + } + }) { + try (Options options = new Options()) { + options.setMergeOperator(mergeOperator); + options.setCreateIfMissing(true); + + try (RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) { + db.put(KEY, "value".getBytes()); + db.merge(KEY, "value1".getBytes()); + db.merge(KEY, "value2".getBytes()); + db.merge(KEY, "value3".getBytes()); + byte[] valueFromDb = db.get(KEY); + assertThat(valueFromDb).containsExactly(new byte[] {0, 0}); } + } } - - @Test - public void testMergeOperator3() throws RocksDBException { - - try (MergeOperatorV2 mergeOperator = new MergeOperatorV2("Third operator") { - - @Override - public MergeOperatorOutput fullMergeV2(ByteBuffer key, ByteBuffer existingValue, List operand) { - return new MergeOperatorOutput(operand.get(1)); - } - }) { - - try (Options options = new Options()) { - - options.setMergeOperator(mergeOperator); - options.setCreateIfMissing(true); - - try (RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) { - db.put(KEY, "value".getBytes()); - db.merge(KEY, "value1".getBytes()); - db.merge(KEY, "value2".getBytes()); - db.merge(KEY, "value3".getBytes()); - byte[] valueFromDb = db.get(KEY); - assertThat(valueFromDb).containsExactly("value2".getBytes(StandardCharsets.UTF_8)); - - } - } + } + + @Test + public void returnExistingOperandByteBuffer() throws RocksDBException { + try (MergeOperatorV2 mergeOperator = new MergeOperatorV2("Third operator") { + @Override + public MergeOperatorOutput fullMergeV2( + ByteBuffer key, ByteBuffer existingValue, List operand) { + return new MergeOperatorOutput(operand.get(1)); + } + }) { + try (Options options = new Options()) { + options.setMergeOperator(mergeOperator); + options.setCreateIfMissing(true); + + try (RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) { + db.put(KEY, "value".getBytes()); + db.merge(KEY, "value1".getBytes()); + db.merge(KEY, "value2".getBytes()); + db.merge(KEY, "value3".getBytes()); + byte[] valueFromDb = db.get(KEY); + assertThat(valueFromDb).containsExactly("value2".getBytes(StandardCharsets.UTF_8)); } + } } - - @Test(expected = RocksDBException.class) - public void testMergeOperatorCrash() throws RocksDBException { - - try (MergeOperatorV2 mergeOperator = new MergeOperatorV2("CrashOperator") { - - @Override - public MergeOperatorOutput fullMergeV2(ByteBuffer key, ByteBuffer existingValue, List operand) { - return new MergeOperatorOutput(null, MergeOperatorOutput.OpFailureScope.OpFailureScopeMax); - } - }) { - - try (Options options = new Options()) { - - options.setMergeOperator(mergeOperator); - options.setCreateIfMissing(true); - - try (RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) { - db.put(KEY, "value".getBytes()); - db.merge(KEY, "value1".getBytes()); - db.merge(KEY, "value2".getBytes()); - db.merge(KEY, "value3".getBytes()); - byte[] valueFromDb = db.get(KEY); - assertThat(valueFromDb).containsExactly("value2".getBytes(StandardCharsets.UTF_8)); - - } - } + } + + @Test(expected = RocksDBException.class) + public void returnFailureStatus() throws RocksDBException { + try (MergeOperatorV2 mergeOperator = new MergeOperatorV2("CrashOperator") { + @Override + public MergeOperatorOutput fullMergeV2( + ByteBuffer key, ByteBuffer existingValue, List operand) { + return new MergeOperatorOutput(null, MergeOperatorOutput.OpFailureScope.OpFailureScopeMax); + } + }) { + try (Options options = new Options()) { + options.setMergeOperator(mergeOperator); + options.setCreateIfMissing(true); + + try (RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) { + db.put(KEY, "value".getBytes()); + db.merge(KEY, "value1".getBytes()); + db.merge(KEY, "value2".getBytes()); + db.merge(KEY, "value3".getBytes()); + byte[] valueFromDb = db.get(KEY); + assertThat(valueFromDb).containsExactly("value2".getBytes(StandardCharsets.UTF_8)); } + } } - - @Test(expected = RocksDBException.class) - public void testMergeOperatorJavaException() throws RocksDBException { - - try (MergeOperatorV2 mergeOperator = new MergeOperatorV2("CrashOperator") { - - @Override - public MergeOperatorOutput fullMergeV2(ByteBuffer key, ByteBuffer existingValue, List operand) { - throw new RuntimeException("Never do this"); - } - }) { - - try (Options options = new Options()) { - options.setMergeOperator(mergeOperator); - options.setCreateIfMissing(true); - - try (RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) { - db.put(KEY, "value".getBytes()); - db.merge(KEY, "value1".getBytes()); - db.merge(KEY, "value2".getBytes()); - db.merge(KEY, "value3".getBytes()); - byte[] valueFromDb = db.get(KEY); - assertThat(valueFromDb).containsExactly("value2".getBytes(StandardCharsets.UTF_8)); - - } - } + } + + @Test(expected = RocksDBException.class) + public void throwJavaException() throws RocksDBException { + try (MergeOperatorV2 mergeOperator = new MergeOperatorV2("CrashOperator") { + @Override + public MergeOperatorOutput fullMergeV2( + ByteBuffer key, ByteBuffer existingValue, List operand) { + throw new RuntimeException("Never do this"); + } + }) { + try (Options options = new Options()) { + options.setMergeOperator(mergeOperator); + options.setCreateIfMissing(true); + + try (RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) { + db.put(KEY, "value".getBytes()); + db.merge(KEY, "value1".getBytes()); + db.merge(KEY, "value2".getBytes()); + db.merge(KEY, "value3".getBytes()); + byte[] valueFromDb = db.get(KEY); + assertThat(valueFromDb).containsExactly("value2".getBytes(StandardCharsets.UTF_8)); } + } } + } - public static class TestMergeOperator extends MergeOperatorV2 { - - public TestMergeOperator() { - super("TestMergeOperator"); - } - - @Override - public MergeOperatorOutput fullMergeV2(ByteBuffer key, ByteBuffer existingValue, List operand) { - - ByteBuffer b = ByteBuffer.allocateDirect(10); - b.put("10".getBytes(StandardCharsets.UTF_8)); - b.flip(); - return new MergeOperatorOutput(b); - } - + public static class TestMergeOperator extends MergeOperatorV2 { + public TestMergeOperator() { + super("TestMergeOperator"); } - - + @Override + public MergeOperatorOutput fullMergeV2( + ByteBuffer key, ByteBuffer existingValue, List operand) { + ByteBuffer b = ByteBuffer.allocateDirect(10); + b.put("10".getBytes(StandardCharsets.UTF_8)); + b.flip(); + return new MergeOperatorOutput(b); + } + } } diff --git a/src.mk b/src.mk index 7cbc69d1405..3ad8321e5fa 100644 --- a/src.mk +++ b/src.mk @@ -687,6 +687,7 @@ JNI_NATIVE_SOURCES = \ java/rocksjni/jni_perf_context.cc \ java/rocksjni/jni_multiget_helpers.cc \ java/rocksjni/jnicallback.cc \ + java/rocksjni/jni_merge_operator_v2.cc \ java/rocksjni/loggerjnicallback.cc \ java/rocksjni/lru_cache.cc \ java/rocksjni/memtablejni.cc \ From 6a9f60482321c22b0cdd10b478b34190a0fe4e43 Mon Sep 17 00:00:00 2001 From: Radek Hubner Date: Wed, 6 Dec 2023 16:00:16 +0400 Subject: [PATCH 03/11] Fix formating. --- java/rocksjni/jni_merge_operator_v2.cc | 177 ++++++++++-------- java/rocksjni/jni_merge_operator_v2.h | 20 +- .../org/rocksdb/InBuiltMergeOperator.java | 19 +- .../main/java/org/rocksdb/MergeOperator.java | 4 +- .../java/org/rocksdb/MergeOperatorOutput.java | 75 ++++---- .../java/org/rocksdb/MergeOperatorV2.java | 107 ++++++----- 6 files changed, 206 insertions(+), 196 deletions(-) diff --git a/java/rocksjni/jni_merge_operator_v2.cc b/java/rocksjni/jni_merge_operator_v2.cc index 9644245ed5e..d0cdf2cdb3b 100644 --- a/java/rocksjni/jni_merge_operator_v2.cc +++ b/java/rocksjni/jni_merge_operator_v2.cc @@ -3,15 +3,16 @@ // #include "jni_merge_operator_v2.h" + #include "include/org_rocksdb_MergeOperatorV2.h" #include "rocksjni/cplusplus_to_java_convert.h" #include "rocksjni/portal.h" -jlong Java_org_rocksdb_MergeOperatorV2_toCString - (JNIEnv* env, jclass, jstring operator_name) { +jlong Java_org_rocksdb_MergeOperatorV2_toCString(JNIEnv* env, jclass, + jstring operator_name) { auto operator_name_utf = env->GetStringUTFChars(operator_name, nullptr); - if(operator_name_utf == nullptr) { - return 0; //Exception + if (operator_name_utf == nullptr) { + return 0; // Exception } auto operator_name_len = env->GetStringUTFLength(operator_name); @@ -23,97 +24,106 @@ jlong Java_org_rocksdb_MergeOperatorV2_toCString return GET_CPLUSPLUS_POINTER(ret_value); } -jlong Java_org_rocksdb_MergeOperatorV2_newMergeOperator - (JNIEnv* env, jobject java_merge_operator, jlong _operator_name) { - +jlong Java_org_rocksdb_MergeOperatorV2_newMergeOperator( + JNIEnv* env, jobject java_merge_operator, jlong _operator_name) { char* operator_name = reinterpret_cast(_operator_name); auto* jni_merge_operator = new std::shared_ptr( - new rocksdb::JniMergeOperatorV2(env, java_merge_operator, operator_name) - ); + new rocksdb::JniMergeOperatorV2(env, java_merge_operator, + operator_name)); return GET_CPLUSPLUS_POINTER(jni_merge_operator); } -void Java_org_rocksdb_MergeOperatorV2_disposeInternal - (JNIEnv *, jclass, jlong j_handle) { - auto* jni_merge_operator = reinterpret_cast*>(j_handle); +void Java_org_rocksdb_MergeOperatorV2_disposeInternal(JNIEnv*, jclass, + jlong j_handle) { + auto* jni_merge_operator = + reinterpret_cast*>( + j_handle); delete jni_merge_operator; } namespace ROCKSDB_NAMESPACE { -JniMergeOperatorV2::JniMergeOperatorV2(JNIEnv* env, jobject java_merge_operator, char* _operator_name) - : JniCallback(env,java_merge_operator) { +JniMergeOperatorV2::JniMergeOperatorV2(JNIEnv* env, jobject java_merge_operator, + char* _operator_name) + : JniCallback(env, java_merge_operator) { operator_name = _operator_name; j_merge_class = env->GetObjectClass(java_merge_operator); - if(j_merge_class == nullptr) { - return; //Exception + if (j_merge_class == nullptr) { + return; // Exception } j_merge_class = static_cast(env->NewGlobalRef(j_merge_class)); - if(j_merge_class == nullptr) { - return; //Exception + if (j_merge_class == nullptr) { + return; // Exception } - j_merge_internal = env->GetMethodID(j_merge_class, "mergeInternal", - "(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;[Ljava/nio/ByteBuffer;)Lorg/rocksdb/MergeOperatorOutput;"); - if(j_merge_internal == nullptr) { + j_merge_internal = + env->GetMethodID(j_merge_class, "mergeInternal", + "(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;[Ljava/nio/" + "ByteBuffer;)Lorg/rocksdb/MergeOperatorOutput;"); + if (j_merge_internal == nullptr) { return; } return_value_clazz = env->FindClass("org/rocksdb/MergeOperatorOutput"); - if(return_value_clazz == nullptr) { - return ; //Exception + if (return_value_clazz == nullptr) { + return; // Exception } - return_value_clazz = static_cast(env->NewGlobalRef(return_value_clazz)); - if(return_value_clazz == nullptr) { - return ; //Exception + return_value_clazz = + static_cast(env->NewGlobalRef(return_value_clazz)); + if (return_value_clazz == nullptr) { + return; // Exception } - - return_value_method = env->GetMethodID(return_value_clazz, "getDirectValue", "()Ljava/nio/ByteBuffer;"); - if(return_value_method == nullptr) { - return ; + return_value_method = env->GetMethodID(return_value_clazz, "getDirectValue", + "()Ljava/nio/ByteBuffer;"); + if (return_value_method == nullptr) { + return; } - return_status_method = env->GetMethodID(return_value_clazz, "getOpStatus", - "()I"); - if(return_status_method == nullptr) { - return ; + return_status_method = + env->GetMethodID(return_value_clazz, "getOpStatus", "()I"); + if (return_status_method == nullptr) { + return; } j_byte_buffer_class = ByteBufferJni::getJClass(env); - if(j_byte_buffer_class == nullptr) { - return ; + if (j_byte_buffer_class == nullptr) { + return; } - j_byte_buffer_class = static_cast(env->NewGlobalRef(j_byte_buffer_class)); - if(j_byte_buffer_class == nullptr) { - return; //Exception + j_byte_buffer_class = + static_cast(env->NewGlobalRef(j_byte_buffer_class)); + if (j_byte_buffer_class == nullptr) { + return; // Exception } - byte_buffer_position = env->GetMethodID(j_byte_buffer_class, "position", "()I"); - if(byte_buffer_position == nullptr) { - return ; + byte_buffer_position = + env->GetMethodID(j_byte_buffer_class, "position", "()I"); + if (byte_buffer_position == nullptr) { + return; } - byte_buffer_remaining = env->GetMethodID(j_byte_buffer_class, "remaining", "()I"); - if(byte_buffer_remaining == nullptr) { - return ; + byte_buffer_remaining = + env->GetMethodID(j_byte_buffer_class, "remaining", "()I"); + if (byte_buffer_remaining == nullptr) { + return; } - return ; + return; } -bool JniMergeOperatorV2::FullMergeV2(const MergeOperationInput &merge_in, MergeOperationOutput *merge_out) const { - +bool JniMergeOperatorV2::FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const { jboolean attached_thread = JNI_FALSE; auto env = getJniEnv(&attached_thread); - auto j_operand_list = env->NewObjectArray(static_cast(merge_in.operand_list.size()), - j_byte_buffer_class, nullptr); - if(j_operand_list == nullptr) { + auto j_operand_list = + env->NewObjectArray(static_cast(merge_in.operand_list.size()), + j_byte_buffer_class, nullptr); + if (j_operand_list == nullptr) { return clean_and_return_error(attached_thread, merge_out); } @@ -123,7 +133,7 @@ bool JniMergeOperatorV2::FullMergeV2(const MergeOperationInput &merge_in, MergeO const_cast(reinterpret_cast(operand.data())), operand.size()); - if(byte_buffer == nullptr) { + if (byte_buffer == nullptr) { return clean_and_return_error(attached_thread, merge_out); } env->SetObjectArrayElement(j_operand_list, i, byte_buffer); @@ -132,51 +142,52 @@ bool JniMergeOperatorV2::FullMergeV2(const MergeOperationInput &merge_in, MergeO auto key = env->NewDirectByteBuffer( const_cast(reinterpret_cast(merge_in.key.data())), merge_in.key.size()); - if(key == nullptr) { + if (key == nullptr) { return clean_and_return_error(attached_thread, merge_out); } jobject exising_value = nullptr; - if(merge_in.existing_value != nullptr) { + if (merge_in.existing_value != nullptr) { exising_value = env->NewDirectByteBuffer( - const_cast(reinterpret_cast(merge_in.existing_value->data())), - merge_in.existing_value->size()); + const_cast( + reinterpret_cast(merge_in.existing_value->data())), + merge_in.existing_value->size()); } - - jobject result = env->CallObjectMethod(m_jcallback_obj, j_merge_internal, - key, exising_value, j_operand_list ); - if(env->ExceptionCheck() == JNI_TRUE) { - + jobject result = env->CallObjectMethod(m_jcallback_obj, j_merge_internal, key, + exising_value, j_operand_list); + if (env->ExceptionCheck() == JNI_TRUE) { env->ExceptionClear(); Error(merge_in.logger, "Unable to merge, Java code throw exception"); return clean_and_return_error(attached_thread, merge_out); } - if(result == nullptr) { + if (result == nullptr) { Error(merge_in.logger, "Unable to merge, Java code return nullptr result"); return clean_and_return_error(attached_thread, merge_out); } - merge_out->op_failure_scope = javaToOpFailureScope(env->CallIntMethod(result, return_status_method)); - if(merge_out->op_failure_scope != MergeOperator::OpFailureScope::kDefault) { + merge_out->op_failure_scope = + javaToOpFailureScope(env->CallIntMethod(result, return_status_method)); + if (merge_out->op_failure_scope != MergeOperator::OpFailureScope::kDefault) { releaseJniEnv(attached_thread); return false; } auto result_byte_buff = env->CallObjectMethod(result, return_value_method); - if(result_byte_buff == nullptr) { - Error(merge_in.logger, "Unable to merge, Java code return nullptr ByteBuffer"); + if (result_byte_buff == nullptr) { + Error(merge_in.logger, + "Unable to merge, Java code return nullptr ByteBuffer"); return clean_and_return_error(attached_thread, merge_out); } auto result_byte_buff_data = env->GetDirectBufferAddress(result_byte_buff); - auto position = env->CallIntMethod(result_byte_buff, byte_buffer_position); auto remaining = env->CallIntMethod(result_byte_buff, byte_buffer_remaining); - merge_out->new_value.assign(static_cast(result_byte_buff_data) + position, remaining); + merge_out->new_value.assign( + static_cast(result_byte_buff_data) + position, remaining); releaseJniEnv(attached_thread); @@ -193,23 +204,29 @@ JniMergeOperatorV2::~JniMergeOperatorV2() { releaseJniEnv(attached_thread); } -bool JniMergeOperatorV2::clean_and_return_error(jboolean& attached_thread, MergeOperationOutput *merge_out) const { - merge_out->op_failure_scope = MergeOperator::OpFailureScope::kOpFailureScopeMax; +bool JniMergeOperatorV2::clean_and_return_error( + jboolean& attached_thread, MergeOperationOutput* merge_out) const { + merge_out->op_failure_scope = + MergeOperator::OpFailureScope::kOpFailureScopeMax; releaseJniEnv(attached_thread); return false; } -MergeOperator::OpFailureScope JniMergeOperatorV2::javaToOpFailureScope(jint failure) const { +MergeOperator::OpFailureScope JniMergeOperatorV2::javaToOpFailureScope( + jint failure) const { switch (failure) { - case 0: return MergeOperator::OpFailureScope::kDefault; - case 1: return MergeOperator::OpFailureScope::kTryMerge; - case 2: return MergeOperator::OpFailureScope::kMustMerge; - case 3: return MergeOperator::OpFailureScope::kOpFailureScopeMax; - default: return MergeOperator::OpFailureScope::kOpFailureScopeMax; + case 0: + return MergeOperator::OpFailureScope::kDefault; + case 1: + return MergeOperator::OpFailureScope::kTryMerge; + case 2: + return MergeOperator::OpFailureScope::kMustMerge; + case 3: + return MergeOperator::OpFailureScope::kOpFailureScopeMax; + default: + return MergeOperator::OpFailureScope::kOpFailureScopeMax; } } -const char *JniMergeOperatorV2::Name() const { - return operator_name; -} -} \ No newline at end of file +const char* JniMergeOperatorV2::Name() const { return operator_name; } +} // namespace ROCKSDB_NAMESPACE \ No newline at end of file diff --git a/java/rocksjni/jni_merge_operator_v2.h b/java/rocksjni/jni_merge_operator_v2.h index 75c07062d56..1b9ed78ba87 100644 --- a/java/rocksjni/jni_merge_operator_v2.h +++ b/java/rocksjni/jni_merge_operator_v2.h @@ -5,23 +5,25 @@ #define ROCKSDB_JNI_MERGE_OPERATOR_V2_H #include -#include "rocksjni/jnicallback.h" + #include "rocksdb/merge_operator.h" +#include "rocksjni/jnicallback.h" namespace ROCKSDB_NAMESPACE { -class JniMergeOperatorV2 : public JniCallback, - public MergeOperator { - +class JniMergeOperatorV2 : public JniCallback, public MergeOperator { public: - JniMergeOperatorV2(JNIEnv* env1, jobject java_merge_operator, char* operator_name); - bool FullMergeV2(const MergeOperationInput &merge_in, MergeOperationOutput *merge_out) const override; + JniMergeOperatorV2(JNIEnv* env1, jobject java_merge_operator, + char* operator_name); + bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override; const char* Name() const override; ~JniMergeOperatorV2() override; private: MergeOperator::OpFailureScope javaToOpFailureScope(jint failure) const; - bool clean_and_return_error(jboolean& attached_thread, MergeOperationOutput *merge_out) const; + bool clean_and_return_error(jboolean& attached_thread, + MergeOperationOutput* merge_out) const; jclass j_merge_class; jclass j_byte_buffer_class; jmethodID j_merge_internal; @@ -31,9 +33,7 @@ class JniMergeOperatorV2 : public JniCallback, jmethodID byte_buffer_position; jmethodID byte_buffer_remaining; char* operator_name; - - }; -} +} // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_JNI_MERGE_OPERATOR_V2_H \ No newline at end of file diff --git a/java/src/main/java/org/rocksdb/InBuiltMergeOperator.java b/java/src/main/java/org/rocksdb/InBuiltMergeOperator.java index 8a44e2988d0..23036cde305 100644 --- a/java/src/main/java/org/rocksdb/InBuiltMergeOperator.java +++ b/java/src/main/java/org/rocksdb/InBuiltMergeOperator.java @@ -12,20 +12,19 @@ * value. */ public abstract class InBuiltMergeOperator extends RocksObject implements MergeOperator { - protected InBuiltMergeOperator(final long nativeHandle) { - super(nativeHandle); - } + protected InBuiltMergeOperator(final long nativeHandle) { + super(nativeHandle); + } - @Override - public long nativeHandler() { - return nativeHandle_; - } + @Override + public long nativeHandler() { + return nativeHandle_; + } } - // // InBuiltMergeOperator // interface MergeOperator // -//interface MergeOperatorV2 extends MergeOperator -//interface MergeOperatorV3 extends MergeOperator \ No newline at end of file +// interface MergeOperatorV2 extends MergeOperator +// interface MergeOperatorV3 extends MergeOperator \ No newline at end of file diff --git a/java/src/main/java/org/rocksdb/MergeOperator.java b/java/src/main/java/org/rocksdb/MergeOperator.java index 1c6e914de0a..3158857da94 100644 --- a/java/src/main/java/org/rocksdb/MergeOperator.java +++ b/java/src/main/java/org/rocksdb/MergeOperator.java @@ -1,7 +1,5 @@ package org.rocksdb; public interface MergeOperator { - - public long nativeHandler(); - + public long nativeHandler(); } diff --git a/java/src/main/java/org/rocksdb/MergeOperatorOutput.java b/java/src/main/java/org/rocksdb/MergeOperatorOutput.java index 4ca46755c9e..796d31752f1 100644 --- a/java/src/main/java/org/rocksdb/MergeOperatorOutput.java +++ b/java/src/main/java/org/rocksdb/MergeOperatorOutput.java @@ -1,6 +1,5 @@ package org.rocksdb; - import java.nio.ByteBuffer; /* @@ -9,43 +8,41 @@ OpFailureScope op_failure_scope = OpFailureScope::kDefault; */ public class MergeOperatorOutput { - public enum OpFailureScope { - Default(0), - TryMerge(1), - MustMerge(2), - OpFailureScopeMax(3); - private final int status; - OpFailureScope(int status) { - this.status = status; - } - } - - private ByteBuffer directValue; - private OpFailureScope op_failure_scope = OpFailureScope.Default; - - - public MergeOperatorOutput(final ByteBuffer directValue) { - this.directValue = directValue; - } - - public MergeOperatorOutput(final ByteBuffer directValue, final OpFailureScope op_failure_scope) { - this.directValue = directValue; - this.op_failure_scope = op_failure_scope; + public enum OpFailureScope { + Default(0), + TryMerge(1), + MustMerge(2), + OpFailureScopeMax(3); + private final int status; + OpFailureScope(int status) { + this.status = status; } - - public ByteBuffer getDirectValue() { - return directValue; - } - - public OpFailureScope getOp_failure_scope() { - return op_failure_scope; - } - - /** - * For JNI. Called from JniMergeOperatorV2 - */ - private int getOpStatus() { - return this.op_failure_scope.status; - } - + } + + private ByteBuffer directValue; + private OpFailureScope op_failure_scope = OpFailureScope.Default; + + public MergeOperatorOutput(final ByteBuffer directValue) { + this.directValue = directValue; + } + + public MergeOperatorOutput(final ByteBuffer directValue, final OpFailureScope op_failure_scope) { + this.directValue = directValue; + this.op_failure_scope = op_failure_scope; + } + + public ByteBuffer getDirectValue() { + return directValue; + } + + public OpFailureScope getOp_failure_scope() { + return op_failure_scope; + } + + /** + * For JNI. Called from JniMergeOperatorV2 + */ + private int getOpStatus() { + return this.op_failure_scope.status; + } } \ No newline at end of file diff --git a/java/src/main/java/org/rocksdb/MergeOperatorV2.java b/java/src/main/java/org/rocksdb/MergeOperatorV2.java index b6ccb96496f..07f55e4785b 100644 --- a/java/src/main/java/org/rocksdb/MergeOperatorV2.java +++ b/java/src/main/java/org/rocksdb/MergeOperatorV2.java @@ -7,58 +7,57 @@ import java.util.stream.Collectors; public abstract class MergeOperatorV2 extends RocksCallbackObject implements MergeOperator { - - private final String operatorName; - - public MergeOperatorV2(final String operatorName) { - super(new long[]{toCString(operatorName)}); - this.operatorName = operatorName; - } - - /** - * All parameters of this method are valid only during the call. If you want to keep this, you need to make deep copy/clone. - * - * - * @param key - * @param existingValue - * @param operand - * @return - */ - public abstract MergeOperatorOutput fullMergeV2(final ByteBuffer key, final ByteBuffer existingValue, final List operand); - - - final MergeOperatorOutput mergeInternal(final ByteBuffer key, final ByteBuffer existingValue, final ByteBuffer[] operand) { - List operandList = Arrays.stream(operand) - .map(ByteBuffer::asReadOnlyBuffer) - .collect(Collectors.toUnmodifiableList()); - - return fullMergeV2(key.asReadOnlyBuffer(), existingValue.asReadOnlyBuffer(), operandList); - } - - - @Override - final public long nativeHandler() { - return nativeHandle_; - } - - @Override - protected long initializeNative(long... nativeParameterHandles) { - return newMergeOperator(nativeParameterHandles[0]); - } - - @Override - protected void disposeInternal() { - disposeInternal(nativeHandle_); - } - - public String getOperatorName() { - return operatorName; - } - - private native long newMergeOperator(long operatorName); - - private static native long toCString(String operatorName); - - private static native void disposeInternal(long nativeHandle); - + private final String operatorName; + + public MergeOperatorV2(final String operatorName) { + super(new long[] {toCString(operatorName)}); + this.operatorName = operatorName; + } + + /** + * All parameters of this method are valid only during the call. If you want to keep this, you + * need to make deep copy/clone. + * + * + * @param key + * @param existingValue + * @param operand + * @return + */ + public abstract MergeOperatorOutput fullMergeV2( + final ByteBuffer key, final ByteBuffer existingValue, final List operand); + + final MergeOperatorOutput mergeInternal( + final ByteBuffer key, final ByteBuffer existingValue, final ByteBuffer[] operand) { + List operandList = Arrays.stream(operand) + .map(ByteBuffer::asReadOnlyBuffer) + .collect(Collectors.toUnmodifiableList()); + + return fullMergeV2(key.asReadOnlyBuffer(), existingValue.asReadOnlyBuffer(), operandList); + } + + @Override + final public long nativeHandler() { + return nativeHandle_; + } + + @Override + protected long initializeNative(long... nativeParameterHandles) { + return newMergeOperator(nativeParameterHandles[0]); + } + + @Override + protected void disposeInternal() { + disposeInternal(nativeHandle_); + } + + public String getOperatorName() { + return operatorName; + } + + private native long newMergeOperator(long operatorName); + + private static native long toCString(String operatorName); + + private static native void disposeInternal(long nativeHandle); } \ No newline at end of file From e5d120c0f1d66f5c1c78fa2b88b08e4850784749 Mon Sep 17 00:00:00 2001 From: Radek Hubner Date: Wed, 6 Dec 2023 16:20:20 +0400 Subject: [PATCH 04/11] Fix java 8 compatibility. --- java/src/main/java/org/rocksdb/MergeOperatorV2.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/src/main/java/org/rocksdb/MergeOperatorV2.java b/java/src/main/java/org/rocksdb/MergeOperatorV2.java index 07f55e4785b..8313911bf99 100644 --- a/java/src/main/java/org/rocksdb/MergeOperatorV2.java +++ b/java/src/main/java/org/rocksdb/MergeOperatorV2.java @@ -31,7 +31,7 @@ final MergeOperatorOutput mergeInternal( final ByteBuffer key, final ByteBuffer existingValue, final ByteBuffer[] operand) { List operandList = Arrays.stream(operand) .map(ByteBuffer::asReadOnlyBuffer) - .collect(Collectors.toUnmodifiableList()); + .collect(Collectors.toList()); return fullMergeV2(key.asReadOnlyBuffer(), existingValue.asReadOnlyBuffer(), operandList); } From 04c6e717da0def5170c129700e86ab1b9c50abca Mon Sep 17 00:00:00 2001 From: Radek Hubner Date: Wed, 6 Dec 2023 16:26:06 +0400 Subject: [PATCH 05/11] Fix formating. --- java/src/main/java/org/rocksdb/MergeOperatorV2.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/java/src/main/java/org/rocksdb/MergeOperatorV2.java b/java/src/main/java/org/rocksdb/MergeOperatorV2.java index 8313911bf99..48fb8de765d 100644 --- a/java/src/main/java/org/rocksdb/MergeOperatorV2.java +++ b/java/src/main/java/org/rocksdb/MergeOperatorV2.java @@ -3,7 +3,6 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; -import java.util.Optional; import java.util.stream.Collectors; public abstract class MergeOperatorV2 extends RocksCallbackObject implements MergeOperator { @@ -29,9 +28,8 @@ public abstract MergeOperatorOutput fullMergeV2( final MergeOperatorOutput mergeInternal( final ByteBuffer key, final ByteBuffer existingValue, final ByteBuffer[] operand) { - List operandList = Arrays.stream(operand) - .map(ByteBuffer::asReadOnlyBuffer) - .collect(Collectors.toList()); + List operandList = + Arrays.stream(operand).map(ByteBuffer::asReadOnlyBuffer).collect(Collectors.toList()); return fullMergeV2(key.asReadOnlyBuffer(), existingValue.asReadOnlyBuffer(), operandList); } From 22ca5d5868f566003ae68893e234ef5f24300d8c Mon Sep 17 00:00:00 2001 From: Radek Hubner Date: Wed, 6 Dec 2023 16:54:22 +0400 Subject: [PATCH 06/11] Fix PMD errors. --- java/src/main/java/org/rocksdb/MergeOperator.java | 2 +- java/src/main/java/org/rocksdb/MergeOperatorOutput.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/java/src/main/java/org/rocksdb/MergeOperator.java b/java/src/main/java/org/rocksdb/MergeOperator.java index 3158857da94..7c75d1477bf 100644 --- a/java/src/main/java/org/rocksdb/MergeOperator.java +++ b/java/src/main/java/org/rocksdb/MergeOperator.java @@ -1,5 +1,5 @@ package org.rocksdb; public interface MergeOperator { - public long nativeHandler(); + long nativeHandler(); } diff --git a/java/src/main/java/org/rocksdb/MergeOperatorOutput.java b/java/src/main/java/org/rocksdb/MergeOperatorOutput.java index 796d31752f1..965f2323ba6 100644 --- a/java/src/main/java/org/rocksdb/MergeOperatorOutput.java +++ b/java/src/main/java/org/rocksdb/MergeOperatorOutput.java @@ -42,6 +42,7 @@ public OpFailureScope getOp_failure_scope() { /** * For JNI. Called from JniMergeOperatorV2 */ + @SuppressWarnings("PMD.UnusedPrivateMethod") private int getOpStatus() { return this.op_failure_scope.status; } From aa1dc704d012142f97aaebe82afd8b9cd6278b35 Mon Sep 17 00:00:00 2001 From: Radek Hubner Date: Thu, 7 Dec 2023 12:21:31 +0400 Subject: [PATCH 07/11] Address PR comments. --- .../java/org/rocksdb/ColumnFamilyOptions.java | 2 +- .../java/org/rocksdb/InBuiltMergeOperator.java | 4 ---- java/src/main/java/org/rocksdb/MergeOperator.java | 2 +- .../main/java/org/rocksdb/MergeOperatorV2.java | 15 +++++++-------- java/src/main/java/org/rocksdb/Options.java | 2 +- .../java/org/rocksdb/MergeOperatorV2Test.java | 5 +++-- 6 files changed, 13 insertions(+), 17 deletions(-) diff --git a/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java b/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java index e55ab3908b3..f2b92f55875 100644 --- a/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java +++ b/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java @@ -221,7 +221,7 @@ public ColumnFamilyOptions setMergeOperatorName(final String name) { @Override public ColumnFamilyOptions setMergeOperator( final MergeOperator mergeOperator) { - setMergeOperator(nativeHandle_, mergeOperator.nativeHandler()); + setMergeOperator(nativeHandle_, mergeOperator.getNativeHandle()); return this; } diff --git a/java/src/main/java/org/rocksdb/InBuiltMergeOperator.java b/java/src/main/java/org/rocksdb/InBuiltMergeOperator.java index 23036cde305..c14fbd1c63f 100644 --- a/java/src/main/java/org/rocksdb/InBuiltMergeOperator.java +++ b/java/src/main/java/org/rocksdb/InBuiltMergeOperator.java @@ -16,10 +16,6 @@ protected InBuiltMergeOperator(final long nativeHandle) { super(nativeHandle); } - @Override - public long nativeHandler() { - return nativeHandle_; - } } // diff --git a/java/src/main/java/org/rocksdb/MergeOperator.java b/java/src/main/java/org/rocksdb/MergeOperator.java index 7c75d1477bf..391be51ff82 100644 --- a/java/src/main/java/org/rocksdb/MergeOperator.java +++ b/java/src/main/java/org/rocksdb/MergeOperator.java @@ -1,5 +1,5 @@ package org.rocksdb; public interface MergeOperator { - long nativeHandler(); + long getNativeHandle(); } diff --git a/java/src/main/java/org/rocksdb/MergeOperatorV2.java b/java/src/main/java/org/rocksdb/MergeOperatorV2.java index 48fb8de765d..a2872decf74 100644 --- a/java/src/main/java/org/rocksdb/MergeOperatorV2.java +++ b/java/src/main/java/org/rocksdb/MergeOperatorV2.java @@ -14,14 +14,13 @@ public MergeOperatorV2(final String operatorName) { } /** - * All parameters of this method are valid only during the call. If you want to keep this, you - * need to make deep copy/clone. + * This is the callback method which supplies the values to be merged for the provided key. + * The parameters are only valid until the method returns, and must be deep copied if you wish to retain them. * - * - * @param key - * @param existingValue - * @param operand - * @return + * @param key The key associated with the merge operation. + * @param existingValue The existing value of the current key, null means that the value doesn't exist. + * @param operand A list of operands to apply + * @return The method should return the result of merging the supplied values with the implemented merge algorithm. */ public abstract MergeOperatorOutput fullMergeV2( final ByteBuffer key, final ByteBuffer existingValue, final List operand); @@ -35,7 +34,7 @@ final MergeOperatorOutput mergeInternal( } @Override - final public long nativeHandler() { + public long getNativeHandle() { return nativeHandle_; } diff --git a/java/src/main/java/org/rocksdb/Options.java b/java/src/main/java/org/rocksdb/Options.java index 3efc91d581c..5beef2b554a 100644 --- a/java/src/main/java/org/rocksdb/Options.java +++ b/java/src/main/java/org/rocksdb/Options.java @@ -239,7 +239,7 @@ public Options setMergeOperatorName(final String name) { @Override public Options setMergeOperator(final MergeOperator mergeOperator) { - setMergeOperator(nativeHandle_, mergeOperator.nativeHandler()); + setMergeOperator(nativeHandle_, mergeOperator.getNativeHandle()); return this; } diff --git a/java/src/test/java/org/rocksdb/MergeOperatorV2Test.java b/java/src/test/java/org/rocksdb/MergeOperatorV2Test.java index e03a44118f5..b0636b5564d 100644 --- a/java/src/test/java/org/rocksdb/MergeOperatorV2Test.java +++ b/java/src/test/java/org/rocksdb/MergeOperatorV2Test.java @@ -1,6 +1,7 @@ package org.rocksdb; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -111,7 +112,7 @@ public MergeOperatorOutput fullMergeV2( db.merge(KEY, "value2".getBytes()); db.merge(KEY, "value3".getBytes()); byte[] valueFromDb = db.get(KEY); - assertThat(valueFromDb).containsExactly("value2".getBytes(StandardCharsets.UTF_8)); + fail("We should never reach this."); } } } @@ -136,7 +137,7 @@ public MergeOperatorOutput fullMergeV2( db.merge(KEY, "value2".getBytes()); db.merge(KEY, "value3".getBytes()); byte[] valueFromDb = db.get(KEY); - assertThat(valueFromDb).containsExactly("value2".getBytes(StandardCharsets.UTF_8)); + fail("We should never reach this."); } } } From a566c3ed2fc164761ba5a989bc3aa2c90244ca49 Mon Sep 17 00:00:00 2001 From: Radek Hubner Date: Thu, 7 Dec 2023 12:23:18 +0400 Subject: [PATCH 08/11] Fix format. --- .../java/org/rocksdb/InBuiltMergeOperator.java | 1 - .../main/java/org/rocksdb/MergeOperatorV2.java | 9 ++++++--- .../java/org/rocksdb/UInt64AddOperator.java | 18 +++++++++--------- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/java/src/main/java/org/rocksdb/InBuiltMergeOperator.java b/java/src/main/java/org/rocksdb/InBuiltMergeOperator.java index c14fbd1c63f..93d1c26e510 100644 --- a/java/src/main/java/org/rocksdb/InBuiltMergeOperator.java +++ b/java/src/main/java/org/rocksdb/InBuiltMergeOperator.java @@ -15,7 +15,6 @@ public abstract class InBuiltMergeOperator extends RocksObject implements MergeO protected InBuiltMergeOperator(final long nativeHandle) { super(nativeHandle); } - } // diff --git a/java/src/main/java/org/rocksdb/MergeOperatorV2.java b/java/src/main/java/org/rocksdb/MergeOperatorV2.java index a2872decf74..fa98185ca1f 100644 --- a/java/src/main/java/org/rocksdb/MergeOperatorV2.java +++ b/java/src/main/java/org/rocksdb/MergeOperatorV2.java @@ -15,12 +15,15 @@ public MergeOperatorV2(final String operatorName) { /** * This is the callback method which supplies the values to be merged for the provided key. - * The parameters are only valid until the method returns, and must be deep copied if you wish to retain them. + * The parameters are only valid until the method returns, and must be deep copied if you wish to + * retain them. * * @param key The key associated with the merge operation. - * @param existingValue The existing value of the current key, null means that the value doesn't exist. + * @param existingValue The existing value of the current key, null means that the value doesn't + * exist. * @param operand A list of operands to apply - * @return The method should return the result of merging the supplied values with the implemented merge algorithm. + * @return The method should return the result of merging the supplied values with the implemented + * merge algorithm. */ public abstract MergeOperatorOutput fullMergeV2( final ByteBuffer key, final ByteBuffer existingValue, final List operand); diff --git a/java/src/main/java/org/rocksdb/UInt64AddOperator.java b/java/src/main/java/org/rocksdb/UInt64AddOperator.java index ce15569719c..0065fd14a00 100644 --- a/java/src/main/java/org/rocksdb/UInt64AddOperator.java +++ b/java/src/main/java/org/rocksdb/UInt64AddOperator.java @@ -10,14 +10,14 @@ * integer value. */ public class UInt64AddOperator extends InBuiltMergeOperator { - public UInt64AddOperator() { - super(newSharedUInt64AddOperator()); - } + public UInt64AddOperator() { + super(newSharedUInt64AddOperator()); + } - private static native long newSharedUInt64AddOperator(); - @Override - protected final void disposeInternal(final long handle) { - disposeInternalJni(handle); - } - private static native void disposeInternalJni(final long handle); + private static native long newSharedUInt64AddOperator(); + @Override + protected final void disposeInternal(final long handle) { + disposeInternalJni(handle); + } + private static native void disposeInternalJni(final long handle); } From 4545b74e4aac40cefd4670a1b72a499bb6eec7df Mon Sep 17 00:00:00 2001 From: Radek Hubner Date: Wed, 19 Jun 2024 15:55:27 +0200 Subject: [PATCH 09/11] Fix test in CMake build. --- java/CMakeLists.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/java/CMakeLists.txt b/java/CMakeLists.txt index ba50fda2a83..6e5e630e973 100644 --- a/java/CMakeLists.txt +++ b/java/CMakeLists.txt @@ -360,6 +360,7 @@ set(JAVA_TEST_CLASSES src/test/java/org/rocksdb/MemTableTest.java src/test/java/org/rocksdb/MemoryUtilTest.java src/test/java/org/rocksdb/MergeCFVariantsTest.java + src/test/java/org/rocksdb/MergeOperatorV2Test.java src/test/java/org/rocksdb/MergeTest.java src/test/java/org/rocksdb/MergeVariantsTest.java src/test/java/org/rocksdb/MixedOptionsTest.java @@ -478,6 +479,7 @@ set(JAVA_TEST_RUNNING_CLASSES org.rocksdb.MemTableTest org.rocksdb.MemoryUtilTest org.rocksdb.MergeCFVariantsTest + org.rocksdb.MergeOperatorV2Test org.rocksdb.MergeTest org.rocksdb.MergeVariantsTest org.rocksdb.MixedOptionsTest From 383ce59798af571f8f832d3c0cbf15659ee51f32 Mon Sep 17 00:00:00 2001 From: Radek Hubner Date: Tue, 2 Jul 2024 16:28:21 +0200 Subject: [PATCH 10/11] Throw exception on unsuccessfull NewGlobalRef call. --- java/rocksjni/jni_merge_operator_v2.cc | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/java/rocksjni/jni_merge_operator_v2.cc b/java/rocksjni/jni_merge_operator_v2.cc index d0cdf2cdb3b..cacfaf5c9ed 100644 --- a/java/rocksjni/jni_merge_operator_v2.cc +++ b/java/rocksjni/jni_merge_operator_v2.cc @@ -57,7 +57,11 @@ JniMergeOperatorV2::JniMergeOperatorV2(JNIEnv* env, jobject java_merge_operator, } j_merge_class = static_cast(env->NewGlobalRef(j_merge_class)); if (j_merge_class == nullptr) { - return; // Exception + if(env->ExceptionCheck() == JNI_FALSE) { + RocksDBExceptionJni::ThrowNew( + env, "Unable to obtain GlobalRef for merge operator"); + } + return; } j_merge_internal = From a6c5c602006a4ae8e1eb6dd16910b982fa482aeb Mon Sep 17 00:00:00 2001 From: Radek Hubner Date: Tue, 2 Jul 2024 16:43:44 +0200 Subject: [PATCH 11/11] Fix format --- java/rocksjni/jni_merge_operator_v2.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/rocksjni/jni_merge_operator_v2.cc b/java/rocksjni/jni_merge_operator_v2.cc index cacfaf5c9ed..c0d8f67b82e 100644 --- a/java/rocksjni/jni_merge_operator_v2.cc +++ b/java/rocksjni/jni_merge_operator_v2.cc @@ -57,7 +57,7 @@ JniMergeOperatorV2::JniMergeOperatorV2(JNIEnv* env, jobject java_merge_operator, } j_merge_class = static_cast(env->NewGlobalRef(j_merge_class)); if (j_merge_class == nullptr) { - if(env->ExceptionCheck() == JNI_FALSE) { + if (env->ExceptionCheck() == JNI_FALSE) { RocksDBExceptionJni::ThrowNew( env, "Unable to obtain GlobalRef for merge operator"); }