Skip to content

Commit

Permalink
More improvements to UsageTest (#80)
Browse files Browse the repository at this point in the history
Refactor tests to avoid timing issues in testCBWithWriteOperation:
- remove items field
- use AtomicRef value set in onComplete as in testCBWithReadOperation

Also applied changes to testCBWithEventBus

Signed-off-by: Thomas Segismont <[email protected]>
  • Loading branch information
tsegismont authored Jan 10, 2025
1 parent e409891 commit 8abdf62
Showing 1 changed file with 63 additions and 109 deletions.
172 changes: 63 additions & 109 deletions src/test/java/io/vertx/circuitbreaker/impl/UsageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,13 @@
import org.junit.Test;
import org.junit.runner.RunWith;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static com.jayway.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
* @author <a href="http://escoffier.me">Clement Escoffier</a>
Expand All @@ -44,14 +39,12 @@ public class UsageTest {
public RepeatRule repeatRule = new RepeatRule();

private Vertx vertx;
private List<String> items;
private CircuitBreaker cb;
private HttpServer server;

@Before
public void setUp() {
vertx = Vertx.vertx();
items = Collections.synchronizedList(new ArrayList<>());
cb = CircuitBreaker.create("circuit-breaker", vertx, new CircuitBreakerOptions()
.setFallbackOnFailure(true)
.setTimeout(500)
Expand Down Expand Up @@ -163,135 +156,96 @@ public void testCBWithReadOperation() throws Exception {
assertEquals("KO", json.get().getString("status"));
}

private void asyncWrite(Scenario scenario, Promise<Void> resultHandler) {
long random = (long) (Math.random() * 1000);
private void asyncWrite(Scenario scenario, Promise<String> promise) {
long delay;
switch (scenario) {
case TIMEOUT:
random = 2000;
break;
case RUNTIME_EXCEPTION:
throw new RuntimeException("Bad bad bad");
case TIMEOUT:
delay = 2000;
break;
default:
delay = ThreadLocalRandom.current().nextLong(1, 250); // Must be less than CB timeout
break;
}


vertx.setTimer(random, l -> {
vertx.setTimer(delay, l -> {
if (scenario == Scenario.FAILURE) {
items.add("Error");
resultHandler.fail("Bad Bad Bad");
promise.fail("Bad Bad Bad");
} else {
items.add("Hello");
resultHandler.complete();
promise.complete("foo");
}
});
}

enum Scenario {
private enum Scenario {
OK,
FAILURE,
RUNTIME_EXCEPTION,
TIMEOUT
}

@Test
@Repeat(10)
public void testCBWithWriteOperation() {
cb.<Void>executeWithFallback(
future -> {
asyncWrite(Scenario.OK, future);
},
t -> null
);

await().until(() -> items.size() == 1);
items.clear();

AtomicBoolean fallbackCalled = new AtomicBoolean();
cb.<Void>executeWithFallback(
future -> {
asyncWrite(Scenario.FAILURE, future);
},
t -> {
fallbackCalled.set(true);
return null;
}
);

await().until(() -> items.size() == 1);

assertTrue(fallbackCalled.get());

items.clear();
fallbackCalled.set(false);

cb.<Void>executeWithFallback(
future -> asyncWrite(Scenario.TIMEOUT, future),
t -> {
fallbackCalled.set(true);
return null;
}
);
AtomicReference<String> str = new AtomicReference<>();
cb.executeWithFallback(
promise -> asyncWrite(Scenario.OK, promise),
t -> "bar"
).onComplete(ar -> str.set(ar.result()));
await().untilAtomic(str, is(equalTo("foo")));

await().untilAtomic(fallbackCalled, is(true));
assertTrue(items.isEmpty());
str.set(null);
cb.executeWithFallback(
promise -> asyncWrite(Scenario.FAILURE, promise),
t -> "bar"
).onComplete(ar -> str.set(ar.result()));
await().untilAtomic(str, is(equalTo("bar")));

fallbackCalled.set(false);
cb.<Void>executeWithFallback(
future -> asyncWrite(Scenario.RUNTIME_EXCEPTION, future),
t -> {
fallbackCalled.set(true);
return null;
}
);
str.set(null);
cb.executeWithFallback(
promise -> asyncWrite(Scenario.TIMEOUT, promise),
t -> "bar"
).onComplete(ar -> str.set(ar.result()));
await().untilAtomic(str, is(equalTo("bar")));

await().untilAtomic(fallbackCalled, is(true));
assertTrue(items.isEmpty());
str.set(null);
cb.executeWithFallback(
promise -> asyncWrite(Scenario.RUNTIME_EXCEPTION, promise),
t -> "bar"
).onComplete(ar -> str.set(ar.result()));
await().untilAtomic(str, is(equalTo("bar")));
}


@Test
public void testCBWithEventBus() {
cb.<Message<String>>executeWithFallback(
future -> vertx.eventBus().<String>request("ok", "").onComplete(future),
t -> null
).onComplete(ar -> items.add(ar.result().body()));

await().until(() -> items.size() == 1);
items.clear();

AtomicBoolean fallbackCalled = new AtomicBoolean();
cb.<Message<String>>executeWithFallback(
future -> vertx.eventBus().<String>request("timeout", "").onComplete(future),
t -> {
fallbackCalled.set(true);
return null;
}
);

await().untilAtomic(fallbackCalled, is(true));
assertTrue(items.isEmpty());
fallbackCalled.set(false);

cb.<Message<String>>executeWithFallback(
future -> vertx.eventBus().<String>request("fail", "").onComplete(future),
t -> {
fallbackCalled.set(true);
return null;
}
);
AtomicReference<String> str = new AtomicReference<>();
cb.executeWithFallback(
promise -> vertx.eventBus().<String>request("ok", "").map(Message::body).onComplete(promise),
t -> "KO"
).onComplete(ar -> str.set(ar.result()));
await().untilAtomic(str, is(equalTo("OK")));

await().untilAtomic(fallbackCalled, is(true));
assertTrue(items.isEmpty());
fallbackCalled.set(false);
str.set(null);
cb.executeWithFallback(
promise -> vertx.eventBus().<String>request("timeout", "").map(Message::body).onComplete(promise),
t -> "KO"
).onComplete(ar -> str.set(ar.result()));
await().untilAtomic(str, is(equalTo("KO")));

cb.<Message<String>>executeWithFallback(
future -> vertx.eventBus().<String>request("exception", "").onComplete(future),
t -> {
fallbackCalled.set(true);
return null;
}
);
str.set(null);
cb.executeWithFallback(
promise -> vertx.eventBus().<String>request("fail", "").map(Message::body).onComplete(promise),
t -> "KO"
).onComplete(ar -> str.set(ar.result()));
await().untilAtomic(str, is(equalTo("KO")));

await().untilAtomic(fallbackCalled, is(true));
assertTrue(items.isEmpty());
fallbackCalled.set(false);
str.set(null);
cb.executeWithFallback(
promise -> vertx.eventBus().<String>request("exception", "").map(Message::body).onComplete(promise),
t -> "KO"
).onComplete(ar -> str.set(ar.result()));
await().untilAtomic(str, is(equalTo("KO")));
}
}

0 comments on commit 8abdf62

Please sign in to comment.