Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java API for Merge Operator v2 #12122

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions java/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ set(JNI_NATIVE_SOURCES
rocksjni/import_column_family_options.cc
rocksjni/ingest_external_file_options.cc
rocksjni/iterator.cc
rocksjni/jni_merge_operator_v2.cc
rocksjni/jni_multiget_helpers.cc
rocksjni/jni_perf_context.cc
rocksjni/jnicallback.cc
Expand Down Expand Up @@ -179,6 +180,8 @@ set(JAVA_MAIN_CLASSES
src/main/java/org/rocksdb/HistogramData.java
src/main/java/org/rocksdb/HistogramType.java
src/main/java/org/rocksdb/Holder.java
src/main/java/org/rocksdb/ImportColumnFamilyOptions.java
src/main/java/org/rocksdb/InBuiltMergeOperator.java
src/main/java/org/rocksdb/HyperClockCache.java
src/main/java/org/rocksdb/ImportColumnFamilyOptions.java
src/main/java/org/rocksdb/IndexShorteningMode.java
Expand All @@ -198,6 +201,8 @@ set(JAVA_MAIN_CLASSES
src/main/java/org/rocksdb/MemoryUsageType.java
src/main/java/org/rocksdb/MemoryUtil.java
src/main/java/org/rocksdb/MergeOperator.java
src/main/java/org/rocksdb/MergeOperatorV2.java
src/main/java/org/rocksdb/MergeOperatorOutput.java
src/main/java/org/rocksdb/MutableColumnFamilyOptions.java
src/main/java/org/rocksdb/MutableColumnFamilyOptionsInterface.java
src/main/java/org/rocksdb/MutableDBOptions.java
Expand Down Expand Up @@ -355,6 +360,7 @@ set(JAVA_TEST_CLASSES
src/test/java/org/rocksdb/MemTableTest.java
src/test/java/org/rocksdb/MemoryUtilTest.java
src/test/java/org/rocksdb/MergeCFVariantsTest.java
src/test/java/org/rocksdb/MergeOperatorV2Test.java
src/test/java/org/rocksdb/MergeTest.java
src/test/java/org/rocksdb/MergeVariantsTest.java
src/test/java/org/rocksdb/MixedOptionsTest.java
Expand Down Expand Up @@ -473,6 +479,7 @@ set(JAVA_TEST_RUNNING_CLASSES
org.rocksdb.MemTableTest
org.rocksdb.MemoryUtilTest
org.rocksdb.MergeCFVariantsTest
org.rocksdb.MergeOperatorV2Test
org.rocksdb.MergeTest
org.rocksdb.MergeVariantsTest
org.rocksdb.MixedOptionsTest
Expand Down
236 changes: 236 additions & 0 deletions java/rocksjni/jni_merge_operator_v2.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
//
// Created by rhubner on 29-Nov-23.
//

#include "jni_merge_operator_v2.h"

#include "include/org_rocksdb_MergeOperatorV2.h"
#include "rocksjni/cplusplus_to_java_convert.h"
#include "rocksjni/portal.h"

jlong Java_org_rocksdb_MergeOperatorV2_toCString(JNIEnv* env, jclass,
jstring operator_name) {
auto operator_name_utf = env->GetStringUTFChars(operator_name, nullptr);
if (operator_name_utf == nullptr) {
return 0; // Exception
}
auto operator_name_len = env->GetStringUTFLength(operator_name);

char* ret_value = new char[operator_name_len + 1];
memcpy(ret_value, operator_name_utf, operator_name_len + 1);

env->ReleaseStringUTFChars(operator_name, operator_name_utf);

return GET_CPLUSPLUS_POINTER(ret_value);
}

jlong Java_org_rocksdb_MergeOperatorV2_newMergeOperator(
JNIEnv* env, jobject java_merge_operator, jlong _operator_name) {
char* operator_name = reinterpret_cast<char*>(_operator_name);

auto* jni_merge_operator =
new std::shared_ptr<ROCKSDB_NAMESPACE::MergeOperator>(
rhubner marked this conversation as resolved.
Show resolved Hide resolved
new rocksdb::JniMergeOperatorV2(env, java_merge_operator,
operator_name));

return GET_CPLUSPLUS_POINTER(jni_merge_operator);
}

void Java_org_rocksdb_MergeOperatorV2_disposeInternal(JNIEnv*, jclass,
jlong j_handle) {
auto* jni_merge_operator =
reinterpret_cast<std::shared_ptr<ROCKSDB_NAMESPACE::MergeOperator>*>(
j_handle);
delete jni_merge_operator;
}

namespace ROCKSDB_NAMESPACE {

JniMergeOperatorV2::JniMergeOperatorV2(JNIEnv* env, jobject java_merge_operator,
char* _operator_name)
: JniCallback(env, java_merge_operator) {
operator_name = _operator_name;

j_merge_class = env->GetObjectClass(java_merge_operator);
if (j_merge_class == nullptr) {
return; // Exception
}
j_merge_class = static_cast<jclass>(env->NewGlobalRef(j_merge_class));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The JNI manual says NewGlobalRef() can return nullptr WITHOUT an exception, e.g. if the system is out of memory. So we should throw a Java exception here. Can you also check the other JNI methods you call, to confirm that they always set an exception on nullptr, and if they don't we should set one in each case.

Then fix the comments to say "XYZException has been thrown", or whatever is appropriate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I check existing code and we don't throw exceptions when we can't create global references.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really ? That seems wrong, and may be an omission. If NewGlobalRef() is returning null, surely the right thing to do is to throw an exception, unless we somehow cope with j_merge_class as null ? So let us do it correctly here for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alanpaxton Fixed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I'm happy.

if (j_merge_class == nullptr) {
if (env->ExceptionCheck() == JNI_FALSE) {
RocksDBExceptionJni::ThrowNew(
env, "Unable to obtain GlobalRef for merge operator");
}
return;
}

j_merge_internal =
env->GetMethodID(j_merge_class, "mergeInternal",
"(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;[Ljava/nio/"
"ByteBuffer;)Lorg/rocksdb/MergeOperatorOutput;");
if (j_merge_internal == nullptr) {
return;
}

return_value_clazz = env->FindClass("org/rocksdb/MergeOperatorOutput");
if (return_value_clazz == nullptr) {
return; // Exception
}
return_value_clazz =
static_cast<jclass>(env->NewGlobalRef(return_value_clazz));
if (return_value_clazz == nullptr) {
return; // Exception
}

return_value_method = env->GetMethodID(return_value_clazz, "getDirectValue",
"()Ljava/nio/ByteBuffer;");
if (return_value_method == nullptr) {
return;
}

return_status_method =
env->GetMethodID(return_value_clazz, "getOpStatus", "()I");
if (return_status_method == nullptr) {
return;
}

j_byte_buffer_class = ByteBufferJni::getJClass(env);
if (j_byte_buffer_class == nullptr) {
return;
}
j_byte_buffer_class =
static_cast<jclass>(env->NewGlobalRef(j_byte_buffer_class));
if (j_byte_buffer_class == nullptr) {
return; // Exception
}

byte_buffer_position =
env->GetMethodID(j_byte_buffer_class, "position", "()I");
if (byte_buffer_position == nullptr) {
return;
}

byte_buffer_remaining =
env->GetMethodID(j_byte_buffer_class, "remaining", "()I");
if (byte_buffer_remaining == nullptr) {
return;
}

return;
}

bool JniMergeOperatorV2::FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const {
jboolean attached_thread = JNI_FALSE;
auto env = getJniEnv(&attached_thread);

auto j_operand_list =
env->NewObjectArray(static_cast<jsize>(merge_in.operand_list.size()),
j_byte_buffer_class, nullptr);
if (j_operand_list == nullptr) {
return clean_and_return_error(attached_thread, merge_out);
}

for (auto i = 0u; i < merge_in.operand_list.size(); i++) {
auto operand = merge_in.operand_list[i];
auto byte_buffer = env->NewDirectByteBuffer(
const_cast<void*>(reinterpret_cast<const void*>(operand.data())),
operand.size());

if (byte_buffer == nullptr) {
return clean_and_return_error(attached_thread, merge_out);
}
env->SetObjectArrayElement(j_operand_list, i, byte_buffer);
}

auto key = env->NewDirectByteBuffer(
const_cast<void*>(reinterpret_cast<const void*>(merge_in.key.data())),
merge_in.key.size());
if (key == nullptr) {
return clean_and_return_error(attached_thread, merge_out);
}

jobject exising_value = nullptr;
if (merge_in.existing_value != nullptr) {
exising_value = env->NewDirectByteBuffer(
const_cast<void*>(
reinterpret_cast<const void*>(merge_in.existing_value->data())),
merge_in.existing_value->size());
}

jobject result = env->CallObjectMethod(m_jcallback_obj, j_merge_internal, key,
exising_value, j_operand_list);
if (env->ExceptionCheck() == JNI_TRUE) {
env->ExceptionClear();
Error(merge_in.logger, "Unable to merge, Java code throw exception");
return clean_and_return_error(attached_thread, merge_out);
}

if (result == nullptr) {
Error(merge_in.logger, "Unable to merge, Java code return nullptr result");
return clean_and_return_error(attached_thread, merge_out);
}

merge_out->op_failure_scope =
javaToOpFailureScope(env->CallIntMethod(result, return_status_method));
if (merge_out->op_failure_scope != MergeOperator::OpFailureScope::kDefault) {
releaseJniEnv(attached_thread);
return false;
}

auto result_byte_buff = env->CallObjectMethod(result, return_value_method);
if (result_byte_buff == nullptr) {
Error(merge_in.logger,
"Unable to merge, Java code return nullptr ByteBuffer");
return clean_and_return_error(attached_thread, merge_out);
}

auto result_byte_buff_data = env->GetDirectBufferAddress(result_byte_buff);

auto position = env->CallIntMethod(result_byte_buff, byte_buffer_position);
auto remaining = env->CallIntMethod(result_byte_buff, byte_buffer_remaining);

merge_out->new_value.assign(
static_cast<char*>(result_byte_buff_data) + position, remaining);

releaseJniEnv(attached_thread);

return true;
}

JniMergeOperatorV2::~JniMergeOperatorV2() {
jboolean attached_thread = JNI_FALSE;
auto env = getJniEnv(&attached_thread);
env->DeleteGlobalRef(j_merge_class);
env->DeleteGlobalRef(j_byte_buffer_class);
env->DeleteGlobalRef(return_value_clazz);
delete operator_name;
releaseJniEnv(attached_thread);
}

bool JniMergeOperatorV2::clean_and_return_error(
jboolean& attached_thread, MergeOperationOutput* merge_out) const {
merge_out->op_failure_scope =
MergeOperator::OpFailureScope::kOpFailureScopeMax;
releaseJniEnv(attached_thread);
return false;
}

MergeOperator::OpFailureScope JniMergeOperatorV2::javaToOpFailureScope(
jint failure) const {
switch (failure) {
case 0:
return MergeOperator::OpFailureScope::kDefault;
case 1:
return MergeOperator::OpFailureScope::kTryMerge;
case 2:
return MergeOperator::OpFailureScope::kMustMerge;
case 3:
return MergeOperator::OpFailureScope::kOpFailureScopeMax;
default:
return MergeOperator::OpFailureScope::kOpFailureScopeMax;
}
}

const char* JniMergeOperatorV2::Name() const { return operator_name; }
} // namespace ROCKSDB_NAMESPACE
39 changes: 39 additions & 0 deletions java/rocksjni/jni_merge_operator_v2.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//
// Created by rhubner on 29-Nov-23.
//
#ifndef ROCKSDB_JNI_MERGE_OPERATOR_V2_H
#define ROCKSDB_JNI_MERGE_OPERATOR_V2_H

#include <jni.h>

#include "rocksdb/merge_operator.h"
#include "rocksjni/jnicallback.h"

namespace ROCKSDB_NAMESPACE {

class JniMergeOperatorV2 : public JniCallback, public MergeOperator {
public:
JniMergeOperatorV2(JNIEnv* env1, jobject java_merge_operator,
char* operator_name);
bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override;
const char* Name() const override;
~JniMergeOperatorV2() override;

private:
MergeOperator::OpFailureScope javaToOpFailureScope(jint failure) const;
bool clean_and_return_error(jboolean& attached_thread,
MergeOperationOutput* merge_out) const;
jclass j_merge_class;
jclass j_byte_buffer_class;
jmethodID j_merge_internal;
jclass return_value_clazz;
jmethodID return_value_method;
jmethodID return_status_method;
jmethodID byte_buffer_position;
jmethodID byte_buffer_remaining;
char* operator_name;
};

} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_JNI_MERGE_OPERATOR_V2_H
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* CassandraValueMergeOperator is a merge operator that merges two cassandra wide column
* values.
*/
public class CassandraValueMergeOperator extends MergeOperator {
public class CassandraValueMergeOperator extends InBuiltMergeOperator {
public CassandraValueMergeOperator(final int gcGracePeriodInSeconds) {
super(newSharedCassandraValueMergeOperator(gcGracePeriodInSeconds, 0));
}
Expand Down
2 changes: 1 addition & 1 deletion java/src/main/java/org/rocksdb/ColumnFamilyOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public ColumnFamilyOptions setMergeOperatorName(final String name) {
@Override
public ColumnFamilyOptions setMergeOperator(
final MergeOperator mergeOperator) {
setMergeOperator(nativeHandle_, mergeOperator.nativeHandle_);
setMergeOperator(nativeHandle_, mergeOperator.getNativeHandle());
return this;
}

Expand Down
25 changes: 25 additions & 0 deletions java/src/main/java/org/rocksdb/InBuiltMergeOperator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
// Copyright (c) 2014, Vlad Balan ([email protected]). All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

package org.rocksdb;

/**
* MergeOperator holds an operator to be applied when compacting
* two merge operands held under the same key in order to obtain a single
* value.
*/
public abstract class InBuiltMergeOperator extends RocksObject implements MergeOperator {
protected InBuiltMergeOperator(final long nativeHandle) {
super(nativeHandle);
}
}

//
// InBuiltMergeOperator
// interface MergeOperator
//
// interface MergeOperatorV2 extends MergeOperator
// interface MergeOperatorV3 extends MergeOperator
23 changes: 5 additions & 18 deletions java/src/main/java/org/rocksdb/MergeOperator.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,5 @@
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
// Copyright (c) 2014, Vlad Balan ([email protected]). All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

package org.rocksdb;

/**
* MergeOperator holds an operator to be applied when compacting
* two merge operands held under the same key in order to obtain a single
* value.
*/
public abstract class MergeOperator extends RocksObject {
protected MergeOperator(final long nativeHandle) {
super(nativeHandle);
}
}
package org.rocksdb;

public interface MergeOperator {
long getNativeHandle();
}
Loading
Loading