| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| //package org.apache.rya.accumulo; |
| |
| // |
| //import com.google.common.collect.Iterators; |
| //import com.google.common.io.ByteArrayDataInput; |
| //import com.google.common.io.ByteStreams; |
| //import org.eclipse.rdf4j.common.iteration.CloseableIteration; |
| //import org.apache.rya.api.RdfCloudTripleStoreConstants; |
| //import org.apache.rya.api.RdfCloudTripleStoreUtils; |
| //import org.apache.rya.api.persist.RdfDAOException; |
| //import org.apache.rya.api.utils.NullableStatementImpl; |
| //import org.apache.accumulo.core.client.*; |
| //import org.apache.accumulo.core.data.Key; |
| //import org.apache.accumulo.core.data.Range; |
| //import org.apache.accumulo.core.iterators.user.AgeOffFilter; |
| //import org.apache.accumulo.core.iterators.user.TimestampFilter; |
| //import org.apache.accumulo.core.security.Authorizations; |
| //import org.apache.hadoop.io.Text; |
| //import org.eclipse.rdf4j.model.Resource; |
| //import org.eclipse.rdf4j.model.Statement; |
| //import org.eclipse.rdf4j.model.IRI; |
| //import org.eclipse.rdf4j.model.Value; |
| //import org.eclipse.rdf4j.query.BindingSet; |
| //import org.slf4j.Logger; |
| //import org.slf4j.LoggerFactory; |
| // |
| //import java.io.IOException; |
| //import java.util.Collection; |
| //import java.util.Collections; |
| //import java.util.HashSet; |
| //import java.util.Iterator; |
| //import java.util.Map.Entry; |
| // |
| //import static org.apache.rya.accumulo.AccumuloRdfConstants.ALL_AUTHORIZATIONS; |
| //import static org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; |
| //import static org.apache.rya.api.RdfCloudTripleStoreUtils.writeValue; |
| // |
| //public class AccumuloRdfQueryIterator implements |
| // CloseableIteration<Entry<Statement, BindingSet>, RdfDAOException> { |
| // |
| // protected final Logger logger = LoggerFactory.getLogger(getClass()); |
| // |
| // private boolean open = false; |
| // private Iterator result; |
| // private Resource[] contexts; |
| // private Collection<Entry<Statement, BindingSet>> statements; |
| // private int numOfThreads = 20; |
| // |
| // private RangeBindingSetEntries rangeMap = new RangeBindingSetEntries(); |
| // private ScannerBase scanner; |
| // private boolean isBatchScanner = true; |
| // private Statement statement; |
| // Iterator<BindingSet> iter_bss = null; |
| // |
| // private boolean hasNext = true; |
| // private AccumuloRdfConfiguration conf; |
| // private TABLE_LAYOUT tableLayout; |
| // private Text context_txt; |
| // |
| // private DefineTripleQueryRangeFactory queryRangeFactory = new DefineTripleQueryRangeFactory(); |
| // |
| // public AccumuloRdfQueryIterator(Collection<Entry<Statement, BindingSet>> statements, Connector connector, Resource... contexts) |
| // throws RdfDAOException { |
| // this(statements, connector, null, contexts); |
| // } |
| // |
| // public AccumuloRdfQueryIterator(Collection<Entry<Statement, BindingSet>> statements, Connector connector, |
| // AccumuloRdfConfiguration conf, Resource... contexts) |
| // throws RdfDAOException { |
| // this.statements = statements; |
| // this.contexts = contexts; |
| // this.conf = conf; |
| // initialize(connector); |
| // open = true; |
| // } |
| // |
| // public AccumuloRdfQueryIterator(Resource subject, IRI predicate, Value object, Connector connector, |
| // AccumuloRdfConfiguration conf, Resource[] contexts) throws RdfDAOException { |
| // this(Collections.<Entry<Statement, BindingSet>>singleton(new RdfCloudTripleStoreUtils.CustomEntry<Statement, BindingSet>( |
| // new NullableStatementImpl(subject, predicate, object, contexts), |
| // null)), connector, conf, contexts); |
| // } |
| // |
| // protected void initialize(Connector connector) |
| // throws RdfDAOException { |
| // try { |
| // //TODO: We cannot span multiple tables here |
| // Collection<Range> ranges = new HashSet<Range>(); |
| // |
| // result = Iterators.emptyIterator(); |
| // Long startTime = conf.getStartTime(); |
| // Long ttl = conf.getTtl(); |
| // |
| // Resource context = null; |
| // for (Entry<Statement, BindingSet> stmtbs : statements) { |
| // Statement stmt = stmtbs.getKey(); |
| // Resource subject = stmt.getSubject(); |
| // IRI predicate = stmt.getPredicate(); |
| // Value object = stmt.getObject(); |
| // context = stmt.getContext(); //TODO: assumes the same context for all statements |
| // logger.debug("Batch Scan, lookup subject[" + subject + "] predicate[" + predicate + "] object[" + object + "] combination"); |
| // |
| // Entry<TABLE_LAYOUT, Range> entry = queryRangeFactory.defineRange(subject, predicate, object, conf); |
| // tableLayout = entry.getKey(); |
| //// isTimeRange = isTimeRange || queryRangeFactory.isTimeRange(); |
| // Range range = entry.getValue(); |
| // ranges.add(range); |
| // rangeMap.ranges.add(new RdfCloudTripleStoreUtils.CustomEntry<Range, BindingSet>(range, stmtbs.getValue())); |
| // } |
| // |
| // Authorizations authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS; |
| // String auth = conf.getAuth(); |
| // if (auth != null) { |
| // authorizations = new Authorizations(auth.split(",")); |
| // } |
| // String table = RdfCloudTripleStoreUtils.layoutToTable(tableLayout, conf); |
| // result = createScanner(connector, authorizations, table, context, startTime, ttl, ranges); |
| //// if (isBatchScanner) { |
| //// ((BatchScanner) scanner).setRanges(ranges); |
| //// } else { |
| //// for (Range range : ranges) { |
| //// ((Scanner) scanner).setRange(range); //TODO: Not good way of doing this |
| //// } |
| //// } |
| //// |
| //// if (isBatchScanner) { |
| //// result = ((BatchScanner) scanner).iterator(); |
| //// } else { |
| //// result = ((Scanner) scanner).iterator(); |
| //// } |
| // } catch (Exception e) { |
| // throw new RdfDAOException(e); |
| // } |
| // } |
| // |
| // protected Iterator<Entry<Key, org.apache.accumulo.core.data.Value>> createScanner(Connector connector, Authorizations authorizations, String table, Resource context, Long startTime, Long ttl, Collection<Range> ranges) throws TableNotFoundException, IOException { |
| //// ShardedConnector shardedConnector = new ShardedConnector(connector, 4, ta) |
| // if (rangeMap.ranges.size() > (numOfThreads / 2)) { //TODO: Arbitrary number, make configurable |
| // BatchScanner scannerBase = connector.createBatchScanner(table, authorizations, numOfThreads); |
| // scannerBase.setRanges(ranges); |
| // populateScanner(context, startTime, ttl, scannerBase); |
| // return scannerBase.iterator(); |
| // } else { |
| // isBatchScanner = false; |
| // Iterator<Entry<Key, org.apache.accumulo.core.data.Value>>[] iters = new Iterator[ranges.size()]; |
| // int i = 0; |
| // for (Range range : ranges) { |
| // Scanner scannerBase = connector.createScanner(table, authorizations); |
| // populateScanner(context, startTime, ttl, scannerBase); |
| // scannerBase.setRange(range); |
| // iters[i] = scannerBase.iterator(); |
| // i++; |
| // scanner = scannerBase; //TODO: Always overridden, but doesn't matter since Scanner doesn't need to be closed |
| // } |
| // return Iterators.concat(iters); |
| // } |
| // |
| // } |
| // |
| // protected void populateScanner(Resource context, Long startTime, Long ttl, ScannerBase scannerBase) throws IOException { |
| // if (context != null) { //default graph |
| // context_txt = new Text(writeValue(context)); |
| // scannerBase.fetchColumnFamily(context_txt); |
| // } |
| // |
| //// if (!isQueryTimeBased(conf)) { |
| // if (startTime != null && ttl != null) { |
| //// scannerBase.setScanIterators(1, FilteringIterator.class.getName(), "filteringIterator"); |
| //// scannerBase.setScanIteratorOption("filteringIterator", "0", TimeRangeFilter.class.getName()); |
| //// scannerBase.setScanIteratorOption("filteringIterator", "0." + TimeRangeFilter.TIME_RANGE_PROP, ttl); |
| //// scannerBase.setScanIteratorOption("filteringIterator", "0." + TimeRangeFilter.START_TIME_PROP, startTime); |
| // IteratorSetting setting = new IteratorSetting(1, "fi", TimestampFilter.class.getName()); |
| // TimestampFilter.setStart(setting, startTime, true); |
| // TimestampFilter.setEnd(setting, startTime + ttl, true); |
| // scannerBase.addScanIterator(setting); |
| // } else if (ttl != null) { |
| //// scannerBase.setScanIterators(1, FilteringIterator.class.getName(), "filteringIterator"); |
| //// scannerBase.setScanIteratorOption("filteringIterator", "0", AgeOffFilter.class.getName()); |
| //// scannerBase.setScanIteratorOption("filteringIterator", "0.ttl", ttl); |
| // IteratorSetting setting = new IteratorSetting(1, "fi", AgeOffFilter.class.getName()); |
| // AgeOffFilter.setTTL(setting, ttl); |
| // scannerBase.addScanIterator(setting); |
| // } |
| //// } |
| // } |
| // |
| // @Override |
| // public void close() throws RdfDAOException { |
| // if (!open) |
| // return; |
| // verifyIsOpen(); |
| // open = false; |
| // if (scanner != null && isBatchScanner) { |
| // ((BatchScanner) scanner).close(); |
| // } |
| // } |
| // |
| // public void verifyIsOpen() throws RdfDAOException { |
| // if (!open) { |
| // throw new RdfDAOException("Iterator not open"); |
| // } |
| // } |
| // |
| // @Override |
| // public boolean hasNext() throws RdfDAOException { |
| // try { |
| // if (!open) |
| // return false; |
| // verifyIsOpen(); |
| // /** |
| // * For some reason, the result.hasNext returns false |
| // * once at the end of the iterator, and then true |
| // * for every subsequent call. |
| // */ |
| // hasNext = (hasNext && result.hasNext()); |
| // return hasNext || ((iter_bss != null) && iter_bss.hasNext()); |
| // } catch (Exception e) { |
| // throw new RdfDAOException(e); |
| // } |
| // } |
| // |
| // @Override |
| // public Entry<Statement, BindingSet> next() throws RdfDAOException { |
| // try { |
| // if (!this.hasNext()) |
| // return null; |
| // |
| // return getStatement(result, contexts); |
| // } catch (Exception e) { |
| // throw new RdfDAOException(e); |
| // } |
| // } |
| // |
| // public Entry<Statement, BindingSet> getStatement( |
| // Iterator<Entry<Key, org.apache.accumulo.core.data.Value>> rowResults, |
| // Resource... filterContexts) throws IOException { |
| // try { |
| // while (true) { |
| // if (iter_bss != null && iter_bss.hasNext()) { |
| // return new RdfCloudTripleStoreUtils.CustomEntry<Statement, BindingSet>(statement, iter_bss.next()); |
| // } |
| // |
| // if (rowResults.hasNext()) { |
| // Entry<Key, org.apache.accumulo.core.data.Value> entry = rowResults.next(); |
| // Key key = entry.getKey(); |
| // ByteArrayDataInput input = ByteStreams.newDataInput(key.getRow().getBytes()); |
| // statement = RdfCloudTripleStoreUtils.translateStatementFromRow(input, key.getColumnFamily(), tableLayout, RdfCloudTripleStoreConstants.VALUE_FACTORY); |
| // iter_bss = rangeMap.containsKey(key).iterator(); |
| // } else |
| // break; |
| // } |
| // } catch (Exception e) { |
| // throw new IOException(e); |
| // } |
| // return null; |
| // } |
| // |
| // @Override |
| // public void remove() throws RdfDAOException { |
| // next(); |
| // } |
| // |
| // public int getNumOfThreads() { |
| // return numOfThreads; |
| // } |
| // |
| // public void setNumOfThreads(int numOfThreads) { |
| // this.numOfThreads = numOfThreads; |
| // } |
| // |
| // public DefineTripleQueryRangeFactory getQueryRangeFactory() { |
| // return queryRangeFactory; |
| // } |
| // |
| // public void setQueryRangeFactory(DefineTripleQueryRangeFactory queryRangeFactory) { |
| // this.queryRangeFactory = queryRangeFactory; |
| // } |
| //} |