| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.indexing.kinesis.supervisor; |
| |
| import com.fasterxml.jackson.annotation.JacksonInject; |
| import com.fasterxml.jackson.annotation.JsonCreator; |
| import com.fasterxml.jackson.annotation.JsonProperty; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.inject.name.Named; |
| import org.apache.druid.common.aws.AWSCredentialsConfig; |
| import org.apache.druid.guice.annotations.Json; |
| import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory; |
| import org.apache.druid.indexing.kinesis.KinesisIndexingServiceModule; |
| import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; |
| import org.apache.druid.indexing.overlord.TaskMaster; |
| import org.apache.druid.indexing.overlord.TaskStorage; |
| import org.apache.druid.indexing.overlord.supervisor.Supervisor; |
| import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; |
| import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; |
| import org.apache.druid.java.util.emitter.service.ServiceEmitter; |
| import org.apache.druid.segment.incremental.RowIngestionMetersFactory; |
| import org.apache.druid.segment.indexing.DataSchema; |
| import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; |
| |
| import javax.annotation.Nullable; |
| import java.util.Map; |
| |
| public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec |
| { |
| private static final String SUPERVISOR_TYPE = "kinesis"; |
| private final AWSCredentialsConfig awsCredentialsConfig; |
| |
| @JsonCreator |
| public KinesisSupervisorSpec( |
| @JsonProperty("spec") @Nullable KinesisSupervisorIngestionSpec ingestionSchema, |
| @JsonProperty("dataSchema") @Nullable DataSchema dataSchema, |
| @JsonProperty("tuningConfig") @Nullable KinesisSupervisorTuningConfig tuningConfig, |
| @JsonProperty("ioConfig") @Nullable KinesisSupervisorIOConfig ioConfig, |
| @JsonProperty("context") Map<String, Object> context, |
| @JsonProperty("suspended") Boolean suspended, |
| @JacksonInject TaskStorage taskStorage, |
| @JacksonInject TaskMaster taskMaster, |
| @JacksonInject IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, |
| @JacksonInject KinesisIndexTaskClientFactory kinesisIndexTaskClientFactory, |
| @JacksonInject @Json ObjectMapper mapper, |
| @JacksonInject ServiceEmitter emitter, |
| @JacksonInject DruidMonitorSchedulerConfig monitorSchedulerConfig, |
| @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory, |
| @JacksonInject @Named(KinesisIndexingServiceModule.AWS_SCOPE) AWSCredentialsConfig awsCredentialsConfig, |
| @JacksonInject SupervisorStateManagerConfig supervisorStateManagerConfig |
| ) |
| { |
| super( |
| ingestionSchema != null |
| ? ingestionSchema |
| : new KinesisSupervisorIngestionSpec( |
| dataSchema, |
| ioConfig, |
| tuningConfig != null |
| ? tuningConfig |
| : KinesisSupervisorTuningConfig.defaultConfig() |
| ), |
| context, |
| suspended, |
| taskStorage, |
| taskMaster, |
| indexerMetadataStorageCoordinator, |
| kinesisIndexTaskClientFactory, |
| mapper, |
| emitter, |
| monitorSchedulerConfig, |
| rowIngestionMetersFactory, |
| supervisorStateManagerConfig |
| ); |
| this.awsCredentialsConfig = awsCredentialsConfig; |
| } |
| |
| |
| @Override |
| public Supervisor createSupervisor() |
| { |
| return new KinesisSupervisor( |
| taskStorage, |
| taskMaster, |
| indexerMetadataStorageCoordinator, |
| (KinesisIndexTaskClientFactory) indexTaskClientFactory, |
| mapper, |
| this, |
| rowIngestionMetersFactory, |
| awsCredentialsConfig |
| ); |
| } |
| |
| @Override |
| public String getType() |
| { |
| return SUPERVISOR_TYPE; |
| } |
| |
| @Override |
| public String getSource() |
| { |
| return getIoConfig() != null ? getIoConfig().getStream() : null; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "KinesisSupervisorSpec{" + |
| "dataSchema=" + getDataSchema() + |
| ", tuningConfig=" + getTuningConfig() + |
| ", ioConfig=" + getIoConfig() + |
| ", suspended=" + isSuspended() + |
| ", context=" + getContext() + |
| '}'; |
| } |
| |
| @Override |
| @Deprecated |
| @JsonProperty |
| public KinesisSupervisorTuningConfig getTuningConfig() |
| { |
| return (KinesisSupervisorTuningConfig) super.getTuningConfig(); |
| } |
| |
| @Override |
| @Deprecated |
| @JsonProperty |
| public KinesisSupervisorIOConfig getIoConfig() |
| { |
| return (KinesisSupervisorIOConfig) super.getIoConfig(); |
| } |
| |
| @Override |
| @JsonProperty |
| public KinesisSupervisorIngestionSpec getSpec() |
| { |
| return (KinesisSupervisorIngestionSpec) super.getSpec(); |
| } |
| |
| @Override |
| protected KinesisSupervisorSpec toggleSuspend(boolean suspend) |
| { |
| return new KinesisSupervisorSpec( |
| getSpec(), |
| getDataSchema(), |
| getTuningConfig(), |
| getIoConfig(), |
| getContext(), |
| suspend, |
| taskStorage, |
| taskMaster, |
| indexerMetadataStorageCoordinator, |
| (KinesisIndexTaskClientFactory) indexTaskClientFactory, |
| mapper, |
| emitter, |
| monitorSchedulerConfig, |
| rowIngestionMetersFactory, |
| awsCredentialsConfig, |
| supervisorStateManagerConfig |
| ); |
| } |
| } |