| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.memcached.commands; |
| |
| import java.nio.BufferUnderflowException; |
| import java.nio.ByteBuffer; |
| import java.nio.CharBuffer; |
| import java.util.Arrays; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.geode.annotations.internal.MakeNotStatic; |
| import org.apache.geode.cache.Cache; |
| import org.apache.geode.internal.memcached.KeyWrapper; |
| import org.apache.geode.internal.memcached.RequestReader; |
| import org.apache.geode.logging.internal.executors.LoggingThreadFactory; |
| import org.apache.geode.memcached.GemFireMemcachedServer.Protocol; |
| |
| /** |
| * general format of the command is:<br/> |
| * <code> |
| * <command name> <key> <flags> <exptime> <bytes> [noreply]\r\n |
| * </code><br/> |
| * After this line, the client sends the data block:<br/> |
| * <code> |
| * <data block>\r\n |
| * </code> |
| * |
| */ |
| |
| public abstract class StorageCommand extends AbstractCommand { |
| |
| /** |
| * thread pool for scheduling expiration tasks |
| */ |
| @MakeNotStatic |
| private static final ScheduledExecutorService expiryExecutor = |
| new ScheduledThreadPoolExecutor(1, new LoggingThreadFactory("memcached-expiryExecutor")); |
| |
| @MakeNotStatic |
| private static final ConcurrentMap<Object, ScheduledFuture> expiryFutures = |
| new ConcurrentHashMap<Object, ScheduledFuture>(); |
| |
| /** |
| * number of seconds in 30 days |
| */ |
| private static final long secsIn30Days = 60 * 60 * 24 * 30; |
| |
| @Override |
| public ByteBuffer processCommand(RequestReader reader, Protocol protocol, Cache cache) { |
| ByteBuffer buffer = reader.getRequest(); |
| if (protocol == Protocol.ASCII) { |
| return processAsciiCommand(buffer, cache); |
| } |
| return processBinaryComand(reader, cache); |
| } |
| |
| private ByteBuffer processAsciiCommand(ByteBuffer buffer, Cache cache) { |
| CharBuffer flb = getFirstLineBuffer(); |
| getAsciiDecoder().decode(buffer, flb, false); |
| flb.flip(); |
| String firstLine = getFirstLine(); |
| String[] firstLineElements = firstLine.split(" "); |
| String key = firstLineElements[1]; |
| int flags = Integer.parseInt(firstLineElements[2]); |
| long expTime = Long.parseLong(firstLineElements[3]); |
| int numBytes = Integer.parseInt(stripNewline(firstLineElements[4])); |
| boolean noReply = false; |
| if (firstLineElements.length > 5) { |
| noReply = true; |
| } |
| byte[] value = new byte[numBytes]; |
| buffer.position(firstLine.length()); |
| try { |
| for (int i = 0; i < numBytes; i++) { |
| value[i] = buffer.get(); |
| } |
| } catch (BufferUnderflowException e) { |
| throw new ClientError("error reading value"); |
| } |
| if (getLogger().fineEnabled()) { |
| getLogger().fine("key:" + key); |
| getLogger().fine("value:" + Arrays.toString(value)); |
| } |
| ByteBuffer retVal = processStorageCommand(key, value, flags, cache); |
| if (expTime > 0) { |
| scheduleExpiration(key, expTime, cache); |
| } |
| return noReply ? null : retVal; |
| } |
| |
| private ByteBuffer processBinaryComand(RequestReader request, Cache cache) { |
| ByteBuffer buffer = request.getRequest(); |
| int extrasLength = buffer.get(EXTRAS_LENGTH_INDEX); |
| int flags = 0, expTime = 0; |
| |
| KeyWrapper key = getKey(buffer, HEADER_LENGTH + extrasLength); |
| |
| if (extrasLength > 0) { |
| assert extrasLength == 8; |
| buffer.position(HEADER_LENGTH); |
| flags = buffer.getInt(); |
| expTime = buffer.getInt(); |
| } |
| |
| byte[] value = getValue(buffer); |
| |
| long cas = buffer.getLong(POSITION_CAS); |
| |
| ByteBuffer retVal = processBinaryStorageCommand(key, value, cas, flags, cache, request); |
| if (expTime > 0) { |
| scheduleExpiration(key, expTime, cache); |
| } |
| if (getLogger().fineEnabled()) { |
| getLogger().fine("key:" + key); |
| getLogger().fine("value:" + Arrays.toString(value)); |
| } |
| return retVal; |
| } |
| |
| /** |
| * Schedules the entry to expire based on the following: the expiration time sent may either be |
| * Unix time (number of seconds since January 1, 1970, as a 32-bit value), or a number of seconds |
| * starting from current time. In the latter case, this number of seconds may not exceed |
| * 60*60*24*30 (number of seconds in 30 days); if the number sent by a client is larger than that, |
| * the server will consider it to be real Unix time value rather than an offset from current time. |
| * |
| */ |
| private void scheduleExpiration(final Object key, long p_expTime, final Cache cache) { |
| long expTime = p_expTime; |
| assert expTime > 0; |
| if (p_expTime > secsIn30Days) { |
| expTime = p_expTime - System.currentTimeMillis(); |
| if (expTime < 0) { |
| getLogger().info("Invalid expiration time passed, key:" + key + " will not expire"); |
| return; |
| } |
| } |
| ScheduledFuture f = |
| expiryExecutor.schedule(new ExpiryTask(cache, key), expTime, TimeUnit.SECONDS); |
| expiryFutures.put(key, f); |
| } |
| |
| public abstract ByteBuffer processStorageCommand(String key, byte[] value, int flags, |
| Cache cache); |
| |
| public abstract ByteBuffer processBinaryStorageCommand(Object key, byte[] value, long cas, |
| int flags, Cache cache, RequestReader request); |
| |
| protected static ScheduledExecutorService getExpiryExecutor() { |
| return expiryExecutor; |
| } |
| |
| /** |
| * reschedules expiration for a key only if one was previously scheduled |
| * |
| * @return true if successfully rescheduled, false otherwise |
| */ |
| public static boolean rescheduleExpiration(Cache cache, Object key, int newExpTime) { |
| ScheduledFuture f = expiryFutures.get(key); |
| if (f != null) { |
| if (f.cancel(false)) { |
| ScheduledFuture f2 = |
| expiryExecutor.schedule(new ExpiryTask(cache, key), newExpTime, TimeUnit.SECONDS); |
| expiryFutures.put(key, f2); |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Removes key from the cache and expiryFuture |
| */ |
| public static class ExpiryTask implements Runnable { |
| private final Cache cache; |
| private final Object key; |
| |
| public ExpiryTask(Cache cache, Object key) { |
| this.cache = cache; |
| this.key = key; |
| } |
| |
| @Override |
| public void run() { |
| getMemcachedRegion(cache).remove(key); |
| expiryFutures.remove(key); |
| if (cache.getLogger().fineEnabled()) { |
| cache.getLogger().fine("expiration removed key:" + key); |
| } |
| } |
| } |
| } |