diff --git a/gateleen-cache/pom.xml b/gateleen-cache/pom.xml index f729e0c4e..27015fe09 100644 --- a/gateleen-cache/pom.xml +++ b/gateleen-cache/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT gateleen-cache diff --git a/gateleen-core/pom.xml b/gateleen-core/pom.xml index a1374ed42..8d0d3f219 100644 --- a/gateleen-core/pom.xml +++ b/gateleen-core/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT gateleen-core diff --git a/gateleen-core/src/main/java/org/swisspush/gateleen/core/util/LockUtil.java b/gateleen-core/src/main/java/org/swisspush/gateleen/core/util/LockUtil.java index 705e03d2a..85f8e2148 100644 --- a/gateleen-core/src/main/java/org/swisspush/gateleen/core/util/LockUtil.java +++ b/gateleen-core/src/main/java/org/swisspush/gateleen/core/util/LockUtil.java @@ -80,4 +80,14 @@ public void releaseLock(Lock lockImpl, String lock, String token, Logger log){ } }); } + + /** + * Calculate the lock expiry time. This is a simple helper to work with the lock expiry time. + * + * @param taskInterval the interval of the task + * @return the calculated lock expiry time + */ + public static long calcLockExpiry(long taskInterval) { + return taskInterval <= 1 ? 1 : taskInterval / 2; + } } diff --git a/gateleen-core/src/test/java/org/swisspush/gateleen/core/util/LockUtilTest.java b/gateleen-core/src/test/java/org/swisspush/gateleen/core/util/LockUtilTest.java index bb0598753..293010daa 100644 --- a/gateleen-core/src/test/java/org/swisspush/gateleen/core/util/LockUtilTest.java +++ b/gateleen-core/src/test/java/org/swisspush/gateleen/core/util/LockUtilTest.java @@ -33,6 +33,20 @@ public void setUp(){ lockUtil = new LockUtil(newGateleenWastefulExceptionFactory()); } + @Test + public void testCalculateLockExpiry(TestContext context) { + context.assertEquals(1L, LockUtil.calcLockExpiry(1)); + context.assertEquals(1L, LockUtil.calcLockExpiry(0)); + context.assertEquals(1L, LockUtil.calcLockExpiry(-20)); + context.assertEquals(1L, LockUtil.calcLockExpiry(2)); + context.assertEquals(1L, LockUtil.calcLockExpiry(3)); + context.assertEquals(2L, LockUtil.calcLockExpiry(4)); + context.assertEquals(4L, LockUtil.calcLockExpiry(8)); + context.assertEquals(32L, LockUtil.calcLockExpiry(64)); + context.assertEquals(750L, LockUtil.calcLockExpiry(1500)); + context.assertEquals(5000L, LockUtil.calcLockExpiry(10001)); + } + @Test public void testAcquireLockWithoutLockImplementationDefined(TestContext context) { Async async = context.async(); diff --git a/gateleen-delegate/pom.xml b/gateleen-delegate/pom.xml index 1e5284186..a2dfd2396 100644 --- a/gateleen-delegate/pom.xml +++ b/gateleen-delegate/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT gateleen-delegate diff --git a/gateleen-delta/pom.xml b/gateleen-delta/pom.xml index fd1ef7530..b3c0c8eb5 100644 --- a/gateleen-delta/pom.xml +++ b/gateleen-delta/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT gateleen-delta diff --git a/gateleen-expansion/pom.xml b/gateleen-expansion/pom.xml index 565ac09d1..7f2a24356 100644 --- a/gateleen-expansion/pom.xml +++ b/gateleen-expansion/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT gateleen-expansion diff --git a/gateleen-hook-js/pom.xml b/gateleen-hook-js/pom.xml index d09e3486f..f096d81d7 100644 --- a/gateleen-hook-js/pom.xml +++ b/gateleen-hook-js/pom.xml @@ -4,7 +4,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT gateleen-hook-js jar diff --git a/gateleen-hook/pom.xml b/gateleen-hook/pom.xml index 5931d2bc6..ad5e6de27 100644 --- a/gateleen-hook/pom.xml +++ b/gateleen-hook/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT gateleen-hook diff --git a/gateleen-kafka/pom.xml b/gateleen-kafka/pom.xml index dd6bf50e2..bdb565432 100644 --- a/gateleen-kafka/pom.xml +++ b/gateleen-kafka/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT gateleen-kafka diff --git a/gateleen-logging/pom.xml b/gateleen-logging/pom.xml index 74b30ea88..bc5fcc179 100644 --- a/gateleen-logging/pom.xml +++ b/gateleen-logging/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT gateleen-logging diff --git a/gateleen-merge/pom.xml b/gateleen-merge/pom.xml index 6aae5e51f..60e6c3836 100644 --- a/gateleen-merge/pom.xml +++ b/gateleen-merge/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT gateleen-merge diff --git a/gateleen-monitoring/pom.xml b/gateleen-monitoring/pom.xml index a525247ee..d33b8fbbd 100644 --- a/gateleen-monitoring/pom.xml +++ b/gateleen-monitoring/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT gateleen-monitoring diff --git a/gateleen-packing/pom.xml b/gateleen-packing/pom.xml index e896dea69..fdbefb74f 100644 --- a/gateleen-packing/pom.xml +++ b/gateleen-packing/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT gateleen-packing diff --git a/gateleen-player/pom.xml b/gateleen-player/pom.xml index ac3dc8e79..e9b869bfd 100644 --- a/gateleen-player/pom.xml +++ b/gateleen-player/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT gateleen-player diff --git a/gateleen-playground/pom.xml b/gateleen-playground/pom.xml index f47fd876b..010778a03 100644 --- a/gateleen-playground/pom.xml +++ b/gateleen-playground/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT gateleen-playground @@ -103,6 +103,21 @@ gateleen-kafka ${project.version} + + io.vertx + vertx-micrometer-metrics + ${vertx.version} + + + io.micrometer + micrometer-core + ${micrometer.version} + + + io.micrometer + micrometer-registry-prometheus + ${micrometer.version} + org.swisspush redisques diff --git a/gateleen-qos/pom.xml b/gateleen-qos/pom.xml index 8ca94b3e2..426c0c206 100644 --- a/gateleen-qos/pom.xml +++ b/gateleen-qos/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT gateleen-qos diff --git a/gateleen-queue/README_queue.md b/gateleen-queue/README_queue.md index 0b2617756..7aaa04a08 100644 --- a/gateleen-queue/README_queue.md +++ b/gateleen-queue/README_queue.md @@ -303,6 +303,7 @@ The result will be a json object with the circuit information like the example b "status": "closed", "info": { "failRatio": 15, + "metric": "server-tests", "circuit": "/playground/server/tests/(.*)" } } @@ -323,6 +324,7 @@ The result will be a json object with the information of all circuits like the e "myCircuitHash": { "infos": { "failRatio": 15, + "metric": "server-tests", "circuit": "/playground/server/tests/(.*)" }, "status": "closed" diff --git a/gateleen-queue/pom.xml b/gateleen-queue/pom.xml index 99204819e..8d627c2b0 100644 --- a/gateleen-queue/pom.xml +++ b/gateleen-queue/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT gateleen-queue diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/api/QueueCircuitBreakerHttpRequestHandler.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/api/QueueCircuitBreakerHttpRequestHandler.java index 12661461a..1033ee4c7 100644 --- a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/api/QueueCircuitBreakerHttpRequestHandler.java +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/api/QueueCircuitBreakerHttpRequestHandler.java @@ -291,7 +291,7 @@ private void handleGetCircuitStatus(Message message) { private void handleCloseCircuit(Message message) { String circuitHash = message.body().getJsonObject(PAYLOAD).getString(CIRCUIT_HASH); - PatternAndCircuitHash patternAndCircuitHash = new PatternAndCircuitHash(null, circuitHash); + PatternAndCircuitHash patternAndCircuitHash = new PatternAndCircuitHash(null, circuitHash, null); storage.closeCircuit(patternAndCircuitHash).onComplete(event -> { if (event.failed()) { message.reply(new JsonObject().put(STATUS, ERROR).put(MESSAGE, event.cause().getMessage())); diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/impl/QueueCircuitBreakerImpl.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/impl/QueueCircuitBreakerImpl.java index fbd883d42..55c3516a1 100644 --- a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/impl/QueueCircuitBreakerImpl.java +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/impl/QueueCircuitBreakerImpl.java @@ -452,7 +452,7 @@ public Future unlockSampleQueues() { failedFutures.add(event1.cause().getMessage()); } if (futureCounter.get() == 0) { - if (failedFutures.size() > 0) { + if (!failedFutures.isEmpty()) { promise.fail("The following queues could not be unlocked: " + failedFutures); } else { promise.complete((long) queuesToUnlock.size()); diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/impl/RedisQueueCircuitBreakerStorage.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/impl/RedisQueueCircuitBreakerStorage.java index 1746916f6..50f7181a2 100644 --- a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/impl/RedisQueueCircuitBreakerStorage.java +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/impl/RedisQueueCircuitBreakerStorage.java @@ -1,6 +1,5 @@ package org.swisspush.gateleen.queue.queuing.circuitbreaker.impl; -import io.vertx.core.CompositeFuture; import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.json.JsonObject; @@ -38,8 +37,10 @@ public class RedisQueueCircuitBreakerStorage implements QueueCircuitBreakerStora public static final String STORAGE_OPEN_CIRCUITS = STORAGE_PREFIX + "open-circuits"; public static final String STORAGE_QUEUES_TO_UNLOCK = STORAGE_PREFIX + "queues-to-unlock"; public static final String FIELD_STATE = "state"; + public static final String FIELD_STATUS = "status"; public static final String FIELD_FAILRATIO = "failRatio"; public static final String FIELD_CIRCUIT = "circuit"; + public static final String FIELD_METRICNAME = "metricName"; private final LuaScriptState openCircuitLuaScriptState; private final LuaScriptState closeCircuitLuaScriptState; @@ -99,7 +100,7 @@ public Future getQueueCircuitState(String circuitHash) { public Future getQueueCircuitInformation(String circuitHash) { Promise promise = Promise.promise(); redisProvider.redis().onSuccess(redisAPI -> redisAPI.hmget(Arrays.asList(buildInfosKey(circuitHash), FIELD_STATE, - FIELD_FAILRATIO, FIELD_CIRCUIT), event -> { + FIELD_FAILRATIO, FIELD_CIRCUIT, FIELD_METRICNAME), event -> { if (event.failed()) { promise.fail(event.cause()); } else { @@ -108,8 +109,9 @@ public Future getQueueCircuitInformation(String circuitHash) { QueueCircuitState.CLOSED); String failRatioStr = Objects.toString(event.result().get(1), null); String circuit = Objects.toString(event.result().get(2), null); + String metric = Objects.toString(event.result().get(3), null); JsonObject result = new JsonObject(); - result.put("status", state.name().toLowerCase()); + result.put(FIELD_STATUS, state.name().toLowerCase()); JsonObject info = new JsonObject(); if (failRatioStr != null) { info.put(FIELD_FAILRATIO, Integer.valueOf(failRatioStr)); @@ -117,6 +119,9 @@ public Future getQueueCircuitInformation(String circuitHash) { if (circuit != null) { info.put(FIELD_CIRCUIT, circuit); } + if (StringUtils.isNotEmptyTrimmed(metric)) { + info.put(FIELD_METRICNAME, metric); + } result.put("info", info); promise.complete(result); } catch (Exception e) { @@ -156,6 +161,7 @@ public Future updateStatistics(PatternAndCircuitHash pat List arguments = Arrays.asList( uniqueRequestID, patternAndCircuitHash.getPattern().pattern(), + patternAndCircuitHash.getMetricName() != null ? patternAndCircuitHash.getMetricName() : "", patternAndCircuitHash.getCircuitHash(), String.valueOf(timestamp), String.valueOf(errorThresholdPercentage), @@ -240,7 +246,7 @@ public Future closeAllCircuits() { Future closeOpenCircuitsFuture = closeCircuitsByKey(STORAGE_OPEN_CIRCUITS); Future closeHalfOpenCircuitsFuture = closeCircuitsByKey(STORAGE_HALFOPEN_CIRCUITS); - CompositeFuture.all(closeOpenCircuitsFuture, closeHalfOpenCircuitsFuture).onComplete(event -> { + Future.all(closeOpenCircuitsFuture, closeHalfOpenCircuitsFuture).onComplete(event -> { if (event.succeeded()) { promise.complete(); } else { @@ -255,14 +261,14 @@ private Future closeCircuitsByKey(String key) { Promise promise = Promise.promise(); redisProvider.redis().onSuccess(redisAPI -> redisAPI.smembers(key, event -> { if (event.succeeded()) { - List promises = new ArrayList<>(); + List> promises = new ArrayList<>(); for (Response circuit : event.result()) { promises.add(closeCircuit(circuit.toString(), false)); } - if (promises.size() == 0) { + if (promises.isEmpty()) { promise.complete(); } else { - CompositeFuture.all(promises).onComplete(event1 -> { + Future.all(promises).onComplete(event1 -> { if (event1.succeeded()) { promise.complete(); } else { diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/monitoring/QueueCircuitBreakerMetricsCollector.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/monitoring/QueueCircuitBreakerMetricsCollector.java new file mode 100644 index 000000000..666d375ba --- /dev/null +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/monitoring/QueueCircuitBreakerMetricsCollector.java @@ -0,0 +1,176 @@ +package org.swisspush.gateleen.queue.queuing.circuitbreaker.monitoring; + +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; +import io.vertx.core.*; +import io.vertx.core.json.JsonObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.swisspush.gateleen.core.exception.GateleenExceptionFactory; +import org.swisspush.gateleen.core.lock.Lock; +import org.swisspush.gateleen.core.util.Address; +import org.swisspush.gateleen.core.util.LockUtil; +import org.swisspush.gateleen.queue.queuing.circuitbreaker.QueueCircuitBreakerStorage; +import org.swisspush.gateleen.queue.queuing.circuitbreaker.util.QueueCircuitState; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +import static org.swisspush.gateleen.core.util.LockUtil.acquireLock; +import static org.swisspush.gateleen.core.util.LockUtil.calcLockExpiry; +import static org.swisspush.gateleen.queue.queuing.circuitbreaker.impl.RedisQueueCircuitBreakerStorage.*; + +/** + * Class responsible for collecting metrics for the Queue Circuit Breaker. + * + * @author https://github.com/mcweba [Marc-Andre Weber] + */ +public class QueueCircuitBreakerMetricsCollector { + + private final Logger log = LoggerFactory.getLogger(QueueCircuitBreakerMetricsCollector.class); + + private final Lock lock; + private final LockUtil lockUtil; + + public static final String COLLECT_METRICS_TASK_LOCK = "collectCircuitBreakerMetrics"; + public static final String CIRCUIT_BREAKER_STATUS_METRIC = "gateleen.circuitbreaker.status"; + public static final String CIRCUIT_BREAKER_FAILRATIO_METRIC = "gateleen.circuitbreaker.failratio"; + + private final QueueCircuitBreakerStorage queueCircuitBreakerStorage; + private final MeterRegistry meterRegistry; + private final long metricCollectionIntervalMs; + + private final Map circuitStateMap = new HashMap<>(); + private final Map circuitFailRatioMap = new HashMap<>(); + + /** + * Constructor for QueueCircuitBreakerMetricsCollector. + * + * @param vertx Vertx instance + * @param lock Lock instance + * @param queueCircuitBreakerStorage Storage for circuit breaker data + * @param meterRegistry Meter registry for metrics + * @param exceptionFactory Exception factory + * @param metricCollectionIntervalSeconds Interval for metric collection in seconds + */ + public QueueCircuitBreakerMetricsCollector(Vertx vertx, Lock lock, QueueCircuitBreakerStorage queueCircuitBreakerStorage, + MeterRegistry meterRegistry, GateleenExceptionFactory exceptionFactory, + long metricCollectionIntervalSeconds) { + this.lock = lock; + this.lockUtil = new LockUtil(exceptionFactory); + this.queueCircuitBreakerStorage = queueCircuitBreakerStorage; + this.meterRegistry = meterRegistry; + + this.metricCollectionIntervalMs = metricCollectionIntervalSeconds * 1000; + + vertx.setPeriodic(metricCollectionIntervalMs, event -> collectMetrics() + .onFailure(event1 -> log.error("Could not collect metrics. Message: {}", event1.getMessage()))); + } + + /** + * Collects metrics for the Queue Circuit Breaker. + * + * @return Future representing the completion of the metric collection + */ + public Future collectMetrics() { + log.debug("Collecting metrics"); + Promise promise = Promise.promise(); + final String token = createToken(); + acquireLock(lock, COLLECT_METRICS_TASK_LOCK, token, calcLockExpiry(metricCollectionIntervalMs), log).onComplete(lockEvent -> { + if (lockEvent.succeeded()) { + if (lockEvent.result()) { + handleMetricsCollection(token).onComplete(event -> { + if (event.succeeded()) { + promise.complete(); + } else { + promise.fail(event.cause()); + } + }); + } else { + promise.complete(); + } + } else { + log.error("Could not acquire lock '{}'. Message: {}", COLLECT_METRICS_TASK_LOCK, lockEvent.cause().getMessage()); + promise.fail(lockEvent.cause().getMessage()); + } + }); + return promise.future(); + } + + private Future handleMetricsCollection(String token) { + return queueCircuitBreakerStorage.getAllCircuits().compose((Function>) entries -> { + extractMetricsFromCircuitsObject(entries); + return Future.succeededFuture(); + }).andThen(event -> lockUtil.releaseLock(lock, COLLECT_METRICS_TASK_LOCK, token, log)); + } + + private void extractMetricsFromCircuitsObject(JsonObject circuits) { + circuits.stream().forEach(entry -> { + String circuitName = entry.getKey(); + JsonObject circuitValue = (JsonObject) entry.getValue(); + QueueCircuitState queueCircuitState = QueueCircuitState.fromString(circuitValue.getString(FIELD_STATUS), null); + if (queueCircuitState == null) { + log.warn("No status found for circuit '{}'", circuitName); + return; + } + + JsonObject infos = circuitValue.getJsonObject("infos"); + if (infos != null) { + String metric = infos.getString(FIELD_METRICNAME); + Integer failRatio = infos.getInteger(FIELD_FAILRATIO); + if (metric != null && failRatio != null) { + publishMetric(metric, queueCircuitState, failRatio); + } + } + }); + } + + private void publishMetric(String metricName, QueueCircuitState queueCircuitState, int failRatio) { + Integer stateValue = circuitStateToValue(queueCircuitState); + if(stateValue != null) { + getCircuitStateMeter(metricName).set(stateValue); + } + getCircuitFailRatioMeter(metricName).set(failRatio); + } + + private String createToken() { + return Address.instanceAddress() + "_" + System.currentTimeMillis() + "_" + COLLECT_METRICS_TASK_LOCK; + } + + private AtomicInteger getCircuitStateMeter(String metricName) { + return circuitStateMap.computeIfAbsent(metricName, key -> { + AtomicInteger newMeterValue = new AtomicInteger(); + Gauge.builder(CIRCUIT_BREAKER_STATUS_METRIC, newMeterValue, AtomicInteger::get) + .description("Status of the circuit, 0=CLOSED, 1=HALF_OPEN, 2=OPEN") + .tag("metricName", metricName) + .register(meterRegistry); + return newMeterValue; + }); + } + + private AtomicInteger getCircuitFailRatioMeter(String metricName) { + return circuitFailRatioMap.computeIfAbsent(metricName, key -> { + AtomicInteger newMeterValue = new AtomicInteger(); + Gauge.builder(CIRCUIT_BREAKER_FAILRATIO_METRIC, newMeterValue, AtomicInteger::get) + .description("Fail ratio of the circuit in percentage") + .tag("metricName", metricName) + .register(meterRegistry); + return newMeterValue; + }); + } + + private Integer circuitStateToValue(QueueCircuitState queueCircuitState) { + switch (queueCircuitState) { + case CLOSED: + return 0; + case HALF_OPEN: + return 1; + case OPEN: + return 2; + default: + return null; + } + } +} diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/util/PatternAndCircuitHash.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/util/PatternAndCircuitHash.java index 321d3ef74..d67675983 100644 --- a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/util/PatternAndCircuitHash.java +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/util/PatternAndCircuitHash.java @@ -1,5 +1,7 @@ package org.swisspush.gateleen.queue.queuing.circuitbreaker.util; +import javax.annotation.Nullable; +import java.util.Objects; import java.util.regex.Pattern; /** @@ -11,10 +13,12 @@ public class PatternAndCircuitHash { private final Pattern pattern; private final String circuitHash; + private final String metricName; - public PatternAndCircuitHash(Pattern pattern, String circuitHash) { + public PatternAndCircuitHash(@Nullable Pattern pattern, String circuitHash, @Nullable String metricName) { this.pattern = pattern; this.circuitHash = circuitHash; + this.metricName = metricName; } public Pattern getPattern() { @@ -25,27 +29,31 @@ public String getCircuitHash() { return circuitHash; } + public String getMetricName() { + return metricName; + } + @Override public String toString() { - return "url pattern: " + pattern.pattern() + " circuit hash: " + circuitHash; + return "PatternAndCircuitHash{" + + "pattern=" + pattern + + ", circuitHash='" + circuitHash + '\'' + + ", metricName='" + metricName + '\'' + + '}'; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - PatternAndCircuitHash that = (PatternAndCircuitHash) o; - - if (!pattern.pattern().equals(that.pattern.pattern())) return false; - return circuitHash.equals(that.circuitHash); - + return Objects.equals(pattern != null ? pattern.pattern() : null, + that.pattern != null ? that.pattern.pattern() : null) && + circuitHash.equals(that.circuitHash); } @Override public int hashCode() { - int result = pattern.hashCode(); - result = 31 * result + circuitHash.hashCode(); - return result; + return Objects.hash(pattern, circuitHash); } } diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/util/QueueCircuitBreakerRulePatternToCircuitMapping.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/util/QueueCircuitBreakerRulePatternToCircuitMapping.java index 9ac19f372..6f98754f3 100644 --- a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/util/QueueCircuitBreakerRulePatternToCircuitMapping.java +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/util/QueueCircuitBreakerRulePatternToCircuitMapping.java @@ -66,7 +66,7 @@ private PatternAndCircuitHash getPatternAndCircuitHashFromRule(Rule rule){ try { Pattern pattern = Pattern.compile(rule.getUrlPattern()); String circuitHash = HashCodeGenerator.createHashCode(rule.getUrlPattern()); - return new PatternAndCircuitHash(pattern, circuitHash); + return new PatternAndCircuitHash(pattern, circuitHash, rule.getMetricName()); } catch (Exception e) { log.error("Could not compile the regex:{} to a pattern.", rule.getUrlPattern()); return null; diff --git a/gateleen-queue/src/main/resources/circuitbreaker_getAllCircuits.lua b/gateleen-queue/src/main/resources/circuitbreaker_getAllCircuits.lua index d1908c1d2..219d0270e 100644 --- a/gateleen-queue/src/main/resources/circuitbreaker_getAllCircuits.lua +++ b/gateleen-queue/src/main/resources/circuitbreaker_getAllCircuits.lua @@ -1,6 +1,7 @@ local stateField = "state" local failRatioField = "failRatio" local circuitField = "circuit" +local metricNameField = "metricName" local allCircuitsKey = KEYS[1] @@ -8,7 +9,7 @@ local circuitInfoKeyPrefix = ARGV[1] local circuitInfoKeySuffix = ARGV[2] local function getCircuitInfos(circuit) - return redis.call('hmget',circuitInfoKeyPrefix..circuit..circuitInfoKeySuffix,stateField,circuitField,failRatioField) + return redis.call('hmget',circuitInfoKeyPrefix..circuit..circuitInfoKeySuffix,stateField,circuitField,metricNameField,failRatioField) end local function string_not_empty(s) @@ -29,7 +30,10 @@ for k, circuit in ipairs(allCircuits) do result[circuit].infos.circuit = fields[2] end if string_not_empty(fields[3]) then - result[circuit].infos.failRatio = tonumber(fields[3]) + result[circuit].infos.metricName = fields[3] + end + if string_not_empty(fields[4]) then + result[circuit].infos.failRatio = tonumber(fields[4]) end end diff --git a/gateleen-queue/src/main/resources/circuitbreaker_update.lua b/gateleen-queue/src/main/resources/circuitbreaker_update.lua index 7d8314779..aa9ce5b1f 100644 --- a/gateleen-queue/src/main/resources/circuitbreaker_update.lua +++ b/gateleen-queue/src/main/resources/circuitbreaker_update.lua @@ -1,6 +1,7 @@ local stateField = "state" local failRatioField = "failRatio" local circuitField = "circuit" +local metricNameField = "metricName" local circuitInfoKey = KEYS[1] local circuitSuccessKey = KEYS[2] local circuitFailureKey = KEYS[3] @@ -10,12 +11,13 @@ local allCircuitsKey = KEYS[6] local requestID = ARGV[1] local circuit = ARGV[2] -local circuitHash = ARGV[3] -local requestTS = tonumber(ARGV[4]) -local errorThresholdPercentage = tonumber(ARGV[5]) -local entriesMaxAgeMS = tonumber(ARGV[6]) -local minQueueSampleCount = tonumber(ARGV[7]) -local maxQueueSampleCount = tonumber(ARGV[8]) +local metricName = ARGV[3] +local circuitHash = ARGV[4] +local requestTS = tonumber(ARGV[5]) +local errorThresholdPercentage = tonumber(ARGV[6]) +local entriesMaxAgeMS = tonumber(ARGV[7]) +local minQueueSampleCount = tonumber(ARGV[8]) +local maxQueueSampleCount = tonumber(ARGV[9]) local return_value = "OK" local minScore = requestTS - entriesMaxAgeMS @@ -24,6 +26,8 @@ local minScore = requestTS - entriesMaxAgeMS redis.call('zadd',circuitKeyToUpdate,requestTS,requestID) -- write circuit pattern to infos redis.call('hsetnx',circuitInfoKey, circuitField, circuit) +-- write metricName to infos +redis.call('hsetnx',circuitInfoKey, metricNameField, metricName) -- add circuit to all circuits set redis.call('sadd',allCircuitsKey,circuitHash) diff --git a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/api/QueueCircuitBreakerHttpRequestHandlerTest.java b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/api/QueueCircuitBreakerHttpRequestHandlerTest.java index a33336bb1..32f6d6144 100644 --- a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/api/QueueCircuitBreakerHttpRequestHandlerTest.java +++ b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/api/QueueCircuitBreakerHttpRequestHandlerTest.java @@ -84,6 +84,7 @@ public void testGetQueueCircuitInformationSuccess(TestContext context) { JsonObject info = new JsonObject(); info.put("failRatio", 99); info.put("circuit", "/path/of/circuit"); + info.put("metric", "my-metric-1"); result.put("info", info); Mockito.when(queueCircuitBreakerStorage.getQueueCircuitInformation(anyString())) @@ -97,6 +98,34 @@ public void testGetQueueCircuitInformationSuccess(TestContext context) { context.assertEquals(QueueCircuitState.HALF_OPEN.name(), payload.getString(STATUS)); context.assertEquals(99, payload.getJsonObject("info").getInteger("failRatio")); context.assertEquals("/path/of/circuit", payload.getJsonObject("info").getString("circuit")); + context.assertEquals("my-metric-1", payload.getJsonObject("info").getString("metric")); + async.complete(); + }); + } + + @Test + public void testGetQueueCircuitInformationSuccessWithoutMetricName(TestContext context) { + Async async = context.async(); + + JsonObject result = new JsonObject(); + result.put("status", QueueCircuitState.HALF_OPEN.name()); + JsonObject info = new JsonObject(); + info.put("failRatio", 99); + info.put("circuit", "/path/of/circuit"); + result.put("info", info); + + Mockito.when(queueCircuitBreakerStorage.getQueueCircuitInformation(anyString())) + .thenReturn(Future.succeededFuture(result)); + + vertx.eventBus().request(HTTP_REQUEST_API_ADDRESS, QueueCircuitBreakerAPI.buildGetCircuitInformationOperation("someCircuit"), + (Handler>>) reply -> { + JsonObject replyBody = reply.result().body(); + context.assertEquals(OK, replyBody.getString(STATUS)); + JsonObject payload = replyBody.getJsonObject(VALUE); + context.assertEquals(QueueCircuitState.HALF_OPEN.name(), payload.getString(STATUS)); + context.assertEquals(99, payload.getJsonObject("info").getInteger("failRatio")); + context.assertEquals("/path/of/circuit", payload.getJsonObject("info").getString("circuit")); + context.assertNull(payload.getJsonObject("info").getString("metric")); async.complete(); }); } diff --git a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/impl/QueueCircuitBreakerImplTest.java b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/impl/QueueCircuitBreakerImplTest.java index d6dcb9916..6cb839e99 100644 --- a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/impl/QueueCircuitBreakerImplTest.java +++ b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/impl/QueueCircuitBreakerImplTest.java @@ -176,7 +176,7 @@ public void testHandleQueuedRequest(TestContext context) { HttpRequest req = new HttpRequest(HttpMethod.PUT, "/playground/circuitBreaker/test", MultiMap.caseInsensitiveMultiMap(), null); Mockito.when(ruleToCircuitMapping.getCircuitFromRequestUri(anyString())) - .thenReturn(new PatternAndCircuitHash(Pattern.compile("/someCircuit"), "someCircuitHash")); + .thenReturn(new PatternAndCircuitHash(Pattern.compile("/someCircuit"), "someCircuitHash", "my-metric-1")); Mockito.when(queueCircuitBreakerStorage.getQueueCircuitState(any(PatternAndCircuitHash.class))) .thenReturn(Future.succeededFuture(QueueCircuitState.CLOSED)); @@ -206,7 +206,7 @@ public void testHandleQueuedRequestCallsLockQueueWhenCircuitIsOpen(TestContext c HttpRequest req = new HttpRequest(HttpMethod.PUT, "/playground/circuitBreaker/test", MultiMap.caseInsensitiveMultiMap(), null); Mockito.when(ruleToCircuitMapping.getCircuitFromRequestUri(anyString())) - .thenReturn(new PatternAndCircuitHash(Pattern.compile("/someCircuit"), "someCircuitHash")); + .thenReturn(new PatternAndCircuitHash(Pattern.compile("/someCircuit"), "someCircuitHash", "my-metric-1")); Mockito.when(queueCircuitBreakerStorage.getQueueCircuitState(any(PatternAndCircuitHash.class))) .thenReturn(Future.succeededFuture(QueueCircuitState.OPEN)); @@ -229,7 +229,7 @@ public void testHandleQueuedRequestDoesNotCallLockQueueWhenCircuitIsNotOpen(Test HttpRequest req = new HttpRequest(HttpMethod.PUT, "/playground/circuitBreaker/test", MultiMap.caseInsensitiveMultiMap(), null); Mockito.when(ruleToCircuitMapping.getCircuitFromRequestUri(anyString())) - .thenReturn(new PatternAndCircuitHash(Pattern.compile("/someCircuit"), "someCircuitHash")); + .thenReturn(new PatternAndCircuitHash(Pattern.compile("/someCircuit"), "someCircuitHash", "my-metric-1")); Mockito.when(queueCircuitBreakerStorage.getQueueCircuitState(any(PatternAndCircuitHash.class))) .thenReturn(Future.succeededFuture(QueueCircuitState.CLOSED)); @@ -274,7 +274,7 @@ public void testUpdateStatistics(TestContext context) { HttpRequest req = new HttpRequest(HttpMethod.PUT, "/playground/circuitBreaker/test", MultiMap.caseInsensitiveMultiMap(), null); Mockito.when(ruleToCircuitMapping.getCircuitFromRequestUri(anyString())) - .thenReturn(new PatternAndCircuitHash(Pattern.compile("/someCircuit"), "someCircuitHash")); + .thenReturn(new PatternAndCircuitHash(Pattern.compile("/someCircuit"), "someCircuitHash", "my-metric-1")); Mockito.when(queueCircuitBreakerStorage.updateStatistics(any(PatternAndCircuitHash.class), anyString(), anyLong(), anyInt(), anyLong(), anyLong(), anyLong(), any(QueueResponseType.class))) @@ -311,7 +311,7 @@ public void testUpdateStatisticsTriggersQueueLock(TestContext context) { HttpRequest req = new HttpRequest(HttpMethod.PUT, "/playground/circuitBreaker/test", MultiMap.caseInsensitiveMultiMap(), null); Mockito.when(ruleToCircuitMapping.getCircuitFromRequestUri(anyString())) - .thenReturn(new PatternAndCircuitHash(Pattern.compile("/someCircuit"), "someCircuitHash")); + .thenReturn(new PatternAndCircuitHash(Pattern.compile("/someCircuit"), "someCircuitHash", "my-metric-1")); Mockito.when(queueCircuitBreakerStorage.updateStatistics(any(PatternAndCircuitHash.class), anyString(), anyLong(), anyInt(), anyLong(), anyLong(), anyLong(), any(QueueResponseType.class))) @@ -334,7 +334,7 @@ public void testQueueLock(TestContext context) { HttpRequest req = new HttpRequest(HttpMethod.PUT, "/playground/circuitBreaker/test", MultiMap.caseInsensitiveMultiMap(), null); Mockito.when(ruleToCircuitMapping.getCircuitFromRequestUri(anyString())) - .thenReturn(new PatternAndCircuitHash(Pattern.compile("/someCircuit"), "someCircuitHash")); + .thenReturn(new PatternAndCircuitHash(Pattern.compile("/someCircuit"), "someCircuitHash", "my-metric-1")); Mockito.when(queueCircuitBreakerStorage.lockQueue(anyString(), any(PatternAndCircuitHash.class))) .thenReturn(Future.succeededFuture()); @@ -360,7 +360,7 @@ public void testQueueLockFailingRedisques(TestContext context) { HttpRequest req = new HttpRequest(HttpMethod.PUT, "/playground/circuitBreaker/test", MultiMap.caseInsensitiveMultiMap(), null); Mockito.when(ruleToCircuitMapping.getCircuitFromRequestUri(anyString())) - .thenReturn(new PatternAndCircuitHash(Pattern.compile("/someCircuit"), "someCircuitHash")); + .thenReturn(new PatternAndCircuitHash(Pattern.compile("/someCircuit"), "someCircuitHash", "my-metric-1")); Mockito.when(queueCircuitBreakerStorage.lockQueue(anyString(), any(PatternAndCircuitHash.class))) .thenReturn(Future.succeededFuture()); @@ -387,7 +387,7 @@ public void testQueueLockFailingStorage(TestContext context) { HttpRequest req = new HttpRequest(HttpMethod.PUT, "/playground/circuitBreaker/test", MultiMap.caseInsensitiveMultiMap(), null); Mockito.when(ruleToCircuitMapping.getCircuitFromRequestUri(anyString())) - .thenReturn(new PatternAndCircuitHash(Pattern.compile("/someCircuit"), "someCircuitHash")); + .thenReturn(new PatternAndCircuitHash(Pattern.compile("/someCircuit"), "someCircuitHash", "my-metric-1")); Mockito.when(queueCircuitBreakerStorage.lockQueue(anyString(), any(PatternAndCircuitHash.class))) .thenReturn(Future.failedFuture("queue could not be locked")); @@ -544,7 +544,7 @@ public void testCloseCircuit(TestContext context) { HttpRequest req = new HttpRequest(HttpMethod.PUT, "/playground/circuitBreaker/test", MultiMap.caseInsensitiveMultiMap(), null); Mockito.when(ruleToCircuitMapping.getCircuitFromRequestUri(anyString())) - .thenReturn(new PatternAndCircuitHash(Pattern.compile("/someCircuit"), "someCircuitHash")); + .thenReturn(new PatternAndCircuitHash(Pattern.compile("/someCircuit"), "someCircuitHash", "my-metric-1")); Mockito.when(queueCircuitBreakerStorage.closeCircuit(any(PatternAndCircuitHash.class))) .thenReturn(Future.succeededFuture()); @@ -563,7 +563,7 @@ public void testCloseCircuitFailingStorage(TestContext context) { HttpRequest req = new HttpRequest(HttpMethod.PUT, "/playground/circuitBreaker/test", MultiMap.caseInsensitiveMultiMap(), null); Mockito.when(ruleToCircuitMapping.getCircuitFromRequestUri(anyString())) - .thenReturn(new PatternAndCircuitHash(Pattern.compile("/someCircuit"), "someCircuitHash")); + .thenReturn(new PatternAndCircuitHash(Pattern.compile("/someCircuit"), "someCircuitHash", "my-metric-1")); Mockito.when(queueCircuitBreakerStorage.closeCircuit(any(PatternAndCircuitHash.class))) .thenReturn(Future.failedFuture("unable to close circuit")); @@ -614,7 +614,7 @@ public void testReOpenCircuit(TestContext context) { HttpRequest req = new HttpRequest(HttpMethod.PUT, "/playground/circuitBreaker/test", MultiMap.caseInsensitiveMultiMap(), null); Mockito.when(ruleToCircuitMapping.getCircuitFromRequestUri(anyString())) - .thenReturn(new PatternAndCircuitHash(Pattern.compile("/someCircuit"), "someCircuitHash")); + .thenReturn(new PatternAndCircuitHash(Pattern.compile("/someCircuit"), "someCircuitHash", "my-metric-1")); Mockito.when(queueCircuitBreakerStorage.reOpenCircuit(any(PatternAndCircuitHash.class))) .thenReturn(Future.succeededFuture()); @@ -633,7 +633,7 @@ public void testReOpenCircuitFailingStorage(TestContext context) { HttpRequest req = new HttpRequest(HttpMethod.PUT, "/playground/circuitBreaker/test", MultiMap.caseInsensitiveMultiMap(), null); Mockito.when(ruleToCircuitMapping.getCircuitFromRequestUri(anyString())) - .thenReturn(new PatternAndCircuitHash(Pattern.compile("/someCircuit"), "someCircuitHash")); + .thenReturn(new PatternAndCircuitHash(Pattern.compile("/someCircuit"), "someCircuitHash", "my-metric-1")); Mockito.when(queueCircuitBreakerStorage.reOpenCircuit(any(PatternAndCircuitHash.class))) .thenReturn(Future.failedFuture("unable to re-open circuit")); diff --git a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/impl/RedisQueueCircuitBreakerStorageTest.java b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/impl/RedisQueueCircuitBreakerStorageTest.java index 22145d719..b55139e8a 100644 --- a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/impl/RedisQueueCircuitBreakerStorageTest.java +++ b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/impl/RedisQueueCircuitBreakerStorageTest.java @@ -18,7 +18,6 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; -import org.swisspush.gateleen.core.exception.GateleenExceptionFactory; import org.swisspush.gateleen.queue.queuing.circuitbreaker.util.PatternAndCircuitHash; import org.swisspush.gateleen.queue.queuing.circuitbreaker.util.QueueCircuitState; import org.swisspush.gateleen.queue.queuing.circuitbreaker.util.QueueResponseType; @@ -41,7 +40,13 @@ @RunWith(VertxUnitRunner.class) public class RedisQueueCircuitBreakerStorageTest { - private static Vertx vertx; + private static final String INFO = "info"; + private static final String INFOS = "infos"; + private static final String FAIL_RATIO = "failRatio"; + private static final String STATUS = "status"; + private static final String CIRCUIT = "circuit"; + private static final String METRIC_NAME = "metricName"; + private Jedis jedis; private static RedisQueueCircuitBreakerStorage storage; @@ -50,7 +55,7 @@ public class RedisQueueCircuitBreakerStorageTest { @BeforeClass public static void setupStorage(){ - vertx = Vertx.vertx(); + Vertx vertx = Vertx.vertx(); RedisAPI redisAPI = RedisAPI.api(new RedisClient(vertx, new NetClientOptions(), new PoolOptions(), new RedisStandaloneConnectOptions(), TracingPolicy.IGNORE)); storage = new RedisQueueCircuitBreakerStorage(() -> Future.succeededFuture(redisAPI), newGateleenWastefulExceptionFactory()); } @@ -68,7 +73,7 @@ public void setUp(){ @Test public void testGetQueueCircuitState(TestContext context){ Async async = context.async(); - PatternAndCircuitHash patternAndCircuitHash = buildPatternAndCircuitHash("/someCircuit", "someCircuitHash"); + PatternAndCircuitHash patternAndCircuitHash = buildPatternAndCircuitHash("/someCircuit", "someCircuitHash", "my-metric-1"); storage.getQueueCircuitState(patternAndCircuitHash).onComplete(event -> { context.assertEquals(CLOSED, event.result()); writeQueueCircuitStateToDatabase("someCircuitHash", HALF_OPEN); @@ -99,20 +104,53 @@ public void testGetQueueCircuitInformation(TestContext context){ storage.getQueueCircuitInformation(hash).onComplete(event -> { context.assertTrue(event.succeeded()); JsonObject result = event.result(); - context.assertEquals(CLOSED.name().toLowerCase(), result.getString("status")); - context.assertTrue(result.containsKey("info")); - context.assertFalse(result.getJsonObject("info").containsKey("failRatio")); - context.assertFalse(result.getJsonObject("info").containsKey("circuit")); + context.assertEquals(CLOSED.name().toLowerCase(), result.getString(STATUS)); + context.assertTrue(result.containsKey(INFO)); + context.assertFalse(result.getJsonObject(INFO).containsKey(FAIL_RATIO)); + context.assertFalse(result.getJsonObject(INFO).containsKey(CIRCUIT)); + context.assertFalse(result.getJsonObject(INFO).containsKey(METRIC_NAME)); + + writeQueueCircuit(hash, HALF_OPEN, "/some/circuit/path", "my-metric-1", 99); + + storage.getQueueCircuitInformation(hash).onComplete(event1 -> { + context.assertTrue(event1.succeeded()); + JsonObject result1 = event1.result(); + context.assertEquals(HALF_OPEN.name().toLowerCase(), result1.getString(STATUS)); + context.assertTrue(result1.containsKey(INFO)); + context.assertEquals(99, result1.getJsonObject(INFO).getInteger(FAIL_RATIO)); + context.assertEquals("/some/circuit/path", result1.getJsonObject(INFO).getString(CIRCUIT)); + context.assertEquals("my-metric-1", result1.getJsonObject(INFO).getString(METRIC_NAME)); + async.complete(); + }); + }); + } + + @Test + public void testGetQueueCircuitInformationWithoutMetricName(TestContext context){ + Async async = context.async(); + String hash = "someCircuitHash"; + storage.getQueueCircuitInformation(hash).onComplete(event -> { + context.assertTrue(event.succeeded()); + JsonObject result = event.result(); + context.assertEquals(CLOSED.name().toLowerCase(), result.getString(STATUS)); + context.assertTrue(result.containsKey(INFO)); + context.assertFalse(result.getJsonObject(INFO).containsKey(FAIL_RATIO)); + context.assertFalse(result.getJsonObject(INFO).containsKey(CIRCUIT)); + context.assertFalse(result.getJsonObject(INFO).containsKey(METRIC_NAME)); - writeQueueCircuit(hash, HALF_OPEN, "/some/circuit/path", 99); + writeQueueCircuit(hash, HALF_OPEN, "/some/circuit/path", null, 99); storage.getQueueCircuitInformation(hash).onComplete(event1 -> { context.assertTrue(event1.succeeded()); JsonObject result1 = event1.result(); - context.assertEquals(HALF_OPEN.name().toLowerCase(), result1.getString("status")); - context.assertTrue(result1.containsKey("info")); - context.assertEquals(99, result1.getJsonObject("info").getInteger("failRatio")); - context.assertEquals("/some/circuit/path", result1.getJsonObject("info").getString("circuit")); + context.assertEquals(HALF_OPEN.name().toLowerCase(), result1.getString(STATUS)); + context.assertTrue(result1.containsKey(INFO)); + context.assertEquals(99, result1.getJsonObject(INFO).getInteger(FAIL_RATIO)); + context.assertEquals("/some/circuit/path", result1.getJsonObject(INFO).getString(CIRCUIT)); + + // metric name should not be part of the result + context.assertFalse(result.getJsonObject(INFO).containsKey(METRIC_NAME)); + async.complete(); }); }); @@ -131,9 +169,9 @@ public void testGetAllCircuits(TestContext context){ context.assertFalse(jedis.exists(infosKey(hash3))); // prepare - writeQueueCircuit(hash1, HALF_OPEN, "/path/to/hash_1", 60); - writeQueueCircuit(hash2, CLOSED, "/path/to/hash_2", 20); - writeQueueCircuit(hash3, OPEN, "/path/to/hash_3", 99); + writeQueueCircuit(hash1, HALF_OPEN, "/path/to/hash_1", "metric-1", 60); + writeQueueCircuit(hash2, CLOSED, "/path/to/hash_2",null, 20); + writeQueueCircuit(hash3, OPEN, "/path/to/hash_3","metric-3", 99); context.assertTrue(jedis.exists(infosKey(hash1))); context.assertTrue(jedis.exists(infosKey(hash2))); @@ -143,22 +181,25 @@ public void testGetAllCircuits(TestContext context){ context.assertEquals(HALF_OPEN.name().toLowerCase(), jedis.hget(infosKey(hash1), FIELD_STATE).toLowerCase()); context.assertEquals("/path/to/hash_1", jedis.hget(infosKey(hash1), FIELD_CIRCUIT)); + context.assertEquals("metric-1", jedis.hget(infosKey(hash1), FIELD_METRICNAME)); context.assertEquals("60", jedis.hget(infosKey(hash1), FIELD_FAILRATIO)); context.assertEquals(CLOSED.name().toLowerCase(), jedis.hget(infosKey(hash2), FIELD_STATE).toLowerCase()); context.assertEquals("/path/to/hash_2", jedis.hget(infosKey(hash2), FIELD_CIRCUIT)); + context.assertNull(jedis.hget(infosKey(hash2), FIELD_METRICNAME)); context.assertEquals("20", jedis.hget(infosKey(hash2), FIELD_FAILRATIO)); context.assertEquals(OPEN.name().toLowerCase(), jedis.hget(infosKey(hash3), FIELD_STATE).toLowerCase()); context.assertEquals("/path/to/hash_3", jedis.hget(infosKey(hash3), FIELD_CIRCUIT)); + context.assertEquals("metric-3", jedis.hget(infosKey(hash3), FIELD_METRICNAME)); context.assertEquals("99", jedis.hget(infosKey(hash3), FIELD_FAILRATIO)); storage.getAllCircuits().onComplete(event -> { context.assertTrue(event.succeeded()); JsonObject result = event.result(); - assertJsonObjectContents(context, result, hash1, HALF_OPEN, "/path/to/hash_1", 60); - assertJsonObjectContents(context, result, hash2, CLOSED, "/path/to/hash_2", 20); - assertJsonObjectContents(context, result, hash3, OPEN, "/path/to/hash_3", 99); + assertJsonObjectContents(context, result, hash1, HALF_OPEN, "/path/to/hash_1", "metric-1", 60); + assertJsonObjectContents(context, result, hash2, CLOSED, "/path/to/hash_2",null, 20); + assertJsonObjectContents(context, result, hash3, OPEN, "/path/to/hash_3", "metric-3",99); async.complete(); }); } @@ -181,7 +222,7 @@ public void testUpdateStatistics(TestContext context){ Async async = context.async(); String circuitHash = "anotherCircuitHash"; - PatternAndCircuitHash patternAndCircuitHash = buildPatternAndCircuitHash("/anotherCircuit", circuitHash); + PatternAndCircuitHash patternAndCircuitHash = buildPatternAndCircuitHash("/anotherCircuit", circuitHash, "my-metric-1"); context.assertFalse(jedis.exists(key(circuitHash, QueueResponseType.SUCCESS))); context.assertFalse(jedis.exists(key(circuitHash, QueueResponseType.FAILURE))); @@ -222,7 +263,7 @@ public void testOpenCircuit(TestContext context){ Async async = context.async(); String circuitHash = "anotherCircuitHash"; - PatternAndCircuitHash patternAndCircuitHash = buildPatternAndCircuitHash("/anotherCircuit", circuitHash); + PatternAndCircuitHash patternAndCircuitHash = buildPatternAndCircuitHash("/anotherCircuit", circuitHash, "my-metric-1"); context.assertFalse(jedis.exists(key(circuitHash, QueueResponseType.SUCCESS))); context.assertFalse(jedis.exists(key(circuitHash, QueueResponseType.FAILURE))); @@ -244,11 +285,11 @@ public void testOpenCircuit(TestContext context){ storage.getQueueCircuitState(patternAndCircuitHash).onComplete(event1 -> { context.assertEquals(CLOSED, event1.result()); - assertStateAndErroPercentage(context, circuitHash, CLOSED, 66); + assertStateAndErrorPercentage(context, circuitHash, CLOSED, 66); context.assertFalse(jedis.exists(STORAGE_OPEN_CIRCUITS)); storage.updateStatistics(patternAndCircuitHash, "req_4", 4, errorThreshold, entriesMaxAgeMS, minQueueSampleCount, maxQueueSampleCount, QueueResponseType.FAILURE).onComplete(event2 -> { storage.getQueueCircuitState(patternAndCircuitHash).onComplete(event3 -> { - assertStateAndErroPercentage(context, circuitHash, OPEN, 75); + assertStateAndErrorPercentage(context, circuitHash, OPEN, 75); context.assertEquals(OPEN, event3.result()); context.assertTrue(jedis.exists(STORAGE_OPEN_CIRCUITS)); assertHashInOpenCircuitsSet(context, circuitHash, 1); @@ -268,7 +309,7 @@ public void testLockQueue(TestContext context){ context.assertFalse(jedis.exists(queuesKey(circuitHash))); - PatternAndCircuitHash patternAndCircuitHash = buildPatternAndCircuitHash("/anotherCircuit", circuitHash); + PatternAndCircuitHash patternAndCircuitHash = buildPatternAndCircuitHash("/anotherCircuit", circuitHash, "my-metric-1"); storage.lockQueue("someQueue", patternAndCircuitHash).onComplete(event -> { context.assertTrue(jedis.exists(queuesKey(circuitHash))); context.assertEquals(1L, jedis.zcard(queuesKey(circuitHash))); @@ -352,7 +393,7 @@ public void testCloseCircuit(TestContext context){ context.assertEquals(1L, jedis.scard(STORAGE_OPEN_CIRCUITS)); context.assertEquals(0L, jedis.llen(STORAGE_QUEUES_TO_UNLOCK)); - PatternAndCircuitHash patternAndCircuitHash = buildPatternAndCircuitHash("/anotherCircuit", circuitHash); + PatternAndCircuitHash patternAndCircuitHash = buildPatternAndCircuitHash("/anotherCircuit", circuitHash, "my-metric-2"); storage.closeCircuit(patternAndCircuitHash).onComplete(event -> { context.assertTrue(event.succeeded()); @@ -370,7 +411,7 @@ public void testCloseCircuit(TestContext context){ context.assertEquals("queue_3", jedis.lpop(STORAGE_QUEUES_TO_UNLOCK)); context.assertTrue(jedis.exists(infosKey(circuitHash))); - assertStateAndErroPercentage(context, circuitHash, CLOSED, 0); + assertStateAndErrorPercentage(context, circuitHash, CLOSED, 0); context.assertEquals(3L, jedis.scard(STORAGE_HALFOPEN_CIRCUITS)); context.assertEquals(4L, jedis.scard(STORAGE_ALL_CIRCUITS)); @@ -437,7 +478,7 @@ public void testCloseAndRemoveCircuit(TestContext context){ context.assertEquals(1L, jedis.scard(STORAGE_OPEN_CIRCUITS)); context.assertEquals(0L, jedis.llen(STORAGE_QUEUES_TO_UNLOCK)); - PatternAndCircuitHash patternAndCircuitHash = buildPatternAndCircuitHash("/anotherCircuit", circuitHash); + PatternAndCircuitHash patternAndCircuitHash = buildPatternAndCircuitHash("/anotherCircuit", circuitHash, "my-metric-2"); storage.closeAndRemoveCircuit(patternAndCircuitHash).onComplete(event -> { context.assertTrue(event.succeeded()); @@ -517,7 +558,7 @@ public void testCloseAllCircuits(TestContext context){ for (int index = 1; index <= 5; index++) { String hash = "hash_" + index; context.assertFalse(jedis.exists(queuesKey(hash))); - assertStateAndErroPercentage(context, hash, CLOSED, 0); + assertStateAndErrorPercentage(context, hash, CLOSED, 0); context.assertFalse(jedis.exists(key(hash, QueueResponseType.SUCCESS))); context.assertFalse(jedis.exists(key(hash, QueueResponseType.FAILURE))); context.assertFalse(jedis.exists(queuesKey(hash))); @@ -583,7 +624,7 @@ public void testReOpenCircuit(TestContext context){ context.assertEquals(4L, jedis.scard(STORAGE_HALFOPEN_CIRCUITS)); context.assertEquals(5L, jedis.scard(STORAGE_OPEN_CIRCUITS)); - PatternAndCircuitHash patternAndCircuitHash = buildPatternAndCircuitHash("/anotherCircuit", circuitHash); + PatternAndCircuitHash patternAndCircuitHash = buildPatternAndCircuitHash("/anotherCircuit", circuitHash, "my-metric-2"); storage.reOpenCircuit(patternAndCircuitHash).onComplete(event -> { context.assertTrue(event.succeeded()); @@ -591,7 +632,7 @@ public void testReOpenCircuit(TestContext context){ context.assertTrue(jedis.exists(STORAGE_HALFOPEN_CIRCUITS)); context.assertTrue(jedis.exists(STORAGE_OPEN_CIRCUITS)); - assertStateAndErroPercentage(context, circuitHash, OPEN, 50); + assertStateAndErrorPercentage(context, circuitHash, OPEN, 50); context.assertEquals(3L, jedis.scard(STORAGE_HALFOPEN_CIRCUITS)); Set halfOpenCircuits = jedis.smembers(STORAGE_HALFOPEN_CIRCUITS); @@ -812,8 +853,8 @@ private void buildCircuitEntry(String circuitHash, QueueCircuitState state, List } } - private PatternAndCircuitHash buildPatternAndCircuitHash(String pattern, String circuitHash){ - return new PatternAndCircuitHash(Pattern.compile(pattern), circuitHash); + private PatternAndCircuitHash buildPatternAndCircuitHash(String pattern, String circuitHash, String metricName){ + return new PatternAndCircuitHash(Pattern.compile(pattern), circuitHash, metricName); } private String key(String circuitHash, QueueResponseType queueResponseType){ @@ -828,9 +869,12 @@ private String infosKey(String circuitHash){ return STORAGE_PREFIX + circuitHash + STORAGE_INFOS_SUFFIX; } - private void writeQueueCircuit(String circuitHash, QueueCircuitState state, String circuit, int failPercentage){ + private void writeQueueCircuit(String circuitHash, QueueCircuitState state, String circuit, String metricName, int failPercentage){ writeQueueCircuitField(circuitHash, FIELD_STATE, state.name().toLowerCase()); writeQueueCircuitField(circuitHash, FIELD_CIRCUIT, circuit); + if(metricName != null){ + writeQueueCircuitField(circuitHash, FIELD_METRICNAME, metricName); + } writeQueueCircuitField(circuitHash, FIELD_FAILRATIO, String.valueOf(failPercentage)); jedis.sadd(STORAGE_ALL_CIRCUITS, circuitHash); } @@ -856,7 +900,7 @@ private void assertState(TestContext context, String circuitHash, QueueCircuitSt context.assertEquals(state.name().toLowerCase(), stateFromDb); } - private void assertStateAndErroPercentage(TestContext context, String circuitHash, QueueCircuitState state, int percentage){ + private void assertStateAndErrorPercentage(TestContext context, String circuitHash, QueueCircuitState state, int percentage){ assertState(context, circuitHash, state); String percentageAsString = jedis.hget(STORAGE_PREFIX + circuitHash + STORAGE_INFOS_SUFFIX, FIELD_FAILRATIO); context.assertEquals(percentage, Integer.valueOf(percentageAsString)); @@ -882,15 +926,23 @@ private void assertQueuesToUnlockItems(TestContext context, List items){ } } - private void assertJsonObjectContents(TestContext context, JsonObject result, String hash, QueueCircuitState status, String circuit, int failRatio){ + private void assertJsonObjectContents(TestContext context, JsonObject result, String hash, QueueCircuitState status, String circuit, String metricName, int failRatio){ context.assertTrue(result.containsKey(hash)); - context.assertTrue(result.getJsonObject(hash).containsKey("status")); - context.assertEquals(status.name().toLowerCase(), result.getJsonObject(hash).getString("status")); - context.assertTrue(result.getJsonObject(hash).containsKey("infos")); - context.assertTrue(result.getJsonObject(hash).getJsonObject("infos").containsKey("circuit")); - context.assertEquals(circuit, result.getJsonObject(hash).getJsonObject("infos").getString("circuit")); - context.assertTrue(result.getJsonObject(hash).getJsonObject("infos").containsKey("failRatio")); - context.assertEquals(failRatio, result.getJsonObject(hash).getJsonObject("infos").getInteger("failRatio")); + context.assertTrue(result.getJsonObject(hash).containsKey(STATUS)); + context.assertEquals(status.name().toLowerCase(), result.getJsonObject(hash).getString(STATUS)); + context.assertTrue(result.getJsonObject(hash).containsKey(INFOS)); + context.assertTrue(result.getJsonObject(hash).getJsonObject(INFOS).containsKey(CIRCUIT)); + context.assertEquals(circuit, result.getJsonObject(hash).getJsonObject(INFOS).getString(CIRCUIT)); + + if(metricName != null){ + context.assertTrue(result.getJsonObject(hash).getJsonObject(INFOS).containsKey(METRIC_NAME)); + context.assertEquals(metricName, result.getJsonObject(hash).getJsonObject(INFOS).getString(METRIC_NAME)); + } else { + context.assertFalse(result.getJsonObject(hash).getJsonObject(INFOS).containsKey(METRIC_NAME)); + } + + context.assertTrue(result.getJsonObject(hash).getJsonObject(INFOS).containsKey(FAIL_RATIO)); + context.assertEquals(failRatio, result.getJsonObject(hash).getJsonObject(INFOS).getInteger(FAIL_RATIO)); } private void addToQueuesToUnlock(String queueToUnlock){ diff --git a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/lua/QueueCircuitBreakerGetAllCircuitsLuaScriptTests.java b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/lua/QueueCircuitBreakerGetAllCircuitsLuaScriptTests.java index cdf42cd47..f80d2e7bb 100644 --- a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/lua/QueueCircuitBreakerGetAllCircuitsLuaScriptTests.java +++ b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/lua/QueueCircuitBreakerGetAllCircuitsLuaScriptTests.java @@ -14,6 +14,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertNull; import static org.swisspush.gateleen.queue.queuing.circuitbreaker.util.QueueCircuitState.*; import static org.swisspush.gateleen.queue.queuing.circuitbreaker.impl.RedisQueueCircuitBreakerStorage.*; @@ -25,6 +26,12 @@ @RunWith(VertxUnitRunner.class) public class QueueCircuitBreakerGetAllCircuitsLuaScriptTests extends AbstractLuaScriptTest { + public static final String INFOS = "infos"; + public static final String STATUS = "status"; + public static final String FAILRATIO = "failRatio"; + public static final String CIRCUIT = "circuit"; + public static final String METRICNAME = "metricName"; + private final String circuitInfoKeyPrefix = "q:"; private final String circuitInfoKeySuffix = ":infos"; private final String allCircuitsKey = "all_circuits"; @@ -38,41 +45,50 @@ public void testGetAllCircuits(){ String hash2 = "hash_2"; String hash3 = "hash_3"; - writeQueueCircuit(hash1, HALF_OPEN, "/path/to/hash_1", 60); - writeQueueCircuit(hash2, CLOSED, "/path/to/hash_2", 20); - writeQueueCircuit(hash3, OPEN, "/path/to/hash_3", 99); + writeQueueCircuit(hash1, HALF_OPEN, "/path/to/hash_1", "metric_1", 60); + writeQueueCircuit(hash2, CLOSED, "/path/to/hash_2", null, 20); + writeQueueCircuit(hash3, OPEN, "/path/to/hash_3", "metric_3", 99); assertThat(jedis.hget(circuitInfoKey(hash1), FIELD_STATE).toLowerCase(), equalTo(HALF_OPEN.name().toLowerCase())); assertThat(jedis.hget(circuitInfoKey(hash1), FIELD_CIRCUIT), equalTo("/path/to/hash_1")); assertThat(jedis.hget(circuitInfoKey(hash1), FIELD_FAILRATIO), equalTo("60")); + assertThat(jedis.hget(circuitInfoKey(hash1), FIELD_METRICNAME), equalTo("metric_1")); assertThat(jedis.hget(circuitInfoKey(hash2), FIELD_STATE).toLowerCase(), equalTo(CLOSED.name().toLowerCase())); assertThat(jedis.hget(circuitInfoKey(hash2), FIELD_CIRCUIT), equalTo("/path/to/hash_2")); assertThat(jedis.hget(circuitInfoKey(hash2), FIELD_FAILRATIO), equalTo("20")); + assertNull(jedis.hget(circuitInfoKey(hash2), FIELD_METRICNAME)); assertThat(jedis.hget(circuitInfoKey(hash3), FIELD_STATE).toLowerCase(), equalTo(OPEN.name().toLowerCase())); assertThat(jedis.hget(circuitInfoKey(hash3), FIELD_CIRCUIT), equalTo("/path/to/hash_3")); assertThat(jedis.hget(circuitInfoKey(hash3), FIELD_FAILRATIO), equalTo("99")); + assertThat(jedis.hget(circuitInfoKey(hash3), FIELD_METRICNAME), equalTo("metric_3")); assertThat(jedis.scard(allCircuitsKey), equalTo(3L)); JsonObject result = new JsonObject(evalScriptGetAllCircuits().toString()); // assertions - assertJsonObjectContents(result, hash1, "half_open", "/path/to/hash_1", 60); - assertJsonObjectContents(result, hash2, "closed", "/path/to/hash_2", 20); - assertJsonObjectContents(result, hash3, "open", "/path/to/hash_3", 99); + assertJsonObjectContents(result, hash1, "half_open", "/path/to/hash_1", "metric_1", 60); + assertJsonObjectContents(result, hash2, "closed", "/path/to/hash_2", null, 20); + assertJsonObjectContents(result, hash3, "open", "/path/to/hash_3", "metric_3", 99); } - private void assertJsonObjectContents(JsonObject result, String hash, String status, String circuit, int failRatio){ + private void assertJsonObjectContents(JsonObject result, String hash, String status, String circuit, String metricName, int failRatio){ assertThat(result.containsKey(hash), is(true)); - assertThat(result.getJsonObject(hash).containsKey("status"), is(true)); - assertThat(result.getJsonObject(hash).getString("status"), equalTo(status)); - assertThat(result.getJsonObject(hash).containsKey("infos"), is(true)); - assertThat(result.getJsonObject(hash).getJsonObject("infos").containsKey("circuit"), is(true)); - assertThat(result.getJsonObject(hash).getJsonObject("infos").getString("circuit"), equalTo(circuit)); - assertThat(result.getJsonObject(hash).getJsonObject("infos").containsKey("failRatio"), is(true)); - assertThat(result.getJsonObject(hash).getJsonObject("infos").getInteger("failRatio"), equalTo(failRatio)); + assertThat(result.getJsonObject(hash).containsKey(STATUS), is(true)); + assertThat(result.getJsonObject(hash).getString(STATUS), equalTo(status)); + assertThat(result.getJsonObject(hash).containsKey(INFOS), is(true)); + assertThat(result.getJsonObject(hash).getJsonObject(INFOS).containsKey(CIRCUIT), is(true)); + assertThat(result.getJsonObject(hash).getJsonObject(INFOS).getString(CIRCUIT), equalTo(circuit)); + assertThat(result.getJsonObject(hash).getJsonObject(INFOS).containsKey(FAILRATIO), is(true)); + assertThat(result.getJsonObject(hash).getJsonObject(INFOS).getInteger(FAILRATIO), equalTo(failRatio)); + + if(metricName != null) { + assertThat(result.getJsonObject(hash).getJsonObject(INFOS).getString(METRICNAME), equalTo(metricName)); + } else { + assertThat(result.getJsonObject(hash).getJsonObject(INFOS).containsKey(METRICNAME), is(false)); + } } private Object evalScriptGetAllCircuits(){ @@ -82,10 +98,13 @@ private Object evalScriptGetAllCircuits(){ return jedis.eval(script, keys, arguments); } - private void writeQueueCircuit(String circuitHash, QueueCircuitState state, String circuit, int failPercentage){ + private void writeQueueCircuit(String circuitHash, QueueCircuitState state, String circuit, String metricName, int failPercentage){ writeQueueCircuitField(circuitHash, FIELD_STATE, state.name().toLowerCase()); writeQueueCircuitField(circuitHash, FIELD_CIRCUIT, circuit); writeQueueCircuitField(circuitHash, FIELD_FAILRATIO, String.valueOf(failPercentage)); + if(metricName != null){ + writeQueueCircuitField(circuitHash, FIELD_METRICNAME, metricName); + } jedis.sadd(allCircuitsKey, circuitHash); } diff --git a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/lua/QueueCircuitBreakerUpdateStatsLuaScriptTests.java b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/lua/QueueCircuitBreakerUpdateStatsLuaScriptTests.java index 291722e68..0216c2775 100644 --- a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/lua/QueueCircuitBreakerUpdateStatsLuaScriptTests.java +++ b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/lua/QueueCircuitBreakerUpdateStatsLuaScriptTests.java @@ -46,9 +46,9 @@ public void testCalculateErrorPercentage(){ assertThat(jedis.exists(allCircuitsKey), is(false)); // adding 3 failing requests - evalScriptUpdateQueueCircuitBreakerStats(update_fail, "req_1", "url_pattern", 0, 50, 10, 4, 10); - evalScriptUpdateQueueCircuitBreakerStats(update_fail, "req_2", "url_pattern", 1, 50, 10, 4, 10); - evalScriptUpdateQueueCircuitBreakerStats(update_fail, "req_3", "url_pattern", 2, 50, 10, 4, 10); + evalScriptUpdateQueueCircuitBreakerStats(update_fail, "req_1", "url_pattern", "metric-1", 0, 50, 10, 4, 10); + evalScriptUpdateQueueCircuitBreakerStats(update_fail, "req_2", "url_pattern", "metric-1",1, 50, 10, 4, 10); + evalScriptUpdateQueueCircuitBreakerStats(update_fail, "req_3", "url_pattern","metric-1", 2, 50, 10, 4, 10); // asserts assertThat(jedis.exists(circuitInfoKey), is(true)); @@ -256,6 +256,13 @@ private void assertSizeSizeNotExceedingLimit(String setKey, long maxSetSize){ private Object evalScriptUpdateQueueCircuitBreakerStats(String circuitKeyToUpdate, String uniqueRequestID, String circuit, long timestamp, int errorThresholdPercentage, long entriesMaxAgeMS, long minQueueSampleCount, long maxQueueSampleCount) { + return evalScriptUpdateQueueCircuitBreakerStats(circuitKeyToUpdate, uniqueRequestID, circuit, null, timestamp, errorThresholdPercentage, + entriesMaxAgeMS, minQueueSampleCount, maxQueueSampleCount); + } + + private Object evalScriptUpdateQueueCircuitBreakerStats(String circuitKeyToUpdate, String uniqueRequestID, + String circuit, String metricName, long timestamp, int errorThresholdPercentage, + long entriesMaxAgeMS, long minQueueSampleCount, long maxQueueSampleCount) { String script = readScript(QueueCircuitBreakerLuaScripts.UPDATE_CIRCUIT.getFilename()); List keys = Arrays.asList( @@ -270,6 +277,7 @@ private Object evalScriptUpdateQueueCircuitBreakerStats(String circuitKeyToUpdat List arguments = Arrays.asList( uniqueRequestID, circuit, + metricName != null ? metricName : "", circuit+"Hash", String.valueOf(timestamp), String.valueOf(errorThresholdPercentage), diff --git a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/monitoring/QueueCircuitBreakerMetricsCollectorTest.java b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/monitoring/QueueCircuitBreakerMetricsCollectorTest.java new file mode 100644 index 000000000..4c3209628 --- /dev/null +++ b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/monitoring/QueueCircuitBreakerMetricsCollectorTest.java @@ -0,0 +1,271 @@ +package org.swisspush.gateleen.queue.queuing.circuitbreaker.monitoring; + +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.search.MeterNotFoundException; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.Timeout; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.swisspush.gateleen.core.lock.Lock; +import org.swisspush.gateleen.queue.queuing.circuitbreaker.QueueCircuitBreakerStorage; + +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.verify; +import static org.swisspush.gateleen.core.exception.GateleenExceptionFactory.newGateleenWastefulExceptionFactory; +import static org.swisspush.gateleen.queue.queuing.circuitbreaker.impl.RedisQueueCircuitBreakerStorage.*; +import static org.swisspush.gateleen.queue.queuing.circuitbreaker.monitoring.QueueCircuitBreakerMetricsCollector.*; + +/** + * Tests for the {@link QueueCircuitBreakerMetricsCollector} class + * + * @author https://github.com/mcweba [Marc-Andre Weber] + */ +@RunWith(VertxUnitRunner.class) +public class QueueCircuitBreakerMetricsCollectorTest { + + private Vertx vertx; + private Lock lock; + private QueueCircuitBreakerStorage queueCircuitBreakerStorage; + private MeterRegistry meterRegistry; + private QueueCircuitBreakerMetricsCollector collector; + + @org.junit.Rule + public Timeout rule = Timeout.seconds(50); + + @Before + public void setUp() { + vertx = Vertx.vertx(); + + lock = Mockito.mock(Lock.class); + Mockito.when(lock.acquireLock(anyString(), anyString(), anyLong())).thenReturn(Future.succeededFuture(Boolean.TRUE)); + Mockito.when(lock.releaseLock(anyString(), anyString())).thenReturn(Future.succeededFuture(Boolean.TRUE)); + + meterRegistry = new SimpleMeterRegistry(); + queueCircuitBreakerStorage = Mockito.mock(QueueCircuitBreakerStorage.class); + + collector = new QueueCircuitBreakerMetricsCollector(vertx, lock, queueCircuitBreakerStorage, meterRegistry, + newGateleenWastefulExceptionFactory(), 5); + } + + @Test + public void testCollectMetricsSuccess(TestContext context) { + Async async = context.async(); + + JsonObject allCircuits = new JsonObject(); + allCircuits.put("5645745t43f54gf", createCircuitInfo("closed", "M1", 0)); + allCircuits.put("12rt878665f54gf", createCircuitInfo("half_open", "M2", 35)); + allCircuits.put("8789jz45745t43f54gf", createCircuitInfo("open", "M3", 100)); + + Mockito.when(queueCircuitBreakerStorage.getAllCircuits()) + .thenReturn(Future.succeededFuture(allCircuits)); + + collector.collectMetrics().onComplete(event -> { + context.assertTrue(event.succeeded()); + + // verify status gauges + context.assertEquals(0.0, getStatusGauge("M1").value(), "Status of circuit M1 should be 0.0 -> CLOSED"); + context.assertEquals(1.0, getStatusGauge("M2").value(), "Status of circuit M2 should be 0.0 -> HALF_OPEN"); + context.assertEquals(2.0, getStatusGauge("M3").value(), "Status of circuit M3 should be 0.0 -> OPEN"); + + // verify fail ratio gauges + context.assertEquals(0.0, getFailRatioGauge("M1").value(), "Fail ratio of circuit M1 should be 0.0"); + context.assertEquals(35.0, getFailRatioGauge("M2").value(), "Fail ratio of circuit M2 should be 35.0"); + context.assertEquals(100.0, getFailRatioGauge("M3").value(), "Fail ratio of circuit M3 should be 100.0"); + + verify(lock, Mockito.times(1)).releaseLock(eq(COLLECT_METRICS_TASK_LOCK), anyString()); + + async.complete(); + }); + } + + @Test + public void testCollectMetricsSuccessUpdatedMetrics(TestContext context) { + Async async = context.async(); + + JsonObject allCircuits = new JsonObject(); + allCircuits.put("5645745t43f54gf", createCircuitInfo("closed", "M1", 0)); + + Mockito.when(queueCircuitBreakerStorage.getAllCircuits()) + .thenReturn(Future.succeededFuture(allCircuits)); + + collector.collectMetrics().onComplete(event -> { + context.assertTrue(event.succeeded()); + + // verify status gauge + context.assertEquals(0.0, getStatusGauge("M1").value(), "Status of circuit M1 should be 0.0 -> CLOSED"); + + // verify fail ratio gauge + context.assertEquals(0.0, getFailRatioGauge("M1").value(), "Fail ratio of circuit M1 should be 0.0"); + + allCircuits.put("5645745t43f54gf", createCircuitInfo("half_open", "M1", 55)); + + Mockito.when(queueCircuitBreakerStorage.getAllCircuits()) + .thenReturn(Future.succeededFuture(allCircuits)); + + collector.collectMetrics().onComplete(event1 -> { + context.assertTrue(event1.succeeded()); + + // verify status gauge + context.assertEquals(1.0, getStatusGauge("M1").value(), "Status of circuit M1 should be 1.0 -> HALF_OPEN"); + + // verify fail ratio gauge + context.assertEquals(55.0, getFailRatioGauge("M1").value(), "Fail ratio of circuit M1 should be 55.0"); + + verify(lock, Mockito.times(2)).releaseLock(eq(COLLECT_METRICS_TASK_LOCK), anyString()); + + async.complete(); + }); + }); + } + + @Test + public void testCollectMetricsStorageFailure(TestContext context) { + Async async = context.async(); + + JsonObject allCircuits = new JsonObject(); + allCircuits.put("5645745t43f54gf", createCircuitInfo("closed", "M1", 0)); + allCircuits.put("12rt878665f54gf", createCircuitInfo("half_open", "M2", 35)); + allCircuits.put("8789jz45745t43f54gf", createCircuitInfo("open", "M3", 100)); + + Mockito.when(queueCircuitBreakerStorage.getAllCircuits()) + .thenReturn(Future.failedFuture("Boooom")); + + collector.collectMetrics().onComplete(event -> { + context.assertTrue(event.failed()); + + context.assertFalse(statusGaugeExists("M1")); + context.assertFalse(failRatioGaugeExists("M1")); + + context.assertFalse(statusGaugeExists("M2")); + context.assertFalse(failRatioGaugeExists("M2")); + + context.assertFalse(statusGaugeExists("M3")); + context.assertFalse(failRatioGaugeExists("M3")); + + verify(lock, Mockito.times(1)).releaseLock(eq(COLLECT_METRICS_TASK_LOCK), anyString()); + + async.complete(); + }); + } + + @Test + public void testCollectMetricsIgnoreEntries(TestContext context) { + Async async = context.async(); + + JsonObject allCircuits = new JsonObject(); + allCircuits.put("5645745t43f5465", createCircuitInfo("closed", "M1", 0)); + allCircuits.put("12rt878665f54gf", createCircuitInfo("foobar_state", "M2", 35)); + allCircuits.put("8789jz45745t4g8", createCircuitInfo(null, "M3", 100)); + allCircuits.put("8634662437g894c", createCircuitInfo("open", null, 100)); + allCircuits.put("125645t43f5465", createCircuitInfo("half_open", "M5", 20)); + allCircuits.put("f6545745t43f5465", createCircuitInfo("open", "M6", 90)); + + Mockito.when(queueCircuitBreakerStorage.getAllCircuits()) + .thenReturn(Future.succeededFuture(allCircuits)); + + collector.collectMetrics().onComplete(event -> { + context.assertTrue(event.succeeded()); + + // verify status gauges + context.assertEquals(0.0, getStatusGauge("M1").value(), "Status of circuit M1 should be 0.0 -> CLOSED"); + context.assertFalse(statusGaugeExists("M2")); + context.assertFalse(statusGaugeExists("M3")); + context.assertFalse(statusGaugeExists("M4")); + context.assertEquals(1.0, getStatusGauge("M5").value(), "Status of circuit M5 should be 1.0 -> HALF_OPEN"); + context.assertEquals(2.0, getStatusGauge("M6").value(), "Status of circuit M6 should be 2.0 -> OPEN"); + + // verify fail ratio gauges + context.assertEquals(0.0, getFailRatioGauge("M1").value(), "Fail ratio of circuit M1 should be 0.0"); + context.assertFalse(failRatioGaugeExists("M2")); + context.assertFalse(failRatioGaugeExists("M3")); + context.assertFalse(failRatioGaugeExists("M4")); + context.assertEquals(20.0, getFailRatioGauge("M5").value(), "Fail ratio of circuit M5 should be 20.0"); + context.assertEquals(90.0, getFailRatioGauge("M6").value(), "Fail ratio of circuit M6 should be 90.0"); + + verify(lock, Mockito.times(1)).releaseLock(eq(COLLECT_METRICS_TASK_LOCK), anyString()); + + async.complete(); + }); + } + + @Test + public void testCollectMetricsFailedToAcquireLock(TestContext context) { + Async async = context.async(); + + Mockito.when(lock.acquireLock(anyString(), anyString(), anyLong())).thenReturn(Future.failedFuture("Boooom")); + + collector.collectMetrics().onComplete(event -> { + context.assertTrue(event.failed()); + context.assertEquals("Boooom", event.cause().getMessage()); + Mockito.verifyNoInteractions(queueCircuitBreakerStorage); + verify(lock, Mockito.never()).releaseLock(eq(COLLECT_METRICS_TASK_LOCK), anyString()); + async.complete(); + }); + } + + @Test + public void testCollectMetricsLockAlreadyAcquired(TestContext context) { + Async async = context.async(); + + Mockito.when(lock.acquireLock(anyString(), anyString(), anyLong())).thenReturn(Future.succeededFuture(Boolean.FALSE)); + + collector.collectMetrics().onComplete(event -> { + context.assertTrue(event.succeeded()); + Mockito.verifyNoInteractions(queueCircuitBreakerStorage); + async.complete(); + }); + } + + private Gauge getStatusGauge(String metricName){ + return meterRegistry.get(CIRCUIT_BREAKER_STATUS_METRIC).tag("metricName", metricName).gauge(); + } + + private boolean statusGaugeExists(String metricName){ + try { + meterRegistry.get(CIRCUIT_BREAKER_STATUS_METRIC).tag("metricName", metricName).gauge(); + return true; + } catch (MeterNotFoundException ex) { + return false; + } + } + + private Gauge getFailRatioGauge(String metricName){ + return meterRegistry.get(CIRCUIT_BREAKER_FAILRATIO_METRIC).tag("metricName", metricName).gauge(); + } + + private boolean failRatioGaugeExists(String metricName){ + try { + meterRegistry.get(CIRCUIT_BREAKER_FAILRATIO_METRIC).tag("metricName", metricName).gauge(); + return true; + } catch (MeterNotFoundException ex) { + return false; + } + } + + private JsonObject createCircuitInfo(String status, String metricName, Integer failRatio){ + JsonObject circuit = new JsonObject(); + JsonObject infos = new JsonObject().put(FIELD_CIRCUIT, "/some/circuit/url"); + circuit.put("infos", infos); + + if(status != null) { + circuit.put(FIELD_STATUS, status); + } + if(metricName != null) { + infos.put(FIELD_METRICNAME, metricName); + } + if(failRatio != null) { + infos.put(FIELD_FAILRATIO, failRatio); + } + + return circuit; + } +} diff --git a/gateleen-routing/pom.xml b/gateleen-routing/pom.xml index 91cfa062c..6875d2f8e 100644 --- a/gateleen-routing/pom.xml +++ b/gateleen-routing/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT gateleen-routing diff --git a/gateleen-routing/src/main/java/org/swisspush/gateleen/routing/Forwarder.java b/gateleen-routing/src/main/java/org/swisspush/gateleen/routing/Forwarder.java index 9eb2a7189..e9ceccff2 100755 --- a/gateleen-routing/src/main/java/org/swisspush/gateleen/routing/Forwarder.java +++ b/gateleen-routing/src/main/java/org/swisspush/gateleen/routing/Forwarder.java @@ -358,7 +358,7 @@ public void handle(AsyncResult event) { */ if (bodyData == null) { - // Gateleen internal requests (e.g. from scedulers or delegates) often have neither "Content-Length" nor "Transfer-Encoding: chunked" + // Gateleen internal requests (e.g. from schedulers or delegates) often have neither "Content-Length" nor "Transfer-Encoding: chunked" // header - so we must wait for a body buffer to know: Is there a body or not? Only looking on the headers and/or the http-method is not // sustainable to know "has body or not" // But: if there is a body, then we need to either setChunked or a Content-Length header (otherwise Vertx complains with an Exception) @@ -444,16 +444,16 @@ public WriteStream drainHandler(@Nullable Handler handler) { // Setting the endHandler would then lead to an Exception // see also https://github.com/eclipse-vertx/vert.x/issues/2763 // so we now check if the request already is ended before installing an endHandler - cReq.send(cResHandler); + cReq.send(); } else { - req.endHandler(v -> cReq.send(cResHandler)); + req.endHandler(v -> cReq.send()); pump.start(); } } else { loggingHandler.appendRequestPayload(bodyData); // we already have the body complete in-memory - so we can use Content-Length header and avoid chunked transfer cReq.putHeader(HttpHeaders.CONTENT_LENGTH, Integer.toString(bodyData.length())); - cReq.send(bodyData, cResHandler); + cReq.send(bodyData); } loggingHandler.request(cReq.headers()); diff --git a/gateleen-runconfig/pom.xml b/gateleen-runconfig/pom.xml index 3b74e46d4..20e7a4eb0 100644 --- a/gateleen-runconfig/pom.xml +++ b/gateleen-runconfig/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT gateleen-runconfig diff --git a/gateleen-scheduler/pom.xml b/gateleen-scheduler/pom.xml index dae147c14..f058248da 100644 --- a/gateleen-scheduler/pom.xml +++ b/gateleen-scheduler/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT gateleen-scheduler diff --git a/gateleen-security/pom.xml b/gateleen-security/pom.xml index 1e53af8b3..01ab7e2e9 100644 --- a/gateleen-security/pom.xml +++ b/gateleen-security/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT gateleen-security diff --git a/gateleen-test/pom.xml b/gateleen-test/pom.xml index fabf66f4f..cd1a6622f 100644 --- a/gateleen-test/pom.xml +++ b/gateleen-test/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT gateleen-test jar diff --git a/gateleen-testhelper/pom.xml b/gateleen-testhelper/pom.xml index 6cc33c307..bc55e7f70 100644 --- a/gateleen-testhelper/pom.xml +++ b/gateleen-testhelper/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT gateleen-testhelper diff --git a/gateleen-user/pom.xml b/gateleen-user/pom.xml index a19758134..a45637133 100644 --- a/gateleen-user/pom.xml +++ b/gateleen-user/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT gateleen-user diff --git a/gateleen-validation/pom.xml b/gateleen-validation/pom.xml index d0e624ccc..8b356ba7e 100644 --- a/gateleen-validation/pom.xml +++ b/gateleen-validation/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT gateleen-validation diff --git a/pom.xml b/pom.xml index cd834c448..05abf31c1 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ org.swisspush.gateleen gateleen - 2.1.13-SNAPSHOT + 2.1.14-SNAPSHOT pom gateleen Middleware library based on Vert.x to build advanced JSON/REST communication servers