| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.runtime.library.output; |
| |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.tez.common.Preconditions; |
| import com.google.common.annotations.VisibleForTesting; |
| |
| import org.apache.tez.common.TezUtils; |
| import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceAudience.Public; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.tez.common.TezCommonUtils; |
| import org.apache.tez.common.TezRuntimeFrameworkConfigs; |
| import org.apache.tez.common.counters.TaskCounter; |
| import org.apache.tez.dag.api.TezConfiguration; |
| import org.apache.tez.runtime.api.AbstractLogicalOutput; |
| import org.apache.tez.runtime.api.Event; |
| import org.apache.tez.runtime.api.LogicalOutput; |
| import org.apache.tez.runtime.api.OutputContext; |
| import org.apache.tez.runtime.api.Writer; |
| import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; |
| import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler; |
| import org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter; |
| |
| /** |
| * {@link UnorderedPartitionedKVOutput} is a {@link LogicalOutput} which can be used to |
| * write Key-Value pairs. The key-value pairs are written to the correct partition based on the |
| * configured Partitioner. |
| */ |
| @Public |
| public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(UnorderedPartitionedKVOutput.class); |
| |
| @VisibleForTesting |
| Configuration conf; |
| private MemoryUpdateCallbackHandler memoryUpdateCallbackHandler; |
| private UnorderedPartitionedKVWriter kvWriter; |
| private final AtomicBoolean isStarted = new AtomicBoolean(false); |
| |
| public UnorderedPartitionedKVOutput(OutputContext outputContext, int numPhysicalOutputs) { |
| super(outputContext, numPhysicalOutputs); |
| } |
| |
| @Override |
| public synchronized List<Event> initialize() throws Exception { |
| this.conf = TezUtils.createConfFromBaseConfAndPayload(getContext()); |
| this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, getContext().getWorkDirs()); |
| this.conf.setInt(TezRuntimeFrameworkConfigs.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, |
| getNumPhysicalOutputs()); |
| this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler(); |
| getContext().requestInitialMemory( |
| UnorderedPartitionedKVWriter.getInitialMemoryRequirement(conf, |
| getContext().getTotalMemoryAvailableToTask()), memoryUpdateCallbackHandler); |
| return Collections.emptyList(); |
| } |
| |
| @Override |
| public synchronized void start() throws Exception { |
| if (!isStarted.get()) { |
| memoryUpdateCallbackHandler.validateUpdateReceived(); |
| this.kvWriter = new UnorderedPartitionedKVWriter(getContext(), conf, getNumPhysicalOutputs(), |
| memoryUpdateCallbackHandler.getMemoryAssigned()); |
| isStarted.set(true); |
| } |
| } |
| |
| @Override |
| public synchronized Writer getWriter() throws Exception { |
| Preconditions.checkState(isStarted.get(), "Cannot get writer before starting the Output"); |
| return kvWriter; |
| } |
| |
| @Override |
| public void handleEvents(List<Event> outputEvents) { |
| } |
| |
| @Override |
| public synchronized List<Event> close() throws Exception { |
| List<Event> returnEvents = null; |
| if (isStarted.get()) { |
| returnEvents = kvWriter.close(); |
| kvWriter = null; |
| } else { |
| LOG.warn(getContext().getInputOutputVertexNames() + |
| ": Attempting to close output {} of type {} before it was started. Generating empty events", |
| getContext().getDestinationVertexName(), this.getClass().getSimpleName()); |
| returnEvents = new LinkedList<Event>(); |
| ShuffleUtils |
| .generateEventsForNonStartedOutput(returnEvents, getNumPhysicalOutputs(), getContext(), |
| false, true, TezCommonUtils.newBestCompressionDeflater()); |
| } |
| |
| // This works for non-started outputs since new counters will be created with an initial value of 0 |
| long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue(); |
| getContext().getStatisticsReporter().reportDataSize(outputSize); |
| long outputRecords = getContext().getCounters() |
| .findCounter(TaskCounter.OUTPUT_RECORDS).getValue(); |
| getContext().getStatisticsReporter().reportItemsProcessed(outputRecords); |
| |
| return returnEvents; |
| } |
| |
| private static final Set<String> confKeys = new HashSet<String>(); |
| |
| static { |
| confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD); |
| confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES); |
| confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE); |
| confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES); |
| confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB); |
| confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES); |
| confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS); |
| confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS); |
| confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS); |
| confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS); |
| confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC); |
| confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED); |
| confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED); |
| confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE); |
| confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE); |
| confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT); |
| confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED); |
| confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT); |
| confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX); |
| confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH); |
| confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH); |
| confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS); |
| confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT); |
| confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS); |
| confKeys.add(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID); |
| confKeys.add( |
| TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT); |
| } |
| |
| // TODO Maybe add helper methods to extract keys |
| // TODO Maybe add constants or an Enum to access the keys |
| |
| @InterfaceAudience.Private |
| public static Set<String> getConfigurationKeySet() { |
| return Collections.unmodifiableSet(confKeys); |
| } |
| } |