blob: fb6ce308fbc77d3bcb52a8e0c707c0b9fd107689 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.druid;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DelimitedParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.hadoop.hive.druid.conf.DruidConstants;
import org.apache.hadoop.hive.druid.json.AvroParseSpec;
import org.apache.hadoop.hive.druid.json.AvroStreamInputRowParser;
import org.apache.hadoop.hive.druid.json.InlineSchemaAvroBytesDecoder;
import org.apache.hadoop.hive.druid.json.KafkaSupervisorIOConfig;
import org.apache.hadoop.hive.druid.json.KafkaSupervisorSpec;
import org.apache.hadoop.hive.druid.json.KafkaSupervisorTuningConfig;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
/**
* Class containing some Utility methods for Kafka Ingestion.
*/
final class DruidKafkaUtils {
private static final Logger LOG = LoggerFactory.getLogger(DruidKafkaUtils.class);
private static final SessionState.LogHelper CONSOLE = new SessionState.LogHelper(LOG);
private DruidKafkaUtils() {
}
static KafkaSupervisorSpec createKafkaSupervisorSpec(
Table table,
String kafkaTopic,
String kafkaServers,
DataSchema dataSchema,
IndexSpec indexSpec
) {
return new KafkaSupervisorSpec(dataSchema,
new KafkaSupervisorTuningConfig(DruidStorageHandlerUtils.getIntegerProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxRowsInMemory"),
DruidStorageHandlerUtils.getLongProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxBytesInMemory"),
DruidStorageHandlerUtils.getIntegerProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxRowsPerSegment"), DruidStorageHandlerUtils
.getLongProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxTotalRows"),
DruidStorageHandlerUtils.getPeriodProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "intermediatePersistPeriod"), null,
// basePersistDirectory - use druid default, no need to be configured by user
DruidStorageHandlerUtils
.getIntegerProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxPendingPersists"),
indexSpec, null, null,
// buildV9Directly - use druid default, no need to be configured by user
DruidStorageHandlerUtils.getBooleanProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "reportParseExceptions"),
DruidStorageHandlerUtils.getLongProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "handoffConditionTimeout"),
DruidStorageHandlerUtils.getBooleanProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "resetOffsetAutomatically"),
TmpFileSegmentWriteOutMediumFactory.instance(), DruidStorageHandlerUtils
.getIntegerProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "workerThreads"),
DruidStorageHandlerUtils.getIntegerProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "chatThreads"),
DruidStorageHandlerUtils.getLongProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "chatRetries"),
DruidStorageHandlerUtils.getPeriodProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "httpTimeout"),
DruidStorageHandlerUtils.getPeriodProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "shutdownTimeout"),
DruidStorageHandlerUtils.getPeriodProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "offsetFetchPeriod"),
DruidStorageHandlerUtils.getPeriodProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "intermediateHandoffPeriod"),
DruidStorageHandlerUtils.getBooleanProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "logParseExceptions"),
DruidStorageHandlerUtils.getIntegerProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxParseExceptions"),
DruidStorageHandlerUtils.getIntegerProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxSavedParseExceptions")),
new KafkaSupervisorIOConfig(kafkaTopic,
// Mandatory Property
DruidStorageHandlerUtils.getIntegerProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "replicas"),
DruidStorageHandlerUtils.getIntegerProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "taskCount"),
DruidStorageHandlerUtils.getPeriodProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "taskDuration"),
getKafkaConsumerProperties(table, kafkaServers),
// Mandatory Property
DruidStorageHandlerUtils.getPeriodProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "startDelay"),
DruidStorageHandlerUtils.getPeriodProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "period"),
DruidStorageHandlerUtils.getBooleanProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "useEarliestOffset"),
DruidStorageHandlerUtils.getPeriodProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "completionTimeout"),
DruidStorageHandlerUtils.getPeriodProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "lateMessageRejectionPeriod"),
DruidStorageHandlerUtils.getPeriodProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "earlyMessageRejectionPeriod"),
DruidStorageHandlerUtils.getBooleanProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "skipOffsetGaps")),
new HashMap<>());
}
private static Map<String, String> getKafkaConsumerProperties(Table table, String kafkaServers) {
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
builder.put(KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY, kafkaServers);
for (Map.Entry<String, String> entry : table.getParameters().entrySet()) {
if (entry.getKey().startsWith(DruidConstants.DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX)) {
String propertyName = entry.getKey().substring(DruidConstants.DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX.length());
builder.put(propertyName, entry.getValue());
}
}
return builder.build();
}
static void updateKafkaIngestionSpec(String overlordAddress, KafkaSupervisorSpec spec) {
try {
String task = JSON_MAPPER.writeValueAsString(spec);
CONSOLE.printInfo("submitting kafka Spec {}", task);
LOG.info("submitting kafka Supervisor Spec {}", task);
StringFullResponseHolder response = DruidStorageHandlerUtils
.getResponseFromCurrentLeader(DruidStorageHandler.getHttpClient(), new Request(HttpMethod.POST,
new URL(String.format("http://%s/druid/indexer/v1/supervisor", overlordAddress)))
.setContent("application/json", JSON_MAPPER.writeValueAsBytes(spec)),
new StringFullResponseHandler(Charset.forName("UTF-8")));
if (response.getStatus().equals(HttpResponseStatus.OK)) {
String
msg =
String.format("Kafka Supervisor for [%s] Submitted Successfully to druid.",
spec.getDataSchema().getDataSource());
LOG.info(msg);
CONSOLE.printInfo(msg);
} else {
throw new IOException(String.format("Unable to update Kafka Ingestion for Druid status [%d] full response [%s]",
response.getStatus().getCode(),
response.getContent()));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
static boolean isKafkaStreamingTable(Table table) {
// For kafka Streaming tables it is mandatory to set a kafka topic.
return DruidStorageHandlerUtils.getTableProperty(table, DruidConstants.KAFKA_TOPIC) != null;
}
static InputRowParser getInputRowParser(Table table, TimestampSpec timestampSpec, DimensionsSpec dimensionsSpec) {
String parseSpecFormat = DruidStorageHandlerUtils.getTableProperty(table, DruidConstants.DRUID_PARSE_SPEC_FORMAT);
// Default case JSON
if ((parseSpecFormat == null) || "json".equalsIgnoreCase(parseSpecFormat)) {
return new StringInputRowParser(new JSONParseSpec(timestampSpec, dimensionsSpec, null, null), "UTF-8");
} else if ("csv".equalsIgnoreCase(parseSpecFormat)) {
return new StringInputRowParser(new CSVParseSpec(timestampSpec,
dimensionsSpec,
DruidStorageHandlerUtils.getTableProperty(table, DruidConstants.DRUID_PARSE_SPEC_LIST_DELIMITER),
DruidStorageHandlerUtils.getListProperty(table, DruidConstants.DRUID_PARSE_SPEC_COLUMNS),
DruidStorageHandlerUtils.getBooleanProperty(table, DruidConstants.DRUID_PARSE_SPEC_HAS_HEADER_ROWS, false),
DruidStorageHandlerUtils.getIntegerProperty(table, DruidConstants.DRUID_PARSE_SPEC_SKIP_HEADER_ROWS, 0)),
"UTF-8");
} else if ("delimited".equalsIgnoreCase(parseSpecFormat)) {
return new StringInputRowParser(new DelimitedParseSpec(timestampSpec,
dimensionsSpec,
DruidStorageHandlerUtils.getTableProperty(table, DruidConstants.DRUID_PARSE_SPEC_DELIMITER),
DruidStorageHandlerUtils.getTableProperty(table, DruidConstants.DRUID_PARSE_SPEC_LIST_DELIMITER),
DruidStorageHandlerUtils.getListProperty(table, DruidConstants.DRUID_PARSE_SPEC_COLUMNS),
DruidStorageHandlerUtils.getBooleanProperty(table, DruidConstants.DRUID_PARSE_SPEC_HAS_HEADER_ROWS, false),
DruidStorageHandlerUtils.getIntegerProperty(table, DruidConstants.DRUID_PARSE_SPEC_SKIP_HEADER_ROWS, 0)),
"UTF-8");
} else if ("avro".equalsIgnoreCase(parseSpecFormat)) {
try {
String avroSchemaLiteral = DruidStorageHandlerUtils.getTableProperty(table, DruidConstants.AVRO_SCHEMA_LITERAL);
Preconditions.checkNotNull(avroSchemaLiteral, "Please specify avro schema literal when using avro parser");
Map<String, Object>
avroSchema =
JSON_MAPPER.readValue(avroSchemaLiteral, new TypeReference<Map<String, Object>>() {
});
return new AvroStreamInputRowParser(new AvroParseSpec(timestampSpec, dimensionsSpec, null),
new InlineSchemaAvroBytesDecoder(avroSchema));
} catch (Exception e) {
throw new IllegalStateException("Exception while creating avro schema", e);
}
}
throw new IllegalArgumentException("Invalid parse spec format ["
+ parseSpecFormat
+ "]. "
+ "Supported types are : json, csv, tsv, avro");
}
}