blob: d2f2e3c7950327b9408ca7c6aa1ba8ee61687c2e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Locale;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.dataflow.util.RandomAccessData;
import org.apache.beam.runners.dataflow.worker.counters.Counter;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
import org.apache.beam.runners.dataflow.worker.counters.CounterName;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.primitives.Ints;
/**
 * A sink that writes to a shuffle dataset.
 *
 * <p>Depending on the {@link ShuffleKind}, records are written ungrouped (resharded only),
 * partitioned by key, grouped by key, or grouped by key with values additionally sorted by a
 * secondary key.
 *
 * @param <T> the type of the elements written to the sink
 */
public class ShuffleSink<T> extends Sink<WindowedValue<T>> {

  /** How records written to the shuffle dataset are partitioned, grouped, and sorted. */
  enum ShuffleKind {
    /** Records are resharded only; no key-based partitioning or grouping. */
    UNGROUPED,
    /** Records are partitioned by key, but values are not grouped. */
    PARTITION_KEYS,
    /** Records are partitioned and their values grouped by key. */
    GROUP_KEYS,
    /** Records are grouped by key, with values additionally sorted by a secondary key. */
    GROUP_KEYS_AND_SORT_VALUES
  }

  /** Buffer size, in bytes, used by the underlying shuffle writer (128 MiB). */
  static final long SHUFFLE_WRITER_BUFFER_SIZE = 128 << 20;

  /** The encoding of an empty value: a 4-byte big-endian size prefix of zero and no payload. */
  private static final byte[] NULL_VALUE_WITH_SIZE_PREFIX = new byte[] {0, 0, 0, 0};

  final byte[] shuffleWriterConfig;
  final ShuffleKind shuffleKind;
  final PipelineOptions options;
  private final BatchModeExecutionContext executionContext;
  private final DataflowOperationContext operationContext;
  private final ExecutionStateTracker tracker;
  private final ExecutionState writeState;

  // Flags derived from shuffleKind in initCoder().
  boolean shardByKey;
  boolean groupValues;
  boolean sortValues;

  // Coders derived from the element coder in initCoder(). Fields that do not apply to the
  // configured ShuffleKind are left null.
  WindowedValueCoder<T> windowedElemCoder;
  WindowedValueCoder windowedValueCoder;
  Coder<T> elemCoder;
  Coder keyCoder;
  Coder valueCoder;
  Coder sortKeyCoder;
  Coder sortValueCoder;

  /**
   * Parses a {@link ShuffleKind} from its case-insensitive string name.
   *
   * @throws Exception if the string does not name a known shuffle kind
   */
  public static ShuffleKind parseShuffleKind(String shuffleKind) throws Exception {
    try {
      // Use Locale.ROOT so parsing is independent of the JVM's default locale; e.g. the Turkish
      // locale uppercases 'i' to a dotted capital I, which would break "partition_keys".
      return Enum.valueOf(ShuffleKind.class, shuffleKind.trim().toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      throw new Exception("unexpected shuffle_kind", e);
    }
  }

  /**
   * Constructs a ShuffleSink.
   *
   * @param options pipeline options
   * @param shuffleWriterConfig opaque configuration for the underlying shuffle writer
   * @param shuffleKind how records should be partitioned, grouped, and sorted
   * @param coder coder for the windowed elements written to this sink
   * @param executionContext the batch-mode execution context for this work item
   * @param operationContext the operation context supplying counters and execution states
   * @throws Exception if {@code coder} is not structured as required by {@code shuffleKind}
   */
  public ShuffleSink(
      PipelineOptions options,
      byte[] shuffleWriterConfig,
      ShuffleKind shuffleKind,
      Coder<WindowedValue<T>> coder,
      BatchModeExecutionContext executionContext,
      DataflowOperationContext operationContext)
      throws Exception {
    this.shuffleWriterConfig = shuffleWriterConfig;
    this.shuffleKind = shuffleKind;
    this.options = options;
    this.executionContext = executionContext;
    this.operationContext = operationContext;
    this.writeState = operationContext.newExecutionState("write-shuffle");
    this.tracker = executionContext.getExecutionStateTracker();
    initCoder(coder);
  }

  /**
   * Derives the key/value/sort coders required by {@link #shuffleKind} from the element coder and
   * sets the shardByKey/groupValues/sortValues flags.
   *
   * @throws Exception if the coder structure does not match the shuffle kind (e.g. a key-grouping
   *     shuffle whose element coder is not a {@link KvCoder})
   */
  private void initCoder(Coder<WindowedValue<T>> coder) throws Exception {
    switch (shuffleKind) {
      case UNGROUPED:
        this.shardByKey = false;
        this.groupValues = false;
        this.sortValues = false;
        break;
      case PARTITION_KEYS:
        this.shardByKey = true;
        this.groupValues = false;
        this.sortValues = false;
        break;
      case GROUP_KEYS:
        this.shardByKey = true;
        this.groupValues = true;
        this.sortValues = false;
        break;
      case GROUP_KEYS_AND_SORT_VALUES:
        this.shardByKey = true;
        this.groupValues = true;
        this.sortValues = true;
        break;
      default:
        throw new AssertionError("unexpected shuffle kind");
    }
    this.windowedElemCoder = (WindowedValueCoder<T>) coder;
    this.elemCoder = windowedElemCoder.getValueCoder();
    if (shardByKey) {
      if (!(elemCoder instanceof KvCoder)) {
        throw new Exception(
            String.format(
                "Unexpected kind of coder for elements written to a key-grouping shuffle %s.",
                elemCoder));
      }
      KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) elemCoder;
      this.keyCoder = kvCoder.getKeyCoder();
      this.valueCoder = kvCoder.getValueCoder();
      if (sortValues) {
        // TODO: Decide the representation of sort-keyed values.
        // For now, we'll just use KVs.
        if (!(valueCoder instanceof KvCoder)) {
          throw new Exception(
              String.format(
                  "Unexpected kind of coder for values written to a value-sorting shuffle %s.",
                  valueCoder));
        }
        KvCoder<?, ?> kvValueCoder = (KvCoder<?, ?>) valueCoder;
        this.sortKeyCoder = kvValueCoder.getKeyCoder();
        this.sortValueCoder = kvValueCoder.getValueCoder();
      } else {
        this.sortKeyCoder = null;
        this.sortValueCoder = null;
      }
      if (groupValues) {
        // Grouped values are written bare (no window/timestamp), so no windowed-value coder.
        this.windowedValueCoder = null;
      } else {
        this.windowedValueCoder = this.windowedElemCoder.withValueCoder(this.valueCoder);
      }
    } else {
      this.keyCoder = null;
      this.valueCoder = null;
      this.sortKeyCoder = null;
      this.sortValueCoder = null;
      this.windowedValueCoder = null;
    }
  }

  /**
   * Returns a SinkWriter that allows writing to this ShuffleSink, using the given
   * ShuffleEntryWriter. The dataset ID is used to construct names of counterFactory that track
   * per-worker per-dataset bytes written to shuffle.
   */
  public SinkWriter<WindowedValue<T>> writer(ShuffleWriter writer, String datasetId) {
    return new ShuffleSinkWriter(writer, options, operationContext.counterFactory(), datasetId);
  }

  /** The SinkWriter for a ShuffleSink. */
  class ShuffleSinkWriter implements SinkWriter<WindowedValue<T>> {
    // This is the minimum size before we output a chunk except for
    // the final chunk when close() is called.
    private static final int MIN_OUTPUT_CHUNK_SIZE = 1 << 20;

    private ShuffleWriter writer;
    // Sequence number used as a synthetic key for UNGROUPED shuffles; see add().
    private long seqNum = 0;
    // How many bytes were written to a given shuffle session.
    private final Counter<Long, ?> perDatasetBytesCounter;
    // Buffer of encoded records; flushed to the writer once it exceeds MIN_OUTPUT_CHUNK_SIZE.
    private final RandomAccessData chunk;
    private boolean closed = false;

    ShuffleSinkWriter(
        ShuffleWriter writer,
        PipelineOptions options,
        CounterFactory counterFactory,
        String datasetId) {
      this.writer = writer;
      this.perDatasetBytesCounter =
          counterFactory.longSum(CounterName.named("dax-shuffle-" + datasetId + "-written-bytes"));
      // Initialize the chunk with the minimum size so we do not have to
      // "grow" the internal byte[] up to the minimum chunk size.
      this.chunk = new RandomAccessData(MIN_OUTPUT_CHUNK_SIZE);
    }

    /**
     * Encodes one windowed element into the chunk buffer (flushing the buffer when full) and
     * returns the number of payload bytes written, excluding 4-byte size prefixes.
     */
    @Override
    public long add(WindowedValue<T> windowedElem) throws IOException {
      T elem = windowedElem.getValue();
      long bytes = 0;
      if (shardByKey) {
        if (!(elem instanceof KV)) {
          throw new AssertionError(
              "expecting the values written to a key-grouping shuffle " + "to be KVs");
        }
        KV<?, ?> kv = (KV) elem;
        Object key = kv.getKey();
        Object value = kv.getValue();

        bytes += encodeToChunk(keyCoder, key);

        if (sortValues) {
          if (!(value instanceof KV)) {
            throw new AssertionError(
                "expecting the value parts of the KVs written to "
                    + "a value-sorting shuffle to also be KVs");
          }
          KV<?, ?> kvValue = (KV) value;
          Object sortKey = kvValue.getKey();
          Object sortValue = kvValue.getValue();

          // Sort values by key and then timestamp so that any GroupAlsoByWindows
          // can run more efficiently. We produce an order preserving encoding of this composite
          // key by concatenating the nested encoding of sortKey and outer encoding of the
          // timestamp. An alternative implementation would be to use OrderedCode but it
          // is unnecessary here because the nested encoding of sortKey achieves the same effect,
          // due to the nested encoding being a https://en.wikipedia.org/wiki/Prefix_code.

          // Move forward enough bytes so we can prefix the size on after performing the write
          int initialChunkSize = chunk.size();
          chunk.resetTo(initialChunkSize + Ints.BYTES);
          sortKeyCoder.encode(sortKey, chunk.asOutputStream());
          if (!windowedElem.getTimestamp().equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
            // Empty timestamp suffixes sort before all other sort value keys with
            // the same prefix. So We can omit this suffix for this common value here
            // for efficiency and only encode when its not the minimum timestamp.
            InstantCoder.of()
                .encode(windowedElem.getTimestamp(), chunk.asOutputStream(), Context.OUTER);
          }
          int elementSize = chunk.size() - initialChunkSize - Ints.BYTES;
          writeSizePrefix(initialChunkSize, elementSize);
          bytes += elementSize;

          bytes += encodeToChunk(sortValueCoder, sortValue);
        } else if (groupValues) {
          // Sort values by timestamp so that GroupAlsoByWindows can run efficiently.
          if (windowedElem.getTimestamp().equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
            // Empty secondary keys sort before all other secondary keys, so we
            // can omit this common value here for efficiency.
            chunk.asOutputStream().write(NULL_VALUE_WITH_SIZE_PREFIX);
          } else {
            bytes += encodeToChunk(InstantCoder.of(), windowedElem.getTimestamp());
          }
          bytes += encodeToChunk(valueCoder, value);
        } else {
          // Partitioning only: no secondary key, value keeps its window and timestamp.
          chunk.asOutputStream().write(NULL_VALUE_WITH_SIZE_PREFIX);
          bytes += encodeToChunk(windowedValueCoder, windowedElem.withValue(value));
        }
      } else {
        // Not partitioning or grouping by key, just resharding values.
        // <key> is ignored, except by the shuffle splitter. Use a seq#
        // as the key, so we can split records anywhere. This also works
        // for writing a single-sharded ordered PCollection through a
        // shuffle, since the order of elements in the input will be
        // preserved in the output.
        bytes += encodeToChunk(BigEndianLongCoder.of(), seqNum++);

        chunk.asOutputStream().write(NULL_VALUE_WITH_SIZE_PREFIX);
        bytes += encodeToChunk(windowedElemCoder, windowedElem);
      }
      if (chunk.size() > MIN_OUTPUT_CHUNK_SIZE) {
        try (Closeable trackedWriteState = tracker.enterState(writeState)) {
          outputChunk();
        }
      }
      perDatasetBytesCounter.addValue(bytes);
      return bytes;
    }

    @Override
    public void close() throws IOException {
      try (Closeable trackedCloseState = tracker.enterState(writeState)) {
        // Flush any remaining buffered records, then close the underlying writer.
        // NOTE(review): if outputChunk() throws, writer.close() is skipped and (since `closed`
        // is still set in the finally) abort() will not close it either -- confirm whether the
        // underlying ShuffleWriter is cleaned up elsewhere on this failure path.
        outputChunk();
        writer.close();
      } finally {
        closed = true;
      }
    }

    @Override
    public void abort() throws IOException {
      if (!closed) {
        // ShuffleWriter extends AutoCloseable, so it may not be idempotent. Only close if it has
        // not already been closed.
        close();
      }
    }

    /** Flushes the buffered chunk to the underlying writer and resets the buffer. */
    private void outputChunk() throws IOException {
      writer.write(Arrays.copyOf(chunk.array(), chunk.size()));
      chunk.resetTo(0);
    }

    /**
     * Encodes {@code value} into the chunk, preceded by a 4-byte big-endian size prefix, and
     * returns the encoded size excluding the prefix.
     */
    private <EncodeT> int encodeToChunk(Coder<EncodeT> coder, EncodeT value) throws IOException {
      // Move forward enough bytes so we can prefix the size on after performing the write
      int initialChunkSize = chunk.size();
      chunk.resetTo(initialChunkSize + Ints.BYTES);
      coder.encode(value, chunk.asOutputStream(), Context.OUTER);
      int elementSize = chunk.size() - initialChunkSize - Ints.BYTES;
      writeSizePrefix(initialChunkSize, elementSize);
      return elementSize;
    }

    /** Patches {@code elementSize} as a 4-byte big-endian size prefix at {@code offset}. */
    private void writeSizePrefix(int offset, int elementSize) {
      byte[] internalBytes = chunk.array();
      internalBytes[offset] = (byte) ((elementSize >>> 24) & 0xFF);
      internalBytes[offset + 1] = (byte) ((elementSize >>> 16) & 0xFF);
      internalBytes[offset + 2] = (byte) ((elementSize >>> 8) & 0xFF);
      internalBytes[offset + 3] = (byte) (elementSize & 0xFF);
    }
  }

  /**
   * Opens an appliance shuffle writer for {@link #shuffleWriterConfig} and returns a SinkWriter
   * backed by it, with per-dataset byte counters keyed by the writer's dataset ID.
   */
  @Override
  public SinkWriter<WindowedValue<T>> writer() throws IOException {
    Preconditions.checkArgument(shuffleWriterConfig != null);
    ApplianceShuffleWriter applianceWriter =
        new ApplianceShuffleWriter(
            shuffleWriterConfig, SHUFFLE_WRITER_BUFFER_SIZE, operationContext);
    String datasetId = applianceWriter.getDatasetId();
    return writer(applianceWriter, datasetId);
  }
}