| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.segment.realtime.appenderator; |
| |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Stopwatch; |
| import com.google.common.base.Supplier; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Lists; |
| import com.google.common.primitives.Ints; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import org.apache.druid.data.input.Committer; |
| import org.apache.druid.data.input.InputRow; |
| import org.apache.druid.java.util.common.FileUtils; |
| import org.apache.druid.java.util.common.IAE; |
| import org.apache.druid.java.util.common.ISE; |
| import org.apache.druid.java.util.common.Pair; |
| import org.apache.druid.java.util.common.RetryUtils; |
| import org.apache.druid.java.util.common.StringUtils; |
| import org.apache.druid.java.util.common.io.Closer; |
| import org.apache.druid.java.util.emitter.EmittingLogger; |
| import org.apache.druid.query.Query; |
| import org.apache.druid.query.QueryRunner; |
| import org.apache.druid.query.SegmentDescriptor; |
| import org.apache.druid.segment.IndexIO; |
| import org.apache.druid.segment.IndexMerger; |
| import org.apache.druid.segment.QueryableIndex; |
| import org.apache.druid.segment.QueryableIndexSegment; |
| import org.apache.druid.segment.ReferenceCountingSegment; |
| import org.apache.druid.segment.incremental.IncrementalIndexAddResult; |
| import org.apache.druid.segment.incremental.IndexSizeExceededException; |
| import org.apache.druid.segment.incremental.ParseExceptionHandler; |
| import org.apache.druid.segment.incremental.RowIngestionMeters; |
| import org.apache.druid.segment.indexing.DataSchema; |
| import org.apache.druid.segment.loading.DataSegmentPusher; |
| import org.apache.druid.segment.realtime.FireDepartmentMetrics; |
| import org.apache.druid.segment.realtime.FireHydrant; |
| import org.apache.druid.segment.realtime.plumber.Sink; |
| import org.apache.druid.timeline.DataSegment; |
| import org.joda.time.Interval; |
| |
| import javax.annotation.Nullable; |
| import javax.annotation.concurrent.NotThreadSafe; |
| import java.io.Closeable; |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.channels.FileChannel; |
| import java.nio.channels.FileLock; |
| import java.nio.file.StandardOpenOption; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.stream.Collectors; |
| |
| /** |
| * This is a new class produced when the old {@code AppenderatorImpl} was split. For historical |
| * reasons, the code for creating segments was all handled by the same code path in that class. The code |
| * was correct but inefficient for batch ingestion from a memory perspective. If the input file being processed |
| * by batch ingestion had enough sinks & hydrants produced then it may run out of memory either in the |
| * hydrant creation phase (append) of this class or in the merge hydrants phase. Therefore, a new class, |
| * {@code BatchAppenderator}, this class, was created to specialize in batch ingestion and the old class |
| * for stream ingestion was renamed to {@link StreamAppenderator}. |
| * <p> |
 * This class is not thread safe!
| * It is important to realize that this class is completely synchronous despite the {@link Appenderator} |
| * interface suggesting otherwise. The concurrency was not required so it has been completely removed. |
| */ |
| @NotThreadSafe |
| public class BatchAppenderator implements Appenderator |
| { |
  // Rough estimate of memory overhead of an empty Sink (presumably from similar heap-dump
  // analysis as the hydrant figure below — TODO confirm provenance)
  public static final int ROUGH_OVERHEAD_PER_SINK = 5000;
  // Rough estimate of memory footprint of empty FireHydrant based on actual heap dumps
  public static final int ROUGH_OVERHEAD_PER_HYDRANT = 1000;

  private static final EmittingLogger log = new EmittingLogger(BatchAppenderator.class);
  // Name of the per-sink file that records the serialized segment identifier on disk
  private static final String IDENTIFIER_FILE_NAME = "identifier.json";

  private final String myId;
  private final DataSchema schema;
  private final AppenderatorConfig tuningConfig;
  private final FireDepartmentMetrics metrics;
  private final DataSegmentPusher dataSegmentPusher;
  private final ObjectMapper objectMapper;
  private final IndexIO indexIO;
  private final IndexMerger indexMerger;
  // Sinks currently resident in memory; entries are removed after every incremental persist
  private final Map<SegmentIdWithShardSpec, Sink> sinks = new HashMap<>();
  private final long maxBytesTuningConfig;
  private final boolean skipBytesInMemoryOverheadCheck;

  /**
   * The following sinks metadata map and associated class are the way to retain metadata now that sinks
   * are being completely removed from memory after each incremental persist.
   */
  private final Map<SegmentIdWithShardSpec, SinkMetadata> sinksMetadata = new HashMap<>();

  // This variable updated in add(), persist(), and drop()
  private int rowsCurrentlyInMemory = 0;
  // Total rows ingested across all sinks, persisted or not; only decremented by drop()
  private int totalRows = 0;
  private long bytesCurrentlyInMemory = 0;
  private final RowIngestionMeters rowIngestionMeters;
  private final ParseExceptionHandler parseExceptionHandler;

  // Guards against double shutdown from close()/closeNow()
  private final AtomicBoolean closed = new AtomicBoolean(false);

  // OS-level lock (and its channel) held on the base persist directory for the appenderator's lifetime
  private volatile FileLock basePersistDirLock = null;
  private volatile FileChannel basePersistDirLockChannel = null;
| |
  /**
   * Creates a synchronous batch appenderator. All collaborators except {@code id} are
   * null-checked up front.
   *
   * @param id                    identifier returned by {@link #getId()} (note: not null-checked)
   * @param schema                schema of the dataSource being ingested
   * @param tuningConfig          tuning parameters (base persist directory, memory limits, etc.)
   * @param metrics               collector for sink/persist/handoff metrics
   * @param dataSegmentPusher     pushes merged segments to deep storage
   * @param objectMapper          used to (de)serialize identifier and descriptor files
   * @param indexIO               loads persisted indexes back from disk
   * @param indexMerger           merges hydrant indexes at push time
   * @param rowIngestionMeters    counts processed rows
   * @param parseExceptionHandler handles parse exceptions raised while adding rows
   */
  BatchAppenderator(
      String id,
      DataSchema schema,
      AppenderatorConfig tuningConfig,
      FireDepartmentMetrics metrics,
      DataSegmentPusher dataSegmentPusher,
      ObjectMapper objectMapper,
      IndexIO indexIO,
      IndexMerger indexMerger,
      RowIngestionMeters rowIngestionMeters,
      ParseExceptionHandler parseExceptionHandler
  )
  {
    this.myId = id;
    this.schema = Preconditions.checkNotNull(schema, "schema");
    this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
    this.metrics = Preconditions.checkNotNull(metrics, "metrics");
    this.dataSegmentPusher = Preconditions.checkNotNull(dataSegmentPusher, "dataSegmentPusher");
    this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
    this.indexIO = Preconditions.checkNotNull(indexIO, "indexIO");
    this.indexMerger = Preconditions.checkNotNull(indexMerger, "indexMerger");
    this.rowIngestionMeters = Preconditions.checkNotNull(rowIngestionMeters, "rowIngestionMeters");
    this.parseExceptionHandler = Preconditions.checkNotNull(parseExceptionHandler, "parseExceptionHandler");

    maxBytesTuningConfig = tuningConfig.getMaxBytesInMemoryOrDefault();
    skipBytesInMemoryOverheadCheck = tuningConfig.isSkipBytesInMemoryOverheadCheck();
  }
| |
  /** Returns the identifier supplied at construction time. */
  @Override
  public String getId()
  {
    return myId;
  }

  /** Returns the dataSource name from the schema this appenderator ingests into. */
  @Override
  public String getDataSource()
  {
    return schema.getDataSource();
  }
| |
| @Override |
| public Object startJob() |
| { |
| tuningConfig.getBasePersistDirectory().mkdirs(); |
| lockBasePersistDirectory(); |
| return null; |
| } |
| |
| @Override |
| public AppenderatorAddResult add( |
| final SegmentIdWithShardSpec identifier, |
| final InputRow row, |
| @Nullable final Supplier<Committer> committerSupplier, |
| final boolean allowIncrementalPersists |
| ) throws IndexSizeExceededException, SegmentNotWritableException |
| { |
| |
| Preconditions.checkArgument( |
| committerSupplier == null, |
| "Batch appenderator does not need a committer!" |
| ); |
| |
| Preconditions.checkArgument( |
| allowIncrementalPersists, |
| "Batch appenderator should always allow incremental persists!" |
| ); |
| |
| if (!identifier.getDataSource().equals(schema.getDataSource())) { |
| throw new IAE( |
| "Expected dataSource[%s] but was asked to insert row for dataSource[%s]?!", |
| schema.getDataSource(), |
| identifier.getDataSource() |
| ); |
| } |
| |
| final Sink sink = getOrCreateSink(identifier); |
| metrics.reportMessageMaxTimestamp(row.getTimestampFromEpoch()); |
| final int sinkRowsInMemoryBeforeAdd = sink.getNumRowsInMemory(); |
| final int sinkRowsInMemoryAfterAdd; |
| final long bytesInMemoryBeforeAdd = sink.getBytesInMemory(); |
| final long bytesInMemoryAfterAdd; |
| final IncrementalIndexAddResult addResult; |
| |
| try { |
| addResult = sink.add(row, false); // allow incrememtal persis is always true for batch |
| sinkRowsInMemoryAfterAdd = addResult.getRowCount(); |
| bytesInMemoryAfterAdd = addResult.getBytesInMemory(); |
| } |
| catch (IndexSizeExceededException e) { |
| // Uh oh, we can't do anything about this! We can't persist (commit metadata would be out of sync) and we |
| // can't add the row (it just failed). This should never actually happen, though, because we check |
| // sink.canAddRow after returning from add. |
| log.error(e, "Sink for segment[%s] was unexpectedly full!", identifier); |
| throw e; |
| } |
| |
| if (sinkRowsInMemoryAfterAdd < 0) { |
| throw new SegmentNotWritableException("Attempt to add row to swapped-out sink for segment[%s].", identifier); |
| } |
| |
| if (addResult.isRowAdded()) { |
| rowIngestionMeters.incrementProcessed(); |
| } else if (addResult.hasParseException()) { |
| parseExceptionHandler.handle(addResult.getParseException()); |
| } |
| |
| final int numAddedRows = sinkRowsInMemoryAfterAdd - sinkRowsInMemoryBeforeAdd; |
| rowsCurrentlyInMemory += numAddedRows; |
| bytesCurrentlyInMemory += bytesInMemoryAfterAdd - bytesInMemoryBeforeAdd; |
| totalRows += numAddedRows; |
| sinksMetadata.computeIfAbsent(identifier, unused -> new SinkMetadata()).addRows(numAddedRows); |
| |
| boolean persist = false; |
| List<String> persistReasons = new ArrayList<>(); |
| |
| if (!sink.canAppendRow()) { |
| persist = true; |
| persistReasons.add("No more rows can be appended to sink"); |
| } |
| if (rowsCurrentlyInMemory >= tuningConfig.getMaxRowsInMemory()) { |
| persist = true; |
| persistReasons.add(StringUtils.format( |
| "rowsCurrentlyInMemory[%d] is greater than maxRowsInMemory[%d]", |
| rowsCurrentlyInMemory, |
| tuningConfig.getMaxRowsInMemory() |
| )); |
| } |
| if (bytesCurrentlyInMemory >= maxBytesTuningConfig) { |
| persist = true; |
| persistReasons.add(StringUtils.format( |
| "bytesCurrentlyInMemory[%d] is greater than maxBytesInMemory[%d]", |
| bytesCurrentlyInMemory, |
| maxBytesTuningConfig |
| )); |
| } |
| if (persist) { |
| // persistAll clears rowsCurrentlyInMemory, no need to update it. |
| log.info("Incremental persist to disk because %s.", String.join(",", persistReasons)); |
| |
| long bytesToBePersisted = 0L; |
| for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) { |
| final Sink sinkEntry = entry.getValue(); |
| if (sinkEntry != null) { |
| bytesToBePersisted += sinkEntry.getBytesInMemory(); |
| if (sinkEntry.swappable()) { |
| // Code for batch no longer memory maps hydrants but they still take memory... |
| int memoryStillInUse = calculateMemoryUsedByHydrant(); |
| bytesCurrentlyInMemory += memoryStillInUse; |
| } |
| } |
| } |
| |
| if (!skipBytesInMemoryOverheadCheck |
| && bytesCurrentlyInMemory - bytesToBePersisted > maxBytesTuningConfig) { |
| // We are still over maxBytesTuningConfig even after persisting. |
| // This means that we ran out of all available memory to ingest (due to overheads created as part of ingestion) |
| final String alertMessage = StringUtils.format( |
| "Task has exceeded safe estimated heap usage limits, failing " |
| + "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])" |
| + "(bytesCurrentlyInMemory: [%d] - bytesToBePersisted: [%d] > maxBytesTuningConfig: [%d])", |
| sinks.size(), |
| sinks.values().stream().mapToInt(Iterables::size).sum(), |
| getTotalRowCount(), |
| bytesCurrentlyInMemory, |
| bytesToBePersisted, |
| maxBytesTuningConfig |
| ); |
| final String errorMessage = StringUtils.format( |
| "%s.\nThis can occur when the overhead from too many intermediary segment persists becomes to " |
| + "great to have enough space to process additional input rows. This check, along with metering the overhead " |
| + "of these objects to factor into the 'maxBytesInMemory' computation, can be disabled by setting " |
| + "'skipBytesInMemoryOverheadCheck' to 'true' (note that doing so might allow the task to naturally encounter " |
| + "a 'java.lang.OutOfMemoryError'). Alternatively, 'maxBytesInMemory' can be increased which will cause an " |
| + "increase in heap footprint, but will allow for more intermediary segment persists to occur before " |
| + "reaching this condition.", |
| alertMessage |
| ); |
| log.makeAlert(alertMessage) |
| .addData("dataSource", schema.getDataSource()) |
| .emit(); |
| throw new RuntimeException(errorMessage); |
| } |
| |
| persistAllAndRemoveSinks(); |
| |
| } |
| return new AppenderatorAddResult(identifier, sinksMetadata.get(identifier).numRowsInSegment, false); |
| } |
| |
  /**
   * Returns all active segments regardless whether they are in memory or persisted
   */
  @Override
  public List<SegmentIdWithShardSpec> getSegments()
  {
    return ImmutableList.copyOf(sinksMetadata.keySet());
  }

  /** Returns only the segments whose sinks are currently resident in memory (not yet persisted & evicted). */
  @VisibleForTesting
  public List<SegmentIdWithShardSpec> getInMemorySegments()
  {
    return ImmutableList.copyOf(sinks.keySet());
  }
| |
| @Override |
| public int getRowCount(final SegmentIdWithShardSpec identifier) |
| { |
| return sinksMetadata.get(identifier).getNumRowsInSegment(); |
| } |
| |
  /** Returns total rows ingested across all segments (in memory or persisted, not yet dropped). */
  @Override
  public int getTotalRowCount()
  {
    return totalRows;
  }

  /** Rows currently held in in-memory (not yet persisted) indexes. */
  @VisibleForTesting
  public int getRowsInMemory()
  {
    return rowsCurrentlyInMemory;
  }

  /** Estimated bytes currently held in memory, including sink/hydrant overhead estimates. */
  @VisibleForTesting
  public long getBytesCurrentlyInMemory()
  {
    return bytesCurrentlyInMemory;
  }

  /**
   * Returns the in-memory byte size of the given segment's sink, or 0 when the sink has already
   * been persisted & evicted (or never existed).
   */
  @VisibleForTesting
  public long getBytesInMemory(SegmentIdWithShardSpec identifier)
  {
    final Sink sink = sinks.get(identifier);

    if (sink == null) {
      return 0L; // sinks are removed after a persist
    } else {
      return sink.getBytesInMemory();
    }
  }
| |
| private Sink getOrCreateSink(final SegmentIdWithShardSpec identifier) |
| { |
| Sink retVal = sinks.get(identifier); |
| |
| if (retVal == null) { |
| retVal = new Sink( |
| identifier.getInterval(), |
| schema, |
| identifier.getShardSpec(), |
| identifier.getVersion(), |
| tuningConfig.getAppendableIndexSpec(), |
| tuningConfig.getMaxRowsInMemory(), |
| maxBytesTuningConfig, |
| null |
| ); |
| bytesCurrentlyInMemory += calculateSinkMemoryInUsed(); |
| |
| sinks.put(identifier, retVal); |
| metrics.setSinkCount(sinks.size()); |
| } |
| |
| return retVal; |
| } |
| |
  // Batch ingestion never serves queries, so both query-runner methods are unsupported.
  @Override
  public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
  {
    throw new UnsupportedOperationException("No query runner for batch appenderator");
  }

  @Override
  public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
  {
    throw new UnsupportedOperationException("No query runner for batch appenderator");
  }
| |
| @Override |
| public void clear() |
| { |
| clear(true); |
| } |
| |
| private void clear(boolean removeOnDiskData) |
| { |
| // Drop commit metadata, then abandon all segments. |
| log.info("Clearing all[%d] sinks & their hydrants, removing data on disk: [%s]", sinks.size(), removeOnDiskData); |
| // Drop everything. |
| Iterator<Map.Entry<SegmentIdWithShardSpec, Sink>> sinksIterator = sinks.entrySet().iterator(); |
| sinksIterator.forEachRemaining(entry -> { |
| clearSinkMetadata(entry.getKey(), entry.getValue(), removeOnDiskData); |
| sinksIterator.remove(); |
| }); |
| metrics.setSinkCount(sinks.size()); |
| } |
| |
  /**
   * Drops the given segment for good: removes its metadata (adjusting the total row count) and,
   * if its sink is still in memory, clears the sink and deletes its on-disk data.
   * Runs synchronously despite the future-returning signature.
   */
  @Override
  public ListenableFuture<?> drop(final SegmentIdWithShardSpec identifier)
  {
    final Sink sink = sinks.get(identifier);
    SinkMetadata sm = sinksMetadata.remove(identifier);
    if (sm != null) {
      // The segment is gone for good, so its rows no longer count toward the total.
      int originalTotalRows = getTotalRowCount();
      int rowsToDrop = sm.getNumRowsInSegment();
      int totalRowsAfter = originalTotalRows - rowsToDrop;
      if (totalRowsAfter < 0) {
        // Should never go negative; indicates the counters got out of sync somewhere.
        log.warn("Total rows[%d] after dropping segment[%s] rows [%d]", totalRowsAfter, identifier, rowsToDrop);
      }
      totalRows = Math.max(totalRowsAfter, 0);
    }
    if (sink != null) {
      clearSinkMetadata(identifier, sink, true);
      // clearSinkMetadata does not remove the map entry itself (see its comment), do it here.
      if (sinks.remove(identifier) == null) {
        log.warn("Sink for identifier[%s] not found, skipping", identifier);
      }
    }
    return Futures.immediateFuture(null);
  }
| |
  /**
   * Persists all in-memory data to disk and evicts the sinks. Batch ingestion keeps no commit
   * metadata, so the committer must be null. Runs synchronously despite the future-returning
   * signature.
   */
  @Override
  public ListenableFuture<Object> persistAll(@Nullable final Committer committer)
  {
    if (committer != null) {
      throw new ISE("committer must be null for BatchAppenderator");
    }
    persistAllAndRemoveSinks();
    return Futures.immediateFuture(null);
  }
| |
  /**
   * Persist all sinks & their hydrants, keep their metadata, and then remove them completely from
   * memory (to be resurrected right before merge & push)
   */
  private void persistAllAndRemoveSinks()
  {

    final List<Pair<FireHydrant, SegmentIdWithShardSpec>> indexesToPersist = new ArrayList<>();
    int numPersistedRows = 0;
    long bytesPersisted = 0L;
    int totalHydrantsCount = 0;
    final long totalSinks = sinks.size();
    for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
      final SegmentIdWithShardSpec identifier = entry.getKey();
      final Sink sink = entry.getValue();
      if (sink == null) {
        throw new ISE("No sink for identifier: %s", identifier);
      }

      final List<FireHydrant> hydrants = Lists.newArrayList(sink);
      // Since everytime we persist we also get rid of the in-memory references to sinks & hydrants
      // the invariant of exactly one, always swappable, sink with exactly one unpersisted hydrant must hold
      int totalHydrantsForSink = hydrants.size();
      if (totalHydrantsForSink != 1) {
        throw new ISE("There should be only one hydrant for identifier[%s] but there are[%s]",
                      identifier, totalHydrantsForSink
        );
      }
      totalHydrantsCount += 1;
      numPersistedRows += sink.getNumRowsInMemory();
      bytesPersisted += sink.getBytesInMemory();

      if (!sink.swappable()) {
        throw new ISE("Sink is not swappable![%s]", identifier);
      }
      // swap() detaches the current hydrant from the sink so it can be written out below
      indexesToPersist.add(Pair.of(sink.swap(), identifier));

    }

    if (indexesToPersist.isEmpty()) {
      log.info("No indexes will be persisted");
    }
    final Stopwatch persistStopwatch = Stopwatch.createStarted();
    try {
      for (Pair<FireHydrant, SegmentIdWithShardSpec> pair : indexesToPersist) {
        metrics.incrementRowOutputCount(persistHydrant(pair.lhs, pair.rhs));
      }

      log.info(
          "Persisted in-memory data for segments: %s",
          indexesToPersist.stream()
                          .filter(itp -> itp.rhs != null)
                          .map(itp -> itp.rhs.asSegmentId().toString())
                          .distinct()
                          .collect(Collectors.joining(", "))
      );
      log.info(
          "Persisted stats: processed rows: [%d], persisted rows[%d], persisted sinks: [%d], persisted fireHydrants (across sinks): [%d]",
          rowIngestionMeters.getProcessed(),
          numPersistedRows,
          totalSinks,
          totalHydrantsCount
      );

    }
    catch (Exception e) {
      metrics.incrementFailedPersists();
      throw e;
    }
    finally {
      // Persist metrics are recorded whether the persist succeeded or not.
      metrics.incrementNumPersists();
      metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
      persistStopwatch.stop();
    }

    // NB: The rows are still in memory until they're done persisting, but we only count rows in active indexes.
    rowsCurrentlyInMemory -= numPersistedRows;
    bytesCurrentlyInMemory -= bytesPersisted;

    // remove all sinks after persisting:
    clear(false);

    log.info("Persisted rows[%,d] and bytes[%,d] and removed all sinks & hydrants from memory",
             numPersistedRows, bytesPersisted);

  }
| |
  /**
   * Merges and pushes the given segments to deep storage. Since sinks are evicted from memory at
   * persist time, each sink is first restored from its persisted directory before being merged.
   * Runs synchronously despite the future-returning signature.
   *
   * @param identifiers   segments to push; each must have been added & persisted by this appenderator
   * @param committer     must be null for batch ingestion
   * @param useUniquePath must be false for batch ingestion
   */
  @Override
  public ListenableFuture<SegmentsAndCommitMetadata> push(
      final Collection<SegmentIdWithShardSpec> identifiers,
      @Nullable final Committer committer,
      final boolean useUniquePath
  )
  {

    if (committer != null) {
      throw new ISE("There should be no committer for batch ingestion");
    }

    if (useUniquePath) {
      throw new ISE("Batch ingestion does not require uniquePath");
    }

    // Any sinks not persisted so far need to be persisted before push:
    persistAllAndRemoveSinks();

    log.info("Preparing to push...");

    // Traverse identifiers, load their sink, and push it:
    int totalHydrantsMerged = 0;
    final List<DataSegment> dataSegments = new ArrayList<>();
    for (SegmentIdWithShardSpec identifier : identifiers) {
      SinkMetadata sm = sinksMetadata.get(identifier);
      if (sm == null) {
        throw new ISE("No sink has been processed for identifier[%s]", identifier);
      }
      File persistedDir = sm.getPersistedFileDir();
      if (persistedDir == null) {
        throw new ISE("Sink for identifier[%s] not found in local file system", identifier);
      }
      totalHydrantsMerged += sm.getNumHydrants();

      // retrieve sink from disk:
      Sink sinkForIdentifier;
      try {
        sinkForIdentifier = getSinkForIdentifierPath(identifier, persistedDir);
      }
      catch (IOException e) {
        throw new ISE(e, "Failed to retrieve sinks for identifier[%s] in path[%s]", identifier, persistedDir);
      }

      // push sink:
      final DataSegment dataSegment = mergeAndPush(
          identifier,
          sinkForIdentifier
      );

      // record it:
      if (dataSegment != null) {
        dataSegments.add(dataSegment);
      } else {
        log.warn("mergeAndPush[%s] returned null, skipping.", identifier);
      }

    }

    log.info("Push complete: total sinks merged[%d], total hydrants merged[%d]",
             identifiers.size(), totalHydrantsMerged);

    return Futures.immediateFuture(new SegmentsAndCommitMetadata(dataSegments, null));
  }
| |
  /**
   * Merge segment, push to deep storage. Should only be used on segments that have been fully persisted.
   * Idempotent: if the descriptor file already exists the previously pushed segment is returned.
   *
   * @param identifier sink identifier
   * @param sink       sink to push
   * @return segment descriptor, or null if the sink is no longer valid
   */
  @Nullable
  private DataSegment mergeAndPush(
      final SegmentIdWithShardSpec identifier,
      final Sink sink
  )
  {


    // Use a descriptor file to indicate that pushing has completed.
    final File persistDir = computePersistDir(identifier);
    final File mergedTarget = new File(persistDir, "merged");
    final File descriptorFile = computeDescriptorFile(identifier);

    // Sanity checks
    if (sink.isWritable()) {
      throw new ISE("Expected sink to be no longer writable before mergeAndPush for segment[%s].", identifier);
    }

    int numHydrants = 0;
    for (FireHydrant hydrant : sink) {
      if (!hydrant.hasSwapped()) {
        throw new ISE("Expected sink to be fully persisted before mergeAndPush for segment[%s].", identifier);
      }
      numHydrants++;
    }

    // The restored hydrant count must match what was recorded at persist time.
    SinkMetadata sm = sinksMetadata.get(identifier);
    if (sm == null) {
      log.warn("Sink metadata not found just before merge for identifier [%s]", identifier);
    } else if (numHydrants != sm.getNumHydrants()) {
      throw new ISE("Number of restored hydrants[%d] for identifier[%s] does not match expected value[%d]",
                    numHydrants, identifier, sm.getNumHydrants());
    }

    try {
      if (descriptorFile.exists()) {
        // Already pushed.
        log.info("Segment[%s] already pushed, skipping.", identifier);
        return objectMapper.readValue(descriptorFile, DataSegment.class);
      }

      // Start from a clean merge target in case a previous attempt left partial output behind.
      removeDirectory(mergedTarget);

      if (mergedTarget.exists()) {
        throw new ISE("Merged target[%s] exists after removing?!", mergedTarget);
      }

      final File mergedFile;
      final long mergeFinishTime;
      final long startTime = System.nanoTime();
      List<QueryableIndex> indexes = new ArrayList<>();
      Closer closer = Closer.create();
      try {
        for (FireHydrant fireHydrant : sink) {
          Pair<ReferenceCountingSegment, Closeable> segmentAndCloseable = fireHydrant.getAndIncrementSegment();
          final QueryableIndex queryableIndex = segmentAndCloseable.lhs.asQueryableIndex();
          log.debug("Segment[%s] adding hydrant[%s]", identifier, fireHydrant);
          indexes.add(queryableIndex);
          closer.register(segmentAndCloseable.rhs);
        }

        mergedFile = indexMerger.mergeQueryableIndex(
            indexes,
            schema.getGranularitySpec().isRollup(),
            schema.getAggregators(),
            schema.getDimensionsSpec(),
            mergedTarget,
            tuningConfig.getIndexSpec(),
            tuningConfig.getSegmentWriteOutMediumFactory(),
            tuningConfig.getMaxColumnsToMerge()
        );

        mergeFinishTime = System.nanoTime();

        log.debug("Segment[%s] built in %,dms.", identifier, (mergeFinishTime - startTime) / 1000000);
      }
      catch (Throwable t) {
        throw closer.rethrow(t);
      }
      finally {
        closer.close();
      }

      // Retry pushing segments because uploading to deep storage might fail especially for cloud storage types
      final DataSegment segment = RetryUtils.retry(
          // This appenderator is used only for the local indexing task so unique paths are not required
          () -> dataSegmentPusher.push(
              mergedFile,
              sink.getSegment()
                  .withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(
                      indexes,
                      schema.getDimensionsSpec()
                  )),
              false
          ),
          exception -> exception instanceof Exception,
          5
      );

      // Drop the queryable indexes behind the hydrants... they are not needed anymore and their
      // mapped file references
      // can generate OOMs during merge if enough of them are held back...
      for (FireHydrant fireHydrant : sink) {
        fireHydrant.swapSegment(null);
      }

      // cleanup, sink no longer needed
      removeDirectory(computePersistDir(identifier));

      final long pushFinishTime = System.nanoTime();

      log.info(
          "Segment[%s] of %,d bytes "
          + "built from %d incremental persist(s) in %,dms; "
          + "pushed to deep storage in %,dms. "
          + "Load spec is: %s",
          identifier,
          segment.getSize(),
          indexes.size(),
          (mergeFinishTime - startTime) / 1000000,
          (pushFinishTime - mergeFinishTime) / 1000000,
          objectMapper.writeValueAsString(segment.getLoadSpec())
      );

      return segment;
    }
    catch (Exception e) {
      metrics.incrementFailedHandoffs();
      log.warn(e, "Failed to push merged index for segment[%s].", identifier);
      throw new RuntimeException(e);
    }
  }
| |
| @Override |
| public void close() |
| { |
| if (!closed.compareAndSet(false, true)) { |
| log.debug("Appenderator already closed, skipping close() call."); |
| return; |
| } |
| |
| log.debug("Shutting down..."); |
| |
| clear(false); |
| |
| unlockBasePersistDirectory(); |
| |
| // cleanup: |
| List<File> persistedIdentifiers = getPersistedidentifierPaths(); |
| if (persistedIdentifiers != null) { |
| for (File identifier : persistedIdentifiers) { |
| removeDirectory(identifier); |
| } |
| } |
| |
| totalRows = 0; |
| sinksMetadata.clear(); |
| } |
| |
| /** |
| Nothing to do since there are no executors |
| */ |
| @Override |
| public void closeNow() |
| { |
| if (!closed.compareAndSet(false, true)) { |
| log.debug("Appenderator already closed, skipping closeNow() call."); |
| return; |
| } |
| |
| log.debug("Shutting down immediately..."); |
| } |
| |
| private void lockBasePersistDirectory() |
| { |
| if (basePersistDirLock == null) { |
| try { |
| basePersistDirLockChannel = FileChannel.open( |
| computeLockFile().toPath(), |
| StandardOpenOption.CREATE, |
| StandardOpenOption.WRITE |
| ); |
| |
| basePersistDirLock = basePersistDirLockChannel.tryLock(); |
| if (basePersistDirLock == null) { |
| throw new ISE("Cannot acquire lock on basePersistDir: %s", computeLockFile()); |
| } |
| } |
| catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| private void unlockBasePersistDirectory() |
| { |
| try { |
| if (basePersistDirLock != null) { |
| basePersistDirLock.release(); |
| basePersistDirLockChannel.close(); |
| basePersistDirLock = null; |
| } |
| } |
| catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @VisibleForTesting |
| @Nullable |
| public List<File> getPersistedidentifierPaths() |
| { |
| |
| ArrayList<File> retVal = new ArrayList<>(); |
| |
| final File baseDir = tuningConfig.getBasePersistDirectory(); |
| if (!baseDir.exists()) { |
| return null; |
| } |
| |
| final File[] files = baseDir.listFiles(); |
| if (files == null) { |
| return null; |
| } |
| |
| for (File sinkDir : files) { |
| final File identifierFile = new File(sinkDir, IDENTIFIER_FILE_NAME); |
| if (!identifierFile.isFile()) { |
| // No identifier in this sinkDir; it must not actually be a sink directory. Skip it. |
| continue; |
| } |
| retVal.add(sinkDir); |
| } |
| |
| return retVal; |
| } |
| |
| private Sink getSinkForIdentifierPath(SegmentIdWithShardSpec identifier, File identifierPath) |
| throws IOException |
| { |
| // To avoid reading and listing of "merged" dir and other special files |
| final File[] sinkFiles = identifierPath.listFiles( |
| (dir, fileName) -> !(Ints.tryParse(fileName) == null) |
| ); |
| if (sinkFiles == null) { |
| throw new ISE("Problem reading persisted sinks in path[%s]", identifierPath); |
| } |
| |
| Arrays.sort( |
| sinkFiles, |
| (o1, o2) -> Ints.compare(Integer.parseInt(o1.getName()), Integer.parseInt(o2.getName())) |
| ); |
| |
| List<FireHydrant> hydrants = new ArrayList<>(); |
| for (File hydrantDir : sinkFiles) { |
| final int hydrantNumber = Integer.parseInt(hydrantDir.getName()); |
| |
| log.debug("Loading previously persisted partial segment at [%s]", hydrantDir); |
| if (hydrantNumber != hydrants.size()) { |
| throw new ISE("Missing hydrant [%,d] in identifier [%s].", hydrants.size(), identifier); |
| } |
| |
| hydrants.add( |
| new FireHydrant( |
| new QueryableIndexSegment(indexIO.loadIndex(hydrantDir), identifier.asSegmentId()), |
| hydrantNumber |
| ) |
| ); |
| } |
| |
| Sink retVal = new Sink( |
| identifier.getInterval(), |
| schema, |
| identifier.getShardSpec(), |
| identifier.getVersion(), |
| tuningConfig.getAppendableIndexSpec(), |
| tuningConfig.getMaxRowsInMemory(), |
| maxBytesTuningConfig, |
| null, |
| hydrants |
| ); |
| retVal.finishWriting(); // this sink is not writable |
| return retVal; |
| } |
| |
  // This function does not remove the sink from its tracking Map (sinks), the caller is responsible for that
  // this is because the Map is not synchronized and removing elements from a map while traversing it
  // throws a concurrent access exception
  private void clearSinkMetadata(
      final SegmentIdWithShardSpec identifier,
      final Sink sink,
      final boolean removeOnDiskData
  )
  {
    // Ensure no future writes will be made to this sink.
    // finishWriting() returns true only the first time the sink transitions to read-only, which
    // guarantees the counter decrements below happen at most once per sink.
    if (sink.finishWriting()) {
      // Decrement this sink's rows from the counters. we only count active sinks so that we don't double decrement,
      // i.e. those that haven't been persisted for *InMemory counters, or pushed to deep storage for the total counter.
      rowsCurrentlyInMemory -= sink.getNumRowsInMemory();
      bytesCurrentlyInMemory -= sink.getBytesInMemory();
      // Also give back the per-sink overhead charged when the sink was created.
      bytesCurrentlyInMemory -= calculateSinkMemoryInUsed();
      for (FireHydrant hydrant : sink) {
        // Decrement memory used by all Memory Mapped Hydrant
        if (!hydrant.equals(sink.getCurrHydrant())) {
          bytesCurrentlyInMemory -= calculateMemoryUsedByHydrant();
        }
      }
      // totalRows are not decremented when removing the sink from memory, sink was just persisted and it
      // still "lives" but it is in hibernation. It will be revived later just before push.
    }

    if (removeOnDiskData) {
      removeDirectory(computePersistDir(identifier));
    }

    log.info("Removed sink for segment[%s].", identifier);

  }
| |
| private File computeLockFile() |
| { |
| return new File(tuningConfig.getBasePersistDirectory(), ".lock"); |
| } |
| |
| private File computePersistDir(SegmentIdWithShardSpec identifier) |
| { |
| return new File(tuningConfig.getBasePersistDirectory(), identifier.toString()); |
| } |
| |
| private File computeIdentifierFile(SegmentIdWithShardSpec identifier) |
| { |
| return new File(computePersistDir(identifier), IDENTIFIER_FILE_NAME); |
| } |
| |
| private File computeDescriptorFile(SegmentIdWithShardSpec identifier) |
| { |
| return new File(computePersistDir(identifier), "descriptor.json"); |
| } |
| |
| private File createPersistDirIfNeeded(SegmentIdWithShardSpec identifier) throws IOException |
| { |
| final File persistDir = computePersistDir(identifier); |
| org.apache.commons.io.FileUtils.forceMkdir(persistDir); |
| |
| objectMapper.writeValue(computeIdentifierFile(identifier), identifier); |
| |
| return persistDir; |
| } |
| |
| /** |
| * Persists the given hydrant and returns the number of rows persisted. |
| * |
| * @param indexToPersist hydrant to persist |
| * @param identifier the segment this hydrant is going to be part of |
| * @return the number of rows persisted |
| */ |
| private int persistHydrant(FireHydrant indexToPersist, SegmentIdWithShardSpec identifier) |
| { |
| if (indexToPersist.hasSwapped()) { |
| throw new ISE( |
| "Segment[%s] hydrant[%s] already swapped. This cannot happen.", |
| identifier, |
| indexToPersist |
| ); |
| } |
| |
| log.debug("Segment[%s], persisting Hydrant[%s]", identifier, indexToPersist); |
| |
| try { |
| final long startTime = System.nanoTime(); |
| int numRows = indexToPersist.getIndex().size(); |
| |
| // since the sink may have been persisted before it may have lost its |
| // hydrant count, we remember that value in the sinks metadata so we have |
| // to pull it from there.... |
| SinkMetadata sm = sinksMetadata.get(identifier); |
| if (sm == null) { |
| throw new ISE("Sink must not be null for identifier when persisting hydrant[%s]", identifier); |
| } |
| final File persistDir = createPersistDirIfNeeded(identifier); |
| indexMerger.persist( |
| indexToPersist.getIndex(), |
| identifier.getInterval(), |
| new File(persistDir, String.valueOf(sm.getNumHydrants())), |
| tuningConfig.getIndexSpecForIntermediatePersists(), |
| tuningConfig.getSegmentWriteOutMediumFactory() |
| ); |
| sm.setPersistedFileDir(persistDir); |
| |
| log.info( |
| "Persisted in-memory data for segment[%s] spill[%s] to disk in [%,d] ms (%,d rows).", |
| indexToPersist.getSegmentId(), |
| indexToPersist.getCount(), |
| (System.nanoTime() - startTime) / 1000000, |
| numRows |
| ); |
| |
| indexToPersist.swapSegment(null); |
| // remember hydrant count: |
| sm.addHydrants(1); |
| |
| return numRows; |
| } |
| catch (IOException e) { |
| log.makeAlert("Incremental persist failed") |
| .addData("segment", identifier.toString()) |
| .addData("dataSource", schema.getDataSource()) |
| .addData("count", indexToPersist.getCount()) |
| .emit(); |
| |
| throw new RuntimeException(e); |
| } |
| } |
| |
| private void removeDirectory(final File target) |
| { |
| if (target.exists()) { |
| try { |
| FileUtils.deleteDirectory(target); |
| log.info("Removed directory [%s]", target); |
| } |
| catch (Exception e) { |
| log.makeAlert(e, "Failed to remove directory[%s]", schema.getDataSource()) |
| .addData("file", target) |
| .emit(); |
| } |
| } |
| } |
| |
| private int calculateMemoryUsedByHydrant() |
| { |
| if (skipBytesInMemoryOverheadCheck) { |
| return 0; |
| } |
| // These calculations are approximated from actual heap dumps. |
| int total; |
| total = Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT; |
| return total; |
| } |
| |
| private int calculateSinkMemoryInUsed() |
| { |
| if (skipBytesInMemoryOverheadCheck) { |
| return 0; |
| } |
| // Rough estimate of memory footprint of empty Sink based on actual heap dumps |
| return ROUGH_OVERHEAD_PER_SINK; |
| } |
| |
| /** |
| * This class is used for information that needs to be kept related to Sinks as |
| * they are persisted and removed from memory at every incremental persist. |
| * The information is used for sanity checks and as information required |
| * for functionality, depending in the field that is used. More info about the |
| * fields is annotated as comments in the class |
| */ |
| private static class SinkMetadata |
| { |
| /** This is used to maintain the rows in the sink accross persists of the sink |
| * used for functionality (i.e. to detect whether an incremental push |
| * is needed {@link AppenderatorDriverAddResult#isPushRequired(Integer, Long)} |
| **/ |
| private int numRowsInSegment; |
| /** For sanity check as well as functionality: to make sure that all hydrants for a sink are restored from disk at |
| * push time and also to remember the fire hydrant "count" when persisting it. |
| */ |
| private int numHydrants; |
| /* Reference to directory that holds the persisted data */ |
| File persistedFileDir; |
| |
| public SinkMetadata() |
| { |
| this(0, 0); |
| } |
| |
| public SinkMetadata(int numRowsInSegment, int numHydrants) |
| { |
| this.numRowsInSegment = numRowsInSegment; |
| this.numHydrants = numHydrants; |
| } |
| |
| public void addRows(int num) |
| { |
| numRowsInSegment += num; |
| } |
| |
| public void addHydrants(int num) |
| { |
| numHydrants += num; |
| } |
| |
| public int getNumRowsInSegment() |
| { |
| return numRowsInSegment; |
| } |
| |
| public int getNumHydrants() |
| { |
| return numHydrants; |
| } |
| |
| public void setPersistedFileDir(File persistedFileDir) |
| { |
| this.persistedFileDir = persistedFileDir; |
| } |
| |
| public File getPersistedFileDir() |
| { |
| return persistedFileDir; |
| } |
| |
| } |
| |
| } |