blob: 659b21e48fa00ce1cc06cbd9745cb32e5e67edf1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.jackrabbit.oak.segment.remote.persistentcache;
import com.google.common.base.Stopwatch;
import org.apache.jackrabbit.oak.commons.Buffer;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.AbstractPersistentCache;
import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.SegmentCacheStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.params.SetParams;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.StringReader;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.apache.jackrabbit.oak.segment.remote.RemoteUtilities.OFF_HEAP;
public class PersistentRedisCache extends AbstractPersistentCache {
private static final Logger logger = LoggerFactory.getLogger(PersistentRedisCache.class);
public static final int DEFAULT_REDIS_CACHE_EXPIRE_SECONDS = 3600 * 24 * 2;
public static final String NAME = "Segment Redis Cache";
private static final String REDIS_PREFIX = "SEGMENT";
private final IOMonitor redisCacheIOMonitor;
private JedisPool redisPool;
private SetParams setParamsWithExpire;
public PersistentRedisCache(String redisHost, int redisPort, int redisExpireSeconds, int redisSocketTimeout,
int redisConnectionTimeout, int redisMinConnections, int redisMaxConnections, int redisMaxTotalConnections,
int redisDBIndex, IOMonitor redisCacheIOMonitor) {
this.redisCacheIOMonitor = redisCacheIOMonitor;
int redisExpireSeconds1 = redisExpireSeconds < 0 ? DEFAULT_REDIS_CACHE_EXPIRE_SECONDS : redisExpireSeconds;
setParamsWithExpire = SetParams.setParams().ex(redisExpireSeconds1);
if (redisPort == 0) {
redisPort = 6379;
}
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setTestOnBorrow(true);
jedisPoolConfig.setMaxWaitMillis(redisSocketTimeout);
jedisPoolConfig.setMinIdle(redisMinConnections);
jedisPoolConfig.setMaxIdle(redisMaxConnections);
jedisPoolConfig.setMaxTotal(redisMaxTotalConnections);
this.redisPool = new JedisPool(jedisPoolConfig, redisHost, redisPort, redisConnectionTimeout,
redisSocketTimeout, null, redisDBIndex, null);
this.segmentCacheStats = new SegmentCacheStats(NAME, this::getRedisMaxMemory, this::getCacheElementCount,
this::getCurrentWeight, this::getNumberOfEvictedKeys);
}
private long getCacheElementCount() {
try(Jedis redis = redisPool.getResource()) {
return redis.dbSize();
} catch (JedisException e) {
logger.error("Error getting number of elements in redis", e);
}
return -1;
}
private long getRedisMaxMemory() {
try{
return Long.parseLong(getRedisProperty("memory", "maxmemory"));
} catch (JedisException | IOException e) {
logger.error("Error getting redis configuration value for 'maxmemory'", e);
}
return -1;
}
private long getCurrentWeight() {
try{
return Long.parseLong(getRedisProperty("memory", "used_memory"));
} catch (JedisException | IOException e) {
logger.error("Error getting number of elements in redis", e);
}
return -1;
}
private long getNumberOfEvictedKeys() {
try{
return Long.parseLong(getRedisProperty("stats", "evicted_keys"));
} catch (JedisException | IOException e) {
logger.error("Error getting number of evicted elements in redis", e);
}
return -1;
}
private String getRedisProperty(String section, String propertyName) throws IOException {
try(Jedis redis = redisPool.getResource()) {
String redisInfoString = redis.info(section);
Properties props = new Properties();
props.load(new StringReader(redisInfoString));
return (String) props.get(propertyName);
}
}
@Override
protected Buffer readSegmentInternal(long msb, long lsb) {
String segmentId = new UUID(msb, lsb).toString();
Stopwatch stopwatch = Stopwatch.createStarted();
try(Jedis redis = redisPool.getResource()) {
redisCacheIOMonitor.beforeSegmentRead(null, msb, lsb, 0);
final byte[] bytes = redis.get((REDIS_PREFIX + ":" + segmentId).getBytes());
if (bytes != null) {
long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
redisCacheIOMonitor.afterSegmentRead(null, msb, lsb, bytes.length, elapsed);
Buffer buffer;
if (OFF_HEAP) {
buffer = Buffer.allocateDirect(bytes.length);
} else {
buffer = Buffer.allocate(bytes.length);
}
buffer.put(bytes);
buffer.flip();
return buffer;
}
} catch (Exception e) {
logger.error("Error loading segment {} from cache", segmentId, e);
}
return null;
}
@Override
public boolean containsSegment(long msb, long lsb) {
String segmentId = new UUID(msb, lsb).toString();
try (Jedis redis = redisPool.getResource()) {
return redis.exists((REDIS_PREFIX + ":" + segmentId).getBytes());
} catch (JedisException e) {
logger.error("Error checking segment existence {} in cache: {}", segmentId, e);
}
return false;
}
@Override
public void writeSegment(long msb, long lsb, Buffer buffer) {
String segmentId = new UUID(msb, lsb).toString();
Buffer bufferCopy = buffer.duplicate();
Runnable task = () -> {
if (writesPending.add(segmentId)) {
final ByteArrayOutputStream bos = new ByteArrayOutputStream();
try (WritableByteChannel channel = Channels.newChannel(bos); Jedis redis = redisPool.getResource()) {
while (bufferCopy.hasRemaining()) {
bufferCopy.write(channel);
}
final byte[] key = (REDIS_PREFIX + ":" + segmentId).getBytes();
redis.set(key, bos.toByteArray(), setParamsWithExpire);
cacheSize.addAndGet(bos.size());
} catch (Throwable t) {
logger.debug("Unable to write segment {} to cache: {}", segmentId, t.getMessage());
} finally {
writesPending.remove(segmentId);
}
}
};
executor.execute(task);
}
@Override
public void cleanUp() {
// do nothing
}
}