diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index e633a30e1..9b6f3a540 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -23,6 +23,7 @@ + diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java index 29697af9e..04c3d2e46 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java @@ -155,7 +155,8 @@ public void configure(final Map configs) { config.segmentManifestCacheRetention(), fetcher, mapper, - executor); + executor, + config.enableJmxOperations()); } // for testing diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java b/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java index ba1fd3de6..c60b7d998 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java @@ -44,8 +44,11 @@ public class RemoteStorageManagerConfig extends AbstractConfig { private static final String OBJECT_KEY_PREFIX_CONFIG = "key.prefix"; private static final String OBJECT_KEY_PREFIX_DOC = "The object storage path prefix"; + private static final String ENABLE_JMX_OPERATIONS_CONFIG = "enable.jmx"; + private static final String ENABLE_JMX_OPERATIONS_DOC = "Enable JMX MBeans operations to manage caches"; + private static final String SEGMENT_MANIFEST_CACHE_PREFIX = "segment.manifest.cache."; - private static final String SEGMENT_MANIFEST_CACHE_SIZE_CONFIG = SEGMENT_MANIFEST_CACHE_PREFIX + "size"; + public static final String SEGMENT_MANIFEST_CACHE_SIZE_CONFIG = SEGMENT_MANIFEST_CACHE_PREFIX + "size"; private static final Long SEGMENT_MANIFEST_CACHE_SIZE_DEFAULT = 1000L; // TODO consider a better default private static final String SEGMENT_MANIFEST_CACHE_SIZE_DOC = "The size in items of the segment manifest cache. " @@ -107,6 +110,14 @@ public class RemoteStorageManagerConfig extends AbstractConfig { OBJECT_KEY_PREFIX_DOC ); + CONFIG.define( + ENABLE_JMX_OPERATIONS_CONFIG, + ConfigDef.Type.BOOLEAN, + false, + ConfigDef.Importance.LOW, + ENABLE_JMX_OPERATIONS_DOC + ); + CONFIG.define( SEGMENT_MANIFEST_CACHE_SIZE_CONFIG, ConfigDef.Type.LONG, @@ -314,6 +325,10 @@ public Optional segmentManifestCacheRetention() { return Optional.of(Duration.ofMillis(rawValue)); } + public boolean enableJmxOperations() { + return getBoolean(ENABLE_JMX_OPERATIONS_CONFIG); + } + public String keyPrefix() { return getString(OBJECT_KEY_PREFIX_CONFIG); } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestCacheManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestCacheManager.java new file mode 100644 index 000000000..0680e326e --- /dev/null +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestCacheManager.java @@ -0,0 +1,23 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.manifest; + +public interface SegmentManifestCacheManager { + String MBEAN_NAME = "aiven.kafka.server.tieredstorage.cache:type=segment-manifest-cache-manager"; + + void clean(); +} diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestCacheManagerMBean.java b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestCacheManagerMBean.java new file mode 100644 index 000000000..ce535a848 --- /dev/null +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestCacheManagerMBean.java @@ -0,0 +1,31 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.manifest; + +public class SegmentManifestCacheManagerMBean implements SegmentManifestCacheManager { + + final SegmentManifestProvider provider; + + public SegmentManifestCacheManagerMBean(final SegmentManifestProvider segmentManifestProvider) { + this.provider = segmentManifestProvider; + } + + @Override + public void clean() { + provider.cache().invalidateAll(); + } +} diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProvider.java b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProvider.java index af2a76d5d..bda55c490 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProvider.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProvider.java @@ -16,8 +16,16 @@ package io.aiven.kafka.tieredstorage.manifest; +import javax.management.InstanceAlreadyExistsException; +import javax.management.MBeanRegistrationException; +import javax.management.MalformedObjectNameException; +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import javax.management.StandardMBean; + import java.io.IOException; import java.io.InputStream; +import java.lang.management.ManagementFactory; import java.time.Duration; import java.util.Optional; import java.util.concurrent.ExecutionException; @@ -31,9 +39,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SegmentManifestProvider { + private static final Logger LOG = LoggerFactory.getLogger(SegmentManifestProvider.class); private static final String SEGMENT_MANIFEST_METRIC_GROUP_NAME = "segment-manifest-cache"; private static final long GET_TIMEOUT_SEC = 10; @@ -47,7 +59,8 @@ public SegmentManifestProvider(final Optional maxCacheSize, final Optional cacheRetention, final ObjectFetcher fileFetcher, final ObjectMapper mapper, - final Executor executor) { + final Executor executor, + final boolean enableJmxOperations) { final var statsCounter = new CaffeineStatsCounter(SEGMENT_MANIFEST_METRIC_GROUP_NAME); final var cacheBuilder = Caffeine.newBuilder() .recordStats(() -> statsCounter) @@ -60,6 +73,29 @@ public SegmentManifestProvider(final Optional maxCacheSize, } }); statsCounter.registerSizeMetric(cache.synchronous()::estimatedSize); + if (enableJmxOperations) { + enableJmxMBean(); + } + } + + private void enableJmxMBean() { + final var mbeanName = SegmentManifestCacheManager.MBEAN_NAME; + try { + final var name = new ObjectName(mbeanName); + final var mbean = new StandardMBean( + new SegmentManifestCacheManagerMBean(this), + SegmentManifestCacheManager.class); + ManagementFactory.getPlatformMBeanServer().registerMBean(mbean, name); + } catch (NotCompliantMBeanException + | MalformedObjectNameException + | InstanceAlreadyExistsException + | MBeanRegistrationException e) { + LOG.warn("Error creating MBean {}", mbeanName, e); + } + } + + Cache cache() { + return cache.synchronous(); } public SegmentManifest get(final String manifestKey) diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java index bd786efbc..5ecf705d8 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java @@ -50,6 +50,7 @@ void minimalConfig() { assertThat(config.encryptionKeyPairId()).isNull(); assertThat(config.encryptionKeyRing()).isNull(); assertThat(config.keyPrefix()).isEmpty(); + assertThat(config.enableJmxOperations()).isFalse(); } @Test @@ -294,7 +295,6 @@ void invalidChunkSizeRange() { .hasMessage("Invalid value 2147483648 for configuration chunk.size: Not a number of type INT"); } - @Test void invalidCompressionConfig() { assertThatThrownBy(() -> new RemoteStorageManagerConfig( @@ -307,4 +307,15 @@ void invalidCompressionConfig() { .isInstanceOf(ConfigException.class) .hasMessage("compression.enabled must be enabled if compression.heuristic.enabled is"); } + + @Test + void enableJmx() { + final HashMap props = new HashMap<>(); + props.put("storage.backend.class", NoopStorageBackend.class); + props.put("chunk.size", "123"); + props.put("enable.jmx", true); + final RemoteStorageManagerConfig config = new RemoteStorageManagerConfig(props); + assertThat(config.enableJmxOperations()).isTrue(); + } + } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java index 5fb4f2a1d..ab50de363 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java @@ -16,9 +16,12 @@ package io.aiven.kafka.tieredstorage.manifest; +import javax.management.ObjectName; + import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.lang.management.ManagementFactory; import java.time.Duration; import java.util.Optional; import java.util.concurrent.ForkJoinPool; @@ -40,6 +43,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -68,7 +72,7 @@ class SegmentManifestProviderTest { void setup() { provider = new SegmentManifestProvider( Optional.of(1000L), Optional.empty(), storage, MAPPER, - ForkJoinPool.commonPool()); + ForkJoinPool.commonPool(), false); } @Test @@ -76,7 +80,7 @@ void unboundedShouldBeCreated() { assertThatNoException() .isThrownBy(() -> new SegmentManifestProvider( Optional.empty(), Optional.of(Duration.ofMillis(1)), storage, MAPPER, - ForkJoinPool.commonPool())); + ForkJoinPool.commonPool(), false)); } @Test @@ -84,7 +88,7 @@ void withoutRetentionLimitsShouldBeCreated() { assertThatNoException() .isThrownBy(() -> new SegmentManifestProvider( Optional.of(1L), Optional.empty(), storage, MAPPER, - ForkJoinPool.commonPool())); + ForkJoinPool.commonPool(), false)); } @Test @@ -102,6 +106,40 @@ void shouldReturnAndCache() throws StorageBackendException, IOException { verifyNoMoreInteractions(storage); } + @Test + void invalidateCache_jmx() throws Exception { + provider = new SegmentManifestProvider( + OBJECT_KEY, Optional.of(1000L), Optional.empty(), storage, MAPPER, + ForkJoinPool.commonPool(), true); + + final String key = "topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000000023-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest"; + final SegmentManifestV1 expectedManifest = new SegmentManifestV1( + new FixedSizeChunkIndex(100, 1000, 110, 110), + false, null + ); + when(storage.fetch(key)) + .thenReturn(new ByteArrayInputStream(MANIFEST.getBytes())); + assertThat(provider.get(REMOTE_LOG_METADATA)).isEqualTo(expectedManifest); + verify(storage).fetch(key); + + final var mbeanName = new ObjectName(SegmentManifestCacheManager.MBEAN_NAME); + final var mbeanServer = ManagementFactory.getPlatformMBeanServer(); + assertThat(mbeanServer.isRegistered(mbeanName)).isTrue(); + + final var sizeBefore = provider.cache().estimatedSize(); + assertThat(sizeBefore).isEqualTo(1L); + + mbeanServer.invoke(mbeanName, "clean", new Object[]{}, new String[]{}); + + final var sizeAfter = provider.cache().estimatedSize(); + assertThat(sizeAfter).isEqualTo(0L); + + when(storage.fetch(key)) + .thenReturn(new ByteArrayInputStream(MANIFEST.getBytes())); + assertThat(provider.get(REMOTE_LOG_METADATA)).isEqualTo(expectedManifest); + verify(storage, times(2)).fetch(key); + } + @Test void shouldPropagateStorageBackendException() throws StorageBackendException { when(storage.fetch(anyString()))