blob: cf6a7f4b03a658a198420bcf6db46bf2bbd71b62 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.segment.indexing.IOConfig;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
public abstract class SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> implements IOConfig
{
private static final boolean DEFAULT_USE_TRANSACTION = true;
@Nullable
private final Integer taskGroupId;
private final String baseSequenceName;
private final SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> startSequenceNumbers;
private final SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> endSequenceNumbers;
private final boolean useTransaction;
private final Optional<DateTime> minimumMessageTime;
private final Optional<DateTime> maximumMessageTime;
private final InputFormat inputFormat;
public SeekableStreamIndexTaskIOConfig(
@Nullable final Integer taskGroupId, // can be null for backward compabitility
final String baseSequenceName,
final SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> startSequenceNumbers,
final SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> endSequenceNumbers,
final Boolean useTransaction,
final DateTime minimumMessageTime,
final DateTime maximumMessageTime,
@Nullable final InputFormat inputFormat
)
{
this.taskGroupId = taskGroupId;
this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName");
this.startSequenceNumbers = Preconditions.checkNotNull(startSequenceNumbers, "startSequenceNumbers");
this.endSequenceNumbers = Preconditions.checkNotNull(endSequenceNumbers, "endSequenceNumbers");
this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION;
this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);
this.maximumMessageTime = Optional.fromNullable(maximumMessageTime);
this.inputFormat = inputFormat;
Preconditions.checkArgument(
startSequenceNumbers.getStream().equals(endSequenceNumbers.getStream()),
"start topic/stream and end topic/stream must match"
);
Preconditions.checkArgument(
startSequenceNumbers.getPartitionSequenceNumberMap()
.keySet()
.equals(endSequenceNumbers.getPartitionSequenceNumberMap().keySet()),
"start partition set and end partition set must match"
);
}
@Nullable
@JsonProperty
public Integer getTaskGroupId()
{
return taskGroupId;
}
@JsonProperty
public String getBaseSequenceName()
{
return baseSequenceName;
}
@JsonProperty
public SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> getStartSequenceNumbers()
{
return startSequenceNumbers;
}
@JsonProperty
public SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> getEndSequenceNumbers()
{
return endSequenceNumbers;
}
@JsonProperty
public boolean isUseTransaction()
{
return useTransaction;
}
@JsonProperty
public Optional<DateTime> getMaximumMessageTime()
{
return maximumMessageTime;
}
@JsonProperty
public Optional<DateTime> getMinimumMessageTime()
{
return minimumMessageTime;
}
@Nullable
@JsonProperty("inputFormat")
private InputFormat getGivenInputFormat()
{
return inputFormat;
}
@Nullable
public InputFormat getInputFormat()
{
return inputFormat;
}
}