Skip to content

Commit

Permalink
Detect and recycle dangling websockets.
Browse files Browse the repository at this point in the history
  • Loading branch information
Dimitris Papavasiliou committed May 4, 2019
1 parent ae6ef62 commit 8f86147
Showing 1 changed file with 103 additions and 6 deletions.
109 changes: 103 additions & 6 deletions src/org/thoughtcrime/securesms/service/IncomingMessageObserver.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package org.thoughtcrime.securesms.service;

import android.app.Service;
import android.content.BroadcastReceiver;
import android.arch.lifecycle.DefaultLifecycleObserver;
import android.arch.lifecycle.LifecycleOwner;
import android.arch.lifecycle.ProcessLifecycleOwner;
import android.content.Context;
import android.content.Intent;
import android.content.IntentFilter;
import android.net.ConnectivityManager;
import android.net.Network;
import android.net.NetworkRequest;
import android.net.NetworkInfo;
import android.os.Build;
import android.os.IBinder;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
Expand All @@ -28,6 +35,7 @@
import org.whispersystems.signalservice.api.SignalServiceMessagePipe;
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand All @@ -46,6 +54,14 @@ public class IncomingMessageObserver implements InjectableType, ConstraintObserv
private final Context context;
private final NetworkConstraint networkConstraint;

public static AtomicReference<SignalServiceMessagePipe> pipeReference = new AtomicReference<>();
public static AtomicReference<SignalServiceMessagePipe> unidentifiedPipeReference = new AtomicReference<>();

private final MessageRetrievalThread retrievalThread;

private BroadcastReceiver connectivityChangeReceiver;
private ConnectivityManager.NetworkCallback connectivityChangeCallback;

private boolean appVisible;

@Inject SignalServiceMessageReceiver receiver;
Expand All @@ -58,10 +74,13 @@ public IncomingMessageObserver(@NonNull Context context) {
this.networkConstraint = new NetworkConstraint.Factory(ApplicationContext.getInstance(context)).create();

new NetworkConstraintObserver(ApplicationContext.getInstance(context)).register(this);
new MessageRetrievalThread().start();

retrievalThread = new MessageRetrievalThread();
retrievalThread.start();

if (TextSecurePreferences.isFcmDisabled(context)) {
ContextCompat.startForegroundService(context, new Intent(context, ForegroundService.class));
setupNetworkMonitoring();
}

ProcessLifecycleOwner.get().getLifecycle().addObserver(new DefaultLifecycleObserver() {
Expand Down Expand Up @@ -108,10 +127,12 @@ private synchronized boolean isConnectionNecessary() {
}

private synchronized void waitForConnectionNecessary() {
try {
while (!isConnectionNecessary()) wait();
} catch (InterruptedException e) {
throw new AssertionError(e);
while (!isConnectionNecessary()) {
try {
wait();
} catch (InterruptedException e) {
Log.d(TAG, "Retrieval thread interrupted while not connected; ignoring.");
}
}
}

Expand Down Expand Up @@ -153,7 +174,7 @@ public void run() {
SignalServiceMessagePipe unidentifiedLocalPipe = unidentifiedPipe;

try {
while (isConnectionNecessary()) {
while (isConnectionNecessary() && !interrupted()) {
try {
Log.i(TAG, "Reading message...");
localPipe.read(REQUEST_TIMEOUT_MINUTES, TimeUnit.MINUTES,
Expand All @@ -167,6 +188,10 @@ public void run() {
Log.w(TAG, e);
}
}
} catch (InterruptedException e) {
Log.d(TAG, "Retrieval thread interrupted.");
} catch (IOException e) {
Log.d(TAG, "Message pipe failed: " + e.getMessage());
} catch (Throwable e) {
Log.w(TAG, e);
} finally {
Expand All @@ -185,6 +210,78 @@ public void uncaughtException(Thread t, Throwable e) {
}
}

private void setupNetworkMonitoring() {
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) {
final ConnectivityManager connectivityManager = (ConnectivityManager)context.getSystemService(Context.CONNECTIVITY_SERVICE);

connectivityChangeCallback = new ConnectivityManager.NetworkCallback() {
private Network current;

private void update(Network network) {
final Network previous = current;
current = network;

Log.d(TAG, "Currently active network: " + network);

if (previous != null && (current == null || !current.equals(previous))) {
Log.d(TAG,
"Active network changed (" + previous + " -> " + current +
"); interrupting the retrieval thread to recycle the pipe.");

retrievalThread.interrupt();
}
}

@Override
public void onAvailable(Network network) {
final ConnectivityManager connectivityManager = (ConnectivityManager)context.getSystemService(Context.CONNECTIVITY_SERVICE);

update(connectivityManager.getActiveNetwork());
}

@Override
public void onLost(Network network) {
final ConnectivityManager connectivityManager = (ConnectivityManager)context.getSystemService(Context.CONNECTIVITY_SERVICE);

update(connectivityManager.getActiveNetwork());
}
};

connectivityManager.registerNetworkCallback(new NetworkRequest.Builder().build(),
connectivityChangeCallback);
} else {
connectivityChangeReceiver = new BroadcastReceiver() {
private int current = -1;

@Override
public void onReceive(Context context, Intent intent) {
final ConnectivityManager connectivityManager = (ConnectivityManager)context.getSystemService(Context.CONNECTIVITY_SERVICE);

final NetworkInfo info = connectivityManager.getActiveNetworkInfo();
final int previous = current;

if (info == null) {
current = -1;
} else if (info.isConnected()) {
current = info.getType();
}

Log.d(TAG, "Currently active network: " + current);

if (previous != -1 && previous != current) {
Log.d(TAG,
"Active network changed (" + previous + " -> " + current +
"); interrupting the retrieval thread to recycle the pipe.");
retrievalThread.interrupt();
}
}
};

context.registerReceiver(connectivityChangeReceiver,
new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
}
}

public static class ForegroundService extends Service {

@Override
Expand Down

0 comments on commit 8f86147

Please sign in to comment.