blob: 2d546deec9c5998f2967373d576c5d45b65e94e2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.jdbc.operators;
import org.apache.wayang.basic.operators.TableSource;
import org.apache.wayang.commons.util.profiledb.model.measurement.TimeMeasurement;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator;
import org.apache.wayang.jdbc.compiler.FunctionCompiler;
import org.apache.logging.log4j.LogManager;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
* PostgreSQL implementation for the {@link TableSource}.
*/
public abstract class JdbcTableSource extends TableSource implements JdbcExecutionOperator {
/**
* Creates a new instance.
*
* @see TableSource#TableSource(String, String...)
*/
public JdbcTableSource(String tableName, String... columnNames) {
super(tableName, columnNames);
}
/**
* Copies an instance (exclusive of broadcasts).
*
* @param that that should be copied
*/
public JdbcTableSource(JdbcTableSource that) {
super(that);
}
@Override
public String createSqlClause(Connection connection, FunctionCompiler compiler) {
return this.getTableName();
}
@Override
public String getLoadProfileEstimatorConfigurationKey() {
return String.format("wayang.%s.tablesource.load", this.getPlatform().getPlatformId());
}
@Override
public CardinalityEstimator getCardinalityEstimator(int outputIndex) {
assert outputIndex == 0;
return new CardinalityEstimator() {
@Override
public CardinalityEstimate estimate(OptimizationContext optimizationContext, CardinalityEstimate... inputEstimates) {
// see Job for StopWatch measurements
final TimeMeasurement timeMeasurement = optimizationContext.getJob().getStopWatch().start(
"Optimization", "Cardinality&Load Estimation", "Push Estimation", "Estimate source cardinalities"
);
// Establish a DB connection.
try (Connection connection = JdbcTableSource.this.getPlatform()
.createDatabaseDescriptor(optimizationContext.getConfiguration())
.createJdbcConnection()) {
// Query the table cardinality.
final String sql = String.format("SELECT count(*) FROM %s;", JdbcTableSource.this.getTableName());
final ResultSet resultSet = connection.createStatement().executeQuery(sql);
if (!resultSet.next()) {
throw new SQLException("No query result for \"" + sql + "\".");
}
long cardinality = resultSet.getLong(1);
return new CardinalityEstimate(cardinality, cardinality, 1d);
} catch (Exception e) {
LogManager.getLogger(this.getClass()).error(
"Could not estimate cardinality for {}.", JdbcTableSource.this, e
);
// If we could not load the cardinality, let's use a very conservative estimate.
return new CardinalityEstimate(10, 10000000, 0.9);
} finally {
timeMeasurement.stop();
}
}
};
}
}