| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hop.pipeline.transforms.vertica.bulkloader; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.vertica.jdbc.VerticaConnection; |
| import com.vertica.jdbc.VerticaCopyStream; |
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.PipedInputStream;
| import java.sql.Connection; |
| import java.sql.SQLException; |
| import java.text.SimpleDateFormat; |
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
| import java.util.concurrent.Executors; |
| import java.util.stream.Collectors; |
| import javax.sql.PooledConnection; |
| import org.apache.commons.dbcp.DelegatingConnection; |
| import org.apache.hop.core.database.Database; |
| import org.apache.hop.core.database.DatabaseMeta; |
| import org.apache.hop.core.exception.HopDatabaseException; |
| import org.apache.hop.core.exception.HopException; |
| import org.apache.hop.core.exception.HopTransformException; |
| import org.apache.hop.core.exception.HopValueException; |
| import org.apache.hop.core.row.IRowMeta; |
| import org.apache.hop.core.row.IValueMeta; |
| import org.apache.hop.core.row.RowMeta; |
| import org.apache.hop.core.util.StringUtil; |
| import org.apache.hop.core.util.Utils; |
| import org.apache.hop.i18n.BaseMessages; |
| import org.apache.hop.pipeline.Pipeline; |
| import org.apache.hop.pipeline.PipelineMeta; |
| import org.apache.hop.pipeline.transform.BaseTransform; |
| import org.apache.hop.pipeline.transform.TransformMeta; |
| import org.apache.hop.pipeline.transforms.vertica.bulkloader.nativebinary.ColumnSpec; |
| import org.apache.hop.pipeline.transforms.vertica.bulkloader.nativebinary.ColumnType; |
| import org.apache.hop.pipeline.transforms.vertica.bulkloader.nativebinary.StreamEncoder; |
| |
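/**
 * Bulk loads rows into a Vertica table. Incoming rows are encoded into Vertica's native binary
 * format and piped to a {@code COPY ... FROM STDIN NATIVE} statement that a background worker
 * thread executes through the Vertica JDBC driver's {@link VerticaCopyStream} API.
 */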
| public class VerticaBulkLoader extends BaseTransform<VerticaBulkLoaderMeta, VerticaBulkLoaderData> { |
  private static final Class<?> PKG = VerticaBulkLoader.class; // For i18n purposes
| |
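  // Note: SimpleDateFormat is not thread-safe, and this static instance is shared by all
  // transform copies. Each copy only formats from its own row thread when logging rejected rows,
  // but concurrent rejections across copies could garble the output; a per-instance formatter
  // (or java.time.format.DateTimeFormatter) would be safer.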
| private static final SimpleDateFormat SIMPLE_DATE_FORMAT = |
| new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); |
| private FileOutputStream exceptionLog; |
| private FileOutputStream rejectedLog; |
| |
| public VerticaBulkLoader( |
| TransformMeta transformMeta, |
| VerticaBulkLoaderMeta meta, |
| VerticaBulkLoaderData data, |
| int copyNr, |
| PipelineMeta pipelineMeta, |
| Pipeline pipeline) { |
| super(transformMeta, meta, data, copyNr, pipelineMeta, pipeline); |
| } |
| |
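  /**
   * Reads one row, maps it onto the target column layout and streams it to the encoder. On the
   * first row this builds the column specs, opens the piped stream, starts the COPY worker thread
   * and (optionally) truncates the target table.
   */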
| @Override |
| public boolean processRow() throws HopException { |
    Object[] r = getRow(); // blocks until a row arrives or all previous transforms have finished
| if (r == null) { // no more input to be expected... |
| if (first && meta.isTruncateTable() && !meta.isOnlyWhenHaveRows()) { |
| truncateTable(); |
| } |
| |
| try { |
| data.close(); |
| } catch (IOException ioe) { |
| throw new HopTransformException("Error releasing resources", ioe); |
| } |
| return false; |
| } |
| |
| if (first) { |
| |
| first = false; |
| |
| if (meta.isTruncateTable()) { |
| truncateTable(); |
| } |
| |
| data.outputRowMeta = getInputRowMeta().clone(); |
| meta.getFields(data.outputRowMeta, getTransformName(), null, null, this, metadataProvider); |
| |
| IRowMeta tableMeta = meta.getRequiredFields(variables); |
| |
| if (!meta.specifyFields()) { |
| |
| // Just take the whole input row |
| data.insertRowMeta = getInputRowMeta().clone(); |
| data.selectedRowFieldIndices = new int[data.insertRowMeta.size()]; |
| |
| data.colSpecs = new ArrayList<>(data.insertRowMeta.size()); |
| |
| for (int insertFieldIdx = 0; insertFieldIdx < data.insertRowMeta.size(); insertFieldIdx++) { |
| data.selectedRowFieldIndices[insertFieldIdx] = insertFieldIdx; |
| IValueMeta inputValueMeta = data.insertRowMeta.getValueMeta(insertFieldIdx); |
| IValueMeta insertValueMeta = inputValueMeta.clone(); |
| IValueMeta targetValueMeta = tableMeta.getValueMeta(insertFieldIdx); |
| insertValueMeta.setName(targetValueMeta.getName()); |
| data.insertRowMeta.setValueMeta(insertFieldIdx, insertValueMeta); |
| ColumnSpec cs = getColumnSpecFromField(inputValueMeta, insertValueMeta, targetValueMeta); |
| data.colSpecs.add(insertFieldIdx, cs); |
| } |
| |
| } else { |
| |
| int numberOfInsertFields = meta.getFields().size(); |
| data.insertRowMeta = new RowMeta(); |
| data.colSpecs = new ArrayList<>(numberOfInsertFields); |
| |
| // Cache the position of the selected fields in the row array |
| data.selectedRowFieldIndices = new int[numberOfInsertFields]; |
| for (int insertFieldIdx = 0; insertFieldIdx < numberOfInsertFields; insertFieldIdx++) { |
| VerticaBulkLoaderField vbf = meta.getFields().get(insertFieldIdx); |
| String inputFieldName = vbf.getFieldStream(); |
| int inputFieldIdx = getInputRowMeta().indexOfValue(inputFieldName); |
| if (inputFieldIdx < 0) { |
| throw new HopTransformException( |
| BaseMessages.getString( |
| PKG, |
| "VerticaBulkLoader.Exception.FieldRequired", |
                  inputFieldName));
| } |
| data.selectedRowFieldIndices[insertFieldIdx] = inputFieldIdx; |
| |
| String insertFieldName = vbf.getFieldDatabase(); |
| IValueMeta inputValueMeta = getInputRowMeta().getValueMeta(inputFieldIdx); |
| if (inputValueMeta == null) { |
| throw new HopTransformException( |
| BaseMessages.getString( |
| PKG, |
| "VerticaBulkLoader.Exception.FailedToFindField", |
                  vbf.getFieldStream()));
| } |
| IValueMeta insertValueMeta = inputValueMeta.clone(); |
| insertValueMeta.setName(insertFieldName); |
| data.insertRowMeta.addValueMeta(insertValueMeta); |
| |
| IValueMeta targetValueMeta = tableMeta.searchValueMeta(insertFieldName); |
| ColumnSpec cs = getColumnSpecFromField(inputValueMeta, insertValueMeta, targetValueMeta); |
| data.colSpecs.add(insertFieldIdx, cs); |
| } |
| } |
| |
| try { |
| data.pipedInputStream = new PipedInputStream(); |
| if (data.colSpecs == null || data.colSpecs.isEmpty()) { |
| return false; |
| } |
| data.encoder = createStreamEncoder(data.colSpecs, data.pipedInputStream); |
| |
| initializeWorker(); |
| data.encoder.writeHeader(); |
| |
| } catch (IOException ioe) { |
| throw new HopTransformException("Error creating stream encoder", ioe); |
| } |
| } |
| |
| try { |
| Object[] outputRowData = writeToOutputStream(r); |
| if (outputRowData != null) { |
| putRow(data.outputRowMeta, outputRowData); // in case we want it |
| // go further... |
| incrementLinesOutput(); |
| } |
| |
      if (checkFeedback(getLinesRead()) && log.isBasic()) {
        logBasic("linenr " + getLinesRead());
      }
| } catch (HopException e) { |
| logError("Because of an error, this transform can't continue: ", e); |
| setErrors(1); |
| stopAll(); |
| setOutputDone(); // signal end to receiver(s) |
| return false; |
    } catch (IOException e) {
      // Failing to write the exceptions/rejected-rows log files should not kill the load itself.
      logError("Error writing to the exceptions/rejected-rows log files.", e);
    }
| |
| return true; |
| } |
| |
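  /** Opens the optional exceptions and rejected-data log files in append mode. */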
| @VisibleForTesting |
| void initializeLogFiles() throws HopException { |
| try { |
| if (!StringUtil.isEmpty(meta.getExceptionsFileName())) { |
| exceptionLog = new FileOutputStream(meta.getExceptionsFileName(), true); |
| } |
| if (!StringUtil.isEmpty(meta.getRejectedDataFileName())) { |
| rejectedLog = new FileOutputStream(meta.getRejectedDataFileName(), true); |
| } |
| } catch (FileNotFoundException ex) { |
| throw new HopException(ex); |
| } |
| } |
| |
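  /**
   * Writes the failed row and the triggering exception to the configured exceptions and
   * rejected-data log files, prefixing every line with a timestamp.
   */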
| @VisibleForTesting |
| void writeExceptionRejectionLogs(HopValueException valueException, Object[] outputRowData) |
| throws IOException { |
    String dateTimeString = SIMPLE_DATE_FORMAT.format(new Date()) + " - ";
| logError( |
| BaseMessages.getString( |
| PKG, |
| "VerticaBulkLoader.Exception.RowRejected", |
            // String.valueOf() tolerates the null fields a row array may contain
            Arrays.stream(outputRowData).map(String::valueOf).collect(Collectors.joining(" | "))));
| |
| if (exceptionLog != null) { |
| // Replace used to ensure timestamps are being added appropriately (some messages are |
| // multi-line) |
| exceptionLog.write( |
| (dateTimeString |
| + valueException |
| .getMessage() |
| .replace(System.lineSeparator(), System.lineSeparator() + dateTimeString)) |
| .getBytes()); |
| exceptionLog.write(System.lineSeparator().getBytes()); |
| for (StackTraceElement element : valueException.getStackTrace()) { |
| exceptionLog.write( |
| (dateTimeString + "at " + element.toString() + System.lineSeparator()).getBytes()); |
| } |
      Throwable cause = valueException.getCause();
      if (cause != null) {
        exceptionLog.write(
            (dateTimeString
                    + "Caused by: "
                    + cause.getClass().toString()
                    + System.lineSeparator())
                .getBytes());
        // Replace used to ensure timestamps are being added appropriately (some messages are
        // multi-line)
        if (cause.getMessage() != null) {
          exceptionLog.write(
              (dateTimeString
                      + cause
                          .getMessage()
                          .replace(System.lineSeparator(), System.lineSeparator() + dateTimeString))
                  .getBytes());
          exceptionLog.write(System.lineSeparator().getBytes());
        }
      }
| } |
| if (rejectedLog != null) { |
      rejectedLog.write(
          (dateTimeString
                  + BaseMessages.getString(
                      PKG,
                      "VerticaBulkLoader.Exception.RowRejected",
                      Arrays.stream(outputRowData)
                          .map(String::valueOf)
                          .collect(Collectors.joining(" | "))))
              .getBytes());
      for (Object outputRowDatum : outputRowData) {
        rejectedLog.write((String.valueOf(outputRowDatum) + " | ").getBytes());
      }
| rejectedLog.write(System.lineSeparator().getBytes()); |
| } |
| } |
| |
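  /** Closes the exceptions and rejected-data log files, if they were opened. */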
| @VisibleForTesting |
| void closeLogFiles() throws HopException { |
| try { |
| if (exceptionLog != null) { |
| exceptionLog.close(); |
| } |
| if (rejectedLog != null) { |
| rejectedLog.close(); |
| } |
| } catch (IOException exception) { |
| throw new HopException(exception); |
| } |
| } |
| |
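  /**
   * Maps a target column's SQL type onto the {@link ColumnSpec} the native-binary encoder needs,
   * validating that temporal columns are fed by Date-compatible stream fields.
   */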
| private ColumnSpec getColumnSpecFromField( |
| IValueMeta inputValueMeta, IValueMeta insertValueMeta, IValueMeta targetValueMeta) { |
| logBasic( |
| "Mapping input field " |
| + inputValueMeta.getName() |
| + " (" |
| + inputValueMeta.getTypeDesc() |
| + ")" |
| + " to target column " |
| + insertValueMeta.getName() |
| + " (" |
| + targetValueMeta.getOriginalColumnTypeName() |
| + ") "); |
| |
    String targetColumnTypeName = targetValueMeta.getOriginalColumnTypeName().toUpperCase();

    switch (targetColumnTypeName) {
      case "INTEGER":
      case "BIGINT":
        return new ColumnSpec(ColumnSpec.ConstantWidthType.INTEGER_64);
      case "BOOLEAN":
        return new ColumnSpec(ColumnSpec.ConstantWidthType.BOOLEAN);
      case "FLOAT":
      case "DOUBLE PRECISION":
        return new ColumnSpec(ColumnSpec.ConstantWidthType.FLOAT);
      case "CHAR":
        return new ColumnSpec(ColumnSpec.UserDefinedWidthType.CHAR, targetValueMeta.getLength());
      case "VARCHAR":
      case "CHARACTER VARYING":
        return new ColumnSpec(ColumnSpec.VariableWidthType.VARCHAR, targetValueMeta.getLength());
      case "DATE":
        checkDateCompatible(inputValueMeta, insertValueMeta);
        return new ColumnSpec(ColumnSpec.ConstantWidthType.DATE);
      case "TIME":
        checkDateCompatible(inputValueMeta, insertValueMeta);
        return new ColumnSpec(ColumnSpec.ConstantWidthType.TIME);
      case "TIMETZ":
        checkDateCompatible(inputValueMeta, insertValueMeta);
        return new ColumnSpec(ColumnSpec.ConstantWidthType.TIMETZ);
      case "TIMESTAMP":
        checkDateCompatible(inputValueMeta, insertValueMeta);
        return new ColumnSpec(ColumnSpec.ConstantWidthType.TIMESTAMP);
      case "TIMESTAMPTZ":
        checkDateCompatible(inputValueMeta, insertValueMeta);
        return new ColumnSpec(ColumnSpec.ConstantWidthType.TIMESTAMPTZ);
      case "INTERVAL":
      case "INTERVAL DAY TO SECOND":
        checkDateCompatible(inputValueMeta, insertValueMeta);
        return new ColumnSpec(ColumnSpec.ConstantWidthType.INTERVAL);
      case "BINARY":
      case "VARBINARY":
        return new ColumnSpec(ColumnSpec.VariableWidthType.VARBINARY, targetValueMeta.getLength());
      case "NUMERIC":
        return new ColumnSpec(
            ColumnSpec.PrecisionScaleWidthType.NUMERIC,
            targetValueMeta.getLength(),
            targetValueMeta.getPrecision());
      default:
        throw new IllegalArgumentException(
            "Column type " + targetColumnTypeName + " not supported.");
    }
  }

  /** Rejects the mapping when a temporal target column is fed by a non-Date stream field. */
  private static void checkDateCompatible(IValueMeta inputValueMeta, IValueMeta insertValueMeta) {
    if (!inputValueMeta.isDate()) {
      throw new IllegalArgumentException(
          "Field "
              + inputValueMeta.getName()
              + " must be a Date compatible type to match target column "
              + insertValueMeta.getName());
    }
  }
| |
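  /**
   * Starts the background thread that runs the COPY statement and feeds it from the piped input
   * stream the encoder writes into. An error on the worker stops the whole pipeline.
   */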
| private void initializeWorker() { |
| final String dml = buildCopyStatementSqlString(); |
| |
    data.workerThread =
        Executors.defaultThreadFactory()
            .newThread(
                () -> {
                  try {
                    VerticaCopyStream stream = createVerticaCopyStream(dml);
                    stream.start();
                    stream.addStream(data.pipedInputStream);
                    stream.execute();
                    long rowsLoaded = stream.finish();
                    // getRejects() reports the rejects accumulated since the previous call, so
                    // read it only after the COPY has run; calling it up front always yields an
                    // empty list.
                    setLinesRejected(stream.getRejects().size());
                    if (getLinesOutput() != rowsLoaded) {
                      logMinimal(
                          String.format(
                              "%d records loaded out of %d records sent.",
                              rowsLoaded, getLinesOutput()));
                    }
                    data.db.disconnect();
                  } catch (SQLException
                      | IllegalStateException
                      | ClassNotFoundException
                      | HopException e) {
                    if (e.getCause() instanceof InterruptedIOException) {
                      logBasic("SQL statement interrupted by halt of pipeline");
                    } else {
                      logError("SQL Error during statement execution.", e);
                      setErrors(1);
                      stopAll();
                      setOutputDone(); // signal end to receiver(s)
                    }
                  }
                });
| |
| data.workerThread.start(); |
| } |
| |
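  /**
   * Builds the {@code COPY ... FROM STDIN NATIVE} statement, quoting column names and adding the
   * EXCEPTIONS, REJECTED DATA, ENFORCELENGTH, ABORT ON ERROR, DIRECT and STREAM NAME clauses as
   * configured.
   */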
| private String buildCopyStatementSqlString() { |
| final DatabaseMeta databaseMeta = data.db.getDatabaseMeta(); |
| |
| StringBuilder sb = new StringBuilder(150); |
| sb.append("COPY "); |
| |
| sb.append( |
| databaseMeta.getQuotedSchemaTableCombination( |
| variables, |
| data.db.resolve(meta.getSchemaName()), |
| data.db.resolve(meta.getTableName()))); |
| |
| sb.append(" ("); |
| final IRowMeta fields = data.insertRowMeta; |
| for (int i = 0; i < fields.size(); i++) { |
| if (i > 0) { |
| sb.append(", "); |
| } |
| ColumnType columnType = data.colSpecs.get(i).type; |
| IValueMeta valueMeta = fields.getValueMeta(i); |
      switch (columnType) {
        case NUMERIC:
          // Load NUMERIC values through a VARCHAR filler column and cast them into place.
          sb.append("TMPFILLERCOL").append(i).append(" FILLER VARCHAR(1000), ");
          // Force columns to be quoted:
          sb.append(databaseMeta.getStartQuote())
              .append(valueMeta.getName())
              .append(databaseMeta.getEndQuote());
          sb.append(" AS CAST(TMPFILLERCOL").append(i).append(" AS NUMERIC)");
          break;
        default:
          // Force columns to be quoted:
          sb.append(databaseMeta.getStartQuote())
              .append(valueMeta.getName())
              .append(databaseMeta.getEndQuote());
          break;
      }
| } |
| sb.append(")"); |
| |
| sb.append(" FROM STDIN NATIVE "); |
| |
| if (!StringUtil.isEmpty(meta.getExceptionsFileName())) { |
| sb.append("EXCEPTIONS E'") |
| .append(meta.getExceptionsFileName().replace("'", "\\'")) |
| .append("' "); |
| } |
| |
| if (!StringUtil.isEmpty(meta.getRejectedDataFileName())) { |
| sb.append("REJECTED DATA E'") |
| .append(meta.getRejectedDataFileName().replace("'", "\\'")) |
| .append("' "); |
| } |
| |
| // TODO: Should eventually get a preference for this, but for now, be backward compatible. |
| sb.append("ENFORCELENGTH "); |
| |
| if (meta.isAbortOnError()) { |
| sb.append("ABORT ON ERROR "); |
| } |
| |
| if (meta.isDirect()) { |
| sb.append("DIRECT "); |
| } |
| |
| if (!StringUtil.isEmpty(meta.getStreamName())) { |
| sb.append("STREAM NAME E'") |
| .append(data.db.resolve(meta.getStreamName()).replace("'", "\\'")) |
| .append("' "); |
| } |
| |
| // XXX: I believe the right thing to do here is always use NO COMMIT since we want Hop's |
| // configuration to drive. |
| // NO COMMIT does not seem to work even when the pipeline setting 'make the pipeline database |
| // transactional' is on |
| // sb.append("NO COMMIT"); |
| |
| logDebug("copy stmt: " + sb.toString()); |
| |
| return sb.toString(); |
| } |
| |
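  /**
   * Encodes one row into the COPY stream. Returns the row for the next transform, or {@code null}
   * when the row was rejected and we are not aborting on error.
   */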
| private Object[] writeToOutputStream(Object[] r) throws HopException, IOException { |
| assert (r != null); |
| |
| Object[] insertRowData = r; |
| Object[] outputRowData = r; |
| |
| if (meta.specifyFields()) { |
| insertRowData = new Object[data.selectedRowFieldIndices.length]; |
| for (int idx = 0; idx < data.selectedRowFieldIndices.length; idx++) { |
| insertRowData[idx] = r[data.selectedRowFieldIndices[idx]]; |
| } |
| } |
| |
| try { |
| data.encoder.writeRow(data.insertRowMeta, insertRowData); |
| } catch (HopValueException valueException) { |
| /* |
| * If we are to abort, we should continue throwing the exception. If we are not aborting, we need to set the |
| * outputRowData to null, so the next transform knows not to add it and continue. We also need to write to the |
| * rejected log what data failed (print out the outputRowData before null'ing it) and write to the error log the |
| * issue. |
| */ |
| // write outputRowData -> Rejected Row |
| // write Error Log as to why it was rejected |
| writeExceptionRejectionLogs(valueException, outputRowData); |
| if (meta.isAbortOnError()) { |
| throw valueException; |
| } |
| outputRowData = null; |
| } catch (IOException e) { |
| if (!data.isStopped()) { |
| throw new HopException("I/O Error during row write.", e); |
| } |
| } |
| |
| return outputRowData; |
| } |
| |
| protected void verifyDatabaseConnection() throws HopException { |
| // Confirming Database Connection is defined. |
| if (meta.getConnection() == null) { |
| throw new HopException( |
| BaseMessages.getString(PKG, "VerticaBulkLoaderMeta.Error.NoConnection")); |
| } |
| } |
| |
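  /** Connects to the database, switches off auto-commit and opens the optional log files. */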
| @Override |
| public boolean init() { |
| |
| if (super.init()) { |
| try { |
| // Validating that the connection has been defined. |
| verifyDatabaseConnection(); |
| data.databaseMeta = this.getPipelineMeta().findDatabase(meta.getConnection(), variables); |
| initializeLogFiles(); |
| |
| data.db = new Database(this, this, data.databaseMeta); |
| data.db.connect(); |
| |
| if (log.isBasic()) { |
| logBasic("Connected to database [" + meta.getDatabaseMeta() + "]"); |
| } |
| |
| data.db.setAutoCommit(false); |
| |
| return true; |
| } catch (HopException e) { |
| logError("An error occurred intialising this transform: " + e.getMessage()); |
| stopAll(); |
| setErrors(1); |
| } |
| } |
| return false; |
| } |
| |
| @Override |
| public void markStop() { |
| // Close the exception/rejected loggers at the end |
| try { |
| closeLogFiles(); |
| } catch (HopException ex) { |
| logError(BaseMessages.getString(PKG, "VerticaBulkLoader.Exception.ClosingLogError", ex)); |
| } |
| super.markStop(); |
| } |
| |
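  /** Interrupts the COPY worker thread so a pipeline halt does not hang on the piped stream. */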
| @Override |
| public void stopRunning() throws HopException { |
| setStopped(true); |
| if (data.workerThread != null) { |
| synchronized (data.workerThread) { |
| if (data.workerThread.isAlive() && !data.workerThread.isInterrupted()) { |
          try {
            data.workerThread.interrupt();
            data.workerThread.join();
          } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can see we were interrupted.
            Thread.currentThread().interrupt();
          }
| } |
| } |
| } |
| |
| super.stopRunning(); |
| } |
| |
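  /** Truncates the target table; only the first transform copy does this, unless partitioned. */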
| void truncateTable() throws HopDatabaseException { |
| if (meta.isTruncateTable() && ((getCopy() == 0) || !Utils.isEmpty(getPartitionId()))) { |
| data.db.truncateTable(resolve(meta.getSchemaName()), resolve(meta.getTableName())); |
| } |
| } |
| |
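  /**
   * Rolls back on error, waits for the COPY worker to finish and releases the connection and
   * encoder resources.
   */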
| @Override |
| public void dispose() { |
| |
| // allow data to be garbage collected immediately: |
| data.colSpecs = null; |
| data.encoder = null; |
| |
| setOutputDone(); |
| |
| try { |
      if (getErrors() > 0 && data.db != null) {
| data.db.rollback(); |
| } |
| } catch (HopDatabaseException e) { |
| logError("Unexpected error rolling back the database connection.", e); |
| } |
| |
| if (data.workerThread != null) { |
      try {
        data.workerThread.join();
      } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can see we were interrupted.
        Thread.currentThread().interrupt();
      }
| } |
| |
| if (data.db != null) { |
| data.db.disconnect(); |
| } |
| super.dispose(); |
| } |
| |
| @VisibleForTesting |
| StreamEncoder createStreamEncoder(List<ColumnSpec> colSpecs, PipedInputStream pipedInputStream) |
| throws IOException { |
| return new StreamEncoder(colSpecs, pipedInputStream); |
| } |
| |
| @VisibleForTesting |
| VerticaCopyStream createVerticaCopyStream(String dml) |
| throws SQLException, ClassNotFoundException, HopDatabaseException { |
| return new VerticaCopyStream(getVerticaConnection(), dml); |
| } |
| |
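  /**
   * Digs the native {@link VerticaConnection} out of the JDBC connection, looking through DBCP
   * and {@link PooledConnection} wrappers and falling back to {@link Connection#unwrap}.
   */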
| @VisibleForTesting |
| VerticaConnection getVerticaConnection() |
| throws SQLException, ClassNotFoundException, HopDatabaseException { |
| |
| Connection conn = data.db.getConnection(); |
| if (conn != null) { |
| if (conn instanceof VerticaConnection) { |
| return (VerticaConnection) conn; |
| } else { |
| Connection underlyingConn = null; |
| if (conn instanceof DelegatingConnection) { |
| DelegatingConnection pooledConn = (DelegatingConnection) conn; |
| underlyingConn = pooledConn.getInnermostDelegate(); |
| } else if (conn instanceof javax.sql.PooledConnection) { |
| PooledConnection pooledConn = (PooledConnection) conn; |
| underlyingConn = pooledConn.getConnection(); |
| } else { |
| // Last resort - attempt to use unwrap to get at the connection. |
| try { |
            if (conn.isWrapperFor(VerticaConnection.class)) {
              return conn.unwrap(VerticaConnection.class);
            }
| } catch (SQLException ignored) { |
| // ignored - the connection doesn't support unwrap or the connection cannot be |
| // unwrapped into a VerticaConnection. |
| } |
| } |
        if (underlyingConn instanceof VerticaConnection) {
| return (VerticaConnection) underlyingConn; |
| } |
| } |
| throw new IllegalStateException( |
| "Could not retrieve a VerticaConnection from " + conn.getClass().getName()); |
| } else { |
| throw new IllegalStateException("Could not retrieve a VerticaConnection from null"); |
| } |
| } |
| } |