blob: 15cd0d4261555df11daabf1684e7338340a767c4 [file] [log] [blame]
package org.apache.rya.joinselect.mr;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import static org.apache.rya.joinselect.mr.utils.JoinSelectConstants.INPUTPATH;
import static org.apache.rya.joinselect.mr.utils.JoinSelectConstants.INSTANCE;
import static org.apache.rya.joinselect.mr.utils.JoinSelectConstants.OUTPUTPATH;
import static org.apache.rya.joinselect.mr.utils.JoinSelectConstants.PASSWORD;
import static org.apache.rya.joinselect.mr.utils.JoinSelectConstants.PROSPECTS_OUTPUTPATH;
import static org.apache.rya.joinselect.mr.utils.JoinSelectConstants.PROSPECTS_TABLE;
import static org.apache.rya.joinselect.mr.utils.JoinSelectConstants.SELECTIVITY_TABLE;
import static org.apache.rya.joinselect.mr.utils.JoinSelectConstants.SPO_OUTPUTPATH;
import static org.apache.rya.joinselect.mr.utils.JoinSelectConstants.SPO_TABLE;
import static org.apache.rya.joinselect.mr.utils.JoinSelectConstants.USERNAME;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaType;
import org.apache.rya.api.domain.RyaIRI;
import org.apache.rya.api.resolver.RyaTripleContext;
import org.apache.rya.api.resolver.triple.TripleRow;
import org.apache.rya.joinselect.mr.JoinSelectAggregate.JoinReducer;
import org.apache.rya.joinselect.mr.JoinSelectAggregate.JoinSelectAggregateMapper;
import org.apache.rya.joinselect.mr.JoinSelectAggregate.JoinSelectGroupComparator;
import org.apache.rya.joinselect.mr.JoinSelectAggregate.JoinSelectPartitioner;
import org.apache.rya.joinselect.mr.JoinSelectAggregate.JoinSelectSortComparator;
import org.apache.rya.joinselect.mr.JoinSelectProspectOutput.CardinalityMapper;
import org.apache.rya.joinselect.mr.JoinSelectSpoTableOutput.JoinSelectMapper;
import org.apache.rya.joinselect.mr.JoinSelectStatisticsSum.CardinalityIdentityCombiner;
import org.apache.rya.joinselect.mr.JoinSelectStatisticsSum.CardinalityIdentityMapper;
import org.apache.rya.joinselect.mr.JoinSelectStatisticsSum.CardinalityIdentityReducer;
import org.apache.rya.joinselect.mr.utils.CardList;
import org.apache.rya.joinselect.mr.utils.CompositeType;
import org.apache.rya.joinselect.mr.utils.JoinSelectStatsUtil;
import org.apache.rya.joinselect.mr.utils.TripleCard;
import org.apache.rya.joinselect.mr.utils.TripleEntry;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class JoinSelectStatisticsTest {
private static final String PREFIX = JoinSelectStatisticsTest.class.getSimpleName();
private static final String DELIM = "\u0000";
private static final String iri = "uri:";
private List<String> cardList = Arrays.asList("subject", "predicate", "object");
private List<String> aggCardList = Arrays.asList("subjectobject", "subjectpredicate", "subjectsubject", "predicateobject", "predicatepredicate", "predicatesubject");
private static File SPOOUT;
private static File PROSPECTSOUT;
private static File tempDir;
private Connector c;
private RyaTripleContext ryaContext;
private static final String INSTANCE_NAME = "mapreduce_instance";
private static class JoinSelectTester1 extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
String inTable = conf.get(SPO_TABLE);
String outPath = conf.get(SPO_OUTPUTPATH);
assert inTable != null && outPath != null;
Job job = new Job(conf, this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
job.setJarByClass(this.getClass());
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
initTabToSeqFileJob(job, inTable, outPath);
job.setMapperClass(JoinSelectMapper.class);
job.setNumReduceTasks(0);
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
}
private static class JoinSelectTester2 extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
String inTable = conf.get(PROSPECTS_TABLE);
System.out.println("Table is " + inTable);
String outPath = conf.get(PROSPECTS_OUTPUTPATH);
assert inTable != null && outPath != null;
Job job = new Job(conf, this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
job.setJarByClass(this.getClass());
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
initTabToSeqFileJob(job, inTable, outPath);
job.setMapperClass(CardinalityMapper.class);
job.setNumReduceTasks(0);
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
}
private static class JoinSelectTester4 extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
String outpath = conf.get(OUTPUTPATH);
Job job = new Job(conf, this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
job.setJarByClass(this.getClass());
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
MultipleInputs.addInputPath(job, new Path(PROSPECTSOUT.getAbsolutePath()),
SequenceFileInputFormat.class, JoinSelectAggregateMapper.class);
MultipleInputs.addInputPath(job,new Path(SPOOUT.getAbsolutePath()) ,
SequenceFileInputFormat.class, JoinSelectAggregateMapper.class);
job.setMapOutputKeyClass(CompositeType.class);
job.setMapOutputValueClass(TripleCard.class);
tempDir = new File(File.createTempFile(outpath, "txt").getParentFile(), System.currentTimeMillis() + "");
SequenceFileOutputFormat.setOutputPath(job, new Path(tempDir.getAbsolutePath()));
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(TripleEntry.class);
job.setOutputValueClass(CardList.class);
job.setSortComparatorClass(JoinSelectSortComparator.class);
job.setGroupingComparatorClass(JoinSelectGroupComparator.class);
job.setPartitionerClass(JoinSelectPartitioner.class);
job.setReducerClass(JoinReducer.class);
job.setNumReduceTasks(32);
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
}
private static class JoinSelectTester3 extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConfig();
String outTable = conf.get(SELECTIVITY_TABLE);
String inPath = conf.get(INPUTPATH);
assert outTable != null && inPath != null;
Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
job.setJarByClass(this.getClass());
initSumMRJob(job, inPath, outTable);
job.setMapperClass(CardinalityIdentityMapper.class);
job.setCombinerClass(CardinalityIdentityCombiner.class);
job.setReducerClass(CardinalityIdentityReducer.class);
job.setNumReduceTasks(32);
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
}
public class JoinSelectTestDriver extends Configured implements Tool {
Configuration conf = getConfig();
@Override
public int run(String[] args) throws Exception {
int res0 = ToolRunner.run(conf, new JoinSelectTester1(), args);
int res1 = 1;
int res2 = 1;
int res3 = 1;
if(res0 == 0) {
res1 = ToolRunner.run(conf, new JoinSelectTester2(), args);
}
if(res1 == 0) {
res2 = ToolRunner.run(conf, new JoinSelectTester4(), args);
}
if(res2 == 0) {
res3 = ToolRunner.run(conf, new JoinSelectTester3(), args);
}
return res3;
}
}
private static Configuration getConfig() {
Configuration conf = new Configuration();
conf.set("fs.default.name", "file:///");
conf.set("mapreduce.framework.name", "local");
conf.set("spo.table", "rya_spo");
conf.set("prospects.table", "rya_prospects");
conf.set("selectivity.table", "rya_selectivity");
conf.set("auths", "");
conf.set("instance",INSTANCE_NAME);
conf.set("username","root");
conf.set("password", "");
conf.set("inputpath","temp");
conf.set("outputpath","temp");
conf.set("prospects.outputpath","prospects");
conf.set("spo.outputpath", "spo");
return conf;
}
public static void initTabToSeqFileJob(Job job, String intable, String outpath) throws AccumuloSecurityException, IOException {
Configuration conf = job.getConfiguration();
String username = conf.get(USERNAME);
System.out.println("Username is " + username);
String password = conf.get(PASSWORD);
String instance = conf.get(INSTANCE);
System.out.println("Instance is " + instance);
AccumuloInputFormat.setMockInstance(job, instance);
AccumuloInputFormat.setConnectorInfo(job, username, new PasswordToken(password));
AccumuloInputFormat.setInputTableName(job, intable);
job.setInputFormatClass(AccumuloInputFormat.class);
job.setMapOutputKeyClass(CompositeType.class);
job.setMapOutputValueClass(TripleCard.class);
System.out.println("Outpath is " + outpath);
// OUTPUT
if(outpath.equals("spo")) {
SPOOUT = new File(File.createTempFile(outpath, "txt").getParentFile(), System.currentTimeMillis() + "spo");
SequenceFileOutputFormat.setOutputPath(job, new Path(SPOOUT.getAbsolutePath()));
} else {
PROSPECTSOUT = new File(File.createTempFile(outpath, "txt").getParentFile(), System.currentTimeMillis() + "prospects");
SequenceFileOutputFormat.setOutputPath(job, new Path(PROSPECTSOUT.getAbsolutePath()));
}
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(CompositeType.class);
job.setOutputValueClass(TripleCard.class);
}
public static void initSumMRJob(Job job, String inputPath, String outtable) throws AccumuloSecurityException, IOException {
Configuration conf = job.getConfiguration();
String username = conf.get(USERNAME);
String password = conf.get(PASSWORD);
String instance = conf.get(INSTANCE);
AccumuloOutputFormat.setConnectorInfo(job, username, new PasswordToken(password));
AccumuloOutputFormat.setMockInstance(job, instance);
AccumuloOutputFormat.setDefaultTableName(job, outtable);
SequenceFileInputFormat.addInputPath(job, new Path(tempDir.getAbsolutePath()));
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setMapOutputKeyClass(TripleEntry.class);
job.setMapOutputValueClass(CardList.class);
job.setOutputFormatClass(AccumuloOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Mutation.class);
}
@Before
public void init() throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException {
MockInstance mockInstance = new MockInstance(INSTANCE_NAME);
c = mockInstance.getConnector("root", new PasswordToken(""));
if (c.tableOperations().exists("rya_prospects")) {
c.tableOperations().delete("rya_prospects");
}
if (c.tableOperations().exists("rya_selectivity")) {
c.tableOperations().delete("rya_selectivity");
}
if (c.tableOperations().exists("rya_spo")) {
c.tableOperations().delete("rya_spo");
}
c.tableOperations().create("rya_spo");
c.tableOperations().create("rya_prospects");
c.tableOperations().create("rya_selectivity");
ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(getConfig()));
}
@Test
public void testMap1() throws Exception {
init();
System.out.println("*****************************Test1**************************** ");
BatchWriter bw_table1 = c.createBatchWriter("rya_spo", new BatchWriterConfig());
for (int i = 1; i < 3; i++) {
RyaStatement rs = new RyaStatement(new RyaIRI(iri + i), new RyaIRI(iri + 5), new RyaType(iri + (i + 2)));
Map<TABLE_LAYOUT, TripleRow> tripleRowMap = ryaContext.serializeTriple(rs);
TripleRow tripleRow = tripleRowMap.get(TABLE_LAYOUT.SPO);
Mutation m = JoinSelectStatsUtil.createMutation(tripleRow);
bw_table1.addMutation(m);
}
bw_table1.close();
BatchWriter bw_table2 = c.createBatchWriter("rya_prospects", new BatchWriterConfig());
for (int i = 1; i < 6; i++) {
int j = 1;
for (String s : cardList) {
Mutation m = new Mutation(new Text(s + DELIM + iri + i + DELIM + i));
m.put(new Text(), new Text(), new Value(new IntWritable(i + j).toString().getBytes()));
bw_table2.addMutation(m);
j++;
}
}
bw_table2.close();
Assert.assertEquals(0, ToolRunner.run(new JoinSelectTestDriver(), new String[]{""}));
Scanner scan = c.createScanner("rya_selectivity", new Authorizations());
scan.setRange(new Range());
for (Map.Entry<Key, Value> entry : scan) {
System.out.println("Key row string is " + entry.getKey().getRow().toString());
System.out.println("Join type is " + entry.getKey().getColumnFamily().toString());
System.out.println("Value is " + entry.getKey().getColumnQualifier().toString());
}
Scanner scan1 = c.createScanner("rya_selectivity" , new Authorizations());
scan1.setRange(Range.prefix("predicate" +DELIM + iri + 5));
int i = 5;
for (Map.Entry<Key, Value> entry : scan1) {
int val1 = 5 + 2*i;
int val2 = 5 + 2*(i-1);
int val = Integer.parseInt(entry.getKey().getColumnQualifier().toString());
if(i < 3) {
Assert.assertTrue( val == val1);
}
if(i >= 3 && i < 6) {
Assert.assertTrue(val == val2);
}
i--;
}
Assert.assertTrue(i == -1);
Scanner scan2 = c.createScanner("rya_selectivity" , new Authorizations());
scan2.setRange(Range.prefix("object" +DELIM + iri + 3));
int j = 5;
for (Map.Entry<Key, Value> entry : scan2) {
int val1 = 5 + (j-2);
int val2 = 2+j;
int val = Integer.parseInt(entry.getKey().getColumnQualifier().toString());
if(j < 3) {
Assert.assertTrue( val == val2);
}
if(j >= 3 && j < 6) {
Assert.assertTrue(val == val1);
}
j--;
}
Assert.assertTrue(j == -1);
Scanner scan3 = c.createScanner("rya_selectivity", new Authorizations());
scan3.setRange(Range.prefix("objectsubject" + DELIM + iri + 3 +DELIM +iri +1 ));
int k = 8;
for (Map.Entry<Key, Value> entry : scan3) {
int val = Integer.parseInt(entry.getKey().getColumnQualifier().toString());
Assert.assertTrue(val == k);
k--;
}
Assert.assertTrue(k == 5);
}
@Test
public void testMap2() throws Exception {
System.out.println("*********************Test2******************* ");
init();
BatchWriter bw_table1 = c.createBatchWriter("rya_spo", new BatchWriterConfig());
for (int i = 1; i < 4; i++) {
RyaStatement rs = new RyaStatement(new RyaIRI(iri + 1), new RyaIRI(iri + 2), new RyaType(iri + i));
Map<TABLE_LAYOUT, TripleRow> tripleRowMap = ryaContext.serializeTriple(rs);
TripleRow tripleRow = tripleRowMap.get(TABLE_LAYOUT.SPO);
Mutation m = JoinSelectStatsUtil.createMutation(tripleRow);
bw_table1.addMutation(m);
}
bw_table1.close();
BatchWriter bw_table2 = c.createBatchWriter("rya_prospects", new BatchWriterConfig());
for (int i = 1; i < 4; i++) {
for (String s : cardList) {
Mutation m = new Mutation(new Text(s + DELIM + iri + i + DELIM + i));
m.put(new Text(), new Text(), new Value(new IntWritable(i + 2).toString().getBytes()));
bw_table2.addMutation(m);
}
}
bw_table2.close();
Assert.assertEquals(0, ToolRunner.run(new JoinSelectTestDriver(), new String[]{""}));
Scanner scan1 = c.createScanner("rya_selectivity" , new Authorizations());
scan1.setRange(Range.prefix("subject" +DELIM + iri + 1));
int i = 0;
for (Map.Entry<Key, Value> entry : scan1) {
Assert.assertTrue(entry.getKey().getColumnQualifier().toString().equals("12"));
i++;
}
Assert.assertTrue(i == 6);
Scanner scan2 = c.createScanner("rya_selectivity" , new Authorizations());
scan2.setRange(Range.prefix("predicate" +DELIM + iri + 2));
int j = 0;
for (Map.Entry<Key, Value> entry : scan2) {
if(j < 3) {
Assert.assertTrue(entry.getKey().getColumnQualifier().toString().equals("12"));
}
if(j > 3 && j < 6) {
Assert.assertTrue(entry.getKey().getColumnQualifier().toString().equals("9"));
}
j++;
}
Assert.assertTrue(j == 6);
Scanner scan3 = c.createScanner("rya_selectivity" , new Authorizations());
scan3.setRange(Range.prefix("predicateobject" +DELIM + iri + 2 +DELIM + iri + 2));
int k = 0;
for (Map.Entry<Key, Value> entry : scan3) {
Assert.assertTrue(entry.getKey().getColumnQualifier().toString().equals("3"));
k++;
}
Assert.assertTrue(k == 3);
}
@Test
public void testMap3() throws Exception {
init();
System.out.println("*************************Test3**************************** ");
BatchWriter bw_table1 = c.createBatchWriter("rya_spo", new BatchWriterConfig());
for (int i = 1; i < 3; i++) {
for (int j = 1; j < 3; j++) {
for (int k = 1; k < 3; k++) {
RyaStatement rs = new RyaStatement(new RyaIRI(iri + i), new RyaIRI(iri + (j)), new RyaType(iri + k));
Map<TABLE_LAYOUT, TripleRow> tripleRowMap = ryaContext.serializeTriple(rs);
TripleRow tripleRow = tripleRowMap.get(TABLE_LAYOUT.SPO);
Mutation m = JoinSelectStatsUtil.createMutation(tripleRow);
bw_table1.addMutation(m);
}
}
}
bw_table1.close();
BatchWriter bw_table2 = c.createBatchWriter("rya_prospects", new BatchWriterConfig());
for (int i = 1; i < 3; i++) {
int k = 1;
for (String s : cardList) {
Mutation m = new Mutation(new Text(s + DELIM + iri + i + DELIM + i));
m.put(new Text(), new Text(), new Value(new IntWritable(i + k).toString().getBytes()));
bw_table2.addMutation(m);
k++;
}
for (int j = 1; j < 3; j++) {
k = 1;
for (String s : aggCardList) {
Mutation m = new Mutation(new Text(s + DELIM + iri + i + DELIM + iri + j + DELIM + i));
m.put(new Text(), new Text(), new Value(new IntWritable(i + k +j).toString().getBytes()));
bw_table2.addMutation(m);
k++;
}
}
}
bw_table2.close();
Assert.assertEquals(0, ToolRunner.run(new JoinSelectTestDriver(), new String[]{""}));
Scanner scan = c.createScanner("rya_selectivity", new Authorizations());
scan.setRange(new Range());
for (Map.Entry<Key, Value> entry : scan) {
System.out.println("Key row string is " + entry.getKey().getRow().toString());
System.out.println("Join type is " + entry.getKey().getColumnFamily().toString());
System.out.println("Value is " + entry.getKey().getColumnQualifier().toString());
}
Scanner scan1 = c.createScanner("rya_selectivity" , new Authorizations());
scan1.setRange(Range.prefix("subject" +DELIM + iri + 1));
int i = 0;
for (Map.Entry<Key, Value> entry : scan1) {
Key key = entry.getKey();
String s = key.getColumnFamily().toString();
int val = Integer.parseInt(key.getColumnQualifier().toString());
if(s.equals("predicatepredicate")) {
Assert.assertTrue(val == 14);
}
if(s.equals("objectobject")) {
Assert.assertTrue(val == 18);
}
if(s.equals("predicateobjectpredicateobject")) {
Assert.assertTrue(val == 28);
}
if(s.equals("predicateobjectsubjectpredicate")) {
Assert.assertTrue(val == 20);
}
if(s.equals("predicateobjectobjectsubject")) {
Assert.assertTrue(val == 16);
}
i++;
}
Assert.assertTrue(i == 12);
}
@Test
public void testMap4() throws Exception {
init();
System.out.println("*************************Test4**************************** ");
System.out.println("*************************Test4**************************** ");
BatchWriter bw_table1 = c.createBatchWriter("rya_spo", new BatchWriterConfig());
for (int i = 1; i < 3; i++) {
for (int j = 1; j < 3; j++) {
for (int k = 1; k < 3; k++) {
if(j == 1 && k ==2) {
break;
}
RyaStatement rs = new RyaStatement(new RyaIRI(iri + i), new RyaIRI(iri + (j)), new RyaType(iri + k));
Map<TABLE_LAYOUT, TripleRow> tripleRowMap = ryaContext.serializeTriple(rs);
TripleRow tripleRow = tripleRowMap.get(TABLE_LAYOUT.SPO);
Mutation m = JoinSelectStatsUtil.createMutation(tripleRow);
bw_table1.addMutation(m);
}
}
}
bw_table1.close();
BatchWriter bw_table2 = c.createBatchWriter("rya_prospects", new BatchWriterConfig());
for (int i = 1; i < 3; i++) {
int k = 1;
for (String s : cardList) {
Mutation m = new Mutation(new Text(s + DELIM + iri + i + DELIM + i));
m.put(new Text(), new Text(), new Value(new IntWritable(i + k).toString().getBytes()));
bw_table2.addMutation(m);
k++;
}
for (int j = 1; j < 3; j++) {
k = 1;
for (String s : aggCardList) {
Mutation m = new Mutation(new Text(s + DELIM + iri + i + DELIM + iri + j + DELIM + i));
m.put(new Text(), new Text(), new Value(new IntWritable(i + k + 2*j).toString().getBytes()));
bw_table2.addMutation(m);
k++;
}
}
}
bw_table2.close();
Assert.assertEquals(0, ToolRunner.run(new JoinSelectTestDriver(), new String[]{""}));
Scanner scan = c.createScanner("rya_selectivity", new Authorizations());
scan.setRange(new Range());
for (Map.Entry<Key, Value> entry : scan) {
System.out.println("Key row string is " + entry.getKey().getRow().toString());
System.out.println("Join type is " + entry.getKey().getColumnFamily().toString());
System.out.println("Value is " + entry.getKey().getColumnQualifier().toString());
}
Scanner scan1 = c.createScanner("rya_selectivity" , new Authorizations());
scan1.setRange(Range.prefix("subject" +DELIM + iri + 1));
int i = 0;
for (Map.Entry<Key, Value> entry : scan1) {
Key key = entry.getKey();
String s = key.getColumnFamily().toString();
int val = Integer.parseInt(key.getColumnQualifier().toString());
if(s.equals("predicatepredicate")) {
Assert.assertTrue(val == 11);
}
if(s.equals("objectobject")) {
Assert.assertTrue(val == 13);
}
if(s.equals("predicateobjectobjectpredicate")) {
Assert.assertTrue(val == 26);
}
if(s.equals("predicateobjectpredicateobject")) {
Assert.assertTrue(val == 25);
}
if(s.equals("predicateobjectsubjectpredicate")) {
Assert.assertTrue(val == 19);
}
if(s.equals("predicateobjectpredicatesubject")) {
Assert.assertTrue(val == 20);
}
i++;
}
Assert.assertTrue(i == 12);
Scanner scan2 = c.createScanner("rya_selectivity" , new Authorizations());
scan2.setRange(Range.prefix("predicate" +DELIM + iri + 1));
int j = 0;
for (Map.Entry<Key, Value> entry : scan2) {
Key key = entry.getKey();
String s = key.getColumnFamily().toString();
int val = Integer.parseInt(key.getColumnQualifier().toString());
if(s.equals("subjectsubject")) {
Assert.assertTrue(val == 5);
}
if(s.equals("objectobject")) {
Assert.assertTrue(val == 8);
}
if(s.equals("objectsubjectsubjectpredicate")) {
Assert.assertTrue(val == 11);
}
if(s.equals("objectsubjectpredicateobject")) {
Assert.assertTrue(val == 15);
}
if(s.equals("objectsubjectobjectsubject")) {
Assert.assertTrue(val == 9);
}
if(s.equals("objectsubjectsubjectobject")) {
Assert.assertTrue(val == 10);
}
j++;
}
Assert.assertTrue(j == 12);
}
}