// blob: a689a52513680e447bf605e2a15f0ba3cd9f0623 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.core.optimizer.cardinality;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.Job;
import org.apache.wayang.core.api.configuration.FunctionalKeyValueProvider;
import org.apache.wayang.core.api.configuration.KeyValueProvider;
import org.apache.wayang.core.optimizer.DefaultOptimizationContext;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ElementaryOperator;
import org.apache.wayang.core.plan.wayangplan.OutputSlot;
import org.apache.wayang.core.plan.wayangplan.Subplan;
import org.apache.wayang.core.plan.wayangplan.test.TestJoin;
import org.apache.wayang.core.plan.wayangplan.test.TestMapOperator;
import org.apache.wayang.core.plan.wayangplan.test.TestSource;
import org.apache.wayang.core.test.MockFactory;
import org.apache.wayang.core.types.DataSetType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
 * Test suite for {@link SubplanCardinalityPusher}.
 *
 * <p>Each test wraps a handful of test operators into a {@link Subplan}, pushes cardinalities
 * through it and compares the estimate found at the subplan's output slot 0 with an expected value.
 */
public class SubplanCardinalityPusherTest {

    /** Mock job created anew for every test; handed to the {@link DefaultOptimizationContext}. */
    private Job job;

    /** Fresh configuration per test, wired with the estimator provider built in {@link #setUp()}. */
    private Configuration configuration;

    /**
     * Creates the configuration, registers a cardinality estimator provider that delegates to
     * {@link #estimatorOf(OutputSlot)}, and creates the mock job from that configuration.
     */
    @Before
    public void setUp() {
        this.configuration = new Configuration();
        KeyValueProvider<OutputSlot<?>, CardinalityEstimator> provider =
                new FunctionalKeyValueProvider<>(
                        (slot, requestee) -> this.estimatorOf(slot),
                        this.configuration
                );
        this.configuration.setCardinalityEstimatorProvider(provider);
        this.job = MockFactory.createJob(this.configuration);
    }

    /**
     * Asks the owner of the given slot for the estimator of that slot.
     *
     * @param slot the output slot whose estimator is requested; its owner is asserted to be elementary
     * @return the estimator created by the owning operator, or {@code null} if it provides none
     */
    private CardinalityEstimator estimatorOf(OutputSlot<?> slot) {
        assert slot.getOwner().isElementary()
                : String.format("Cannot provide estimator for composite %s.", slot.getOwner());
        final ElementaryOperator owner = (ElementaryOperator) slot.getOwner();
        return owner.createCardinalityEstimator(slot.getIndex(), this.configuration).orElse(null);
    }

    /** Creates a new default {@link DataSetType} for {@link String}s. */
    private static DataSetType<String> stringType() {
        return DataSetType.createDefault(String.class);
    }

    /** Creates a String-to-String test map operator that uses {@code type} on both sides and carries {@code name}. */
    private static TestMapOperator<String, String> namedMap(DataSetType<String> type, String name) {
        TestMapOperator<String, String> map = new TestMapOperator<>(type, type);
        map.setName(name);
        return map;
    }

    /** Sets {@code estimate} as cardinality of input 0 of the subplan and propagates it via the subplan. */
    private static void feedInput(Subplan subplan,
                                  OptimizationContext.OperatorContext ctx,
                                  CardinalityEstimate estimate) {
        ctx.setInputCardinality(0, estimate);
        subplan.propagateInputCardinality(0, ctx);
    }

    /** Creates the pusher for {@code subplan} and lets it push on the given operator context. */
    private void pushCardinalities(Subplan subplan, OptimizationContext.OperatorContext ctx) {
        final CardinalityPusher pusher = SubplanCardinalityPusher.createFor(subplan, this.configuration);
        pusher.push(ctx, this.configuration);
    }

    /**
     * Two chained map operators in one subplan: the test expects the estimate fed into input 0
     * to show up unchanged at output 0.
     */
    @Test
    public void testSimpleSubplan() {
        TestMapOperator<String, String> first = new TestMapOperator<>(stringType(), stringType());
        TestMapOperator<String, String> second = new TestMapOperator<>(stringType(), stringType());
        first.connectTo(0, second, 0);

        Subplan subplan = (Subplan) Subplan.wrap(first, second);
        OptimizationContext context = new DefaultOptimizationContext(this.job, subplan);
        final OptimizationContext.OperatorContext ctx = context.getOperatorContext(subplan);

        final CardinalityEstimate fedEstimate = new CardinalityEstimate(123, 321, 0.123d);
        feedInput(subplan, ctx, fedEstimate);
        this.pushCardinalities(subplan, ctx);

        Assert.assertEquals(fedEstimate, ctx.getOutputCardinality(0));
    }

    /**
     * A source followed by a map operator in one subplan: no input estimate is fed; the test
     * expects the source's fixed estimate at output 0.
     */
    @Test
    public void testSourceSubplan() {
        final CardinalityEstimate fixedEstimate = new CardinalityEstimate(123, 321, 0.123d);
        TestSource<String> source = new TestSource<>(stringType());
        source.setCardinalityEstimators((ctx, inputs) -> fixedEstimate);

        TestMapOperator<String, String> map = new TestMapOperator<>(stringType(), stringType());
        source.connectTo(0, map, 0);

        Subplan subplan = (Subplan) Subplan.wrap(source, map);
        OptimizationContext context = new DefaultOptimizationContext(this.job, subplan);
        final OptimizationContext.OperatorContext ctx = context.getOperatorContext(subplan);

        this.pushCardinalities(subplan, ctx);

        Assert.assertEquals(fixedEstimate, ctx.getOutputCardinality(0));
    }

    /**
     * A diamond-shaped subplan whose two branches meet in a join. The expected estimate multiplies
     * the input bounds with themselves and scales the correctness probability by 0.7
     * (presumably the factor applied by {@link TestJoin}'s estimator; not visible in this file).
     */
    @Test
    public void testDAGShapedSubplan() {
        // Topology of the subplan:
        //
        //          /-> map2 -\
        //   map1 -<           >- join1 -> map4
        //          \-> map3 -/
        final DataSetType<String> type = DataSetType.createDefault(String.class);
        TestMapOperator<String, String> head = namedMap(type, "map1");
        TestMapOperator<String, String> upperBranch = namedMap(type, "map2");
        TestMapOperator<String, String> lowerBranch = namedMap(type, "map3");
        TestJoin<String, String, String> join = new TestJoin<>(type, type, type);
        join.setName("join1");
        TestMapOperator<String, String> tail = namedMap(type, "map4");

        head.connectTo(0, upperBranch, 0);
        head.connectTo(0, lowerBranch, 0);
        upperBranch.connectTo(0, join, 0);
        lowerBranch.connectTo(0, join, 1);
        join.connectTo(0, tail, 0);

        Subplan subplan = (Subplan) Subplan.wrap(head, tail);
        OptimizationContext context = new DefaultOptimizationContext(this.job, subplan);
        final OptimizationContext.OperatorContext ctx = context.getOperatorContext(subplan);

        feedInput(subplan, ctx, new CardinalityEstimate(10, 100, 0.9d));
        this.pushCardinalities(subplan, ctx);

        final CardinalityEstimate actual = ctx.getOutputCardinality(0);
        final CardinalityEstimate expected = new CardinalityEstimate(10 * 10, 100 * 100, 0.9d * 0.7d);
        Assert.assertEquals(expected, actual);
    }
}