/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.druid;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.HttpClientConfig;
import org.apache.druid.java.util.http.client.HttpClientInit;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.metadata.storage.derby.DerbyConnector;
import org.apache.druid.metadata.storage.derby.DerbyMetadataStorage;
import org.apache.druid.metadata.storage.mysql.MySQLConnector;
import org.apache.druid.metadata.storage.mysql.MySQLConnectorConfig;
import org.apache.druid.metadata.storage.postgresql.PostgreSQLConnector;
import org.apache.druid.metadata.storage.postgresql.PostgreSQLConnectorConfig;
import org.apache.druid.metadata.storage.postgresql.PostgreSQLTablesConfig;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.Query;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.storage.hdfs.HdfsDataSegmentPusher;
import org.apache.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
import org.apache.druid.timeline.DataSegment;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.druid.conf.DruidConstants;
import org.apache.hadoop.hive.druid.io.DruidOutputFormat;
import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat;
import org.apache.hadoop.hive.druid.io.DruidRecordWriter;
import org.apache.hadoop.hive.druid.json.KafkaSupervisorReport;
import org.apache.hadoop.hive.druid.json.KafkaSupervisorSpec;
import org.apache.hadoop.hive.druid.security.KerberosHttpClient;
import org.apache.hadoop.hive.druid.serde.DruidSerDe;
import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.common.util.ShutdownHookManager;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
/**
* DruidStorageHandler provides a HiveStorageHandler implementation for Druid.
*/
@SuppressWarnings({ "rawtypes" }) public class DruidStorageHandler extends DefaultHiveMetaHook
implements HiveStorageHandler {
private static final Logger LOG = LoggerFactory.getLogger(DruidStorageHandler.class);
private static final SessionState.LogHelper CONSOLE = new SessionState.LogHelper(LOG);
public static final String SEGMENTS_DESCRIPTOR_DIR_NAME = "segmentsDescriptorDir";
private static final String INTERMEDIATE_SEGMENT_DIR_NAME = "intermediateSegmentDir";
private static final HttpClient HTTP_CLIENT;
private static final List<String> ALLOWED_ALTER_TYPES =
ImmutableList.of("ADDPROPS", "DROPPROPS", "ADDCOLS");
/** Druid prefix to form the URI for authentication */
private static final String DRUID_PREFIX = "druid:";
/** Druid config for determining the host name */
private static final String DRUID_HOST_NAME = "druid.zk.service.host";
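// Eagerly start a Druid Lifecycle for the shared HTTP client and stop it on JVM shutdown.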
static {
final Lifecycle lifecycle = new Lifecycle();
try {
lifecycle.start();
} catch (Exception e) {
LOG.error("Issues with lifecycle start", e);
}
HTTP_CLIENT = makeHttpClient(lifecycle);
ShutdownHookManager.addShutdownHook(lifecycle::stop);
}
private SQLMetadataConnector connector;
private MetadataStorageTablesConfig druidMetadataStorageTablesConfig = null;
private String uniqueId = null;
private String rootWorkingDir = null;
private Configuration conf;
public DruidStorageHandler() {
}
@VisibleForTesting public DruidStorageHandler(SQLMetadataConnector connector,
MetadataStorageTablesConfig druidMetadataStorageTablesConfig) {
this.connector = connector;
this.druidMetadataStorageTablesConfig = druidMetadataStorageTablesConfig;
}
@Override public Class<? extends InputFormat> getInputFormatClass() {
return DruidQueryBasedInputFormat.class;
}
@Override public Class<? extends OutputFormat> getOutputFormatClass() {
return DruidOutputFormat.class;
}
@Override public Class<? extends AbstractSerDe> getSerDeClass() {
return DruidSerDe.class;
}
@Override public HiveMetaHook getMetaHook() {
return this;
}
@Override public HiveAuthorizationProvider getAuthorizationProvider() {
return new DefaultHiveAuthorizationProvider();
}
@Override public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
}
@Override public void configureInputJobCredentials(TableDesc tableDesc, Map<String, String> jobSecrets) {
}
@Override public void preCreateTable(Table table) throws MetaException {
if (!StringUtils.isEmpty(table.getSd().getLocation())) {
throw new MetaException("LOCATION may not be specified for Druid");
}
if (table.getPartitionKeysSize() != 0) {
throw new MetaException("PARTITIONED BY may not be specified for Druid");
}
if (table.getSd().getBucketColsSize() != 0) {
throw new MetaException("CLUSTERED BY may not be specified for Druid");
}
String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
if (dataSourceName != null) {
// Datasource already exists in Druid; nothing more to do here.
return;
}
// create dataSourceName based on Hive Table name
dataSourceName = TableName.getDbTable(table.getDbName(), table.getTableName());
try {
// NOTE: This just creates the druid_segments table in the Druid metastore.
// This is needed for the case when Hive is started before any of the Druid services
// and the druid_segments table has not been created yet.
getConnector().createSegmentTable();
} catch (Exception e) {
LOG.error("Exception while trying to create druid segments table", e);
throw new MetaException(e.getMessage());
}
Collection<String>
existingDataSources =
DruidStorageHandlerUtils.getAllDataSourceNames(getConnector(), getDruidMetadataStorageTablesConfig());
LOG.debug("pre-create data source with name {}", dataSourceName);
// Check whether the datasource we are about to create already exists in the druid_segments table.
if (existingDataSources.contains(dataSourceName)) {
throw new MetaException(String.format("Data source [%s] already existing", dataSourceName));
}
table.getParameters().put(Constants.DRUID_DATA_SOURCE, dataSourceName);
}
@Override public void rollbackCreateTable(Table table) {
cleanWorkingDir();
}
@Override public void commitCreateTable(Table table) throws MetaException {
if (DruidKafkaUtils.isKafkaStreamingTable(table)) {
updateKafkaIngestion(table);
}
// For CTAS queries where the user has explicitly specified the datasource,
// we append the data to the existing Druid datasource.
this.commitInsertTable(table, false);
}
@Override public URI getURIForAuth(Table table) throws URISyntaxException {
Map<String, String> tableProperties = HiveCustomStorageHandlerUtils.getTableProperties(table);
String hostName = conf.get(DRUID_HOST_NAME) != null ? conf.get(DRUID_HOST_NAME) :
HiveConf.getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS);
String tableName = tableProperties.get(Constants.DRUID_DATA_SOURCE);
String columnNames = tableProperties.get(Constants.DRUID_QUERY_FIELD_NAMES);
if (columnNames != null) {
return new URI(DRUID_PREFIX + "//" + hostName + "/" + tableName + "/" + columnNames);
}
return new URI(DRUID_PREFIX + "//" + hostName + "/" + tableName);
}
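/**
* Builds the Kafka supervisor spec from the table properties and pushes it to the Druid
* overlord, honoring the requested ingestion state (START, STOP or RESET).
*/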
private void updateKafkaIngestion(Table table) {
final String overlordAddress = HiveConf.getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS);
final String
dataSourceName =
Preconditions.checkNotNull(DruidStorageHandlerUtils.getTableProperty(table, Constants.DRUID_DATA_SOURCE),
"Druid datasource name is null");
final String
kafkaTopic =
Preconditions.checkNotNull(DruidStorageHandlerUtils.getTableProperty(table,
DruidConstants.KAFKA_TOPIC), "kafka topic is null");
final String
kafkaServers =
Preconditions.checkNotNull(DruidStorageHandlerUtils.getTableProperty(table,
DruidConstants.KAFKA_BOOTSTRAP_SERVERS), "kafka connect string is null");
Properties tableProperties = new Properties();
tableProperties.putAll(table.getParameters());
final GranularitySpec granularitySpec = DruidStorageHandlerUtils.getGranularitySpec(getConf(), tableProperties);
List<FieldSchema> columns = table.getSd().getCols();
List<String> columnNames = new ArrayList<>(columns.size());
List<TypeInfo> columnTypes = new ArrayList<>(columns.size());
for (FieldSchema schema : columns) {
columnNames.add(schema.getName());
columnTypes.add(TypeInfoUtils.getTypeInfoFromTypeString(schema.getType()));
}
Pair<List<DimensionSchema>, AggregatorFactory[]>
dimensionsAndAggregates =
DruidStorageHandlerUtils.getDimensionsAndAggregates(columnNames, columnTypes);
if (!columnNames.contains(DruidConstants.DEFAULT_TIMESTAMP_COLUMN)) {
throw new IllegalStateException("Timestamp column (' "
+ DruidConstants.DEFAULT_TIMESTAMP_COLUMN
+ "') not specified in create table; list of columns is : "
+ columnNames);
}
DimensionsSpec dimensionsSpec = new DimensionsSpec(dimensionsAndAggregates.lhs, null, null);
String timestampFormat = DruidStorageHandlerUtils
.getTableProperty(table, DruidConstants.DRUID_TIMESTAMP_FORMAT);
String timestampColumnName = DruidStorageHandlerUtils
.getTableProperty(table, DruidConstants.DRUID_TIMESTAMP_COLUMN);
if (timestampColumnName == null) {
timestampColumnName = DruidConstants.DEFAULT_TIMESTAMP_COLUMN;
}
final TimestampSpec timestampSpec = new TimestampSpec(timestampColumnName, timestampFormat, null);
final InputRowParser inputRowParser = DruidKafkaUtils
.getInputRowParser(table, timestampSpec, dimensionsSpec);
final Map<String, Object>
inputParser =
JSON_MAPPER.convertValue(inputRowParser, new TypeReference<Map<String, Object>>() {
});
final DataSchema
dataSchema =
new DataSchema(dataSourceName,
inputParser,
dimensionsAndAggregates.rhs,
granularitySpec,
null,
DruidStorageHandlerUtils.JSON_MAPPER);
IndexSpec indexSpec = DruidStorageHandlerUtils.getIndexSpec(getConf());
KafkaSupervisorSpec
spec =
DruidKafkaUtils.createKafkaSupervisorSpec(table, kafkaTopic, kafkaServers, dataSchema, indexSpec);
// Fetch existing Ingestion Spec from Druid, if any
KafkaSupervisorSpec existingSpec = fetchKafkaIngestionSpec(table);
String
targetState =
DruidStorageHandlerUtils.getTableProperty(table, DruidConstants.DRUID_KAFKA_INGESTION);
if (targetState == null) {
// Case when the user has not specified any ingestion state in the current command:
// if there is a Kafka supervisor already running, keep its last known state (START), otherwise STOP.
targetState = existingSpec == null ? "STOP" : "START";
}
if ("STOP".equalsIgnoreCase(targetState)) {
if (existingSpec != null) {
stopKafkaIngestion(overlordAddress, dataSourceName);
}
} else if ("START".equalsIgnoreCase(targetState)) {
if (existingSpec == null || !existingSpec.equals(spec)) {
DruidKafkaUtils.updateKafkaIngestionSpec(overlordAddress, spec);
}
} else if ("RESET".equalsIgnoreCase(targetState)) {
// Case when there are changes in multiple table properties.
if (existingSpec != null && !existingSpec.equals(spec)) {
DruidKafkaUtils.updateKafkaIngestionSpec(overlordAddress, spec);
}
resetKafkaIngestion(overlordAddress, dataSourceName);
} else {
throw new IllegalArgumentException(String.format(
"Invalid value for property [%s], Valid values are [START, STOP, RESET]",
DruidConstants.DRUID_KAFKA_INGESTION));
}
// We do not want to keep state in two separate places, so remove it from the Hive table properties.
table.getParameters().remove(DruidConstants.DRUID_KAFKA_INGESTION);
}
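/**
* Asks the Druid overlord to reset the Kafka supervisor for the given datasource.
*/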
private void resetKafkaIngestion(String overlordAddress, String dataSourceName) {
try {
StringFullResponseHolder
response =
RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(),
new Request(HttpMethod.POST, new URL(
String.format("http://%s/druid/indexer/v1/supervisor/%s/reset", overlordAddress, dataSourceName))),
new StringFullResponseHandler(StandardCharsets.UTF_8)), input -> input instanceof IOException,
getMaxRetryCount());
if (response.getStatus().equals(HttpResponseStatus.OK)) {
CONSOLE.printInfo("Druid Kafka Ingestion Reset successful.");
} else {
throw new IOException(String.format("Unable to reset Kafka Ingestion Druid status [%d] full response [%s]",
response.getStatus().getCode(),
response.getContent()));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
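/**
* Asks the Druid overlord to shut down the Kafka supervisor for the given datasource.
*/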
private void stopKafkaIngestion(String overlordAddress, String dataSourceName) {
try {
StringFullResponseHolder
response =
RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(),
new Request(HttpMethod.POST, new URL(
String.format("http://%s/druid/indexer/v1/supervisor/%s/shutdown", overlordAddress, dataSourceName))),
new StringFullResponseHandler(StandardCharsets.UTF_8)), input -> input instanceof IOException,
getMaxRetryCount());
if (response.getStatus().equals(HttpResponseStatus.OK)) {
CONSOLE.printInfo("Druid Kafka Ingestion shutdown successful.");
} else {
throw new IOException(String.format("Unable to stop Kafka Ingestion Druid status [%d] full response [%s]",
response.getStatus().getCode(),
response.getContent()));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
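/**
* Fetches the current Kafka supervisor spec for the table's datasource from the Druid
* overlord, or returns null when no supervisor is running for it.
*/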
private KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) {
final String
overlordAddress =
Preconditions.checkNotNull(HiveConf.getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS),
"Druid Overlord Address is null");
String
dataSourceName =
Preconditions.checkNotNull(DruidStorageHandlerUtils.getTableProperty(table, Constants.DRUID_DATA_SOURCE),
"Druid Datasource name is null");
try {
StringFullResponseHolder
response =
RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(),
new Request(HttpMethod.GET,
new URL(String.format("http://%s/druid/indexer/v1/supervisor/%s", overlordAddress, dataSourceName))),
new StringFullResponseHandler(StandardCharsets.UTF_8)), input -> input instanceof IOException,
getMaxRetryCount());
if (response.getStatus().equals(HttpResponseStatus.OK)) {
return JSON_MAPPER.readValue(response.getContent(), KafkaSupervisorSpec.class);
// Druid returns 400 Bad Request when the supervisor is not found.
} else if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND) || response.getStatus()
.equals(HttpResponseStatus.BAD_REQUEST)) {
LOG.debug("No Kafka Supervisor found for datasource[%s]", dataSourceName);
return null;
} else {
throw new IOException(String.format(
"Unable to fetch Kafka Ingestion Spec from Druid status [%d] full response [%s]",
response.getStatus().getCode(),
response.getContent()));
}
} catch (Exception e) {
throw new RuntimeException("Exception while fetching kafka ingestion spec from druid", e);
}
}
/**
* Fetches the Kafka supervisor status report from the Druid overlord.
*
* @param table Hive table object.
* @return the Kafka supervisor report, or null when the Druid overlord is unreachable.
*/
@Nullable private KafkaSupervisorReport fetchKafkaSupervisorReport(Table table) {
final String
overlordAddress =
Preconditions.checkNotNull(HiveConf.getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS),
"Druid Overlord Address is null");
final String
dataSourceName =
Preconditions.checkNotNull(DruidStorageHandlerUtils.getTableProperty(table, Constants.DRUID_DATA_SOURCE),
"Druid Datasource name is null");
try {
StringFullResponseHolder
response =
RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(),
new Request(HttpMethod.GET, new URL(
String.format("http://%s/druid/indexer/v1/supervisor/%s/status", overlordAddress, dataSourceName))),
new StringFullResponseHandler(StandardCharsets.UTF_8)), input -> input instanceof IOException,
getMaxRetryCount());
if (response.getStatus().equals(HttpResponseStatus.OK)) {
return DruidStorageHandlerUtils.JSON_MAPPER.readValue(response.getContent(), KafkaSupervisorReport.class);
// Druid returns 400 Bad Request when the supervisor is not found.
} else if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND) || response.getStatus()
.equals(HttpResponseStatus.BAD_REQUEST)) {
LOG.info("No Kafka Supervisor found for datasource[%s]", dataSourceName);
return null;
} else {
LOG.error("Unable to fetch Kafka Supervisor status [%d] full response [%s]",
response.getStatus().getCode(),
response.getContent());
return null;
}
} catch (Exception e) {
LOG.error("Exception while fetching kafka ingestion spec from druid", e);
return null;
}
}
/**
* Moves the segment files to deep storage and commits the segments' metadata to the
* Druid metadata store in one transaction.
*
* @param table Hive table.
* @param overwrite true if it is an insert overwrite table.
* @param segmentsToLoad segments created by this query, to be pushed and committed.
* @return the list of published data segments.
*/
private List<DataSegment> loadAndCommitDruidSegments(Table table, boolean overwrite, List<DataSegment> segmentsToLoad)
throws IOException, CallbackFailedException {
final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
final String
segmentDirectory =
table.getParameters().get(DruidConstants.DRUID_SEGMENT_DIRECTORY) != null ?
table.getParameters().get(DruidConstants.DRUID_SEGMENT_DIRECTORY) :
HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY);
final HdfsDataSegmentPusherConfig hdfsSegmentPusherConfig = new HdfsDataSegmentPusherConfig();
List<DataSegment> publishedDataSegmentList;
LOG.info(String.format("Moving [%s] Druid segments from staging directory [%s] to Deep storage [%s]",
segmentsToLoad.size(),
getStagingWorkingDir().toString(),
segmentDirectory));
hdfsSegmentPusherConfig.setStorageDirectory(segmentDirectory);
DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(hdfsSegmentPusherConfig, getConf(), JSON_MAPPER);
publishedDataSegmentList =
DruidStorageHandlerUtils.publishSegmentsAndCommit(getConnector(),
getDruidMetadataStorageTablesConfig(),
dataSourceName,
segmentsToLoad,
overwrite,
getConf(),
dataSegmentPusher);
return publishedDataSegmentList;
}
/**
* Checks the load status of Druid segments by polling the Druid coordinator.
* @param segments list of Druid segments to check for.
*/
private void checkLoadStatus(List<DataSegment> segments) {
final String
coordinatorAddress =
HiveConf.getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS);
int maxTries = getMaxRetryCount();
LOG.debug("checking load status from coordinator {}", coordinatorAddress);
String coordinatorResponse;
try {
coordinatorResponse =
RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(),
new Request(HttpMethod.GET, new URL(String.format("http://%s/status", coordinatorAddress))),
new StringFullResponseHandler(StandardCharsets.UTF_8)).getContent(),
input -> input instanceof IOException, maxTries);
} catch (Exception e) {
CONSOLE.printInfo("Will skip waiting for data loading, coordinator unavailable");
return;
}
if (Strings.isNullOrEmpty(coordinatorResponse)) {
CONSOLE.printInfo("Will skip waiting for data loading, empty response from coordinator");
return;
}
CONSOLE.printInfo(String.format("Waiting for the loading of [%s] segments", segments.size()));
long passiveWaitTimeMs = HiveConf.getLongVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_PASSIVE_WAIT_TIME);
Set<URL> urlsOfUnloadedSegments = segments
.stream()
.map(dataSegment -> {
try {
// Need to make sure that we are using the segment identifier.
return new URL(String.format("http://%s/druid/coordinator/v1/datasources/%s/segments/%s",
coordinatorAddress,
dataSegment.getDataSource(),
dataSegment.getId().toString()));
} catch (MalformedURLException e) {
Throwables.propagate(e);
}
return null;
})
.collect(Collectors.toSet());
int numRetries = 0;
while (numRetries++ < maxTries && !urlsOfUnloadedSegments.isEmpty()) {
urlsOfUnloadedSegments = ImmutableSet.copyOf(Sets.filter(urlsOfUnloadedSegments, input -> {
try {
String result = DruidStorageHandlerUtils
.getResponseFromCurrentLeader(getHttpClient(), new Request(HttpMethod.GET, input),
new StringFullResponseHandler(StandardCharsets.UTF_8)).getContent();
LOG.debug("Checking segment [{}] response is [{}]", input, result);
return Strings.isNullOrEmpty(result);
} catch (InterruptedException | ExecutionException e) {
LOG.error(String.format("Error while checking URL [%s]", input), e);
return true;
}
}));
try {
if (!urlsOfUnloadedSegments.isEmpty()) {
Thread.sleep(passiveWaitTimeMs);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
if (!urlsOfUnloadedSegments.isEmpty()) {
// We do not throw an exception since it might be a transient issue that is blocking loading.
CONSOLE.printError(String.format("Wait time exhausted and we have [%s] out of [%s] segments not loaded yet",
urlsOfUnloadedSegments.size(),
segments.size()));
}
}
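/**
* Deletes a segment's zip file from deep storage and prunes the now-empty parent
* partition, version, interval and dataSource directories where possible.
*/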
@VisibleForTesting void deleteSegment(DataSegment segment) throws SegmentLoadingException {
final Path path = DruidStorageHandlerUtils.getPath(segment);
LOG.info("removing segment {}, located at path {}", segment.getId().toString(), path);
try {
if (path.getName().endsWith(".zip")) {
final FileSystem fs = path.getFileSystem(getConf());
if (!fs.exists(path)) {
LOG.warn("Segment Path {} does not exist. It appears to have been deleted already.", path);
return;
}
// path format --> .../dataSource/interval/version/partitionNum/xxx.zip
Path partitionNumDir = path.getParent();
if (!fs.delete(partitionNumDir, true)) {
throw new SegmentLoadingException("Unable to kill segment, failed to delete dir [%s]",
partitionNumDir.toString());
}
// Try to delete the parent directories if they are empty.
Path versionDir = partitionNumDir.getParent();
if (safeNonRecursiveDelete(fs, versionDir)) {
Path intervalDir = versionDir.getParent();
if (safeNonRecursiveDelete(fs, intervalDir)) {
Path dataSourceDir = intervalDir.getParent();
safeNonRecursiveDelete(fs, dataSourceDir);
}
}
} else {
throw new SegmentLoadingException("Unknown file type[%s]", path);
}
} catch (IOException e) {
throw new SegmentLoadingException(e, "Unable to kill segment");
}
}
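/** Non-recursive delete that returns false instead of throwing on any failure. */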
private static boolean safeNonRecursiveDelete(FileSystem fs, Path path) {
try {
return fs.delete(path, false);
} catch (Exception ex) {
return false;
}
}
@Override public void preDropTable(Table table) {
// Nothing to do
}
@Override public void rollbackDropTable(Table table) {
// Nothing to do
}
@Override public void commitDropTable(Table table, boolean deleteData) {
if (DruidKafkaUtils.isKafkaStreamingTable(table)) {
// Stop Kafka Ingestion first
final String
overlordAddress =
Preconditions.checkNotNull(HiveConf.getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS),
"Druid Overlord Address is null");
String
dataSourceName =
Preconditions.checkNotNull(DruidStorageHandlerUtils.getTableProperty(table, Constants.DRUID_DATA_SOURCE),
"Druid Datasource name is null");
stopKafkaIngestion(overlordAddress, dataSourceName);
}
String
dataSourceName =
Preconditions.checkNotNull(table.getParameters().get(Constants.DRUID_DATA_SOURCE), "DataSource name is null !");
// TODO: Move the MetaStoreUtils.isExternalTablePurge(table) calls to a common place for all StorageHandlers.
// The deleteData flag passed down to the StorageHandler should be true only if
// MetaStoreUtils.isExternalTablePurge(table) returns true.
if (deleteData && MetaStoreUtils.isExternalTablePurge(table)) {
LOG.info("Dropping with purge all the data for data source {}", dataSourceName);
List<DataSegment>
dataSegmentList =
DruidStorageHandlerUtils.getDataSegmentList(getConnector(),
getDruidMetadataStorageTablesConfig(),
dataSourceName);
if (dataSegmentList.isEmpty()) {
LOG.info("Nothing to delete for data source {}", dataSourceName);
return;
}
for (DataSegment dataSegment : dataSegmentList) {
try {
deleteSegment(dataSegment);
} catch (SegmentLoadingException e) {
LOG.error(String.format("Error while deleting segment [%s]", dataSegment.getId().toString()), e);
}
}
}
if (DruidStorageHandlerUtils.disableDataSource(getConnector(),
getDruidMetadataStorageTablesConfig(),
dataSourceName)) {
LOG.info("Successfully dropped druid data source {}", dataSourceName);
}
}
@Override public void commitInsertTable(Table table, boolean overwrite) throws MetaException {
LOG.debug("commit insert into table {} overwrite {}", table.getTableName(), overwrite);
try {
// Check if there are segments to load
final Path segmentDescriptorDir = getSegmentDescriptorDir();
final List<DataSegment> segmentsToLoad = fetchSegmentsMetadata(segmentDescriptorDir);
final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
// No segments to load, but we still need to honor overwrite.
if (segmentsToLoad.isEmpty() && overwrite) {
// In case of an insert overwrite we have to disable the existing Druid datasource.
DruidStorageHandlerUtils.disableDataSource(getConnector(),
getDruidMetadataStorageTablesConfig(),
dataSourceName);
} else if (!segmentsToLoad.isEmpty()) {
// At this point we have Druid segments from the reducers; we need to atomically
// move them and commit their metadata to the Druid metadata store as one transaction.
checkLoadStatus(loadAndCommitDruidSegments(table, overwrite, segmentsToLoad));
}
} catch (IOException e) {
throw new MetaException(e.getMessage());
} catch (CallbackFailedException c) {
LOG.error("Error while committing transaction to druid metadata storage", c);
throw new MetaException(c.getCause().getMessage());
} finally {
cleanWorkingDir();
}
}
private List<DataSegment> fetchSegmentsMetadata(Path segmentDescriptorDir) throws IOException {
if (!segmentDescriptorDir.getFileSystem(getConf()).exists(segmentDescriptorDir)) {
LOG.info("Directory {} does not exist, ignore this if it is create statement or inserts of 0 rows,"
+ " no Druid segments to move, cleaning working directory {}",
segmentDescriptorDir.toString(),
getStagingWorkingDir().toString());
return Collections.emptyList();
}
return DruidStorageHandlerUtils.getCreatedSegments(segmentDescriptorDir, getConf());
}
@Override public void preInsertTable(Table table, boolean overwrite) {
}
@Override public void rollbackInsertTable(Table table, boolean overwrite) {
// do nothing
}
@Override public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
jobProperties.put(Constants.DRUID_DATA_SOURCE, tableDesc.getTableName());
jobProperties.put(DruidConstants.DRUID_SEGMENT_VERSION, new DateTime().toString());
jobProperties.put(DruidConstants.DRUID_JOB_WORKING_DIRECTORY, getStagingWorkingDir().toString());
// DruidOutputFormat will write segments in an intermediate directory
jobProperties.put(DruidConstants.DRUID_SEGMENT_INTERMEDIATE_DIRECTORY,
getIntermediateSegmentDir().toString());
}
@Override public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
}
@Override public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
final boolean kerberosEnabled = HiveConf.getBoolVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_KERBEROS_ENABLE);
if (kerberosEnabled && UserGroupInformation.isSecurityEnabled()) {
// The AM cannot do Kerberos auth, so input split generation will be done in HS2.
LOG.debug("Setting {} to {} to enable split generation on HS2",
HiveConf.ConfVars.HIVE_AM_SPLIT_GENERATION.toString(),
Boolean.FALSE.toString());
jobConf.set(HiveConf.ConfVars.HIVE_AM_SPLIT_GENERATION.toString(), Boolean.FALSE.toString());
}
try {
Utilities.addDependencyJars(jobConf, DruidRecordWriter.class);
} catch (IOException e) {
Throwables.propagate(e);
}
}
@Override public void setConf(Configuration conf) {
this.conf = conf;
}
@Override public Configuration getConf() {
return conf;
}
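// A plain INSERT only needs a shared read lock; any other write type takes a shared write lock.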
@Override public LockType getLockType(WriteEntity writeEntity) {
if (writeEntity.getWriteType().equals(WriteEntity.WriteType.INSERT)) {
return LockType.SHARED_READ;
}
return LockType.SHARED_WRITE;
}
@Override public String toString() {
return Constants.DRUID_HIVE_STORAGE_HANDLER_ID;
}
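/** Returns the Hive query id, used to build a unique per-query staging directory name. */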
private String getUniqueId() {
if (uniqueId == null) {
uniqueId =
Preconditions.checkNotNull(Strings.emptyToNull(HiveConf.getVar(getConf(), HiveConf.ConfVars.HIVEQUERYID)),
"Hive query id is null");
}
return uniqueId;
}
private Path getStagingWorkingDir() {
return new Path(getRootWorkingDir(), makeStagingName());
}
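/** Lazily builds the Druid metadata tables config from the configured base table name prefix. */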
private MetadataStorageTablesConfig getDruidMetadataStorageTablesConfig() {
if (druidMetadataStorageTablesConfig != null) {
return druidMetadataStorageTablesConfig;
}
final String base = HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_BASE);
druidMetadataStorageTablesConfig = MetadataStorageTablesConfig.fromBase(base);
return druidMetadataStorageTablesConfig;
}
private SQLMetadataConnector getConnector() {
// buildConnector() caches its result in the connector field, so per-call memoization is unnecessary.
return buildConnector();
}
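/**
* Reads the metadata DB password from the configured credential providers, falling back to
* the plain configuration value for backwards compatibility.
*/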
private static String getPassword(Configuration conf, HiveConf.ConfVars var) {
try {
final char[] password = conf.getPassword(var.varname);
return password == null ? null : String.valueOf(password);
} catch (IOException e) {
// Reading password from configuration for backwards compatibility.
LOG.warn("Unable to retrieve password from credential providers. Trying to read it from config..", e);
return conf.get(var.varname);
}
}
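/**
* Builds (and caches) the SQL metadata connector for the configured metadata storage type:
* mysql, postgresql or derby.
*/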
private SQLMetadataConnector buildConnector() {
if (connector != null) {
return connector;
}
final String dbType = HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_TYPE);
final String username = HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_USERNAME);
final String password = getPassword(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_PASSWORD);
final String uri = HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_URI);
LOG.debug("Supplying SQL Connector with DB type {}, URI {}, User {}", dbType, uri, username);
@SuppressWarnings("Guava") final Supplier<MetadataStorageConnectorConfig>
storageConnectorConfigSupplier =
Suppliers.ofInstance(new MetadataStorageConnectorConfig() {
@Override public String getConnectURI() {
return uri;
}
@Override public String getUser() {
return Strings.emptyToNull(username);
}
@Override public String getPassword() {
return Strings.emptyToNull(password);
}
});
switch (dbType) {
case "mysql":
connector =
new MySQLConnector(storageConnectorConfigSupplier,
Suppliers.ofInstance(getDruidMetadataStorageTablesConfig()),
new MySQLConnectorConfig());
break;
case "postgresql":
connector =
new PostgreSQLConnector(storageConnectorConfigSupplier,
Suppliers.ofInstance(getDruidMetadataStorageTablesConfig()), new PostgreSQLConnectorConfig(),
new PostgreSQLTablesConfig());
break;
case "derby":
connector =
new DerbyConnector(new DerbyMetadataStorage(storageConnectorConfigSupplier.get()),
storageConnectorConfigSupplier,
Suppliers.ofInstance(getDruidMetadataStorageTablesConfig()));
break;
default:
throw new IllegalStateException(String.format("Unknown metadata storage type [%s]", dbType));
}
return connector;
}
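/** Builds a hidden staging directory name from the unique query id. */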
@VisibleForTesting String makeStagingName() {
return ".staging-".concat(getUniqueId().replace(":", ""));
}
private Path getSegmentDescriptorDir() {
return new Path(getStagingWorkingDir(), SEGMENTS_DESCRIPTOR_DIR_NAME);
}
private Path getIntermediateSegmentDir() {
return new Path(getStagingWorkingDir(), INTERMEDIATE_SEGMENT_DIR_NAME);
}
private void cleanWorkingDir() {
final FileSystem fileSystem;
try {
fileSystem = getStagingWorkingDir().getFileSystem(getConf());
fileSystem.delete(getStagingWorkingDir(), true);
} catch (IOException e) {
LOG.error("Got Exception while cleaning working directory", e);
}
}
private String getRootWorkingDir() {
if (Strings.isNullOrEmpty(rootWorkingDir)) {
rootWorkingDir = HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_WORKING_DIR);
}
return rootWorkingDir;
}
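/**
* Creates the shared Druid HTTP client from session-level connection and read-timeout
* settings, wrapping it in a KerberosHttpClient when Kerberos is enabled.
*/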
private static HttpClient makeHttpClient(Lifecycle lifecycle) {
final int
numConnection =
HiveConf.getIntVar(SessionState.getSessionConf(), HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
final Period
readTimeout =
new Period(HiveConf.getVar(SessionState.getSessionConf(), HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
LOG.info("Creating Druid HTTP client with {} max parallel connections and {}ms read timeout",
numConnection,
readTimeout.toStandardDuration().getMillis());
final HttpClient
httpClient =
HttpClientInit.createClient(HttpClientConfig.builder()
.withNumConnections(numConnection)
.withReadTimeout(readTimeout.toStandardDuration())
.build(), lifecycle);
final boolean kerberosEnabled =
HiveConf.getBoolVar(SessionState.getSessionConf(), HiveConf.ConfVars.HIVE_DRUID_KERBEROS_ENABLE);
if (kerberosEnabled && UserGroupInformation.isSecurityEnabled()) {
LOG.info("building Kerberos Http Client");
return new KerberosHttpClient(httpClient);
}
return httpClient;
}
public static HttpClient getHttpClient() {
return HTTP_CLIENT;
}
@Override public void preAlterTable(Table table, EnvironmentContext context) throws MetaException {
String alterOpType = context == null ? null : context.getProperties().get(ALTER_TABLE_OPERATION_TYPE);
// alterOpType is null in case of stats update
if (alterOpType != null && !ALLOWED_ALTER_TYPES.contains(alterOpType)) {
throw new MetaException("ALTER TABLE can not be used for " + alterOpType + " to a non-native table ");
}
if (DruidKafkaUtils.isKafkaStreamingTable(table)) {
updateKafkaIngestion(table);
}
}
private int getMaxRetryCount() {
return HiveConf.getIntVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_MAX_TRIES);
}
@Override public StorageHandlerInfo getStorageHandlerInfo(Table table) throws MetaException {
if (DruidKafkaUtils.isKafkaStreamingTable(table)) {
KafkaSupervisorReport kafkaSupervisorReport = fetchKafkaSupervisorReport(table);
if (kafkaSupervisorReport == null) {
return DruidStorageHandlerInfo.UNREACHABLE;
}
return new DruidStorageHandlerInfo(kafkaSupervisorReport);
} else {
// Currently we do not expose any runtime info for non-streaming tables.
// In the future, extend this to add more information regarding table status,
// e.g. total size of segments in Druid, load status of the table on historical nodes, etc.
return null;
}
}
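/**
* Rewrites the serialized Druid query in the table scan's properties to include the scan's
* filter expression as dynamic filters, so the explain plan shows the final Druid query.
*/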
@Override public Map<String, String> getOperatorDescProperties(OperatorDesc operatorDesc,
Map<String, String> initialProps) {
if (operatorDesc instanceof TableScanDesc) {
TableScanDesc tableScanDesc = (TableScanDesc) operatorDesc;
ExprNodeGenericFuncDesc filterExpr = tableScanDesc.getFilterExpr();
String druidQuery = initialProps.get(Constants.DRUID_QUERY_JSON);
if (filterExpr != null && druidQuery != null) {
try {
Query query = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, BaseQuery.class);
Query queryWithDynamicFilters = DruidStorageHandlerUtils.addDynamicFilters(query, filterExpr, conf, false);
Map<String, String> props = Maps.newHashMap(initialProps);
props.put(Constants.DRUID_QUERY_JSON,
DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(queryWithDynamicFilters));
return props;
} catch (IOException e) {
LOG.error("Exception while deserializing druid query. Explain plan may not have final druid query", e);
}
}
}
// Case when we do not have any additional info to add.
return initialProps;
}
}