Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix database open with column family. #12167

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions java/src/main/java/org/rocksdb/RocksDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private enum LibraryState {
private ColumnFamilyHandle defaultColumnFamilyHandle_;
private final ReadOptions defaultReadOptions_ = new ReadOptions();

private final List<ColumnFamilyHandle> ownedColumnFamilyHandles = new ArrayList<>();
final List<ColumnFamilyHandle> ownedColumnFamilyHandles = new ArrayList<>();

/**
* Loads the necessary library files.
Expand Down Expand Up @@ -305,11 +305,19 @@ public static RocksDB open(final DBOptions options, final String path,

final byte[][] cfNames = new byte[columnFamilyDescriptors.size()][];
final long[] cfOptionHandles = new long[columnFamilyDescriptors.size()];
int defaultColumnFamilyIndex = -1;
for (int i = 0; i < columnFamilyDescriptors.size(); i++) {
final ColumnFamilyDescriptor cfDescriptor = columnFamilyDescriptors
.get(i);
cfNames[i] = cfDescriptor.getName();
cfOptionHandles[i] = cfDescriptor.getOptions().nativeHandle_;
if (Arrays.equals(cfDescriptor.getName(), RocksDB.DEFAULT_COLUMN_FAMILY)) {
defaultColumnFamilyIndex = i;
}
}
if (defaultColumnFamilyIndex < 0) {
new IllegalArgumentException(
"You must provide the default column family in your columnFamilyDescriptors");
}

final long[] handles = open(options.nativeHandle_, path, cfNames,
Expand All @@ -324,8 +332,7 @@ public static RocksDB open(final DBOptions options, final String path,
}

db.ownedColumnFamilyHandles.addAll(columnFamilyHandles);
db.storeDefaultColumnFamilyHandle(db.makeDefaultColumnFamilyHandle());

db.storeDefaultColumnFamilyHandle(columnFamilyHandles.get(defaultColumnFamilyIndex));
return db;
}

Expand Down
18 changes: 17 additions & 1 deletion java/src/main/java/org/rocksdb/TtlDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.rocksdb;

import java.util.Arrays;
import java.util.List;

/**
Expand Down Expand Up @@ -84,7 +85,10 @@ public static TtlDB open(final Options options, final String db_path)
*/
public static TtlDB open(final Options options, final String db_path,
final int ttl, final boolean readOnly) throws RocksDBException {
return new TtlDB(open(options.nativeHandle_, db_path, ttl, readOnly));
final TtlDB db = new TtlDB(open(options.nativeHandle_, db_path, ttl, readOnly));
db.storeOptionsInstance(options);
db.storeDefaultColumnFamilyHandle(db.makeDefaultColumnFamilyHandle());
return db;
}

/**
Expand Down Expand Up @@ -116,13 +120,21 @@ public static TtlDB open(final DBOptions options, final String db_path,
+ " family handle.");
}

int defaultColumnFamilyIndex = -1;
final byte[][] cfNames = new byte[columnFamilyDescriptors.size()][];
final long[] cfOptionHandles = new long[columnFamilyDescriptors.size()];
for (int i = 0; i < columnFamilyDescriptors.size(); i++) {
final ColumnFamilyDescriptor cfDescriptor =
columnFamilyDescriptors.get(i);
cfNames[i] = cfDescriptor.getName();
cfOptionHandles[i] = cfDescriptor.getOptions().nativeHandle_;
if (Arrays.equals(cfDescriptor.getName(), RocksDB.DEFAULT_COLUMN_FAMILY)) {
defaultColumnFamilyIndex = i;
}
}
if (defaultColumnFamilyIndex < 0) {
new IllegalArgumentException(
"You must provide the default column family in your columnFamilyDescriptors");
}

final int[] ttlVals = new int[ttlValues.size()];
Expand All @@ -136,6 +148,10 @@ public static TtlDB open(final DBOptions options, final String db_path,
for (int i = 1; i < handles.length; i++) {
columnFamilyHandles.add(new ColumnFamilyHandle(ttlDB, handles[i]));
}
ttlDB.storeOptionsInstance(options);
ttlDB.ownedColumnFamilyHandles.addAll(columnFamilyHandles);
ttlDB.storeDefaultColumnFamilyHandle(columnFamilyHandles.get(defaultColumnFamilyIndex));

return ttlDB;
}

Expand Down
86 changes: 80 additions & 6 deletions java/src/test/java/org/rocksdb/TtlDBTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,20 @@

package org.rocksdb;

import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import static org.assertj.core.api.Assertions.assertThat;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TtlDBTest {
private static final int BATCH_ITERATION = 16;

@ClassRule
public static final RocksNativeLibraryResource ROCKS_NATIVE_LIBRARY_RESOURCE =
Expand Down Expand Up @@ -109,4 +110,77 @@ public void createTtlColumnFamily() throws RocksDBException,
assertThat(ttlDB.get(columnFamilyHandle, "key".getBytes())).isNull();
}
}

@Test
public void writeBatchWithFlush() throws RocksDBException {
try (final Options dbOptions = new Options()) {
dbOptions.setCreateIfMissing(true);
dbOptions.setCreateMissingColumnFamilies(true);

try (final RocksDB db =
TtlDB.open(dbOptions, dbFolder.getRoot().getAbsolutePath(), 100, false)) {
try (WriteBatch wb = new WriteBatch()) {
for (int i = 0; i < BATCH_ITERATION; i++) {
wb.put(("key" + i).getBytes(StandardCharsets.UTF_8),
("value" + i).getBytes(StandardCharsets.UTF_8));
}
try (WriteOptions writeOptions = new WriteOptions()) {
db.write(writeOptions, wb);
}
try (FlushOptions fOptions = new FlushOptions()) {
db.flush(fOptions);
}
}
for (int i = 0; i < BATCH_ITERATION; i++) {
assertThat(db.get(("key" + i).getBytes(StandardCharsets.UTF_8)))
.isEqualTo(("value" + i).getBytes(StandardCharsets.UTF_8));
}
}
}
}

@Test
public void writeBatchWithFlushAndColumnFamily() throws RocksDBException {
try (final DBOptions dbOptions = new DBOptions()) {
System.out.println("Test start");
dbOptions.setCreateIfMissing(true);
dbOptions.setCreateMissingColumnFamilies(true);

final List<ColumnFamilyDescriptor> cfNames =
Arrays.asList(new ColumnFamilyDescriptor("new_cf".getBytes()),
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
final List<ColumnFamilyHandle> columnFamilyHandleList = new ArrayList<>();

final List<Integer> ttlValues = Arrays.asList(0, 1);

try (final RocksDB db = TtlDB.open(dbOptions, dbFolder.getRoot().getAbsolutePath(), cfNames,
columnFamilyHandleList, ttlValues, false)) {
try {
assertThat(columnFamilyHandleList.get(1).isDefaultColumnFamily()).isTrue();

try (WriteBatch wb = new WriteBatch()) {
for (int i = 0; i < BATCH_ITERATION; i++) {
wb.put(("key" + i).getBytes(StandardCharsets.UTF_8),
("value" + i).getBytes(StandardCharsets.UTF_8));
}
try (WriteOptions writeOptions = new WriteOptions()) {
db.write(writeOptions, wb);
}
try (FlushOptions fOptions = new FlushOptions()) {
// Test both flush options, db.flush(fOptions) slush only default CF
db.flush(fOptions);
db.flush(fOptions, columnFamilyHandleList);
}
}
for (int i = 0; i < BATCH_ITERATION; i++) {
assertThat(db.get(("key" + i).getBytes(StandardCharsets.UTF_8)))
.isEqualTo(("value" + i).getBytes(StandardCharsets.UTF_8));
}
} finally {
// All CF handles must be closed before we close DB.
columnFamilyHandleList.stream().forEach(ColumnFamilyHandle::close);
}
}
}
}
}
Loading