blob: e98a4a46f22b9188942f40b665910e4c8d21fdc5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.jdbc.operators;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.logging.log4j.LogManager;
import org.apache.wayang.basic.data.Record;
import org.apache.wayang.basic.types.RecordType;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators;
import org.apache.wayang.core.plan.wayangplan.Operator;
import org.apache.wayang.core.plan.wayangplan.UnaryToUnaryOperator;
import org.apache.wayang.core.platform.ChannelDescriptor;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.util.JsonSerializable;
import org.apache.wayang.core.util.ReflectionUtils;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.core.util.json.JSONObject;
import org.apache.wayang.java.channels.StreamChannel;
import org.apache.wayang.java.execution.JavaExecutor;
import org.apache.wayang.java.operators.JavaExecutionOperator;
import org.apache.wayang.jdbc.channels.SqlQueryChannel;
import org.apache.wayang.jdbc.platform.JdbcPlatformTemplate;
/**
* This {@link Operator} converts {@link SqlQueryChannel}s to {@link StreamChannel}s.
*/
public class SqlToStreamOperator extends UnaryToUnaryOperator<Record, Record> implements JavaExecutionOperator, JsonSerializable {
private final JdbcPlatformTemplate jdbcPlatform;
/**
* Creates a new instance.
*
* @param jdbcPlatform from which the SQL data comes
*/
public SqlToStreamOperator(JdbcPlatformTemplate jdbcPlatform) {
this(jdbcPlatform, DataSetType.createDefault(Record.class));
}
/**
* Creates a new instance.
*
* @param jdbcPlatform from which the SQL data comes
* @param dataSetType type of the {@link Record}s being transformed; see {@link RecordType}
*/
public SqlToStreamOperator(JdbcPlatformTemplate jdbcPlatform, DataSetType<Record> dataSetType) {
super(dataSetType, dataSetType, false);
this.jdbcPlatform = jdbcPlatform;
}
protected SqlToStreamOperator(SqlToStreamOperator that) {
super(that);
this.jdbcPlatform = that.jdbcPlatform;
}
@Override
public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
ChannelInstance[] inputs,
ChannelInstance[] outputs,
JavaExecutor executor,
OptimizationContext.OperatorContext operatorContext) {
// Cast the inputs and outputs.
final SqlQueryChannel.Instance input = (SqlQueryChannel.Instance) inputs[0];
final StreamChannel.Instance output = (StreamChannel.Instance) outputs[0];
JdbcPlatformTemplate producerPlatform = (JdbcPlatformTemplate) input.getChannel().getProducer().getPlatform();
final Connection connection = producerPlatform
.createDatabaseDescriptor(executor.getConfiguration())
.createJdbcConnection();
Iterator<Record> resultSetIterator = new ResultSetIterator(connection, input.getSqlQuery());
Spliterator<Record> resultSetSpliterator = Spliterators.spliteratorUnknownSize(resultSetIterator, 0);
Stream<Record> resultSetStream = StreamSupport.stream(resultSetSpliterator, false);
output.accept(resultSetStream);
ExecutionLineageNode queryLineageNode = new ExecutionLineageNode(operatorContext);
queryLineageNode.add(LoadProfileEstimators.createFromSpecification(
String.format("wayang.%s.sqltostream.load.query", this.jdbcPlatform.getPlatformId()),
executor.getConfiguration()
));
queryLineageNode.addPredecessor(input.getLineage());
ExecutionLineageNode outputLineageNode = new ExecutionLineageNode(operatorContext);
outputLineageNode.add(LoadProfileEstimators.createFromSpecification(
String.format("wayang.%s.sqltostream.load.output", this.jdbcPlatform.getPlatformId()),
executor.getConfiguration()
));
output.getLineage().addPredecessor(outputLineageNode);
return queryLineageNode.collectAndMark();
}
@Override
public List<ChannelDescriptor> getSupportedInputChannels(int index) {
return Collections.singletonList(this.jdbcPlatform.getSqlQueryChannelDescriptor());
}
@Override
public List<ChannelDescriptor> getSupportedOutputChannels(int index) {
return Collections.singletonList(StreamChannel.DESCRIPTOR);
}
@Override
public Collection<String> getLoadProfileEstimatorConfigurationKeys() {
return Arrays.asList(
String.format("wayang.%s.sqltostream.load.query", this.jdbcPlatform.getPlatformId()),
String.format("wayang.%s.sqltostream.load.output", this.jdbcPlatform.getPlatformId())
);
}
/**
* Exposes a {@link ResultSet} as an {@link Iterator}.
*/
private static class ResultSetIterator implements Iterator<Record>, AutoCloseable {
/**
* Keeps around the {@link ResultSet} of the SQL query.
*/
private ResultSet resultSet;
/**
* The next {@link Record} to be delivered via {@link #next()}.
*/
private Record next;
/**
* Creates a new instance.
*
* @param connection the JDBC connection on which to execute a SQL query
* @param sqlQuery the SQL query
*/
ResultSetIterator(Connection connection, String sqlQuery) {
try {
//connection.setAutoCommit(false);
Statement st = connection.createStatement();
//st.setFetchSize(100000000);
this.resultSet = st.executeQuery(sqlQuery);
} catch (SQLException e) {
this.close();
throw new WayangException("Could not execute SQL.", e);
}
this.moveToNext();
}
/**
* Moves this instance to the next {@link Record}.
*/
private void moveToNext() {
try {
if (this.resultSet == null || !this.resultSet.next()) {
this.next = null;
this.close();
} else {
final int recordWidth = this.resultSet.getMetaData().getColumnCount();
Object[] values = new Object[recordWidth];
for (int i = 0; i < recordWidth; i++) {
values[i] = this.resultSet.getObject(i + 1);
}
this.next = new Record(values);
}
} catch (SQLException e) {
this.next = null;
this.close();
throw new WayangException("Exception while iterating the result set.", e);
}
}
@Override
public boolean hasNext() {
return this.next != null;
}
@Override
public Record next() {
Record curNext = this.next;
this.moveToNext();
return curNext;
}
@Override
public void close() {
if (this.resultSet != null) {
try {
this.resultSet.close();
} catch (Throwable t) {
LogManager.getLogger(this.getClass()).error("Could not close result set.", t);
} finally {
this.resultSet = null;
}
}
}
}
@Override
public JSONObject toJson() {
return new JSONObject().put("platform", this.jdbcPlatform.getClass().getCanonicalName());
}
@SuppressWarnings("unused")
public static SqlToStreamOperator fromJson(JSONObject jsonObject) {
final String platformClassName = jsonObject.getString("platform");
JdbcPlatformTemplate jdbcPlatform = ReflectionUtils.evaluate(platformClassName + ".getInstance()");
return new SqlToStreamOperator(jdbcPlatform);
}
}