| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.datanode; |
| |
| import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CommonConfigurationKeysPublic; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.DFSTestUtil; |
| import org.apache.hadoop.hdfs.DistributedFileSystem; |
| import org.apache.hadoop.hdfs.HdfsConfiguration; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.net.StandardSocketFactory; |
| import org.junit.AfterClass; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| import javax.net.SocketFactory; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.net.InetAddress; |
| import java.net.Socket; |
| import java.net.SocketAddress; |
| import java.net.SocketException; |
| import java.net.UnknownHostException; |
| import java.nio.channels.SocketChannel; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| /** |
| * Checks that used sockets have TCP_NODELAY set when configured. |
| */ |
| public class TestDataNodeTcpNoDelay { |
| private static final Logger LOG = |
| LoggerFactory.getLogger(TestDataNodeTcpNoDelay.class); |
| private static Configuration baseConf; |
| |
| @BeforeClass |
| public static void setUpBeforeClass() throws Exception { |
| baseConf = new HdfsConfiguration(); |
| } |
| |
| @AfterClass |
| public static void tearDownAfterClass() throws Exception { |
| |
| } |
| |
| @Test |
| public void testTcpNoDelayEnabled() throws Exception { |
| Configuration testConf = new Configuration(baseConf); |
| // here we do not have to config TCP_NDELAY settings, since they should be |
| // active by default |
| testConf.set(HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY, |
| SocketFactoryWrapper.class.getName()); |
| |
| SocketFactory defaultFactory = NetUtils.getDefaultSocketFactory(testConf); |
| LOG.info("Socket factory is " + defaultFactory.getClass().getName()); |
| MiniDFSCluster dfsCluster = |
| new MiniDFSCluster.Builder(testConf).numDataNodes(3).build(); |
| dfsCluster.waitActive(); |
| |
| DistributedFileSystem dfs = dfsCluster.getFileSystem(); |
| |
| try { |
| createData(dfs); |
| transferBlock(dfs); |
| |
| // check that TCP_NODELAY has been set on all sockets |
| assertTrue(SocketFactoryWrapper.wasTcpNoDelayActive()); |
| } finally { |
| SocketFactoryWrapper.reset(); |
| dfsCluster.shutdown(); |
| } |
| } |
| |
| @Test |
| public void testTcpNoDelayDisabled() throws Exception { |
| Configuration testConf = new Configuration(baseConf); |
| // disable TCP_NODELAY in settings |
| setTcpNoDelay(testConf, false); |
| testConf.set(HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY, |
| SocketFactoryWrapper.class.getName()); |
| |
| SocketFactory defaultFactory = NetUtils.getDefaultSocketFactory(testConf); |
| LOG.info("Socket factory is " + defaultFactory.getClass().getName()); |
| MiniDFSCluster dfsCluster = |
| new MiniDFSCluster.Builder(testConf).numDataNodes(3).build(); |
| dfsCluster.waitActive(); |
| |
| DistributedFileSystem dfs = dfsCluster.getFileSystem(); |
| |
| try { |
| createData(dfs); |
| transferBlock(dfs); |
| |
| // we can only check that TCP_NODELAY was disabled on some sockets, |
| // since part of the client write path always enables TCP_NODELAY |
| // by necessity |
| assertFalse(SocketFactoryWrapper.wasTcpNoDelayActive()); |
| } finally { |
| SocketFactoryWrapper.reset(); |
| dfsCluster.shutdown(); |
| } |
| } |
| |
| |
| private void createData(DistributedFileSystem dfs) throws Exception { |
| Path dir = new Path("test-dir"); |
| for (int i = 0; i < 3; i++) { |
| Path f = new Path(dir, "file" + i); |
| DFSTestUtil.createFile(dfs, f, 10240, (short) 3, 0); |
| } |
| } |
| |
| /** |
| * Tests the {@code DataNode#transferBlocks()} path by re-replicating an |
| * existing block. |
| */ |
| private void transferBlock(DistributedFileSystem dfs) throws Exception { |
| Path dir = new Path("test-block-transfer"); |
| Path f = new Path(dir, "testfile"); |
| DFSTestUtil.createFile(dfs, f, 10240, (short) 1, 0); |
| |
| // force a block transfer to another DN |
| dfs.setReplication(f, (short) 2); |
| DFSTestUtil.waitForReplication(dfs, f, (short) 2, 20000); |
| } |
| |
| /** |
| * Sets known TCP_NODELAY configs to the given value. |
| */ |
| private void setTcpNoDelay(Configuration conf, boolean value) { |
| conf.setBoolean( |
| HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY, value); |
| conf.setBoolean( |
| DFSConfigKeys.DFS_DATA_TRANSFER_SERVER_TCPNODELAY, value); |
| conf.setBoolean( |
| CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY, value); |
| conf.setBoolean( |
| CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY, value); |
| } |
| |
| public static class SocketFactoryWrapper extends StandardSocketFactory { |
| private static List<SocketWrapper> sockets = new ArrayList<SocketWrapper>(); |
| |
| public static boolean wasTcpNoDelayActive() { |
| LOG.info("Checking " + sockets.size() + " sockets for TCP_NODELAY"); |
| for (SocketWrapper sw : sockets) { |
| if (!sw.getLastTcpNoDelay()) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| public static void reset() { |
| sockets = new ArrayList<>(); |
| } |
| |
| @Override |
| public Socket createSocket() throws IOException { |
| LOG.info("Creating new socket"); |
| SocketWrapper wrapper = new SocketWrapper(super.createSocket()); |
| sockets.add(wrapper); |
| return wrapper; |
| } |
| |
| @Override |
| public Socket createSocket(String host, int port) |
| throws IOException, UnknownHostException { |
| LOG.info("Creating socket for " + host); |
| SocketWrapper wrapper = |
| new SocketWrapper(super.createSocket(host, port)); |
| sockets.add(wrapper); |
| return wrapper; |
| } |
| |
| @Override |
| public Socket createSocket(String host, int port, |
| InetAddress localHostAddr, int localPort) |
| throws IOException, UnknownHostException { |
| LOG.info("Creating socket for " + host); |
| SocketWrapper wrapper = new SocketWrapper( |
| super.createSocket(host, port, localHostAddr, localPort)); |
| sockets.add(wrapper); |
| return wrapper; |
| } |
| |
| @Override |
| public Socket createSocket(InetAddress addr, int port) throws IOException { |
| LOG.info("Creating socket for " + addr); |
| SocketWrapper wrapper = |
| new SocketWrapper(super.createSocket(addr, port)); |
| sockets.add(wrapper); |
| return wrapper; |
| } |
| |
| @Override |
| public Socket createSocket(InetAddress addr, int port, |
| InetAddress localHostAddr, int localPort) |
| throws IOException { |
| LOG.info("Creating socket for " + addr); |
| SocketWrapper wrapper = new SocketWrapper( |
| super.createSocket(addr, port, localHostAddr, localPort)); |
| sockets.add(wrapper); |
| return wrapper; |
| } |
| } |
| |
| public static class SocketWrapper extends Socket { |
| private final Socket wrapped; |
| private boolean tcpNoDelay; |
| |
| public SocketWrapper(Socket socket) { |
| this.wrapped = socket; |
| } |
| |
| // Override methods, check whether tcpnodelay has been set for each socket |
| // created. This isn't perfect, as we could still send before tcpnodelay |
| // is set, but should at least trigger when tcpnodelay is never set at all. |
| |
| @Override |
| public void connect(SocketAddress endpoint) throws IOException { |
| wrapped.connect(endpoint); |
| } |
| |
| @Override |
| public void connect(SocketAddress endpoint, int timeout) |
| throws IOException { |
| wrapped.connect(endpoint, timeout); |
| } |
| |
| @Override |
| public void bind(SocketAddress bindpoint) throws IOException { |
| wrapped.bind(bindpoint); |
| } |
| |
| @Override |
| public InetAddress getInetAddress() { |
| return wrapped.getInetAddress(); |
| } |
| |
| @Override |
| public InetAddress getLocalAddress() { |
| return wrapped.getLocalAddress(); |
| } |
| |
| @Override |
| public int getPort() { |
| return wrapped.getPort(); |
| } |
| |
| @Override |
| public int getLocalPort() { |
| return wrapped.getLocalPort(); |
| } |
| |
| @Override |
| public SocketAddress getRemoteSocketAddress() { |
| return wrapped.getRemoteSocketAddress(); |
| } |
| |
| @Override |
| public SocketAddress getLocalSocketAddress() { |
| return wrapped.getLocalSocketAddress(); |
| } |
| |
| @Override |
| public SocketChannel getChannel() { |
| return wrapped.getChannel(); |
| } |
| |
| @Override |
| public InputStream getInputStream() throws IOException { |
| return wrapped.getInputStream(); |
| } |
| |
| @Override |
| public OutputStream getOutputStream() throws IOException { |
| return wrapped.getOutputStream(); |
| } |
| |
| @Override |
| public void setTcpNoDelay(boolean on) throws SocketException { |
| wrapped.setTcpNoDelay(on); |
| this.tcpNoDelay = on; |
| } |
| |
| @Override |
| public boolean getTcpNoDelay() throws SocketException { |
| return wrapped.getTcpNoDelay(); |
| } |
| |
| @Override |
| public void setSoLinger(boolean on, int linger) throws SocketException { |
| wrapped.setSoLinger(on, linger); |
| } |
| |
| @Override |
| public int getSoLinger() throws SocketException { |
| return wrapped.getSoLinger(); |
| } |
| |
| @Override |
| public void sendUrgentData(int data) throws IOException { |
| wrapped.sendUrgentData(data); |
| } |
| |
| @Override |
| public void setOOBInline(boolean on) throws SocketException { |
| wrapped.setOOBInline(on); |
| } |
| |
| @Override |
| public boolean getOOBInline() throws SocketException { |
| return wrapped.getOOBInline(); |
| } |
| |
| @Override |
| public synchronized void setSoTimeout(int timeout) throws SocketException { |
| wrapped.setSoTimeout(timeout); |
| } |
| |
| @Override |
| public synchronized int getSoTimeout() throws SocketException { |
| return wrapped.getSoTimeout(); |
| } |
| |
| @Override |
| public synchronized void setSendBufferSize(int size) |
| throws SocketException { |
| wrapped.setSendBufferSize(size); |
| } |
| |
| @Override |
| public synchronized int getSendBufferSize() throws SocketException { |
| return wrapped.getSendBufferSize(); |
| } |
| |
| @Override |
| public synchronized void setReceiveBufferSize(int size) |
| throws SocketException { |
| wrapped.setReceiveBufferSize(size); |
| } |
| |
| @Override |
| public synchronized int getReceiveBufferSize() throws SocketException { |
| return wrapped.getReceiveBufferSize(); |
| } |
| |
| @Override |
| public void setKeepAlive(boolean on) throws SocketException { |
| wrapped.setKeepAlive(on); |
| } |
| |
| @Override |
| public boolean getKeepAlive() throws SocketException { |
| return wrapped.getKeepAlive(); |
| } |
| |
| @Override |
| public void setTrafficClass(int tc) throws SocketException { |
| wrapped.setTrafficClass(tc); |
| } |
| |
| @Override |
| public int getTrafficClass() throws SocketException { |
| return wrapped.getTrafficClass(); |
| } |
| |
| @Override |
| public void setReuseAddress(boolean on) throws SocketException { |
| wrapped.setReuseAddress(on); |
| } |
| |
| @Override |
| public boolean getReuseAddress() throws SocketException { |
| return wrapped.getReuseAddress(); |
| } |
| |
| @Override |
| public synchronized void close() throws IOException { |
| wrapped.close(); |
| } |
| |
| @Override |
| public void shutdownInput() throws IOException { |
| wrapped.shutdownInput(); |
| } |
| |
| @Override |
| public void shutdownOutput() throws IOException { |
| wrapped.shutdownOutput(); |
| } |
| |
| @Override |
| public String toString() { |
| return wrapped.toString(); |
| } |
| |
| @Override |
| public boolean isConnected() { |
| return wrapped.isConnected(); |
| } |
| |
| @Override |
| public boolean isBound() { |
| return wrapped.isBound(); |
| } |
| |
| @Override |
| public boolean isClosed() { |
| return wrapped.isClosed(); |
| } |
| |
| @Override |
| public boolean isInputShutdown() { |
| return wrapped.isInputShutdown(); |
| } |
| |
| @Override |
| public boolean isOutputShutdown() { |
| return wrapped.isOutputShutdown(); |
| } |
| |
| @Override |
| public void setPerformancePreferences(int connectionTime, int latency, |
| int bandwidth) { |
| wrapped.setPerformancePreferences(connectionTime, latency, bandwidth); |
| } |
| |
| public boolean getLastTcpNoDelay() { |
| return tcpNoDelay; |
| } |
| } |
| } |