blob: f210ca69760718c189458d0a96769f00b24d3059 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.kinesis.supervisor;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.name.Named;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory;
import org.apache.druid.indexing.kinesis.KinesisIndexingServiceModule;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import javax.annotation.Nullable;
import java.util.Map;
public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec
{
private static final String SUPERVISOR_TYPE = "kinesis";
private final AWSCredentialsConfig awsCredentialsConfig;
@JsonCreator
public KinesisSupervisorSpec(
@JsonProperty("spec") @Nullable KinesisSupervisorIngestionSpec ingestionSchema,
@JsonProperty("dataSchema") @Nullable DataSchema dataSchema,
@JsonProperty("tuningConfig") @Nullable KinesisSupervisorTuningConfig tuningConfig,
@JsonProperty("ioConfig") @Nullable KinesisSupervisorIOConfig ioConfig,
@JsonProperty("context") Map<String, Object> context,
@JsonProperty("suspended") Boolean suspended,
@JacksonInject TaskStorage taskStorage,
@JacksonInject TaskMaster taskMaster,
@JacksonInject IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
@JacksonInject KinesisIndexTaskClientFactory kinesisIndexTaskClientFactory,
@JacksonInject @Json ObjectMapper mapper,
@JacksonInject ServiceEmitter emitter,
@JacksonInject DruidMonitorSchedulerConfig monitorSchedulerConfig,
@JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
@JacksonInject @Named(KinesisIndexingServiceModule.AWS_SCOPE) AWSCredentialsConfig awsCredentialsConfig,
@JacksonInject SupervisorStateManagerConfig supervisorStateManagerConfig
)
{
super(
ingestionSchema != null
? ingestionSchema
: new KinesisSupervisorIngestionSpec(
dataSchema,
ioConfig,
tuningConfig != null
? tuningConfig
: KinesisSupervisorTuningConfig.defaultConfig()
),
context,
suspended,
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
kinesisIndexTaskClientFactory,
mapper,
emitter,
monitorSchedulerConfig,
rowIngestionMetersFactory,
supervisorStateManagerConfig
);
this.awsCredentialsConfig = awsCredentialsConfig;
}
@Override
public Supervisor createSupervisor()
{
return new KinesisSupervisor(
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
(KinesisIndexTaskClientFactory) indexTaskClientFactory,
mapper,
this,
rowIngestionMetersFactory,
awsCredentialsConfig
);
}
@Override
public String getType()
{
return SUPERVISOR_TYPE;
}
@Override
public String getSource()
{
return getIoConfig() != null ? getIoConfig().getStream() : null;
}
@Override
public String toString()
{
return "KinesisSupervisorSpec{" +
"dataSchema=" + getDataSchema() +
", tuningConfig=" + getTuningConfig() +
", ioConfig=" + getIoConfig() +
", suspended=" + isSuspended() +
", context=" + getContext() +
'}';
}
@Override
@Deprecated
@JsonProperty
public KinesisSupervisorTuningConfig getTuningConfig()
{
return (KinesisSupervisorTuningConfig) super.getTuningConfig();
}
@Override
@Deprecated
@JsonProperty
public KinesisSupervisorIOConfig getIoConfig()
{
return (KinesisSupervisorIOConfig) super.getIoConfig();
}
@Override
@JsonProperty
public KinesisSupervisorIngestionSpec getSpec()
{
return (KinesisSupervisorIngestionSpec) super.getSpec();
}
@Override
protected KinesisSupervisorSpec toggleSuspend(boolean suspend)
{
return new KinesisSupervisorSpec(
getSpec(),
getDataSchema(),
getTuningConfig(),
getIoConfig(),
getContext(),
suspend,
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
(KinesisIndexTaskClientFactory) indexTaskClientFactory,
mapper,
emitter,
monitorSchedulerConfig,
rowIngestionMetersFactory,
awsCredentialsConfig,
supervisorStateManagerConfig
);
}
}