blob: 787ea26cbc6302e9f26fc5a58723fb952de73f0f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.realtime.plumber;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Duration;
/**
 * This plumber just drops segments at the end of a flush duration instead of handing them off. It is only useful if
 * you want to run a real-time node without the rest of the Druid cluster.
 *
 * <p>The flush duration is taken from the {@code flushDuration} JSON property and falls back to one hour when that
 * property is absent.
 */
public class FlushingPlumberSchool extends RealtimePlumberSchool
{
  /**
   * Flush duration used when the {@code flushDuration} property is not supplied.
   *
   * <p>Built with {@link Duration#standardHours(long)} rather than {@code new Duration("PT1H")}: Joda-Time's string
   * conversion for {@link Duration} only accepts the seconds-based {@code PTa.bS} form, so {@code "PT1H"} is rejected
   * with an {@link IllegalArgumentException}, which would make static initialization of this class fail.
   */
  private static final Duration DEFAULT_FLUSH_DURATION = Duration.standardHours(1);

  private final Duration flushDuration;

  // Collaborators are kept locally (in addition to being passed to the superclass) so that findPlumber() can hand
  // them to each FlushingPlumber it creates.
  private final ServiceEmitter emitter;
  private final QueryRunnerFactoryConglomerate conglomerate;
  private final DataSegmentAnnouncer segmentAnnouncer;
  private final QueryProcessingPool queryProcessingPool;
  private final JoinableFactory joinableFactory;
  private final IndexMergerV9 indexMergerV9;
  private final IndexIO indexIO;
  private final Cache cache;
  private final CacheConfig cacheConfig;
  private final CachePopulatorStats cachePopulatorStats;
  private final ObjectMapper objectMapper;

  /**
   * Creates the school from JSON configuration plus Jackson-injected collaborators.
   *
   * @param flushDuration       value of the {@code flushDuration} JSON property; when {@code null} the one-hour
   *                            default is used
   * @param emitter             injected; checked for non-null in {@link #findPlumber}
   * @param conglomerate        injected; checked for non-null in {@link #findPlumber}
   * @param segmentAnnouncer    injected; checked for non-null in {@link #findPlumber}
   * @param queryProcessingPool injected; passed through to created plumbers
   * @param joinableFactory     injected; passed through to created plumbers
   * @param indexMergerV9       injected; must not be {@code null}
   * @param indexIO             injected; must not be {@code null}
   * @param cache               injected; passed through to created plumbers
   * @param cacheConfig         injected; passed through to created plumbers
   * @param cachePopulatorStats injected; passed through to created plumbers
   * @param objectMapper        injected; passed through to created plumbers
   *
   * @throws NullPointerException if {@code indexMergerV9} or {@code indexIO} is {@code null}
   */
  @JsonCreator
  public FlushingPlumberSchool(
      @JsonProperty("flushDuration") Duration flushDuration,
      @JacksonInject ServiceEmitter emitter,
      @JacksonInject QueryRunnerFactoryConglomerate conglomerate,
      @JacksonInject DataSegmentAnnouncer segmentAnnouncer,
      @JacksonInject QueryProcessingPool queryProcessingPool,
      @JacksonInject JoinableFactory joinableFactory,
      @JacksonInject IndexMergerV9 indexMergerV9,
      @JacksonInject IndexIO indexIO,
      @JacksonInject Cache cache,
      @JacksonInject CacheConfig cacheConfig,
      @JacksonInject CachePopulatorStats cachePopulatorStats,
      @JacksonInject ObjectMapper objectMapper
  )
  {
    // NOTE(review): the three null arguments are presumably the hand-off related collaborators of
    // RealtimePlumberSchool that a flushing plumber never uses -- confirm against the superclass constructor.
    super(
        emitter,
        conglomerate,
        null,
        segmentAnnouncer,
        null,
        null,
        queryProcessingPool,
        joinableFactory,
        indexMergerV9,
        indexIO,
        cache,
        cacheConfig,
        cachePopulatorStats,
        objectMapper
    );

    this.flushDuration = flushDuration == null ? DEFAULT_FLUSH_DURATION : flushDuration;
    this.emitter = emitter;
    this.conglomerate = conglomerate;
    this.segmentAnnouncer = segmentAnnouncer;
    this.queryProcessingPool = queryProcessingPool;
    this.joinableFactory = joinableFactory;
    this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9");
    this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
    this.cache = cache;
    this.cacheConfig = cacheConfig;
    this.cachePopulatorStats = cachePopulatorStats;
    this.objectMapper = objectMapper;
  }

  /**
   * Builds a new {@link FlushingPlumber} for the given schema, configured with this school's flush duration and
   * collaborators.
   *
   * @param schema  data schema handed to the plumber
   * @param config  tuning configuration handed to the plumber
   * @param metrics metrics object handed to the plumber
   *
   * @return a newly constructed {@link FlushingPlumber}
   *
   * @throws NullPointerException if the conglomerate, segment announcer or emitter was not provided
   */
  @Override
  public Plumber findPlumber(
      final DataSchema schema,
      final RealtimeTuningConfig config,
      final FireDepartmentMetrics metrics
  )
  {
    verifyState();

    return new FlushingPlumber(
        flushDuration,
        schema,
        config,
        metrics,
        emitter,
        conglomerate,
        segmentAnnouncer,
        queryProcessingPool,
        joinableFactory,
        indexMergerV9,
        indexIO,
        cache,
        cacheConfig,
        cachePopulatorStats,
        objectMapper
    );
  }

  /**
   * Fails fast with a descriptive message when a collaborator required for building a plumber is missing.
   */
  private void verifyState()
  {
    Preconditions.checkNotNull(conglomerate, "must specify a queryRunnerFactoryConglomerate to do this action.");
    Preconditions.checkNotNull(segmentAnnouncer, "must specify a segmentAnnouncer to do this action.");
    Preconditions.checkNotNull(emitter, "must specify a serviceEmitter to do this action.");
  }
}