Skip to content

Commit

Permalink
Add Iterator::Refresh(Snapshot*) to RocksJava (#12145)
Browse files Browse the repository at this point in the history
Summary:
Adds the API to RocksJava.
Also improves the C++ doc for Iterator::Refresh(Snapshot*)
Closes #12095

Pull Request resolved: #12145

Reviewed By: hx235

Differential Revision: D52266452

Pulled By: ajkr

fbshipit-source-id: 6b72b41672081b966b0c5dd07d9bf151ed009122
  • Loading branch information
adamretter authored and facebook-github-bot committed Dec 21, 2023
1 parent 7b24dec commit d8c1ab8
Show file tree
Hide file tree
Showing 11 changed files with 151 additions and 4 deletions.
3 changes: 3 additions & 0 deletions include/rocksdb/iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ class Iterator : public Cleanable {
// Regardless of whether the iterator was created/refreshed previously
// with or without a snapshot, the iterator will be reading the
// latest DB state after this call.
// Note that you will need to call a Seek*() function to get the iterator
// back into a valid state before calling a function that assumes the
// state is already valid, like Next().
virtual Status Refresh() { return Refresh(nullptr); }

// Similar to Refresh() but the iterator will be reading the latest DB state
Expand Down
20 changes: 20 additions & 0 deletions java/rocksjni/iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,26 @@ void Java_org_rocksdb_RocksIterator_refresh0(JNIEnv* env, jobject /*jobj*/,
ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s);
}

/*
* Class: org_rocksdb_RocksIterator
* Method: refresh1
* Signature: (JJ)V
*/
void Java_org_rocksdb_RocksIterator_refresh1(JNIEnv* env, jobject /*jobj*/,
jlong handle,
jlong snapshot_handle) {
auto* it = reinterpret_cast<ROCKSDB_NAMESPACE::Iterator*>(handle);
auto* snapshot =
reinterpret_cast<ROCKSDB_NAMESPACE::Snapshot*>(snapshot_handle);
ROCKSDB_NAMESPACE::Status s = it->Refresh(snapshot);

if (s.ok()) {
return;
}

ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s);
}

/*
* Class: org_rocksdb_RocksIterator
* Method: seek0
Expand Down
21 changes: 21 additions & 0 deletions java/rocksjni/sst_file_reader_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -371,3 +371,24 @@ void Java_org_rocksdb_SstFileReaderIterator_refresh0(JNIEnv* env,

ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s);
}

/*
* Class: org_rocksdb_SstFileReaderIterator
* Method: refresh1
* Signature: (JJ)V
*/
void Java_org_rocksdb_SstFileReaderIterator_refresh1(JNIEnv* env,
jobject /*jobj*/,
jlong handle,
jlong snapshot_handle) {
auto* it = reinterpret_cast<ROCKSDB_NAMESPACE::Iterator*>(handle);
auto* snapshot =
reinterpret_cast<ROCKSDB_NAMESPACE::Snapshot*>(snapshot_handle);
ROCKSDB_NAMESPACE::Status s = it->Refresh(snapshot);

if (s.ok()) {
return;
}

ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s);
}
16 changes: 15 additions & 1 deletion java/rocksjni/write_batch_with_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -946,8 +946,22 @@ jlongArray Java_org_rocksdb_WBWIRocksIterator_entry1(JNIEnv* env,
* Method: refresh0
* Signature: (J)V
*/
void Java_org_rocksdb_WBWIRocksIterator_refresh0(JNIEnv* env) {
void Java_org_rocksdb_WBWIRocksIterator_refresh0(JNIEnv* env, jobject /*jobj*/,
jlong /*handle*/) {
ROCKSDB_NAMESPACE::Status s =
ROCKSDB_NAMESPACE::Status::NotSupported("Refresh() is not supported");
ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s);
}

/*
* Class: org_rocksdb_WBWIRocksIterator
* Method: refresh1
* Signature: (JJ)V
*/
void Java_org_rocksdb_WBWIRocksIterator_refresh1(JNIEnv* env, jobject /*jobj*/,
jlong /*handle*/,
jlong /*snapshot_handle*/) {
ROCKSDB_NAMESPACE::Status s = ROCKSDB_NAMESPACE::Status::NotSupported(
"Refresh(Snapshot*) is not supported");
ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s);
}
7 changes: 7 additions & 0 deletions java/src/main/java/org/rocksdb/AbstractRocksIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ public void refresh() throws RocksDBException {
refresh0(nativeHandle_);
}

@Override
public void refresh(final Snapshot snapshot) throws RocksDBException {
assert (isOwningHandle());
refresh1(nativeHandle_, snapshot.getNativeHandle());
}

@Override
public void status() throws RocksDBException {
assert (isOwningHandle());
Expand Down Expand Up @@ -135,6 +141,7 @@ protected void disposeInternal() {
abstract void next0(long handle);
abstract void prev0(long handle);
abstract void refresh0(long handle) throws RocksDBException;
abstract void refresh1(long handle, long snapshotHandle) throws RocksDBException;
abstract void seek0(long handle, byte[] target, int targetLen);
abstract void seekForPrev0(long handle, byte[] target, int targetLen);
abstract void seekDirect0(long handle, ByteBuffer target, int targetOffset, int targetLen);
Expand Down
1 change: 1 addition & 0 deletions java/src/main/java/org/rocksdb/RocksIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public int value(final byte[] value, final int offset, final int len) {
@Override final native void next0(long handle);
@Override final native void prev0(long handle);
@Override final native void refresh0(long handle);
@Override final native void refresh1(long handle, long snapshotHandle);
@Override final native void seek0(long handle, byte[] target, int targetLen);
@Override final native void seekForPrev0(long handle, byte[] target, int targetLen);
@Override
Expand Down
17 changes: 14 additions & 3 deletions java/src/main/java/org/rocksdb/RocksIteratorInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,23 @@ public interface RocksIteratorInterface {
void status() throws RocksDBException;

/**
* <p>If supported, renew the iterator to represent the latest state. The iterator will be
* invalidated after the call. Not supported if {@link ReadOptions#setSnapshot(Snapshot)} was
* specified when creating the iterator.</p>
* <p>If supported, the DB state that the iterator reads from is updated to
* the latest state. The iterator will be invalidated after the call.
* Regardless of whether the iterator was created/refreshed previously with
* or without a snapshot, the iterator will be reading the latest DB state
* after this call.</p>
* <p>Note that you will need to call a Seek*() function to get the iterator
* back into a valid state before calling a function that assumes the
* state is already valid, like Next().</p>
*
* @throws RocksDBException thrown if the operation is not supported or an error happens in the
* underlying native library
*/
void refresh() throws RocksDBException;

/**
* Similar to {@link #refresh()} but the iterator will be reading the latest DB state under the
* given snapshot.
*/
void refresh(Snapshot snapshot) throws RocksDBException;
}
1 change: 1 addition & 0 deletions java/src/main/java/org/rocksdb/SstFileReaderIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public int value(final ByteBuffer value) {
@Override final native void next0(long handle);
@Override final native void prev0(long handle);
@Override final native void refresh0(long handle) throws RocksDBException;
@Override final native void refresh1(long handle, long snapshotHandle);
@Override final native void seek0(long handle, byte[] target, int targetLen);
@Override final native void seekForPrev0(long handle, byte[] target, int targetLen);
@Override final native void status0(long handle) throws RocksDBException;
Expand Down
1 change: 1 addition & 0 deletions java/src/main/java/org/rocksdb/WBWIRocksIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public WriteEntry entry() {
@Override final native void next0(long handle);
@Override final native void prev0(long handle);
@Override final native void refresh0(final long handle) throws RocksDBException;
@Override final native void refresh1(long handle, long snapshotHandle);
@Override final native void seek0(long handle, byte[] target, int targetLen);
@Override final native void seekForPrev0(long handle, byte[] target, int targetLen);
@Override final native void status0(long handle) throws RocksDBException;
Expand Down
63 changes: 63 additions & 0 deletions java/src/test/java/org/rocksdb/RocksIteratorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,69 @@ public void rocksIteratorSeekAndInsert() throws RocksDBException {
}
}

@Test
public void rocksIteratorSeekAndInsertOnSnapshot() throws RocksDBException {
try (final Options options =
new Options().setCreateIfMissing(true).setCreateMissingColumnFamilies(true);
final RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) {
db.put("key1".getBytes(), "value1".getBytes());
db.put("key2".getBytes(), "value2".getBytes());

try (final Snapshot snapshot = db.getSnapshot()) {
try (final RocksIterator iterator = db.newIterator()) {
// check for just keys 1 and 2
iterator.seek("key0".getBytes());
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo("key1".getBytes());

iterator.seek("key2".getBytes());
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo("key2".getBytes());

iterator.seek("key3".getBytes());
assertThat(iterator.isValid()).isFalse();
}

// add a new key (after the snapshot was taken)
db.put("key3".getBytes(), "value3".getBytes());

try (final RocksIterator iterator = db.newIterator()) {
// check for keys 1, 2, and 3
iterator.seek("key0".getBytes());
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo("key1".getBytes());

iterator.seek("key2".getBytes());
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo("key2".getBytes());

iterator.seek("key3".getBytes());
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo("key3".getBytes());

iterator.seek("key4".getBytes());
assertThat(iterator.isValid()).isFalse();

// reset iterator to snapshot, iterator should now only see keys
// there were present in the db when the snapshot was taken
iterator.refresh(snapshot);

// again check for just keys 1 and 2
iterator.seek("key0".getBytes());
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo("key1".getBytes());

iterator.seek("key2".getBytes());
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo("key2".getBytes());

iterator.seek("key3".getBytes());
assertThat(iterator.isValid()).isFalse();
}
}
}
}

@Test
public void rocksIteratorReleaseAfterCfClose() throws RocksDBException {
try (final Options options = new Options()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,11 @@ public void refresh() throws RocksDBException {
offset = -1;
}

@Override
public void refresh(final Snapshot snapshot) throws RocksDBException {
offset = -1;
}

@Override
public void status() throws RocksDBException {
if(offset < 0 || offset >= entries.size()) {
Expand Down

0 comments on commit d8c1ab8

Please sign in to comment.