blob: 50be4649bccf1ef4b5db5ae46b1a8cad1c416981 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.seekablestream.supervisor;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
{
private static SeekableStreamSupervisorIngestionSpec checkIngestionSchema(
SeekableStreamSupervisorIngestionSpec ingestionSchema
)
{
Preconditions.checkNotNull(ingestionSchema, "ingestionSchema");
Preconditions.checkNotNull(ingestionSchema.getDataSchema(), "dataSchema");
Preconditions.checkNotNull(ingestionSchema.getIOConfig(), "ioConfig");
return ingestionSchema;
}
protected final TaskStorage taskStorage;
protected final TaskMaster taskMaster;
protected final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
protected final SeekableStreamIndexTaskClientFactory indexTaskClientFactory;
protected final ObjectMapper mapper;
protected final RowIngestionMetersFactory rowIngestionMetersFactory;
private final SeekableStreamSupervisorIngestionSpec ingestionSchema;
@Nullable
private final Map<String, Object> context;
protected final ServiceEmitter emitter;
protected final DruidMonitorSchedulerConfig monitorSchedulerConfig;
private final boolean suspended;
protected final SupervisorStateManagerConfig supervisorStateManagerConfig;
@JsonCreator
public SeekableStreamSupervisorSpec(
@JsonProperty("spec") final SeekableStreamSupervisorIngestionSpec ingestionSchema,
@JsonProperty("context") @Nullable Map<String, Object> context,
@JsonProperty("suspended") Boolean suspended,
@JacksonInject TaskStorage taskStorage,
@JacksonInject TaskMaster taskMaster,
@JacksonInject IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
@JacksonInject SeekableStreamIndexTaskClientFactory indexTaskClientFactory,
@JacksonInject @Json ObjectMapper mapper,
@JacksonInject ServiceEmitter emitter,
@JacksonInject DruidMonitorSchedulerConfig monitorSchedulerConfig,
@JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
@JacksonInject SupervisorStateManagerConfig supervisorStateManagerConfig
)
{
this.ingestionSchema = checkIngestionSchema(ingestionSchema);
this.context = context;
this.taskStorage = taskStorage;
this.taskMaster = taskMaster;
this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
this.indexTaskClientFactory = indexTaskClientFactory;
this.mapper = mapper;
this.emitter = emitter;
this.monitorSchedulerConfig = monitorSchedulerConfig;
this.rowIngestionMetersFactory = rowIngestionMetersFactory;
this.suspended = suspended != null ? suspended : false;
this.supervisorStateManagerConfig = supervisorStateManagerConfig;
}
@JsonProperty
public SeekableStreamSupervisorIngestionSpec getSpec()
{
return ingestionSchema;
}
@Deprecated
@JsonProperty
public DataSchema getDataSchema()
{
return ingestionSchema.getDataSchema();
}
@JsonProperty
public SeekableStreamSupervisorTuningConfig getTuningConfig()
{
return ingestionSchema.getTuningConfig();
}
@JsonProperty
public SeekableStreamSupervisorIOConfig getIoConfig()
{
return ingestionSchema.getIOConfig();
}
@Nullable
@JsonProperty
public Map<String, Object> getContext()
{
return context;
}
public ServiceEmitter getEmitter()
{
return emitter;
}
@Override
public String getId()
{
return ingestionSchema.getDataSchema().getDataSource();
}
public DruidMonitorSchedulerConfig getMonitorSchedulerConfig()
{
return monitorSchedulerConfig;
}
@Override
public abstract Supervisor createSupervisor();
@Override
public List<String> getDataSources()
{
return ImmutableList.of(getDataSchema().getDataSource());
}
@Override
public SeekableStreamSupervisorSpec createSuspendedSpec()
{
return toggleSuspend(true);
}
@Override
public SeekableStreamSupervisorSpec createRunningSpec()
{
return toggleSuspend(false);
}
public SupervisorStateManagerConfig getSupervisorStateManagerConfig()
{
return supervisorStateManagerConfig;
}
@Override
@JsonProperty("suspended")
public boolean isSuspended()
{
return suspended;
}
protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean suspend);
}