| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hop.pipeline.transforms.orabulkloader; |
| |
| // |
| // The "designer" notes of the Oracle bulkloader: |
| // ---------------------------------------------- |
| // |
| // - "Enclosed" is used in the loader instead of "optionally enclosed" as optionally |
| // encloses kind of destroys the escaping. |
| // - A Boolean is output as Y and N (as in the text output transform e.g.). If people don't |
| // like this they can first convert the boolean value to something else before loading |
| // it. |
| // - Filters (besides data and datetime) are not supported as it slows down. |
| // |
| // |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.FileWriter; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.io.UnsupportedEncodingException; |
| import java.util.List; |
| import org.apache.commons.vfs2.FileObject; |
| import org.apache.hop.core.Const; |
| import org.apache.hop.core.database.DatabaseMeta; |
| import org.apache.hop.core.encryption.Encr; |
| import org.apache.hop.core.exception.HopException; |
| import org.apache.hop.core.exception.HopFileException; |
| import org.apache.hop.core.row.IRowMeta; |
| import org.apache.hop.core.row.IValueMeta; |
| import org.apache.hop.core.util.Utils; |
| import org.apache.hop.core.variables.IVariables; |
| import org.apache.hop.core.vfs.HopVfs; |
| import org.apache.hop.i18n.BaseMessages; |
| import org.apache.hop.pipeline.Pipeline; |
| import org.apache.hop.pipeline.PipelineMeta; |
| import org.apache.hop.pipeline.engine.IPipelineEngine; |
| import org.apache.hop.pipeline.transform.BaseTransform; |
| import org.apache.hop.pipeline.transform.TransformMeta; |
| |
| /** Performs a bulk load to an oracle table. */ |
| public class OraBulkLoader extends BaseTransform<OraBulkLoaderMeta, OraBulkLoaderData> { |
| private static final Class<?> PKG = |
| OraBulkLoaderMeta.class; // for i18n purposes, needed by Translator2!! |
| |
| public static final int EX_SUCC = 0; |
| |
| public static final int EX_WARN = 2; |
| |
| private Process sqlldrProcess = null; |
| |
| private OraBulkDataOutput output = null; |
| |
| /* |
| * Local copy of the transformation "preview" property. We only forward the rows upon previewing, we don't do any of |
| * the real stuff. |
| */ |
| private boolean preview = false; |
| |
| // |
| // This class continually reads from the stream, and sends it to the log |
| // if the logging level is at least basic level. |
| // |
| private final class StreamLogger extends Thread { |
| private InputStream input; |
| private String type; |
| |
| StreamLogger(InputStream is, String type) { |
| this.input = is; |
| this.type = type + ">"; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| final BufferedReader br = new BufferedReader(new InputStreamReader(input)); |
| String line; |
| while ((line = br.readLine()) != null) { |
| // Only perform the concatenation if at basic level. Otherwise, |
| // this just reads from the stream. |
| if (log.isBasic()) { |
| logBasic(type + line); |
| } |
| } |
| } catch (IOException ioe) { |
| ioe.printStackTrace(); |
| } |
| } |
| } |
| |
| public OraBulkLoader( |
| TransformMeta transformMeta, |
| OraBulkLoaderMeta meta, |
| OraBulkLoaderData data, |
| int copyNr, |
| PipelineMeta pipelineMeta, |
| Pipeline pipeline) { |
| super(transformMeta, meta, data, copyNr, pipelineMeta, pipeline); |
| } |
| |
| private String substituteRecordTerminator(String terminator) { |
| final StringBuilder in = new StringBuilder(); |
| int length; |
| boolean escaped = false; |
| |
| terminator = resolve(terminator); |
| length = terminator.length(); |
| for (int i = 0; i < length; i++) { |
| final char c = terminator.charAt(i); |
| |
| if (escaped) { |
| switch (c) { |
| case 'n': |
| in.append('\n'); |
| break; |
| case 'r': |
| in.append('\r'); |
| break; |
| default: |
| in.append(c); |
| break; |
| } |
| escaped = false; |
| } else if (c == '\\') { |
| escaped = true; |
| } else { |
| in.append(c); |
| } |
| } |
| |
| return in.toString(); |
| } |
| |
| private String encodeRecordTerminator(String terminator, String encoding) throws HopException { |
| final String in = substituteRecordTerminator(terminator); |
| final StringBuilder out = new StringBuilder(); |
| byte[] bytes; |
| |
| try { |
| // use terminator in hex representation due to character set |
| // terminator in hex representation must be in character set |
| // of data file |
| if (Utils.isEmpty(encoding)) { |
| bytes = in.getBytes(); |
| } else { |
| bytes = in.getBytes(encoding); |
| } |
| for (byte aByte : bytes) { |
| final String hex = Integer.toHexString(aByte); |
| |
| if (hex.length() == 1) { |
| out.append('0'); |
| } |
| out.append(hex); |
| } |
| } catch (UnsupportedEncodingException e) { |
| throw new HopException("Unsupported character encoding: " + encoding, e); |
| } |
| |
| return out.toString(); |
| } |
| |
| /** |
| * Get the contents of the control file as specified in the meta object |
| * |
| * @param meta the meta object to model the control file after |
| * @return a string containing the control file contents |
| */ |
| public String getControlFileContents(OraBulkLoaderMeta meta, IRowMeta rowMeta, Object[] row) |
| throws HopException { |
| DatabaseMeta dm = meta.getDatabaseMeta(); |
| String inputName = "'" + getFilename(getFileObject(meta.getDataFile(), variables)) + "'"; |
| |
| String loadAction = meta.getLoadAction(); |
| |
| StringBuilder contents = new StringBuilder(500); |
| contents.append("OPTIONS(").append(Const.CR); |
| contents.append(" ERRORS=\'").append(meta.getMaxErrors()).append("\'").append(Const.CR); |
| |
| if (meta.getCommitSizeAsInt(this) != 0 |
| && !(meta.isDirectPath() && getTransformMeta().getCopies(variables) > 1)) { |
| // For the second part of the above expressions: ROWS is not supported |
| // in parallel mode (by sqlldr). |
| contents.append(" , ROWS=\'").append(meta.getCommitSize()).append("\'").append(Const.CR); |
| } |
| |
| if (meta.getBindSizeAsInt(this) != 0) { |
| contents.append(" , BINDSIZE=\'").append(meta.getBindSize()).append("\'").append(Const.CR); |
| } |
| |
| if (meta.getReadSizeAsInt(this) != 0) { |
| contents.append(" , READSIZE=\'").append(meta.getReadSize()).append("\'").append(Const.CR); |
| } |
| |
| contents.append(")").append(Const.CR); |
| |
| contents.append("LOAD DATA").append(Const.CR); |
| if (!Utils.isEmpty(meta.getCharacterSetName())) { |
| contents.append("CHARACTERSET ").append(meta.getCharacterSetName()).append(Const.CR); |
| } |
| if (!OraBulkLoaderMeta.METHOD_AUTO_CONCURRENT.equals(meta.getLoadMethod()) |
| || !Utils.isEmpty(meta.getAltRecordTerm())) { |
| String infile = inputName; |
| |
| if (OraBulkLoaderMeta.METHOD_AUTO_CONCURRENT.equals(meta.getLoadMethod())) { |
| infile = "''"; |
| } |
| |
| // For concurrent input, data command line argument must be specified |
| contents.append("INFILE ").append(infile); |
| if (!Utils.isEmpty(meta.getAltRecordTerm())) { |
| contents |
| .append(" \"STR x'") |
| .append(encodeRecordTerminator(meta.getAltRecordTerm(), meta.getEncoding())) |
| .append("'\""); |
| } |
| contents.append(Const.CR); |
| } |
| contents |
| .append("INTO TABLE ") |
| .append( |
| dm.getQuotedSchemaTableCombination( |
| variables, meta.getSchemaName(), meta.getTableName())) |
| .append(Const.CR) |
| .append(loadAction) |
| .append(Const.CR) |
| .append("FIELDS TERMINATED BY ',' ENCLOSED BY '\"'") |
| .append(Const.CR) |
| .append("TRAILING NULLCOLS") |
| .append(Const.CR) |
| .append('('); |
| |
| List<OraBulkLoaderMappingMeta> mappings = meta.getMappings(); |
| if (mappings == null || mappings.isEmpty()) { |
| throw new HopException("No fields defined to load to database"); |
| } |
| boolean firstMapping = true; |
| for (OraBulkLoaderMappingMeta mapping : mappings) { |
| if (!firstMapping) { |
| contents.append(", ").append(Const.CR); |
| } |
| firstMapping = false; |
| contents.append(dm.quoteField(mapping.getFieldTable())); |
| |
| int pos = rowMeta.indexOfValue(mapping.getFieldStream()); |
| if (pos < 0) { |
| throw new HopException("Could not find field " + mapping.getFieldStream() + " in stream"); |
| } |
| IValueMeta v = rowMeta.getValueMeta(pos); |
| switch (v.getType()) { |
| case IValueMeta.TYPE_STRING: |
| if (v.getLength() > 255) { |
| contents.append(" CHAR(").append(v.getLength()).append(")"); |
| } else { |
| contents.append(" CHAR"); |
| } |
| break; |
| case IValueMeta.TYPE_INTEGER: |
| case IValueMeta.TYPE_NUMBER: |
| case IValueMeta.TYPE_BIGNUMBER: |
| break; |
| case IValueMeta.TYPE_DATE: |
| if (OraBulkLoaderMeta.DATE_MASK_DATE.equals(mapping.getDateMask())) { |
| contents.append(" DATE 'yyyy-mm-dd'"); |
| } else if (OraBulkLoaderMeta.DATE_MASK_DATETIME.equals(mapping.getDateMask())) { |
| contents.append(" TIMESTAMP 'yyyy-mm-dd hh24:mi:ss.ff'"); |
| } else { |
| // If not specified the default is date. |
| contents.append(" DATE 'yyyy-mm-dd'"); |
| } |
| break; |
| case IValueMeta.TYPE_BINARY: |
| contents.append(" ENCLOSED BY '<startlob>' AND '<endlob>'"); |
| break; |
| case IValueMeta.TYPE_TIMESTAMP: |
| contents.append(" TIMESTAMP 'yyyy-mm-dd hh24:mi:ss.ff'"); |
| break; |
| default: |
| break; |
| } |
| } |
| contents.append(")"); |
| |
| return contents.toString(); |
| } |
| |
| /** |
| * Create a control file. |
| * |
| * @param filename path to control file |
| * @param meta transform meta |
| * @throws HopException |
| */ |
| public void createControlFile(String filename, Object[] row, OraBulkLoaderMeta meta) |
| throws HopException { |
| FileWriter fw = null; |
| |
| try { |
| File controlFile = new File(getFileObject(filename, variables).getURL().getFile()); |
| // Need to ensure that the parent directory they set exists for the control file. |
| controlFile.getParentFile().mkdirs(); |
| controlFile.createNewFile(); |
| fw = new FileWriter(controlFile); |
| fw.write(getControlFileContents(meta, getInputRowMeta(), row)); |
| } catch (IOException ex) { |
| throw new HopException(ex.getMessage(), ex); |
| } finally { |
| try { |
| if (fw != null) { |
| fw.close(); |
| } |
| } catch (Exception ex) { |
| // Ignore errors |
| } |
| } |
| } |
| |
| /** |
| * Create the command line for an sqlldr process depending on the meta information supplied. |
| * |
| * @param meta The meta data to create the command line from |
| * @param password Use the real password or not |
| * @return The string to execute. |
| * @throws HopException Upon any exception |
| */ |
| public String createCommandLine(OraBulkLoaderMeta meta, boolean password) throws HopException { |
| StringBuilder sb = new StringBuilder(300); |
| |
| if (meta.getSqlldr() != null) { |
| try { |
| FileObject fileObject = getFileObject(meta.getSqlldr(), variables); |
| String sqlldr = getFilename(fileObject); |
| sb.append(sqlldr); |
| } catch (HopFileException ex) { |
| throw new HopException("Error retrieving sqlldr string", ex); |
| } |
| } else { |
| throw new HopException("No sqlldr application specified"); |
| } |
| |
| if (meta.getControlFile() != null) { |
| try { |
| FileObject fileObject = getFileObject(meta.getControlFile(), variables); |
| |
| sb.append(" control=\'"); |
| sb.append(getFilename(fileObject)); |
| sb.append("\'"); |
| } catch (HopFileException ex) { |
| throw new HopException("Error retrieving controlfile string", ex); |
| } |
| } else { |
| throw new HopException("No control file specified"); |
| } |
| |
| if (OraBulkLoaderMeta.METHOD_AUTO_CONCURRENT.equals(meta.getLoadMethod())) { |
| sb.append(" data=\'-\'"); |
| } |
| |
| if (meta.getLogFile() != null) { |
| try { |
| FileObject fileObject = getFileObject(meta.getLogFile(), variables); |
| |
| sb.append(" log=\'"); |
| sb.append(getFilename(fileObject)); |
| sb.append("\'"); |
| } catch (HopFileException ex) { |
| throw new HopException("Error retrieving logfile string", ex); |
| } |
| } |
| |
| if (meta.getBadFile() != null) { |
| try { |
| FileObject fileObject = getFileObject(meta.getBadFile(), variables); |
| |
| sb.append(" bad=\'"); |
| sb.append(getFilename(fileObject)); |
| sb.append("\'"); |
| } catch (HopFileException ex) { |
| throw new HopException("Error retrieving badfile string", ex); |
| } |
| } |
| |
| if (meta.getDiscardFile() != null) { |
| try { |
| FileObject fileObject = getFileObject(meta.getDiscardFile(), variables); |
| |
| sb.append(" discard=\'"); |
| sb.append(getFilename(fileObject)); |
| sb.append("\'"); |
| } catch (HopFileException ex) { |
| throw new HopException("Error retrieving discardfile string", ex); |
| } |
| } |
| |
| DatabaseMeta db = meta.getDatabaseMeta(); |
| if (db != null) { |
| String user = Const.NVL(db.getUsername(), ""); |
| String pass = |
| Const.NVL(Encr.decryptPasswordOptionallyEncrypted(resolve(db.getPassword())), ""); |
| if (!password) { |
| pass = "******"; |
| } |
| |
| sb.append(" userid=").append(resolve(user)).append("/").append(resolve(pass)).append("@"); |
| |
| // If host name is specified, use form host:port/dbName else use TNS_NAME |
| if (!Utils.isEmpty(db.getHostname())) { |
| sb.append(resolve(db.getHostname())); |
| sb.append(':'); |
| sb.append(resolve(db.getPort())); |
| } |
| |
| String databaseName = resolve(db.getDatabaseName()); |
| |
| // Quote |
| if (databaseName.indexOf('(') >= 0) { |
| databaseName = databaseName.replace("=", "\\="); |
| // databaseName = databaseName.replace("(", "\\("); |
| // databaseName = databaseName.replace(")", "\\)"); |
| databaseName = '"' + databaseName + '"'; |
| } |
| |
| sb.append(databaseName); |
| } else { |
| throw new HopException("No connection specified"); |
| } |
| |
| if (meta.isDirectPath()) { |
| sb.append(" DIRECT=TRUE"); |
| |
| if (this.getTransformMeta().getCopies(variables) > 1 || meta.isParallel()) { |
| sb.append(" PARALLEL=TRUE"); |
| } |
| } |
| |
| return sb.toString(); |
| } |
| |
| public void checkExitVal(int exitVal) throws HopException { |
| if (exitVal == EX_SUCC) { |
| return; |
| } |
| |
| if (meta.isFailOnWarning() && (exitVal == EX_WARN)) { |
| throw new HopException("sqlldr returned warning"); |
| } else if (meta.isFailOnError() && (exitVal != EX_WARN)) { |
| throw new HopException("sqlldr returned an error (exit code " + exitVal + ")"); |
| } |
| } |
| |
| public boolean execute(OraBulkLoaderMeta meta, boolean wait) throws HopException { |
| Runtime runtime = Runtime.getRuntime(); |
| |
| try { |
| sqlldrProcess = runtime.exec(createCommandLine(meta, true)); |
| // any error message? |
| StreamLogger errorLogger = new StreamLogger(sqlldrProcess.getErrorStream(), "ERROR"); |
| |
| // any output? |
| StreamLogger outputLogger = new StreamLogger(sqlldrProcess.getInputStream(), "OUTPUT"); |
| |
| // kick them off |
| errorLogger.start(); |
| outputLogger.start(); |
| |
| if (wait) { |
| // any error??? |
| int exitVal = sqlldrProcess.waitFor(); |
| sqlldrProcess = null; |
| logBasic(BaseMessages.getString(PKG, "OraBulkLoader.Log.ExitValueSqlldr", "" + exitVal)); |
| checkExitVal(exitVal); |
| } |
| } catch (Exception ex) { |
| // Don't throw the message upwards, the message contains the password. |
| throw new HopException( |
| "Error while executing sqlldr \'" + createCommandLine(meta, false) + "\'"); |
| } |
| |
| return true; |
| } |
| |
| @Override |
| public boolean processRow() throws HopException { |
| try { |
| Object[] r = getRow(); // Get row from input rowset & set row busy! |
| if (r == null) { |
| // no more input to be expected... |
| |
| setOutputDone(); |
| |
| if (!preview) { |
| if (output != null) { |
| // Close the output |
| try { |
| output.close(); |
| } catch (IOException e) { |
| throw new HopException("Error while closing output", e); |
| } |
| |
| output = null; |
| } |
| |
| String loadMethod = meta.getLoadMethod(); |
| if (OraBulkLoaderMeta.METHOD_AUTO_END.equals(loadMethod)) { |
| // if this is the first line, we do not need to execute loader |
| // control file may not exists |
| if (!first) { |
| execute(meta, true); |
| sqlldrProcess = null; |
| } |
| } else if (OraBulkLoaderMeta.METHOD_AUTO_CONCURRENT.equals(meta.getLoadMethod())) { |
| try { |
| if (sqlldrProcess != null) { |
| int exitVal = sqlldrProcess.waitFor(); |
| sqlldrProcess = null; |
| logBasic( |
| BaseMessages.getString(PKG, "OraBulkLoader.Log.ExitValueSqlldr", "" + exitVal)); |
| checkExitVal(exitVal); |
| } else if (!first) { |
| throw new HopException("Internal error: no sqlldr process running"); |
| } |
| } catch (Exception ex) { |
| throw new HopException("Error while executing sqlldr", ex); |
| } |
| } |
| } |
| return false; |
| } |
| |
| if (!preview) { |
| if (first) { |
| first = false; |
| |
| String recTerm = Const.CR; |
| if (!Utils.isEmpty(meta.getAltRecordTerm())) { |
| recTerm = substituteRecordTerminator(meta.getAltRecordTerm()); |
| } |
| |
| createControlFile(resolve(meta.getControlFile()), r, meta); |
| output = new OraBulkDataOutput(meta, recTerm); |
| |
| if (OraBulkLoaderMeta.METHOD_AUTO_CONCURRENT.equals(meta.getLoadMethod())) { |
| execute(meta, false); |
| } |
| output.open(this, sqlldrProcess); |
| } |
| output.writeLine(getInputRowMeta(), r); |
| } |
| putRow(getInputRowMeta(), r); |
| incrementLinesOutput(); |
| |
| } catch (HopException e) { |
| logError(BaseMessages.getString(PKG, "OraBulkLoader.Log.ErrorInTransform") + e.getMessage()); |
| setErrors(1); |
| stopAll(); |
| setOutputDone(); // signal end to receiver(s) |
| return false; |
| } |
| |
| return true; |
| } |
| |
| protected void verifyDatabaseConnection() throws HopException { |
| if (meta.getDatabaseMeta() == null) { |
| throw new HopException( |
| BaseMessages.getString(PKG, "OraBulkLoaderMeta.GetSQL.NoConnectionDefined")); |
| } |
| } |
| |
| @Override |
| public boolean init() { |
| |
| IPipelineEngine pipeline = this.getPipeline(); |
| preview = pipeline.isPreview(); |
| |
| if (super.init()) { |
| try { |
| verifyDatabaseConnection(); |
| } catch (HopException ex) { |
| logError(ex.getMessage()); |
| return false; |
| } |
| return true; |
| } |
| return false; |
| } |
| |
| @Override |
| public void dispose() { |
| |
| super.dispose(); |
| |
| // close output stream (may terminate running sqlldr) |
| if (output != null) { |
| // Close the output |
| try { |
| output.close(); |
| } catch (IOException e) { |
| logError("Error while closing output", e); |
| } |
| |
| output = null; |
| } |
| // running sqlldr process must be terminated |
| if (sqlldrProcess != null) { |
| try { |
| int exitVal = sqlldrProcess.waitFor(); |
| sqlldrProcess = null; |
| logBasic(BaseMessages.getString(PKG, "OraBulkLoader.Log.ExitValueSqlldr", "" + exitVal)); |
| } catch (InterruptedException e) { |
| /* process should be destroyed */ |
| e.printStackTrace(); |
| if (sqlldrProcess != null) { |
| sqlldrProcess.destroy(); |
| } |
| } |
| } |
| |
| if (!preview && meta.isEraseFiles()) { |
| // Erase the created cfg/dat files if requested. We don't erase |
| // the rest of the files because it would be "stupid" to erase them |
| // right after creation. If you don't want them, don't fill them in. |
| FileObject fileObject = null; |
| |
| String method = meta.getLoadMethod(); |
| if (OraBulkLoaderMeta.METHOD_AUTO_END.equals(method) && meta.getControlFile() != null) { |
| |
| try { |
| fileObject = getFileObject(meta.getControlFile(), variables); |
| fileObject.delete(); |
| fileObject.close(); |
| } catch (Exception ex) { |
| logError( |
| "Error deleting control file \'" + getFilename(fileObject) + "\': " + ex.getMessage(), |
| ex); |
| } |
| } |
| |
| if (OraBulkLoaderMeta.METHOD_AUTO_END.equals(method) && meta.getDataFile() != null) { |
| // In concurrent mode the data is written to the control file. |
| |
| try { |
| fileObject = getFileObject(meta.getDataFile(), variables); |
| fileObject.delete(); |
| fileObject.close(); |
| } catch (Exception ex) { |
| logError( |
| "Error deleting data file \'" + getFilename(fileObject) + "\': " + ex.getMessage(), |
| ex); |
| } |
| } |
| |
| if (OraBulkLoaderMeta.METHOD_MANUAL.equals(method)) { |
| logBasic("Deletion of files is not compatible with \'manual load method\'"); |
| } |
| } |
| } |
| |
| @VisibleForTesting |
| String getFilename(FileObject fileObject) { |
| return HopVfs.getFilename(fileObject); |
| } |
| |
| @VisibleForTesting |
| FileObject getFileObject(String fileName, IVariables variables) throws HopFileException { |
| return HopVfs.getFileObject(variables.resolve(fileName)); |
| } |
| } |