/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.client.impl;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.util.List;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.ClientContext;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.RemotePeerFactory;
import org.apache.hadoop.hdfs.ReplicaAccessor;
import org.apache.hadoop.hdfs.ReplicaAccessorBuilder;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
import org.apache.hadoop.hdfs.util.IOUtilsClient;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.PerformanceAdvisory;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.htrace.core.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Utility class to create BlockReader implementations.
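*
* A typical construction chains the setters on this class and ends with
* {@link #build()}. The sketch below is illustrative only; the variable
* names are placeholders and not every setter is required in every case:
* <pre>{@code
*   BlockReader reader = new BlockReaderFactory(dfsClientConf)
*       .setConfiguration(conf)
*       .setFileName(src)
*       .setBlock(block)
*       .setBlockToken(blockToken)
*       .setStartOffset(offset)
*       .setLength(length)
*       .setVerifyChecksum(true)
*       .setClientName(clientName)
*       .setDatanodeInfo(datanode)
*       .setStorageType(storageType)
*       .setAllowShortCircuitLocalReads(true)
*       .setClientCacheContext(clientContext)
*       .setCachingStrategy(cachingStrategy)
*       .setInetSocketAddress(targetAddr)
*       .setRemotePeerFactory(peerFactory)
*       .setUserGroupInformation(ugi)
*       .setTracer(tracer)
*       .build();
* }</pre>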
*/
@InterfaceAudience.Private
public class BlockReaderFactory implements ShortCircuitReplicaCreator {
static final Logger LOG = LoggerFactory.getLogger(BlockReaderFactory.class);
private static final int SMALL_BUFFER_SIZE = 512;
public static class FailureInjector {
public void injectRequestFileDescriptorsFailure() throws IOException {
// do nothing
}
public boolean getSupportsReceiptVerification() {
return true;
}
}
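// Test-only sketch (the hook below is real; the anonymous subclass body is
// an illustrative assumption): a unit test can force the file-descriptor
// request path to fail by installing a custom injector:
//
//   BlockReaderFactory.setFailureInjectorForTesting(new FailureInjector() {
//     @Override
//     public void injectRequestFileDescriptorsFailure() throws IOException {
//       throw new IOException("injected failure");
//     }
//   });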
@VisibleForTesting
static ShortCircuitReplicaCreator
createShortCircuitReplicaInfoCallback = null;
private final DfsClientConf conf;
/**
* Injects failures into specific operations during unit tests.
*/
private static FailureInjector failureInjector = new FailureInjector();
/**
* The file name, for logging and debugging purposes.
*/
private String fileName;
/**
* The block ID and block pool ID to use.
*/
private ExtendedBlock block;
/**
* The block token to use for security purposes.
*/
private Token<BlockTokenIdentifier> token;
/**
* The offset within the block to start reading at.
*/
private long startOffset;
/**
* If false, we won't try to verify the block checksum.
*/
private boolean verifyChecksum;
/**
* The name of this client.
*/
private String clientName;
/**
* The DataNode we're talking to.
*/
private DatanodeInfo datanode;
/**
* StorageType of replica on DataNode.
*/
private StorageType storageType;
/**
* If false, we won't try short-circuit local reads.
*/
private boolean allowShortCircuitLocalReads;
/**
* The ClientContext to use for things like the PeerCache.
*/
private ClientContext clientContext;
/**
* Number of bytes to read. Must be set to a non-negative value.
*/
private long length = -1;
/**
* Caching strategy to use when reading the block.
*/
private CachingStrategy cachingStrategy;
/**
* Socket address to use to connect to peer.
*/
private InetSocketAddress inetSocketAddress;
/**
* Remote peer factory to use to create a peer, if needed.
*/
private RemotePeerFactory remotePeerFactory;
/**
* UserGroupInformation to use for legacy block reader local objects,
* if needed.
*/
private UserGroupInformation userGroupInformation;
/**
* Configuration to use for legacy block reader local objects, if needed.
*/
private Configuration configuration;
/**
* The HTrace tracer to use.
*/
private Tracer tracer;
/**
* Information about the domain socket path we should use to connect to the
* local peer-- or null if we haven't examined the local domain socket.
*/
private DomainSocketFactory.PathInfo pathInfo;
/**
* The remaining number of times that we'll try to pull a socket out of the
* cache.
*/
private int remainingCacheTries;
public BlockReaderFactory(DfsClientConf conf) {
this.conf = conf;
this.remainingCacheTries = conf.getNumCachedConnRetry();
}
public BlockReaderFactory setFileName(String fileName) {
this.fileName = fileName;
return this;
}
public BlockReaderFactory setBlock(ExtendedBlock block) {
this.block = block;
return this;
}
public BlockReaderFactory setBlockToken(Token<BlockTokenIdentifier> token) {
this.token = token;
return this;
}
public BlockReaderFactory setStartOffset(long startOffset) {
this.startOffset = startOffset;
return this;
}
public BlockReaderFactory setVerifyChecksum(boolean verifyChecksum) {
this.verifyChecksum = verifyChecksum;
return this;
}
public BlockReaderFactory setClientName(String clientName) {
this.clientName = clientName;
return this;
}
public BlockReaderFactory setDatanodeInfo(DatanodeInfo datanode) {
this.datanode = datanode;
return this;
}
public BlockReaderFactory setStorageType(StorageType storageType) {
this.storageType = storageType;
return this;
}
public BlockReaderFactory setAllowShortCircuitLocalReads(
boolean allowShortCircuitLocalReads) {
this.allowShortCircuitLocalReads = allowShortCircuitLocalReads;
return this;
}
public BlockReaderFactory setClientCacheContext(
ClientContext clientContext) {
this.clientContext = clientContext;
return this;
}
public BlockReaderFactory setLength(long length) {
this.length = length;
return this;
}
public BlockReaderFactory setCachingStrategy(
CachingStrategy cachingStrategy) {
this.cachingStrategy = cachingStrategy;
return this;
}
public BlockReaderFactory setInetSocketAddress(
InetSocketAddress inetSocketAddress) {
this.inetSocketAddress = inetSocketAddress;
return this;
}
public BlockReaderFactory setUserGroupInformation(
UserGroupInformation userGroupInformation) {
this.userGroupInformation = userGroupInformation;
return this;
}
public BlockReaderFactory setRemotePeerFactory(
RemotePeerFactory remotePeerFactory) {
this.remotePeerFactory = remotePeerFactory;
return this;
}
public BlockReaderFactory setConfiguration(
Configuration configuration) {
this.configuration = configuration;
return this;
}
public BlockReaderFactory setTracer(Tracer tracer) {
this.tracer = tracer;
return this;
}
@VisibleForTesting
public static void setFailureInjectorForTesting(FailureInjector injector) {
failureInjector = injector;
}
/**
* Build a BlockReader with the given options.
*
* This function will do the best it can to create a block reader that meets
* all of our requirements. We prefer short-circuit block readers
* (BlockReaderLocal and BlockReaderLocalLegacy) over remote ones, since the
* former avoid the overhead of socket communication. If short-circuit is
* unavailable, our next fallback is data transfer over UNIX domain sockets,
* if dfs.client.domain.socket.data.traffic has been enabled. If that doesn't
* work, we will try to create a remote block reader that operates over TCP
* sockets.
*
* There are a few caches that are important here.
*
* The ShortCircuitCache stores file descriptor objects which have been passed
* from the DataNode.
*
* The DomainSocketFactory stores information about UNIX domain socket paths
* that we haven't been able to use in the past, so that we don't waste time
* retrying them over and over. (Like all the caches, it does have a timeout,
* though.)
*
* The PeerCache stores peers that we have used in the past. If we can reuse
* one of these peers, we avoid the overhead of re-opening a socket. However,
* if the socket has been timed out on the remote end, our attempt to reuse
* the socket may end with an IOException. For that reason, we limit our
* attempts at socket reuse to dfs.client.cached.conn.retry times. After
* that, we create new sockets. This avoids the problem where a thread tries
* to talk to a peer that it hasn't talked to in a while, and has to clean out
* every entry in a socket cache full of stale entries.
*
* @return The new BlockReader. We will not return null.
*
* @throws InvalidToken If the block token was invalid.
* @throws InvalidEncryptionKeyException If the encryption key was invalid.
* @throws IOException If there was another problem.
*/
public BlockReader build() throws IOException {
Preconditions.checkNotNull(configuration);
Preconditions
.checkState(length >= 0, "Length must be set to a non-negative value");
BlockReader reader = tryToCreateExternalBlockReader();
if (reader != null) {
return reader;
}
final ShortCircuitConf scConf = conf.getShortCircuitConf();
if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) {
if (clientContext.getUseLegacyBlockReaderLocal()) {
reader = getLegacyBlockReaderLocal();
if (reader != null) {
LOG.trace("{}: returning new legacy block reader local.", this);
return reader;
}
} else {
reader = getBlockReaderLocal();
if (reader != null) {
LOG.trace("{}: returning new block reader local.", this);
return reader;
}
}
}
if (scConf.isDomainSocketDataTraffic()) {
reader = getRemoteBlockReaderFromDomain();
if (reader != null) {
LOG.trace("{}: returning new remote block reader using UNIX domain "
+ "socket on {}", this, pathInfo.getPath());
return reader;
}
}
Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting,
"TCP reads were disabled for testing, but we failed to " +
"do a non-TCP read.");
return getRemoteBlockReaderFromTcp();
}
private BlockReader tryToCreateExternalBlockReader() {
List<Class<? extends ReplicaAccessorBuilder>> clses =
conf.getReplicaAccessorBuilderClasses();
for (Class<? extends ReplicaAccessorBuilder> cls : clses) {
try {
ByteArrayDataOutput bado = ByteStreams.newDataOutput();
token.write(bado);
byte[] tokenBytes = bado.toByteArray();
Constructor<? extends ReplicaAccessorBuilder> ctor =
cls.getConstructor();
ReplicaAccessorBuilder builder = ctor.newInstance();
long visibleLength = startOffset + length;
ReplicaAccessor accessor = builder.
setAllowShortCircuitReads(allowShortCircuitLocalReads).
setBlock(block.getBlockId(), block.getBlockPoolId()).
setGenerationStamp(block.getGenerationStamp()).
setBlockAccessToken(tokenBytes).
setClientName(clientName).
setConfiguration(configuration).
setFileName(fileName).
setVerifyChecksum(verifyChecksum).
setVisibleLength(visibleLength).
build();
if (accessor == null) {
LOG.trace("{}: No ReplicaAccessor created by {}",
this, cls.getName());
} else {
return new ExternalBlockReader(accessor, visibleLength, startOffset);
}
} catch (Throwable t) {
LOG.warn("Failed to construct new object of type " +
cls.getName(), t);
}
}
return null;
}
/**
* Get {@link BlockReaderLocalLegacy} for short-circuited local reads.
* This block reader implements the path-based style of local reads
* first introduced in HDFS-2246.
*/
private BlockReader getLegacyBlockReaderLocal() throws IOException {
LOG.trace("{}: trying to construct BlockReaderLocalLegacy", this);
if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) {
LOG.trace("{}: can't construct BlockReaderLocalLegacy because the address"
+ "{} is not local", this, inetSocketAddress);
return null;
}
if (clientContext.getDisableLegacyBlockReaderLocal()) {
PerformanceAdvisory.LOG.debug("{}: can't construct " +
"BlockReaderLocalLegacy because " +
"disableLegacyBlockReaderLocal is set.", this);
return null;
}
IOException ioe;
try {
return BlockReaderLocalLegacy.newBlockReader(conf,
userGroupInformation, configuration, fileName, block, token,
datanode, startOffset, length, storageType, tracer);
} catch (RemoteException remoteException) {
ioe = remoteException.unwrapRemoteException(
InvalidToken.class, AccessControlException.class);
} catch (IOException e) {
ioe = e;
}
if ((!(ioe instanceof AccessControlException)) &&
isSecurityException(ioe)) {
// Handle security exceptions.
// We do not handle AccessControlException here, since
// BlockReaderLocalLegacy#newBlockReader uses that exception to indicate
// that the user is not in dfs.block.local-path-access.user, a condition
// which requires us to disable legacy SCR.
throw ioe;
}
LOG.warn(this + ": error creating legacy BlockReaderLocal. " +
"Disabling legacy local reads.", ioe);
clientContext.setDisableLegacyBlockReaderLocal();
return null;
}
private BlockReader getBlockReaderLocal() throws InvalidToken {
LOG.trace("{}: trying to construct a BlockReaderLocal for short-circuit "
+ " reads.", this);
if (pathInfo == null) {
pathInfo = clientContext.getDomainSocketFactory()
.getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
}
if (!pathInfo.getPathState().getUsableForShortCircuit()) {
PerformanceAdvisory.LOG.debug("{}: {} is not usable for short circuit; " +
"giving up on BlockReaderLocal.", this, pathInfo);
return null;
}
ShortCircuitCache cache = clientContext.getShortCircuitCache();
ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
block.getBlockPoolId());
ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
InvalidToken exc = info.getInvalidTokenException();
if (exc != null) {
LOG.trace("{}: got InvalidToken exception while trying to construct "
+ "BlockReaderLocal via {}", this, pathInfo.getPath());
throw exc;
}
if (info.getReplica() == null) {
PerformanceAdvisory.LOG.debug("{}: failed to get " +
"ShortCircuitReplica. Cannot construct " +
"BlockReaderLocal via {}", this, pathInfo.getPath());
return null;
}
return new BlockReaderLocal.Builder(conf.getShortCircuitConf()).
setFilename(fileName).
setBlock(block).
setStartOffset(startOffset).
setShortCircuitReplica(info.getReplica()).
setVerifyChecksum(verifyChecksum).
setCachingStrategy(cachingStrategy).
setStorageType(storageType).
setTracer(tracer).
build();
}
/**
* Fetch a pair of short-circuit block descriptors from a local DataNode.
*
* @return Null if we could not communicate with the datanode,
* a new ShortCircuitReplicaInfo object otherwise.
* ShortCircuitReplicaInfo objects may contain either an
* InvalidToken exception, or a ShortCircuitReplica object ready to
* use.
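*
* A caller typically inspects the result as sketched below (this mirrors
* getBlockReaderLocal() in this class; variable names are illustrative):
* <pre>{@code
*   ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, factory);
*   if (info.getInvalidTokenException() != null) {
*     throw info.getInvalidTokenException();  // block token must be refetched
*   } else if (info.getReplica() != null) {
*     // build a BlockReaderLocal around the replica's file descriptors
*   } else {
*     // fall back to a non-short-circuit read
*   }
* }</pre>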
*/
@Override
public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
if (createShortCircuitReplicaInfoCallback != null) {
ShortCircuitReplicaInfo info =
createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo();
if (info != null) return info;
}
LOG.trace("{}: trying to create ShortCircuitReplicaInfo.", this);
BlockReaderPeer curPeer;
while (true) {
curPeer = nextDomainPeer();
if (curPeer == null) break;
if (curPeer.fromCache) remainingCacheTries--;
DomainPeer peer = (DomainPeer)curPeer.peer;
Slot slot = null;
ShortCircuitCache cache = clientContext.getShortCircuitCache();
try {
MutableBoolean usedPeer = new MutableBoolean(false);
slot = cache.allocShmSlot(datanode, peer, usedPeer,
new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()),
clientName);
if (usedPeer.booleanValue()) {
LOG.trace("{}: allocShmSlot used up our previous socket {}. "
+ "Allocating a new one...", this, peer.getDomainSocket());
curPeer = nextDomainPeer();
if (curPeer == null) break;
peer = (DomainPeer)curPeer.peer;
}
ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot);
clientContext.getPeerCache().put(datanode, peer);
return info;
} catch (IOException e) {
if (slot != null) {
cache.freeSlot(slot);
}
if (curPeer.fromCache) {
// Handle an I/O error we got when using a cached socket.
// These are considered less serious, because the socket may be stale.
LOG.debug("{}: closing stale domain peer {}", this, peer, e);
IOUtilsClient.cleanup(LOG, peer);
} else {
// Handle an I/O error we got when using a newly created socket.
// We temporarily disable the domain socket path for a few minutes in
// this case, to prevent wasting more time on it.
LOG.warn(this + ": I/O error requesting file descriptors. " +
"Disabling domain socket " + peer.getDomainSocket(), e);
IOUtilsClient.cleanup(LOG, peer);
clientContext.getDomainSocketFactory()
.disableDomainSocketPath(pathInfo.getPath());
return null;
}
}
}
return null;
}
/**
* Request file descriptors from a DomainPeer.
*
* @param peer The peer to use for communication.
* @param slot If non-null, the shared memory slot to associate with the
* new ShortCircuitReplica.
*
* @return A ShortCircuitReplica object if we could communicate with the
* datanode; null, otherwise.
* @throws IOException If we encountered an I/O exception while communicating
* with the datanode.
*/
private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
Slot slot) throws IOException {
ShortCircuitCache cache = clientContext.getShortCircuitCache();
final DataOutputStream out =
new DataOutputStream(new BufferedOutputStream(peer.getOutputStream(), SMALL_BUFFER_SIZE));
SlotId slotId = slot == null ? null : slot.getSlotId();
new Sender(out).requestShortCircuitFds(block, token, slotId, 1,
failureInjector.getSupportsReceiptVerification());
DataInputStream in = new DataInputStream(peer.getInputStream());
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
PBHelperClient.vintPrefixed(in));
DomainSocket sock = peer.getDomainSocket();
failureInjector.injectRequestFileDescriptorsFailure();
switch (resp.getStatus()) {
case SUCCESS:
byte[] buf = new byte[1];
FileInputStream[] fis = new FileInputStream[2];
sock.recvFileInputStreams(fis, buf, 0, buf.length);
ShortCircuitReplica replica = null;
try {
ExtendedBlockId key =
new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
LOG.trace("Sending receipt verification byte for slot {}", slot);
sock.getOutputStream().write(0);
}
replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
Time.monotonicNow(), slot);
return new ShortCircuitReplicaInfo(replica);
} catch (IOException e) {
// This indicates an error reading from disk, or a format error. Since
// it's not a socket communication problem, we return null rather than
// throwing an exception.
LOG.warn(this + ": error creating ShortCircuitReplica.", e);
return null;
} finally {
if (replica == null) {
IOUtilsClient.cleanup(DFSClient.LOG, fis[0], fis[1]);
}
}
case ERROR_UNSUPPORTED:
if (!resp.hasShortCircuitAccessVersion()) {
LOG.warn("short-circuit read access is disabled for " +
"DataNode " + datanode + ". reason: " + resp.getMessage());
clientContext.getDomainSocketFactory()
.disableShortCircuitForPath(pathInfo.getPath());
} else {
LOG.warn("short-circuit read access for the file " +
fileName + " is disabled for DataNode " + datanode +
". reason: " + resp.getMessage());
}
return null;
case ERROR_ACCESS_TOKEN:
String msg = "access control error while " +
"attempting to set up short-circuit access to " +
fileName + resp.getMessage();
LOG.debug("{}:{}", this, msg);
return new ShortCircuitReplicaInfo(new InvalidToken(msg));
default:
final long expiration =
clientContext.getDomainSocketFactory().getPathExpireSeconds();
String disableMsg = "disabled temporarily for " + expiration + " seconds";
if (expiration == 0) {
disableMsg = "not disabled";
}
LOG.warn("{}: unknown response code {} while attempting to set up "
+ "short-circuit access. {}. Short-circuit read for "
+ "DataNode {} is {} based on {}.",
this, resp.getStatus(), resp.getMessage(), datanode,
disableMsg, DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY);
clientContext.getDomainSocketFactory()
.disableShortCircuitForPath(pathInfo.getPath());
return null;
}
}
/**
* Get a BlockReaderRemote that communicates over a UNIX domain socket.
*
* @return The new BlockReader, or null if we failed to create the block
* reader.
*
* @throws InvalidToken If the block token was invalid.
* Potentially other security-related exceptions.
*/
private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
if (pathInfo == null) {
pathInfo = clientContext.getDomainSocketFactory()
.getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
}
if (!pathInfo.getPathState().getUsableForDataTransfer()) {
PerformanceAdvisory.LOG.debug("{}: not trying to create a " +
"remote block reader because the UNIX domain socket at {}" +
" is not usable.", this, pathInfo);
return null;
}
LOG.trace("{}: trying to create a remote block reader from the UNIX domain "
+ "socket at {}", this, pathInfo.getPath());
while (true) {
BlockReaderPeer curPeer = nextDomainPeer();
if (curPeer == null) break;
if (curPeer.fromCache) remainingCacheTries--;
DomainPeer peer = (DomainPeer)curPeer.peer;
BlockReader blockReader = null;
try {
blockReader = getRemoteBlockReader(peer);
return blockReader;
} catch (IOException ioe) {
IOUtilsClient.cleanup(LOG, peer);
if (isSecurityException(ioe)) {
LOG.trace("{}: got security exception while constructing a remote "
+ " block reader from the unix domain socket at {}",
this, pathInfo.getPath(), ioe);
throw ioe;
}
if (curPeer.fromCache) {
// Handle an I/O error we got when using a cached peer. These are
// considered less serious because the underlying socket may be stale.
LOG.debug("Closed potentially stale domain peer {}", peer, ioe);
} else {
// Handle an I/O error we got when using a newly created domain peer.
// We temporarily disable the domain socket path for a few minutes in
// this case, to prevent wasting more time on it.
LOG.warn("I/O error constructing remote block reader. Disabling " +
"domain socket " + peer.getDomainSocket(), ioe);
clientContext.getDomainSocketFactory()
.disableDomainSocketPath(pathInfo.getPath());
return null;
}
} finally {
if (blockReader == null) {
IOUtilsClient.cleanup(LOG, peer);
}
}
}
return null;
}
/**
* Get a BlockReaderRemote that communicates over a TCP socket.
*
* @return The new BlockReader. We will not return null, but instead throw
* an exception if this fails.
*
* @throws InvalidToken If the block token was invalid.
* @throws InvalidEncryptionKeyException If the encryption key was invalid.
* @throws IOException If there was another problem.
*/
private BlockReader getRemoteBlockReaderFromTcp() throws IOException {
LOG.trace("{}: trying to create a remote block reader from a TCP socket",
this);
BlockReader blockReader = null;
while (true) {
BlockReaderPeer curPeer = null;
Peer peer = null;
try {
curPeer = nextTcpPeer();
if (curPeer.fromCache) remainingCacheTries--;
peer = curPeer.peer;
blockReader = getRemoteBlockReader(peer);
return blockReader;
} catch (IOException ioe) {
if (isSecurityException(ioe)) {
LOG.trace("{}: got security exception while constructing a remote "
+ "block reader from {}", this, peer, ioe);
throw ioe;
}
if ((curPeer != null) && curPeer.fromCache) {
// Handle an I/O error we got when using a cached peer. These are
// considered less serious, because the underlying socket may be
// stale.
LOG.debug("Closed potentially stale remote peer {}", peer, ioe);
} else {
// Handle an I/O error we got when using a newly created peer.
LOG.warn("I/O error constructing remote block reader.", ioe);
throw ioe;
}
} finally {
if (blockReader == null) {
IOUtilsClient.cleanup(LOG, peer);
}
}
}
}
public static class BlockReaderPeer {
final Peer peer;
final boolean fromCache;
BlockReaderPeer(Peer peer, boolean fromCache) {
this.peer = peer;
this.fromCache = fromCache;
}
}
/**
* Get the next DomainPeer-- either from the cache or by creating it.
*
* @return the next DomainPeer, or null if we could not construct one.
*/
private BlockReaderPeer nextDomainPeer() {
if (remainingCacheTries > 0) {
Peer peer = clientContext.getPeerCache().get(datanode, true);
if (peer != null) {
LOG.trace("nextDomainPeer: reusing existing peer {}", peer);
return new BlockReaderPeer(peer, true);
}
}
DomainSocket sock = clientContext.getDomainSocketFactory().
createSocket(pathInfo, conf.getSocketTimeout());
if (sock == null) return null;
return new BlockReaderPeer(new DomainPeer(sock), false);
}
/**
* Get the next TCP-based peer-- either from the cache or by creating it.
*
* @return the next Peer, or null if we could not construct one.
*
* @throws IOException If there was an error while constructing the peer
* (such as an InvalidEncryptionKeyException)
*/
private BlockReaderPeer nextTcpPeer() throws IOException {
if (remainingCacheTries > 0) {
Peer peer = clientContext.getPeerCache().get(datanode, false);
if (peer != null) {
LOG.trace("nextTcpPeer: reusing existing peer {}", peer);
return new BlockReaderPeer(peer, true);
}
}
try {
Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token,
datanode);
LOG.trace("nextTcpPeer: created newConnectedPeer {}", peer);
return new BlockReaderPeer(peer, false);
} catch (IOException e) {
LOG.trace("nextTcpPeer: failed to create newConnectedPeer connected to"
+ "{}", datanode);
throw e;
}
}
/**
* Determine if an exception is security-related.
*
* We need to handle these exceptions differently than other IOExceptions.
* They don't indicate a communication problem. Instead, they mean that there
* is some action the client needs to take, such as refetching block tokens,
* renewing encryption keys, etc.
*
* @param ioe The exception
* @return True only if the exception is security-related.
*/
private static boolean isSecurityException(IOException ioe) {
return (ioe instanceof InvalidToken) ||
(ioe instanceof InvalidEncryptionKeyException) ||
(ioe instanceof InvalidBlockTokenException) ||
(ioe instanceof AccessControlException);
}
@SuppressWarnings("deprecation")
private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
int networkDistance = clientContext.getNetworkDistance(datanode);
if (conf.getShortCircuitConf().isUseLegacyBlockReader()) {
return BlockReaderRemote.newBlockReader(fileName,
block, token, startOffset, length, conf.getIoBufferSize(),
verifyChecksum, clientName, peer, datanode,
clientContext.getPeerCache(), cachingStrategy, tracer,
networkDistance);
} else {
return BlockReaderRemote2.newBlockReader(
fileName, block, token, startOffset, length,
verifyChecksum, clientName, peer, datanode,
clientContext.getPeerCache(), cachingStrategy, tracer,
networkDistance);
}
}
@Override
public String toString() {
return "BlockReaderFactory(fileName=" + fileName + ", block=" + block + ")";
}
/**
* File name to print when accessing a block directly (from servlets)
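*
* For example (illustrative values; the exact host part depends on
* {@link InetSocketAddress#toString()}), an address that prints as
* {@code /127.0.0.1:9866} with pool ID {@code BP-1} and block ID {@code 42}
* yields {@code /127.0.0.1:9866:BP-1:42}.
*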
* @param s Address of the block location
* @param poolId Block pool ID of the block
* @param blockId Block ID of the block
* @return string that has a file name for debug purposes
*/
public static String getFileName(final InetSocketAddress s,
final String poolId, final long blockId) {
return s.toString() + ":" + poolId + ":" + blockId;
}
}