blob: 7de95688484b51e0c7f07884c8e4b134552346fd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.memcached.commands;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.cache.Cache;
import org.apache.geode.internal.memcached.KeyWrapper;
import org.apache.geode.internal.memcached.RequestReader;
import org.apache.geode.logging.internal.executors.LoggingThreadFactory;
import org.apache.geode.memcached.GemFireMemcachedServer.Protocol;
/**
* general format of the command is:<br/>
* <code>
* &lt;command name&gt; &lt;key&gt; &lt;flags&gt; &lt;exptime&gt; &lt;bytes&gt; [noreply]\r\n
* </code><br/>
* After this line, the client sends the data block:<br/>
* <code>
* &lt;data block&gt;\r\n
* </code>
*
*/
public abstract class StorageCommand extends AbstractCommand {
/**
* thread pool for scheduling expiration tasks
*/
@MakeNotStatic
private static final ScheduledExecutorService expiryExecutor =
new ScheduledThreadPoolExecutor(1, new LoggingThreadFactory("memcached-expiryExecutor"));
@MakeNotStatic
private static final ConcurrentMap<Object, ScheduledFuture> expiryFutures =
new ConcurrentHashMap<Object, ScheduledFuture>();
/**
* number of seconds in 30 days
*/
private static final long secsIn30Days = 60 * 60 * 24 * 30;
@Override
public ByteBuffer processCommand(RequestReader reader, Protocol protocol, Cache cache) {
ByteBuffer buffer = reader.getRequest();
if (protocol == Protocol.ASCII) {
return processAsciiCommand(buffer, cache);
}
return processBinaryComand(reader, cache);
}
private ByteBuffer processAsciiCommand(ByteBuffer buffer, Cache cache) {
CharBuffer flb = getFirstLineBuffer();
getAsciiDecoder().decode(buffer, flb, false);
flb.flip();
String firstLine = getFirstLine();
String[] firstLineElements = firstLine.split(" ");
String key = firstLineElements[1];
int flags = Integer.parseInt(firstLineElements[2]);
long expTime = Long.parseLong(firstLineElements[3]);
int numBytes = Integer.parseInt(stripNewline(firstLineElements[4]));
boolean noReply = false;
if (firstLineElements.length > 5) {
noReply = true;
}
byte[] value = new byte[numBytes];
buffer.position(firstLine.length());
try {
for (int i = 0; i < numBytes; i++) {
value[i] = buffer.get();
}
} catch (BufferUnderflowException e) {
throw new ClientError("error reading value");
}
if (getLogger().fineEnabled()) {
getLogger().fine("key:" + key);
getLogger().fine("value:" + Arrays.toString(value));
}
ByteBuffer retVal = processStorageCommand(key, value, flags, cache);
if (expTime > 0) {
scheduleExpiration(key, expTime, cache);
}
return noReply ? null : retVal;
}
private ByteBuffer processBinaryComand(RequestReader request, Cache cache) {
ByteBuffer buffer = request.getRequest();
int extrasLength = buffer.get(EXTRAS_LENGTH_INDEX);
int flags = 0, expTime = 0;
KeyWrapper key = getKey(buffer, HEADER_LENGTH + extrasLength);
if (extrasLength > 0) {
assert extrasLength == 8;
buffer.position(HEADER_LENGTH);
flags = buffer.getInt();
expTime = buffer.getInt();
}
byte[] value = getValue(buffer);
long cas = buffer.getLong(POSITION_CAS);
ByteBuffer retVal = processBinaryStorageCommand(key, value, cas, flags, cache, request);
if (expTime > 0) {
scheduleExpiration(key, expTime, cache);
}
if (getLogger().fineEnabled()) {
getLogger().fine("key:" + key);
getLogger().fine("value:" + Arrays.toString(value));
}
return retVal;
}
/**
* Schedules the entry to expire based on the following: the expiration time sent may either be
* Unix time (number of seconds since January 1, 1970, as a 32-bit value), or a number of seconds
* starting from current time. In the latter case, this number of seconds may not exceed
* 60*60*24*30 (number of seconds in 30 days); if the number sent by a client is larger than that,
* the server will consider it to be real Unix time value rather than an offset from current time.
*
*/
private void scheduleExpiration(final Object key, long p_expTime, final Cache cache) {
long expTime = p_expTime;
assert expTime > 0;
if (p_expTime > secsIn30Days) {
expTime = p_expTime - System.currentTimeMillis();
if (expTime < 0) {
getLogger().info("Invalid expiration time passed, key:" + key + " will not expire");
return;
}
}
ScheduledFuture f =
expiryExecutor.schedule(new ExpiryTask(cache, key), expTime, TimeUnit.SECONDS);
expiryFutures.put(key, f);
}
public abstract ByteBuffer processStorageCommand(String key, byte[] value, int flags,
Cache cache);
public abstract ByteBuffer processBinaryStorageCommand(Object key, byte[] value, long cas,
int flags, Cache cache, RequestReader request);
protected static ScheduledExecutorService getExpiryExecutor() {
return expiryExecutor;
}
/**
* reschedules expiration for a key only if one was previously scheduled
*
* @return true if successfully rescheduled, false otherwise
*/
public static boolean rescheduleExpiration(Cache cache, Object key, int newExpTime) {
ScheduledFuture f = expiryFutures.get(key);
if (f != null) {
if (f.cancel(false)) {
ScheduledFuture f2 =
expiryExecutor.schedule(new ExpiryTask(cache, key), newExpTime, TimeUnit.SECONDS);
expiryFutures.put(key, f2);
return true;
}
}
return false;
}
/**
* Removes key from the cache and expiryFuture
*/
public static class ExpiryTask implements Runnable {
private final Cache cache;
private final Object key;
public ExpiryTask(Cache cache, Object key) {
this.cache = cache;
this.key = key;
}
@Override
public void run() {
getMemcachedRegion(cache).remove(key);
expiryFutures.remove(key);
if (cache.getLogger().fineEnabled()) {
cache.getLogger().fine("expiration removed key:" + key);
}
}
}
}