blob: 6901fafd79e7b76afdfcf8d685f7709afa10e312 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
import org.apache.druid.segment.realtime.plumber.IntervalStartVersioningPolicy;
import org.apache.druid.segment.realtime.plumber.RejectionPolicyFactory;
import org.apache.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory;
import org.apache.druid.segment.realtime.plumber.VersioningPolicy;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Period;
import javax.annotation.Nullable;
import java.io.File;
/**
 * Tuning configuration implementing {@link AppenderatorConfig}, bound from JSON through the
 * {@link JsonCreator}-annotated constructor.
 *
 * <p>All fields are final. Most constructor arguments may be {@code null}, in which case the constructor
 * substitutes a default. The {@code with...} methods return a copy with a single value replaced.
 */
public class RealtimeTuningConfig implements AppenderatorConfig
{
  private static final Period DEFAULT_INTERMEDIATE_PERSIST_PERIOD = new Period("PT10M");
  private static final Period DEFAULT_WINDOW_PERIOD = new Period("PT10M");
  private static final VersioningPolicy DEFAULT_VERSIONING_POLICY = new IntervalStartVersioningPolicy();
  private static final RejectionPolicyFactory DEFAULT_REJECTION_POLICY_FACTORY = new ServerTimeRejectionPolicyFactory();
  private static final int DEFAULT_MAX_PENDING_PERSISTS = 0;
  private static final ShardSpec DEFAULT_SHARD_SPEC = new NumberedShardSpec(0, 1);
  private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
  private static final Boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = Boolean.FALSE;
  private static final long DEFAULT_HANDOFF_CONDITION_TIMEOUT = 0;
  private static final long DEFAULT_ALERT_TIMEOUT = 0;
  private static final String DEFAULT_DEDUP_COLUMN = null;

  /**
   * Obtains the fallback base persist directory by delegating to
   * {@code FileUtils.createTempDir("druid-realtime-persist")}.
   */
  private static File createNewBasePersistDirectory()
  {
    return FileUtils.createTempDir("druid-realtime-persist");
  }

  /**
   * Builds a config in which every setting other than the base persist directory takes its default value.
   *
   * <p>Might make sense for this to be a builder.
   *
   * @param basePersistDirectory directory to use; when {@code null} a new one is obtained from
   *                             {@link #createNewBasePersistDirectory()}
   * @return a new config populated with defaults
   */
  public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable File basePersistDirectory)
  {
    final File persistDir;
    if (basePersistDirectory != null) {
      persistDir = basePersistDirectory;
    } else {
      persistDir = createNewBasePersistDirectory();
    }

    return new RealtimeTuningConfig(
        DEFAULT_APPENDABLE_INDEX,
        DEFAULT_MAX_ROWS_IN_MEMORY,
        0L,
        DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK,
        DEFAULT_INTERMEDIATE_PERSIST_PERIOD,
        DEFAULT_WINDOW_PERIOD,
        persistDir,
        DEFAULT_VERSIONING_POLICY,
        DEFAULT_REJECTION_POLICY_FACTORY,
        DEFAULT_MAX_PENDING_PERSISTS,
        DEFAULT_SHARD_SPEC,
        DEFAULT_INDEX_SPEC,
        DEFAULT_INDEX_SPEC,
        0,
        0,
        DEFAULT_REPORT_PARSE_EXCEPTIONS,
        DEFAULT_HANDOFF_CONDITION_TIMEOUT,
        DEFAULT_ALERT_TIMEOUT,
        null,
        DEFAULT_DEDUP_COLUMN
    );
  }

  private final AppendableIndexSpec appendableIndexSpec;
  private final int maxRowsInMemory;
  private final long maxBytesInMemory;
  private final boolean skipBytesInMemoryOverheadCheck;
  private final Period intermediatePersistPeriod;
  private final Period windowPeriod;
  private final File basePersistDirectory;
  private final VersioningPolicy versioningPolicy;
  private final RejectionPolicyFactory rejectionPolicyFactory;
  private final int maxPendingPersists;
  private final ShardSpec shardSpec;
  private final IndexSpec indexSpec;
  private final IndexSpec indexSpecForIntermediatePersists;
  private final int persistThreadPriority;
  private final int mergeThreadPriority;
  private final boolean reportParseExceptions;
  private final long handoffConditionTimeout;
  private final long alertTimeout;
  @Nullable
  private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
  @Nullable
  private final String dedupColumn;

  /**
   * JSON creator. A {@code null} argument is replaced by the matching default, with these specifics:
   * <ul>
   *   <li>{@code maxBytesInMemory} is stored as 0 when absent;</li>
   *   <li>{@code basePersistDirectory} falls back to {@link #createNewBasePersistDirectory()};</li>
   *   <li>{@code indexSpecForIntermediatePersists} falls back to the already-resolved {@code indexSpec};</li>
   *   <li>{@code segmentWriteOutMediumFactory} is stored as given, {@code null} included.</li>
   * </ul>
   *
   * @throws IllegalArgumentException if the resolved {@code handoffConditionTimeout} or {@code alertTimeout}
   *                                  is negative
   */
  @JsonCreator
  public RealtimeTuningConfig(
      @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
      @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
      @JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
      @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
      @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
      @JsonProperty("windowPeriod") Period windowPeriod,
      @JsonProperty("basePersistDirectory") File basePersistDirectory,
      @JsonProperty("versioningPolicy") VersioningPolicy versioningPolicy,
      @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory,
      @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
      @JsonProperty("shardSpec") ShardSpec shardSpec,
      @JsonProperty("indexSpec") IndexSpec indexSpec,
      @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
      @JsonProperty("persistThreadPriority") int persistThreadPriority,
      @JsonProperty("mergeThreadPriority") int mergeThreadPriority,
      @JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
      @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
      @JsonProperty("alertTimeout") Long alertTimeout,
      @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
      @JsonProperty("dedupColumn") @Nullable String dedupColumn
  )
  {
    this.appendableIndexSpec = appendableIndexSpec != null ? appendableIndexSpec : DEFAULT_APPENDABLE_INDEX;
    this.maxRowsInMemory = maxRowsInMemory != null ? maxRowsInMemory : DEFAULT_MAX_ROWS_IN_MEMORY;

    // An absent value is recorded as 0 here; it will be lazily initialized to a value
    // @see #getMaxBytesInMemoryOrDefault()
    this.maxBytesInMemory = maxBytesInMemory != null ? maxBytesInMemory : 0;

    if (skipBytesInMemoryOverheadCheck != null) {
      this.skipBytesInMemoryOverheadCheck = skipBytesInMemoryOverheadCheck;
    } else {
      this.skipBytesInMemoryOverheadCheck = DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK;
    }

    this.intermediatePersistPeriod =
        intermediatePersistPeriod != null ? intermediatePersistPeriod : DEFAULT_INTERMEDIATE_PERSIST_PERIOD;
    this.windowPeriod = windowPeriod != null ? windowPeriod : DEFAULT_WINDOW_PERIOD;

    // The fallback directory is only requested when the caller supplied none.
    if (basePersistDirectory != null) {
      this.basePersistDirectory = basePersistDirectory;
    } else {
      this.basePersistDirectory = createNewBasePersistDirectory();
    }

    this.versioningPolicy = versioningPolicy != null ? versioningPolicy : DEFAULT_VERSIONING_POLICY;
    this.rejectionPolicyFactory =
        rejectionPolicyFactory != null ? rejectionPolicyFactory : DEFAULT_REJECTION_POLICY_FACTORY;
    this.maxPendingPersists = maxPendingPersists != null ? maxPendingPersists : DEFAULT_MAX_PENDING_PERSISTS;
    this.shardSpec = shardSpec != null ? shardSpec : DEFAULT_SHARD_SPEC;
    this.indexSpec = indexSpec != null ? indexSpec : DEFAULT_INDEX_SPEC;

    // Intermediate persists reuse the resolved indexSpec unless a dedicated spec was given.
    this.indexSpecForIntermediatePersists =
        indexSpecForIntermediatePersists != null ? indexSpecForIntermediatePersists : this.indexSpec;

    this.persistThreadPriority = persistThreadPriority;
    this.mergeThreadPriority = mergeThreadPriority;
    this.reportParseExceptions =
        reportParseExceptions != null ? reportParseExceptions : DEFAULT_REPORT_PARSE_EXCEPTIONS;

    this.handoffConditionTimeout =
        handoffConditionTimeout != null ? handoffConditionTimeout : DEFAULT_HANDOFF_CONDITION_TIMEOUT;
    Preconditions.checkArgument(this.handoffConditionTimeout >= 0, "handoffConditionTimeout must be >= 0");

    this.alertTimeout = alertTimeout != null ? alertTimeout : DEFAULT_ALERT_TIMEOUT;
    Preconditions.checkArgument(this.alertTimeout >= 0, "alertTimeout must be >= 0");

    this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
    this.dedupColumn = dedupColumn != null ? dedupColumn : DEFAULT_DEDUP_COLUMN;
  }

  /** @return the appendable index spec resolved by the constructor */
  @Override
  @JsonProperty
  public AppendableIndexSpec getAppendableIndexSpec()
  {
    return appendableIndexSpec;
  }

  /** @return the resolved maximum number of rows held in memory */
  @Override
  @JsonProperty
  public int getMaxRowsInMemory()
  {
    return maxRowsInMemory;
  }

  /** @return the stored maximum bytes in memory; 0 when none was supplied */
  @Override
  @JsonProperty
  public long getMaxBytesInMemory()
  {
    return maxBytesInMemory;
  }

  /** @return the resolved {@code skipBytesInMemoryOverheadCheck} flag */
  @Override
  @JsonProperty
  public boolean isSkipBytesInMemoryOverheadCheck()
  {
    return skipBytesInMemoryOverheadCheck;
  }

  /** @return the intermediate persist period (defaults to {@code PT10M}) */
  @Override
  @JsonProperty
  public Period getIntermediatePersistPeriod()
  {
    return intermediatePersistPeriod;
  }

  /** @return the window period (defaults to {@code PT10M}) */
  @JsonProperty
  public Period getWindowPeriod()
  {
    return windowPeriod;
  }

  /** @return the base persist directory resolved by the constructor */
  @Override
  @JsonProperty
  public File getBasePersistDirectory()
  {
    return basePersistDirectory;
  }

  /** @return the versioning policy resolved by the constructor */
  @JsonProperty
  public VersioningPolicy getVersioningPolicy()
  {
    return versioningPolicy;
  }

  /** @return the rejection policy factory; serialized under the JSON name {@code rejectionPolicy} */
  @JsonProperty("rejectionPolicy")
  public RejectionPolicyFactory getRejectionPolicyFactory()
  {
    return rejectionPolicyFactory;
  }

  /** @return the resolved maximum number of pending persists */
  @Override
  @JsonProperty
  public int getMaxPendingPersists()
  {
    return maxPendingPersists;
  }

  /**
   * Not supported by this config.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public PartitionsSpec getPartitionsSpec()
  {
    throw new UnsupportedOperationException();
  }

  /** @return the shard spec resolved by the constructor */
  @JsonProperty
  public ShardSpec getShardSpec()
  {
    return shardSpec;
  }

  /** @return the index spec resolved by the constructor */
  @Override
  @JsonProperty
  public IndexSpec getIndexSpec()
  {
    return indexSpec;
  }

  /** @return the index spec for intermediate persists; equals {@link #getIndexSpec()} when none was supplied */
  @Override
  @JsonProperty
  public IndexSpec getIndexSpecForIntermediatePersists()
  {
    return indexSpecForIntermediatePersists;
  }

  /** @return the persist thread priority exactly as supplied */
  @JsonProperty
  public int getPersistThreadPriority()
  {
    return persistThreadPriority;
  }

  /** @return the merge thread priority exactly as supplied */
  @JsonProperty
  public int getMergeThreadPriority()
  {
    return mergeThreadPriority;
  }

  /** @return the resolved {@code reportParseExceptions} flag */
  @Override
  @JsonProperty
  public boolean isReportParseExceptions()
  {
    return reportParseExceptions;
  }

  /** @return the handoff condition timeout; never negative */
  @JsonProperty
  public long getHandoffConditionTimeout()
  {
    return handoffConditionTimeout;
  }

  /** @return the alert timeout; never negative */
  @JsonProperty
  public long getAlertTimeout()
  {
    return alertTimeout;
  }

  /** @return the segment write-out medium factory, or {@code null} when none was supplied */
  @Override
  @JsonProperty
  @Nullable
  public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory()
  {
    return segmentWriteOutMediumFactory;
  }

  /** @return the dedup column, or {@code null} when none was supplied */
  @JsonProperty
  @Nullable
  public String getDedupColumn()
  {
    return dedupColumn;
  }

  /**
   * Returns a copy of this config that uses the given versioning policy.
   *
   * @param policy replacement policy; a {@code null} value makes the constructor apply its default
   * @return a new config, identical to this one apart from the versioning policy
   */
  public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
  {
    return copyWith(basePersistDirectory, policy);
  }

  /**
   * Returns a copy of this config that uses the given base persist directory.
   *
   * @param dir replacement directory; a {@code null} value makes the constructor obtain a new one
   * @return a new config, identical to this one apart from the base persist directory
   */
  @Override
  public RealtimeTuningConfig withBasePersistDirectory(File dir)
  {
    return copyWith(dir, versioningPolicy);
  }

  /**
   * Shared copy routine behind the {@code with...} methods: re-runs the constructor with this instance's
   * values, substituting the given directory and versioning policy.
   */
  private RealtimeTuningConfig copyWith(File persistDir, VersioningPolicy policy)
  {
    return new RealtimeTuningConfig(
        appendableIndexSpec,
        maxRowsInMemory,
        maxBytesInMemory,
        skipBytesInMemoryOverheadCheck,
        intermediatePersistPeriod,
        windowPeriod,
        persistDir,
        policy,
        rejectionPolicyFactory,
        maxPendingPersists,
        shardSpec,
        indexSpec,
        indexSpecForIntermediatePersists,
        persistThreadPriority,
        mergeThreadPriority,
        reportParseExceptions,
        handoffConditionTimeout,
        alertTimeout,
        segmentWriteOutMediumFactory,
        dedupColumn
    );
  }
}