blob: aaec11115817953f0c58ecdedd31fb9b5a9b7e24 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.List;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
/**
* Takes two streams (fullStream and hashStream) and joins them similar to an LeftOuterJoinStream. The difference
* in a OuterHashJoinStream is that the tuples in the hashStream will all be read and hashed when this stream is
* opened. This provides a few optimizations iff the hashStream has a relatively small number of documents.
* The difference between this and a HashJoinStream is that a tuple in the fullStream will be returned even
* if it doesn't have any matching tuples in the hashStream.
* You are expected to provide a set of fields for which the hash will be calculated from. If a tuple from the
* hashStream does not contain a value (ie, null) for one of the fields the hash is being computed on then that
* tuple will not be considered a match to anything. If a tuple from the fullStream does not contain a value (ie, null)
* for one of the fields the hash is being computed on then that tuple will be returned without any joined tuples
* from the hashStream
* @since 6.0.0
**/
public class OuterHashJoinStream extends HashJoinStream implements Expressible {
private static final long serialVersionUID = 1L;
public OuterHashJoinStream(TupleStream fullStream, TupleStream hashStream, List<String> hashOn) throws IOException {
super(fullStream, hashStream, hashOn);
}
public OuterHashJoinStream(StreamExpression expression,StreamFactory factory) throws IOException {
super(expression, factory);
}
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// streams
if(hashStream instanceof Expressible && fullStream instanceof Expressible){
expression.addParameter(((Expressible)fullStream).toExpression(factory));
expression.addParameter(new StreamExpressionNamedParameter("hashed", ((Expressible)hashStream).toExpression(factory)));
}
else{
throw new IOException("This OuterHashJoinStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
// on
StringBuilder sb = new StringBuilder();
for(int idx = 0; idx < leftHashOn.size(); ++idx){
if(sb.length() > 0){ sb.append(","); }
// we know that left and right hashOns are the same size
String left = leftHashOn.get(idx);
String right = rightHashOn.get(idx);
if(left.equals(right)){
sb.append(left);
}
else{
sb.append(left);
sb.append("=");
sb.append(right);
}
}
expression.addParameter(new StreamExpressionNamedParameter("on",sb.toString()));
return expression;
}
public Tuple read() throws IOException {
if(null == workingFullTuple){
Tuple fullTuple = fullStream.read();
// We're at the end of the line
if(fullTuple.EOF){
return fullTuple;
}
// If fullTuple doesn't have a valid hash or the hash cannot be found in the hashedTuples then
// return the tuple from fullStream.
// This is an outer join so there is no requirement there be a matching value in the hashed stream
String fullHash = computeHash(fullTuple, leftHashOn);
if(null == fullHash || !hashedTuples.containsKey(fullHash)){
return fullTuple.clone();
}
workingFullTuple = fullTuple;
workingFullHash = fullHash;
workngHashSetIdx = 0;
}
// At this point we know we have at least one doc to match on
// Due to the check at the end, before returning, we know we have at least one to match with left
List<Tuple> matches = hashedTuples.get(workingFullHash);
Tuple returnTuple = workingFullTuple.clone();
returnTuple.merge(matches.get(workngHashSetIdx));
// Increment this so the next time we hit the next matching tuple
workngHashSetIdx++;
if(workngHashSetIdx >= matches.size()){
// well, now we've reached all the matches, clear it all out
workingFullTuple = null;
workingFullHash = null;
workngHashSetIdx = 0;
}
return returnTuple;
}
}