| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.mapred.nativetask.kvtest; |
| |
| import java.io.IOException; |
| import java.util.HashMap; |
| import java.util.Random; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.BooleanWritable; |
| import org.apache.hadoop.io.ByteWritable; |
| import org.apache.hadoop.io.DoubleWritable; |
| import org.apache.hadoop.io.FloatWritable; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.io.IntWritable; |
| import org.apache.hadoop.io.LongWritable; |
| import org.apache.hadoop.io.SequenceFile; |
| import org.apache.hadoop.io.VIntWritable; |
| import org.apache.hadoop.io.VLongWritable; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.mapred.nativetask.testutil.BytesFactory; |
| import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration; |
| import org.apache.hadoop.mapred.nativetask.testutil.TestConstants; |
| |
| |
| public class TestInputFile { |
| private static Log LOG = LogFactory.getLog(TestInputFile.class); |
| |
| public static class KVSizeScope { |
| private static final int DefaultMinNum = 1; |
| private static final int DefaultMaxNum = 64; |
| |
| public int minBytesNum; |
| public int maxBytesNum; |
| |
| public KVSizeScope() { |
| this.minBytesNum = DefaultMinNum; |
| this.maxBytesNum = DefaultMaxNum; |
| } |
| |
| public KVSizeScope(int min, int max) { |
| this.minBytesNum = min; |
| this.maxBytesNum = max; |
| } |
| } |
| |
| private static HashMap<String, KVSizeScope> map = new HashMap<String, KVSizeScope>(); |
| |
| private byte[] databuf = null; |
| private final String keyClsName, valueClsName; |
| private int filesize = 0; |
| private int keyMaxBytesNum, keyMinBytesNum; |
| private int valueMaxBytesNum, valueMinBytesNum; |
| private SequenceFile.Writer writer = null; |
| Random r = new Random(); |
| public static final int DATABUFSIZE = 1 << 22; // 4M |
| |
| private enum State { |
| KEY, VALUE |
| }; |
| |
| static { |
| map.put(BooleanWritable.class.getName(), new KVSizeScope(1, 1)); |
| map.put(DoubleWritable.class.getName(), new KVSizeScope(8, 8)); |
| map.put(FloatWritable.class.getName(), new KVSizeScope(4, 4)); |
| map.put(VLongWritable.class.getName(), new KVSizeScope(8, 8)); |
| map.put(ByteWritable.class.getName(), new KVSizeScope(1, 1)); |
| map.put(LongWritable.class.getName(), new KVSizeScope(8, 8)); |
| map.put(VIntWritable.class.getName(), new KVSizeScope(4, 4)); |
| map.put(IntWritable.class.getName(), new KVSizeScope(4, 4)); |
| } |
| |
| public TestInputFile(int filesize, String keytype, String valuetype, |
| Configuration conf) throws Exception { |
| this.filesize = filesize; |
| this.databuf = new byte[DATABUFSIZE]; |
| this.keyClsName = keytype; |
| this.valueClsName = valuetype; |
| final int defaultMinBytes = conf.getInt(TestConstants.NATIVETASK_KVSIZE_MIN, 1); |
| final int defaultMaxBytes = conf.getInt(TestConstants.NATIVETASK_KVSIZE_MAX, 64); |
| |
| if (map.get(keytype) != null) { |
| keyMinBytesNum = map.get(keytype).minBytesNum; |
| keyMaxBytesNum = map.get(keytype).maxBytesNum; |
| } else { |
| keyMinBytesNum = defaultMinBytes; |
| keyMaxBytesNum = defaultMaxBytes; |
| } |
| |
| if (map.get(valuetype) != null) { |
| valueMinBytesNum = map.get(valuetype).minBytesNum; |
| valueMaxBytesNum = map.get(valuetype).maxBytesNum; |
| } else { |
| valueMinBytesNum = defaultMinBytes; |
| valueMaxBytesNum = defaultMaxBytes; |
| } |
| } |
| |
| public void createSequenceTestFile(String filepath) throws Exception { |
| int FULL_BYTE_SPACE = 256; |
| createSequenceTestFile(filepath, FULL_BYTE_SPACE); |
| } |
| |
| public void createSequenceTestFile(String filepath, int base) throws Exception { |
| createSequenceTestFile(filepath, base, (byte)0); |
| } |
| |
| public void createSequenceTestFile(String filepath, int base, byte start) throws Exception { |
| LOG.info("creating file " + filepath + "(" + filesize + " bytes)"); |
| LOG.info(keyClsName + " " + valueClsName); |
| Class<?> tmpkeycls, tmpvaluecls; |
| try { |
| tmpkeycls = Class.forName(keyClsName); |
| } catch (final ClassNotFoundException e) { |
| throw new Exception("key class not found: ", e); |
| } |
| try { |
| tmpvaluecls = Class.forName(valueClsName); |
| } catch (final ClassNotFoundException e) { |
| throw new Exception("key class not found: ", e); |
| } |
| try { |
| final Path outputfilepath = new Path(filepath); |
| final ScenarioConfiguration conf= new ScenarioConfiguration(); |
| writer = SequenceFile.createWriter( |
| conf, |
| SequenceFile.Writer.file(outputfilepath), |
| SequenceFile.Writer.keyClass(tmpkeycls), |
| SequenceFile.Writer.valueClass(tmpvaluecls)); |
| } catch (final Exception e) { |
| e.printStackTrace(); |
| } |
| |
| int tmpfilesize = this.filesize; |
| while (tmpfilesize > DATABUFSIZE) { |
| nextRandomBytes(databuf, base, start); |
| final int size = flushBuf(DATABUFSIZE); |
| tmpfilesize -= size; |
| } |
| nextRandomBytes(databuf, base, start); |
| flushBuf(tmpfilesize); |
| |
| if (writer != null) { |
| IOUtils.closeStream(writer); |
| } else { |
| throw new Exception("no writer to create sequenceTestFile!"); |
| } |
| } |
| |
| private void nextRandomBytes(byte[] buf, int base) { |
| nextRandomBytes(buf, base, (byte)0); |
| } |
| |
| private void nextRandomBytes(byte[] buf, int base, byte start) { |
| r.nextBytes(buf); |
| for (int i = 0; i < buf.length; i++) { |
| buf[i] = (byte) ((buf[i] & 0xFF) % base + start); |
| } |
| } |
| |
| private int flushBuf(int buflen) throws Exception { |
| final Random r = new Random(); |
| int keybytesnum = 0; |
| int valuebytesnum = 0; |
| int offset = 0; |
| |
| Writable keyWritable = BytesFactory.newObject(null, keyClsName); |
| Writable valWritable = BytesFactory.newObject(null, valueClsName); |
| |
| while (offset < buflen) { |
| final int remains = buflen - offset; |
| keybytesnum = keyMaxBytesNum; |
| if (keyMaxBytesNum != keyMinBytesNum) { |
| keybytesnum = keyMinBytesNum + r.nextInt(keyMaxBytesNum - keyMinBytesNum); |
| } |
| |
| valuebytesnum = valueMaxBytesNum; |
| if (valueMaxBytesNum != valueMinBytesNum) { |
| valuebytesnum = valueMinBytesNum + r.nextInt(valueMaxBytesNum - valueMinBytesNum); |
| } |
| |
| if (keybytesnum + valuebytesnum > remains) { |
| break; |
| } |
| |
| final byte[] key = new byte[keybytesnum]; |
| final byte[] value = new byte[valuebytesnum]; |
| |
| System.arraycopy(databuf, offset, key, 0, keybytesnum); |
| offset += keybytesnum; |
| |
| System.arraycopy(databuf, offset, value, 0, valuebytesnum); |
| offset += valuebytesnum; |
| |
| BytesFactory.updateObject(keyWritable, key); |
| BytesFactory.updateObject(valWritable, value); |
| |
| try { |
| writer.append(keyWritable, valWritable); |
| } catch (final IOException e) { |
| e.printStackTrace(); |
| throw new Exception("sequence file create failed", e); |
| } |
| } |
| return offset; |
| } |
| |
| } |