| /* |
| * Licensed to Metamarkets Group Inc. (Metamarkets) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. Metamarkets licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package io.druid.indexing.kafka; |
| |
| import com.fasterxml.jackson.annotation.JacksonInject; |
| import com.fasterxml.jackson.annotation.JsonCreator; |
| import com.fasterxml.jackson.annotation.JsonProperty; |
| import com.fasterxml.jackson.core.JsonProcessingException; |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Function; |
| import com.google.common.base.Joiner; |
| import com.google.common.base.Optional; |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Supplier; |
| import com.google.common.base.Throwables; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| import com.google.common.primitives.Ints; |
| import com.google.common.primitives.Longs; |
| import com.google.common.util.concurrent.FutureCallback; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.ListeningExecutorService; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import io.druid.data.input.Committer; |
| import io.druid.data.input.InputRow; |
| import io.druid.data.input.impl.InputRowParser; |
| import io.druid.discovery.DiscoveryDruidNode; |
| import io.druid.discovery.DruidNodeDiscoveryProvider; |
| import io.druid.discovery.LookupNodeService; |
| import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; |
| import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; |
| import io.druid.indexing.common.TaskStatus; |
| import io.druid.indexing.common.TaskToolbox; |
| import io.druid.indexing.common.actions.CheckPointDataSourceMetadataAction; |
| import io.druid.indexing.common.actions.ResetDataSourceMetadataAction; |
| import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; |
| import io.druid.indexing.common.actions.TaskActionClient; |
| import io.druid.indexing.common.task.AbstractTask; |
| import io.druid.indexing.common.task.RealtimeIndexTask; |
| import io.druid.indexing.common.task.TaskResource; |
| import io.druid.indexing.kafka.supervisor.KafkaSupervisor; |
| import io.druid.java.util.common.DateTimes; |
| import io.druid.java.util.common.ISE; |
| import io.druid.java.util.common.Intervals; |
| import io.druid.java.util.common.StringUtils; |
| import io.druid.java.util.common.collect.Utils; |
| import io.druid.java.util.common.concurrent.Execs; |
| import io.druid.java.util.common.guava.Sequence; |
import io.druid.java.util.common.parsers.ParseException;
import io.druid.java.util.emitter.EmittingLogger;
| import io.druid.query.DruidMetrics; |
| import io.druid.query.NoopQueryRunner; |
| import io.druid.query.Query; |
| import io.druid.query.QueryPlus; |
| import io.druid.query.QueryRunner; |
| import io.druid.segment.indexing.DataSchema; |
| import io.druid.segment.indexing.RealtimeIOConfig; |
| import io.druid.segment.realtime.FireDepartment; |
| import io.druid.segment.realtime.FireDepartmentMetrics; |
| import io.druid.segment.realtime.RealtimeMetricsMonitor; |
| import io.druid.segment.realtime.appenderator.Appenderator; |
| import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; |
| import io.druid.segment.realtime.appenderator.Appenderators; |
| import io.druid.segment.realtime.appenderator.StreamAppenderatorDriver; |
| import io.druid.segment.realtime.appenderator.SegmentIdentifier; |
| import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; |
| import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; |
| import io.druid.segment.realtime.firehose.ChatHandler; |
| import io.druid.segment.realtime.firehose.ChatHandlerProvider; |
| import io.druid.server.security.Access; |
| import io.druid.server.security.Action; |
| import io.druid.server.security.AuthorizationUtils; |
| import io.druid.server.security.AuthorizerMapper; |
| import io.druid.server.security.ForbiddenException; |
| import io.druid.server.security.Resource; |
| import io.druid.server.security.ResourceAction; |
| import io.druid.server.security.ResourceType; |
| import io.druid.timeline.DataSegment; |
| import org.apache.kafka.clients.consumer.ConsumerRecord; |
| import org.apache.kafka.clients.consumer.ConsumerRecords; |
| import org.apache.kafka.clients.consumer.KafkaConsumer; |
| import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; |
| import org.apache.kafka.common.TopicPartition; |
| import org.apache.kafka.common.serialization.ByteArrayDeserializer; |
| import org.joda.time.DateTime; |
| |
| import javax.annotation.Nullable; |
| import javax.servlet.http.HttpServletRequest; |
| import javax.ws.rs.Consumes; |
| import javax.ws.rs.DefaultValue; |
| import javax.ws.rs.GET; |
| import javax.ws.rs.POST; |
| import javax.ws.rs.Path; |
| import javax.ws.rs.Produces; |
| import javax.ws.rs.QueryParam; |
| import javax.ws.rs.core.Context; |
| import javax.ws.rs.core.MediaType; |
| import javax.ws.rs.core.Response; |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.stream.Collectors; |
| |
| public class KafkaIndexTask extends AbstractTask implements ChatHandler |
| { |
| public static final long PAUSE_FOREVER = -1L; |
| |
| public enum Status |
| { |
| NOT_STARTED, |
| STARTING, |
| READING, |
| PAUSED, |
| PUBLISHING |
    // ideally this status would be called FINISHING, since the task now publishes incrementally
    // throughout its lifetime
| } |
| |
| private static final EmittingLogger log = new EmittingLogger(KafkaIndexTask.class); |
| private static final String TYPE = "index_kafka"; |
| private static final Random RANDOM = new Random(); |
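  // how long each KafkaConsumer.poll() call blocks waiting for records, in milliseconds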
| private static final long POLL_TIMEOUT = 100; |
| private static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15; |
| private static final String METADATA_NEXT_PARTITIONS = "nextPartitions"; |
| private static final String METADATA_PUBLISH_PARTITIONS = "publishPartitions"; |
| |
| private final DataSchema dataSchema; |
| private final InputRowParser<ByteBuffer> parser; |
| private final KafkaTuningConfig tuningConfig; |
| private final KafkaIOConfig ioConfig; |
| private final AuthorizerMapper authorizerMapper; |
| private final Optional<ChatHandlerProvider> chatHandlerProvider; |
| |
| private final Map<Integer, Long> endOffsets = new ConcurrentHashMap<>(); |
| private final Map<Integer, Long> nextOffsets = new ConcurrentHashMap<>(); |
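  // per-partition upper bound for open-ended sequences; filled with Long.MAX_VALUE in the constructor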
| private final Map<Integer, Long> maxEndOffsets = new HashMap<>(); |
| private final Map<Integer, Long> lastPersistedOffsets = new ConcurrentHashMap<>(); |
| |
| private TaskToolbox toolbox; |
| |
| private volatile Appenderator appenderator = null; |
| private volatile StreamAppenderatorDriver driver = null; |
| private volatile FireDepartmentMetrics fireDepartmentMetrics = null; |
| private volatile DateTime startTime; |
| private volatile Status status = Status.NOT_STARTED; // this is only ever set by the task runner thread (runThread) |
| private volatile Thread runThread = null; |
| private volatile File sequencesPersistFile = null; |
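  // [stopRequested] is set by stopGracefully() (or when all partitions are fully read) and is polled by the main
  // loop; [publishOnStop] controls whether a stop should still publish the data read so far.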
| private final AtomicBoolean stopRequested = new AtomicBoolean(false); |
| private final AtomicBoolean publishOnStop = new AtomicBoolean(false); |
| |
| // The pause lock and associated conditions are to support coordination between the Jetty threads and the main |
| // ingestion loop. The goal is to provide callers of the API a guarantee that if pause() returns successfully |
| // the ingestion loop has been stopped at the returned offsets and will not ingest any more data until resumed. The |
| // fields are used as follows (every step requires acquiring [pauseLock]): |
| // Pausing: |
| // - In pause(), [pauseRequested] is set to true and then execution waits for [status] to change to PAUSED, with the |
| // condition checked when [hasPaused] is signalled. |
| // - In possiblyPause() called from the main loop, if [pauseRequested] is true, [status] is set to PAUSED, |
| // [hasPaused] is signalled, and execution pauses until [pauseRequested] becomes false, either by being set or by |
| // the [pauseMillis] timeout elapsing. [pauseRequested] is checked when [shouldResume] is signalled. |
| // Resuming: |
| // - In resume(), [pauseRequested] is set to false, [shouldResume] is signalled, and execution waits for [status] to |
| // change to something other than PAUSED, with the condition checked when [shouldResume] is signalled. |
| // - In possiblyPause(), when [shouldResume] is signalled, if [pauseRequested] has become false the pause loop ends, |
| // [status] is changed to STARTING and [shouldResume] is signalled. |
| |
| private final Lock pauseLock = new ReentrantLock(); |
| private final Condition hasPaused = pauseLock.newCondition(); |
| private final Condition shouldResume = pauseLock.newCondition(); |
| |
  // [pollRetryLock] and [isAwaitingRetry] are used when the Kafka consumer throws an OffsetOutOfRangeException and we
  // pause polling from Kafka for pollRetryMs before trying again. This allows us to signal the sleeping thread and
  // resume the main run loop in the case of a pause or stop request from a Jetty thread.
| private final Lock pollRetryLock = new ReentrantLock(); |
| private final Condition isAwaitingRetry = pollRetryLock.newCondition(); |
| |
| // [statusLock] is used to synchronize the Jetty thread calling stopGracefully() with the main run thread. It prevents |
| // the main run thread from switching into a publishing state while the stopGracefully() thread thinks it's still in |
| // a pre-publishing state. This is important because stopGracefully() will try to use the [stopRequested] flag to stop |
  // the main thread where possible, but this flag is not honored once publishing has begun, so in that case we must
| // interrupt the thread. The lock ensures that if the run thread is about to transition into publishing state, it |
| // blocks until after stopGracefully() has set [stopRequested] and then does a final check on [stopRequested] before |
| // transitioning to publishing state. |
| private final Object statusLock = new Object(); |
| |
| private volatile boolean pauseRequested = false; |
| private volatile long pauseMillis = 0; |
| |
| // This value can be tuned in some tests |
| private long pollRetryMs = 30000; |
| |
| private final Set<String> publishingSequences = Sets.newConcurrentHashSet(); |
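  // Fully-read sequences are handed to the publish executor through this queue; a sentinel entry signals that no
  // more sequences will arrive (see createAndStartPublishExecutor()).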
| private final BlockingQueue<SequenceMetadata> publishQueue = new LinkedBlockingQueue<>(); |
  private final List<ListenableFuture<SegmentsAndMetadata>> handOffWaitList = new CopyOnWriteArrayList<>(); // CopyOnWriteArrayList to avoid cross-thread visibility issues
| private final CountDownLatch waitForPublishes = new CountDownLatch(1); |
| private final AtomicReference<Throwable> throwableAtomicReference = new AtomicReference<>(); |
| private final String topic; |
| |
| private volatile CopyOnWriteArrayList<SequenceMetadata> sequences; |
| private ListeningExecutorService publishExecService; |
| private final boolean useLegacy; |
| |
| @JsonCreator |
| public KafkaIndexTask( |
| @JsonProperty("id") String id, |
| @JsonProperty("resource") TaskResource taskResource, |
| @JsonProperty("dataSchema") DataSchema dataSchema, |
| @JsonProperty("tuningConfig") KafkaTuningConfig tuningConfig, |
| @JsonProperty("ioConfig") KafkaIOConfig ioConfig, |
| @JsonProperty("context") Map<String, Object> context, |
| @JacksonInject ChatHandlerProvider chatHandlerProvider, |
| @JacksonInject AuthorizerMapper authorizerMapper |
| ) |
| { |
| super( |
| id == null ? makeTaskId(dataSchema.getDataSource(), RANDOM.nextInt()) : id, |
| StringUtils.format("%s_%s", TYPE, dataSchema.getDataSource()), |
| taskResource, |
| dataSchema.getDataSource(), |
| context |
| ); |
| |
| this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema"); |
| this.parser = Preconditions.checkNotNull((InputRowParser<ByteBuffer>) dataSchema.getParser(), "parser"); |
| this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig"); |
| this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); |
| this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider); |
| this.authorizerMapper = authorizerMapper; |
| this.endOffsets.putAll(ioConfig.getEndPartitions().getPartitionOffsetMap()); |
| this.maxEndOffsets.putAll(endOffsets.entrySet() |
| .stream() |
| .collect(Collectors.toMap( |
| Map.Entry::getKey, |
| integerLongEntry -> Long.MAX_VALUE |
| ))); |
| this.topic = ioConfig.getStartPartitions().getTopic(); |
| this.sequences = new CopyOnWriteArrayList<>(); |
| |
| if (context != null && context.get(KafkaSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED) != null |
| && ((boolean) context.get(KafkaSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED))) { |
| useLegacy = false; |
| } else { |
| useLegacy = true; |
| } |
| } |
| |
| @VisibleForTesting |
| void setPollRetryMs(long retryMs) |
| { |
| this.pollRetryMs = retryMs; |
| } |
| |
| private static String makeTaskId(String dataSource, int randomBits) |
| { |
| final StringBuilder suffix = new StringBuilder(8); |
| for (int i = 0; i < Ints.BYTES * 2; ++i) { |
| suffix.append((char) ('a' + ((randomBits >>> (i * 4)) & 0x0F))); |
| } |
| return Joiner.on("_").join(TYPE, dataSource, suffix); |
| } |
| |
| @Override |
| public String getType() |
| { |
| return TYPE; |
| } |
| |
| @Override |
| public boolean isReady(TaskActionClient taskActionClient) throws Exception |
| { |
| return true; |
| } |
| |
| @JsonProperty |
| public DataSchema getDataSchema() |
| { |
| return dataSchema; |
| } |
| |
| @JsonProperty |
| public KafkaTuningConfig getTuningConfig() |
| { |
| return tuningConfig; |
| } |
| |
| @JsonProperty("ioConfig") |
| public KafkaIOConfig getIOConfig() |
| { |
| return ioConfig; |
| } |
| |
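  /**
   * Starts a single-threaded executor that drains [publishQueue]: each sequence taken from the queue is published,
   * removed from the in-memory and persisted sequence state, and registered for handoff. A sentinel entry, an
   * interrupt, or any publishing error ends the loop and releases [waitForPublishes].
   */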
| private void createAndStartPublishExecutor() |
| { |
| publishExecService = MoreExecutors.listeningDecorator(Execs.singleThreaded("publish-driver")); |
| publishExecService.submit( |
| (Runnable) () -> { |
| while (true) { |
| try { |
| final SequenceMetadata sequenceMetadata = publishQueue.take(); |
| |
| Preconditions.checkNotNull(driver); |
| |
| if (sequenceMetadata.isSentinel()) { |
| waitForPublishes.countDown(); |
| break; |
| } |
| |
| log.info("Publishing segments for sequence [%s]", sequenceMetadata); |
| |
| final SegmentsAndMetadata result = driver.publish( |
| sequenceMetadata.getPublisher(toolbox, ioConfig.isUseTransaction()), |
| sequenceMetadata.getCommitterSupplier(topic, lastPersistedOffsets).get(), |
| ImmutableList.of(sequenceMetadata.getSequenceName()) |
| ).get(); |
| |
| if (result == null) { |
| throw new ISE( |
| "Transaction failure publishing segments for sequence [%s]", |
| sequenceMetadata |
| ); |
| } else { |
| log.info( |
| "Published segments[%s] with metadata[%s].", |
| Joiner.on(", ").join( |
| result.getSegments().stream().map(DataSegment::getIdentifier).collect(Collectors.toList()) |
| ), |
| Preconditions.checkNotNull(result.getCommitMetadata(), "commitMetadata") |
| ); |
| } |
| |
| sequences.remove(sequenceMetadata); |
| publishingSequences.remove(sequenceMetadata.getSequenceName()); |
| try { |
| persistSequences(); |
| } |
| catch (IOException e) { |
| log.error(e, "Unable to persist state, dying"); |
| Throwables.propagate(e); |
| } |
| |
| final ListenableFuture<SegmentsAndMetadata> handOffFuture = driver.registerHandoff(result); |
| handOffWaitList.add(handOffFuture); |
| } |
| catch (Throwable t) { |
| if ((t instanceof InterruptedException || (t instanceof RejectedExecutionException |
| && t.getCause() instanceof InterruptedException))) { |
| log.warn("Stopping publish thread as we are interrupted, probably we are shutting down"); |
| } else { |
| log.makeAlert(t, "Error in publish thread, dying").emit(); |
| throwableAtomicReference.set(t); |
| } |
| Futures.allAsList(handOffWaitList).cancel(true); |
| waitForPublishes.countDown(); |
| break; |
| } |
| } |
| } |
| ); |
| } |
| |
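  /**
   * Main entry point for the incremental-handoff code path: restores or creates the sequence list, reads from Kafka
   * until stopped or until all sequences reach their end offsets, checkpoints along the way, and then waits for the
   * publish executor and handoff futures to complete.
   */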
| @Override |
| public TaskStatus run(final TaskToolbox toolbox) throws Exception |
| { |
    // for backwards compatibility; this path should be removed in versions after 0.12.x
| if (useLegacy) { |
| return runLegacy(toolbox); |
| } |
| |
| log.info("Starting up!"); |
| |
| startTime = DateTimes.nowUtc(); |
| status = Status.STARTING; |
| this.toolbox = toolbox; |
| |
| if (getContext() != null && getContext().get("checkpoints") != null) { |
| log.info("Got checkpoints [%s]", (String) getContext().get("checkpoints")); |
| final TreeMap<Integer, Map<Integer, Long>> checkpoints = toolbox.getObjectMapper().readValue( |
| (String) getContext().get("checkpoints"), |
| new TypeReference<TreeMap<Integer, Map<Integer, Long>>>() |
| { |
| } |
| ); |
| |
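      // The parsed map is sequenceId -> (partition -> startOffset). For example (hypothetical offsets), a context
      // value of {"0": {"0": 0}, "1": {"0": 1000}} restores a checkpointed sequence 0 covering offsets [0, 1000)
      // of partition 0 and an open-ended sequence 1 starting at offset 1000.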
| Iterator<Map.Entry<Integer, Map<Integer, Long>>> sequenceOffsets = checkpoints.entrySet().iterator(); |
| Map.Entry<Integer, Map<Integer, Long>> previous = sequenceOffsets.next(); |
| while (sequenceOffsets.hasNext()) { |
| Map.Entry<Integer, Map<Integer, Long>> current = sequenceOffsets.next(); |
| sequences.add(new SequenceMetadata( |
| previous.getKey(), |
| StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()), |
| previous.getValue(), |
| current.getValue(), |
| true |
| )); |
| previous = current; |
| } |
| sequences.add(new SequenceMetadata( |
| previous.getKey(), |
| StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()), |
| previous.getValue(), |
| maxEndOffsets, |
| false |
| )); |
| } else { |
| sequences.add(new SequenceMetadata( |
| 0, |
| StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0), |
| ioConfig.getStartPartitions().getPartitionOffsetMap(), |
| maxEndOffsets, |
| false |
| )); |
| } |
| |
| sequencesPersistFile = new File(toolbox.getPersistDir(), "sequences.json"); |
| restoreSequences(); |
| log.info("Starting with sequences: %s", sequences); |
| |
| if (chatHandlerProvider.isPresent()) { |
| log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName()); |
| chatHandlerProvider.get().register(getId(), this, false); |
| } else { |
| log.warn("No chat handler detected"); |
| } |
| |
| runThread = Thread.currentThread(); |
| |
| // Set up FireDepartmentMetrics |
| final FireDepartment fireDepartmentForMetrics = new FireDepartment( |
| dataSchema, |
| new RealtimeIOConfig(null, null, null), |
| null |
| ); |
| fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics(); |
| toolbox.getMonitorScheduler().addMonitor( |
| new RealtimeMetricsMonitor( |
| ImmutableList.of(fireDepartmentForMetrics), |
| ImmutableMap.of(DruidMetrics.TASK_ID, new String[]{getId()}) |
| ) |
| ); |
| |
| LookupNodeService lookupNodeService = getContextValue(RealtimeIndexTask.CTX_KEY_LOOKUP_TIER) == null ? |
| toolbox.getLookupNodeService() : |
| new LookupNodeService((String) getContextValue(RealtimeIndexTask.CTX_KEY_LOOKUP_TIER)); |
| DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode( |
| toolbox.getDruidNode(), |
| DruidNodeDiscoveryProvider.NODE_TYPE_PEON, |
| ImmutableMap.of( |
| toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(), |
| lookupNodeService.getName(), lookupNodeService |
| ) |
| ); |
| |
| try ( |
| final KafkaConsumer<byte[], byte[]> consumer = newConsumer() |
| ) { |
| toolbox.getDataSegmentServerAnnouncer().announce(); |
| toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode); |
| |
| appenderator = newAppenderator(fireDepartmentMetrics, toolbox); |
| driver = newDriver(appenderator, toolbox, fireDepartmentMetrics); |
| createAndStartPublishExecutor(); |
| |
| final String topic = ioConfig.getStartPartitions().getTopic(); |
| |
| // Start up, set up initial offsets. |
| final Object restoredMetadata = driver.startJob(); |
| if (restoredMetadata == null) { |
        // no persist has happened so far, so this is either a brand new task or a replacement of a failed task
| Preconditions.checkState(sequences.get(0).startOffsets.entrySet().stream().allMatch( |
| partitionOffsetEntry -> Longs.compare( |
| partitionOffsetEntry.getValue(), |
| ioConfig.getStartPartitions() |
| .getPartitionOffsetMap() |
| .get(partitionOffsetEntry.getKey()) |
| ) >= 0 |
| ), "Sequence offsets are not compatible with start offsets of task"); |
| nextOffsets.putAll(sequences.get(0).startOffsets); |
| } else { |
| final Map<String, Object> restoredMetadataMap = (Map) restoredMetadata; |
| final KafkaPartitions restoredNextPartitions = toolbox.getObjectMapper().convertValue( |
| restoredMetadataMap.get(METADATA_NEXT_PARTITIONS), |
| KafkaPartitions.class |
| ); |
| nextOffsets.putAll(restoredNextPartitions.getPartitionOffsetMap()); |
| |
| // Sanity checks. |
| if (!restoredNextPartitions.getTopic().equals(ioConfig.getStartPartitions().getTopic())) { |
| throw new ISE( |
| "WTF?! Restored topic[%s] but expected topic[%s]", |
| restoredNextPartitions.getTopic(), |
| ioConfig.getStartPartitions().getTopic() |
| ); |
| } |
| |
| if (!nextOffsets.keySet().equals(ioConfig.getStartPartitions().getPartitionOffsetMap().keySet())) { |
| throw new ISE( |
| "WTF?! Restored partitions[%s] but expected partitions[%s]", |
| nextOffsets.keySet(), |
| ioConfig.getStartPartitions().getPartitionOffsetMap().keySet() |
| ); |
| } |
        // sequences can be empty only if all sequences were already published and the task stopped before it
        // could finish, which is rare
| if (sequences.size() == 0 || sequences.get(sequences.size() - 1).isCheckpointed()) { |
| this.endOffsets.putAll(sequences.size() == 0 |
| ? nextOffsets |
| : sequences.get(sequences.size() - 1).getEndOffsets()); |
| log.info("End offsets changed to [%s]", endOffsets); |
| } |
| } |
| |
| // Set up committer. |
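      // The committed metadata is roughly {"nextPartitions": {"topic": ..., "partitionOffsetMap": {...}}}; this is
      // the same shape that restoredMetadata is deserialized from on restart (see driver.startJob() above).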
| final Supplier<Committer> committerSupplier = () -> { |
| final Map<Integer, Long> snapshot = ImmutableMap.copyOf(nextOffsets); |
| lastPersistedOffsets.clear(); |
| lastPersistedOffsets.putAll(snapshot); |
| |
| return new Committer() |
| { |
| @Override |
| public Object getMetadata() |
| { |
| return ImmutableMap.of( |
| METADATA_NEXT_PARTITIONS, new KafkaPartitions( |
| ioConfig.getStartPartitions().getTopic(), |
| snapshot |
| ) |
| ); |
| } |
| |
| @Override |
| public void run() |
| { |
| // Do nothing. |
| } |
| }; |
| }; |
| |
| // restart publishing of sequences (if any) |
| maybePersistAndPublishSequences(committerSupplier); |
| |
| Set<Integer> assignment = assignPartitionsAndSeekToNext(consumer, topic); |
| |
| // Main loop. |
| // Could eventually support leader/follower mode (for keeping replicas more in sync) |
| boolean stillReading = !assignment.isEmpty(); |
| status = Status.READING; |
| try { |
| while (stillReading) { |
| if (possiblyPause(assignment)) { |
| // The partition assignments may have changed while paused by a call to setEndOffsets() so reassign |
| // partitions upon resuming. This is safe even if the end offsets have not been modified. |
| assignment = assignPartitionsAndSeekToNext(consumer, topic); |
| |
| if (assignment.isEmpty()) { |
| log.info("All partitions have been fully read"); |
| publishOnStop.set(true); |
| stopRequested.set(true); |
| } |
| } |
| |
          // if stop is requested, or the task's end offsets were set by a call to setEndOffsets() with finish=true
| if (stopRequested.get() || (sequences.get(sequences.size() - 1).isCheckpointed() |
| && !ioConfig.isPauseAfterRead())) { |
| status = Status.PUBLISHING; |
| } |
| |
| if (stopRequested.get()) { |
| break; |
| } |
| |
| checkAndMaybeThrowException(); |
| |
| if (!ioConfig.isPauseAfterRead()) { |
| maybePersistAndPublishSequences(committerSupplier); |
| } |
| |
| // The retrying business is because the KafkaConsumer throws OffsetOutOfRangeException if the seeked-to |
| // offset is not present in the topic-partition. This can happen if we're asking a task to read from data |
| // that has not been written yet (which is totally legitimate). So let's wait for it to show up. |
| ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty(); |
| try { |
| records = consumer.poll(POLL_TIMEOUT); |
| } |
| catch (OffsetOutOfRangeException e) { |
| log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage()); |
| possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox); |
| stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty(); |
| } |
| |
| SequenceMetadata sequenceToCheckpoint = null; |
| for (ConsumerRecord<byte[], byte[]> record : records) { |
| if (log.isTraceEnabled()) { |
| log.trace( |
| "Got topic[%s] partition[%d] offset[%,d].", |
| record.topic(), |
| record.partition(), |
| record.offset() |
| ); |
| } |
| |
| if (record.offset() < endOffsets.get(record.partition())) { |
| if (record.offset() != nextOffsets.get(record.partition())) { |
| if (ioConfig.isSkipOffsetGaps()) { |
| log.warn( |
| "Skipped to offset[%,d] after offset[%,d] in partition[%d].", |
| record.offset(), |
| nextOffsets.get(record.partition()), |
| record.partition() |
| ); |
| } else { |
| throw new ISE( |
| "WTF?! Got offset[%,d] after offset[%,d] in partition[%d].", |
| record.offset(), |
| nextOffsets.get(record.partition()), |
| record.partition() |
| ); |
| } |
| } |
| |
| try { |
| final byte[] valueBytes = record.value(); |
| final List<InputRow> rows = valueBytes == null |
| ? Utils.nullableListOf((InputRow) null) |
| : parser.parseBatch(ByteBuffer.wrap(valueBytes)); |
| boolean isPersistRequired = false; |
| |
| for (InputRow row : rows) { |
| if (row != null && withinMinMaxRecordTime(row)) { |
| SequenceMetadata sequenceToUse = null; |
| for (SequenceMetadata sequence : sequences) { |
| if (sequence.canHandle(record)) { |
| sequenceToUse = sequence; |
| } |
| } |
| |
| if (sequenceToUse == null) { |
| throw new ISE( |
| "WTH?! cannot find any valid sequence for record with partition [%d] and offset [%d]. Current sequences: %s", |
| record.partition(), |
| record.offset(), |
| sequences |
| ); |
| } |
| |
| final AppenderatorDriverAddResult addResult = driver.add( |
| row, |
| sequenceToUse.getSequenceName(), |
| committerSupplier, |
                    // Skip the segment lineage check, since there will always be exactly one segment
                    // per combination of sequence and segment granularity.
                    // Skipping is necessary because the task puts messages polled from all of its
                    // assigned Kafka partitions into a single Druid segment, so the ordering of
                    // messages across partitions is not guaranteed between replica tasks; replicas
                    // may therefore ask for segments with different intervals in different orders,
                    // which could cause SegmentAllocateAction to fail.
| true, |
| // do not allow incremental persists to happen until all the rows from this batch |
| // of rows are indexed |
| false |
| ); |
| |
| if (addResult.isOk()) { |
| // If the number of rows in the segment exceeds the threshold after adding a row, |
| // move the segment out from the active segments of BaseAppenderatorDriver to make a new segment. |
| if (addResult.getNumRowsInSegment() > tuningConfig.getMaxRowsPerSegment()) { |
| if (!sequenceToUse.isCheckpointed()) { |
| sequenceToCheckpoint = sequenceToUse; |
| } |
| } |
| isPersistRequired |= addResult.isPersistRequired(); |
| } else { |
| // Failure to allocate segment puts determinism at risk, bail out to be safe. |
| // May want configurable behavior here at some point. |
| // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks. |
| throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp()); |
| } |
| |
| fireDepartmentMetrics.incrementProcessed(); |
| } else { |
| fireDepartmentMetrics.incrementThrownAway(); |
| } |
| } |
| if (isPersistRequired) { |
| Futures.addCallback( |
| driver.persistAsync(committerSupplier.get()), |
| new FutureCallback<Object>() |
| { |
| @Override |
| public void onSuccess(@Nullable Object result) |
| { |
| log.info("Persist completed with metadata [%s]", result); |
| } |
| |
| @Override |
| public void onFailure(Throwable t) |
| { |
| log.error("Persist failed, dying"); |
| throwableAtomicReference.set(t); |
| } |
| } |
| ); |
| } |
| } |
| catch (ParseException e) { |
| if (tuningConfig.isReportParseExceptions()) { |
| throw e; |
| } else { |
| log.debug( |
| e, |
| "Dropping unparseable row from partition[%d] offset[%,d].", |
| record.partition(), |
| record.offset() |
| ); |
| |
| fireDepartmentMetrics.incrementUnparseable(); |
| } |
| } |
| |
| nextOffsets.put(record.partition(), record.offset() + 1); |
| } |
| |
| if (nextOffsets.get(record.partition()).equals(endOffsets.get(record.partition())) |
| && assignment.remove(record.partition())) { |
| log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition()); |
| assignPartitions(consumer, topic, assignment); |
| stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty(); |
| } |
| } |
| |
| if (sequenceToCheckpoint != null && !ioConfig.isPauseAfterRead()) { |
| Preconditions.checkArgument( |
| sequences.get(sequences.size() - 1) |
| .getSequenceName() |
| .equals(sequenceToCheckpoint.getSequenceName()), |
| "Cannot checkpoint a sequence [%s] which is not the latest one, sequences %s", |
| sequenceToCheckpoint, |
| sequences |
| ); |
| requestPause(PAUSE_FOREVER); |
| if (!toolbox.getTaskActionClient().submit(new CheckPointDataSourceMetadataAction( |
| getDataSource(), |
| ioConfig.getBaseSequenceName(), |
| new KafkaDataSourceMetadata(new KafkaPartitions(topic, sequenceToCheckpoint.getStartOffsets())), |
| new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets)) |
| ))) { |
| throw new ISE("Checkpoint request with offsets [%s] failed, dying", nextOffsets); |
| } |
| } |
| } |
| } |
| finally { |
| log.info("Persisting all pending data"); |
| driver.persist(committerSupplier.get()); // persist pending data |
| } |
| |
| synchronized (statusLock) { |
| if (stopRequested.get() && !publishOnStop.get()) { |
| throw new InterruptedException("Stopping without publishing"); |
| } |
| |
| status = Status.PUBLISHING; |
| } |
| |
| for (SequenceMetadata sequenceMetadata : sequences) { |
| if (!publishingSequences.contains(sequenceMetadata.getSequenceName())) { |
          // this prevents checks in the sequence-specific committer supplier from failing
| sequenceMetadata.setEndOffsets(nextOffsets); |
| sequenceMetadata.updateAssignments(nextOffsets); |
| publishingSequences.add(sequenceMetadata.getSequenceName()); |
          // persisting was already done in the finally block above, so add directly to publishQueue
| publishQueue.add(sequenceMetadata); |
| } |
| } |
| |
      // add a sentinel SequenceMetadata to mark the end of all sequences
| publishQueue.add(SequenceMetadata.getSentinelSequenceMetadata()); |
| waitForPublishes.await(); |
| checkAndMaybeThrowException(); |
| |
| List<SegmentsAndMetadata> handedOffList = Lists.newArrayList(); |
| if (tuningConfig.getHandoffConditionTimeout() == 0) { |
| handedOffList = Futures.allAsList(handOffWaitList).get(); |
| } else { |
| try { |
| handedOffList = Futures.allAsList(handOffWaitList) |
| .get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS); |
| } |
| catch (TimeoutException e) { |
| log.makeAlert("Timed out after [%d] millis waiting for handoffs", tuningConfig.getHandoffConditionTimeout()) |
| .addData("TaskId", this.getId()) |
| .emit(); |
| } |
| } |
| |
| for (SegmentsAndMetadata handedOff : handedOffList) { |
        if (handedOff == null) {
          // handedOff is null when the handoff failed, so its segments can't be referenced here
          log.warn("Handoff failed for one of the sequences");
| } else { |
| log.info( |
| "Handoff completed for segments[%s] with metadata[%s].", |
| Joiner.on(", ").join( |
| handedOff.getSegments().stream().map(DataSegment::getIdentifier).collect(Collectors.toList()) |
| ), |
| Preconditions.checkNotNull(handedOff.getCommitMetadata(), "commitMetadata") |
| ); |
| } |
| } |
| } |
| catch (InterruptedException | RejectedExecutionException e) { |
| appenderator.closeNow(); |
| // handle the InterruptedException that gets wrapped in a RejectedExecutionException |
| if (e instanceof RejectedExecutionException |
| && (e.getCause() == null || !(e.getCause() instanceof InterruptedException))) { |
| throw e; |
| } |
| |
| // if we were interrupted because we were asked to stop, handle the exception and return success, else rethrow |
| if (!stopRequested.get()) { |
| Thread.currentThread().interrupt(); |
| throw e; |
| } |
| |
| log.info("The task was asked to stop before completing"); |
| } |
| finally { |
| if (appenderator != null) { |
| if (throwableAtomicReference.get() != null) { |
| appenderator.closeNow(); |
| } else { |
| appenderator.close(); |
| } |
| } |
| if (driver != null) { |
| driver.close(); |
| } |
| if (chatHandlerProvider.isPresent()) { |
| chatHandlerProvider.get().unregister(getId()); |
| } |
| |
| if (publishExecService != null) { |
| publishExecService.shutdownNow(); |
| } |
| |
| toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode); |
| toolbox.getDataSegmentServerAnnouncer().unannounce(); |
| } |
| |
| return success(); |
| } |
| |
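  // Legacy (pre-incremental-handoff) code path: reads everything first, then publishes all segments in a single
  // transaction at the end. Kept for rolling-upgrade compatibility; see the useLegacy flag set in the constructor.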
| private TaskStatus runLegacy(final TaskToolbox toolbox) throws Exception |
| { |
| log.info("Starting up!"); |
| startTime = DateTimes.nowUtc(); |
| status = Status.STARTING; |
| this.toolbox = toolbox; |
| |
| if (chatHandlerProvider.isPresent()) { |
| log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName()); |
| chatHandlerProvider.get().register(getId(), this, false); |
| } else { |
| log.warn("No chat handler detected"); |
| } |
| |
| runThread = Thread.currentThread(); |
| |
| // Set up FireDepartmentMetrics |
| final FireDepartment fireDepartmentForMetrics = new FireDepartment( |
| dataSchema, |
| new RealtimeIOConfig(null, null, null), |
| null |
| ); |
| fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics(); |
| toolbox.getMonitorScheduler().addMonitor( |
| new RealtimeMetricsMonitor( |
| ImmutableList.of(fireDepartmentForMetrics), |
| ImmutableMap.of(DruidMetrics.TASK_ID, new String[]{getId()}) |
| ) |
| ); |
| |
| LookupNodeService lookupNodeService = getContextValue(RealtimeIndexTask.CTX_KEY_LOOKUP_TIER) == null ? |
| toolbox.getLookupNodeService() : |
| new LookupNodeService((String) getContextValue(RealtimeIndexTask.CTX_KEY_LOOKUP_TIER)); |
| DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode( |
| toolbox.getDruidNode(), |
| DruidNodeDiscoveryProvider.NODE_TYPE_PEON, |
| ImmutableMap.of( |
| toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(), |
| lookupNodeService.getName(), lookupNodeService |
| ) |
| ); |
| |
| try ( |
| final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox); |
| final StreamAppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics); |
| final KafkaConsumer<byte[], byte[]> consumer = newConsumer() |
| ) { |
| toolbox.getDataSegmentServerAnnouncer().announce(); |
| toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode); |
| |
| appenderator = appenderator0; |
| |
| final String topic = ioConfig.getStartPartitions().getTopic(); |
| |
| // Start up, set up initial offsets. |
| final Object restoredMetadata = driver.startJob(); |
| if (restoredMetadata == null) { |
| nextOffsets.putAll(ioConfig.getStartPartitions().getPartitionOffsetMap()); |
| } else { |
| final Map<String, Object> restoredMetadataMap = (Map) restoredMetadata; |
| final KafkaPartitions restoredNextPartitions = toolbox.getObjectMapper().convertValue( |
| restoredMetadataMap.get(METADATA_NEXT_PARTITIONS), |
| KafkaPartitions.class |
| ); |
| nextOffsets.putAll(restoredNextPartitions.getPartitionOffsetMap()); |
| |
| // Sanity checks. |
| if (!restoredNextPartitions.getTopic().equals(ioConfig.getStartPartitions().getTopic())) { |
| throw new ISE( |
| "WTF?! Restored topic[%s] but expected topic[%s]", |
| restoredNextPartitions.getTopic(), |
| ioConfig.getStartPartitions().getTopic() |
| ); |
| } |
| |
| if (!nextOffsets.keySet().equals(ioConfig.getStartPartitions().getPartitionOffsetMap().keySet())) { |
| throw new ISE( |
| "WTF?! Restored partitions[%s] but expected partitions[%s]", |
| nextOffsets.keySet(), |
| ioConfig.getStartPartitions().getPartitionOffsetMap().keySet() |
| ); |
| } |
| } |
| |
| // Set up sequenceNames. |
| final Map<Integer, String> sequenceNames = Maps.newHashMap(); |
| for (Integer partitionNum : nextOffsets.keySet()) { |
| sequenceNames.put(partitionNum, StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), partitionNum)); |
| } |
| |
| // Set up committer. |
| final Supplier<Committer> committerSupplier = new Supplier<Committer>() |
| { |
| @Override |
| public Committer get() |
| { |
| final Map<Integer, Long> snapshot = ImmutableMap.copyOf(nextOffsets); |
| |
| return new Committer() |
| { |
| @Override |
| public Object getMetadata() |
| { |
| return ImmutableMap.of( |
| METADATA_NEXT_PARTITIONS, new KafkaPartitions( |
| ioConfig.getStartPartitions().getTopic(), |
| snapshot |
| ) |
| ); |
| } |
| |
| @Override |
| public void run() |
| { |
| // Do nothing. |
| } |
| }; |
| } |
| }; |
| |
| Set<Integer> assignment = assignPartitionsAndSeekToNext(consumer, topic); |
| |
| // Main loop. |
| // Could eventually support leader/follower mode (for keeping replicas more in sync) |
| boolean stillReading = !assignment.isEmpty(); |
| status = Status.READING; |
| try { |
| while (stillReading) { |
| if (possiblyPause(assignment)) { |
| // The partition assignments may have changed while paused by a call to setEndOffsets() so reassign |
| // partitions upon resuming. This is safe even if the end offsets have not been modified. |
| assignment = assignPartitionsAndSeekToNext(consumer, topic); |
| |
| if (assignment.isEmpty()) { |
| log.info("All partitions have been fully read"); |
| publishOnStop.set(true); |
| stopRequested.set(true); |
| } |
| } |
| |
| if (stopRequested.get()) { |
| break; |
| } |
| |
| // The retrying business is because the KafkaConsumer throws OffsetOutOfRangeException if the seeked-to |
| // offset is not present in the topic-partition. This can happen if we're asking a task to read from data |
| // that has not been written yet (which is totally legitimate). So let's wait for it to show up. |
| ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty(); |
| try { |
| records = consumer.poll(POLL_TIMEOUT); |
| } |
| catch (OffsetOutOfRangeException e) { |
| log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage()); |
| possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox); |
| stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty(); |
| } |
| |
| for (ConsumerRecord<byte[], byte[]> record : records) { |
| if (log.isTraceEnabled()) { |
| log.trace( |
| "Got topic[%s] partition[%d] offset[%,d].", |
| record.topic(), |
| record.partition(), |
| record.offset() |
| ); |
| } |
| |
| if (record.offset() < endOffsets.get(record.partition())) { |
| if (record.offset() != nextOffsets.get(record.partition())) { |
| if (ioConfig.isSkipOffsetGaps()) { |
| log.warn( |
| "Skipped to offset[%,d] after offset[%,d] in partition[%d].", |
| record.offset(), |
| nextOffsets.get(record.partition()), |
| record.partition() |
| ); |
| } else { |
| throw new ISE( |
| "WTF?! Got offset[%,d] after offset[%,d] in partition[%d].", |
| record.offset(), |
| nextOffsets.get(record.partition()), |
| record.partition() |
| ); |
| } |
| } |
| |
| try { |
| final byte[] valueBytes = record.value(); |
| final List<InputRow> rows = valueBytes == null |
| ? Utils.nullableListOf((InputRow) null) |
| : parser.parseBatch(ByteBuffer.wrap(valueBytes)); |
| boolean isPersistRequired = false; |
| final Map<String, Set<SegmentIdentifier>> segmentsToMoveOut = new HashMap<>(); |
| |
| for (InputRow row : rows) { |
| if (row != null && withinMinMaxRecordTime(row)) { |
| final String sequenceName = sequenceNames.get(record.partition()); |
| final AppenderatorDriverAddResult addResult = driver.add( |
| row, |
| sequenceName, |
| committerSupplier, |
| false, |
| false |
| ); |
| |
| if (addResult.isOk()) { |
| // If the number of rows in the segment exceeds the threshold after adding a row, |
| // move the segment out from the active segments of BaseAppenderatorDriver to make a new segment. |
| if (addResult.getNumRowsInSegment() > tuningConfig.getMaxRowsPerSegment()) { |
| segmentsToMoveOut.computeIfAbsent(sequenceName, k -> new HashSet<>()) |
| .add(addResult.getSegmentIdentifier()); |
| } |
| isPersistRequired |= addResult.isPersistRequired(); |
| } else { |
| // Failure to allocate segment puts determinism at risk, bail out to be safe. |
| // May want configurable behavior here at some point. |
| // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks. |
| throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp()); |
| } |
| fireDepartmentMetrics.incrementProcessed(); |
| } else { |
| fireDepartmentMetrics.incrementThrownAway(); |
| } |
| } |
| if (isPersistRequired) { |
| driver.persist(committerSupplier.get()); |
| } |
| segmentsToMoveOut.entrySet().forEach(sequenceSegments -> driver.moveSegmentOut( |
| sequenceSegments.getKey(), |
| sequenceSegments.getValue().stream().collect(Collectors.toList()) |
| )); |
| } |
| catch (ParseException e) { |
| if (tuningConfig.isReportParseExceptions()) { |
| throw e; |
| } else { |
| log.debug( |
| e, |
| "Dropping unparseable row from partition[%d] offset[%,d].", |
| record.partition(), |
| record.offset() |
| ); |
| |
| fireDepartmentMetrics.incrementUnparseable(); |
| } |
| } |
| |
| nextOffsets.put(record.partition(), record.offset() + 1); |
| } |
| |
| if (nextOffsets.get(record.partition()).equals(endOffsets.get(record.partition())) |
| && assignment.remove(record.partition())) { |
| log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition()); |
| assignPartitions(consumer, topic, assignment); |
| stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty(); |
| } |
| } |
| } |
| } |
| finally { |
| driver.persist(committerSupplier.get()); // persist pending data |
| } |
| |
| synchronized (statusLock) { |
| if (stopRequested.get() && !publishOnStop.get()) { |
| throw new InterruptedException("Stopping without publishing"); |
| } |
| |
| status = Status.PUBLISHING; |
| } |
| |
| final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> { |
| final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue( |
| ((Map) Preconditions.checkNotNull(commitMetadata, "commitMetadata")).get(METADATA_NEXT_PARTITIONS), |
| KafkaPartitions.class |
| ); |
| |
| // Sanity check, we should only be publishing things that match our desired end state. |
| if (!endOffsets.equals(finalPartitions.getPartitionOffsetMap())) { |
| throw new ISE("WTF?! Driver attempted to publish invalid metadata[%s].", commitMetadata); |
| } |
| |
| final SegmentTransactionalInsertAction action; |
| |
| if (ioConfig.isUseTransaction()) { |
| action = new SegmentTransactionalInsertAction( |
| segments, |
| new KafkaDataSourceMetadata(ioConfig.getStartPartitions()), |
| new KafkaDataSourceMetadata(finalPartitions) |
| ); |
| } else { |
| action = new SegmentTransactionalInsertAction(segments, null, null); |
| } |
| |
| log.info("Publishing with isTransaction[%s].", ioConfig.isUseTransaction()); |
| |
| return toolbox.getTaskActionClient().submit(action).isSuccess(); |
| }; |
| |
      // Supervised Kafka tasks are killed by the KafkaSupervisor if they get stuck while publishing segments or
      // waiting for handoff. See KafkaSupervisorIOConfig.completionTimeout.
| final SegmentsAndMetadata published = driver.publish( |
| publisher, |
| committerSupplier.get(), |
| sequenceNames.values() |
| ).get(); |
| |
| final Future<SegmentsAndMetadata> handoffFuture = driver.registerHandoff(published); |
| final SegmentsAndMetadata handedOff; |
| if (tuningConfig.getHandoffConditionTimeout() == 0) { |
| handedOff = handoffFuture.get(); |
| } else { |
| handedOff = handoffFuture.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS); |
| } |
| |
| if (handedOff == null) { |
| throw new ISE("Transaction failure publishing segments, aborting"); |
| } else { |
| log.info( |
| "Published segments[%s] with metadata[%s].", |
| Joiner.on(", ").join( |
| Iterables.transform( |
| handedOff.getSegments(), |
| new Function<DataSegment, String>() |
| { |
| @Override |
| public String apply(DataSegment input) |
| { |
| return input.getIdentifier(); |
| } |
| } |
| ) |
| ), |
| Preconditions.checkNotNull(handedOff.getCommitMetadata(), "commitMetadata") |
| ); |
| } |
| } |
| catch (InterruptedException | RejectedExecutionException e) { |
| // handle the InterruptedException that gets wrapped in a RejectedExecutionException |
| if (e instanceof RejectedExecutionException |
| && (e.getCause() == null || !(e.getCause() instanceof InterruptedException))) { |
| throw e; |
| } |
| |
| // if we were interrupted because we were asked to stop, handle the exception and return success, else rethrow |
| if (!stopRequested.get()) { |
| Thread.currentThread().interrupt(); |
| throw e; |
| } |
| |
| log.info("The task was asked to stop before completing"); |
| } |
| finally { |
| if (chatHandlerProvider.isPresent()) { |
| chatHandlerProvider.get().unregister(getId()); |
| } |
| |
| toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode); |
| toolbox.getDataSegmentServerAnnouncer().unannounce(); |
| } |
| |
| return success(); |
| } |
| |
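  // Rethrows any error recorded by the publish executor or the async-persist callback, so the main loop fails fast
  // instead of continuing to read after a publish or persist failure.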
| private void checkAndMaybeThrowException() |
| { |
| if (throwableAtomicReference.get() != null) { |
| Throwables.propagate(throwableAtomicReference.get()); |
| } |
| } |
| |
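  /**
   * Enqueues for publishing any sequence that is no longer open (i.e. fully read) and not already publishing,
   * persisting pending data first so the committed offsets line up with the sequence's end offsets.
   */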
| private void maybePersistAndPublishSequences(Supplier<Committer> committerSupplier) |
| throws InterruptedException |
| { |
| for (SequenceMetadata sequenceMetadata : sequences) { |
| sequenceMetadata.updateAssignments(nextOffsets); |
| if (!sequenceMetadata.isOpen() && !publishingSequences.contains(sequenceMetadata.getSequenceName())) { |
| publishingSequences.add(sequenceMetadata.getSequenceName()); |
| try { |
| Object result = driver.persist(committerSupplier.get()); |
| log.info( |
| "Persist completed with results: [%s], adding sequence [%s] to publish queue", |
| result, |
| sequenceMetadata |
| ); |
| publishQueue.add(sequenceMetadata); |
| } |
| catch (InterruptedException e) { |
| log.warn("Interrupted while persisting sequence [%s]", sequenceMetadata); |
| throw e; |
| } |
| } |
| } |
| } |
| |
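  // Restores the sequence list from sequences.json if a previous incarnation of this task persisted one.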
| private void restoreSequences() throws IOException |
| { |
| Preconditions.checkNotNull(sequencesPersistFile); |
| if (sequencesPersistFile.exists()) { |
| sequences = new CopyOnWriteArrayList<>(toolbox.getObjectMapper().<List<SequenceMetadata>>readValue( |
| sequencesPersistFile, new TypeReference<List<SequenceMetadata>>() |
| { |
| })); |
| } |
| } |
| |
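  // Synchronized because both the publish executor and Jetty threads (via setEndOffsets) write sequences.json.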
| private synchronized void persistSequences() throws IOException |
| { |
| log.info("Persisting Sequences Metadata [%s]", sequences); |
| toolbox.getObjectMapper().writerWithType( |
| new TypeReference<List<SequenceMetadata>>() |
| { |
| } |
| ).writeValue(sequencesPersistFile, sequences); |
| } |
| |
| @Override |
| public boolean canRestore() |
| { |
| return true; |
| } |
| |
| /** |
| * Authorizes action to be performed on this task's datasource |
| * |
| * @return authorization result |
| */ |
| private Access authorizationCheck(final HttpServletRequest req, Action action) |
| { |
| ResourceAction resourceAction = new ResourceAction( |
| new Resource(dataSchema.getDataSource(), ResourceType.DATASOURCE), |
| action |
| ); |
| |
| Access access = AuthorizationUtils.authorizeResourceAction(req, resourceAction, authorizerMapper); |
| if (!access.isAllowed()) { |
| throw new ForbiddenException(access.toString()); |
| } |
| |
| return access; |
| } |
| |
| @VisibleForTesting |
| Appenderator getAppenderator() |
| { |
| return appenderator; |
| } |
| |
| @Override |
| public void stopGracefully() |
| { |
| log.info("Stopping gracefully (status: [%s])", status); |
| stopRequested.set(true); |
| |
| synchronized (statusLock) { |
| if (status == Status.PUBLISHING) { |
| runThread.interrupt(); |
| return; |
| } |
| } |
| |
| try { |
| if (pauseLock.tryLock(LOCK_ACQUIRE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { |
| try { |
| if (pauseRequested) { |
| pauseRequested = false; |
| shouldResume.signalAll(); |
| } |
| } |
| finally { |
| pauseLock.unlock(); |
| } |
| } else { |
| log.warn("While stopping: failed to acquire pauseLock before timeout, interrupting run thread"); |
| runThread.interrupt(); |
| return; |
| } |
| |
| if (pollRetryLock.tryLock(LOCK_ACQUIRE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { |
| try { |
| isAwaitingRetry.signalAll(); |
| } |
| finally { |
| pollRetryLock.unlock(); |
| } |
| } else { |
| log.warn("While stopping: failed to acquire pollRetryLock before timeout, interrupting run thread"); |
| runThread.interrupt(); |
| } |
| } |
| catch (Exception e) { |
| Throwables.propagate(e); |
| } |
| } |
| |
| @Override |
| public <T> QueryRunner<T> getQueryRunner(Query<T> query) |
| { |
| if (appenderator == null) { |
| // Not yet initialized, no data yet, just return a noop runner. |
| return new NoopQueryRunner<>(); |
| } |
| |
| return new QueryRunner<T>() |
| { |
| @Override |
| public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext) |
| { |
| return queryPlus.run(appenderator, responseContext); |
| } |
| }; |
| } |
| |
| @POST |
| @Path("/stop") |
| public Response stop(@Context final HttpServletRequest req) |
| { |
| authorizationCheck(req, Action.WRITE); |
| stopGracefully(); |
| return Response.status(Response.Status.OK).build(); |
| } |
| |
| @GET |
| @Path("/status") |
| @Produces(MediaType.APPLICATION_JSON) |
| public Status getStatusHTTP(@Context final HttpServletRequest req) |
| { |
| authorizationCheck(req, Action.READ); |
| return status; |
| } |
| |
| public Status getStatus() |
| { |
| return status; |
| } |
| |
| @GET |
| @Path("/offsets/current") |
| @Produces(MediaType.APPLICATION_JSON) |
| public Map<Integer, Long> getCurrentOffsets(@Context final HttpServletRequest req) |
| { |
| authorizationCheck(req, Action.READ); |
| return getCurrentOffsets(); |
| } |
| |
| public Map<Integer, Long> getCurrentOffsets() |
| { |
| return nextOffsets; |
| } |
| |
| @GET |
| @Path("/offsets/end") |
| @Produces(MediaType.APPLICATION_JSON) |
| public Map<Integer, Long> getEndOffsetsHTTP(@Context final HttpServletRequest req) |
| { |
| authorizationCheck(req, Action.READ); |
| return getEndOffsets(); |
| } |
| |
| public Map<Integer, Long> getEndOffsets() |
| { |
| return endOffsets; |
| } |
| |
| @POST |
| @Path("/offsets/end") |
| @Consumes(MediaType.APPLICATION_JSON) |
| @Produces(MediaType.APPLICATION_JSON) |
| public Response setEndOffsetsHTTP( |
| Map<Integer, Long> offsets, |
| @QueryParam("resume") @DefaultValue("false") final boolean resume, |
| @QueryParam("finish") @DefaultValue("true") final boolean finish, |
      // this parameter is for internal use only and shouldn't usually be set by users
| @Context final HttpServletRequest req |
| ) throws InterruptedException |
| { |
| authorizationCheck(req, Action.WRITE); |
| return setEndOffsets(offsets, resume, finish); |
| } |
| |
| public Response setEndOffsets( |
| Map<Integer, Long> offsets, |
| final boolean resume, |
      final boolean finish // this parameter is for internal use only and shouldn't usually be set by users
| ) throws InterruptedException |
| { |
    // for backwards compatibility; this path should be removed in versions after 0.12.x
| if (useLegacy) { |
| return setEndOffsetsLegacy(offsets, resume); |
| } |
| |
| if (offsets == null) { |
| return Response.status(Response.Status.BAD_REQUEST) |
| .entity("Request body must contain a map of { partition:endOffset }") |
| .build(); |
| } else if (!endOffsets.keySet().containsAll(offsets.keySet())) { |
| return Response.status(Response.Status.BAD_REQUEST) |
| .entity( |
| StringUtils.format( |
| "Request contains partitions not being handled by this task, my partitions: %s", |
| endOffsets.keySet() |
| ) |
| ) |
| .build(); |
| } else { |
| try { |
| pauseLock.lockInterruptibly(); |
        // Perform all sequence-related checks before checking isPaused(), and only after acquiring pauseLock,
        // to correctly guard against duplicate requests
| Preconditions.checkState(sequences.size() > 0, "WTH?! No Sequences found to set end offsets"); |
| |
| final SequenceMetadata latestSequence = sequences.get(sequences.size() - 1); |
| if ((latestSequence.getStartOffsets().equals(offsets) && !finish) || |
| (latestSequence.getEndOffsets().equals(offsets) && finish)) { |
| log.warn("Ignoring duplicate request, end offsets already set for sequences [%s]", sequences); |
| return Response.ok(offsets).build(); |
| } else if (latestSequence.isCheckpointed() && !ioConfig.isPauseAfterRead()) { |
| return Response.status(Response.Status.BAD_REQUEST) |
| .entity(StringUtils.format( |
| "WTH?! Sequence [%s] has already endOffsets set, cannot set to [%s]", |
| latestSequence, |
| offsets |
| )).build(); |
| } else if (!isPaused()) { |
| return Response.status(Response.Status.BAD_REQUEST) |
| .entity("Task must be paused before changing the end offsets") |
| .build(); |
| } |
| |
| for (Map.Entry<Integer, Long> entry : offsets.entrySet()) { |
| if (entry.getValue().compareTo(nextOffsets.get(entry.getKey())) < 0) { |
| return Response.status(Response.Status.BAD_REQUEST) |
| .entity( |
| StringUtils.format( |
| "End offset must be >= current offset for partition [%s] (current: %s)", |
| entry.getKey(), |
| nextOffsets.get(entry.getKey()) |
| ) |
| ) |
| .build(); |
| } |
| } |
| |
| latestSequence.setEndOffsets(offsets); |
| |
| if (finish) { |
| log.info("Updating endOffsets from [%s] to [%s]", endOffsets, offsets); |
| endOffsets.putAll(offsets); |
| } else { |
| Preconditions.checkState(!ioConfig.isPauseAfterRead()); |
| // create new sequence |
| final SequenceMetadata newSequence = new SequenceMetadata( |
| latestSequence.getSequenceId() + 1, |
| StringUtils.format("%s_%d", ioConfig.getBaseSequenceName(), latestSequence.getSequenceId() + 1), |
| offsets, |
| maxEndOffsets, |
| false |
| ); |
| sequences.add(newSequence); |
| } |
| |
| persistSequences(); |
| } |
| catch (Exception e) { |
| log.error(e, "Unable to set end offsets, dying"); |
| throwableAtomicReference.set(e); |
| Throwables.propagate(e); |
| } |
| finally { |
| pauseLock.unlock(); |
| } |
| } |
| |
| if (resume) { |
| resume(); |
| } |
| |
| return Response.ok(offsets).build(); |
| } |
| |
| private Response setEndOffsetsLegacy( |
| Map<Integer, Long> offsets, |
| final boolean resume |
| ) throws InterruptedException |
| { |
| if (offsets == null) { |
| return Response.status(Response.Status.BAD_REQUEST) |
| .entity("Request body must contain a map of { partition:endOffset }") |
| .build(); |
| } else if (!endOffsets.keySet().containsAll(offsets.keySet())) { |
| return Response.status(Response.Status.BAD_REQUEST) |
| .entity( |
| StringUtils.format( |
| "Request contains partitions not being handled by this task, my partitions: %s", |
| endOffsets.keySet() |
| ) |
| ) |
| .build(); |
| } |
| |
| pauseLock.lockInterruptibly(); |
| try { |
| if (!isPaused()) { |
| return Response.status(Response.Status.BAD_REQUEST) |
| .entity("Task must be paused before changing the end offsets") |
| .build(); |
| } |
| |
| for (Map.Entry<Integer, Long> entry : offsets.entrySet()) { |
| if (entry.getValue().compareTo(nextOffsets.get(entry.getKey())) < 0) { |
| return Response.status(Response.Status.BAD_REQUEST) |
| .entity( |
| StringUtils.format( |
| "End offset must be >= current offset for partition [%s] (current: %s)", |
| entry.getKey(), |
| nextOffsets.get(entry.getKey()) |
| ) |
| ) |
| .build(); |
| } |
| } |
| |
| endOffsets.putAll(offsets); |
| log.info("endOffsets changed to %s", endOffsets); |
| } |
| finally { |
| pauseLock.unlock(); |
| } |
| |
| if (resume) { |
| resume(); |
| } |
| |
| return Response.ok(endOffsets).build(); |
| } |
| |
| @GET |
| @Path("/checkpoints") |
| @Produces(MediaType.APPLICATION_JSON) |
| public Map<Integer, Map<Integer, Long>> getCheckpointsHTTP(@Context final HttpServletRequest req) |
| { |
| authorizationCheck(req, Action.READ); |
| return getCheckpoints(); |
| } |
| |
| public Map<Integer, Map<Integer, Long>> getCheckpoints() |
| { |
| TreeMap<Integer, Map<Integer, Long>> result = new TreeMap<>(); |
| result.putAll( |
| sequences.stream().collect(Collectors.toMap(SequenceMetadata::getSequenceId, SequenceMetadata::getStartOffsets)) |
| ); |
| return result; |
| } |
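| // Illustrative response shape for the checkpoints endpoint (values are examples only): |
| // {"0": {"0": 0, "1": 0}, "1": {"0": 150, "1": 200}}, i.e. sequenceId -> (partition -> start |
| // offset), in ascending sequenceId order thanks to the TreeMap. |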
| |
| /** |
| * Signals the ingestion loop to pause. |
| * |
| * @param timeout how long to pause for before resuming, in milliseconds; a value <= 0 means pause indefinitely |
| * |
| * @return one of the following Responses: 400 Bad Request if the task has started publishing; 202 Accepted if the |
| * method has timed out and returned before the task has paused; 200 OK with a map of the current partition offsets |
| * in the response body if the task successfully paused |
| */ |
| @POST |
| @Path("/pause") |
| @Produces(MediaType.APPLICATION_JSON) |
| public Response pauseHTTP( |
| @QueryParam("timeout") @DefaultValue("0") final long timeout, |
| @Context final HttpServletRequest req |
| ) throws InterruptedException |
| { |
| authorizationCheck(req, Action.WRITE); |
| return pause(timeout); |
| } |
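| // Illustrative usage, assuming the standard task chat endpoint layout (host, port, and |
| // taskId below are placeholders): |
| // curl -X POST 'http://<peon-host>:<port>/druid/worker/v1/chat/<taskId>/pause?timeout=5000' |
| // A 200 response carries the current partition offsets as JSON, e.g. {"0": 100, "1": 200}. |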
| |
| public Response pause(final long timeout) throws InterruptedException |
| { |
| if (!(status == Status.PAUSED || status == Status.READING)) { |
| return Response.status(Response.Status.BAD_REQUEST) |
| .entity(StringUtils.format("Can't pause, task is not in a pausable state (state: [%s])", status)) |
| .build(); |
| } |
| |
| pauseLock.lockInterruptibly(); |
| try { |
| pauseMillis = timeout <= 0 ? PAUSE_FOREVER : timeout; |
| pauseRequested = true; |
| |
| pollRetryLock.lockInterruptibly(); |
| try { |
| isAwaitingRetry.signalAll(); |
| } |
| finally { |
| pollRetryLock.unlock(); |
| } |
| |
| if (isPaused()) { |
| shouldResume.signalAll(); // kick the monitor so it re-awaits with the new pauseMillis |
| } |
| |
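| // Give the ingestion loop up to ~2 seconds to acknowledge the pause before returning 202 |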
| long nanos = TimeUnit.SECONDS.toNanos(2); |
| while (!isPaused()) { |
| if (nanos <= 0L) { |
| return Response.status(Response.Status.ACCEPTED) |
| .entity("Request accepted but task has not yet paused") |
| .build(); |
| } |
| nanos = hasPaused.awaitNanos(nanos); |
| } |
| } |
| finally { |
| pauseLock.unlock(); |
| } |
| |
| try { |
| return Response.ok().entity(toolbox.getObjectMapper().writeValueAsString(getCurrentOffsets())).build(); |
| } |
| catch (JsonProcessingException e) { |
| throw Throwables.propagate(e); |
| } |
| } |
| |
| @POST |
| @Path("/resume") |
| public Response resumeHTTP(@Context final HttpServletRequest req) throws InterruptedException |
| { |
| authorizationCheck(req, Action.WRITE); |
| resume(); |
| return Response.status(Response.Status.OK).build(); |
| } |
| |
| public void resume() throws InterruptedException |
| { |
| pauseLock.lockInterruptibly(); |
| try { |
| pauseRequested = false; |
| shouldResume.signalAll(); |
| |
| long nanos = TimeUnit.SECONDS.toNanos(5); |
| while (isPaused()) { |
| if (nanos <= 0L) { |
| throw new RuntimeException("Resume command was not accepted within 5 seconds"); |
| } |
| nanos = shouldResume.awaitNanos(nanos); |
| } |
| } |
| finally { |
| pauseLock.unlock(); |
| } |
| } |
| |
| @GET |
| @Path("/time/start") |
| @Produces(MediaType.APPLICATION_JSON) |
| public DateTime getStartTime(@Context final HttpServletRequest req) |
| { |
| authorizationCheck(req, Action.WRITE); |
| return startTime; |
| } |
| |
| @VisibleForTesting |
| FireDepartmentMetrics getFireDepartmentMetrics() |
| { |
| return fireDepartmentMetrics; |
| } |
| |
| private boolean isPaused() |
| { |
| return status == Status.PAUSED; |
| } |
| |
| private void requestPause(long pauseMillis) |
| { |
| this.pauseMillis = pauseMillis; |
| pauseRequested = true; |
| } |
| |
| private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox) |
| { |
| return Appenderators.createRealtime( |
| dataSchema, |
| tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()), |
| metrics, |
| toolbox.getSegmentPusher(), |
| toolbox.getObjectMapper(), |
| toolbox.getIndexIO(), |
| toolbox.getIndexMergerV9(), |
| toolbox.getQueryRunnerFactoryConglomerate(), |
| toolbox.getSegmentAnnouncer(), |
| toolbox.getEmitter(), |
| toolbox.getQueryExecutorService(), |
| toolbox.getCache(), |
| toolbox.getCacheConfig() |
| ); |
| } |
| |
| private StreamAppenderatorDriver newDriver( |
| final Appenderator appenderator, |
| final TaskToolbox toolbox, |
| final FireDepartmentMetrics metrics |
| ) |
| { |
| return new StreamAppenderatorDriver( |
| appenderator, |
| new ActionBasedSegmentAllocator(toolbox.getTaskActionClient(), dataSchema), |
| toolbox.getSegmentHandoffNotifierFactory(), |
| new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()), |
| toolbox.getObjectMapper(), |
| metrics |
| ); |
| } |
| |
| private KafkaConsumer<byte[], byte[]> newConsumer() |
| { |
| ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); |
| try { |
| Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); |
| |
| final Properties props = new Properties(); |
| |
| for (Map.Entry<String, String> entry : ioConfig.getConsumerProperties().entrySet()) { |
| props.setProperty(entry.getKey(), entry.getValue()); |
| } |
| |
| props.setProperty("enable.auto.commit", "false"); |
| props.setProperty("auto.offset.reset", "none"); |
| props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName()); |
| props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName()); |
| |
| return new KafkaConsumer<>(props); |
| } |
| finally { |
| Thread.currentThread().setContextClassLoader(currCtxCl); |
| } |
| } |
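| // Illustrative consumerProperties from the ioConfig (values are examples only): |
| // {"bootstrap.servers": "kafka01:9092,kafka02:9092", "max.poll.records": "500"} |
| // Note that the four properties set above always override any user-supplied values, since |
| // the task manages offsets itself and deserializes raw bytes. |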
| |
| private static void assignPartitions( |
| final KafkaConsumer consumer, |
| final String topic, |
| final Set<Integer> partitions |
| ) |
| { |
| consumer.assign( |
| partitions.stream().map(n -> new TopicPartition(topic, n)).collect(Collectors.toList()) |
| ); |
| } |
| |
| private Set<Integer> assignPartitionsAndSeekToNext(KafkaConsumer consumer, String topic) |
| { |
| // Initialize consumer assignment. |
| final Set<Integer> assignment = Sets.newHashSet(); |
| for (Map.Entry<Integer, Long> entry : nextOffsets.entrySet()) { |
| final long endOffset = endOffsets.get(entry.getKey()); |
| if (entry.getValue() < endOffset) { |
| assignment.add(entry.getKey()); |
| } else if (entry.getValue() == endOffset) { |
| log.info("Finished reading partition[%d].", entry.getKey()); |
| } else { |
| throw new ISE( |
| "WTF?! Cannot start from offset[%,d] > endOffset[%,d]", |
| entry.getValue(), |
| endOffset |
| ); |
| } |
| } |
| |
| assignPartitions(consumer, topic, assignment); |
| |
| // Seek to starting offsets. |
| for (final int partition : assignment) { |
| final long offset = nextOffsets.get(partition); |
| log.info("Seeking partition[%d] to offset[%,d].", partition, offset); |
| consumer.seek(new TopicPartition(topic, partition), offset); |
| } |
| |
| return assignment; |
| } |
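| // Worked example (values are examples only): with nextOffsets {0: 10, 1: 20} and endOffsets |
| // {0: 10, 1: 30}, partition 0 is already finished, so only partition 1 is assigned and the |
| // consumer seeks it to offset 20. |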
| |
| /** |
| * Checks if the pauseRequested flag was set and if so blocks: |
| * a) if pauseMillis == PAUSE_FOREVER, until pauseRequested is cleared |
| * b) if pauseMillis != PAUSE_FOREVER, until pauseMillis elapses -or- pauseRequested is cleared |
| * <p/> |
| * If pauseMillis is changed while paused, the new pause timeout will be applied. This allows adjustment of the |
| * pause timeout (making a timed pause into an indefinite pause and vice versa is valid) without having to resume |
| * and ensures that the loop continues to stay paused without ingesting any new events. You will need to signal |
| * shouldResume after adjusting pauseMillis for the new value to take effect. |
| * <p/> |
| * Sets status to PAUSED and signals hasPaused so callers can be notified when the pause command has been accepted. |
| * <p/> |
| * Additionally, pauses if all partition assignments have been read and the pauseAfterRead flag is set. |
| * |
| * @return true if a pause request was handled, false otherwise |
| */ |
| private boolean possiblyPause(Set<Integer> assignment) throws InterruptedException |
| { |
| pauseLock.lockInterruptibly(); |
| try { |
| if (ioConfig.isPauseAfterRead() && assignment.isEmpty()) { |
| pauseMillis = PAUSE_FOREVER; |
| pauseRequested = true; |
| } |
| |
| if (pauseRequested) { |
| status = Status.PAUSED; |
| long nanos = 0; |
| hasPaused.signalAll(); |
| |
| while (pauseRequested) { |
| if (pauseMillis == PAUSE_FOREVER) { |
| log.info("Pausing ingestion until resumed"); |
| shouldResume.await(); |
| } else { |
| if (pauseMillis > 0) { |
| log.info("Pausing ingestion for [%,d] ms", pauseMillis); |
| nanos = TimeUnit.MILLISECONDS.toNanos(pauseMillis); |
| pauseMillis = 0; |
| } |
| if (nanos <= 0L) { |
| pauseRequested = false; // timeout elapsed |
| } |
| nanos = shouldResume.awaitNanos(nanos); |
| } |
| } |
| |
| status = Status.READING; |
| shouldResume.signalAll(); |
| log.info("Ingestion loop resumed"); |
| return true; |
| } |
| } |
| finally { |
| pauseLock.unlock(); |
| } |
| |
| return false; |
| } |
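| // Worked example (values are examples only): pause(5000) sets pauseMillis to 5000; the loop |
| // above converts it to nanos, zeroes pauseMillis, and awaits shouldResume. If the timeout |
| // elapses before a resume, pauseRequested is cleared and ingestion resumes on its own. |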
| |
| private void possiblyResetOffsetsOrWait( |
| Map<TopicPartition, Long> outOfRangePartitions, |
| KafkaConsumer<byte[], byte[]> consumer, |
| TaskToolbox taskToolbox |
| ) throws InterruptedException, IOException |
| { |
| final Map<TopicPartition, Long> resetPartitions = Maps.newHashMap(); |
| boolean doReset = false; |
| if (tuningConfig.isResetOffsetAutomatically()) { |
| for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) { |
| final TopicPartition topicPartition = outOfRangePartition.getKey(); |
| final long nextOffset = outOfRangePartition.getValue(); |
| // seek to the beginning to get the least available offset |
| consumer.seekToBeginning(Collections.singletonList(topicPartition)); |
| final long leastAvailableOffset = consumer.position(topicPartition); |
| // reset the seek |
| consumer.seek(topicPartition, nextOffset); |
| // Reset the consumer offset if resetOffsetAutomatically is set to true and the earliest |
| // available offset in the Kafka partition is greater than the next offset we are trying to |
| // fetch, i.e. the messages we want are no longer available |
| if (leastAvailableOffset > nextOffset) { |
| doReset = true; |
| resetPartitions.put(topicPartition, nextOffset); |
| } |
| } |
| } |
| |
| if (doReset) { |
| sendResetRequestAndWait(resetPartitions, taskToolbox); |
| } else { |
| log.warn("Retrying in %dms", pollRetryMs); |
| pollRetryLock.lockInterruptibly(); |
| try { |
| long nanos = TimeUnit.MILLISECONDS.toNanos(pollRetryMs); |
| while (nanos > 0L && !pauseRequested && !stopRequested.get()) { |
| nanos = isAwaitingRetry.awaitNanos(nanos); |
| } |
| } |
| finally { |
| pollRetryLock.unlock(); |
| } |
| } |
| } |
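| // Worked example (values are examples only): if the task wants offset 50 on a partition whose |
| // earliest available offset is 80, the requested messages are gone; with |
| // resetOffsetAutomatically enabled a reset request is sent for that partition, otherwise the |
| // task waits pollRetryMs and polls again. |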
| |
| private void sendResetRequestAndWait(Map<TopicPartition, Long> outOfRangePartitions, TaskToolbox taskToolbox) |
| throws IOException |
| { |
| Map<Integer, Long> partitionOffsetMap = Maps.newHashMap(); |
| for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) { |
| partitionOffsetMap.put(outOfRangePartition.getKey().partition(), outOfRangePartition.getValue()); |
| } |
| boolean result = taskToolbox.getTaskActionClient() |
| .submit(new ResetDataSourceMetadataAction( |
| getDataSource(), |
| new KafkaDataSourceMetadata(new KafkaPartitions( |
| ioConfig.getStartPartitions() |
| .getTopic(), |
| partitionOffsetMap |
| )) |
| )); |
| |
| if (result) { |
| log.makeAlert("Resetting Kafka offsets for datasource [%s]", getDataSource()) |
| .addData("partitions", partitionOffsetMap.keySet()) |
| .emit(); |
| // wait to be killed by the supervisor |
| requestPause(PAUSE_FOREVER); |
| } else { |
| log.makeAlert("Failed to send reset request for partitions [%s]", partitionOffsetMap.keySet()).emit(); |
| } |
| } |
| |
| private boolean withinMinMaxRecordTime(final InputRow row) |
| { |
| final boolean beforeMinimumMessageTime = ioConfig.getMinimumMessageTime().isPresent() |
| && ioConfig.getMinimumMessageTime().get().isAfter(row.getTimestamp()); |
| |
| final boolean afterMaximumMessageTime = ioConfig.getMaximumMessageTime().isPresent() |
| && ioConfig.getMaximumMessageTime().get().isBefore(row.getTimestamp()); |
| |
| if (!Intervals.ETERNITY.contains(row.getTimestamp())) { |
| final String errorMsg = StringUtils.format( |
| "Encountered row with timestamp that cannot be represented as a long: [%s]", |
| row |
| ); |
| log.debug(errorMsg); |
| if (tuningConfig.isReportParseExceptions()) { |
| throw new ParseException(errorMsg); |
| } else { |
| return false; |
| } |
| } |
| |
| if (log.isDebugEnabled()) { |
| if (beforeMinimumMessageTime) { |
| log.debug( |
| "CurrentTimeStamp[%s] is before MinimumMessageTime[%s]", |
| row.getTimestamp(), |
| ioConfig.getMinimumMessageTime().get() |
| ); |
| } else if (afterMaximumMessageTime) { |
| log.debug( |
| "CurrentTimeStamp[%s] is after MaximumMessageTime[%s]", |
| row.getTimestamp(), |
| ioConfig.getMaximumMessageTime().get() |
| ); |
| } |
| } |
| |
| return !beforeMinimumMessageTime && !afterMaximumMessageTime; |
| } |
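| // Worked example (timestamps are examples only): with minimumMessageTime 2017-01-01T00:00Z |
| // and maximumMessageTime 2017-01-02T00:00Z, a row stamped 2016-12-31T23:59Z is filtered out |
| // (before the minimum) while one stamped 2017-01-01T12:00Z is kept. |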
| |
| private static class SequenceMetadata |
| { |
| private final int sequenceId; |
| private final String sequenceName; |
| private final Map<Integer, Long> startOffsets; |
| private final Map<Integer, Long> endOffsets; |
| private final Set<Integer> assignments; |
| private final boolean sentinel; |
| private volatile boolean checkpointed; |
| |
| @JsonCreator |
| public SequenceMetadata( |
| @JsonProperty("sequenceId") int sequenceId, |
| @JsonProperty("sequenceName") String sequenceName, |
| @JsonProperty("startOffsets") Map<Integer, Long> startOffsets, |
| @JsonProperty("endOffsets") Map<Integer, Long> endOffsets, |
| @JsonProperty("checkpointed") boolean checkpointed |
| ) |
| { |
| Preconditions.checkNotNull(sequenceName); |
| Preconditions.checkNotNull(startOffsets); |
| Preconditions.checkNotNull(endOffsets); |
| this.sequenceId = sequenceId; |
| this.sequenceName = sequenceName; |
| this.startOffsets = ImmutableMap.copyOf(startOffsets); |
| this.endOffsets = Maps.newHashMap(endOffsets); |
| this.assignments = Sets.newHashSet(startOffsets.keySet()); |
| this.checkpointed = checkpointed; |
| this.sentinel = false; |
| } |
| |
| @JsonProperty |
| public int getSequenceId() |
| { |
| return sequenceId; |
| } |
| |
| @JsonProperty |
| public boolean isCheckpointed() |
| { |
| return checkpointed; |
| } |
| |
| @JsonProperty |
| public String getSequenceName() |
| { |
| return sequenceName; |
| } |
| |
| @JsonProperty |
| public Map<Integer, Long> getStartOffsets() |
| { |
| return startOffsets; |
| } |
| |
| @JsonProperty |
| public Map<Integer, Long> getEndOffsets() |
| { |
| return endOffsets; |
| } |
| |
| @JsonProperty |
| public boolean isSentinel() |
| { |
| return sentinel; |
| } |
| |
| public void setEndOffsets(Map<Integer, Long> newEndOffsets) |
| { |
| endOffsets.putAll(newEndOffsets); |
| checkpointed = true; |
| } |
| |
| public void updateAssignments(Map<Integer, Long> nextPartitionOffset) |
| { |
| assignments.clear(); |
| nextPartitionOffset.forEach((partition, offset) -> { |
| if (Longs.compare(endOffsets.get(partition), offset) > 0) { |
| assignments.add(partition); |
| } |
| }); |
| } |
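| // Worked example (values are examples only): with endOffsets {0: 100, 1: 200} and |
| // nextPartitionOffset {0: 100, 1: 150}, only partition 1 stays assigned, since partition 0 |
| // has already reached its end offset. |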
| |
| public boolean isOpen() |
| { |
| return !assignments.isEmpty(); |
| } |
| |
| boolean canHandle(ConsumerRecord<byte[], byte[]> record) |
| { |
| return isOpen() |
| && endOffsets.get(record.partition()) != null |
| && record.offset() >= startOffsets.get(record.partition()) |
| && record.offset() < endOffsets.get(record.partition()); |
| } |
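| // Illustrative (values are examples only): a record on partition 0 at offset 120 is handled |
| // iff this sequence is still open and startOffsets[0] <= 120 < endOffsets[0]. |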
| |
| private SequenceMetadata() |
| { |
| this.sequenceId = -1; |
| this.sequenceName = null; |
| this.startOffsets = null; |
| this.endOffsets = null; |
| this.assignments = null; |
| this.checkpointed = true; |
| this.sentinel = true; |
| } |
| |
| public static SequenceMetadata getSentinelSequenceMetadata() |
| { |
| return new SequenceMetadata(); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "SequenceMetadata{" + |
| "sequenceName='" + sequenceName + '\'' + |
| ", sequenceId=" + sequenceId + |
| ", startOffsets=" + startOffsets + |
| ", endOffsets=" + endOffsets + |
| ", assignments=" + assignments + |
| ", sentinel=" + sentinel + |
| ", checkpointed=" + checkpointed + |
| '}'; |
| } |
| |
| public Supplier<Committer> getCommitterSupplier(String topic, Map<Integer, Long> lastPersistedOffsets) |
| { |
| // Set up committer. |
| return () -> |
| new Committer() |
| { |
| @Override |
| public Object getMetadata() |
| { |
| Preconditions.checkState( |
| assignments.isEmpty(), |
| "This committer can be used only once all records up to offsets [%s] have been consumed; make sure to call updateAssignments before using it", |
| endOffsets |
| ); |
| |
| // Merge endOffsets for this sequence with the globally tracked lastPersistedOffsets. This is |
| // done because this committer persists only the subset of segments corresponding to the |
| // current sequence. Generally, lastPersistedOffsets should already cover endOffsets, but to |
| // be safe take the max of the offsets and persist that |
| for (Map.Entry<Integer, Long> partitionOffset : endOffsets.entrySet()) { |
| lastPersistedOffsets.put(partitionOffset.getKey(), Math.max( |
| partitionOffset.getValue(), |
| lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L) |
| )); |
| } |
| |
| // Publish metadata can be different from persist metadata, as we are going to publish only a |
| // subset of segments |
| return ImmutableMap.of(METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets), |
| METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets) |
| ); |
| } |
| |
| @Override |
| public void run() |
| { |
| // Do nothing. |
| } |
| }; |
| } |
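| // Illustrative: the committer's getMetadata() returns a two-entry map keyed by |
| // METADATA_NEXT_PARTITIONS (the merged, task-wide persisted offsets) and |
| // METADATA_PUBLISH_PARTITIONS (the endOffsets of just this sequence), each wrapped in a |
| // KafkaPartitions for the task's topic. |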
| |
| public TransactionalSegmentPublisher getPublisher(TaskToolbox toolbox, boolean useTransaction) |
| { |
| return (segments, commitMetadata) -> { |
| final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue( |
| ((Map) Preconditions.checkNotNull(commitMetadata, "commitMetadata")).get(METADATA_PUBLISH_PARTITIONS), |
| KafkaPartitions.class |
| ); |
| |
| // Sanity check, we should only be publishing things that match our desired end state. |
| if (!getEndOffsets().equals(finalPartitions.getPartitionOffsetMap())) { |
| throw new ISE( |
| "WTF?! Driver for sequence [%s], attempted to publish invalid metadata[%s].", |
| toString(), |
| commitMetadata |
| ); |
| } |
| |
| final SegmentTransactionalInsertAction action; |
| |
| if (useTransaction) { |
| action = new SegmentTransactionalInsertAction( |
| segments, |
| new KafkaDataSourceMetadata(new KafkaPartitions(finalPartitions.getTopic(), getStartOffsets())), |
| new KafkaDataSourceMetadata(finalPartitions) |
| ); |
| } else { |
| action = new SegmentTransactionalInsertAction(segments, null, null); |
| } |
| |
| log.info("Publishing with isTransaction[%s].", useTransaction); |
| |
| return toolbox.getTaskActionClient().submit(action).isSuccess(); |
| }; |
| } |
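| // Illustrative: with useTransaction enabled, the insert action atomically records the |
| // (startOffsets -> endOffsets) transition as datasource metadata alongside the segments, so |
| // a publish retried with stale offsets fails the transaction instead of double-publishing. |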
| } |
| } |