blob: ba0efca52019f470d39e3eb62865a2f0a2d74607 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PatternNode;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PatternPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.junit.Before;
import org.junit.Test;
public class TestPhyPatternMatch {
int opKeyNum = 0;
@Before
public void setUp() throws Exception {
opKeyNum = 0;
}
@Test
public void testSingleNodePattern() throws PlanException{
//create pattern
PatternPlan ptPlan = new PatternPlan();
PatternNode ptFilNode = new PatternNode(ptPlan);
ptFilNode.setClassName(POFilter.class);
ptPlan.add(ptFilNode);
//create plan with single ForEach node
PhysicalPlan pplan = new PhysicalPlan();
POForEach fe = new POForEach(getNewOpKey());
pplan.add(fe);
// verify that match is false
boolean matched = ptPlan.match(pplan);
assertFalse("plan not matched", matched);
//add a filter to the plan (fe -> fil)
POFilter fil = new POFilter(getNewOpKey());
pplan.add(fil);
pplan.connect(fe, fil);
//verify that pattern matches
matched = ptPlan.match(pplan);
assertTrue("plan matched", matched);
assertEquals(" class matched ", ptFilNode.getMatch(), fil);
//test leaf/source settings in pattern node
ptFilNode.setSourceNode(true);
assertFalse("plan matched", ptPlan.match(pplan));
ptFilNode.setSourceNode(false);
ptFilNode.setLeafNode(true);
assertTrue("plan matched", ptPlan.match(pplan));
}
@Test
public void testTwoNodePattern() throws PlanException{
//create pattern (foreach -> filter)
Class<?>[] nodes = {POForEach.class, POFilter.class};
PatternPlan ptPlan = PatternPlan.create(nodes);
PatternNode ptFilNode = (PatternNode) ptPlan.getSinks().get(0);
PatternNode ptFENode = (PatternNode) ptPlan.getSources().get(0);
//create plan with single Filter node
PhysicalPlan pplan = new PhysicalPlan();
POFilter fil = new POFilter(getNewOpKey());
pplan.add(fil);
// verify that match is false
assertFalse("plan not matched", ptPlan.match(pplan));
//verify that there is no match in the pattern nodes
assertEquals("null match", ptFilNode.getMatch(), null);
assertEquals("null match", ptFENode.getMatch(), null);
//add a foreach to the plan (fe -> fil)
POForEach fe = new POForEach(getNewOpKey());
pplan.add(fe);
pplan.connect(fe, fil);
// verify that match is true
assertTrue("plan matched", ptPlan.match(pplan));
// set leaf and source properties and try again
ptFilNode.setLeafNode(true);
ptFENode.setSourceNode(true);
assertTrue("plan matched", ptPlan.match(pplan));
//add a store to the plan to make it (fe -> fil -> store)
POStore store = new POStore(getNewOpKey());
pplan.add(store);
pplan.connect(fil, store);
// match should fail because filter pt node leaf property is set
assertFalse("plan matched", ptPlan.match(pplan));
//reset patter filter node leaf property
ptFilNode.setLeafNode(false);
// verify that match is true
assertTrue("plan matched", ptPlan.match(pplan));
assertEquals("filter pt node match", ptFilNode.getMatch(), fil);
assertEquals("foreach pt node match", ptFENode.getMatch(), fe);
//add a store to the plan to make it (fe -> fe -> fil -> store)
POForEach fe2 = new POForEach(getNewOpKey());
pplan.add(fe2);
pplan.connect(fe2, fe);
// match fails because fe pattern node is set to be source
assertFalse("plan matched", ptPlan.match(pplan));
ptFENode.setSourceNode(false);
// verify that match is true
assertTrue("plan matched", ptPlan.match(pplan));
assertEquals("filter pt node match", ptFilNode.getMatch(), fil);
assertEquals("foreach pt node match", ptFENode.getMatch(), fe);
//create new plan (fil -> fe)
PhysicalPlan pplan2 = new PhysicalPlan();
POFilter fil2 = new POFilter(getNewOpKey());
pplan.add(fil2);
POForEach fe21 = new POForEach(getNewOpKey());
pplan.add(fe21);
pplan.connect(fil2, fe21);
//verify that plan does not match
assertFalse("plan not matched", ptPlan.match(pplan2));
assertEquals("null match", ptFilNode.getMatch(), null);
assertEquals("null match", ptFENode.getMatch(), null);
}
@Test
public void testThreeNodePatternLinear() throws PlanException{
//create pattern (fil -> FE -> store)
PatternPlan ptPlan = new PatternPlan();
PatternNode ptFilNode = createPtNode(ptPlan, POFilter.class);
PatternNode ptFENode = createPtNode(ptPlan, POForEach.class);
PatternNode ptStNode = createPtNode(ptPlan, POStore.class);
ptPlan.connect(ptFilNode, ptFENode);
ptPlan.connect(ptFENode, ptStNode);
//create plan fil -> fil -> fe -> store
PhysicalPlan pplan = new PhysicalPlan();
POFilter fil = new POFilter(getNewOpKey());
pplan.add(fil);
assertFalse("plan not matched", ptPlan.match(pplan));
assertEquals("null match", ptFilNode.getMatch(), null);
assertEquals("null match", ptFENode.getMatch(), null);
assertEquals("null match", ptStNode.getMatch(), null);
POForEach fe = new POForEach(getNewOpKey());
pplan.add(fe);
pplan.connect(fil, fe);
assertFalse("plan not matched", ptPlan.match(pplan));
POFilter fil2 = new POFilter(getNewOpKey());
pplan.add(fil2);
pplan.connect(fil2, fil);
assertFalse("plan not matched", ptPlan.match(pplan));
POStore store = new POStore(getNewOpKey());
pplan.add(store);
pplan.connect(fe, store);
assertTrue("plan matched", ptPlan.match(pplan));
assertEquals("test match node", ptFilNode.getMatch(), fil);
assertEquals("test match node", ptFENode.getMatch(), fe);
assertEquals("test match node", ptStNode.getMatch(), store);
}
@Test
public void testThreeNodePatternTwoParents() throws PlanException, ExecException{
//create pattern fe fil
// \ /
// frJoin
PatternPlan ptPlan = new PatternPlan();
PatternNode ptFilNode = createPtNode(ptPlan, POFilter.class);
PatternNode ptFENode = createPtNode(ptPlan, POForEach.class);
PatternNode ptJoinNode = createPtNode(ptPlan, POFRJoin.class);
ptPlan.connect(ptFilNode, ptJoinNode);
ptPlan.connect(ptFENode, ptJoinNode);
//create plan
PhysicalPlan pplan = new PhysicalPlan();
POFilter fil = new POFilter(getNewOpKey());
pplan.add(fil);
assertFalse("plan not matched", ptPlan.match(pplan));
POForEach fe = new POForEach(getNewOpKey());
pplan.add(fe);
assertFalse("plan not matched", ptPlan.match(pplan));
POFRJoin join = new POFRJoin(getNewOpKey(), 0, null,
new ArrayList<List<PhysicalPlan>>(), null, null, 0, false,null);
pplan.add(join);
pplan.connect(fil, join);
pplan.connect(fe, join);
assertTrue("plan matched", ptPlan.match(pplan));
assertEquals("test match node", ptFilNode.getMatch(), fil);
assertEquals("test match node", ptFENode.getMatch(), fe);
assertEquals("test match node", ptJoinNode.getMatch(), join);
}
private PatternNode createPtNode(PatternPlan ptPlan, Class<?> ptClass) {
PatternNode ptNode = new PatternNode(ptPlan);
ptNode.setClassName(ptClass);
ptPlan.add(ptNode);
return ptNode;
}
private OperatorKey getNewOpKey() {
return new OperatorKey("", ++opKeyNum);
}
}