/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hop.beam.core.transform;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.commons.lang.StringUtils;
import org.apache.hop.beam.core.BeamHop;
import org.apache.hop.beam.core.HopRow;
import org.apache.hop.beam.core.shared.VariableValue;
import org.apache.hop.beam.core.util.HopBeamUtil;
import org.apache.hop.beam.engines.HopPipelineExecutionOptions;
import org.apache.hop.core.exception.HopException;
import org.apache.hop.core.exception.HopTransformException;
import org.apache.hop.core.logging.LogLevel;
import org.apache.hop.core.logging.LoggingObject;
import org.apache.hop.core.metadata.SerializableMetadataProvider;
import org.apache.hop.core.plugins.PluginRegistry;
import org.apache.hop.core.plugins.TransformPluginType;
import org.apache.hop.core.row.IRowMeta;
import org.apache.hop.core.row.IValueMeta;
import org.apache.hop.core.row.JsonRowMeta;
import org.apache.hop.core.variables.IVariables;
import org.apache.hop.core.variables.Variables;
import org.apache.hop.execution.ExecutionDataBuilder;
import org.apache.hop.execution.ExecutionInfoLocation;
import org.apache.hop.execution.ExecutionType;
import org.apache.hop.execution.sampler.IExecutionDataSampler;
import org.apache.hop.execution.sampler.IExecutionDataSamplerStore;
import org.apache.hop.metadata.api.IHopMetadataProvider;
import org.apache.hop.pipeline.Pipeline;
import org.apache.hop.pipeline.PipelineHopMeta;
import org.apache.hop.pipeline.PipelineMeta;
import org.apache.hop.pipeline.RowProducer;
import org.apache.hop.pipeline.SingleThreadedPipelineExecutor;
import org.apache.hop.pipeline.engines.local.LocalPipelineEngine;
import org.apache.hop.pipeline.transform.IRowListener;
import org.apache.hop.pipeline.transform.ITransformMeta;
import org.apache.hop.pipeline.transform.RowAdapter;
import org.apache.hop.pipeline.transform.TransformMeta;
import org.apache.hop.pipeline.transform.TransformMetaDataCombi;
import org.apache.hop.pipeline.transforms.dummy.DummyMeta;
import org.apache.hop.pipeline.transforms.injector.InjectorField;
import org.apache.hop.pipeline.transforms.injector.InjectorMeta;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
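/**
 * A {@link TransformTransform} variant that buffers incoming rows into batches and pushes every
 * batch through a single-threaded, embedded Hop pipeline wrapping the actual transform. The
 * buffer is flushed when it reaches the batch size, when it goes stale (flush interval) or when
 * the bundle finishes.
 *
 * <p>A minimal usage sketch (hypothetical names; in practice the Beam pipeline builder prepares
 * the constructor arguments from the Hop pipeline metadata):
 *
 * <pre>{@code
 * PCollectionTuple tuple = input.apply(transformBatchTransform);
 * PCollection<HopRow> mainOutput =
 *   tuple.get(new TupleTag<HopRow>(HopBeamUtil.createMainOutputTupleId(transformName)) {});
 * }</pre>
 */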
public class TransformBatchTransform extends TransformTransform {
public TransformBatchTransform() {
super();
}
public TransformBatchTransform(
List<VariableValue> variableValues,
String metastoreJson,
int batchSize,
int flushIntervalMs,
String transformName,
String transformPluginId,
String transformMetaInterfaceXml,
String inputRowMetaJson,
boolean inputTransform,
List<String> targetTransforms,
List<String> infoTransforms,
List<String> infoRowMetaJsons,
List<PCollectionView<List<HopRow>>> infoCollectionViews,
String runConfigName,
String dataSamplersJson,
String parentLogChannelId) {
super(
variableValues,
metastoreJson,
batchSize,
flushIntervalMs,
transformName,
transformPluginId,
transformMetaInterfaceXml,
inputRowMetaJson,
inputTransform,
targetTransforms,
infoTransforms,
infoRowMetaJsons,
infoCollectionViews,
runConfigName,
dataSamplersJson,
parentLogChannelId);
}
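/**
 * Expand the main input into a {@link PCollectionTuple}: one main output plus one tagged output
 * per target transform, produced by a multi-output {@link ParDo} over a {@link TransformBatchFn},
 * with the info transform collections attached as side inputs.
 */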
@Override
public PCollectionTuple expand(PCollection<HopRow> input) {
try {
// Only initialize once on this node/vm
//
BeamHop.init();
// Similarly for the output: create a TupleTag list for the target transforms...
//
TupleTag<HopRow> mainOutputTupleTag =
new TupleTag<>(HopBeamUtil.createMainOutputTupleId(transformName)) {};
List<TupleTag<HopRow>> targetTupleTags = new ArrayList<>();
TupleTagList targetTupleTagList = null;
for (String targetTransform : targetTransforms) {
String tupleId = HopBeamUtil.createTargetTupleId(transformName, targetTransform);
TupleTag<HopRow> tupleTag = new TupleTag<HopRow>(tupleId) {};
targetTupleTags.add(tupleTag);
if (targetTupleTagList == null) {
targetTupleTagList = TupleTagList.of(tupleTag);
} else {
targetTupleTagList = targetTupleTagList.and(tupleTag);
}
}
if (targetTupleTagList == null) {
targetTupleTagList = TupleTagList.empty();
}
// Create a new transform function which initializes the transform
//
TransformBatchFn transformBatchFn =
new TransformBatchFn(
variableValues,
metastoreJson,
transformName,
transformPluginId,
transformMetaInterfaceXml,
inputRowMetaJson,
inputTransform,
targetTransforms,
infoTransforms,
infoRowMetaJsons,
parentLogChannelId,
runConfigName,
dataSamplersJson);
// The actual transform functionality
//
ParDo.SingleOutput<HopRow, HopRow> parDoTransformFn = ParDo.of(transformBatchFn);
// Add optional side inputs...
//
if (!infoCollectionViews.isEmpty()) {
parDoTransformFn = parDoTransformFn.withSideInputs(infoCollectionViews);
}
// Specify the main output and targeted outputs
//
ParDo.MultiOutput<HopRow, HopRow> multiOutput =
parDoTransformFn.withOutputTags(mainOutputTupleTag, targetTupleTagList);
// Apply the multi output parallel do transform function to the main input stream
//
PCollectionTuple collectionTuple = input.apply(multiOutput);
// The tuple contains everything we need.
// Just make sure to retrieve the PCollections using the correct tuple IDs:
// use HopBeamUtil.createTargetTupleId() and createMainOutputTupleId() to be sure.
//
return collectionTuple;
} catch (Exception e) {
numErrors.inc();
LOG.error("Error transforming data in transform '" + transformName + "'", e);
throw new RuntimeException("Error transforming data in transform", e);
}
}
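/**
 * The {@link org.apache.beam.sdk.transforms.DoFn} doing the actual work. It lazily builds a
 * single-threaded Hop pipeline around the wrapped transform on the first element, buffers the
 * incoming rows and runs every batch through that pipeline with a
 * {@link SingleThreadedPipelineExecutor}.
 */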
private class TransformBatchFn extends TransformBaseFn {
private static final long serialVersionUID = 95700000000000002L;
public static final String INJECTOR_TRANSFORM_NAME = "_INJECTOR_";
protected List<VariableValue> variableValues;
protected String metastoreJson;
protected String transformPluginId;
protected String transformMetaInterfaceXml;
protected String inputRowMetaJson;
protected List<String> targetTransforms;
protected List<String> infoTransforms;
protected List<String> infoRowMetaJsons;
protected boolean inputTransform;
protected boolean initialize;
protected List<PCollection<HopRow>> infoCollections;
// Log and count errors during processing.
private final Counter numErrors = Metrics.counter("main", "TransformProcessErrors");
private transient PipelineMeta pipelineMeta;
private transient TransformMeta transformMeta;
private transient IRowMeta inputRowMeta;
private transient IRowMeta outputRowMeta;
private transient List<TransformMetaDataCombi> transformCombis;
private transient LocalPipelineEngine pipeline;
private transient RowProducer rowProducer;
private transient IRowListener rowListener;
private transient List<Object[]> resultRows;
private transient List<List<Object[]>> targetResultRowsList;
private transient List<IRowMeta> targetRowMetas;
private transient List<IRowMeta> infoRowMetas;
private transient List<RowProducer> infoRowProducers;
private transient TupleTag<HopRow> mainTupleTag;
private transient List<TupleTag<HopRow>> tupleTagList;
private transient Counter initCounter;
private transient Counter readCounter;
private transient Counter writtenCounter;
private transient Counter flushBufferCounter;
private transient SingleThreadedPipelineExecutor executor;
private transient Queue<HopRow> rowBuffer;
private transient AtomicLong lastTimerCheck;
private transient Timer timer;
private transient ExecutionInfoLocation executionInfoLocation;
private transient List<IExecutionDataSampler> dataSamplers;
private transient List<IExecutionDataSamplerStore> dataSamplerStores;
private transient Timer executionInfoTimer;
public TransformBatchFn() {
super(null, null, null);
}
// I created a private class because instances of this one need access to infoCollectionViews
//
public TransformBatchFn(
List<VariableValue> variableValues,
String metastoreJson,
String transformName,
String transformPluginId,
String transformMetaInterfaceXml,
String inputRowMetaJson,
boolean inputTransform,
List<String> targetTransforms,
List<String> infoTransforms,
List<String> infoRowMetaJsons,
String parentLogChannelId,
String runConfigName,
String dataSamplersJson) {
super(parentLogChannelId, runConfigName, dataSamplersJson);
this.variableValues = variableValues;
this.metastoreJson = metastoreJson;
this.transformName = transformName;
this.transformPluginId = transformPluginId;
this.transformMetaInterfaceXml = transformMetaInterfaceXml;
this.inputRowMetaJson = inputRowMetaJson;
this.inputTransform = inputTransform;
this.targetTransforms = targetTransforms;
this.infoTransforms = infoTransforms;
this.infoRowMetaJsons = infoRowMetaJsons;
this.initialize = true;
}
/**
 * Called when a new bundle starts. For the "ScriptValueMod" transform plugin we force
 * re-initialization of the embedded pipeline to prevent the output of duplicate rows.
 *
 * @param startBundleContext the Beam context of the bundle that is starting
 */
@StartBundle
public void startBundle(StartBundleContext startBundleContext) {
Metrics.counter("startBundle", transformName).inc();
if ("ScriptValueMod".equals(transformPluginId) && pipeline != null) {
initialize = true;
}
}
@Setup
public void setup() {
try {
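// Use a concurrent queue: the flush timer thread and the Beam processing thread both
// touch this buffer.
//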
rowBuffer = new ConcurrentLinkedQueue<>();
} catch (Exception e) {
numErrors.inc();
LOG.info("Transform '" + transformName + "' : setup error :" + e.getMessage());
throw new RuntimeException("Unable to set up transform " + transformName, e);
}
}
@Teardown
public void tearDown() {
if (timer != null) {
timer.cancel();
}
try {
if (executor != null) {
executor.dispose();
// Send last data from the data samplers over to the location (if any)
//
if (executionInfoLocation != null) {
if (executionInfoTimer != null) {
executionInfoTimer.cancel();
}
sendSamplesToLocation(true);
// Close the location
//
executionInfoLocation.getExecutionInfoLocation().close();
}
}
} catch (Exception e) {
throw new RuntimeException(
"Error cleaning up single threaded pipeline executor in Beam transform "
+ transformName,
e);
}
}
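/**
 * Package the samples collected by the data sampler stores and register them with the
 * configured execution information location.
 */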
@Override
protected void sendSamplesToLocation(boolean finished) throws HopException {
ExecutionDataBuilder dataBuilder =
ExecutionDataBuilder.of()
.withOwnerId(executor.getPipeline().getLogChannelId())
.withParentId(parentLogChannelId)
.withCollectionDate(new Date())
.withFinished(finished)
.withExecutionType(ExecutionType.Transform);
for (IExecutionDataSamplerStore store : dataSamplerStores) {
dataBuilder =
dataBuilder.addDataSets(store.getSamples()).addSetMeta(store.getSamplesMetadata());
}
executionInfoLocation.getExecutionInfoLocation().registerData(dataBuilder.build());
}
@ProcessElement
public void processElement(ProcessContext context, BoundedWindow window) {
try {
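// The embedded pipeline is built lazily on the first element: only here do we have access
// to the side inputs (the info transform data) through the process context.
//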
if (initialize) {
initialize = false;
// Initialize Hop and load extra plugins as well
//
BeamHop.init();
// The content of the metadata is JSON serialized and inflated below.
//
IHopMetadataProvider metadataProvider = new SerializableMetadataProvider(metastoreJson);
IVariables variables = new Variables();
for (VariableValue variableValue : variableValues) {
if (StringUtils.isNotEmpty(variableValue.getVariable())) {
variables.setVariable(variableValue.getVariable(), variableValue.getValue());
}
}
// Create a very simple new pipeline to run single threaded...
//
pipelineMeta = new PipelineMeta();
pipelineMeta.setName(transformName);
pipelineMeta.setPipelineType(PipelineMeta.PipelineType.SingleThreaded);
pipelineMeta.setMetadataProvider(metadataProvider);
// When the first row ends up in the buffer we start the timer clock.
// It stays at -1 until then; after every flush we simply reset it to the current time.
//
lastTimerCheck = new AtomicLong(-1L);
// Input row metadata...
//
inputRowMeta = JsonRowMeta.fromJson(inputRowMetaJson);
infoRowMetas = new ArrayList<>();
for (String infoRowMetaJson : infoRowMetaJsons) {
IRowMeta infoRowMeta = JsonRowMeta.fromJson(infoRowMetaJson);
infoRowMetas.add(infoRowMeta);
}
// Create an Injector transform with the right row layout...
// This will help all transforms see the row layout statically...
//
TransformMeta mainInjectorTransformMeta = null;
if (!inputTransform) {
mainInjectorTransformMeta =
createInjectorTransform(
pipelineMeta, INJECTOR_TRANSFORM_NAME, inputRowMeta, 200, 200);
}
// Our main transform writes to a bunch of targets
// Add a dummy transform for each one so the transform can target them
//
int targetLocationY = 200;
List<TransformMeta> targetTransformMetas = new ArrayList<>();
for (String targetTransform : targetTransforms) {
DummyMeta dummyMeta = new DummyMeta();
TransformMeta targetTransformMeta = new TransformMeta(targetTransform, dummyMeta);
targetTransformMeta.setLocation(600, targetLocationY);
targetLocationY += 150;
targetTransformMetas.add(targetTransformMeta);
pipelineMeta.addTransform(targetTransformMeta);
}
// The transform might read information from info transforms
// Transforms like "Stream Lookup" or "Validator"
// They read all the data on input from a side input
//
List<List<HopRow>> infoDataSets = new ArrayList<>();
List<TransformMeta> infoTransformMetas = new ArrayList<>();
for (int i = 0; i < infoTransforms.size(); i++) {
String infoTransform = infoTransforms.get(i);
PCollectionView<List<HopRow>> cv = infoCollectionViews.get(i);
// Get the data from the side input, from the info transform(s)
//
List<HopRow> infoDataSet = context.sideInput(cv);
infoDataSets.add(infoDataSet);
IRowMeta infoRowMeta = infoRowMetas.get(i);
// Add an Injector transform for every info transform so the transform can read from it
//
TransformMeta infoTransformMeta =
createInjectorTransform(
pipelineMeta, infoTransform, infoRowMeta, 200, 350 + 150 * i);
infoTransformMetas.add(infoTransformMeta);
}
transformCombis = new ArrayList<>();
// The main transform inflated from XML metadata...
//
PluginRegistry registry = PluginRegistry.getInstance();
ITransformMeta iTransformMeta =
registry.loadClass(
TransformPluginType.class, transformPluginId, ITransformMeta.class);
if (iTransformMeta == null) {
throw new HopException(
"Unable to load transform plugin with ID "
+ transformPluginId
+ ", this plugin isn't in the plugin registry or classpath");
}
HopBeamUtil.loadTransformMetadataFromXml(
transformName,
iTransformMeta,
transformMetaInterfaceXml,
pipelineMeta.getMetadataProvider());
transformMeta = new TransformMeta(transformName, iTransformMeta);
transformMeta.setTransformPluginId(transformPluginId);
transformMeta.setLocation(400, 200);
pipelineMeta.addTransform(transformMeta);
if (!inputTransform) {
pipelineMeta.addPipelineHop(
new PipelineHopMeta(mainInjectorTransformMeta, transformMeta));
}
// The target hops as well
//
for (TransformMeta targetTransformMeta : targetTransformMetas) {
pipelineMeta.addPipelineHop(new PipelineHopMeta(transformMeta, targetTransformMeta));
}
// And the info hops...
//
for (TransformMeta infoTransformMeta : infoTransformMetas) {
pipelineMeta.addPipelineHop(new PipelineHopMeta(infoTransformMeta, transformMeta));
}
lookupExecutionInformation(variables, metadataProvider);
iTransformMeta.searchInfoAndTargetTransforms(pipelineMeta.getTransforms());
// Create the pipeline...
//
pipeline =
new LocalPipelineEngine(
pipelineMeta, variables, new LoggingObject("apache-beam-transform"));
pipeline.setLogLevel(
context.getPipelineOptions().as(HopPipelineExecutionOptions.class).getLogLevel());
pipeline.setMetadataProvider(pipelineMeta.getMetadataProvider());
pipeline
.getPipelineRunConfiguration()
.setName("beam-batch-transform-local (" + transformName + ")");
pipeline.prepareExecution();
// Create producers so we can efficiently pass data
//
rowProducer = null;
if (!inputTransform) {
rowProducer = pipeline.addRowProducer(INJECTOR_TRANSFORM_NAME, 0);
}
infoRowProducers = new ArrayList<>();
for (String infoTransform : infoTransforms) {
RowProducer infoRowProducer = pipeline.addRowProducer(infoTransform, 0);
infoRowProducers.add(infoRowProducer);
}
// Find the right interfaces for execution later...
//
if (!inputTransform) {
TransformMetaDataCombi injectorCombi = findCombi(pipeline, INJECTOR_TRANSFORM_NAME);
transformCombis.add(injectorCombi);
}
TransformMetaDataCombi transformCombi = findCombi(pipeline, transformName);
transformCombis.add(transformCombi);
outputRowMeta = pipelineMeta.getTransformFields(pipeline, transformName);
if (targetTransforms.isEmpty()) {
rowListener =
new RowAdapter() {
@Override
public void rowWrittenEvent(IRowMeta rowMeta, Object[] row) {
resultRows.add(row);
}
};
transformCombi.transform.addRowListener(rowListener);
}
// Create a list of TupleTag to direct the target rows
//
mainTupleTag =
new TupleTag<HopRow>(HopBeamUtil.createMainOutputTupleId(transformName)) {};
tupleTagList = new ArrayList<>();
// The lists in here will contain all the rows that ended up in the various target
// transforms (if any)
//
targetRowMetas = new ArrayList<>();
targetResultRowsList = new ArrayList<>();
for (String targetTransform : targetTransforms) {
TransformMetaDataCombi targetCombi = findCombi(pipeline, targetTransform);
transformCombis.add(targetCombi);
targetRowMetas.add(
pipelineMeta.getTransformFields(pipeline, targetCombi.transformName));
String tupleId = HopBeamUtil.createTargetTupleId(transformName, targetTransform);
TupleTag<HopRow> tupleTag = new TupleTag<HopRow>(tupleId) {};
tupleTagList.add(tupleTag);
final List<Object[]> targetResultRows = new ArrayList<>();
targetResultRowsList.add(targetResultRows);
targetCombi.transform.addRowListener(
new RowAdapter() {
@Override
public void rowReadEvent(IRowMeta rowMeta, Object[] row)
throws HopTransformException {
// We send the target row to a specific list...
//
targetResultRows.add(row);
}
});
}
// If we're sending execution information to a location, we need to handle this
// differently on a Beam node.
// We only go through the effort if there actually are rows to sample.
//
attachExecutionSamplersToOutput(
variables,
transformName,
pipeline.getLogChannelId(),
inputRowMeta,
outputRowMeta,
pipeline.getTransform(transformName, 0));
executor = new SingleThreadedPipelineExecutor(pipeline);
// Initialize the transforms...
//
executor.init();
initCounter = Metrics.counter(Pipeline.METRIC_NAME_INIT, transformName);
readCounter = Metrics.counter(Pipeline.METRIC_NAME_READ, transformName);
writtenCounter = Metrics.counter(Pipeline.METRIC_NAME_WRITTEN, transformName);
flushBufferCounter = Metrics.counter(Pipeline.METRIC_NAME_FLUSH_BUFFER, transformName);
initCounter.inc();
pipeline.setLogLevel(LogLevel.NOTHING);
// Doesn't really start the threads in single threaded mode
// Just sets some flags all over the place
//
pipeline.startThreads();
pipeline.setLogLevel(LogLevel.BASIC);
resultRows = new ArrayList<>();
// Copy the info data sets to the info transforms...
// We do this only once so all subsequent rows can use this.
//
for (int i = 0; i < infoTransforms.size(); i++) {
RowProducer infoRowProducer = infoRowProducers.get(i);
List<HopRow> infoDataSet = infoDataSets.get(i);
TransformMetaDataCombi combi = findCombi(pipeline, infoTransforms.get(i));
IRowMeta infoRowMeta = infoRowMetas.get(i);
// Pass and process the rows in the info transforms
//
for (HopRow infoRowData : infoDataSet) {
infoRowProducer.putRow(infoRowMeta, infoRowData.getRow());
combi.transform.processRow();
}
// By calling finished(), transforms like Stream Lookup know that no more rows are
// going to come and they can start to work with the info data set.
//
infoRowProducer.finished();
// Call once more to flag input as done, transform as finished.
//
combi.transform.processRow();
}
// Install a timer to periodically (every 100ms) check if the buffer is stale and needs to be flushed...
//
if (flushIntervalMs > 0) {
TimerTask timerTask =
new TimerTask() {
@Override
public void run() {
// Check on the state of the buffer, flush if needed...
//
synchronized (rowBuffer) {
long difference = System.currentTimeMillis() - lastTimerCheck.get();
if (lastTimerCheck.get() <= 0 || difference > flushIntervalMs) {
try {
emptyRowBuffer(new TransformProcessContext(context));
} catch (Exception e) {
throw new RuntimeException(
"Unable to flush row buffer when it got stale after "
+ difference
+ " ms",
e);
}
lastTimerCheck.set(System.currentTimeMillis());
}
}
}
};
timer = new Timer("Flush timer of transform " + transformName);
timer.schedule(timerTask, 100, 100);
}
}
// Get one row from the context main input and make a copy so we can change it.
//
HopRow originalInputRow = context.element();
HopRow inputRow = HopBeamUtil.copyHopRow(originalInputRow, inputRowMeta);
readCounter.inc();
// Take care of the age of the buffer...
//
if (flushIntervalMs > 0 && rowBuffer.isEmpty()) {
lastTimerCheck.set(System.currentTimeMillis());
}
// Add the row to the buffer.
//
synchronized (rowBuffer) {
rowBuffer.add(inputRow);
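// Remember the window so that finishBundle() can emit any remaining buffered rows into it.
//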
batchWindow = window;
if (rowBuffer.size() >= batchSize) {
emptyRowBuffer(new TransformProcessContext(context));
}
}
} catch (Exception e) {
numErrors.inc();
LOG.info("Transform execution error :" + e.getMessage());
throw new RuntimeException("Error executing TransformBatchFn", e);
}
}
@FinishBundle
public void finishBundle(FinishBundleContext context) {
try {
synchronized (rowBuffer) {
if (!rowBuffer.isEmpty()) {
emptyRowBuffer(new TransformFinishBundleContext(context, batchWindow));
}
}
} catch (Exception e) {
numErrors.inc();
LOG.info("Transform finishing bundle error :" + e.getMessage());
throw new RuntimeException(
"Error finalizing bundle of transform '" + transformName + "'", e);
}
}
private transient int maxInputBufferSize = 0;
private transient int minInputBufferSize = Integer.MAX_VALUE;
/**
 * Attempt to empty the row buffer: push all buffered rows through the embedded
 * single-threaded pipeline and emit the results to the given output context.
 *
 * @param context the tuple output context to emit result rows to
 * @throws HopException if the rows can't be processed
 */
private synchronized void emptyRowBuffer(TupleOutputContext<HopRow> context)
throws HopException {
synchronized (rowBuffer) {
List<HopRow> buffer = new ArrayList<>();
// Copy the data to avoid race conditions
//
int size = rowBuffer.size();
for (int i = 0; i < size; i++) {
HopRow hopRow = rowBuffer.poll();
buffer.add(hopRow);
}
// Only do something if we have work to do
//
if (buffer.isEmpty()) {
return;
}
if (!rowBuffer.isEmpty()) {
LOG.error("Async action detected on rowBuffer");
}
// Empty all the row buffers for another iteration
//
resultRows.clear();
for (int t = 0; t < targetTransforms.size(); t++) {
targetResultRowsList.get(t).clear();
}
// Pass the rows in the rowBuffer to the input RowSet
//
if (!inputTransform) {
int bufferSize = buffer.size();
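// Track the largest and smallest buffer sizes seen so far, exposed as the "maxInputSize"
// and "minInputSize" Beam metrics.
//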
if (maxInputBufferSize < bufferSize) {
Metrics.counter("maxInputSize", transformName).inc(bufferSize - maxInputBufferSize);
maxInputBufferSize = bufferSize;
}
if (minInputBufferSize > bufferSize) {
if (minInputBufferSize == Integer.MAX_VALUE) {
Metrics.counter("minInputSize", transformName).inc(bufferSize);
} else {
Metrics.counter("minInputSize", transformName).dec(bufferSize - minInputBufferSize);
}
minInputBufferSize = bufferSize;
}
for (HopRow inputRow : buffer) {
rowProducer.putRow(inputRowMeta, inputRow.getRow());
}
}
// Execute all transforms in the pipeline
//
executor.oneIteration();
// Evaluate the results...
//
// Pass all rows in the output to the process context
//
for (Object[] resultRow : resultRows) {
// Pass the row to the process context
//
context.output(mainTupleTag, new HopRow(resultRow));
writtenCounter.inc();
}
// Pass whatever ended up on the target nodes
//
for (int t = 0; t < targetResultRowsList.size(); t++) {
List<Object[]> targetRowsList = targetResultRowsList.get(t);
TupleTag<HopRow> tupleTag = tupleTagList.get(t);
for (Object[] targetRow : targetRowsList) {
context.output(tupleTag, new HopRow(targetRow));
}
}
flushBufferCounter.inc();
buffer.clear(); // gc
lastTimerCheck.set(System.currentTimeMillis()); // No need to check sooner
}
}
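/**
 * Create an Injector transform with the given row layout and add it to the pipeline. This gives
 * all downstream transforms a static view of the rows they will receive.
 */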
private TransformMeta createInjectorTransform(
PipelineMeta pipelineMeta,
String injectorTransformName,
IRowMeta injectorRowMeta,
int x,
int y) {
InjectorMeta injectorMeta = new InjectorMeta();
for (IValueMeta valueMeta : injectorRowMeta.getValueMetaList()) {
injectorMeta
.getInjectorFields()
.add(
new InjectorField(
valueMeta.getName(),
valueMeta.getTypeDesc(),
Integer.toString(valueMeta.getLength()),
Integer.toString(valueMeta.getPrecision())));
}
TransformMeta injectorTransformMeta = new TransformMeta(injectorTransformName, injectorMeta);
injectorTransformMeta.setLocation(x, y);
pipelineMeta.addTransform(injectorTransformMeta);
return injectorTransformMeta;
}
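/**
 * Look up the {@link TransformMetaDataCombi} with the given transform name in the pipeline, or
 * throw if it can't be found.
 */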
private TransformMetaDataCombi findCombi(Pipeline pipeline, String transformName) {
for (TransformMetaDataCombi combi : pipeline.getTransforms()) {
if (combi.transformName.equals(transformName)) {
return combi;
}
}
throw new RuntimeException(
"Configuration error, transform '" + transformName + "' not found in transformation");
}
}
}