blob: bbe81a895d07030cd4c16b40c8219271f7e7c990 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.system;
import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.samza.util.Clock;
/**
* Fetches and caches metadata about a set of SSPs. When fetching metadata for a stale SSP, this will also prefetch
* metadata for all other SSPs specified for prefetching which are stale and in the same system.
*/
public class SSPMetadataCache {
private final SystemAdmins systemAdmins;
private final Duration cacheTTL;
private final Clock clock;
private final Set<SystemStreamPartition> sspsToPrefetch;
private final Object metadataRefreshLock;
private final ConcurrentHashMap<SystemStreamPartition, CacheEntry> cache;
public SSPMetadataCache(SystemAdmins systemAdmins, Duration cacheTTL, Clock clock,
Set<SystemStreamPartition> sspsToPrefetch) {
this.systemAdmins = systemAdmins;
this.cacheTTL = cacheTTL;
this.clock = clock;
this.sspsToPrefetch = sspsToPrefetch;
this.metadataRefreshLock = new Object();
this.cache = new ConcurrentHashMap<>();
}
/**
* Gets the metadata for an SSP. This will return a cached value if it is fresh enough. Otherwise, it will fetch the
* metadata from a source-of-truth.
* If the metadata for the SSP needs to be fetched, then this will also prefetch and cache the metadata for any stale
* {@link #sspsToPrefetch} that can be included in the same fetch call (e.g. same system).
*
* @param ssp SSP for which to get metadata
* @return metadata for the SSP; null if the source-of-truth returned no metadata
* @throws RuntimeException if there was an error in fetching metadata
*/
public SystemStreamMetadata.SystemStreamPartitionMetadata getMetadata(SystemStreamPartition ssp) {
maybeRefreshMetadata(ssp);
CacheEntry cacheEntry = cache.get(ssp);
/*
* cacheEntry itself should not be null once the refresh is done, but check anyways to be safe.
* The metadata inside a non-null cacheEntry might still be null, so this will return null in that case.
*/
return cacheEntry == null ? null : cacheEntry.getMetadata();
}
private void maybeRefreshMetadata(SystemStreamPartition requestedSSP) {
synchronized (this.metadataRefreshLock) {
Instant refreshRequestedAt = Instant.ofEpochMilli(this.clock.currentTimeMillis());
if (shouldRefresh(requestedSSP, refreshRequestedAt)) {
String system = requestedSSP.getSystem();
Set<SystemStreamPartition> sspsToFetchFor = new HashSet<>();
sspsToFetchFor.add(requestedSSP);
for (SystemStreamPartition sspToPrefetch : this.sspsToPrefetch) {
if (system.equals(sspToPrefetch.getSystem()) && shouldRefresh(sspToPrefetch, refreshRequestedAt)) {
sspsToFetchFor.add(sspToPrefetch);
}
}
SystemAdmin systemAdmin = this.systemAdmins.getSystemAdmin(system);
Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> fetchedMetadata =
systemAdmin.getSSPMetadata(sspsToFetchFor);
Instant updatedAt = Instant.ofEpochMilli(this.clock.currentTimeMillis());
// we want to add an entry even if there was no metadata, so iterate over sspsToFetchFor
sspsToFetchFor.forEach(ssp -> this.cache.put(ssp, new CacheEntry(fetchedMetadata.get(ssp), updatedAt)));
}
}
}
private boolean shouldRefresh(SystemStreamPartition ssp, Instant now) {
CacheEntry cacheEntry = cache.get(ssp);
if (cacheEntry == null) {
return true;
} else {
Instant isFreshUntil = cacheEntry.getLastUpdatedAt().plus(cacheTTL);
return now.isAfter(isFreshUntil);
}
}
private static class CacheEntry {
/**
* Nullable so that we can cache that there was no metadata for the last fetch.
*/
private final SystemStreamMetadata.SystemStreamPartitionMetadata metadata;
private final Instant lastUpdatedAt;
private CacheEntry(SystemStreamMetadata.SystemStreamPartitionMetadata metadata, Instant lastUpdatedAt) {
this.metadata = metadata;
this.lastUpdatedAt = lastUpdatedAt;
}
private SystemStreamMetadata.SystemStreamPartitionMetadata getMetadata() {
return metadata;
}
private Instant getLastUpdatedAt() {
return lastUpdatedAt;
}
}
}