Skip to content

Commit

Permalink
Reduce SystemKeyspace truncation record contention
Browse files Browse the repository at this point in the history
JFR profiles show significant synchronization contention on
SystemKeyspace#getTruncationRecord(UUID) as part of committing
PaxosState.

The SystemKeyspace truncationRecords map is a lazily initialized
unmodifiable snapshot of the local node's truncated_at map.  We now
guard this with a read/write lock to optimistically allow concurrent
reads, falling back to the more costly write lock and re-reading the
local node's truncated_at from the system tables only when there have
been truncation modifications.

Contention back trace:
  org.apache.cassandra.db.SystemKeyspace.getTruncationRecord(UUID):494
  org.apache.cassandra.db.SystemKeyspace.getTruncatedAt(UUID):488
  org.apache.cassandra.service.paxos.PaxosState.commit(Commit):144
  org.apache.cassandra.service.paxos.CommitVerbHandler.doVerb(MessageIn, int):34
  org.apache.cassandra.net.MessageDeliveryTask.run():70
  java.util.concurrent.Executors$RunnableAdapter.call():539
  org.apache.cassandra.concurrent.AbstractLocalAwareExecutorService$FutureTask.run():164
  org.apache.cassandra.concurrent.AbstractLocalAwareExecutorService$LocalSessionFutureTask.run():136
  org.apache.cassandra.concurrent.SEPWorker.run():110
  • Loading branch information
schlosna committed Oct 23, 2024
1 parent c3ab8d6 commit fc48c49
Showing 1 changed file with 46 additions and 15 deletions.
61 changes: 46 additions & 15 deletions src/java/org/apache/cassandra/db/SystemKeyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.concurrent.GuardedBy;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;

Expand Down Expand Up @@ -298,7 +302,9 @@ public static KSMetaData definition()
return new KSMetaData(NAME, LocalStrategy.class, Collections.<String, String>emptyMap(), true, tables);
}

// Cached snapshot of the local node's truncated_at map, loaded lazily by getTruncationRecord.
// A null value means the snapshot has been invalidated (or never loaded) and must be re-read
// from the system table. All reads and writes of this field go through truncationLock.
@GuardedBy("truncationLock")
private static Map<UUID, Pair<ReplayPosition, Long>> truncationRecords;
// Fair read/write lock: readers share the cached snapshot concurrently, while invalidation
// and reload take the exclusive write lock.
private static final ReadWriteLock truncationLock = new ReentrantReadWriteLock(/* fair= */ true);

public enum BootstrapState
{
Expand Down Expand Up @@ -446,21 +452,31 @@ public static TabularData getCompactionHistory() throws OpenDataException

/**
 * Adds a truncation record for the given column family to the local node's
 * truncated_at map, invalidates the cached snapshot and flushes the change.
 *
 * @param cfs         the column family store that was truncated
 * @param truncatedAt the truncation timestamp to record
 * @param position    the replay position to record alongside the timestamp
 */
public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
{
    // Exclusive lock: no reader may observe the cached snapshot while it is being invalidated.
    truncationLock.writeLock().lock();
    try
    {
        String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'";
        executeInternal(String.format(req, LOCAL, LOCAL), truncationAsMapEntry(cfs, truncatedAt, position));
        // Drop the cached snapshot; the next getTruncationRecord call re-reads it.
        truncationRecords = null;
        forceBlockingFlush(LOCAL, "Saving truncation record");
    }
    finally
    {
        truncationLock.writeLock().unlock();
    }
}

/**
 * This method is used to remove information about truncation time for specified column family.
 * The cached snapshot of truncation records is invalidated and the change is flushed.
 *
 * @param cfId id of the column family whose truncation record is removed
 */
public static synchronized void removeTruncationRecord(UUID cfId)
{
    // Exclusive lock: no reader may observe the cached snapshot while it is being invalidated.
    truncationLock.writeLock().lock();
    try
    {
        String req = "DELETE truncated_at[?] from system.%s WHERE key = '%s'";
        executeInternal(String.format(req, LOCAL, LOCAL), cfId);
        // Drop the cached snapshot; the next getTruncationRecord call re-reads it.
        truncationRecords = null;
        forceBlockingFlush(LOCAL, "Removing truncation record");
    }
    finally
    {
        truncationLock.writeLock().unlock();
    }
}

private static Map<UUID, ByteBuffer> truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
Expand Down Expand Up @@ -489,27 +505,42 @@ public static long getTruncatedAt(UUID cfId)
return record == null ? Long.MIN_VALUE : record.right;
}

private static synchronized Pair<ReplayPosition, Long> getTruncationRecord(UUID cfId)
private static Pair<ReplayPosition, Long> getTruncationRecord(UUID cfId)
{
if (truncationRecords == null)
truncationLock.readLock().lock();
try {
Map<UUID, Pair<ReplayPosition, Long>> records = truncationRecords;
if (records != null)
return records.get(cfId);

// truncated records snapshot not cached, fall through to release read lock and acquire write lock
} finally {
truncationLock.readLock().unlock();
}

truncationLock.writeLock().lock();
try {
truncationRecords = readTruncationRecords();
return truncationRecords.get(cfId);
return truncationRecords.get(cfId);
} finally {
truncationLock.writeLock().unlock();
}
}

/**
 * Reads the local node's truncated_at map from the system table and decodes
 * each entry into a truncation record.
 *
 * @return an unmodifiable map from column family id to truncation record;
 *         empty if no row or no truncated_at value is present
 */
private static Map<UUID, Pair<ReplayPosition, Long>> readTruncationRecords()
{
    UntypedResultSet rows = executeInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL, LOCAL));

    if (!rows.isEmpty() && rows.one().has("truncated_at"))
    {
        Map<UUID, ByteBuffer> map = rows.one().getMap("truncated_at", UUIDType.instance, BytesType.instance);
        Map<UUID, Pair<ReplayPosition, Long>> records = new HashMap<>(map.size());
        for (Map.Entry<UUID, ByteBuffer> entry : map.entrySet())
            records.put(entry.getKey(), truncationRecordFromBlob(entry.getValue()));
        // The result is shared between threads as a cached snapshot, so hand out a read-only view.
        return Collections.unmodifiableMap(records);
    }

    return Collections.emptyMap();
}

private static Pair<ReplayPosition, Long> truncationRecordFromBlob(ByteBuffer bytes)
Expand Down

0 comments on commit fc48c49

Please sign in to comment.