blob: 6f6ceb5a6caa47bfe9c3d3d4787a44528a4d6482 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ipc.metrics.RetryCacheMetrics;
import org.apache.hadoop.util.LightWeightCache;
import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Maintains a cache of non-idempotent requests that have been successfully
* processed by the RPC server implementation, to handle the retries. A request
* is uniquely identified by the unique client ID + call ID of the RPC request.
* On receiving retried request, an entry will be found in the
* {@link RetryCache} and the previous response is sent back to the request.
* <p>
* To look an implementation using this cache, see HDFS FSNamesystem class.
*/
@InterfaceAudience.Private
public class RetryCache {
public static final Logger LOG = LoggerFactory.getLogger(RetryCache.class);
private final RetryCacheMetrics retryCacheMetrics;
private static final int MAX_CAPACITY = 16;
/**
* CacheEntry is tracked using unique client ID and callId of the RPC request
*/
public static class CacheEntry implements LightWeightCache.Entry {
/**
* Processing state of the requests
*/
private static byte INPROGRESS = 0;
private static byte SUCCESS = 1;
private static byte FAILED = 2;
private byte state = INPROGRESS;
// Store uuid as two long for better memory utilization
private final long clientIdMsb; // Most signficant bytes
private final long clientIdLsb; // Least significant bytes
private final int callId;
private final long expirationTime;
private LightWeightGSet.LinkedElement next;
CacheEntry(byte[] clientId, int callId, long expirationTime) {
// ClientId must be a UUID - that is 16 octets.
Preconditions.checkArgument(clientId.length == ClientId.BYTE_LENGTH,
"Invalid clientId - length is " + clientId.length
+ " expected length " + ClientId.BYTE_LENGTH);
// Convert UUID bytes to two longs
clientIdMsb = ClientId.getMsb(clientId);
clientIdLsb = ClientId.getLsb(clientId);
this.callId = callId;
this.expirationTime = expirationTime;
}
CacheEntry(byte[] clientId, int callId, long expirationTime,
boolean success) {
this(clientId, callId, expirationTime);
this.state = success ? SUCCESS : FAILED;
}
private static int hashCode(long value) {
return (int)(value ^ (value >>> 32));
}
@Override
public int hashCode() {
return (hashCode(clientIdMsb) * 31 + hashCode(clientIdLsb)) * 31 + callId;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof CacheEntry)) {
return false;
}
CacheEntry other = (CacheEntry) obj;
return callId == other.callId && clientIdMsb == other.clientIdMsb
&& clientIdLsb == other.clientIdLsb;
}
@Override
public void setNext(LinkedElement next) {
this.next = next;
}
@Override
public LinkedElement getNext() {
return next;
}
synchronized void completed(boolean success) {
state = success ? SUCCESS : FAILED;
this.notifyAll();
}
public synchronized boolean isSuccess() {
return state == SUCCESS;
}
@Override
public void setExpirationTime(long timeNano) {
// expiration time does not change
}
@Override
public long getExpirationTime() {
return expirationTime;
}
@Override
public String toString() {
return (new UUID(this.clientIdMsb, this.clientIdLsb)).toString() + ":"
+ this.callId + ":" + this.state;
}
}
/**
* CacheEntry with payload that tracks the previous response or parts of
* previous response to be used for generating response for retried requests.
*/
public static class CacheEntryWithPayload extends CacheEntry {
private Object payload;
CacheEntryWithPayload(byte[] clientId, int callId, Object payload,
long expirationTime) {
super(clientId, callId, expirationTime);
this.payload = payload;
}
CacheEntryWithPayload(byte[] clientId, int callId, Object payload,
long expirationTime, boolean success) {
super(clientId, callId, expirationTime, success);
this.payload = payload;
}
/** Override equals to avoid findbugs warnings */
@Override
public boolean equals(Object obj) {
return super.equals(obj);
}
/** Override hashcode to avoid findbugs warnings */
@Override
public int hashCode() {
return super.hashCode();
}
public Object getPayload() {
return payload;
}
}
private final LightWeightGSet<CacheEntry, CacheEntry> set;
private final long expirationTime;
private String cacheName;
private final ReentrantLock lock = new ReentrantLock();
/**
* Constructor
* @param cacheName name to identify the cache by
* @param percentage percentage of total java heap space used by this cache
* @param expirationTime time for an entry to expire in nanoseconds
*/
public RetryCache(String cacheName, double percentage, long expirationTime) {
int capacity = LightWeightGSet.computeCapacity(percentage, cacheName);
capacity = capacity > MAX_CAPACITY ? capacity : MAX_CAPACITY;
this.set = new LightWeightCache<CacheEntry, CacheEntry>(capacity, capacity,
expirationTime, 0);
this.expirationTime = expirationTime;
this.cacheName = cacheName;
this.retryCacheMetrics = RetryCacheMetrics.create(this);
}
private static boolean skipRetryCache() {
// Do not track non RPC invocation or RPC requests with
// invalid callId or clientId in retry cache
return !Server.isRpcInvocation() || Server.getCallId() < 0
|| Arrays.equals(Server.getClientId(), RpcConstants.DUMMY_CLIENT_ID);
}
public void lock() {
this.lock.lock();
}
public void unlock() {
this.lock.unlock();
}
private void incrCacheClearedCounter() {
retryCacheMetrics.incrCacheCleared();
}
@VisibleForTesting
public LightWeightGSet<CacheEntry, CacheEntry> getCacheSet() {
return set;
}
@VisibleForTesting
public RetryCacheMetrics getMetricsForTests() {
return retryCacheMetrics;
}
/**
* This method returns cache name for metrics.
*/
public String getCacheName() {
return cacheName;
}
/**
* This method handles the following conditions:
* <ul>
* <li>If retry is not to be processed, return null</li>
* <li>If there is no cache entry, add a new entry {@code newEntry} and return
* it.</li>
* <li>If there is an existing entry, wait for its completion. If the
* completion state is {@link CacheEntry#FAILED}, the expectation is that the
* thread that waited for completion, retries the request. the
* {@link CacheEntry} state is set to {@link CacheEntry#INPROGRESS} again.
* <li>If the completion state is {@link CacheEntry#SUCCESS}, the entry is
* returned so that the thread that waits for it can can return previous
* response.</li>
* <ul>
*
* @return {@link CacheEntry}.
*/
private CacheEntry waitForCompletion(CacheEntry newEntry) {
CacheEntry mapEntry = null;
lock.lock();
try {
mapEntry = set.get(newEntry);
// If an entry in the cache does not exist, add a new one
if (mapEntry == null) {
if (LOG.isTraceEnabled()) {
LOG.trace("Adding Rpc request clientId "
+ newEntry.clientIdMsb + newEntry.clientIdLsb + " callId "
+ newEntry.callId + " to retryCache");
}
set.put(newEntry);
retryCacheMetrics.incrCacheUpdated();
return newEntry;
} else {
retryCacheMetrics.incrCacheHit();
}
} finally {
lock.unlock();
}
// Entry already exists in cache. Wait for completion and return its state
Preconditions.checkNotNull(mapEntry,
"Entry from the cache should not be null");
// Wait for in progress request to complete
synchronized (mapEntry) {
while (mapEntry.state == CacheEntry.INPROGRESS) {
try {
mapEntry.wait();
} catch (InterruptedException ie) {
// Restore the interrupted status
Thread.currentThread().interrupt();
}
}
// Previous request has failed, the expectation is is that it will be
// retried again.
if (mapEntry.state != CacheEntry.SUCCESS) {
mapEntry.state = CacheEntry.INPROGRESS;
}
}
return mapEntry;
}
/**
* Add a new cache entry into the retry cache. The cache entry consists of
* clientId and callId extracted from editlog.
*/
public void addCacheEntry(byte[] clientId, int callId) {
CacheEntry newEntry = new CacheEntry(clientId, callId, System.nanoTime()
+ expirationTime, true);
lock.lock();
try {
set.put(newEntry);
} finally {
lock.unlock();
}
retryCacheMetrics.incrCacheUpdated();
}
public void addCacheEntryWithPayload(byte[] clientId, int callId,
Object payload) {
// since the entry is loaded from editlog, we can assume it succeeded.
CacheEntry newEntry = new CacheEntryWithPayload(clientId, callId, payload,
System.nanoTime() + expirationTime, true);
lock.lock();
try {
set.put(newEntry);
} finally {
lock.unlock();
}
retryCacheMetrics.incrCacheUpdated();
}
private static CacheEntry newEntry(long expirationTime) {
return new CacheEntry(Server.getClientId(), Server.getCallId(),
System.nanoTime() + expirationTime);
}
private static CacheEntryWithPayload newEntry(Object payload,
long expirationTime) {
return new CacheEntryWithPayload(Server.getClientId(), Server.getCallId(),
payload, System.nanoTime() + expirationTime);
}
/** Static method that provides null check for retryCache */
public static CacheEntry waitForCompletion(RetryCache cache) {
if (skipRetryCache()) {
return null;
}
return cache != null ? cache
.waitForCompletion(newEntry(cache.expirationTime)) : null;
}
/** Static method that provides null check for retryCache */
public static CacheEntryWithPayload waitForCompletion(RetryCache cache,
Object payload) {
if (skipRetryCache()) {
return null;
}
return (CacheEntryWithPayload) (cache != null ? cache
.waitForCompletion(newEntry(payload, cache.expirationTime)) : null);
}
public static void setState(CacheEntry e, boolean success) {
if (e == null) {
return;
}
e.completed(success);
}
public static void setState(CacheEntryWithPayload e, boolean success,
Object payload) {
if (e == null) {
return;
}
e.payload = payload;
e.completed(success);
}
public static void clear(RetryCache cache) {
if (cache != null) {
cache.set.clear();
cache.incrCacheClearedCounter();
}
}
}