blob: 6996d94c9fbeb55c1a76b6cbb064ef9d12e77395 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.python;
import java.util.Random;
import javax.validation.constraints.Min;
import org.apache.apex.malhar.python.base.util.NDimensionalArray;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
/**
* Generates and emits a simple float and int for python operator to consume
*
* @since 3.8.0
*/
public class PythonPayloadPOJOGenerator implements InputOperator
{
private long tuplesCounter = 0;
private long currentWindowTuplesCounter = 0;
// Limit number of emitted tuples per window
@Min(1)
private long maxTuplesPerWindow = 150;
@Min(1)
private long maxTuples = 300;
private final Random random = new Random();
private static final int MAX_RANDOM_INT = 100;
public static final int DIMENSION_SIZE = 2;
public static int[] intDimensionSums = new int[ DIMENSION_SIZE * DIMENSION_SIZE ];
public static float[] floatDimensionSums = new float[DIMENSION_SIZE * DIMENSION_SIZE];
public final transient DefaultOutputPort<PythonProcessingPojo> output = new DefaultOutputPort<>();
public PythonPayloadPOJOGenerator()
{
for ( int i = 0; i < (DIMENSION_SIZE * DIMENSION_SIZE ); i++) {
intDimensionSums[i] = 0;
floatDimensionSums[i] = 0;
}
}
@Override
public void beginWindow(long windowId)
{
currentWindowTuplesCounter = 0;
}
@Override
public void endWindow()
{
}
@Override
public void emitTuples()
{
while ( ( currentWindowTuplesCounter < maxTuplesPerWindow) && (tuplesCounter < maxTuples) ) {
PythonProcessingPojo pythonProcessingPojo = new PythonProcessingPojo();
pythonProcessingPojo.setX(random.nextInt(MAX_RANDOM_INT));
pythonProcessingPojo.setY(random.nextFloat());
float[] f = new float[( DIMENSION_SIZE * DIMENSION_SIZE)];
for ( int i = 0; i < (DIMENSION_SIZE * DIMENSION_SIZE ); i++) {
f[i] = random.nextFloat();
floatDimensionSums[ i % DIMENSION_SIZE ] = floatDimensionSums[ i % DIMENSION_SIZE ] + f[i];
}
NDimensionalArray<float[]> nDimensionalFloatArray = new NDimensionalArray<>();
nDimensionalFloatArray.setData(f);
nDimensionalFloatArray.setDimensions(new int[] {DIMENSION_SIZE, DIMENSION_SIZE});
nDimensionalFloatArray.setLengthOfSequentialArray(floatDimensionSums.length);
nDimensionalFloatArray.setSignedFlag(false);
pythonProcessingPojo.setNumpyFloatArray(nDimensionalFloatArray);
int[] ints = new int[( DIMENSION_SIZE * DIMENSION_SIZE)];
for ( int i = 0; i < (DIMENSION_SIZE * DIMENSION_SIZE ); i++) {
ints[i] = random.nextInt(MAX_RANDOM_INT);
intDimensionSums[ i % DIMENSION_SIZE ] = intDimensionSums [ i % DIMENSION_SIZE ] + ints[i];
}
NDimensionalArray<int[]> nDimensionalIntArray = new NDimensionalArray<>();
nDimensionalIntArray.setData(ints);
nDimensionalIntArray.setDimensions(new int[] {DIMENSION_SIZE, DIMENSION_SIZE});
nDimensionalIntArray.setLengthOfSequentialArray(ints.length);
nDimensionalIntArray.setSignedFlag(false);
pythonProcessingPojo.setNumpyIntArray(nDimensionalIntArray);
output.emit(pythonProcessingPojo);
currentWindowTuplesCounter += 1;
tuplesCounter += 1;
}
}
public long getMaxTuples()
{
return maxTuples;
}
public void setMaxTuples(long maxTuples)
{
this.maxTuples = maxTuples;
}
public long getMaxTuplesPerWindow()
{
return maxTuplesPerWindow;
}
public void setMaxTuplesPerWindow(long maxTuplesPerWindow)
{
this.maxTuplesPerWindow = maxTuplesPerWindow;
}
@Override
public void setup(Context.OperatorContext context)
{
}
@Override
public void teardown()
{
}
public static int[] getIntDimensionSums()
{
return intDimensionSums;
}
public static void setIntDimensionSums(int[] intDimensionSums)
{
PythonPayloadPOJOGenerator.intDimensionSums = intDimensionSums;
}
public static float[] getFloatDimensionSums()
{
return floatDimensionSums;
}
public static void setFloatDimensionSums(float[] floatDimensionSums)
{
PythonPayloadPOJOGenerator.floatDimensionSums = floatDimensionSums;
}
}