blob: 54736a0d9d483b159f0940e76f9e84c62620d4ae [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nutch.segment;
import java.text.DecimalFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.MapFile.Writer.Option;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapFileOutputFormat;
import org.apache.nutch.parse.ParseText;
import org.apache.nutch.util.NutchConfiguration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
 * Tests {@link SegmentMerger} by creating two large parse-text segments
 * (each spanning more than two HDFS blocks, so the merge job must handle
 * multi-split input) and verifying that every record from both input
 * segments appears exactly once in the merged output.
 */
public class TestSegmentMerger {
  Configuration conf;
  FileSystem fs;
  Path testDir;
  Path seg1;
  Path seg2;
  Path out;
  int countSeg1, countSeg2;

  @Before
  public void setUp() throws Exception {
    conf = NutchConfiguration.create();
    fs = FileSystem.get(conf);
    // Unique per-run scratch directory so concurrent/repeated runs don't collide.
    testDir = new Path(conf.get("hadoop.tmp.dir"), "merge-"
        + System.currentTimeMillis());
    seg1 = new Path(testDir, "seg1");
    seg2 = new Path(testDir, "seg2");
    out = new Path(testDir, "out");
    // Create two large parse-text segments with distinct key/value prefixes
    // so the merged output can be attributed back to its source segment.
    countSeg1 = createLargeSegment(seg1, "seg1");
    countSeg2 = createLargeSegment(seg2, "seg2");
  }

  /**
   * Creates a parse-text segment under {@code segment} and fills it with
   * records until the (rough) accumulated size exceeds two filesystem
   * blocks, forcing the merge job to process multiple input splits.
   *
   * @param segment segment directory to create the parse_text data in
   * @param prefix  prefix used for both keys ({@code prefix-NNNNNNN}) and
   *                values ({@code prefix text N})
   * @return the number of records written
   * @throws Exception if the MapFile cannot be created or written
   */
  private int createLargeSegment(Path segment, String prefix) throws Exception {
    System.err.println("Creating large segment " + prefix + "...");
    DecimalFormat df = new DecimalFormat("0000000");
    Text k = new Text();
    Path ptPath = new Path(new Path(segment, ParseText.DIR_NAME), "part-00000");
    Option kOpt = MapFile.Writer.keyClass(Text.class);
    org.apache.hadoop.io.SequenceFile.Writer.Option vOpt =
        SequenceFile.Writer.valueClass(ParseText.class);
    int count = 0;
    // try-with-resources guarantees the writer is closed even if append fails.
    try (MapFile.Writer w = new MapFile.Writer(conf, ptPath, kOpt, vOpt)) {
      long blkSize = fs.getFileStatus(ptPath).getBlockSize();
      long curSize = 0;
      while (curSize < blkSize * 2) {
        k.set(prefix + "-" + df.format(count));
        w.append(k, new ParseText(prefix + " text " + count));
        count++;
        curSize += 40; // rough per-record size estimate
      }
    }
    System.err.println(" - done: " + count + " records.");
    return count;
  }

  @After
  public void tearDown() throws Exception {
    // Recursively remove the scratch directory created in setUp().
    fs.delete(testDir, true);
  }

  /**
   * Merges the two segments and verifies that the output contains a single
   * merged segment directory holding every record from both inputs, with
   * values still matching their keys' source segment.
   */
  @Test
  public void testLargeMerge() throws Exception {
    @SuppressWarnings("resource")
    SegmentMerger merger = new SegmentMerger(conf);
    merger.merge(out, new Path[] { seg1, seg2 }, false, false, -1);
    // Verify output: exactly one merged segment directory is expected
    // under the output path.
    FileStatus[] stats = fs.listStatus(out);
    Assert.assertEquals(1, stats.length);
    Path outSeg = stats[0].getPath();
    Text k = new Text();
    ParseText v = new ParseText();
    MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, new Path(
        outSeg, ParseText.DIR_NAME), conf);
    int cnt1 = 0, cnt2 = 0;
    for (MapFile.Reader r : readers) {
      while (r.next(k, v)) {
        String ks = k.toString();
        String vs = v.getText();
        // Each value must carry the same segment prefix as its key,
        // proving key/value pairs were not mixed up during the merge.
        if (ks.startsWith("seg1-")) {
          cnt1++;
          Assert.assertTrue(vs.startsWith("seg1 "));
        } else if (ks.startsWith("seg2-")) {
          cnt2++;
          Assert.assertTrue(vs.startsWith("seg2 "));
        }
      }
      r.close();
    }
    // No records lost or duplicated from either input segment.
    Assert.assertEquals(countSeg1, cnt1);
    Assert.assertEquals(countSeg2, cnt2);
  }
}