| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.cache; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.WritableByteChannel; |
| import java.util.Iterator; |
| |
| import com.google.common.base.Function; |
| |
| import org.apache.cassandra.config.DatabaseDescriptor; |
| import org.apache.cassandra.db.ColumnFamily; |
| import org.apache.cassandra.db.TypeSizes; |
| import org.apache.cassandra.io.util.DataOutputPlus; |
| import org.apache.cassandra.io.util.Memory; |
| import org.apache.cassandra.net.MessagingService; |
| import org.apache.cassandra.utils.Pair; |
| import org.caffinitas.ohc.OHCache; |
| import org.caffinitas.ohc.OHCacheBuilder; |
| |
| public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry> |
| { |
| public ICache<RowCacheKey, IRowCacheEntry> create() |
| { |
| OHCacheBuilder<RowCacheKey, IRowCacheEntry> builder = OHCacheBuilder.newBuilder(); |
| builder.capacity(DatabaseDescriptor.getRowCacheSizeInMB() * 1024 * 1024) |
| .keySerializer(new KeySerializer()) |
| .valueSerializer(new ValueSerializer()) |
| .throwOOME(true); |
| |
| return new OHCacheAdapter(builder.build()); |
| } |
| |
| private static class OHCacheAdapter implements ICache<RowCacheKey, IRowCacheEntry> |
| { |
| private final OHCache<RowCacheKey, IRowCacheEntry> ohCache; |
| |
| public OHCacheAdapter(OHCache<RowCacheKey, IRowCacheEntry> ohCache) |
| { |
| this.ohCache = ohCache; |
| } |
| |
| public long capacity() |
| { |
| return ohCache.capacity(); |
| } |
| |
| public void setCapacity(long capacity) |
| { |
| ohCache.setCapacity(capacity); |
| } |
| |
| public void put(RowCacheKey key, IRowCacheEntry value) |
| { |
| ohCache.put(key, value); |
| } |
| |
| public boolean putIfAbsent(RowCacheKey key, IRowCacheEntry value) |
| { |
| return ohCache.putIfAbsent(key, value); |
| } |
| |
| public boolean replace(RowCacheKey key, IRowCacheEntry old, IRowCacheEntry value) |
| { |
| return ohCache.addOrReplace(key, old, value); |
| } |
| |
| public IRowCacheEntry get(RowCacheKey key) |
| { |
| return ohCache.get(key); |
| } |
| |
| public void remove(RowCacheKey key) |
| { |
| ohCache.remove(key); |
| } |
| |
| public int size() |
| { |
| return (int) ohCache.size(); |
| } |
| |
| public long weightedSize() |
| { |
| return ohCache.size(); |
| } |
| |
| public void clear() |
| { |
| ohCache.clear(); |
| } |
| |
| public Iterator<RowCacheKey> hotKeyIterator(int n) |
| { |
| return ohCache.hotKeyIterator(n); |
| } |
| |
| public Iterator<RowCacheKey> keyIterator() |
| { |
| return ohCache.keyIterator(); |
| } |
| |
| public boolean containsKey(RowCacheKey key) |
| { |
| return ohCache.containsKey(key); |
| } |
| } |
| |
| private static class KeySerializer implements org.caffinitas.ohc.CacheSerializer<RowCacheKey> |
| { |
| public void serialize(RowCacheKey rowCacheKey, DataOutput dataOutput) throws IOException |
| { |
| dataOutput.writeUTF(rowCacheKey.ksAndCFName.left); |
| dataOutput.writeUTF(rowCacheKey.ksAndCFName.right); |
| dataOutput.writeInt(rowCacheKey.key.length); |
| dataOutput.write(rowCacheKey.key); |
| } |
| |
| public RowCacheKey deserialize(DataInput dataInput) throws IOException |
| { |
| String ksName = dataInput.readUTF(); |
| String cfName = dataInput.readUTF(); |
| byte[] key = new byte[dataInput.readInt()]; |
| dataInput.readFully(key); |
| return new RowCacheKey(Pair.create(ksName, cfName), key); |
| } |
| |
| public int serializedSize(RowCacheKey rowCacheKey) |
| { |
| return TypeSizes.NATIVE.sizeof(rowCacheKey.ksAndCFName.left) |
| + TypeSizes.NATIVE.sizeof(rowCacheKey.ksAndCFName.right) |
| + 4 |
| + rowCacheKey.key.length; |
| } |
| } |
| |
| private static class ValueSerializer implements org.caffinitas.ohc.CacheSerializer<IRowCacheEntry> |
| { |
| public void serialize(IRowCacheEntry entry, DataOutput out) throws IOException |
| { |
| assert entry != null; // unlike CFS we don't support nulls, since there is no need for that in the cache |
| boolean isSentinel = entry instanceof RowCacheSentinel; |
| out.writeBoolean(isSentinel); |
| if (isSentinel) |
| out.writeLong(((RowCacheSentinel) entry).sentinelId); |
| else |
| ColumnFamily.serializer.serialize((ColumnFamily) entry, new DataOutputPlusAdapter(out), MessagingService.current_version); |
| } |
| |
| public IRowCacheEntry deserialize(DataInput in) throws IOException |
| { |
| boolean isSentinel = in.readBoolean(); |
| if (isSentinel) |
| return new RowCacheSentinel(in.readLong()); |
| return ColumnFamily.serializer.deserialize(in, MessagingService.current_version); |
| } |
| |
| public int serializedSize(IRowCacheEntry entry) |
| { |
| TypeSizes typeSizes = TypeSizes.NATIVE; |
| int size = typeSizes.sizeof(true); |
| if (entry instanceof RowCacheSentinel) |
| size += typeSizes.sizeof(((RowCacheSentinel) entry).sentinelId); |
| else |
| size += ColumnFamily.serializer.serializedSize((ColumnFamily) entry, typeSizes, MessagingService.current_version); |
| return size; |
| } |
| } |
| |
| static class DataOutputPlusAdapter implements DataOutputPlus |
| { |
| private final DataOutput out; |
| |
| public void write(byte[] b) throws IOException |
| { |
| out.write(b); |
| } |
| |
| public void write(byte[] b, int off, int len) throws IOException |
| { |
| out.write(b, off, len); |
| } |
| |
| public void write(int b) throws IOException |
| { |
| out.write(b); |
| } |
| |
| public void writeBoolean(boolean v) throws IOException |
| { |
| out.writeBoolean(v); |
| } |
| |
| public void writeByte(int v) throws IOException |
| { |
| out.writeByte(v); |
| } |
| |
| public void writeBytes(String s) throws IOException |
| { |
| out.writeBytes(s); |
| } |
| |
| public void writeChar(int v) throws IOException |
| { |
| out.writeChar(v); |
| } |
| |
| public void writeChars(String s) throws IOException |
| { |
| out.writeChars(s); |
| } |
| |
| public void writeDouble(double v) throws IOException |
| { |
| out.writeDouble(v); |
| } |
| |
| public void writeFloat(float v) throws IOException |
| { |
| out.writeFloat(v); |
| } |
| |
| public void writeInt(int v) throws IOException |
| { |
| out.writeInt(v); |
| } |
| |
| public void writeLong(long v) throws IOException |
| { |
| out.writeLong(v); |
| } |
| |
| public void writeShort(int v) throws IOException |
| { |
| out.writeShort(v); |
| } |
| |
| public void writeUTF(String s) throws IOException |
| { |
| out.writeUTF(s); |
| } |
| |
| public DataOutputPlusAdapter(DataOutput out) |
| { |
| this.out = out; |
| } |
| |
| public void write(ByteBuffer buffer) throws IOException |
| { |
| if (buffer.hasArray()) |
| out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); |
| else |
| throw new UnsupportedOperationException("IMPLEMENT ME"); |
| } |
| |
| public void write(Memory memory, long offset, long length) throws IOException |
| { |
| throw new UnsupportedOperationException("IMPLEMENT ME"); |
| } |
| |
| public <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException |
| { |
| throw new UnsupportedOperationException("IMPLEMENT ME"); |
| } |
| } |
| } |