From 3d5deac38bbbd01b88721211615e27a88567bf02 Mon Sep 17 00:00:00 2001 From: Christian Gesse Date: Mon, 24 Feb 2020 18:55:54 +0100 Subject: [PATCH] some restructuring and optimizing - introduced minimal duration for connection --- .../connection/ReliableConnection.java | 7 +- .../dynamic/DynamicConnectionManager.java | 73 +++++++++++++------ 2 files changed, 52 insertions(+), 28 deletions(-) diff --git a/neutrino/src/main/java/de/hhu/bsinfo/neutrino/connection/ReliableConnection.java b/neutrino/src/main/java/de/hhu/bsinfo/neutrino/connection/ReliableConnection.java index 68d43a1..5fd33c6 100644 --- a/neutrino/src/main/java/de/hhu/bsinfo/neutrino/connection/ReliableConnection.java +++ b/neutrino/src/main/java/de/hhu/bsinfo/neutrino/connection/ReliableConnection.java @@ -53,8 +53,7 @@ public void init() throws IOException { public void connect(RCInformation remoteInfo) throws IOException { if(initConnection.getAndSet(true)){ - LOGGER.error("Connection already connected"); - throw new IOException("Connection is already connected"); + throw new IOException("Connection is already connected to " + remoteLid); } if(!queuePair.modify(QueuePair.Attributes.Builder.buildReadyToReceiveAttributesRC( @@ -73,7 +72,7 @@ public void connect(RCInformation remoteInfo) throws IOException { LOGGER.info("Moved queue pair into RTS state"); // TODO: not necessary? - //initialHandshake(); + initialHandshake(); isConnected.getAndSet(true); } @@ -206,7 +205,7 @@ public void disconnect() throws IOException{ return; } - LOGGER.debug("Start to disconnect connection {} to {}", id, remoteLid); + LOGGER.debug("Start to disconnect connection {} from {}", id, remoteLid); // set remote LID remoteLid.getAndSet(LID_MAX); diff --git a/neutrino/src/main/java/de/hhu/bsinfo/neutrino/connection/dynamic/DynamicConnectionManager.java b/neutrino/src/main/java/de/hhu/bsinfo/neutrino/connection/dynamic/DynamicConnectionManager.java index cd43282..faf6048 100644 --- a/neutrino/src/main/java/de/hhu/bsinfo/neutrino/connection/dynamic/DynamicConnectionManager.java +++ b/neutrino/src/main/java/de/hhu/bsinfo/neutrino/connection/dynamic/DynamicConnectionManager.java @@ -40,6 +40,7 @@ public class DynamicConnectionManager { private static final long EXECUTE_POLL_TIME = 1000; private static final long IDX_POLL_TIME = 1000; private static final int LOCAL_BUFFER_READ = 19; + private static final long MIN_CONNECTION_DURATION = 10; private static final int MAX_CONNECTONS = 1; private static final int NULL_INDEX = Integer.MAX_VALUE; private static final short LID_MAX = Short.MAX_VALUE; @@ -56,6 +57,7 @@ public class DynamicConnectionManager { private final AtomicIntegerArray lidToIndex; private final ReliableConnection connections[]; + private long connectionDuration[]; private final StampedLock rwLocks[]; private final ConcurrentLinkedQueue lru; @@ -101,6 +103,7 @@ public DynamicConnectionManager(int port) throws IOException { } connections = new ReliableConnection[MAX_CONNECTONS]; + connectionDuration = new long[MAX_CONNECTONS]; rwLocks = new StampedLock[MAX_CONNECTONS]; lru = new ConcurrentLinkedQueue<>(); @@ -157,14 +160,21 @@ private void remoteExecute(SendWorkRequest.OpCode opCode, RegisteredBuffer data, while (stamp == 0) { try { idx = lidToIndex.get(remoteLocalId); - stamp = rwLocks[idx].tryReadLock(); - } catch (IndexOutOfBoundsException e) { - idx = createConnection(remoteLocalId); + stamp = rwLocks[idx].readLock(); + + var connectionRemoteLocalId = connections[idx].getRemoteLocalId(); - if(idx < NULL_INDEX) { + // check if connection idx is still connected to remote + if(connectionRemoteLocalId != remoteLocalId && connectionRemoteLocalId != LID_MAX) { + rwLocks[idx].unlockRead(stamp); + stamp = 0; + } else if (connectionRemoteLocalId == LID_MAX){ var localQPInfo = new RCInformation((byte) 1, connections[idx].getPortAttributes().getLocalId(), connections[idx].getQueuePair().getQueuePairNumber()); dynamicConnectionHandler.sendConnectionRequest(localQPInfo, remoteLocalId); } + + } catch (IndexOutOfBoundsException e) { + createConnection(remoteLocalId); } } @@ -172,13 +182,13 @@ private void remoteExecute(SendWorkRequest.OpCode opCode, RegisteredBuffer data, LockSupport.parkNanos(EXECUTE_POLL_TIME); } - LOGGER.debug("Execute remote RDMA operation on {}", remoteLocalId); + LOGGER.debug("Execute remote RDMA operation on {}", connections[idx].getRemoteLocalId()); connections[idx].execute(data, opCode, offset, length, remoteBuffers[remoteLocalId].getAddress(), remoteBuffers[remoteLocalId].getRemoteKey(), 0); rwLocks[idx].unlockRead(stamp); } - private int createConnection(short remoteLocalId) { + private void createConnection(short remoteLocalId) { int idx = NULL_INDEX; long stamp = 0; @@ -194,9 +204,13 @@ private int createConnection(short remoteLocalId) { } } while (!gotID); - int ret_idx = idx; - try { + // make sure that this connection was alive long enough + long duration = System.currentTimeMillis() - connectionDuration[idx]; + if(duration < MIN_CONNECTION_DURATION) { + LockSupport.parkNanos((MIN_CONNECTION_DURATION - duration) * 1000000); + } + stamp = rwLocks[idx].writeLock(); // check if another thread has already connected to remote @@ -213,9 +227,10 @@ private int createConnection(short remoteLocalId) { connections[idx].disconnect(); } + connectionDuration[idx] = System.currentTimeMillis(); + } catch (Exception e) { lru.offer(idx); - ret_idx = NULL_INDEX; LOGGER.error("Could not create connection to {}\n {}", remoteLocalId, e); //e.printStackTrace(); @@ -226,8 +241,6 @@ private int createConnection(short remoteLocalId) { if(idx < NULL_INDEX) { LOGGER.debug("Created connection {} to {}", connections[idx].getId(), remoteLocalId); } - - return ret_idx; } public RegisteredBuffer allocRegisteredBuffer(int deviceId, long size) { @@ -615,22 +628,28 @@ private void handleConnectionRequest() { try { idx = lidToIndex.get(remoteLocalId); stamp = rwLocks[idx].readLock(); + var connectionRemoteLocalId = connections[idx].getRemoteLocalId(); - var localQPInfo = new RCInformation((byte) 1, connections[idx].getPortAttributes().getLocalId(), connections[idx].getQueuePair().getQueuePairNumber()); - dynamicConnectionHandler.answerConnectionRequest(localQPInfo, remoteHandlerInfos.get(remoteLocalId)); - - rwLocks[idx].unlockRead(stamp); + // check if this connection is still assigned to correct remote + if(connectionRemoteLocalId != remoteLocalId && connectionRemoteLocalId != LID_MAX) { + rwLocks[idx].unlockRead(stamp); + stamp = 0; + } else { + var localQPInfo = new RCInformation((byte) 1, connections[idx].getPortAttributes().getLocalId(), connections[idx].getQueuePair().getQueuePairNumber()); + dynamicConnectionHandler.answerConnectionRequest(localQPInfo, remoteHandlerInfos.get(remoteLocalId)); - } catch (IndexOutOfBoundsException e) { - idx = createConnection(remoteLocalId); - if(idx != NULL_INDEX) { - try { + if(connectionRemoteLocalId == LID_MAX) { connections[idx].connect(remoteInfo); lru.offer(idx); - } catch (IOException e2) { - LOGGER.error("Could not connect to {}", remoteLocalId); } + + rwLocks[idx].unlockRead(stamp); } + + } catch (IndexOutOfBoundsException e) { + createConnection(remoteLocalId); + } catch (IOException e) { + LOGGER.debug("Could not connect to {}\n {}", remoteLocalId, e); } } } @@ -639,7 +658,7 @@ private void handleConnectionAck() { var split = payload.split(":"); var remoteInfo = new RCInformation(Byte.parseByte(split[0]), Short.parseShort(split[1]), Integer.parseInt(split[2])); - LOGGER.info("Got new connection ack from {}", remoteInfo); + LOGGER.info("Got new connection ack from {}", remoteInfo.getLocalId()); var idx = lidToIndex.get(remoteInfo.getLocalId()); long stamp = rwLocks[idx].readLock(); @@ -648,7 +667,7 @@ private void handleConnectionAck() { connections[idx].connect(remoteInfo); lru.offer(idx); } catch (Exception e) { - LOGGER.error("Could not connect to {}", remoteInfo); + LOGGER.debug("Could not connect to {}\n {}", remoteInfo.getLocalId(), e); } finally { rwLocks[idx].unlockRead(stamp); } @@ -660,7 +679,7 @@ private void handleBufferInfo() { var bufferInfo = new BufferInformation(Long.parseLong(split[1]), Long.parseLong(split[2]), Integer.parseInt(split[3])); var remoteLid = Short.parseShort(split[0]); - LOGGER.info("Received new remote buffer information from {}: {}", remoteLid, bufferInfo); + LOGGER.trace("Received new remote buffer information from {}: {}", remoteLid, bufferInfo); remoteBuffers[remoteLid] = bufferInfo; } @@ -675,6 +694,12 @@ private void handleDisconnect() { var idx = lidToIndex.getAndSet(remoteLocalId, NULL_INDEX); long stamp = 0; try { + // make sure that this connection was alive long enough + long duration = System.currentTimeMillis() - connectionDuration[idx]; + if(duration < MIN_CONNECTION_DURATION) { + LockSupport.parkNanos((MIN_CONNECTION_DURATION - duration) * 1000000); + } + stamp = rwLocks[idx].writeLock(); connections[idx].disconnect(); } catch (IndexOutOfBoundsException e) {