| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.segment.realtime.plumber; |
| |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import org.apache.druid.client.cache.Cache; |
| import org.apache.druid.client.cache.CacheConfig; |
| import org.apache.druid.client.cache.CachePopulatorStats; |
| import org.apache.druid.common.guava.ThreadRenamingCallable; |
| import org.apache.druid.java.util.common.DateTimes; |
| import org.apache.druid.java.util.common.StringUtils; |
| import org.apache.druid.java.util.common.concurrent.Execs; |
| import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; |
| import org.apache.druid.java.util.common.granularity.Granularity; |
| import org.apache.druid.java.util.emitter.EmittingLogger; |
| import org.apache.druid.java.util.emitter.service.ServiceEmitter; |
| import org.apache.druid.query.QueryProcessingPool; |
| import org.apache.druid.query.QueryRunnerFactoryConglomerate; |
| import org.apache.druid.segment.IndexIO; |
| import org.apache.druid.segment.IndexMerger; |
| import org.apache.druid.segment.indexing.DataSchema; |
| import org.apache.druid.segment.indexing.RealtimeTuningConfig; |
| import org.apache.druid.segment.join.JoinableFactory; |
| import org.apache.druid.segment.realtime.FireDepartmentMetrics; |
| import org.apache.druid.server.coordination.DataSegmentAnnouncer; |
| import org.joda.time.DateTime; |
| import org.joda.time.Duration; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ScheduledExecutorService; |
| |
| /** |
| */ |
| public class FlushingPlumber extends RealtimePlumber |
| { |
| private static final EmittingLogger log = new EmittingLogger(FlushingPlumber.class); |
| |
| private final DataSchema schema; |
| private final RealtimeTuningConfig config; |
| private final Duration flushDuration; |
| |
| private volatile ScheduledExecutorService flushScheduledExec = null; |
| private volatile boolean stopped = false; |
| |
| public FlushingPlumber( |
| Duration flushDuration, |
| DataSchema schema, |
| RealtimeTuningConfig config, |
| FireDepartmentMetrics metrics, |
| ServiceEmitter emitter, |
| QueryRunnerFactoryConglomerate conglomerate, |
| DataSegmentAnnouncer segmentAnnouncer, |
| QueryProcessingPool queryProcessingPool, |
| JoinableFactory joinableFactory, |
| IndexMerger indexMerger, |
| IndexIO indexIO, |
| Cache cache, |
| CacheConfig cacheConfig, |
| CachePopulatorStats cachePopulatorStats, |
| ObjectMapper objectMapper |
| |
| ) |
| { |
| super( |
| schema, |
| config, |
| metrics, |
| emitter, |
| conglomerate, |
| segmentAnnouncer, |
| queryProcessingPool, |
| joinableFactory, |
| null, |
| null, |
| null, |
| indexMerger, |
| indexIO, |
| cache, |
| cacheConfig, |
| cachePopulatorStats, |
| objectMapper |
| ); |
| |
| this.flushDuration = flushDuration; |
| this.schema = schema; |
| this.config = config; |
| } |
| |
| @Override |
| public Object startJob() |
| { |
| log.info("Starting job for %s", getSchema().getDataSource()); |
| |
| computeBaseDir(getSchema()).mkdirs(); |
| initializeExecutors(); |
| |
| if (flushScheduledExec == null) { |
| flushScheduledExec = Execs.scheduledSingleThreaded("flushing_scheduled_%d"); |
| } |
| |
| Object retVal = bootstrapSinksFromDisk(); |
| startFlushThread(); |
| return retVal; |
| } |
| |
| protected void flushAfterDuration(final long truncatedTime, final Sink sink) |
| { |
| log.info( |
| "Abandoning segment %s at %s", |
| sink.getSegment().getId(), |
| DateTimes.nowUtc().plusMillis((int) flushDuration.getMillis()) |
| ); |
| |
| ScheduledExecutors.scheduleWithFixedDelay( |
| flushScheduledExec, |
| flushDuration, |
| new Callable<ScheduledExecutors.Signal>() |
| { |
| @Override |
| public ScheduledExecutors.Signal call() |
| { |
| log.info("Abandoning segment %s", sink.getSegment().getId()); |
| abandonSegment(truncatedTime, sink); |
| return ScheduledExecutors.Signal.STOP; |
| } |
| } |
| ); |
| } |
| |
| private void startFlushThread() |
| { |
| final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity(); |
| final DateTime truncatedNow = segmentGranularity.bucketStart(DateTimes.nowUtc()); |
| final long windowMillis = config.getWindowPeriod().toStandardDuration().getMillis(); |
| |
| log.info( |
| "Expect to run at [%s]", |
| DateTimes.nowUtc().plus( |
| new Duration( |
| System.currentTimeMillis(), |
| schema.getGranularitySpec().getSegmentGranularity().increment(truncatedNow).getMillis() + windowMillis |
| ) |
| ) |
| ); |
| |
| String threadName = StringUtils.format( |
| "%s-flusher-%d", |
| getSchema().getDataSource(), |
| getConfig().getShardSpec().getPartitionNum() |
| ); |
| ThreadRenamingCallable<ScheduledExecutors.Signal> threadRenamingCallable = |
| new ThreadRenamingCallable<ScheduledExecutors.Signal>(threadName) |
| { |
| @Override |
| public ScheduledExecutors.Signal doCall() |
| { |
| if (stopped) { |
| log.info("Stopping flusher thread"); |
| return ScheduledExecutors.Signal.STOP; |
| } |
| |
| long minTimestamp = segmentGranularity.bucketStart( |
| getRejectionPolicy().getCurrMaxTime().minus(windowMillis) |
| ).getMillis(); |
| |
| List<Map.Entry<Long, Sink>> sinksToPush = new ArrayList<>(); |
| for (Map.Entry<Long, Sink> entry : getSinks().entrySet()) { |
| final Long intervalStart = entry.getKey(); |
| if (intervalStart < minTimestamp) { |
| log.info("Adding entry[%s] to flush.", entry); |
| sinksToPush.add(entry); |
| } |
| } |
| |
| for (final Map.Entry<Long, Sink> entry : sinksToPush) { |
| flushAfterDuration(entry.getKey(), entry.getValue()); |
| } |
| |
| if (stopped) { |
| log.info("Stopping flusher thread"); |
| return ScheduledExecutors.Signal.STOP; |
| } else { |
| return ScheduledExecutors.Signal.REPEAT; |
| } |
| } |
| }; |
| Duration initialDelay = new Duration( |
| System.currentTimeMillis(), |
| schema.getGranularitySpec().getSegmentGranularity().increment(truncatedNow).getMillis() + windowMillis |
| ); |
| Duration rate = new Duration(truncatedNow, segmentGranularity.increment(truncatedNow)); |
| ScheduledExecutors.scheduleAtFixedRate(flushScheduledExec, initialDelay, rate, threadRenamingCallable); |
| } |
| |
| @Override |
| public void finishJob() |
| { |
| log.info("Stopping job"); |
| |
| for (final Map.Entry<Long, Sink> entry : getSinks().entrySet()) { |
| abandonSegment(entry.getKey(), entry.getValue()); |
| } |
| shutdownExecutors(); |
| |
| if (flushScheduledExec != null) { |
| flushScheduledExec.shutdown(); |
| } |
| |
| stopped = true; |
| } |
| } |