| package org.apache.rya.accumulo.mr; |
| |
| import java.io.IOException; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.accumulo.core.client.BatchWriterConfig; |
| import org.apache.accumulo.core.client.Connector; |
| import org.apache.accumulo.core.client.MultiTableBatchWriter; |
| import org.apache.accumulo.core.client.Scanner; |
| import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; |
| import org.apache.accumulo.core.client.mock.MockInstance; |
| import org.apache.accumulo.core.client.security.tokens.PasswordToken; |
| import org.apache.accumulo.core.data.Key; |
| import org.apache.accumulo.core.data.Value; |
| import org.apache.accumulo.core.security.Authorizations; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.RecordWriter; |
| import org.apache.rya.accumulo.AccumuloRdfConfiguration; |
| import org.apache.rya.accumulo.AccumuloRyaDAO; |
| import org.apache.rya.api.domain.RyaStatement; |
| import org.apache.rya.api.domain.RyaType; |
| import org.apache.rya.api.domain.RyaIRI; |
| import org.apache.rya.api.resolver.RdfToRyaConversions; |
| import org.apache.rya.api.resolver.RyaToRdfConversions; |
| import org.apache.rya.api.resolver.RyaTripleContext; |
| import org.apache.rya.indexing.StatementConstraints; |
| import org.apache.rya.indexing.TemporalInstant; |
| import org.apache.rya.indexing.TemporalInstantRfc3339; |
| import org.apache.rya.indexing.accumulo.ConfigUtils; |
| import org.apache.rya.indexing.accumulo.entity.EntityCentricIndex; |
| import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; |
| import org.apache.rya.indexing.accumulo.freetext.SimpleTokenizer; |
| import org.apache.rya.indexing.accumulo.freetext.Tokenizer; |
| import org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; |
| import org.eclipse.rdf4j.common.iteration.CloseableIteration; |
| import org.eclipse.rdf4j.model.Statement; |
| import org.eclipse.rdf4j.model.ValueFactory; |
| import org.eclipse.rdf4j.model.impl.SimpleValueFactory; |
| import org.eclipse.rdf4j.model.vocabulary.XMLSchema; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| public class RyaOutputFormatTest { |
| private static final String CV = "test_auth"; |
| private static final String GRAPH = "http://example.org/graph"; |
| private static final String USERNAME = "root"; |
| private static final String PASSWORD = ""; |
| private static final String INSTANCE_NAME = RyaOutputFormatTest.class.getSimpleName() + ".rya_output"; |
| private static final String PREFIX = "ryaoutputformattest_"; |
| |
| MockInstance instance; |
| Connector connector; |
| AccumuloRyaDAO dao; |
| AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); |
| Job job; |
| RyaTripleContext ryaContext; |
| |
| @Before |
| public void init() throws Exception { |
| MRUtils.setACMock(conf, true); |
| MRUtils.setACInstance(conf, INSTANCE_NAME); |
| MRUtils.setACUserName(conf, USERNAME); |
| MRUtils.setACPwd(conf, PASSWORD); |
| MRUtils.setTablePrefix(conf, PREFIX); |
| conf.setTablePrefix(PREFIX); |
| conf.setAuths(CV); |
| conf.set(ConfigUtils.CLOUDBASE_INSTANCE, INSTANCE_NAME); |
| conf.set(ConfigUtils.CLOUDBASE_USER, USERNAME); |
| conf.set(ConfigUtils.CLOUDBASE_PASSWORD, PASSWORD); |
| conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, true); |
| conf.setClass(ConfigUtils.TOKENIZER_CLASS, SimpleTokenizer.class, Tokenizer.class); |
| ryaContext = RyaTripleContext.getInstance(conf); |
| instance = new MockInstance(INSTANCE_NAME); |
| connector = instance.getConnector(USERNAME, new PasswordToken(PASSWORD)); |
| job = Job.getInstance(conf); |
| RyaOutputFormat.setMockInstance(job, instance.getInstanceName()); |
| AccumuloOutputFormat.setConnectorInfo(job, USERNAME, new PasswordToken(PASSWORD)); |
| AccumuloOutputFormat.setCreateTables(job, true); |
| AccumuloOutputFormat.setDefaultTableName(job, PREFIX + "default"); |
| RyaOutputFormat.setTablePrefix(job, PREFIX); |
| } |
| |
| private void write(final RyaStatement... input) throws IOException, InterruptedException { |
| final RecordWriter<Writable, RyaStatementWritable> writer = |
| new RyaOutputFormat.RyaRecordWriter(job.getConfiguration()); |
| for (final RyaStatement rstmt : input) { |
| final RyaStatementWritable rsw = new RyaStatementWritable(rstmt, ryaContext); |
| writer.write(new Text("unused"), rsw); |
| } |
| writer.close(null); |
| } |
| |
| @Test |
| public void testOutputFormat() throws Exception { |
| final RyaStatement input = RyaStatement.builder() |
| .setSubject(new RyaIRI("http://www.google.com")) |
| .setPredicate(new RyaIRI("http://some_other_uri")) |
| .setObject(new RyaIRI("http://www.yahoo.com")) |
| .setColumnVisibility(CV.getBytes()) |
| .setValue(new byte[0]) |
| .setContext(new RyaIRI(GRAPH)) |
| .build(); |
| RyaOutputFormat.setCoreTablesEnabled(job, true); |
| RyaOutputFormat.setFreeTextEnabled(job, false); |
| RyaOutputFormat.setTemporalEnabled(job, false); |
| RyaOutputFormat.setEntityEnabled(job, false); |
| write(input); |
| TestUtils.verify(connector, conf, input); |
| } |
| |
| @Test |
| public void testDefaultCV() throws Exception { |
| final RyaStatement input = RyaStatement.builder() |
| .setSubject(new RyaIRI("http://www.google.com")) |
| .setPredicate(new RyaIRI("http://some_other_uri")) |
| .setObject(new RyaIRI("http://www.yahoo.com")) |
| .setValue(new byte[0]) |
| .setContext(new RyaIRI(GRAPH)) |
| .build(); |
| final RyaStatement expected = RyaStatement.builder() |
| .setSubject(new RyaIRI("http://www.google.com")) |
| .setPredicate(new RyaIRI("http://some_other_uri")) |
| .setObject(new RyaIRI("http://www.yahoo.com")) |
| .setValue(new byte[0]) |
| .setContext(new RyaIRI(GRAPH)) |
| .setColumnVisibility(CV.getBytes()) |
| .build(); |
| RyaOutputFormat.setCoreTablesEnabled(job, true); |
| RyaOutputFormat.setFreeTextEnabled(job, false); |
| RyaOutputFormat.setTemporalEnabled(job, false); |
| RyaOutputFormat.setEntityEnabled(job, false); |
| RyaOutputFormat.setDefaultVisibility(job, CV); |
| write(input); |
| TestUtils.verify(connector, conf, expected); |
| } |
| |
| @Test |
| public void testDefaultGraph() throws Exception { |
| final RyaStatement input = RyaStatement.builder() |
| .setSubject(new RyaIRI("http://www.google.com")) |
| .setPredicate(new RyaIRI("http://some_other_uri")) |
| .setObject(new RyaIRI("http://www.yahoo.com")) |
| .setValue(new byte[0]) |
| .setColumnVisibility(CV.getBytes()) |
| .build(); |
| final RyaStatement expected = RyaStatement.builder() |
| .setSubject(new RyaIRI("http://www.google.com")) |
| .setPredicate(new RyaIRI("http://some_other_uri")) |
| .setObject(new RyaIRI("http://www.yahoo.com")) |
| .setValue(new byte[0]) |
| .setColumnVisibility(CV.getBytes()) |
| .setContext(new RyaIRI(GRAPH)) |
| .build(); |
| RyaOutputFormat.setCoreTablesEnabled(job, true); |
| RyaOutputFormat.setFreeTextEnabled(job, false); |
| RyaOutputFormat.setTemporalEnabled(job, false); |
| RyaOutputFormat.setEntityEnabled(job, false); |
| RyaOutputFormat.setDefaultContext(job, GRAPH); |
| write(input); |
| TestUtils.verify(connector, conf, expected); |
| } |
| |
| @Test |
| public void testFreeTextIndexing() throws Exception { |
| final AccumuloFreeTextIndexer ft = new AccumuloFreeTextIndexer(); |
| ft.setConf(conf); |
| final RyaStatement input = RyaStatement.builder() |
| .setSubject(new RyaIRI(GRAPH + ":s")) |
| .setPredicate(new RyaIRI(GRAPH + ":p")) |
| .setObject(new RyaType(XMLSchema.STRING, "one two three four five")) |
| .build(); |
| RyaOutputFormat.setCoreTablesEnabled(job, false); |
| RyaOutputFormat.setFreeTextEnabled(job, true); |
| RyaOutputFormat.setTemporalEnabled(job, false); |
| RyaOutputFormat.setEntityEnabled(job, false); |
| write(input); |
| final Set<Statement> empty = new HashSet<>(); |
| final Set<Statement> expected = new HashSet<>(); |
| expected.add(RyaToRdfConversions.convertStatement(input)); |
| Assert.assertEquals(expected, getSet(ft.queryText("one", new StatementConstraints()))); |
| Assert.assertEquals(empty, getSet(ft.queryText("!two", new StatementConstraints()))); |
| Assert.assertEquals(expected, getSet(ft.queryText("*r", new StatementConstraints()))); |
| Assert.assertEquals(empty, getSet(ft.queryText("r*", new StatementConstraints()))); |
| Assert.assertEquals(expected, getSet(ft.queryText("!r*", new StatementConstraints()))); |
| Assert.assertEquals(expected, getSet(ft.queryText("t* & !s*", new StatementConstraints()))); |
| ft.close(); |
| } |
| |
| @Test |
| public void testTemporalIndexing() throws Exception { |
| final TemporalInstant[] instants = { |
| new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 01), |
| new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 02), |
| new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 03), |
| new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 03) |
| }; |
| final Statement[] statements = new Statement[instants.length]; |
| RyaOutputFormat.setCoreTablesEnabled(job, false); |
| RyaOutputFormat.setFreeTextEnabled(job, false); |
| RyaOutputFormat.setTemporalEnabled(job, true); |
| RyaOutputFormat.setEntityEnabled(job, false); |
| final ValueFactory vf = SimpleValueFactory.getInstance(); |
| for (int i = 0; i < instants.length; i++) { |
| final RyaType time = RdfToRyaConversions.convertLiteral(vf.createLiteral(instants[i].toString())); |
| final RyaStatement input = RyaStatement.builder() |
| .setSubject(new RyaIRI(GRAPH + ":s")) |
| .setPredicate(new RyaIRI(GRAPH + ":p")) |
| .setObject(time) |
| .build(); |
| write(input); |
| statements[i] = RyaToRdfConversions.convertStatement(input); |
| } |
| final AccumuloTemporalIndexer temporal = new AccumuloTemporalIndexer(); |
| temporal.setConf(conf); |
| Connector connector = ConfigUtils.getConnector(conf); |
| MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(new BatchWriterConfig()); |
| temporal.setConnector(connector); |
| temporal.setMultiTableBatchWriter(mtbw); |
| temporal.init(); |
| final Set<Statement> empty = new HashSet<>(); |
| final Set<Statement> head = new HashSet<>(); |
| final Set<Statement> tail = new HashSet<>(); |
| head.add(statements[0]); |
| tail.add(statements[2]); |
| tail.add(statements[3]); |
| Assert.assertEquals(empty, getSet(temporal.queryInstantBeforeInstant(instants[0], new StatementConstraints()))); |
| Assert.assertEquals(empty, getSet(temporal.queryInstantAfterInstant(instants[3], new StatementConstraints()))); |
| Assert.assertEquals(head, getSet(temporal.queryInstantBeforeInstant(instants[1], new StatementConstraints()))); |
| Assert.assertEquals(tail, getSet(temporal.queryInstantAfterInstant(instants[1], new StatementConstraints()))); |
| temporal.close(); |
| } |
| |
| |
| @Test |
| public void testEntityIndexing() throws Exception { |
| final EntityCentricIndex entity = new EntityCentricIndex(); |
| entity.setConf(conf); |
| final RyaStatement input = RyaStatement.builder() |
| .setSubject(new RyaIRI(GRAPH + ":s")) |
| .setPredicate(new RyaIRI(GRAPH + ":p")) |
| .setObject(new RyaIRI(GRAPH + ":o")) |
| .build(); |
| RyaOutputFormat.setCoreTablesEnabled(job, false); |
| RyaOutputFormat.setFreeTextEnabled(job, false); |
| RyaOutputFormat.setTemporalEnabled(job, false); |
| RyaOutputFormat.setEntityEnabled(job, true); |
| write(input); |
| entity.close(); |
| final Set<Statement> expected = new HashSet<>(); |
| final Set<Statement> inserted = new HashSet<>(); |
| expected.add(RyaToRdfConversions.convertStatement(input)); |
| |
| final String table = EntityCentricIndex.getTableName(conf); |
| final Scanner scanner = connector.createScanner(table, new Authorizations(CV)); |
| for (final Map.Entry<Key, Value> row : scanner) { |
| System.out.println(row); |
| inserted.add(RyaToRdfConversions.convertStatement( |
| EntityCentricIndex.deserializeStatement(row.getKey(), row.getValue()))); |
| } |
| Assert.assertEquals(expected, inserted); |
| } |
| |
| private static <X> Set<X> getSet(final CloseableIteration<X, ?> iter) throws Exception { |
| final Set<X> set = new HashSet<X>(); |
| while (iter.hasNext()) { |
| set.add(iter.next()); |
| } |
| return set; |
| } |
| } |