blob: 27e5697cd3dfe35346f37f8b4bde156c68894879 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hop.pipeline.transforms.orabulkloader;
//
// The "designer" notes of the Oracle bulkloader:
// ----------------------------------------------
//
// - "Enclosed" is used in the loader instead of "optionally enclosed" as optionally
// encloses kind of destroys the escaping.
// - A Boolean is output as Y and N (as in the text output transform e.g.). If people don't
// like this they can first convert the boolean value to something else before loading
// it.
// - Filters (besides data and datetime) are not supported as it slows down.
//
//
import com.google.common.annotations.VisibleForTesting;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.List;
import org.apache.commons.vfs2.FileObject;
import org.apache.hop.core.Const;
import org.apache.hop.core.database.DatabaseMeta;
import org.apache.hop.core.encryption.Encr;
import org.apache.hop.core.exception.HopException;
import org.apache.hop.core.exception.HopFileException;
import org.apache.hop.core.row.IRowMeta;
import org.apache.hop.core.row.IValueMeta;
import org.apache.hop.core.util.Utils;
import org.apache.hop.core.variables.IVariables;
import org.apache.hop.core.vfs.HopVfs;
import org.apache.hop.i18n.BaseMessages;
import org.apache.hop.pipeline.Pipeline;
import org.apache.hop.pipeline.PipelineMeta;
import org.apache.hop.pipeline.engine.IPipelineEngine;
import org.apache.hop.pipeline.transform.BaseTransform;
import org.apache.hop.pipeline.transform.TransformMeta;
/** Performs a bulk load to an oracle table. */
public class OraBulkLoader extends BaseTransform<OraBulkLoaderMeta, OraBulkLoaderData> {
private static final Class<?> PKG =
OraBulkLoaderMeta.class; // for i18n purposes, needed by Translator2!!
public static final int EX_SUCC = 0;
public static final int EX_WARN = 2;
private Process sqlldrProcess = null;
private OraBulkDataOutput output = null;
/*
* Local copy of the transformation "preview" property. We only forward the rows upon previewing, we don't do any of
* the real stuff.
*/
private boolean preview = false;
//
// This class continually reads from the stream, and sends it to the log
// if the logging level is at least basic level.
//
private final class StreamLogger extends Thread {
private InputStream input;
private String type;
StreamLogger(InputStream is, String type) {
this.input = is;
this.type = type + ">";
}
@Override
public void run() {
try {
final BufferedReader br = new BufferedReader(new InputStreamReader(input));
String line;
while ((line = br.readLine()) != null) {
// Only perform the concatenation if at basic level. Otherwise,
// this just reads from the stream.
if (log.isBasic()) {
logBasic(type + line);
}
}
} catch (IOException ioe) {
ioe.printStackTrace();
}
}
}
public OraBulkLoader(
TransformMeta transformMeta,
OraBulkLoaderMeta meta,
OraBulkLoaderData data,
int copyNr,
PipelineMeta pipelineMeta,
Pipeline pipeline) {
super(transformMeta, meta, data, copyNr, pipelineMeta, pipeline);
}
private String substituteRecordTerminator(String terminator) {
final StringBuilder in = new StringBuilder();
int length;
boolean escaped = false;
terminator = resolve(terminator);
length = terminator.length();
for (int i = 0; i < length; i++) {
final char c = terminator.charAt(i);
if (escaped) {
switch (c) {
case 'n':
in.append('\n');
break;
case 'r':
in.append('\r');
break;
default:
in.append(c);
break;
}
escaped = false;
} else if (c == '\\') {
escaped = true;
} else {
in.append(c);
}
}
return in.toString();
}
private String encodeRecordTerminator(String terminator, String encoding) throws HopException {
final String in = substituteRecordTerminator(terminator);
final StringBuilder out = new StringBuilder();
byte[] bytes;
try {
// use terminator in hex representation due to character set
// terminator in hex representation must be in character set
// of data file
if (Utils.isEmpty(encoding)) {
bytes = in.getBytes();
} else {
bytes = in.getBytes(encoding);
}
for (byte aByte : bytes) {
final String hex = Integer.toHexString(aByte);
if (hex.length() == 1) {
out.append('0');
}
out.append(hex);
}
} catch (UnsupportedEncodingException e) {
throw new HopException("Unsupported character encoding: " + encoding, e);
}
return out.toString();
}
/**
* Get the contents of the control file as specified in the meta object
*
* @param meta the meta object to model the control file after
* @return a string containing the control file contents
*/
public String getControlFileContents(OraBulkLoaderMeta meta, IRowMeta rowMeta, Object[] row)
throws HopException {
DatabaseMeta dm = meta.getDatabaseMeta();
String inputName = "'" + getFilename(getFileObject(meta.getDataFile(), variables)) + "'";
String loadAction = meta.getLoadAction();
StringBuilder contents = new StringBuilder(500);
contents.append("OPTIONS(").append(Const.CR);
contents.append(" ERRORS=\'").append(meta.getMaxErrors()).append("\'").append(Const.CR);
if (meta.getCommitSizeAsInt(this) != 0
&& !(meta.isDirectPath() && getTransformMeta().getCopies(variables) > 1)) {
// For the second part of the above expressions: ROWS is not supported
// in parallel mode (by sqlldr).
contents.append(" , ROWS=\'").append(meta.getCommitSize()).append("\'").append(Const.CR);
}
if (meta.getBindSizeAsInt(this) != 0) {
contents.append(" , BINDSIZE=\'").append(meta.getBindSize()).append("\'").append(Const.CR);
}
if (meta.getReadSizeAsInt(this) != 0) {
contents.append(" , READSIZE=\'").append(meta.getReadSize()).append("\'").append(Const.CR);
}
contents.append(")").append(Const.CR);
contents.append("LOAD DATA").append(Const.CR);
if (!Utils.isEmpty(meta.getCharacterSetName())) {
contents.append("CHARACTERSET ").append(meta.getCharacterSetName()).append(Const.CR);
}
if (!OraBulkLoaderMeta.METHOD_AUTO_CONCURRENT.equals(meta.getLoadMethod())
|| !Utils.isEmpty(meta.getAltRecordTerm())) {
String infile = inputName;
if (OraBulkLoaderMeta.METHOD_AUTO_CONCURRENT.equals(meta.getLoadMethod())) {
infile = "''";
}
// For concurrent input, data command line argument must be specified
contents.append("INFILE ").append(infile);
if (!Utils.isEmpty(meta.getAltRecordTerm())) {
contents
.append(" \"STR x'")
.append(encodeRecordTerminator(meta.getAltRecordTerm(), meta.getEncoding()))
.append("'\"");
}
contents.append(Const.CR);
}
contents
.append("INTO TABLE ")
.append(
dm.getQuotedSchemaTableCombination(
variables, meta.getSchemaName(), meta.getTableName()))
.append(Const.CR)
.append(loadAction)
.append(Const.CR)
.append("FIELDS TERMINATED BY ',' ENCLOSED BY '\"'")
.append(Const.CR)
.append("TRAILING NULLCOLS")
.append(Const.CR)
.append('(');
List<OraBulkLoaderMappingMeta> mappings = meta.getMappings();
if (mappings == null || mappings.isEmpty()) {
throw new HopException("No fields defined to load to database");
}
boolean firstMapping = true;
for (OraBulkLoaderMappingMeta mapping : mappings) {
if (!firstMapping) {
contents.append(", ").append(Const.CR);
}
firstMapping = false;
contents.append(dm.quoteField(mapping.getFieldTable()));
int pos = rowMeta.indexOfValue(mapping.getFieldStream());
if (pos < 0) {
throw new HopException("Could not find field " + mapping.getFieldStream() + " in stream");
}
IValueMeta v = rowMeta.getValueMeta(pos);
switch (v.getType()) {
case IValueMeta.TYPE_STRING:
if (v.getLength() > 255) {
contents.append(" CHAR(").append(v.getLength()).append(")");
} else {
contents.append(" CHAR");
}
break;
case IValueMeta.TYPE_INTEGER:
case IValueMeta.TYPE_NUMBER:
case IValueMeta.TYPE_BIGNUMBER:
break;
case IValueMeta.TYPE_DATE:
if (OraBulkLoaderMeta.DATE_MASK_DATE.equals(mapping.getDateMask())) {
contents.append(" DATE 'yyyy-mm-dd'");
} else if (OraBulkLoaderMeta.DATE_MASK_DATETIME.equals(mapping.getDateMask())) {
contents.append(" TIMESTAMP 'yyyy-mm-dd hh24:mi:ss.ff'");
} else {
// If not specified the default is date.
contents.append(" DATE 'yyyy-mm-dd'");
}
break;
case IValueMeta.TYPE_BINARY:
contents.append(" ENCLOSED BY '<startlob>' AND '<endlob>'");
break;
case IValueMeta.TYPE_TIMESTAMP:
contents.append(" TIMESTAMP 'yyyy-mm-dd hh24:mi:ss.ff'");
break;
default:
break;
}
}
contents.append(")");
return contents.toString();
}
/**
* Create a control file.
*
* @param filename path to control file
* @param meta transform meta
* @throws HopException
*/
public void createControlFile(String filename, Object[] row, OraBulkLoaderMeta meta)
throws HopException {
FileWriter fw = null;
try {
File controlFile = new File(getFileObject(filename, variables).getURL().getFile());
// Need to ensure that the parent directory they set exists for the control file.
controlFile.getParentFile().mkdirs();
controlFile.createNewFile();
fw = new FileWriter(controlFile);
fw.write(getControlFileContents(meta, getInputRowMeta(), row));
} catch (IOException ex) {
throw new HopException(ex.getMessage(), ex);
} finally {
try {
if (fw != null) {
fw.close();
}
} catch (Exception ex) {
// Ignore errors
}
}
}
/**
* Create the command line for an sqlldr process depending on the meta information supplied.
*
* @param meta The meta data to create the command line from
* @param password Use the real password or not
* @return The string to execute.
* @throws HopException Upon any exception
*/
public String createCommandLine(OraBulkLoaderMeta meta, boolean password) throws HopException {
StringBuilder sb = new StringBuilder(300);
if (meta.getSqlldr() != null) {
try {
FileObject fileObject = getFileObject(meta.getSqlldr(), variables);
String sqlldr = getFilename(fileObject);
sb.append(sqlldr);
} catch (HopFileException ex) {
throw new HopException("Error retrieving sqlldr string", ex);
}
} else {
throw new HopException("No sqlldr application specified");
}
if (meta.getControlFile() != null) {
try {
FileObject fileObject = getFileObject(meta.getControlFile(), variables);
sb.append(" control=\'");
sb.append(getFilename(fileObject));
sb.append("\'");
} catch (HopFileException ex) {
throw new HopException("Error retrieving controlfile string", ex);
}
} else {
throw new HopException("No control file specified");
}
if (OraBulkLoaderMeta.METHOD_AUTO_CONCURRENT.equals(meta.getLoadMethod())) {
sb.append(" data=\'-\'");
}
if (meta.getLogFile() != null) {
try {
FileObject fileObject = getFileObject(meta.getLogFile(), variables);
sb.append(" log=\'");
sb.append(getFilename(fileObject));
sb.append("\'");
} catch (HopFileException ex) {
throw new HopException("Error retrieving logfile string", ex);
}
}
if (meta.getBadFile() != null) {
try {
FileObject fileObject = getFileObject(meta.getBadFile(), variables);
sb.append(" bad=\'");
sb.append(getFilename(fileObject));
sb.append("\'");
} catch (HopFileException ex) {
throw new HopException("Error retrieving badfile string", ex);
}
}
if (meta.getDiscardFile() != null) {
try {
FileObject fileObject = getFileObject(meta.getDiscardFile(), variables);
sb.append(" discard=\'");
sb.append(getFilename(fileObject));
sb.append("\'");
} catch (HopFileException ex) {
throw new HopException("Error retrieving discardfile string", ex);
}
}
DatabaseMeta db = meta.getDatabaseMeta();
if (db != null) {
String user = Const.NVL(db.getUsername(), "");
String pass =
Const.NVL(Encr.decryptPasswordOptionallyEncrypted(resolve(db.getPassword())), "");
if (!password) {
pass = "******";
}
sb.append(" userid=").append(resolve(user)).append("/").append(resolve(pass)).append("@");
// If host name is specified, use form host:port/dbName else use TNS_NAME
if (!Utils.isEmpty(db.getHostname())) {
sb.append(resolve(db.getHostname()));
sb.append(':');
sb.append(resolve(db.getPort()));
}
String databaseName = resolve(db.getDatabaseName());
// Quote
if (databaseName.indexOf('(') >= 0) {
databaseName = databaseName.replace("=", "\\=");
// databaseName = databaseName.replace("(", "\\(");
// databaseName = databaseName.replace(")", "\\)");
databaseName = '"' + databaseName + '"';
}
sb.append(databaseName);
} else {
throw new HopException("No connection specified");
}
if (meta.isDirectPath()) {
sb.append(" DIRECT=TRUE");
if (this.getTransformMeta().getCopies(variables) > 1 || meta.isParallel()) {
sb.append(" PARALLEL=TRUE");
}
}
return sb.toString();
}
public void checkExitVal(int exitVal) throws HopException {
if (exitVal == EX_SUCC) {
return;
}
if (meta.isFailOnWarning() && (exitVal == EX_WARN)) {
throw new HopException("sqlldr returned warning");
} else if (meta.isFailOnError() && (exitVal != EX_WARN)) {
throw new HopException("sqlldr returned an error (exit code " + exitVal + ")");
}
}
public boolean execute(OraBulkLoaderMeta meta, boolean wait) throws HopException {
Runtime runtime = Runtime.getRuntime();
try {
sqlldrProcess = runtime.exec(createCommandLine(meta, true));
// any error message?
StreamLogger errorLogger = new StreamLogger(sqlldrProcess.getErrorStream(), "ERROR");
// any output?
StreamLogger outputLogger = new StreamLogger(sqlldrProcess.getInputStream(), "OUTPUT");
// kick them off
errorLogger.start();
outputLogger.start();
if (wait) {
// any error???
int exitVal = sqlldrProcess.waitFor();
sqlldrProcess = null;
logBasic(BaseMessages.getString(PKG, "OraBulkLoader.Log.ExitValueSqlldr", "" + exitVal));
checkExitVal(exitVal);
}
} catch (Exception ex) {
// Don't throw the message upwards, the message contains the password.
throw new HopException(
"Error while executing sqlldr \'" + createCommandLine(meta, false) + "\'");
}
return true;
}
@Override
public boolean processRow() throws HopException {
try {
Object[] r = getRow(); // Get row from input rowset & set row busy!
if (r == null) {
// no more input to be expected...
setOutputDone();
if (!preview) {
if (output != null) {
// Close the output
try {
output.close();
} catch (IOException e) {
throw new HopException("Error while closing output", e);
}
output = null;
}
String loadMethod = meta.getLoadMethod();
if (OraBulkLoaderMeta.METHOD_AUTO_END.equals(loadMethod)) {
// if this is the first line, we do not need to execute loader
// control file may not exists
if (!first) {
execute(meta, true);
sqlldrProcess = null;
}
} else if (OraBulkLoaderMeta.METHOD_AUTO_CONCURRENT.equals(meta.getLoadMethod())) {
try {
if (sqlldrProcess != null) {
int exitVal = sqlldrProcess.waitFor();
sqlldrProcess = null;
logBasic(
BaseMessages.getString(PKG, "OraBulkLoader.Log.ExitValueSqlldr", "" + exitVal));
checkExitVal(exitVal);
} else if (!first) {
throw new HopException("Internal error: no sqlldr process running");
}
} catch (Exception ex) {
throw new HopException("Error while executing sqlldr", ex);
}
}
}
return false;
}
if (!preview) {
if (first) {
first = false;
String recTerm = Const.CR;
if (!Utils.isEmpty(meta.getAltRecordTerm())) {
recTerm = substituteRecordTerminator(meta.getAltRecordTerm());
}
createControlFile(resolve(meta.getControlFile()), r, meta);
output = new OraBulkDataOutput(meta, recTerm);
if (OraBulkLoaderMeta.METHOD_AUTO_CONCURRENT.equals(meta.getLoadMethod())) {
execute(meta, false);
}
output.open(this, sqlldrProcess);
}
output.writeLine(getInputRowMeta(), r);
}
putRow(getInputRowMeta(), r);
incrementLinesOutput();
} catch (HopException e) {
logError(BaseMessages.getString(PKG, "OraBulkLoader.Log.ErrorInTransform") + e.getMessage());
setErrors(1);
stopAll();
setOutputDone(); // signal end to receiver(s)
return false;
}
return true;
}
protected void verifyDatabaseConnection() throws HopException {
if (meta.getDatabaseMeta() == null) {
throw new HopException(
BaseMessages.getString(PKG, "OraBulkLoaderMeta.GetSQL.NoConnectionDefined"));
}
}
@Override
public boolean init() {
IPipelineEngine pipeline = this.getPipeline();
preview = pipeline.isPreview();
if (super.init()) {
try {
verifyDatabaseConnection();
} catch (HopException ex) {
logError(ex.getMessage());
return false;
}
return true;
}
return false;
}
@Override
public void dispose() {
super.dispose();
// close output stream (may terminate running sqlldr)
if (output != null) {
// Close the output
try {
output.close();
} catch (IOException e) {
logError("Error while closing output", e);
}
output = null;
}
// running sqlldr process must be terminated
if (sqlldrProcess != null) {
try {
int exitVal = sqlldrProcess.waitFor();
sqlldrProcess = null;
logBasic(BaseMessages.getString(PKG, "OraBulkLoader.Log.ExitValueSqlldr", "" + exitVal));
} catch (InterruptedException e) {
/* process should be destroyed */
e.printStackTrace();
if (sqlldrProcess != null) {
sqlldrProcess.destroy();
}
}
}
if (!preview && meta.isEraseFiles()) {
// Erase the created cfg/dat files if requested. We don't erase
// the rest of the files because it would be "stupid" to erase them
// right after creation. If you don't want them, don't fill them in.
FileObject fileObject = null;
String method = meta.getLoadMethod();
if (OraBulkLoaderMeta.METHOD_AUTO_END.equals(method) && meta.getControlFile() != null) {
try {
fileObject = getFileObject(meta.getControlFile(), variables);
fileObject.delete();
fileObject.close();
} catch (Exception ex) {
logError(
"Error deleting control file \'" + getFilename(fileObject) + "\': " + ex.getMessage(),
ex);
}
}
if (OraBulkLoaderMeta.METHOD_AUTO_END.equals(method) && meta.getDataFile() != null) {
// In concurrent mode the data is written to the control file.
try {
fileObject = getFileObject(meta.getDataFile(), variables);
fileObject.delete();
fileObject.close();
} catch (Exception ex) {
logError(
"Error deleting data file \'" + getFilename(fileObject) + "\': " + ex.getMessage(),
ex);
}
}
if (OraBulkLoaderMeta.METHOD_MANUAL.equals(method)) {
logBasic("Deletion of files is not compatible with \'manual load method\'");
}
}
}
@VisibleForTesting
String getFilename(FileObject fileObject) {
return HopVfs.getFilename(fileObject);
}
@VisibleForTesting
FileObject getFileObject(String fileName, IVariables variables) throws HopFileException {
return HopVfs.getFileObject(variables.resolve(fileName));
}
}