Skip to content

Commit

Permalink
some restructuring and optimizing - introduced minimal duration for c…
Browse files Browse the repository at this point in the history
…onnection
  • Loading branch information
chges100 committed Feb 24, 2020
1 parent 6772530 commit 3d5deac
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ public void init() throws IOException {
public void connect(RCInformation remoteInfo) throws IOException {

if(initConnection.getAndSet(true)){
LOGGER.error("Connection already connected");
throw new IOException("Connection is already connected");
throw new IOException("Connection is already connected to " + remoteLid);
}

if(!queuePair.modify(QueuePair.Attributes.Builder.buildReadyToReceiveAttributesRC(
Expand All @@ -73,7 +72,7 @@ public void connect(RCInformation remoteInfo) throws IOException {
LOGGER.info("Moved queue pair into RTS state");

// TODO: not necessary?
//initialHandshake();
initialHandshake();

isConnected.getAndSet(true);
}
Expand Down Expand Up @@ -206,7 +205,7 @@ public void disconnect() throws IOException{
return;
}

LOGGER.debug("Start to disconnect connection {} to {}", id, remoteLid);
LOGGER.debug("Start to disconnect connection {} from {}", id, remoteLid);

// set remote LID
remoteLid.getAndSet(LID_MAX);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class DynamicConnectionManager {
private static final long EXECUTE_POLL_TIME = 1000;
private static final long IDX_POLL_TIME = 1000;
private static final int LOCAL_BUFFER_READ = 19;
private static final long MIN_CONNECTION_DURATION = 10;
private static final int MAX_CONNECTONS = 1;
private static final int NULL_INDEX = Integer.MAX_VALUE;
private static final short LID_MAX = Short.MAX_VALUE;
Expand All @@ -56,6 +57,7 @@ public class DynamicConnectionManager {
private final AtomicIntegerArray lidToIndex;

private final ReliableConnection connections[];
private long connectionDuration[];
private final StampedLock rwLocks[];

private final ConcurrentLinkedQueue<Integer> lru;
Expand Down Expand Up @@ -101,6 +103,7 @@ public DynamicConnectionManager(int port) throws IOException {
}

connections = new ReliableConnection[MAX_CONNECTONS];
connectionDuration = new long[MAX_CONNECTONS];
rwLocks = new StampedLock[MAX_CONNECTONS];
lru = new ConcurrentLinkedQueue<>();

Expand Down Expand Up @@ -157,28 +160,35 @@ private void remoteExecute(SendWorkRequest.OpCode opCode, RegisteredBuffer data,
while (stamp == 0) {
try {
idx = lidToIndex.get(remoteLocalId);
stamp = rwLocks[idx].tryReadLock();
} catch (IndexOutOfBoundsException e) {
idx = createConnection(remoteLocalId);
stamp = rwLocks[idx].readLock();

var connectionRemoteLocalId = connections[idx].getRemoteLocalId();

if(idx < NULL_INDEX) {
// check if connection idx is still connected to remote
if(connectionRemoteLocalId != remoteLocalId && connectionRemoteLocalId != LID_MAX) {
rwLocks[idx].unlockRead(stamp);
stamp = 0;
} else if (connectionRemoteLocalId == LID_MAX){
var localQPInfo = new RCInformation((byte) 1, connections[idx].getPortAttributes().getLocalId(), connections[idx].getQueuePair().getQueuePairNumber());
dynamicConnectionHandler.sendConnectionRequest(localQPInfo, remoteLocalId);
}

} catch (IndexOutOfBoundsException e) {
createConnection(remoteLocalId);
}
}

while (remoteBuffers[remoteLocalId] == null || !connections[idx].isConnected()) {
LockSupport.parkNanos(EXECUTE_POLL_TIME);
}

LOGGER.debug("Execute remote RDMA operation on {}", remoteLocalId);
LOGGER.debug("Execute remote RDMA operation on {}", connections[idx].getRemoteLocalId());
connections[idx].execute(data, opCode, offset, length, remoteBuffers[remoteLocalId].getAddress(), remoteBuffers[remoteLocalId].getRemoteKey(), 0);

rwLocks[idx].unlockRead(stamp);
}

private int createConnection(short remoteLocalId) {
private void createConnection(short remoteLocalId) {
int idx = NULL_INDEX;
long stamp = 0;

Expand All @@ -194,9 +204,13 @@ private int createConnection(short remoteLocalId) {
}
} while (!gotID);

int ret_idx = idx;

try {
// make sure that this connection was alive long enough
long duration = System.currentTimeMillis() - connectionDuration[idx];
if(duration < MIN_CONNECTION_DURATION) {
LockSupport.parkNanos((MIN_CONNECTION_DURATION - duration) * 1000000);
}

stamp = rwLocks[idx].writeLock();

// check if another thread has already connected to remote
Expand All @@ -213,9 +227,10 @@ private int createConnection(short remoteLocalId) {
connections[idx].disconnect();
}

connectionDuration[idx] = System.currentTimeMillis();

} catch (Exception e) {
lru.offer(idx);
ret_idx = NULL_INDEX;
LOGGER.error("Could not create connection to {}\n {}", remoteLocalId, e);
//e.printStackTrace();

Expand All @@ -226,8 +241,6 @@ private int createConnection(short remoteLocalId) {
if(idx < NULL_INDEX) {
LOGGER.debug("Created connection {} to {}", connections[idx].getId(), remoteLocalId);
}

return ret_idx;
}

public RegisteredBuffer allocRegisteredBuffer(int deviceId, long size) {
Expand Down Expand Up @@ -615,22 +628,28 @@ private void handleConnectionRequest() {
try {
idx = lidToIndex.get(remoteLocalId);
stamp = rwLocks[idx].readLock();
var connectionRemoteLocalId = connections[idx].getRemoteLocalId();

var localQPInfo = new RCInformation((byte) 1, connections[idx].getPortAttributes().getLocalId(), connections[idx].getQueuePair().getQueuePairNumber());
dynamicConnectionHandler.answerConnectionRequest(localQPInfo, remoteHandlerInfos.get(remoteLocalId));

rwLocks[idx].unlockRead(stamp);
// check if this connection is still assigned to correct remote
if(connectionRemoteLocalId != remoteLocalId && connectionRemoteLocalId != LID_MAX) {
rwLocks[idx].unlockRead(stamp);
stamp = 0;
} else {
var localQPInfo = new RCInformation((byte) 1, connections[idx].getPortAttributes().getLocalId(), connections[idx].getQueuePair().getQueuePairNumber());
dynamicConnectionHandler.answerConnectionRequest(localQPInfo, remoteHandlerInfos.get(remoteLocalId));

} catch (IndexOutOfBoundsException e) {
idx = createConnection(remoteLocalId);
if(idx != NULL_INDEX) {
try {
if(connectionRemoteLocalId == LID_MAX) {
connections[idx].connect(remoteInfo);
lru.offer(idx);
} catch (IOException e2) {
LOGGER.error("Could not connect to {}", remoteLocalId);
}

rwLocks[idx].unlockRead(stamp);
}

} catch (IndexOutOfBoundsException e) {
createConnection(remoteLocalId);
} catch (IOException e) {
LOGGER.debug("Could not connect to {}\n {}", remoteLocalId, e);
}
}
}
Expand All @@ -639,7 +658,7 @@ private void handleConnectionAck() {
var split = payload.split(":");
var remoteInfo = new RCInformation(Byte.parseByte(split[0]), Short.parseShort(split[1]), Integer.parseInt(split[2]));

LOGGER.info("Got new connection ack from {}", remoteInfo);
LOGGER.info("Got new connection ack from {}", remoteInfo.getLocalId());

var idx = lidToIndex.get(remoteInfo.getLocalId());
long stamp = rwLocks[idx].readLock();
Expand All @@ -648,7 +667,7 @@ private void handleConnectionAck() {
connections[idx].connect(remoteInfo);
lru.offer(idx);
} catch (Exception e) {
LOGGER.error("Could not connect to {}", remoteInfo);
LOGGER.debug("Could not connect to {}\n {}", remoteInfo.getLocalId(), e);
} finally {
rwLocks[idx].unlockRead(stamp);
}
Expand All @@ -660,7 +679,7 @@ private void handleBufferInfo() {
var bufferInfo = new BufferInformation(Long.parseLong(split[1]), Long.parseLong(split[2]), Integer.parseInt(split[3]));
var remoteLid = Short.parseShort(split[0]);

LOGGER.info("Received new remote buffer information from {}: {}", remoteLid, bufferInfo);
LOGGER.trace("Received new remote buffer information from {}: {}", remoteLid, bufferInfo);

remoteBuffers[remoteLid] = bufferInfo;
}
Expand All @@ -675,6 +694,12 @@ private void handleDisconnect() {
var idx = lidToIndex.getAndSet(remoteLocalId, NULL_INDEX);
long stamp = 0;
try {
// make sure that this connection was alive long enough
long duration = System.currentTimeMillis() - connectionDuration[idx];
if(duration < MIN_CONNECTION_DURATION) {
LockSupport.parkNanos((MIN_CONNECTION_DURATION - duration) * 1000000);
}

stamp = rwLocks[idx].writeLock();
connections[idx].disconnect();
} catch (IndexOutOfBoundsException e) {
Expand Down

0 comments on commit 3d5deac

Please sign in to comment.