blob: eb9cc62d88e552e40777ce39f805ab392bd009f6 [file] [log] [blame]
package org.apache.oodt.filemgrget;
import org.apache.oodt.cas.filemgr.structs.exceptions.CatalogException;
import org.apache.oodt.cas.filemgr.structs.exceptions.ConnectionException;
import org.apache.oodt.cas.filemgr.structs.exceptions.DataTransferException;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.row.RowMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.*;
import sun.util.logging.resources.logging_de;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.Map;
import static org.pentaho.di.core.row.RowDataUtil.allocateRowData;
/**
* Copyright 2014 OSBI Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
public class FilemgrGetStep extends BaseStep implements StepInterface {
private OODTConfig oodt = new OODTConfig();
private OODTProcesses oodtproc = new OODTProcesses();
private FilemgrGetStepData data;
private FilemgrGetStepMeta meta;
/**
* The constructor should simply pass on its arguments to the parent class.
*
* @param s step description
* @param stepDataInterface step data class
* @param c step copy
* @param t transformation description
* @param dis transformation executing
*/
public FilemgrGetStep(StepMeta s, StepDataInterface stepDataInterface, int c, TransMeta t, Trans dis) {
super(s, stepDataInterface, c, t, dis);
}
/**
* This method is called by PDI during transformation startup.
*
* It should initialize required for step execution.
*
* The meta and data implementations passed in can safely be cast
* to the step's respective implementations.
*
* It is mandatory that super.init() is called to ensure correct behavior.
*
* Typical tasks executed here are establishing the connection to a database,
* as wall as obtaining resources, like file handles.
*
* @param smi step meta interface implementation, containing the step settings
* @param sdi step data interface implementation, used to store runtime information
*
* @return true if initialization completed successfully, false if there was an error preventing the step from working.
*
*/
public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
// Casting to step-specific implementation classes is safe
meta = (FilemgrGetStepMeta) smi;
data = (FilemgrGetStepData) sdi;
logDetailed("loading xmlrpcclient");
try {
oodt.loadXMLRpcClient(meta.getServerURLField());
} catch (MalformedURLException e) {
logError("Incorrect URL", e);
} catch (ConnectionException e) {
logError("There was a problem connecting", e);
}
logDetailed("finished loading xmlrpcclient");
return super.init(meta, data);
}
private Object[] buildEmptyRow() {
return allocateRowData(data.outputRowMeta.size());
}
/**
* Once the transformation starts executing, the processRow() method is called repeatedly
* by PDI for as long as it returns true. To indicate that a step has finished processing rows
* this method must call setOutputDone() and return false;
*
* Steps which process incoming rows typically call getRow() to read a single row from the
* input stream, change or add row content, call putRow() to pass the changed row on
* and return true. If getRow() returns null, no more rows are expected to come in,
* and the processRow() implementation calls setOutputDone() and returns false to
* indicate that it is done too.
*
* Steps which generate rows typically construct a new row Object[] using a call to
* RowDataUtil.allocateRowData(numberOfFields), add row content, and call putRow() to
* pass the new row on. Above process may happen in a loop to generate multiple rows,
* at the end of which processRow() would call setOutputDone() and return false;
*
* @param smi the step meta interface containing the step settings
* @param sdi the step data interface that should be used to store
*
* @return true to indicate that the function should be called again, false if the step is done
*/
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
if ( first ) {
first = false;
data.outputRowMeta = new RowMeta();
meta.getFields( data.outputRowMeta, getStepname(), null, null, this, repository, metaStore );
}
if(meta.getProcessType() == Process.LIST){
try {
Map<String, Map<String, String>> products = oodtproc.getAllProducts(oodt, environmentSubstitute(meta.getProductTypeField()));
for (Map.Entry<String, Map<String,String>> entry : products.entrySet())
{
Object[] row = buildEmptyRow();
incrementLinesRead();
row[0] = entry.getKey();
int i = 1;
for(Map.Entry<String,String> innerentry : entry.getValue().entrySet()){
row[i] = innerentry.getValue();
i++;
}
putRow(data.outputRowMeta, row);
}
} catch (Exception e) {
logError("Could not get data", e);
}
}
else if(meta.getProcessType() == Process.GET && meta.getSearchType() == Search.ID){
try {
String lookup = meta.getLookup();
if(!Const.isEmpty(lookup)){
lookup = environmentSubstitute(lookup);
}
Object[] row = buildEmptyRow();
row[0] = oodtproc.getProductByID(oodt, lookup);
putRow(data.outputRowMeta, row);
} catch (CatalogException e) {
logError("Catalog Exception", e);
} catch (DataTransferException e) {
logError("Data Transfer Exception", e);
} catch (IOException e) {
logError("IO Exception",e);
}
}
else if(meta.getProcessType() == Process.GET && meta.getSearchType() == Search.NAME){
try {
String lookup = meta.getLookup();
if(!Const.isEmpty(lookup)){
lookup = environmentSubstitute(lookup);
}
Object[] row = buildEmptyRow();
row[0] = oodtproc.getProductByName(oodt, lookup);
putRow(data.outputRowMeta, row);
} catch (CatalogException e) {
logError("Catalog Exception", e);
} catch (DataTransferException e) {
logError("Data Transfer Exception", e);
} catch (IOException e) {
logError("IO Exception", e);
}
}
// log progress if it is time to to so
if (checkFeedback(getLinesRead())) {
logBasic("Linenr " + getLinesRead()); // Some basic logging
}
// indicate that processRow() should be called again
setOutputDone();
return false;
}
/**
* This method is called by PDI once the step is done processing.
*
* The dispose() method is the counterpart to init() and should release any resources
* acquired for step execution like file handles or database connections.
*
* The meta and data implementations passed in can safely be cast
* to the step's respective implementations.
*
* It is mandatory that super.dispose() is called to ensure correct behavior.
*
* @param smi step meta interface implementation, containing the step settings
* @param sdi step data interface implementation, used to store runtime information
*/
public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
// Casting to step-specific implementation classes is safe
FilemgrGetStepMeta meta = (FilemgrGetStepMeta) smi;
FilemgrGetStepData data = (FilemgrGetStepData) sdi;
super.dispose(meta, data);
}
}