blob: a9e0bfeeb92ea49f942714be7fb87abd28e65243 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.kafka.supervisor;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Duration;
import org.joda.time.Period;
import javax.annotation.Nullable;
import java.io.File;
public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
implements SeekableStreamSupervisorTuningConfig
{
private final Integer workerThreads;
private final Integer chatThreads;
private final Long chatRetries;
private final Duration httpTimeout;
private final Duration shutdownTimeout;
private final Duration offsetFetchPeriod;
public static KafkaSupervisorTuningConfig defaultConfig()
{
return new KafkaSupervisorTuningConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
}
public KafkaSupervisorTuningConfig(
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@JsonProperty("maxTotalRows") Long maxTotalRows,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
// This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
@JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically,
@JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
@JsonProperty("workerThreads") Integer workerThreads,
@JsonProperty("chatThreads") Integer chatThreads,
@JsonProperty("chatRetries") Long chatRetries,
@JsonProperty("httpTimeout") Period httpTimeout,
@JsonProperty("shutdownTimeout") Period shutdownTimeout,
@JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod,
@JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod,
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
)
{
super(
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
maxRowsPerSegment,
maxTotalRows,
intermediatePersistPeriod,
basePersistDirectory,
maxPendingPersists,
indexSpec,
indexSpecForIntermediatePersists,
true,
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically,
segmentWriteOutMediumFactory,
intermediateHandoffPeriod,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions
);
this.workerThreads = workerThreads;
this.chatThreads = chatThreads;
this.chatRetries = (chatRetries != null ? chatRetries : DEFAULT_CHAT_RETRIES);
this.httpTimeout = SeekableStreamSupervisorTuningConfig.defaultDuration(httpTimeout, DEFAULT_HTTP_TIMEOUT);
this.shutdownTimeout = SeekableStreamSupervisorTuningConfig.defaultDuration(
shutdownTimeout,
DEFAULT_SHUTDOWN_TIMEOUT
);
this.offsetFetchPeriod = SeekableStreamSupervisorTuningConfig.defaultDuration(
offsetFetchPeriod,
DEFAULT_OFFSET_FETCH_PERIOD
);
}
@Override
@JsonProperty
public Integer getWorkerThreads()
{
return workerThreads;
}
@Override
@JsonProperty
public Integer getChatThreads()
{
return chatThreads;
}
@Override
@JsonProperty
public Long getChatRetries()
{
return chatRetries;
}
@Override
@JsonProperty
public Duration getHttpTimeout()
{
return httpTimeout;
}
@Override
@JsonProperty
public Duration getShutdownTimeout()
{
return shutdownTimeout;
}
@Override
public Duration getRepartitionTransitionDuration()
{
// Stopping tasks early for Kafka ingestion on partition set change is not supported yet,
// just return a default for now.
return SeekableStreamSupervisorTuningConfig.defaultDuration(
null,
SeekableStreamSupervisorTuningConfig.DEFAULT_REPARTITION_TRANSITION_DURATION
);
}
@Override
@JsonProperty
public Duration getOffsetFetchPeriod()
{
return offsetFetchPeriod;
}
@Override
public String toString()
{
return "KafkaSupervisorTuningConfig{" +
"maxRowsInMemory=" + getMaxRowsInMemory() +
", maxRowsPerSegment=" + getMaxRowsPerSegment() +
", maxTotalRows=" + getMaxTotalRows() +
", maxBytesInMemory=" + getMaxBytesInMemoryOrDefault() +
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
", basePersistDirectory=" + getBasePersistDirectory() +
", maxPendingPersists=" + getMaxPendingPersists() +
", indexSpec=" + getIndexSpec() +
", reportParseExceptions=" + isReportParseExceptions() +
", handoffConditionTimeout=" + getHandoffConditionTimeout() +
", resetOffsetAutomatically=" + isResetOffsetAutomatically() +
", segmentWriteOutMediumFactory=" + getSegmentWriteOutMediumFactory() +
", workerThreads=" + workerThreads +
", chatThreads=" + chatThreads +
", chatRetries=" + chatRetries +
", httpTimeout=" + httpTimeout +
", shutdownTimeout=" + shutdownTimeout +
", offsetFetchPeriod=" + offsetFetchPeriod +
", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() +
", logParseExceptions=" + isLogParseExceptions() +
", maxParseExceptions=" + getMaxParseExceptions() +
", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
'}';
}
@Override
public KafkaIndexTaskTuningConfig convertToTaskTuningConfig()
{
return new KafkaIndexTaskTuningConfig(
getAppendableIndexSpec(),
getMaxRowsInMemory(),
getMaxBytesInMemory(),
getMaxRowsPerSegment(),
getMaxTotalRows(),
getIntermediatePersistPeriod(),
getBasePersistDirectory(),
getMaxPendingPersists(),
getIndexSpec(),
getIndexSpecForIntermediatePersists(),
true,
isReportParseExceptions(),
getHandoffConditionTimeout(),
isResetOffsetAutomatically(),
getSegmentWriteOutMediumFactory(),
getIntermediateHandoffPeriod(),
isLogParseExceptions(),
getMaxParseExceptions(),
getMaxSavedParseExceptions()
);
}
}