blob: 095fa17d03a89505d537308ee208700f0bc65d8d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.filter.Filter;
import org.joda.time.Interval;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* DefaultQueryMetrics is not safe for use from multiple threads. Methods that set dimensions, record metrics or emit
* events throw {@link IllegalStateException} when called from a thread other than the one on which the object was
* constructed. To "transfer" a DefaultQueryMetrics from one thread to another, the {@link #ownerThread} field should
* be updated.
*/
public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMetrics<QueryType>
{
  /**
   * Collects the dimensions set on this object; {@link #emit} uses it to build one event per recorded metric.
   */
  protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();

  /**
   * Metric name to value, buffered by {@link #reportMetric} until {@link #emit} sends and clears them.
   */
  protected final Map<String, Number> metrics = new HashMap<>();

  /**
   * The thread that is allowed to modify this object, initially the constructing thread. Deliberately not final so
   * that subclasses are able to reassign it.
   */
  protected Thread ownerThread = Thread.currentThread();

  /**
   * Renders the table names of the given data source as one string: the name itself when there is exactly one table,
   * otherwise the {@code toString()} of the list of names in natural sort order.
   */
  private static String getTableNamesAsString(DataSource dataSource)
  {
    final Set<String> tableNames = dataSource.getTableNames();
    if (tableNames.size() != 1) {
      final List<String> sortedNames = tableNames.stream().sorted().collect(Collectors.toList());
      return sortedNames.toString();
    }
    return Iterables.getOnlyElement(tableNames);
  }

  /**
   * Verifies that the calling thread is {@link #ownerThread}.
   *
   * @throws IllegalStateException if called from any other thread
   */
  protected void checkModifiedFromOwnerThread()
  {
    if (Thread.currentThread() == ownerThread) {
      return;
    }
    throw new IllegalStateException(
        "DefaultQueryMetrics must not be modified from multiple threads. If it is needed to gather dimension or "
        + "metric information from multiple threads or from an async thread, this information should explicitly be "
        + "passed between threads (e. g. using Futures), or this DefaultQueryMetrics's ownerThread should be "
        + "reassigned explicitly"
    );
  }

  /**
   * Sets a dimension on {@link #builder} after checking that the caller is the owner thread.
   *
   * @param dimension dimension name
   * @param value     dimension value, handed to the builder unchanged
   */
  protected void setDimension(String dimension, Object value)
  {
    checkModifiedFromOwnerThread();
    builder.setDimension(dimension, value);
  }

  /**
   * Populates all query-derived dimensions by invoking each per-aspect method in a fixed order.
   */
  @Override
  public void query(QueryType query)
  {
    dataSource(query);
    queryType(query);
    interval(query);
    hasFilters(query);
    duration(query);
    queryId(query);
    subQueryId(query);
    sqlQueryId(query);
    context(query);
  }

  /**
   * Sets the {@link DruidMetrics#DATASOURCE} dimension from the table names of the query's data source.
   */
  @Override
  public void dataSource(QueryType query)
  {
    final String tableNames = getTableNamesAsString(query.getDataSource());
    setDimension(DruidMetrics.DATASOURCE, tableNames);
  }

  /**
   * Sets the {@link DruidMetrics#TYPE} dimension to the query's type.
   */
  @Override
  public void queryType(QueryType query)
  {
    setDimension(DruidMetrics.TYPE, query.getType());
  }

  /**
   * Sets the {@link DruidMetrics#INTERVAL} dimension to the string forms of the query's intervals.
   */
  @Override
  public void interval(QueryType query)
  {
    // Goes to the builder directly with a String[] rather than through setDimension(String, Object), so the
    // ownership check has to be done here explicitly, and before the intervals are read.
    checkModifiedFromOwnerThread();
    final String[] intervalStrings = query.getIntervals()
                                          .stream()
                                          .map(Interval::toString)
                                          .toArray(String[]::new);
    builder.setDimension(DruidMetrics.INTERVAL, intervalStrings);
  }

  /**
   * Sets the "hasFilters" dimension to the string form of {@code query.hasFilters()}.
   */
  @Override
  public void hasFilters(QueryType query)
  {
    final String hasFiltersString = String.valueOf(query.hasFilters());
    setDimension("hasFilters", hasFiltersString);
  }

  /**
   * Sets the "duration" dimension to the string form of the query's duration.
   */
  @Override
  public void duration(QueryType query)
  {
    final String durationString = query.getDuration().toString();
    setDimension("duration", durationString);
  }

  /**
   * Sets the {@link DruidMetrics#ID} dimension from the query id, passed through
   * {@link StringUtils#nullToEmptyNonDruidDataString}.
   */
  @Override
  public void queryId(QueryType query)
  {
    final String id = StringUtils.nullToEmptyNonDruidDataString(query.getId());
    setDimension(DruidMetrics.ID, id);
  }

  /**
   * No-op in this implementation: the sub-query id is not emitted by default.
   */
  @Override
  public void subQueryId(QueryType query)
  {
    // Emit nothing by default.
  }

  /**
   * No-op in this implementation: the SQL query id is not emitted by default.
   */
  @Override
  public void sqlQueryId(QueryType query)
  {
    // Emit nothing by default.
  }

  /**
   * Sets the "context" dimension to the query context, or to an empty map when the query has no context.
   */
  @Override
  public void context(QueryType query)
  {
    if (query.getContext() == null) {
      setDimension("context", ImmutableMap.of());
    } else {
      setDimension("context", query.getContext());
    }
  }

  /**
   * Sets the "server" dimension to the given host.
   */
  @Override
  public void server(String host)
  {
    setDimension("server", host);
  }

  /**
   * Sets the "remoteAddress" dimension to the given address.
   */
  @Override
  public void remoteAddress(String remoteAddress)
  {
    setDimension("remoteAddress", remoteAddress);
  }

  /**
   * Sets the {@link DruidMetrics#STATUS} dimension to the given status.
   */
  @Override
  public void status(String status)
  {
    setDimension(DruidMetrics.STATUS, status);
  }

  /**
   * Sets the "success" dimension to "true" or "false".
   */
  @Override
  public void success(boolean success)
  {
    final String successString = String.valueOf(success);
    setDimension("success", successString);
  }

  /**
   * Sets the "segment" dimension to the given segment identifier.
   */
  @Override
  public void segment(String segmentIdentifier)
  {
    setDimension("segment", segmentIdentifier);
  }

  /**
   * No-op in this implementation: pre-filters are not emitted by default.
   */
  @Override
  public void preFilters(List<Filter> preFilters)
  {
    // Emit nothing by default.
  }

  /**
   * No-op in this implementation: post-filters are not emitted by default.
   */
  @Override
  public void postFilters(List<Filter> postFilters)
  {
    // Emit nothing by default.
  }

  /**
   * No-op in this implementation: the identity is not emitted by default.
   */
  @Override
  public void identity(String identity)
  {
    // Emit nothing by default.
  }

  /**
   * No-op in this implementation: the vectorized flag is not emitted by default.
   */
  @Override
  public void vectorized(final boolean vectorized)
  {
    // Emit nothing by default.
  }

  /**
   * No-op in this implementation: the parallel merge parallelism is not emitted by default.
   */
  @Override
  public void parallelMergeParallelism(final int parallelism)
  {
    // Emit nothing by default.
  }

  /**
   * Creates a {@link DefaultBitmapResultFactory} backed by the given bitmap factory.
   */
  @Override
  public BitmapResultFactory<?> makeBitmapResultFactory(BitmapFactory factory)
  {
    return new DefaultBitmapResultFactory(factory);
  }

  /**
   * Records "query/time", converting the given nanoseconds to milliseconds.
   */
  @Override
  public QueryMetrics<QueryType> reportQueryTime(long timeNs)
  {
    return reportMillisTimeMetric("query/time", timeNs);
  }

  /**
   * Records "query/bytes" with the given byte count.
   */
  @Override
  public QueryMetrics<QueryType> reportQueryBytes(long byteCount)
  {
    return reportMetric("query/bytes", byteCount);
  }

  /**
   * Records "query/wait/time", converting the given nanoseconds to milliseconds.
   */
  @Override
  public QueryMetrics<QueryType> reportWaitTime(long timeNs)
  {
    return reportMillisTimeMetric("query/wait/time", timeNs);
  }

  /**
   * Records "query/segment/time", converting the given nanoseconds to milliseconds.
   */
  @Override
  public QueryMetrics<QueryType> reportSegmentTime(long timeNs)
  {
    return reportMillisTimeMetric("query/segment/time", timeNs);
  }

  /**
   * Records "query/segmentAndCache/time", converting the given nanoseconds to milliseconds.
   */
  @Override
  public QueryMetrics<QueryType> reportSegmentAndCacheTime(long timeNs)
  {
    return reportMillisTimeMetric("query/segmentAndCache/time", timeNs);
  }

  /**
   * Records "query/cpu/time". Unlike the other time metrics of this class, the given nanoseconds are converted to
   * microseconds rather than milliseconds.
   */
  @Override
  public QueryMetrics<QueryType> reportCpuTime(long timeNs)
  {
    final long cpuTimeMicros = TimeUnit.NANOSECONDS.toMicros(timeNs);
    return reportMetric("query/cpu/time", cpuTimeMicros);
  }

  /**
   * Records "query/node/ttfb", converting the given nanoseconds to milliseconds.
   */
  @Override
  public QueryMetrics<QueryType> reportNodeTimeToFirstByte(long timeNs)
  {
    return reportMillisTimeMetric("query/node/ttfb", timeNs);
  }

  /**
   * Records nothing in this implementation and returns this object.
   */
  @Override
  public QueryMetrics<QueryType> reportBackPressureTime(long timeNs)
  {
    // Don't emit by default.
    return this;
  }

  /**
   * Records "query/node/time", converting the given nanoseconds to milliseconds.
   */
  @Override
  public QueryMetrics<QueryType> reportNodeTime(long timeNs)
  {
    return reportMillisTimeMetric("query/node/time", timeNs);
  }

  /**
   * Records "query/node/bytes" with the given byte count.
   */
  @Override
  public QueryMetrics<QueryType> reportNodeBytes(long byteCount)
  {
    return reportMetric("query/node/bytes", byteCount);
  }

  /**
   * Records nothing in this implementation and returns this object.
   */
  @Override
  public QueryMetrics<QueryType> reportBitmapConstructionTime(long timeNs)
  {
    // Don't emit by default.
    return this;
  }

  /**
   * Records nothing in this implementation and returns this object.
   */
  @Override
  public QueryMetrics<QueryType> reportSegmentRows(long numRows)
  {
    // Don't emit by default.
    return this;
  }

  /**
   * Records nothing in this implementation and returns this object.
   */
  @Override
  public QueryMetrics<QueryType> reportPreFilteredRows(long numRows)
  {
    // Don't emit by default.
    return this;
  }

  /**
   * Records nothing in this implementation and returns this object.
   */
  @Override
  public QueryMetrics<QueryType> reportParallelMergeParallelism(int parallelism)
  {
    // Don't emit by default.
    return this;
  }

  /**
   * Records nothing in this implementation and returns this object.
   */
  @Override
  public QueryMetrics<QueryType> reportParallelMergeInputSequences(long numSequences)
  {
    // Don't emit by default.
    return this;
  }

  /**
   * Records nothing in this implementation and returns this object.
   */
  @Override
  public QueryMetrics<QueryType> reportParallelMergeInputRows(long numRows)
  {
    // Don't emit by default.
    return this;
  }

  /**
   * Records nothing in this implementation and returns this object.
   */
  @Override
  public QueryMetrics<QueryType> reportParallelMergeOutputRows(long numRows)
  {
    // Don't emit by default.
    return this;
  }

  /**
   * Records nothing in this implementation and returns this object.
   */
  @Override
  public QueryMetrics<QueryType> reportParallelMergeTaskCount(long numTasks)
  {
    // Don't emit by default.
    return this;
  }

  /**
   * Records nothing in this implementation and returns this object.
   */
  @Override
  public QueryMetrics<QueryType> reportParallelMergeTotalCpuTime(long timeNs)
  {
    // Don't emit by default.
    return this;
  }

  /**
   * Records nothing in this implementation and returns this object.
   */
  @Override
  public QueryMetrics<QueryType> reportQueriedSegmentCount(long segmentCount)
  {
    // Don't emit by default.
    return this;
  }

  /**
   * Sends every buffered metric to the given emitter as an event built from {@link #builder}, then empties the
   * buffer so that the same values are not sent again by a later call.
   *
   * @throws IllegalStateException if called from a thread other than {@link #ownerThread}
   */
  @Override
  public void emit(ServiceEmitter emitter)
  {
    checkModifiedFromOwnerThread();
    for (Map.Entry<String, Number> entry : metrics.entrySet()) {
      final String metricName = entry.getKey();
      final Number metricValue = entry.getValue();
      emitter.emit(builder.build(metricName, metricValue));
    }
    metrics.clear();
  }

  /**
   * Buffers a metric value under the given name, replacing any value buffered earlier under the same name.
   *
   * @return this object, to allow chaining
   * @throws IllegalStateException if called from a thread other than {@link #ownerThread}
   */
  protected QueryMetrics<QueryType> reportMetric(String metricName, Number value)
  {
    checkModifiedFromOwnerThread();
    metrics.put(metricName, value);
    return this;
  }

  /**
   * Buffers a time metric given in nanoseconds after converting it to milliseconds.
   */
  private QueryMetrics<QueryType> reportMillisTimeMetric(String metricName, long timeNs)
  {
    final long timeMillis = TimeUnit.NANOSECONDS.toMillis(timeNs);
    return reportMetric(metricName, timeMillis);
  }
}