Skip to content

Commit

Permalink
Fix database open with column family. (#12167)
Browse files Browse the repository at this point in the history
Summary:
When is RocksDB is opened with Column Family descriptors, the default column family must be set properly. If it was not, then the flush operation will fail.

Pull Request resolved: #12167

Reviewed By: ajkr

Differential Revision: D53104007

Pulled By: cbi42

fbshipit-source-id: dffa8e34a4b2a438553ee4ea308f3fa2e22e46f7
  • Loading branch information
rhubner authored and facebook-github-bot committed Jan 26, 2024
1 parent 2233a2f commit f2ddb92
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 10 deletions.
13 changes: 10 additions & 3 deletions java/src/main/java/org/rocksdb/RocksDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private enum LibraryState {
private ColumnFamilyHandle defaultColumnFamilyHandle_;
private final ReadOptions defaultReadOptions_ = new ReadOptions();

private final List<ColumnFamilyHandle> ownedColumnFamilyHandles = new ArrayList<>();
final List<ColumnFamilyHandle> ownedColumnFamilyHandles = new ArrayList<>();

/**
* Loads the necessary library files.
Expand Down Expand Up @@ -305,11 +305,19 @@ public static RocksDB open(final DBOptions options, final String path,

final byte[][] cfNames = new byte[columnFamilyDescriptors.size()][];
final long[] cfOptionHandles = new long[columnFamilyDescriptors.size()];
int defaultColumnFamilyIndex = -1;
for (int i = 0; i < columnFamilyDescriptors.size(); i++) {
final ColumnFamilyDescriptor cfDescriptor = columnFamilyDescriptors
.get(i);
cfNames[i] = cfDescriptor.getName();
cfOptionHandles[i] = cfDescriptor.getOptions().nativeHandle_;
if (Arrays.equals(cfDescriptor.getName(), RocksDB.DEFAULT_COLUMN_FAMILY)) {
defaultColumnFamilyIndex = i;
}
}
if (defaultColumnFamilyIndex < 0) {
new IllegalArgumentException(
"You must provide the default column family in your columnFamilyDescriptors");
}

final long[] handles = open(options.nativeHandle_, path, cfNames,
Expand All @@ -324,8 +332,7 @@ public static RocksDB open(final DBOptions options, final String path,
}

db.ownedColumnFamilyHandles.addAll(columnFamilyHandles);
db.storeDefaultColumnFamilyHandle(db.makeDefaultColumnFamilyHandle());

db.storeDefaultColumnFamilyHandle(columnFamilyHandles.get(defaultColumnFamilyIndex));
return db;
}

Expand Down
18 changes: 17 additions & 1 deletion java/src/main/java/org/rocksdb/TtlDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.rocksdb;

import java.util.Arrays;
import java.util.List;

/**
Expand Down Expand Up @@ -84,7 +85,10 @@ public static TtlDB open(final Options options, final String db_path)
*/
public static TtlDB open(final Options options, final String db_path,
final int ttl, final boolean readOnly) throws RocksDBException {
return new TtlDB(open(options.nativeHandle_, db_path, ttl, readOnly));
final TtlDB db = new TtlDB(open(options.nativeHandle_, db_path, ttl, readOnly));
db.storeOptionsInstance(options);
db.storeDefaultColumnFamilyHandle(db.makeDefaultColumnFamilyHandle());
return db;
}

/**
Expand Down Expand Up @@ -116,13 +120,21 @@ public static TtlDB open(final DBOptions options, final String db_path,
+ " family handle.");
}

int defaultColumnFamilyIndex = -1;
final byte[][] cfNames = new byte[columnFamilyDescriptors.size()][];
final long[] cfOptionHandles = new long[columnFamilyDescriptors.size()];
for (int i = 0; i < columnFamilyDescriptors.size(); i++) {
final ColumnFamilyDescriptor cfDescriptor =
columnFamilyDescriptors.get(i);
cfNames[i] = cfDescriptor.getName();
cfOptionHandles[i] = cfDescriptor.getOptions().nativeHandle_;
if (Arrays.equals(cfDescriptor.getName(), RocksDB.DEFAULT_COLUMN_FAMILY)) {
defaultColumnFamilyIndex = i;
}
}
if (defaultColumnFamilyIndex < 0) {
new IllegalArgumentException(
"You must provide the default column family in your columnFamilyDescriptors");
}

final int[] ttlVals = new int[ttlValues.size()];
Expand All @@ -136,6 +148,10 @@ public static TtlDB open(final DBOptions options, final String db_path,
for (int i = 1; i < handles.length; i++) {
columnFamilyHandles.add(new ColumnFamilyHandle(ttlDB, handles[i]));
}
ttlDB.storeOptionsInstance(options);
ttlDB.ownedColumnFamilyHandles.addAll(columnFamilyHandles);
ttlDB.storeDefaultColumnFamilyHandle(columnFamilyHandles.get(defaultColumnFamilyIndex));

return ttlDB;
}

Expand Down
86 changes: 80 additions & 6 deletions java/src/test/java/org/rocksdb/TtlDBTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,20 @@

package org.rocksdb;

import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import static org.assertj.core.api.Assertions.assertThat;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TtlDBTest {
private static final int BATCH_ITERATION = 16;

@ClassRule
public static final RocksNativeLibraryResource ROCKS_NATIVE_LIBRARY_RESOURCE =
Expand Down Expand Up @@ -109,4 +110,77 @@ public void createTtlColumnFamily() throws RocksDBException,
assertThat(ttlDB.get(columnFamilyHandle, "key".getBytes())).isNull();
}
}

@Test
public void writeBatchWithFlush() throws RocksDBException {
try (final Options dbOptions = new Options()) {
dbOptions.setCreateIfMissing(true);
dbOptions.setCreateMissingColumnFamilies(true);

try (final RocksDB db =
TtlDB.open(dbOptions, dbFolder.getRoot().getAbsolutePath(), 100, false)) {
try (WriteBatch wb = new WriteBatch()) {
for (int i = 0; i < BATCH_ITERATION; i++) {
wb.put(("key" + i).getBytes(StandardCharsets.UTF_8),
("value" + i).getBytes(StandardCharsets.UTF_8));
}
try (WriteOptions writeOptions = new WriteOptions()) {
db.write(writeOptions, wb);
}
try (FlushOptions fOptions = new FlushOptions()) {
db.flush(fOptions);
}
}
for (int i = 0; i < BATCH_ITERATION; i++) {
assertThat(db.get(("key" + i).getBytes(StandardCharsets.UTF_8)))
.isEqualTo(("value" + i).getBytes(StandardCharsets.UTF_8));
}
}
}
}

@Test
public void writeBatchWithFlushAndColumnFamily() throws RocksDBException {
try (final DBOptions dbOptions = new DBOptions()) {
System.out.println("Test start");
dbOptions.setCreateIfMissing(true);
dbOptions.setCreateMissingColumnFamilies(true);

final List<ColumnFamilyDescriptor> cfNames =
Arrays.asList(new ColumnFamilyDescriptor("new_cf".getBytes()),
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
final List<ColumnFamilyHandle> columnFamilyHandleList = new ArrayList<>();

final List<Integer> ttlValues = Arrays.asList(0, 1);

try (final RocksDB db = TtlDB.open(dbOptions, dbFolder.getRoot().getAbsolutePath(), cfNames,
columnFamilyHandleList, ttlValues, false)) {
try {
assertThat(columnFamilyHandleList.get(1).isDefaultColumnFamily()).isTrue();

try (WriteBatch wb = new WriteBatch()) {
for (int i = 0; i < BATCH_ITERATION; i++) {
wb.put(("key" + i).getBytes(StandardCharsets.UTF_8),
("value" + i).getBytes(StandardCharsets.UTF_8));
}
try (WriteOptions writeOptions = new WriteOptions()) {
db.write(writeOptions, wb);
}
try (FlushOptions fOptions = new FlushOptions()) {
// Test both flush options, db.flush(fOptions) slush only default CF
db.flush(fOptions);
db.flush(fOptions, columnFamilyHandleList);
}
}
for (int i = 0; i < BATCH_ITERATION; i++) {
assertThat(db.get(("key" + i).getBytes(StandardCharsets.UTF_8)))
.isEqualTo(("value" + i).getBytes(StandardCharsets.UTF_8));
}
} finally {
// All CF handles must be closed before we close DB.
columnFamilyHandleList.stream().forEach(ColumnFamilyHandle::close);
}
}
}
}
}

0 comments on commit f2ddb92

Please sign in to comment.