blob: 2ba3ce839844ec9d4db34bba4c8df14c77bb4205 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.tools;
import com.google.common.base.Strings;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.cli.CLIParser;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.sla.SLARegistrationBean;
import org.apache.oozie.sla.SLASummaryBean;
import org.apache.oozie.store.StoreException;
import org.apache.openjpa.persistence.OpenJPAEntityManagerSPI;
import org.apache.openjpa.persistence.RollbackException;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import javax.persistence.FlushModeType;
import javax.persistence.Table;
import javax.persistence.metamodel.EntityType;
import javax.persistence.metamodel.Metamodel;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.ListIterator;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.oozie.tools.OozieDBExportCLI.OOZIEDB_AC_JSON;
import static org.apache.oozie.tools.OozieDBExportCLI.OOZIEDB_BNA_JSON;
import static org.apache.oozie.tools.OozieDBExportCLI.OOZIEDB_BNJ_JSON;
import static org.apache.oozie.tools.OozieDBExportCLI.OOZIEDB_CA_JSON;
import static org.apache.oozie.tools.OozieDBExportCLI.OOZIEDB_CJ_JSON;
import static org.apache.oozie.tools.OozieDBExportCLI.OOZIEDB_SLAREG_JSON;
import static org.apache.oozie.tools.OozieDBExportCLI.OOZIEDB_SLASUM_JSON;
import static org.apache.oozie.tools.OozieDBExportCLI.OOZIEDB_SYS_INFO_JSON;
import static org.apache.oozie.tools.OozieDBExportCLI.OOZIEDB_WF_JSON;
/**
 * Command line tool that imports the contents of an Oozie database from a zip file.
 * <ul>
 * <li>reads the JSON files that {@link OozieDBExportCLI} wrote into the specified zip file and persists their rows into
 * the corresponding database tables</li>
 * <li>uses the current Oozie configuration in {@code oozie-site.xml}</li>
 * <li>refuses to import when the database version differs from the one recorded in the dump, or when any target table
 * is missing or not empty</li>
 * <li>when any row had to be skipped, empties every table it has written to and fails</li>
 * </ul>
 */
public class OozieDBImportCLI {
    private static final String[] HELP_INFO = {
            "",
            "OozieDBImportCLI reads Oozie database from a zip file."
    };
    private static final String IMPORT_CMD = "import";
    private static final String HELP_CMD = "help";
    public static final String OOZIE_DB_IMPORT_BATCH_SIZE_KEY = "oozie.db.import.batch.size";
    static final int DEFAULT_BATCH_SIZE = 1000;
    /** Entities per transaction; assigned by {@link #setImportBatchSize()}, which guarantees a positive value. */
    private static int IMPORT_BATCH_SIZE;
    private static final String OPTION_VERBOSE_SHORT = "v";
    private static final String OPTION_VERBOSE_LONG = "verbose";
    private final EntityManager entityManager;
    private final ZipFile mainZipFile;
    private final boolean verbose;
    /** Set once any row could not be persisted; makes {@link #cleanupIfNecessary()} empty the written tables. */
    private boolean cleanupNecessary = false;
    /** Entity classes that received at least one batch, in import order; these tables are emptied on cleanup. */
    private final Set<Class<?>> entityClasses = Sets.newLinkedHashSet();

    private OozieDBImportCLI(final EntityManager entityManager, final ZipFile mainZipFile, final boolean verbose) {
        this.entityManager = entityManager;
        this.mainZipFile = mainZipFile;
        this.verbose = verbose;
    }

    /**
     * Entry point: parses the sub-command ({@code import} or {@code help}) and runs it.
     * <p>
     * On any failure the error is reported on stderr, {@link Services} are destroyed, and the JVM exits with status 1.
     * On success the method returns normally.
     *
     * @param args the sub-command, the optional {@code -v}/{@code -verbose} flag and, for {@code import}, the zip file name
     * @throws ParseException kept for compatibility; parse errors are actually reported and turned into exit status 1
     */
    public static void main(final String[] args) throws ParseException {
        final CLIParser parser = new CLIParser("oozie-setup.sh", HELP_INFO);
        parser.addCommand(HELP_CMD, "", "display usage for all commands or specified command", new Options(), false);
        parser.addCommand(IMPORT_CMD, "",
                "imports the contents of the Oozie database from the specified file",
                new Options().addOption(OPTION_VERBOSE_SHORT, OPTION_VERBOSE_LONG, false, "Enables verbose logging."), true);
        boolean verbose = false;
        int exitCode = 0;
        try {
            final CLIParser.Command command = parser.parse(args);
            // Decide verbosity up front so that failures during service initialization also get a stack trace.
            verbose = command.getCommandLine().hasOption(OPTION_VERBOSE_SHORT)
                    || command.getCommandLine().hasOption(OPTION_VERBOSE_LONG);
            if (command.getName().equals(IMPORT_CMD)) {
                final String[] commandArgs = command.getCommandLine().getArgs();
                if (commandArgs.length == 0) {
                    throw new IllegalArgumentException("Missing argument: the zip file to import from.");
                }
                final Services services = new Services();
                services.getConf().set(Services.CONF_SERVICE_CLASSES, JPAService.class.getName());
                services.getConf().set(Services.CONF_SERVICE_EXT_CLASSES, "");
                services.init();
                setImportBatchSize();
                System.out.println("==========================================================");
                System.out.println(Arrays.toString(commandArgs));
                System.out.println(String.format("Import batch length is %d", IMPORT_BATCH_SIZE));
                importAllDBTables(commandArgs[0], verbose);
            } else if (command.getName().equals(HELP_CMD)) {
                parser.showHelp(command.getCommandLine());
            }
        } catch (final ParseException pex) {
            System.err.println("Invalid sub-command: " + pex.getMessage());
            System.err.println();
            System.err.println(parser.shortHelp());
            exitCode = 1;
        } catch (final Throwable e) {
            System.err.println();
            System.err.println("Error: " + e.getMessage());
            System.err.println();
            if (verbose) {
                System.err.println("Stack trace for the error was (for debug purposes):");
                System.err.println("--------------------------------------");
                e.printStackTrace(System.err);
                System.err.println("--------------------------------------");
                System.err.println();
            }
            exitCode = 1;
        } finally {
            if (Services.get() != null) {
                Services.get().destroy();
            }
        }
        // Exit only after the finally block: System.exit() inside a catch would skip Services.destroy().
        if (exitCode != 0) {
            System.exit(exitCode);
        }
    }

    /**
     * Sets {@link #IMPORT_BATCH_SIZE} from the first positive value among:
     * <ol>
     * <li>the system property {@value #OOZIE_DB_IMPORT_BATCH_SIZE_KEY}</li>
     * <li>the same key in the Oozie configuration</li>
     * <li>{@link #DEFAULT_BATCH_SIZE}</li>
     * </ol>
     * A non-positive batch size would otherwise cause a division by zero, or never flush a batch.
     */
    private static void setImportBatchSize() {
        final String systemValue = System.getProperty(OOZIE_DB_IMPORT_BATCH_SIZE_KEY);
        int batchSize = -1;
        if (!Strings.isNullOrEmpty(systemValue)) {
            try {
                batchSize = Integer.parseInt(systemValue.trim());
            } catch (final NumberFormatException e) {
                // Not a number: reported below and replaced by the configured value.
                batchSize = -1;
            }
            if (batchSize <= 0) {
                System.err.println(String.format(
                        "Ignoring invalid system property [%s=%s], the batch size must be a positive integer.",
                        OOZIE_DB_IMPORT_BATCH_SIZE_KEY, systemValue));
            }
        }
        if (batchSize <= 0) {
            batchSize = ConfigurationService.getInt(OOZIE_DB_IMPORT_BATCH_SIZE_KEY, DEFAULT_BATCH_SIZE);
        }
        if (batchSize <= 0) {
            System.err.println(String.format(
                    "Ignoring invalid configuration [%s=%d], using default batch size [%d].",
                    OOZIE_DB_IMPORT_BATCH_SIZE_KEY, batchSize, DEFAULT_BATCH_SIZE));
            batchSize = DEFAULT_BATCH_SIZE;
        }
        IMPORT_BATCH_SIZE = batchSize;
    }

    /**
     * Opens the zip file and imports every known table from it.
     * <p>
     * The steps are: check the DB version, check that all target tables exist and are empty, import each JSON entry,
     * and finally clean up and fail if any row was skipped.
     *
     * @param zipFileName path of the zip file created by {@link OozieDBExportCLI}
     * @param verbose whether to print diagnostic details for failures
     */
    private static void importAllDBTables(final String zipFileName, final boolean verbose) throws StoreException, IOException,
            JPAExecutorException, SQLException {
        EntityManager entityManager = null;
        try (ZipFile mainZipFile = new ZipFile(zipFileName)) {
            entityManager = Services.get().get(JPAService.class).getEntityManager();
            entityManager.setFlushMode(FlushModeType.COMMIT);
            final OozieDBImportCLI importer = new OozieDBImportCLI(entityManager, mainZipFile, verbose);
            importer.checkDBVersion();
            importer.checkTablesArePresentAndEmpty();
            importer.importOneInputFileToOneEntityTable(WorkflowJobBean.class, OOZIEDB_WF_JSON);
            importer.importOneInputFileToOneEntityTable(WorkflowActionBean.class, OOZIEDB_AC_JSON);
            importer.importOneInputFileToOneEntityTable(CoordinatorJobBean.class, OOZIEDB_CJ_JSON);
            importer.importOneInputFileToOneEntityTable(CoordinatorActionBean.class, OOZIEDB_CA_JSON);
            importer.importOneInputFileToOneEntityTable(BundleJobBean.class, OOZIEDB_BNJ_JSON);
            importer.importOneInputFileToOneEntityTable(BundleActionBean.class, OOZIEDB_BNA_JSON);
            importer.importOneInputFileToOneEntityTable(SLARegistrationBean.class, OOZIEDB_SLAREG_JSON);
            importer.importOneInputFileToOneEntityTable(SLASummaryBean.class, OOZIEDB_SLASUM_JSON);
            final boolean cleanupPerformed = importer.cleanupIfNecessary();
            checkState(!cleanupPerformed, "DB cleanup happened due to skipped rows. " +
                    "See previous log entries about what rows were skipped and why.");
        } finally {
            if (entityManager != null) {
                entityManager.close();
            }
        }
    }

    /**
     * Compares the {@code db.version} row of {@code OOZIE_SYS} with the version recorded in the dump.
     *
     * @throws IOException if the system info entry is missing from the zip file or cannot be read
     * @throws IllegalStateException if the two versions differ
     */
    private void checkDBVersion() throws IOException {
        final String currentDBVersion = (String) entityManager
                .createNativeQuery("select data from OOZIE_SYS where name='db.version'")
                .getSingleResult();
        final String dumpDBVersion = readDumpDBVersion();
        if (currentDBVersion != null && currentDBVersion.equals(dumpDBVersion)) {
            System.out.println("Loading to Oozie database version " + currentDBVersion);
            return;
        }
        System.err.println("ERROR Oozie database version mismatch.");
        System.err.println("Oozie DB version:\t" + currentDBVersion);
        System.err.println("Dump DB version:\t" + dumpDBVersion);
        // Throw instead of System.exit() so the zip file and EntityManager are closed by the caller.
        throw new IllegalStateException(String.format(
                "Oozie database version mismatch. [currentDBVersion=%s;dumpDBVersion=%s]",
                currentDBVersion, dumpDBVersion));
    }

    /**
     * Reads the DB version stored in the dump's system info entry.
     *
     * @return the recorded {@code db.version} value, or {@code null} if no such line exists
     * @throws IOException if the entry is missing from the zip file or cannot be read
     */
    private String readDumpDBVersion() throws IOException {
        final ZipEntry sysInfoEntry = mainZipFile.getEntry(OOZIEDB_SYS_INFO_JSON);
        if (sysInfoEntry == null) {
            throw new IOException(String.format("Cannot check DB version, entry [%s] is missing from [%s].",
                    OOZIEDB_SYS_INFO_JSON, mainZipFile.getName()));
        }
        String dumpDBVersion = null;
        final Gson gson = new Gson();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(mainZipFile.getInputStream(sysInfoEntry), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Each line is expected to be a JSON array of [name, value]; blank lines parse to null.
                final List<?> info = gson.fromJson(line, List.class);
                if (info != null && info.size() > 1 && "db.version".equals(info.get(0))) {
                    dumpDBVersion = Objects.toString(info.get(1), null);
                }
            }
        }
        return dumpDBVersion;
    }

    /**
     * Verifies that the table of every imported entity class exists and contains no rows.
     *
     * @throws SQLException if the database metadata cannot be read
     * @throws IllegalStateException if a table is missing or not empty
     */
    private void checkTablesArePresentAndEmpty() throws SQLException {
        checkTableIsPresentAndEmpty(WorkflowJobBean.class);
        checkTableIsPresentAndEmpty(WorkflowActionBean.class);
        checkTableIsPresentAndEmpty(CoordinatorJobBean.class);
        checkTableIsPresentAndEmpty(CoordinatorActionBean.class);
        checkTableIsPresentAndEmpty(BundleJobBean.class);
        checkTableIsPresentAndEmpty(BundleActionBean.class);
        checkTableIsPresentAndEmpty(SLARegistrationBean.class);
        checkTableIsPresentAndEmpty(SLASummaryBean.class);
    }

    private <E> void checkTableIsPresentAndEmpty(final Class<E> entityClass) throws SQLException {
        final String tableName = findTableName(entityManager, entityClass);
        final OpenJPAEntityManagerSPI entityManagerDelegate = (OpenJPAEntityManagerSPI) entityManager.getDelegate();
        // The connection handed out by OpenJPA must be closed before the EntityManager is used again
        // (getEntityCount below), otherwise it is leaked.
        try (Connection connection = (Connection) entityManagerDelegate.getConnection()) {
            final DatabaseMetaData metaData = connection.getMetaData();
            try (ResultSet rs = metaData.getTables(null, null, tableName, null)) {
                checkState(rs.next(),
                        String.format("Table [%s] does not exist for class [%s].", tableName, entityClass.getSimpleName()));
            }
        }
        final long entityCount = getEntityCount(entityClass);
        checkState(entityCount == 0,
                String.format("There are already [%d] entries in table [%s] for class [%s], should be empty.",
                        entityCount,
                        tableName,
                        entityClass.getSimpleName()));
    }

    private <E> long getEntityCount(final Class<E> entityClass) {
        return entityManager.createQuery(
                String.format("SELECT COUNT(e) FROM %s e", entityClass.getSimpleName()), Long.class)
                .getSingleResult();
    }

    /**
     * Imports all the contents of one JSON file of the zip into the database table where the
     * {@link javax.persistence.Entity} instances are stored.
     * <p>
     * This call hides the batch {@link EntityTransaction} handling details. It also retries pending entities one by one
     * when a batch commit is rolled back. A missing entry is reported and imports nothing.
     *
     * @param entityClass the class to persist
     * @param importFileName the JSON file name inside the zip file
     * @param <E> {@link javax.persistence.Entity} type
     * @throws RuntimeException wrapping the cause, after rolling back the open transaction, when the entry cannot be
     * read, contains invalid JSON, or an entity cannot be persisted for a reason other than a rollback
     */
    private <E> void importOneInputFileToOneEntityTable(final Class<E> entityClass, final String importFileName) {
        final BatchTransactionHandler<E> batchTransactionHandler = new BatchTransactionHandler<>();
        final BatchEntityPersister<E> batchEntityPersister = new BatchEntityPersister<>(entityClass,
                importFileName, batchTransactionHandler);
        final List<E> batch = new ArrayList<>();
        final Gson gson = new Gson();
        final ZipEntry importEntry = mainZipFile.getEntry(importFileName);
        if (importEntry == null) {
            System.err.println(String.format("Entry [%s] not found in [%s], nothing to import.",
                    importFileName, mainZipFile.getName()));
        } else {
            long lineIndex = 1L;
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(mainZipFile.getInputStream(importEntry), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    final E newEntity = gson.fromJson(line, entityClass);
                    batch.add(newEntity);
                    // IMPORT_BATCH_SIZE is guaranteed positive by setImportBatchSize(); persist() empties the batch.
                    if (lineIndex % IMPORT_BATCH_SIZE == 0) {
                        System.out.println(String.format("Batch is full, persisting. [lineIndex=%s;batch.size=%s]",
                                lineIndex, batch.size()));
                        batchEntityPersister.persist(batch);
                    }
                    lineIndex++;
                }
            } catch (final IOException e) {
                rollbackAndThrow(importFileName, batchTransactionHandler, e);
            } catch (final JsonSyntaxException e) {
                if (verbose) {
                    System.err.println(String.format("JSON error. [lineIndex=%s;e.message=%s]", lineIndex, e.getMessage()));
                }
                rollbackAndThrow(importFileName, batchTransactionHandler, e);
            }
        }
        if (!batch.isEmpty()) {
            System.out.println(String.format("Persisting last batch. [batch.size=%s]", batch.size()));
            batchEntityPersister.persist(batch);
        }
        final String tableName = findTableName(entityManager, entityClass);
        System.out.println(String.format("%s row(s) imported to table %s.",
                batchTransactionHandler.getTotalPersistedCount(),
                tableName));
        if (batchTransactionHandler.getTotalSkippedCount() > 0) {
            System.err.println(
                    String.format("[%s] row(s) skipped while importing to table [%s]. " +
                                    "Will remove all the rows of all the tables to get clean data.",
                            batchTransactionHandler.getTotalSkippedCount(),
                            tableName));
            cleanupNecessary = true;
        }
    }

    /**
     * Deletes all rows of every table written so far, but only if some rows were skipped during the import.
     *
     * @return {@code true} if the cleanup was performed, {@code false} if it was not necessary
     */
    private boolean cleanupIfNecessary() {
        if (!cleanupNecessary) {
            System.out.println("Cleanup not necessary, no entities skipped.");
            return false;
        }
        System.out.println(String.format("Performing cleanup of [%d] tables due to skipped entities.", entityClasses.size()));
        for (final Class<?> entityClass : entityClasses) {
            final String tableName = findTableName(entityManager, entityClass);
            System.out.println(String.format("Cleaning up table [%s].", tableName));
            final BatchTransactionHandler<?> batchTransactionHandler = new BatchTransactionHandler<>();
            try {
                batchTransactionHandler.begin();
                entityManager.createQuery(String.format("DELETE FROM %s e", entityClass.getSimpleName())).executeUpdate();
                batchTransactionHandler.commit();
            } catch (final RuntimeException e) {
                // Do not leave the DELETE transaction open on the shared EntityManager.
                batchTransactionHandler.rollbackIfActive();
                throw e;
            }
            System.out.println(String.format("Table [%s] cleaned up.", tableName));
        }
        System.out.println(String.format("Cleanup of [%d] tables due to skipped entities performed.", entityClasses.size()));
        return true;
    }

    /**
     * Rolls back the active transaction of {@code batchTransactionHandler}, if any, and throws a
     * {@link RuntimeException} that names the JSON file and wraps {@code cause}.
     */
    private <E> void rollbackAndThrow(final String importFileName,
                                      final BatchTransactionHandler<E> batchTransactionHandler,
                                      final Exception cause) {
        batchTransactionHandler.rollbackIfActive();
        throw new RuntimeException(String.format("Import failed from json [zippedFileName=%s;e.message=%s].", importFileName,
                cause.getMessage()), cause);
    }

    /**
     * Returns the database table name of an entity class.
     * <p>
     * This is the non-empty {@link Table#name()} if present. Otherwise it is the JPA entity name, upper-cased in a
     * locale-independent way.
     *
     * @param entityManager used to look up the entity metadata
     * @param entityClass a JPA entity class
     * @param <E> the entity type
     * @return the table name
     */
    static <E> String findTableName(final EntityManager entityManager, final Class<E> entityClass) {
        final Metamodel meta = entityManager.getMetamodel();
        final EntityType<E> entityType = meta.entity(entityClass);
        final Table t = entityClass.getAnnotation(Table.class);
        if (t != null && !t.name().isEmpty()) {
            return t.name();
        }
        // Locale.ROOT: a default locale such as Turkish would otherwise turn 'i' into a dotted capital I.
        return entityType.getName().toUpperCase(Locale.ROOT);
    }

    /**
     * Handles batch transactions: the actual commit is done when the number of persisted entities reaches the batch
     * limit. This was needed because importing everything in one huge {@link EntityTransaction} (and holding it in the
     * {@link EntityManager} heap) resulted in {@link OutOfMemoryError} for large input JSON files.
     * <p>
     * When there is a problem persisting one of the entities, it usually surfaces when
     * {@link EntityTransaction#commit()} is called. If that call fails, the caller can get all the pending
     * {@link javax.persistence.Entity} instances that were not persisted and retry them one by one with
     * {@link #persistAndTryCommit(Object)}.
     *
     * @param <E> the {@link javax.persistence.Entity} class
     */
    private class BatchTransactionHandler<E> {
        private EntityTransaction currentTransaction;
        private int totalPersistedCount = 0;
        private int totalSkippedCount = 0;
        private List<E> pendingEntities = new ArrayList<>();

        /**
         * Begins a new {@link EntityTransaction} and forgets any previously pending entities.
         */
        void begin() {
            currentTransaction = entityManager.getTransaction();
            currentTransaction.begin();
            pendingEntities.clear();
        }

        /**
         * Commits the {@link EntityTransaction} and counts the pending entities as persisted.
         */
        void commit() {
            Objects.requireNonNull(currentTransaction, "TX should be open.");
            currentTransaction.commit();
            totalPersistedCount += pendingEntities.size();
            pendingEntities.clear();
        }

        /**
         * Rolls back the {@link EntityTransaction} if it is active, and forgets the pending entities.
         */
        void rollbackIfActive() {
            if (currentTransaction == null) {
                return;
            }
            if (currentTransaction.isActive()) {
                currentTransaction.rollback();
            }
            pendingEntities.clear();
        }

        /**
         * Persists a new {@link javax.persistence.Entity} instance, committing and reopening the transaction once
         * {@link #IMPORT_BATCH_SIZE} entities are pending.
         *
         * @param newEntity the new {@link javax.persistence.Entity} instance
         */
        void persist(final E newEntity) {
            Objects.requireNonNull(currentTransaction, "TX should be open.");
            entityManager.persist(newEntity);
            pendingEntities.add(newEntity);
            if (pendingEntities.size() == IMPORT_BATCH_SIZE) {
                commit();
                begin();
            }
        }

        /**
         * Persists and commits, in its own transaction, one pending {@link javax.persistence.Entity} instance, that
         * is, one that was part of a failed {@link EntityTransaction#commit()} call.
         * <p>
         * If the commit is rolled back, the entity is counted as skipped. Other exceptions propagate to the caller,
         * which must roll back.
         *
         * @param pendingEntity the pending {@link javax.persistence.Entity} instance
         */
        void persistAndTryCommit(final E pendingEntity) {
            Objects.requireNonNull(currentTransaction, "TX should be open.");
            try {
                currentTransaction.begin();
                entityManager.persist(pendingEntity);
                currentTransaction.commit();
                totalPersistedCount++;
            } catch (final RollbackException re) {
                if (verbose) {
                    System.err.println(String.format("Cannot persist entity, skipping. [re.failedObject=%s]",
                            re.getFailedObject()));
                }
                totalSkippedCount++;
            }
        }

        /**
         * @return how many {@link javax.persistence.Entity} instances this handler has committed so far
         */
        int getTotalPersistedCount() {
            return totalPersistedCount;
        }

        /**
         * @return how many {@link javax.persistence.Entity} instances this handler has skipped so far
         */
        int getTotalSkippedCount() {
            return totalSkippedCount;
        }

        /**
         * Returns the {@link javax.persistence.Entity} instances that have been passed to
         * {@link EntityManager#persist(Object)} but not yet committed.
         *
         * @return the live list of pending entities of this handler
         */
        List<E> getPendingEntities() {
            return pendingEntities;
        }
    }

    /**
     * Persists entities in batches, delegating to {@link BatchTransactionHandler}.
     * <p>
     * As entities are persisted they are removed from the backing {@code List}. This lets the GC reclaim them even
     * when the entities of one batch would otherwise not fit in memory.
     *
     * @param <E> the {@link javax.persistence.Entity} class
     */
    private class BatchEntityPersister<E> {
        private final Class<E> entityClass;
        private final String importFileName;
        private final BatchTransactionHandler<E> batchTransactionHandler;

        private BatchEntityPersister(final Class<E> entityClass,
                                     final String importFileName,
                                     final BatchTransactionHandler<E> batchTransactionHandler) {
            this.entityClass = entityClass;
            this.importFileName = importFileName;
            this.batchTransactionHandler = batchTransactionHandler;
        }

        /**
         * Persists and commits every entity of {@code batch}, emptying it.
         * <p>
         * Entities of a rolled-back commit are retried one by one. Any other failure rolls back and is rethrown as a
         * {@link RuntimeException}.
         *
         * @param batch the entities to persist; empty on normal return
         */
        void persist(final List<E> batch) {
            if (batch.isEmpty()) {
                System.out.println("No entities to import.");
                return;
            }
            // Remember the table so it can be emptied if a cleanup becomes necessary.
            entityClasses.add(entityClass);
            while (!batch.isEmpty()) {
                try {
                    batchTransactionHandler.begin();
                    final ListIterator<E> iterator = batch.listIterator();
                    while (iterator.hasNext()) {
                        final E entityToPersist = iterator.next();
                        iterator.remove();
                        batchTransactionHandler.persist(entityToPersist);
                    }
                    batchTransactionHandler.commit();
                } catch (final RollbackException re) {
                    retryPendingEntitiesOneByOne();
                } catch (final Exception e) {
                    rollbackAndThrow(importFileName, batchTransactionHandler, e);
                }
            }
        }

        /**
         * Retries each pending entity of a rolled-back commit in its own transaction. An unexpected failure rolls back
         * the open transaction instead of leaving it active.
         */
        private void retryPendingEntitiesOneByOne() {
            try {
                for (final E pendingEntity : batchTransactionHandler.getPendingEntities()) {
                    batchTransactionHandler.persistAndTryCommit(pendingEntity);
                }
            } catch (final RuntimeException e) {
                rollbackAndThrow(importFileName, batchTransactionHandler, e);
            }
        }
    }
}