/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hop.pipeline.transforms.addsequence;

import org.apache.hop.core.Counter;
import org.apache.hop.core.Counters;
import org.apache.hop.core.database.Database;
import org.apache.hop.core.database.DatabaseMeta;
import org.apache.hop.core.exception.HopDatabaseException;
import org.apache.hop.core.exception.HopException;
import org.apache.hop.core.exception.HopTransformException;
import org.apache.hop.core.row.IRowMeta;
import org.apache.hop.core.row.RowDataUtil;
import org.apache.hop.core.util.Utils;
import org.apache.hop.i18n.BaseMessages;
import org.apache.hop.pipeline.Pipeline;
import org.apache.hop.pipeline.PipelineMeta;
import org.apache.hop.pipeline.transform.BaseTransform;
import org.apache.hop.pipeline.transform.TransformMeta;

/** Adds a sequential number to a stream of rows. */
public class AddSequence extends BaseTransform<AddSequenceMeta, AddSequenceData> {
private static final Class<?> PKG = AddSequence.class; // For Translator

public AddSequence(
TransformMeta transformMeta,
AddSequenceMeta meta,
AddSequenceData data,
int copyNr,
PipelineMeta pipelineMeta,
Pipeline pipeline) {
super(transformMeta, meta, data, copyNr, pipelineMeta, pipeline);
}
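
/**
 * Calculates the next sequence value, either from the shared in-memory counter or from a
 * database sequence, and returns the input row with that value appended as an extra field.
 */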
public synchronized Object[] addSequence(IRowMeta inputRowMeta, Object[] inputRowData)
throws HopException {
Object next;
if (meta.isCounterUsed()) {
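// Counter mode: take the current value and advance the shared in-memory counter.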
next = data.counter.getAndNext();
} else if (meta.isDatabaseUsed()) {
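// Database mode: ask the database for the next value of the configured sequence.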
try {
next =
data.getDb()
.getNextSequenceValue(
data.realSchemaName, data.realSequenceName, meta.getValueName());
} catch (HopDatabaseException dbe) {
throw new HopTransformException(
BaseMessages.getString(
PKG, "AddSequence.Exception.ErrorReadingSequence", data.realSequenceName),
dbe);
}
} else {
// This should never happen, but if it does, don't continue!!!
throw new HopTransformException(
BaseMessages.getString(PKG, "AddSequence.Exception.NoSpecifiedMethod"));
}
if (next != null) {
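// Append the value as an extra field; only grow the row array when it has no spare capacity.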
Object[] outputRowData = inputRowData;
if (inputRowData.length < inputRowMeta.size() + 1) {
outputRowData = RowDataUtil.resizeArray(inputRowData, inputRowMeta.size() + 1);
}
outputRowData[inputRowMeta.size()] = next;
return outputRowData;
} else {
throw new HopTransformException(
BaseMessages.getString(PKG, "AddSequence.Exception.CouldNotFindNextValueForSequence")
+ meta.getValueName());
}
}
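
/**
 * Reads one row, appends the next sequence value and passes the resulting row on to the next
 * transforms.
 */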
@Override
public boolean processRow() throws HopException {
Object[] r = getRow(); // Get row from input rowset & set row busy!
if (r == null) {
// no more input to be expected...
setOutputDone();
return false;
}
if (first) {
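// On the first row, build the output row metadata: the input fields plus the new sequence field.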
first = false;
data.outputRowMeta = getInputRowMeta().clone();
meta.getFields(data.outputRowMeta, getTransformName(), null, null, this, metadataProvider);
}
if (log.isRowLevel()) {
logRowlevel(
BaseMessages.getString(PKG, "AddSequence.Log.ReadRow")
+ getLinesRead()
+ " : "
+ getInputRowMeta().getString(r));
}
try {
putRow(data.outputRowMeta, addSequence(getInputRowMeta(), r));
if (log.isRowLevel()) {
logRowlevel(
BaseMessages.getString(PKG, "AddSequence.Log.WriteRow")
+ getLinesWritten()
+ " : "
+ getInputRowMeta().getString(r));
}
if (checkFeedback(getLinesRead())) {
if (log.isBasic()) {
logBasic(BaseMessages.getString(PKG, "AddSequence.Log.LineNumber") + getLinesRead());
}
}
} catch (HopException e) {
logError(BaseMessages.getString(PKG, "AddSequence.Log.ErrorInTransform") + e.getMessage());
setErrors(1);
stopAll();
setOutputDone(); // signal end to receiver(s)
return false;
}
return true;
}
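
/**
 * Resolves the schema and sequence names and, depending on the configured method, either opens
 * the database connection or parses the start/increment/maximum values and registers the shared
 * counter.
 */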
@Override
public boolean init() {
if (super.init()) {
data.realSchemaName = resolve(meta.getSchemaName());
data.realSequenceName = resolve(meta.getSequenceName());
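// Database mode: look up the connection metadata and connect up front.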
if (meta.isDatabaseUsed()) {
if (Utils.isEmpty(meta.getConnection())) {
logError(
BaseMessages.getString(
PKG, "InsertUpdate.Init.ConnectionMissing", getTransformName()));
return false;
}
DatabaseMeta databaseMeta = getPipelineMeta().findDatabase(meta.getConnection(), variables);
Database db = new Database(this, this, databaseMeta);
data.setDb(db);
try {
data.getDb().connect();
if (log.isDetailed()) {
logDetailed(BaseMessages.getString(PKG, "AddSequence.Log.ConnectedDB"));
}
return true;
} catch (HopDatabaseException dbe) {
logError(
BaseMessages.getString(PKG, "AddSequence.Log.CouldNotConnectToDB")
+ dbe.getMessage());
}
} else if (meta.isCounterUsed()) {
// Do the environment translations of the counter values.
boolean doAbort = false;
try {
data.start = Long.parseLong(resolve(meta.getStartAt()));
} catch (NumberFormatException ex) {
logError(
BaseMessages.getString(
PKG,
"AddSequence.Log.CouldNotParseCounterValue",
"start",
meta.getStartAt(),
resolve(meta.getStartAt()),
ex.getMessage()));
doAbort = true;
}
try {
data.increment = Long.parseLong(resolve(meta.getIncrementBy()));
} catch (NumberFormatException ex) {
logError(
BaseMessages.getString(
PKG,
"AddSequence.Log.CouldNotParseCounterValue",
"increment",
meta.getIncrementBy(),
resolve(meta.getIncrementBy()),
ex.getMessage()));
doAbort = true;
}
try {
data.maximum = Long.parseLong(resolve(meta.getMaxValue()));
} catch (NumberFormatException ex) {
logError(
BaseMessages.getString(
PKG,
"AddSequence.Log.CouldNotParseCounterValue",
"max",
meta.getMaxValue(),
resolve(meta.getMaxValue()),
ex.getMessage()));
doAbort = true;
}
if (doAbort) {
return false;
}
String realCounterName = resolve(meta.getCounterName());
if (!Utils.isEmpty(realCounterName)) {
data.setLookup("@@sequence:" + realCounterName);
} else {
data.setLookup("@@sequence:" + meta.getValueName());
}
// We need to synchronize over the whole pipeline to make sure that we always get the same
// counter regardless of the number of transform copies asking for it.
//
synchronized (getPipeline()) {
data.counter =
Counters.getInstance()
.getOrUpdateCounter(
data.getLookup(), new Counter(data.start, data.increment, data.maximum));
}
return true;
} else {
logError(
BaseMessages.getString(PKG, "AddSequence.Log.PipelineCountersHashtableNotAllocated"));
}
} else {
logError(BaseMessages.getString(PKG, "AddSequence.Log.NeedToSelectSequence"));
}
return false;
}
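
/** Releases the shared counter and/or closes the database connection. */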
@Override
public void dispose() {
if (meta.isCounterUsed()) {
if (data.getLookup() != null) {
Counters.getInstance().removeCounter(data.getLookup());
}
data.counter = null;
}
if (meta.isDatabaseUsed()) {
if (data.getDb() != null) {
data.getDb().disconnect();
}
}
super.dispose();
}

@Override
public void cleanup() {
super.cleanup();
}
}