/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.storage.kafka;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.common.PartitionInfo;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.OverridableConf;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.exception.NotImplementedException;
import org.apache.tajo.exception.TajoException;
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.exception.TajoRuntimeException;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.storage.FormatProperty;
import org.apache.tajo.storage.StorageProperty;
import org.apache.tajo.storage.Tablespace;
import org.apache.tajo.storage.TupleRange;
import org.apache.tajo.storage.fragment.Fragment;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import net.minidev.json.JSONObject;
/**
 * Tablespace for Kafka tables.
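 *
 * <p>Each Kafka topic partition is exposed to the scanner as one or more
 * {@link KafkaFragment}s, each covering a contiguous range of message offsets.
 * The tablespace is read-only: scans are supported, while INSERT and staging
 * operations are not.</p>
 *
 * <p>A rough, illustrative sketch of registering a Kafka-backed table; the
 * exact DDL shape and the property key are assumptions here, see
 * {@link KafkaStorageConstants} for the authoritative names:</p>
 * <pre>{@code
 * CREATE EXTERNAL TABLE kafka_table (...) USING kafka
 *   WITH ('kafka.topic' = 'my_topic');
 * }</pre>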
*/
public class KafkaTablespace extends Tablespace {
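  // Capability flags for this tablespace. The boolean flags are all false,
  // presumably marking it as non-movable and non-writable; Tajo only reads
  // from Kafka.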
public static final StorageProperty KAFKA_STORAGE_PROPERTIES = new StorageProperty("kafka", false, false, false,
false);
public static final FormatProperty KAFKA_FORMAT_PROPERTIES = new FormatProperty(false, false, false);
public KafkaTablespace(String name, URI uri, JSONObject config) {
super(name, uri, config);
}
@Override
protected void storageInit() throws IOException {
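    // No-op: consumers are created per scan in getSplits(), so there is
    // nothing to initialize at tablespace startup.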
}
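  /**
   * Estimates the table volume by summing the lengths of all fragments that a
   * scan would read; fragments reporting an unknown size (-1) are excluded.
   */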
@Override
public long getTableVolume(TableDesc table, Optional<EvalNode> filter) {
long totalVolume;
try {
totalVolume = getSplits("", table, false, filter.orElse(null)).stream()
          .map(Fragment::getLength)
.filter(size -> size > 0) // eliminate unknown sizes (-1)
.reduce(0L, Long::sum);
} catch (TajoException e) {
throw new TajoRuntimeException(e);
} catch (Throwable ioe) {
throw new TajoInternalError(ioe);
}
return totalVolume;
}
@Override
public void close() {
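    // No-op: this tablespace keeps no long-lived connections or resources.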
}
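  /**
   * Kafka topics are managed outside of Tajo, so creating a table only
   * registers table statistics with an unknown row count.
   */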
@Override
public void createTable(TableDesc tableDesc, boolean ifNotExists) throws TajoException, IOException {
TableStats stats = new TableStats();
stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
tableDesc.setStats(stats);
}
@Override
public void purgeTable(TableDesc tableDesc) throws IOException, TajoException {
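    // No-op: dropping the Tajo table leaves the underlying Kafka topic intact.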
}
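  /** The table URI is the tablespace URI with the table name appended. */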
@Override
public URI getTableUri(TableMeta meta, String databaseName, String tableName) {
return URI.create(uri.toString() + "/" + tableName);
}
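  /**
   * Builds the fragments to scan. For each (optionally filtered) partition of
   * the topic, the available offset range is divided into chunks of at most
   * {@code fragmentSize} messages, one {@link KafkaFragment} per chunk; empty
   * partitions are skipped.
   */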
@Override
public List<Fragment> getSplits(String inputSourceId,
TableDesc table,
boolean requireSorted,
EvalNode filterCondition)
throws IOException, TajoException {
String topic = table.getMeta().getProperty(KafkaStorageConstants.KAFKA_TOPIC);
int fragmentSize = Integer.parseInt(table.getMeta().getProperty(KafkaStorageConstants.KAFKA_FRAGMENT_SIZE,
KafkaStorageConstants.DEFAULT_FRAGMENT_SIZE));
    // If no partitions are specified, scan every partition of the topic.
String partitions = table.getMeta().getProperty(KafkaStorageConstants.KAFKA_TOPIC_PARTITION,
KafkaStorageConstants.DEFAULT_PARTITION);
List<PartitionInfo> partitionList;
if (partitions.equals(KafkaStorageConstants.DEFAULT_PARTITION)) {
partitionList = SimpleConsumerManager.getPartitions(uri, topic);
} else {
partitionList = new LinkedList<>();
      // Scan only the requested partitions.
List<PartitionInfo> topicPartitions = SimpleConsumerManager.getPartitions(uri, topic);
Map<String, PartitionInfo> topicPartitionsMap = new HashMap<>();
for (PartitionInfo partitionInfo : topicPartitions) {
topicPartitionsMap.put(Integer.toString(partitionInfo.partition()), partitionInfo);
}
      for (String partitionId : partitions.split(",")) {
        PartitionInfo partitionInfo = topicPartitionsMap.get(partitionId);
        // Guard against partition ids that do not exist in the topic.
        if (partitionInfo != null) {
          partitionList.add(partitionInfo);
        }
      }
}
    List<Fragment> fragments = new ArrayList<>();
for (PartitionInfo partitionInfo : partitionList) {
int partitionId = partitionInfo.partition();
String leaderHost = partitionInfo.leader().host();
long lastOffset;
long startOffset;
try (SimpleConsumerManager simpleConsumerManager = new SimpleConsumerManager(uri, topic, partitionId)) {
lastOffset = simpleConsumerManager.getLatestOffset();
startOffset = simpleConsumerManager.getEarliestOffset();
}
      // Number of messages currently available in the partition.
      long messageCount = lastOffset - startOffset;
      // Skip empty partitions.
      if (0 == lastOffset || 0 == messageCount)
        continue;
      // If the partition's messages fit into a single fragment (fragmentSize
      // is the maximum message count per fragment), emit one fragment.
      if (messageCount <= fragmentSize) {
fragments.add(new KafkaFragment(table.getUri(), inputSourceId, topic, startOffset,
lastOffset, partitionId, leaderHost));
      } else { // Otherwise, split the partition into multiple fragments.
long nextFragmentStartOffset = startOffset;
while (nextFragmentStartOffset < lastOffset) {
// partition data: 0 1 2 3 4 5 6 7 8 9 10
// start offset: 0
// last offset: 11
// fragment size: 3
// result: (0, 1, 2), (3, 4, 5), (6, 7, 8), (9, 10)
          // 1st nextFragmentStartOffset=0, nextFragmentLastOffset=3
          // 2nd nextFragmentStartOffset=3, nextFragmentLastOffset=6
          // 3rd nextFragmentStartOffset=6, nextFragmentLastOffset=9
          // 4th nextFragmentStartOffset=9, nextFragmentLastOffset=12
long nextFragmentLastOffset = nextFragmentStartOffset + fragmentSize;
          // The last chunk may be shorter than fragmentSize, so cap its end
          // offset at lastOffset.
          long fragmentLastOffset = Math.min(nextFragmentLastOffset, lastOffset);
          fragments.add(new KafkaFragment(table.getUri(), inputSourceId, topic,
              nextFragmentStartOffset, fragmentLastOffset, partitionId, leaderHost));
nextFragmentStartOffset = nextFragmentLastOffset;
}
}
}
return fragments;
}
@Override
public StorageProperty getProperty() {
return KAFKA_STORAGE_PROPERTIES;
}
@Override
public FormatProperty getFormatProperty(TableMeta meta) {
return KAFKA_FORMAT_PROPERTIES;
}
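  // The write path is unsupported: Kafka tables are read-only in Tajo, so the
  // insert- and commit-related operations below are not implemented.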
@Override
public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc, Schema inputSchema,
SortSpec[] sortSpecs, TupleRange dataRange) throws IOException {
throw new TajoRuntimeException(new NotImplementedException());
}
@Override
public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) throws TajoException {
throw new TajoRuntimeException(new NotImplementedException());
}
@Override
public void prepareTable(LogicalNode node) throws IOException, TajoException {
}
@Override
public Path commitTable(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan, Schema schema,
TableDesc tableDesc) throws IOException {
throw new TajoRuntimeException(new NotImplementedException());
}
@Override
public void rollbackTable(LogicalNode node) throws IOException, TajoException {
throw new TajoRuntimeException(new NotImplementedException());
}
@Override
public URI getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException {
throw new TajoRuntimeException(new UnsupportedException());
}
}