blob: 20a41c190a348e64547aa4d9d9be479de7552b23 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Level;
public class TestLeaseRecovery2 extends junit.framework.TestCase {
{
((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
}
static final long BLOCK_SIZE = 1024;
static final int FILE_SIZE = 1024*16;
static final short REPLICATION_NUM = (short)3;
static byte[] buffer = new byte[FILE_SIZE];
static private String fakeUsername = "fakeUser1";
static private String fakeGroup = "supergroup";
public void testBlockSynchronization() throws Exception {
final long softLease = 1000;
final long hardLease = 60 * 60 *1000;
final short repl = 3;
final Configuration conf = new HdfsConfiguration();
final int bufferSize = conf.getInt("io.file.buffer.size", 4096);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setInt("dfs.heartbeat.interval", 1);
// conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 16);
// create fake mapping user to group and set it to the conf
// NOTE. this must be done at the beginning, before first call to mapping
// functions
Map<String, String []> u2g_map = new HashMap<String, String []>(1);
u2g_map.put(fakeUsername, new String[] {fakeGroup});
DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
MiniDFSCluster cluster = null;
DistributedFileSystem dfs = null;
byte[] actual = new byte[FILE_SIZE];
try {
cluster = new MiniDFSCluster(conf, 5, true, null);
cluster.waitActive();
//create a file
dfs = (DistributedFileSystem)cluster.getFileSystem();
// create a random file name
String filestr = "/foo" + AppendTestUtil.nextInt();
System.out.println("filestr=" + filestr);
Path filepath = new Path(filestr);
FSDataOutputStream stm = dfs.create(filepath, true,
bufferSize, repl, BLOCK_SIZE);
assertTrue(dfs.dfs.exists(filestr));
// write random number of bytes into it.
int size = AppendTestUtil.nextInt(FILE_SIZE);
System.out.println("size=" + size);
stm.write(buffer, 0, size);
// hflush file
AppendTestUtil.LOG.info("hflush");
stm.hflush();
AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
dfs.dfs.leasechecker.interruptAndJoin();
// set the soft limit to be 1 second so that the
// namenode triggers lease recovery on next attempt to write-for-open.
cluster.setLeasePeriod(softLease, hardLease);
// try to re-open the file before closing the previous handle. This
// should fail but will trigger lease recovery.
{
Configuration conf2 = new HdfsConfiguration(conf);
UnixUserGroupInformation.saveToConf(conf2,
UnixUserGroupInformation.UGI_PROPERTY_NAME,
new UnixUserGroupInformation(fakeUsername, new String[]{fakeGroup}));
FileSystem dfs2 = FileSystem.get(conf2);
boolean done = false;
for(int i = 0; i < 10 && !done; i++) {
AppendTestUtil.LOG.info("i=" + i);
try {
dfs2.create(filepath, false, bufferSize, repl, BLOCK_SIZE);
fail("Creation of an existing file should never succeed.");
} catch (IOException ioe) {
final String message = ioe.getMessage();
if (message.contains("file exists")) {
AppendTestUtil.LOG.info("done", ioe);
done = true;
}
else if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) {
AppendTestUtil.LOG.info("GOOD! got " + message);
}
else {
AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
}
}
if (!done) {
AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
try {Thread.sleep(5000);} catch (InterruptedException e) {}
}
}
assertTrue(done);
}
AppendTestUtil.LOG.info("Lease for file " + filepath + " is recovered. "
+ "Validating its contents now...");
// verify that file-size matches
long fileSize = dfs.getFileStatus(filepath).getLen();
assertTrue("File should be " + size + " bytes, but is actually " +
" found to be " + fileSize + " bytes", fileSize == size);
// verify that there is enough data to read.
System.out.println("File size is good. Now validating sizes from datanodes...");
FSDataInputStream stmin = dfs.open(filepath);
stmin.readFully(0, actual, 0, size);
stmin.close();
}
finally {
try {
if(dfs != null) dfs.close();
if (cluster != null) {cluster.shutdown();}
} catch (Exception e) {
// ignore
}
}
}
}