| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.cassandra.spark.reader; |
| |
| import java.io.IOException; |
| import java.util.Map; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.TimeUnit; |
| |
| import com.google.common.cache.Cache; |
| import com.google.common.cache.CacheBuilder; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.cassandra.db.DecoratedKey; |
| import org.apache.cassandra.io.sstable.Descriptor; |
| import org.apache.cassandra.io.sstable.metadata.MetadataComponent; |
| import org.apache.cassandra.io.sstable.metadata.MetadataType; |
| import org.apache.cassandra.schema.TableMetadata; |
| import org.apache.cassandra.spark.data.SSTable; |
| import org.apache.cassandra.spark.utils.ThrowableUtils; |
| import org.apache.cassandra.utils.BloomFilter; |
| import org.apache.cassandra.utils.Pair; |
| import org.jetbrains.annotations.NotNull; |
| |
| /** |
| * Basic cache to reduce wasteful requests on the DataLayer for cacheable SSTable metadata, |
| * useful when running many Spark tasks on the same Spark worker |
| */ |
| @SuppressWarnings("UnstableApiUsage") |
| public class SSTableCache |
| { |
| private static final Logger LOGGER = LoggerFactory.getLogger(SSTableCache.class); |
| |
| public static final SSTableCache INSTANCE = new SSTableCache(); |
| private final Cache<SSTable, SummaryDbUtils.Summary> summary = buildCache(propOrDefault("sbr.cache.summary.maxEntries", 4096), |
| propOrDefault("sbr.cache.summary.expireAfterMins", 15)); |
| private final Cache<SSTable, Pair<DecoratedKey, DecoratedKey>> index = buildCache(propOrDefault("sbr.cache.index.maxEntries", 128), |
| propOrDefault("sbr.cache.index.expireAfterMins", 60)); |
| private final Cache<SSTable, Map<MetadataType, MetadataComponent>> stats = buildCache(propOrDefault("sbr.cache.stats.maxEntries", 16384), |
| propOrDefault("sbr.cache.stats.expireAfterMins", 60)); |
| private final Cache<SSTable, BloomFilter> filter = buildCache(propOrDefault("sbr.cache.filter.maxEntries", 16384), |
| propOrDefault("sbr.cache.filter.expireAfterMins", 60)); |
| |
| private static int propOrDefault(String name, int defaultValue) |
| { |
| String str = System.getProperty(name); |
| if (str != null) |
| { |
| try |
| { |
| return Integer.parseInt(str); |
| } |
| catch (NumberFormatException exception) |
| { |
| LOGGER.error("NumberFormatException for prop {} ", name, exception); |
| } |
| } |
| return defaultValue; |
| } |
| |
| private <T> Cache<SSTable, T> buildCache(int size, int expireAfterMins) |
| { |
| return CacheBuilder.newBuilder() |
| .expireAfterAccess(expireAfterMins, TimeUnit.MINUTES) |
| .maximumSize(size) |
| .build(); |
| } |
| |
| public SummaryDbUtils.Summary keysFromSummary(@NotNull TableMetadata metadata, |
| @NotNull SSTable ssTable) throws IOException |
| { |
| return get(summary, ssTable, () -> SummaryDbUtils.readSummary(metadata, ssTable)); |
| } |
| |
| public Pair<DecoratedKey, DecoratedKey> keysFromIndex(@NotNull TableMetadata metadata, |
| @NotNull SSTable ssTable) throws IOException |
| { |
| return get(index, ssTable, () -> ReaderUtils.keysFromIndex(metadata, ssTable)); |
| } |
| |
| public Map<MetadataType, MetadataComponent> componentMapFromStats(@NotNull SSTable ssTable, |
| Descriptor descriptor) throws IOException |
| { |
| return get(stats, ssTable, () -> ReaderUtils.deserializeStatsMetadata(ssTable, descriptor)); |
| } |
| |
| public BloomFilter bloomFilter(@NotNull SSTable ssTable, Descriptor descriptor) throws IOException |
| { |
| return get(filter, ssTable, () -> ReaderUtils.readFilter(ssTable, descriptor.version.hasOldBfFormat())); |
| } |
| |
| boolean containsSummary(@NotNull SSTable ssTable) |
| { |
| return contains(summary, ssTable); |
| } |
| |
| boolean containsIndex(@NotNull SSTable ssTable) |
| { |
| return contains(index, ssTable); |
| } |
| |
| boolean containsStats(@NotNull SSTable ssTable) |
| { |
| return contains(stats, ssTable); |
| } |
| |
| boolean containsFilter(@NotNull SSTable ssTable) |
| { |
| return contains(filter, ssTable); |
| } |
| |
| private static <T> boolean contains(@NotNull Cache<SSTable, T> cache, @NotNull SSTable ssTable) |
| { |
| return cache.getIfPresent(ssTable) != null; |
| } |
| |
| private static <T> T get(@NotNull Cache<SSTable, T> cache, |
| @NotNull SSTable ssTable, |
| @NotNull Callable<T> callable) throws IOException |
| { |
| try |
| { |
| return cache.get(ssTable, callable); |
| } |
| catch (ExecutionException exception) |
| { |
| throw toIOException(exception); |
| } |
| } |
| |
| private static IOException toIOException(Throwable throwable) |
| { |
| IOException ioException = ThrowableUtils.rootCause(throwable, IOException.class); |
| return ioException != null ? ioException : new IOException(ThrowableUtils.rootCause(throwable)); |
| } |
| } |