/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.giraph.io.hbase;

import org.apache.giraph.BspCase;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.hbase.edgemarker.TableEdgeInputFormat;
import org.apache.giraph.io.hbase.edgemarker.TableEdgeOutputFormat;
import org.apache.giraph.job.GiraphJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.mapreduce.ImportTsv;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.Logger;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Test case for reading and writing vertices from/to an HBase instance.
 */
public class TestHBaseRootMarkerVertextFormat extends BspCase {
  private final Logger log =
      Logger.getLogger(TestHBaseRootMarkerVertextFormat.class);

  private final String TABLE_NAME = "simple_graph";
  private final String FAMILY = "cf";
  private final String QUALIFER = "children";
  private final String OUTPUT_FIELD = "parent";

  private final HBaseTestingUtility testUtil = new HBaseTestingUtility();

  public TestHBaseRootMarkerVertextFormat() {
    super(TestHBaseRootMarkerVertextFormat.class.getName());
  }
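  /**
   * Loads a small parent/child graph into HBase via ImportTsv, runs the
   * EdgeNotification computation over it through the HBase vertex I/O
   * formats, and verifies each child row was tagged with its parent's id.
   */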
  @Test
  public void testHBaseInputOutput() throws Exception {
    if (System.getProperty("prop.mapred.job.tracker") != null) {
      if (log.isInfoEnabled()) {
        log.info("testHBaseInputOutput: Ignore this test if not local mode.");
      }
      return;
    }
    File jarTest = new File(System.getProperty("prop.jarLocation"));
    if (!jarTest.exists()) {
      fail("Could not find Giraph jar at " +
          "location specified by 'prop.jarLocation'. " +
          "Make sure you built the main Giraph artifact.");
    }
    FileSystem fs = null;
    Path hbaseRootdir = null;
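    // Spin up an in-process HBase cluster plus a mini MapReduce cluster
    // so the whole test can run locally.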
    try {
      MiniHBaseCluster cluster = testUtil.startMiniCluster(1);
      cluster.waitForActiveAndReadyMaster();
      testUtil.startMiniMapReduceCluster();

      // Let's set up the hbase root directory.
      Configuration conf = testUtil.getConfiguration();
      try {
        fs = testUtil.getTestFileSystem();
        String randomStr = UUID.randomUUID().toString();
        String tmpdir = System.getProperty("java.io.tmpdir") + "/" +
            randomStr + "/";
        hbaseRootdir = fs.makeQualified(new Path(tmpdir));
        conf.set(HConstants.HBASE_DIR, hbaseRootdir.toString());
        fs.mkdirs(hbaseRootdir);
      } catch (IOException ioe) {
        fail("Could not create hbase root directory.");
      }
      // First let's load some data using ImportTsv into our mock table.
      String INPUT_FILE = hbaseRootdir.toString() + "/graph.csv";
      String[] args = new String[] {
          "-Dimporttsv.columns=HBASE_ROW_KEY,cf:" + QUALIFER,
          // The separator, given here as a unicode escape, is a comma.
          "-Dimporttsv.separator=" + "\u002c",
          TABLE_NAME,
          INPUT_FILE
      };
      GenericOptionsParser opts =
          new GenericOptionsParser(testUtil.getConfiguration(), args);
      args = opts.getRemainingArgs();

      fs = FileSystem.get(conf);
      fs.setConf(conf);
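      // Write a tiny parent,child graph: 0001->0002->0004 and 0003->0005,
      // where -1 marks a vertex with no children.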
      Path inputPath = fs.makeQualified(new Path(hbaseRootdir, "graph.csv"));
      FSDataOutputStream op = fs.create(inputPath, true);
      String line1 = "0001,0002\n";
      String line2 = "0002,0004\n";
      String line3 = "0003,0005\n";
      String line4 = "0004,-1\n";
      String line5 = "0005,-1\n";
      op.write(line1.getBytes());
      op.write(line2.getBytes());
      op.write(line3.getBytes());
      op.write(line4.getBytes());
      op.write(line5.getBytes());
      op.close();
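      // Create the target table, dropping any stale copy left behind by a
      // previous run.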
      final byte[] FAM = Bytes.toBytes(FAMILY);
      final byte[] TAB = Bytes.toBytes(TABLE_NAME);
      HTableDescriptor desc = new HTableDescriptor(TAB);
      desc.addFamily(new HColumnDescriptor(FAM));
      HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
      if (hbaseAdmin.isTableAvailable(TABLE_NAME)) {
        hbaseAdmin.disableTable(TABLE_NAME);
        hbaseAdmin.deleteTable(TABLE_NAME);
      }
      hbaseAdmin.createTable(desc);
      // Do the import
      Job job = ImportTsv.createSubmittableJob(conf, args);
      job.waitForCompletion(false);
      assertTrue(job.isSuccessful());
      if (log.isInfoEnabled()) {
        log.info("ImportTsv successful. Running HBase Giraph job.");
      }

      // Now operate over HBase using Vertex I/O formats
      conf.set(TableInputFormat.INPUT_TABLE, TABLE_NAME);
      conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME);
      GiraphJob giraphJob = new GiraphJob(conf, BspCase.getCallingMethodName());
      GiraphConfiguration giraphConf = giraphJob.getConfiguration();
      setupConfiguration(giraphJob);
      giraphConf.setComputationClass(EdgeNotification.class);
      giraphConf.setVertexInputFormatClass(TableEdgeInputFormat.class);
      giraphConf.setVertexOutputFormatClass(TableEdgeOutputFormat.class);
      assertTrue(giraphJob.run(true));
      if (log.isInfoEnabled()) {
        log.info("Giraph job successful. Checking output qualifier.");
      }

      // Do a get on row 0002; it should have a parent of 0001
      // if the output format worked.
      HTable table = new HTable(conf, TABLE_NAME);
      Result result = table.get(new Get(Bytes.toBytes("0002")));
      byte[] parentBytes = result.getValue(FAM, Bytes.toBytes(OUTPUT_FIELD));
      assertNotNull(parentBytes);
      assertTrue(parentBytes.length > 0);
      assertEquals("0001", Bytes.toString(parentBytes));
    } finally {
      testUtil.shutdownMiniMapReduceCluster();
      testUtil.shutdownMiniCluster();
    }
  }
  /**
   * Test compute method that sends each edge a notification of its parents.
   * The test data set has only a one-to-one parent-to-child ratio.
   */
  public static class EdgeNotification
      extends BasicComputation<Text, Text, Text, Text> {
    @Override
    public void compute(Vertex<Text, Text, Text> vertex,
        Iterable<Text> messages) throws IOException {
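      // Any incoming message carries the sender's (parent's) id; store it as
      // the vertex value so the output format can write it back to HBase.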
      for (Text message : messages) {
        vertex.getValue().set(message);
      }
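      // On the first superstep, each vertex announces its own id to all of
      // its children along its outgoing edges.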
      if (getSuperstep() == 0) {
        sendMessageToAllEdges(vertex, vertex.getId());
      }
      vertex.voteToHalt();
    }
  }
}