diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index e633a30e1..9b6f3a540 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -23,6 +23,7 @@
+
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
index 3a0026e94..9f2e254b8 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
@@ -158,7 +158,8 @@ public void configure(final Map configs) {
config.segmentManifestCacheRetention(),
fetcher,
mapper,
- executor);
+ executor,
+ config.enableJmxOperations());
}
// for testing
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java b/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java
index ba1fd3de6..c60b7d998 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java
@@ -44,8 +44,11 @@ public class RemoteStorageManagerConfig extends AbstractConfig {
private static final String OBJECT_KEY_PREFIX_CONFIG = "key.prefix";
private static final String OBJECT_KEY_PREFIX_DOC = "The object storage path prefix";
+ private static final String ENABLE_JMX_OPERATIONS_CONFIG = "enable.jmx";
+ private static final String ENABLE_JMX_OPERATIONS_DOC = "Enable JMX MBeans operations to manage caches";
+
private static final String SEGMENT_MANIFEST_CACHE_PREFIX = "segment.manifest.cache.";
- private static final String SEGMENT_MANIFEST_CACHE_SIZE_CONFIG = SEGMENT_MANIFEST_CACHE_PREFIX + "size";
+ public static final String SEGMENT_MANIFEST_CACHE_SIZE_CONFIG = SEGMENT_MANIFEST_CACHE_PREFIX + "size";
private static final Long SEGMENT_MANIFEST_CACHE_SIZE_DEFAULT = 1000L; // TODO consider a better default
private static final String SEGMENT_MANIFEST_CACHE_SIZE_DOC =
"The size in items of the segment manifest cache. "
@@ -107,6 +110,14 @@ public class RemoteStorageManagerConfig extends AbstractConfig {
OBJECT_KEY_PREFIX_DOC
);
+ CONFIG.define(
+ ENABLE_JMX_OPERATIONS_CONFIG,
+ ConfigDef.Type.BOOLEAN,
+ false,
+ ConfigDef.Importance.LOW,
+ ENABLE_JMX_OPERATIONS_DOC
+ );
+
CONFIG.define(
SEGMENT_MANIFEST_CACHE_SIZE_CONFIG,
ConfigDef.Type.LONG,
@@ -314,6 +325,10 @@ public Optional segmentManifestCacheRetention() {
return Optional.of(Duration.ofMillis(rawValue));
}
+ public boolean enableJmxOperations() {
+ return getBoolean(ENABLE_JMX_OPERATIONS_CONFIG);
+ }
+
public String keyPrefix() {
return getString(OBJECT_KEY_PREFIX_CONFIG);
}
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestCacheManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestCacheManager.java
new file mode 100644
index 000000000..0680e326e
--- /dev/null
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestCacheManager.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.manifest;
+
+public interface SegmentManifestCacheManager {
+ String MBEAN_NAME = "aiven.kafka.server.tieredstorage.cache:type=segment-manifest-cache-manager";
+
+ void clean();
+}
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestCacheManagerMBean.java b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestCacheManagerMBean.java
new file mode 100644
index 000000000..ce535a848
--- /dev/null
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestCacheManagerMBean.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.manifest;
+
+public class SegmentManifestCacheManagerMBean implements SegmentManifestCacheManager {
+
+ final SegmentManifestProvider provider;
+
+ public SegmentManifestCacheManagerMBean(final SegmentManifestProvider segmentManifestProvider) {
+ this.provider = segmentManifestProvider;
+ }
+
+ @Override
+ public void clean() {
+ provider.cache().invalidateAll();
+ }
+}
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProvider.java b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProvider.java
index 19ed3b94f..76a90e34a 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProvider.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProvider.java
@@ -16,8 +16,16 @@
package io.aiven.kafka.tieredstorage.manifest;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.MBeanRegistrationException;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
import java.io.IOException;
import java.io.InputStream;
+import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@@ -34,9 +42,13 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SegmentManifestProvider {
+ private static final Logger LOG = LoggerFactory.getLogger(SegmentManifestProvider.class);
private static final String SEGMENT_MANIFEST_METRIC_GROUP_NAME = "segment-manifest-cache";
private static final long GET_TIMEOUT_SEC = 10;
@@ -52,7 +64,8 @@ public SegmentManifestProvider(final ObjectKey objectKey,
final Optional cacheRetention,
final ObjectFetcher fileFetcher,
final ObjectMapper mapper,
- final Executor executor) {
+ final Executor executor,
+ final boolean enableJmxOperations) {
this.objectKey = objectKey;
final var cacheBuilder = Caffeine.newBuilder()
.recordStats(() -> new CaffeineStatsCounter(SEGMENT_MANIFEST_METRIC_GROUP_NAME))
@@ -64,6 +77,29 @@ public SegmentManifestProvider(final ObjectKey objectKey,
return mapper.readValue(is, SegmentManifest.class);
}
});
+ if (enableJmxOperations) {
+ enableJmxMBean();
+ }
+ }
+
+ private void enableJmxMBean() {
+ final var mbeanName = SegmentManifestCacheManager.MBEAN_NAME;
+ try {
+ final var name = new ObjectName(mbeanName);
+ final var mbean = new StandardMBean(
+ new SegmentManifestCacheManagerMBean(this),
+ SegmentManifestCacheManager.class);
+ ManagementFactory.getPlatformMBeanServer().registerMBean(mbean, name);
+ } catch (NotCompliantMBeanException
+ | MalformedObjectNameException
+ | InstanceAlreadyExistsException
+ | MBeanRegistrationException e) {
+ LOG.warn("Error creating MBean {}", mbeanName, e);
+ }
+ }
+
+ Cache cache() {
+ return cache.synchronous();
}
public SegmentManifest get(final RemoteLogSegmentMetadata remoteLogSegmentMetadata)
diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java
index bd786efbc..5ecf705d8 100644
--- a/core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java
+++ b/core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java
@@ -50,6 +50,7 @@ void minimalConfig() {
assertThat(config.encryptionKeyPairId()).isNull();
assertThat(config.encryptionKeyRing()).isNull();
assertThat(config.keyPrefix()).isEmpty();
+ assertThat(config.enableJmxOperations()).isFalse();
}
@Test
@@ -294,7 +295,6 @@ void invalidChunkSizeRange() {
.hasMessage("Invalid value 2147483648 for configuration chunk.size: Not a number of type INT");
}
-
@Test
void invalidCompressionConfig() {
assertThatThrownBy(() -> new RemoteStorageManagerConfig(
@@ -307,4 +307,15 @@ void invalidCompressionConfig() {
.isInstanceOf(ConfigException.class)
.hasMessage("compression.enabled must be enabled if compression.heuristic.enabled is");
}
+
+ @Test
+ void enableJmx() {
+ final HashMap props = new HashMap<>();
+ props.put("storage.backend.class", NoopStorageBackend.class);
+ props.put("chunk.size", "123");
+ props.put("enable.jmx", true);
+ final RemoteStorageManagerConfig config = new RemoteStorageManagerConfig(props);
+ assertThat(config.enableJmxOperations()).isTrue();
+ }
+
}
diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java
index 4666ebdee..1e93d09d4 100644
--- a/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java
+++ b/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java
@@ -16,9 +16,12 @@
package io.aiven.kafka.tieredstorage.manifest;
+import javax.management.ObjectName;
+
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
@@ -48,6 +51,7 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@@ -86,7 +90,7 @@ class SegmentManifestProviderTest {
void setup() {
provider = new SegmentManifestProvider(
OBJECT_KEY, Optional.of(1000L), Optional.empty(), storage, MAPPER,
- ForkJoinPool.commonPool());
+ ForkJoinPool.commonPool(), false);
}
@Test
@@ -94,7 +98,7 @@ void unboundedShouldBeCreated() {
assertThatNoException()
.isThrownBy(() -> new SegmentManifestProvider(
OBJECT_KEY, Optional.empty(), Optional.of(Duration.ofMillis(1)), storage, MAPPER,
- ForkJoinPool.commonPool()));
+ ForkJoinPool.commonPool(), false));
}
@Test
@@ -102,7 +106,7 @@ void withoutRetentionLimitsShouldBeCreated() {
assertThatNoException()
.isThrownBy(() -> new SegmentManifestProvider(
OBJECT_KEY, Optional.of(1L), Optional.empty(), storage, MAPPER,
- ForkJoinPool.commonPool()));
+ ForkJoinPool.commonPool(), false));
}
@Test
@@ -120,6 +124,40 @@ void shouldReturnAndCache() throws StorageBackendException, IOException {
verifyNoMoreInteractions(storage);
}
+ @Test
+ void invalidateCache_jmx() throws Exception {
+ provider = new SegmentManifestProvider(
+ OBJECT_KEY, Optional.of(1000L), Optional.empty(), storage, MAPPER,
+ ForkJoinPool.commonPool(), true);
+
+ final String key = "topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000000023-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest";
+ final SegmentManifestV1 expectedManifest = new SegmentManifestV1(
+ new FixedSizeChunkIndex(100, 1000, 110, 110),
+ false, null
+ );
+ when(storage.fetch(key))
+ .thenReturn(new ByteArrayInputStream(MANIFEST.getBytes()));
+ assertThat(provider.get(REMOTE_LOG_METADATA)).isEqualTo(expectedManifest);
+ verify(storage).fetch(key);
+
+ final var mbeanName = new ObjectName(SegmentManifestCacheManager.MBEAN_NAME);
+ final var mbeanServer = ManagementFactory.getPlatformMBeanServer();
+ assertThat(mbeanServer.isRegistered(mbeanName)).isTrue();
+
+ final var sizeBefore = provider.cache().estimatedSize();
+ assertThat(sizeBefore).isEqualTo(1L);
+
+ mbeanServer.invoke(mbeanName, "clean", new Object[]{}, new String[]{});
+
+ final var sizeAfter = provider.cache().estimatedSize();
+ assertThat(sizeAfter).isEqualTo(0L);
+
+ when(storage.fetch(key))
+ .thenReturn(new ByteArrayInputStream(MANIFEST.getBytes()));
+ assertThat(provider.get(REMOTE_LOG_METADATA)).isEqualTo(expectedManifest);
+ verify(storage, times(2)).fetch(key);
+ }
+
@Test
void shouldPropagateStorageBackendException() throws StorageBackendException {
when(storage.fetch(anyString()))