| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.indexing.seekablestream; |
| |
| import com.fasterxml.jackson.annotation.JsonProperty; |
| import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; |
| import org.apache.druid.segment.IndexSpec; |
| import org.apache.druid.segment.incremental.AppendableIndexSpec; |
| import org.apache.druid.segment.indexing.RealtimeTuningConfig; |
| import org.apache.druid.segment.indexing.TuningConfig; |
| import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig; |
| import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; |
| import org.joda.time.Period; |
| |
| import javax.annotation.Nullable; |
| import java.io.File; |
| import java.util.Objects; |
| |
| public abstract class SeekableStreamIndexTaskTuningConfig implements AppenderatorConfig |
| { |
| private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false; |
| private static final boolean DEFAULT_SKIP_SEQUENCE_NUMBER_AVAILABILITY_CHECK = false; |
| |
| private final AppendableIndexSpec appendableIndexSpec; |
| private final int maxRowsInMemory; |
| private final long maxBytesInMemory; |
| private final DynamicPartitionsSpec partitionsSpec; |
| private final Period intermediatePersistPeriod; |
| private final File basePersistDirectory; |
| @Deprecated |
| private final int maxPendingPersists; |
| private final IndexSpec indexSpec; |
| private final IndexSpec indexSpecForIntermediatePersists; |
| private final boolean reportParseExceptions; |
| private final long handoffConditionTimeout; |
| private final boolean resetOffsetAutomatically; |
| @Nullable |
| private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory; |
| private final Period intermediateHandoffPeriod; |
| private final boolean skipSequenceNumberAvailabilityCheck; |
| |
| private final boolean logParseExceptions; |
| private final int maxParseExceptions; |
| private final int maxSavedParseExceptions; |
| |
| public SeekableStreamIndexTaskTuningConfig( |
| @Nullable AppendableIndexSpec appendableIndexSpec, |
| @Nullable Integer maxRowsInMemory, |
| @Nullable Long maxBytesInMemory, |
| @Nullable Integer maxRowsPerSegment, |
| @Nullable Long maxTotalRows, |
| @Nullable Period intermediatePersistPeriod, |
| @Nullable File basePersistDirectory, |
| @Nullable Integer maxPendingPersists, |
| @Nullable IndexSpec indexSpec, |
| @Nullable IndexSpec indexSpecForIntermediatePersists, |
| // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. |
| @Deprecated @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, |
| @Deprecated @Nullable Boolean reportParseExceptions, |
| @Nullable Long handoffConditionTimeout, |
| @Nullable Boolean resetOffsetAutomatically, |
| Boolean skipSequenceNumberAvailabilityCheck, |
| @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, |
| @Nullable Period intermediateHandoffPeriod, |
| @Nullable Boolean logParseExceptions, |
| @Nullable Integer maxParseExceptions, |
| @Nullable Integer maxSavedParseExceptions |
| ) |
| { |
| // Cannot be a static because default basePersistDirectory is unique per-instance |
| final RealtimeTuningConfig defaults = RealtimeTuningConfig.makeDefaultTuningConfig(basePersistDirectory); |
| |
| this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec; |
| this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory; |
| this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows); |
| // initializing this to 0, it will be lazily initialized to a value |
| // @see #getMaxBytesInMemoryOrDefault() |
| this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory; |
| this.intermediatePersistPeriod = intermediatePersistPeriod == null |
| ? defaults.getIntermediatePersistPeriod() |
| : intermediatePersistPeriod; |
| this.basePersistDirectory = defaults.getBasePersistDirectory(); |
| this.maxPendingPersists = maxPendingPersists == null ? 0 : maxPendingPersists; |
| this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec; |
| this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? |
| this.indexSpec : indexSpecForIntermediatePersists; |
| this.reportParseExceptions = reportParseExceptions == null |
| ? defaults.isReportParseExceptions() |
| : reportParseExceptions; |
| this.handoffConditionTimeout = handoffConditionTimeout == null |
| ? defaults.getHandoffConditionTimeout() |
| : handoffConditionTimeout; |
| this.resetOffsetAutomatically = resetOffsetAutomatically == null |
| ? DEFAULT_RESET_OFFSET_AUTOMATICALLY |
| : resetOffsetAutomatically; |
| this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory; |
| this.intermediateHandoffPeriod = intermediateHandoffPeriod == null |
| ? new Period().withDays(Integer.MAX_VALUE) |
| : intermediateHandoffPeriod; |
| this.skipSequenceNumberAvailabilityCheck = skipSequenceNumberAvailabilityCheck == null |
| ? DEFAULT_SKIP_SEQUENCE_NUMBER_AVAILABILITY_CHECK |
| : skipSequenceNumberAvailabilityCheck; |
| |
| if (this.reportParseExceptions) { |
| this.maxParseExceptions = 0; |
| this.maxSavedParseExceptions = maxSavedParseExceptions == null ? 0 : Math.min(1, maxSavedParseExceptions); |
| } else { |
| this.maxParseExceptions = maxParseExceptions == null |
| ? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS |
| : maxParseExceptions; |
| this.maxSavedParseExceptions = maxSavedParseExceptions == null |
| ? TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS |
| : maxSavedParseExceptions; |
| } |
| this.logParseExceptions = logParseExceptions == null |
| ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS |
| : logParseExceptions; |
| } |
| |
| @Override |
| @JsonProperty |
| public AppendableIndexSpec getAppendableIndexSpec() |
| { |
| return appendableIndexSpec; |
| } |
| |
| @Override |
| @JsonProperty |
| public int getMaxRowsInMemory() |
| { |
| return maxRowsInMemory; |
| } |
| |
| @Override |
| @JsonProperty |
| public long getMaxBytesInMemory() |
| { |
| return maxBytesInMemory; |
| } |
| |
| @Override |
| @JsonProperty |
| public Integer getMaxRowsPerSegment() |
| { |
| return partitionsSpec.getMaxRowsPerSegment(); |
| } |
| |
| @JsonProperty |
| @Override |
| @Nullable |
| public Long getMaxTotalRows() |
| { |
| return partitionsSpec.getMaxTotalRows(); |
| } |
| |
| @Override |
| public DynamicPartitionsSpec getPartitionsSpec() |
| { |
| return partitionsSpec; |
| } |
| |
| @Override |
| @JsonProperty |
| public Period getIntermediatePersistPeriod() |
| { |
| return intermediatePersistPeriod; |
| } |
| |
| @Override |
| @JsonProperty |
| public File getBasePersistDirectory() |
| { |
| return basePersistDirectory; |
| } |
| |
| @Override |
| @JsonProperty |
| @Deprecated |
| public int getMaxPendingPersists() |
| { |
| return maxPendingPersists; |
| } |
| |
| @Override |
| @JsonProperty |
| public IndexSpec getIndexSpec() |
| { |
| return indexSpec; |
| } |
| |
| @JsonProperty |
| @Override |
| public IndexSpec getIndexSpecForIntermediatePersists() |
| { |
| return indexSpecForIntermediatePersists; |
| } |
| |
| /** |
| * Always returns true, doesn't affect the version being built. |
| */ |
| @Deprecated |
| @JsonProperty |
| public boolean getBuildV9Directly() |
| { |
| return true; |
| } |
| |
| @Override |
| @JsonProperty |
| public boolean isReportParseExceptions() |
| { |
| return reportParseExceptions; |
| } |
| |
| @JsonProperty |
| public long getHandoffConditionTimeout() |
| { |
| return handoffConditionTimeout; |
| } |
| |
| @JsonProperty |
| public boolean isResetOffsetAutomatically() |
| { |
| return resetOffsetAutomatically; |
| } |
| |
| @Override |
| @JsonProperty |
| @Nullable |
| public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() |
| { |
| return segmentWriteOutMediumFactory; |
| } |
| |
| @JsonProperty |
| public Period getIntermediateHandoffPeriod() |
| { |
| return intermediateHandoffPeriod; |
| } |
| |
| @JsonProperty |
| public boolean isLogParseExceptions() |
| { |
| return logParseExceptions; |
| } |
| |
| @JsonProperty |
| public int getMaxParseExceptions() |
| { |
| return maxParseExceptions; |
| } |
| |
| @JsonProperty |
| public int getMaxSavedParseExceptions() |
| { |
| return maxSavedParseExceptions; |
| } |
| |
| @JsonProperty |
| public boolean isSkipSequenceNumberAvailabilityCheck() |
| { |
| return skipSequenceNumberAvailabilityCheck; |
| } |
| |
| @Override |
| public abstract SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir); |
| |
| @Override |
| public boolean equals(Object o) |
| { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| SeekableStreamIndexTaskTuningConfig that = (SeekableStreamIndexTaskTuningConfig) o; |
| return Objects.equals(appendableIndexSpec, that.appendableIndexSpec) && |
| maxRowsInMemory == that.maxRowsInMemory && |
| maxBytesInMemory == that.maxBytesInMemory && |
| maxPendingPersists == that.maxPendingPersists && |
| reportParseExceptions == that.reportParseExceptions && |
| handoffConditionTimeout == that.handoffConditionTimeout && |
| resetOffsetAutomatically == that.resetOffsetAutomatically && |
| skipSequenceNumberAvailabilityCheck == that.skipSequenceNumberAvailabilityCheck && |
| logParseExceptions == that.logParseExceptions && |
| maxParseExceptions == that.maxParseExceptions && |
| maxSavedParseExceptions == that.maxSavedParseExceptions && |
| Objects.equals(partitionsSpec, that.partitionsSpec) && |
| Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) && |
| Objects.equals(basePersistDirectory, that.basePersistDirectory) && |
| Objects.equals(indexSpec, that.indexSpec) && |
| Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) && |
| Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) && |
| Objects.equals(intermediateHandoffPeriod, that.intermediateHandoffPeriod); |
| } |
| |
| @Override |
| public int hashCode() |
| { |
| return Objects.hash( |
| appendableIndexSpec, |
| maxRowsInMemory, |
| maxBytesInMemory, |
| partitionsSpec, |
| intermediatePersistPeriod, |
| basePersistDirectory, |
| maxPendingPersists, |
| indexSpec, |
| indexSpecForIntermediatePersists, |
| reportParseExceptions, |
| handoffConditionTimeout, |
| resetOffsetAutomatically, |
| segmentWriteOutMediumFactory, |
| intermediateHandoffPeriod, |
| skipSequenceNumberAvailabilityCheck, |
| logParseExceptions, |
| maxParseExceptions, |
| maxSavedParseExceptions |
| ); |
| } |
| |
| @Override |
| public abstract String toString(); |
| } |