package org.apache.drill.exec.store.phoenix;

import java.util.Arrays;
import java.util.List;

import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.accessor.ScalarWriter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
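
/**
 * A {@link ManagedReader} for the Phoenix storage plugin. As written it does
 * not query Phoenix at all: it declares a fixed schema of three nullable
 * VARCHAR columns (a, b, c) and fills three batches with synthetic rows in
 * which every cell holds the batch number, a tab, and a fixed sentence.
 */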
public class PhoenixBatchReader implements ManagedReader<SchemaNegotiator> {

  private static final Logger logger = LoggerFactory.getLogger(PhoenixBatchReader.class);

  // Synthetic text appended to every generated cell value.
  private static final String SAMPLE_TEXT =
      "Currently, the Apache Drill build process is known to work on Linux, Windows and OSX.";

  private final PhoenixSubScan subScan;
  private PhoenixResultSet resultSet;
  private ColumnDefn[] columns;
  private int count = 1; // 1-based number of the batch currently being generated

  public PhoenixBatchReader(PhoenixSubScan subScan) {
    this.subScan = subScan;
  }
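
  /**
   * Declare the table schema to the scan framework, build the result set
   * loader, and bind a scalar writer to each column.
   */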
  @Override
  public boolean open(SchemaNegotiator negotiator) {
    negotiator.tableSchema(defineMetadata(), true); // the schema is fixed up front
    // Optionally cap the number of rows per batch while testing:
    // negotiator.batchSize(3);
    resultSet = new PhoenixResultSet(negotiator.build());
    bindColumns(resultSet.getWriter());
    return true;
  }
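
  /**
   * Fill one batch with generated rows until the row writer reports it is
   * full. Three batches are produced in total; afterwards EOF is signaled
   * by returning false.
   */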
  @Override
  public boolean next() {
    if (count > 3) {
      return false; // all three batches have been delivered
    }
    // An alternative byte[] payload, kept for reference:
    // byte[] value = new byte[512];
    // Arrays.fill(value, (byte) String.valueOf(count).charAt(0));
    RowSetLoader writer = resultSet.getWriter();
    while (!writer.isFull()) {
      writer.start();
      for (int i = 0; i < columns.length; i++) {
        columns[i].load(count + "\t" + SAMPLE_TEXT);
      }
      writer.save();
    }
    count++;
    return true;
  }
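
  /**
   * Nothing is released here; this only logs how many batches were written.
   */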
  @Override
  public void close() {
    int batchCount = resultSet.getWriter().loader().batchCount();
    logger.info("phoenix fetch batch count: {}", batchCount);
  }
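
  /**
   * Build the fixed schema: three nullable VARCHAR columns named a, b and c,
   * with a matching ColumnDefn created for each.
   */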
  private TupleMetadata defineMetadata() {
    List<String> cols = Arrays.asList("a", "b", "c");
    columns = new ColumnDefn[cols.size()];
    SchemaBuilder builder = new SchemaBuilder();
    for (int i = 0; i < cols.size(); i++) {
      columns[i] = makeColumn(cols.get(i), i);
      columns[i].define(builder);
    }
    return builder.buildSchema();
  }
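
  /** Every column in this reader is a VARCHAR, so only {@link VarCharDefn} is needed. */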
  private ColumnDefn makeColumn(String name, int index) {
    return new VarCharDefn(name, index);
  }
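
  /** Attach each column definition to its scalar writer on the row writer. */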
  private void bindColumns(RowSetLoader loader) {
    for (int i = 0; i < columns.length; i++) {
      columns[i].bind(loader);
    }
  }
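
  /**
   * Describes one column: its name, its position in the schema, and the
   * scalar writer used to push values into the underlying value vector.
   */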
  public abstract static class ColumnDefn {

    final String name;
    final int index;
    ScalarWriter writer;

    public ColumnDefn(String name, int index) {
      this.name = name;
      this.index = index;
    }

    public String getName() {
      return name;
    }

    public int getIndex() {
      return index;
    }

    // Cache the scalar writer for this column from the row writer.
    public void bind(RowSetLoader loader) {
      writer = loader.scalar(getName());
    }

    public abstract void define(SchemaBuilder builder);

    public abstract void load(String value);

    public abstract void load(byte[] value);

    public abstract void load(int index, String value);
  }
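
  /** A nullable VARCHAR column that accepts either String or byte[] values. */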
  public static class VarCharDefn extends ColumnDefn {

    public VarCharDefn(String name, int index) {
      super(name, index);
    }

    @Override
    public void define(SchemaBuilder builder) {
      builder.addNullable(getName(), MinorType.VARCHAR);
    }

    @Override
    public void load(String value) {
      writer.setString(value);
    }

    @Override
    public void load(byte[] value) {
      writer.setBytes(value, value.length);
    }

    @Deprecated
    @Override
    public void load(int index, String value) { }
  }
}