blob: 211c025dcd6c5eda0128f8e9b9a8364d6508180a [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store.berkeleydb;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
import org.apache.qpid.server.store.berkeleydb.BindingKey;
import org.apache.qpid.server.store.berkeleydb.ContentTB;
import org.apache.qpid.server.store.berkeleydb.DatabaseVisitor;
import org.apache.qpid.server.store.berkeleydb.ExchangeTB;
import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4;
import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5;
import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_4;
import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_5;
import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryTB;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.NullRootMessageLogger;
import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.util.FileUtils;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import java.io.File;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.FileNotFoundException;
import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Database;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.bind.tuple.TupleBinding;
/**
* This is a simple BerkeleyDB Store upgrade tool that will upgrade a V4 Store to a V5 Store.
*
* Currently upgrade is fixed from v4 -> v5
*
* Improvements:
* - Add List BDBMessageStore.getDatabases(); this can then be iterated to guard against new DBs being added.
* - A version in the store would allow automated upgrade or, later when more versions are available, interactive upgrade.
* - Add process logging and disable all Store and Qpid logging.
*/
public class BDBStoreUpgrade
{
/** Logger used for all progress and error reporting of the upgrade tool. */
private static final Logger _logger = LoggerFactory.getLogger(BDBStoreUpgrade.class);
/** The Store Directory that needs upgrading */
File _fromDir;
/** The Directory that will be made to contain the upgraded store */
File _toDir;
/** The Directory that will be made to backup the original store if required */
File _backupDir;
/** The Old Store */
BDBMessageStore _oldMessageStore;
/** The New Store */
BDBMessageStore _newMessageStore;
/** The file ending that is used by BDB Store Files */
private static final String BDB_FILE_ENDING = ".jdb";
/** Command line option definitions; populated by setOptions() from main() and read by help(). */
static final Options _options = new Options();
/** The parsed command line; assigned in main(). */
static CommandLine _commandLine;
/** When true the user is prompted (via userInteract) before destructive or irreversible steps. */
private boolean _interactive;
/** When true (and not interactive) an existing upgrade destination or backup is removed without asking. */
private boolean _force;
/** Version of this upgrade tool, logged when an upgrade run starts. */
private static final String VERSION = "3.0";
/** Message of the RuntimeException thrown when the user answers 'abort'; callers compare against it. */
private static final String USER_ABORTED_PROCESS = "User aborted process";
/** Stores with a version below this value cannot be upgraded by this tool. */
private static final int LOWEST_SUPPORTED_STORE_VERSION = 4;
/** MessageFormat pattern; {0} is the detected store version. */
private static final String PREVIOUS_STORE_VERSION_UNSUPPORTED = "Store upgrade from version {0} is not supported."
+ " You must first run the previous store upgrade tool.";
/** MessageFormat pattern; {0} is the detected store version. */
private static final String FOLLOWING_STORE_VERSION_UNSUPPORTED = "Store version {0} is newer than this tool supports. "
+ "You must use a newer version of the store upgrade tool";
/** MessageFormat pattern; {0} is the detected store version. */
private static final String STORE_ALREADY_UPGRADED = "Store has already been upgraded to version {0}.";
// Short and long names of the command line options registered in setOptions().
private static final String OPTION_INPUT_SHORT = "i";
private static final String OPTION_INPUT = "input";
private static final String OPTION_OUTPUT_SHORT = "o";
private static final String OPTION_OUTPUT = "output";
private static final String OPTION_BACKUP_SHORT = "b";
private static final String OPTION_BACKUP = "backup";
private static final String OPTION_QUIET_SHORT = "q";
private static final String OPTION_QUIET = "quiet";
private static final String OPTION_FORCE_SHORT = "f";
private static final String OPTION_FORCE = "force";
/** Set to true by the constructor when no destination directory is given; the upgraded store then replaces the source. */
private boolean _inplace = false;
public BDBStoreUpgrade(String fromDir, String toDir, String backupDir, boolean interactive, boolean force)
{
_interactive = interactive;
_force = force;
_fromDir = new File(fromDir);
if (!_fromDir.exists())
{
throw new IllegalArgumentException("BDBStore path '" + fromDir + "' could not be read. "
+ "Ensure the path is correct and that the permissions are correct.");
}
if (!isDirectoryAStoreDir(_fromDir))
{
throw new IllegalArgumentException("Specified directory '" + fromDir + "' does not contain a valid BDBMessageStore.");
}
if (toDir == null)
{
_inplace = true;
_toDir = new File(fromDir+"-Inplace");
}
else
{
_toDir = new File(toDir);
}
if (_toDir.exists())
{
if (_interactive)
{
if (toDir == null)
{
System.out.println("Upgrading in place:" + fromDir);
}
else
{
System.out.println("Upgrade destination: '" + toDir + "'");
}
if (userInteract("Upgrade destination exists do you wish to replace it?"))
{
if (!FileUtils.delete(_toDir, true))
{
throw new IllegalArgumentException("Unable to remove upgrade destination '" + _toDir + "'");
}
}
else
{
throw new IllegalArgumentException("Upgrade destination '" + _toDir + "' already exists. ");
}
}
else
{
if (_force)
{
if (!FileUtils.delete(_toDir, true))
{
throw new IllegalArgumentException("Unable to remove upgrade destination '" + _toDir + "'");
}
}
else
{
throw new IllegalArgumentException("Upgrade destination '" + _toDir + "' already exists. ");
}
}
}
if (!_toDir.mkdirs())
{
throw new IllegalArgumentException("Upgrade destination '" + _toDir + "' could not be created. "
+ "Ensure the path is correct and that the permissions are correct.");
}
if (backupDir != null)
{
if (backupDir.equals(""))
{
_backupDir = new File(_fromDir.getAbsolutePath().toString() + "-Backup");
}
else
{
_backupDir = new File(backupDir);
}
}
else
{
_backupDir = null;
}
}
/** Suffix appended to every question put to the user. */
private static final String ANSWER_OPTIONS = " Yes/No/Abort? ";
private static final String ANSWER_NO = "no";
private static final String ANSWER_N = "n";
private static final String ANSWER_YES = "yes";
private static final String ANSWER_Y = "y";
private static final String ANSWER_ABORT = "abort";
private static final String ANSWER_A = "a";
/**
 * Interact with the user via System.in and System.out. If the user wishes to Abort then a RuntimeException is thrown.
 * Otherwise the method will return based on their response true=yes false=no.
 *
 * The question is repeated until a recognised answer (case-insensitive y/yes, n/no, a/abort) is given.
 * If standard input reaches end-of-stream no answer can ever arrive, so this is treated as an abort
 * rather than failing with a NullPointerException. If standard input cannot be read at all the
 * method fails instead of re-prompting forever.
 *
 * @param message Message to print out
 *
 * @return boolean response from user if they wish to proceed
 *
 * @throws RuntimeException with message {@link #USER_ABORTED_PROCESS} if the user aborts or input ends,
 *                          or wrapping the IOException if standard input cannot be read
 */
private boolean userInteract(String message)
{
    // One reader for all re-prompts of this question so that buffered input is not discarded between attempts.
    final BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
    while (true)
    {
        System.out.print(message + ANSWER_OPTIONS);

        final String input;
        try
        {
            input = br.readLine();
        }
        catch (IOException e)
        {
            throw new RuntimeException("Unable to read user response: " + e.getMessage(), e);
        }

        if (input == null)
        {
            // End of stream: nobody is there to answer.
            throw new RuntimeException(USER_ABORTED_PROCESS);
        }

        if (input.equalsIgnoreCase(ANSWER_Y) || input.equalsIgnoreCase(ANSWER_YES))
        {
            return true;
        }
        if (input.equalsIgnoreCase(ANSWER_N) || input.equalsIgnoreCase(ANSWER_NO))
        {
            return false;
        }
        if (input.equalsIgnoreCase(ANSWER_A) || input.equalsIgnoreCase(ANSWER_ABORT))
        {
            throw new RuntimeException(USER_ABORTED_PROCESS);
        }
        // Unrecognised answer: ask again.
    }
}
/**
 * Upgrade the store this instance was constructed for, using the directories and
 * flags captured by the constructor.
 *
 * @param version the version of the current store
 *
 * @throws Exception if the upgrade fails
 */
public void upgradeFromVersion(int version) throws Exception
{
    final File source = _fromDir;
    final File destination = _toDir;
    final File backup = _backupDir;
    upgradeFromVersion(version, source, destination, backup, _force, _inplace);
}
/**
 * Upgrade a Store of a specified version to the latest version.
 *
 * In interactive mode the user is first asked about backups and whether to proceed; in
 * non-interactive mode a backup is taken only if a backup directory was supplied. A new store
 * is then created in <code>toDir</code> and populated from the old store in <code>fromDir</code>.
 *
 * When running in place, the original store is deleted and replaced by the upgraded one
 * ONLY after the upgrade has completed and both stores have been closed without error. A failed
 * upgrade therefore never removes the original store.
 *
 * @param version   the version of the current store
 * @param fromDir   the directory with the old Store
 * @param toDir     the directory to hold the newly Upgraded Store
 * @param backupDir the directory to backup to if required
 * @param force     suppress all questions
 * @param inplace   replace the from dir with the upgraded result in toDir
 *
 * @throws Exception                due to Virtualhost/MessageStore.close() being
 *                                  rather poor at exception handling
 * @throws DatabaseException        if there is a problem with the store formats
 * @throws AMQException             if there is an issue creating Qpid data structures
 * @throws IllegalArgumentException if the user declines to proceed or the version is unsupported
 * @throws RuntimeException         if the in-place swap of the directories fails
 */
public void upgradeFromVersion(int version, File fromDir, File toDir,
                               File backupDir, boolean force,
                               boolean inplace) throws Exception
{
    _logger.info("Located store to upgrade at '" + fromDir + "'");

    // Verify user has created a backup, giving option to perform backup
    if (_interactive)
    {
        if (!userInteract("Have you performed a DB backup of this store."))
        {
            File backup;
            if (backupDir == null)
            {
                backup = new File(fromDir.getAbsolutePath() + "-Backup");
            }
            else
            {
                backup = backupDir;
            }

            if (userInteract("Do you wish to perform a DB backup now? " +
                             "(Store will be backed up to '" + backup.getName() + "')"))
            {
                performDBBackup(fromDir, backup, force);
            }
            else
            {
                if (!userInteract("Are you sure wish to proceed with DB migration without backup? " +
                                  "(For more details of the consequences check the Qpid/BDB Message Store Wiki)."))
                {
                    throw new IllegalArgumentException("Upgrade stopped at user request as no DB Backup performed.");
                }
            }
        }
        else
        {
            if (!inplace)
            {
                _logger.info("Upgrade will create a new store at '" + toDir + "'");
            }

            _logger.info("Using the contents in the Message Store '" + fromDir + "'");

            if (!userInteract("Do you wish to proceed?"))
            {
                throw new IllegalArgumentException("Upgrade stopped as did not wish to proceed");
            }
        }
    }
    else
    {
        if (backupDir != null)
        {
            performDBBackup(fromDir, backupDir, force);
        }
    }

    CurrentActor.set(new BrokerActor(new NullRootMessageLogger()));

    //Create a new messageStore
    _newMessageStore = new BDBMessageStore();
    _newMessageStore.configure(toDir, false);
    _newMessageStore.start();

    try
    {
        //Load the old MessageStore
        switch (version)
        {
            default:
            case 4:
                _oldMessageStore = new BDBMessageStore(4);
                _oldMessageStore.configure(fromDir, true);
                _oldMessageStore.start();
                upgradeFromVersion_4();
                break;
            case 3:
            case 2:
            case 1:
                throw new IllegalArgumentException(MessageFormat.format(PREVIOUS_STORE_VERSION_UNSUPPORTED,
                                                                        new Object[] { Integer.toString(version) }));
        }
    }
    finally
    {
        // Always close both stores; a failure closing the new store must not leave the old one open.
        try
        {
            _newMessageStore.close();
        }
        finally
        {
            if (_oldMessageStore != null)
            {
                _oldMessageStore.close();
            }
        }
    }

    // Only reached when the upgrade and both close() calls succeeded. Swapping the directories
    // after a failure would destroy the original store and leave a partial one in its place.
    if (inplace)
    {
        // Remove original copy
        if (!FileUtils.delete(fromDir, true))
        {
            throw new RuntimeException("Unable to upgrade inplace as " +
                                       "unable to delete source '"
                                       + fromDir + "', Store upgrade " +
                                       "successfully performed to :"
                                       + toDir);
        }

        // Rename upgraded store
        if (!toDir.renameTo(fromDir))
        {
            throw new RuntimeException("Unable to upgrade inplace as " +
                                       "unable to rename '" + toDir + "' to '"
                                       + fromDir + "', Store upgrade " +
                                       "successfully performed to :"
                                       + toDir);
        }
    }
}
/**
 * Populate the new store from the opened version 4 store.
 *
 * Steps, in order:
 * <ol>
 * <li>Exchanges and queue bindings are copied unchanged.</li>
 * <li>Queues are re-created in the new store; a queue whose name contains a colon and which is
 * bound to a topic exchange is assumed to back a durable subscription and is marked exclusive.</li>
 * <li>Delivery records that reference queues missing from the queue database are gathered. In
 * batch mode such queues are created so their messages are kept; in interactive mode the user decides.</li>
 * <li>Message metadata and content are copied only for messages delivered to a retained queue.
 * Content keys are rewritten from the version 4 key to the version 5 key (message id plus the
 * number of content bytes already seen for that message).</li>
 * <li>Delivery records are copied for retained queues only.</li>
 * </ol>
 *
 * @throws AMQException      if a queue cannot be created in the new store
 * @throws DatabaseException if there is a problem reading or writing the stores
 */
private void upgradeFromVersion_4() throws AMQException, DatabaseException
{
    _logger.info("Starting store upgrade from version 4");

    //Migrate _exchangeDb;
    _logger.info("Exchanges");

    moveContents(_oldMessageStore.getExchangesDb(), _newMessageStore.getExchangesDb(), "Exchange");

    final List<AMQShortString> topicExchanges = new ArrayList<AMQShortString>();
    final TupleBinding exchangeTB = new ExchangeTB();

    DatabaseVisitor exchangeListVisitor = new DatabaseVisitor()
    {
        public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
        {
            ExchangeRecord exchangeRec = (ExchangeRecord) exchangeTB.entryToObject(value);
            AMQShortString type = exchangeRec.getType();

            if (ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(type))
            {
                topicExchanges.add(exchangeRec.getNameShortString());
            }
        }
    };
    _oldMessageStore.visitExchanges(exchangeListVisitor);

    //Migrate _queueBindingsDb;
    _logger.info("Queue Bindings");
    moveContents(_oldMessageStore.getBindingsDb(), _newMessageStore.getBindingsDb(), "Queue Binding");

    //Inspect the bindings to gather a list of queues which are probably durable subscriptions, i.e. those
    //which have a colon in their name and are bound to the Topic exchanges above
    final List<AMQShortString> durableSubQueues = new ArrayList<AMQShortString>();
    final TupleBinding<BindingKey> bindingTB = _oldMessageStore.getBindingTupleBindingFactory().getInstance();

    DatabaseVisitor durSubQueueListVisitor = new DatabaseVisitor()
    {
        public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
        {
            BindingKey bindingRec = (BindingKey) bindingTB.entryToObject(key);
            AMQShortString queueName = bindingRec.getQueueName();
            AMQShortString exchangeName = bindingRec.getExchangeName();

            if (topicExchanges.contains(exchangeName) && queueName.asString().contains(":"))
            {
                durableSubQueues.add(queueName);
            }
        }
    };
    _oldMessageStore.visitBindings(durSubQueueListVisitor);

    //Migrate _queueDb;
    _logger.info("Queues");

    // hold the list of existing queue names
    final List<AMQShortString> existingQueues = new ArrayList<AMQShortString>();

    final TupleBinding<QueueRecord> queueTupleBinding = _oldMessageStore.getQueueTupleBindingFactory().getInstance();

    DatabaseVisitor queueVisitor = new DatabaseVisitor()
    {
        public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQStoreException
        {
            QueueRecord queueRec = (QueueRecord) queueTupleBinding.entryToObject(value);
            AMQShortString queueName = queueRec.getNameShortString();

            //if the queue name is in the gathered list then set its exclusivity true
            if (durableSubQueues.contains(queueName))
            {
                _logger.info("Marking as possible DurableSubscription backing queue: " + queueName);
                queueRec.setExclusive(true);
            }

            //The simple call to createQueue with the QueueRecord object is sufficient for a v2->v3 upgrade as
            //the extra 'exclusive' property in v3 will be defaulted to false in the record creation.
            //NOTE(review): the version numbers in the comment above look stale for this v4 upgrade path - confirm.
            _newMessageStore.createQueue(queueRec);

            _count++;
            existingQueues.add(queueName);
        }
    };
    _oldMessageStore.visitQueues(queueVisitor);

    logCount(queueVisitor.getVisitedCount(), "Queue");

    // Look for persistent messages stored for non-durable queues
    _logger.info("Checking for messages previously sent to non-durable queues");

    // track all message delivery to existing queues
    final HashSet<Long> queueMessages = new HashSet<Long>();

    // hold all non existing queues and their messages IDs
    final HashMap<String, HashSet<Long>> phantomMessageQueues = new HashMap<String, HashSet<Long>>();

    // delivery DB visitor to check message delivery and identify non existing queues
    final QueueEntryTB queueEntryTB = new QueueEntryTB();
    DatabaseVisitor messageDeliveryCheckVisitor = new DatabaseVisitor()
    {
        public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
        {
            QueueEntryKey entryKey = (QueueEntryKey) queueEntryTB.entryToObject(key);
            Long messageId = entryKey.getMessageId();
            AMQShortString queueName = entryKey.getQueueName();
            if (!existingQueues.contains(queueName))
            {
                String name = queueName.asString();
                HashSet<Long> messages = phantomMessageQueues.get(name);
                if (messages == null)
                {
                    messages = new HashSet<Long>();
                    phantomMessageQueues.put(name, messages);
                }
                messages.add(messageId);
                // only deliveries to missing queues are counted here
                _count++;
            }
            else
            {
                queueMessages.add(messageId);
            }
        }
    };
    _oldMessageStore.visitDelivery(messageDeliveryCheckVisitor);

    if (phantomMessageQueues.isEmpty())
    {
        _logger.info("No such messages were found");
    }
    else
    {
        _logger.info("Found " + messageDeliveryCheckVisitor.getVisitedCount()+ " such messages in total");

        for (Entry<String, HashSet<Long>> phantomQueue : phantomMessageQueues.entrySet())
        {
            String queueName = phantomQueue.getKey();
            HashSet<Long> messages = phantomQueue.getValue();

            _logger.info(MessageFormat.format("There are {0} messages which were previously delivered to non-durable queue ''{1}''",messages.size(), queueName));

            boolean createQueue;
            if(!_interactive)
            {
                createQueue = true;
                _logger.info("Running in batch-mode, marking queue as durable to ensure retention of the messages.");
            }
            else
            {
                createQueue = userInteract("Do you want to make this queue durable?\n"
                                           + "NOTE: Answering No will result in these messages being discarded!");
            }

            if (createQueue)
            {
                // retain the messages of this queue and treat the queue as existing from now on
                queueMessages.addAll(messages);

                AMQShortString name = new AMQShortString(queueName);
                existingQueues.add(name);
                QueueRecord record = new QueueRecord(name, null, false, null);
                _newMessageStore.createQueue(record);
            }
        }
    }

    //Migrate _messageMetaDataDb;
    _logger.info("Message MetaData");

    final Database newMetaDataDB = _newMessageStore.getMetaDataDb();
    final TupleBinding<Object> oldMetaDataTupleBinding = _oldMessageStore.getMetaDataTupleBindingFactory().getInstance();
    final TupleBinding<Object> newMetaDataTupleBinding = _newMessageStore.getMetaDataTupleBindingFactory().getInstance();

    DatabaseVisitor metaDataVisitor = new DatabaseVisitor()
    {
        public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
        {
            _count++;
            MessageMetaData metaData = (MessageMetaData) oldMetaDataTupleBinding.entryToObject(value);

            // get message id
            Long messageId = TupleBinding.getPrimitiveBinding(Long.class).entryToObject(key);

            // ONLY copy data if message is delivered to existing queue
            if (!queueMessages.contains(messageId))
            {
                return;
            }
            DatabaseEntry newValue = new DatabaseEntry();
            newMetaDataTupleBinding.objectToEntry(metaData, newValue);

            newMetaDataDB.put(null, key, newValue);
        }
    };
    _oldMessageStore.visitMetaDataDb(metaDataVisitor);

    logCount(metaDataVisitor.getVisitedCount(), "Message MetaData");

    //Migrate _messageContentDb;
    _logger.info("Message Contents");
    final Database newContentDB = _newMessageStore.getContentDb();

    final TupleBinding<MessageContentKey> oldContentKeyTupleBinding = new MessageContentKeyTB_4();
    final TupleBinding<MessageContentKey> newContentKeyTupleBinding = new MessageContentKeyTB_5();
    final TupleBinding contentTB = new ContentTB();

    DatabaseVisitor contentVisitor = new DatabaseVisitor()
    {
        long _prevMsgId = -1; //Initialise to invalid value
        int _bytesSeenSoFar = 0;

        public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
        {
            _count++;

            //determine the msgId of the current entry
            MessageContentKey_4 contentKey = (MessageContentKey_4) oldContentKeyTupleBinding.entryToObject(key);
            long msgId = contentKey.getMessageId();

            // ONLY copy data if message is delivered to existing queue
            if (!queueMessages.contains(msgId))
            {
                return;
            }

            //if this is a new message, restart the byte offset count.
            if(_prevMsgId != msgId)
            {
                _bytesSeenSoFar = 0;
            }

            //determine the content size
            ByteBuffer content = (ByteBuffer) contentTB.entryToObject(value);
            int contentSize = content.limit();

            //create the new key: id + previously seen data count
            MessageContentKey_5 newKey = new MessageContentKey_5(msgId, _bytesSeenSoFar);
            DatabaseEntry newKeyEntry = new DatabaseEntry();
            newContentKeyTupleBinding.objectToEntry(newKey, newKeyEntry);

            DatabaseEntry newValueEntry = new DatabaseEntry();
            contentTB.objectToEntry(content, newValueEntry);

            newContentDB.put(null, newKeyEntry, newValueEntry);

            _prevMsgId = msgId;
            _bytesSeenSoFar += contentSize;
        }
    };
    _oldMessageStore.visitContentDb(contentVisitor);

    logCount(contentVisitor.getVisitedCount(), "Message Content");

    //Migrate _deliveryDb;
    _logger.info("Delivery Records");
    final Database deliveryDb =_newMessageStore.getDeliveryDb();
    DatabaseVisitor deliveryDbVisitor = new DatabaseVisitor()
    {
        public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
        {
            _count++;

            // get message id from entry key
            QueueEntryKey entryKey = (QueueEntryKey) queueEntryTB.entryToObject(key);
            AMQShortString queueName = entryKey.getQueueName();

            // ONLY copy data if message queue exists
            if (existingQueues.contains(queueName))
            {
                deliveryDb.put(null, key, value);
            }
        }
    };
    _oldMessageStore.visitDelivery(deliveryDbVisitor);

    // Report the count of the delivery visitor (previously the content visitor's count was logged here).
    logCount(deliveryDbVisitor.getVisitedCount(), "Delivery Record");
}
/**
 * Write an indented, human readable "N item entry/entries" line to the log.
 *
 * @param count number of items processed
 * @param item  description of what was counted
 */
private void logCount(int count, String item)
{
    final String noun = (count == 1) ? "entry" : "entries";
    final StringBuilder line = new StringBuilder(" ");
    line.append(count).append(" ").append(item).append(" ").append(noun);
    _logger.info(line.toString());
}
/**
 * Copy every key/value pair of one database into another without modification and
 * log how many records were visited.
 *
 * @param oldDatabase The old MessageStoreDB to perform the visit on
 * @param newDatabase The new MessageStoreDB to copy the data to.
 * @param contentName The string name of the content for display purposes.
 *
 * @throws AMQException      declared for the visit performed on the old store
 * @throws DatabaseException If there is a problem with the loading of the data
 */
private void moveContents(Database oldDatabase, final Database newDatabase, String contentName) throws AMQException, DatabaseException
{
    final DatabaseVisitor copyingVisitor = new DatabaseVisitor()
    {
        public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
        {
            _count++;
            newDatabase.put(null, key, value);
        }
    };

    _oldMessageStore.visitDatabase(oldDatabase, copyingVisitor);

    final int copied = copyingVisitor.getVisitedCount();
    logCount(copied, contentName);
}
/** Print the one-line command synopsis to standard output. */
private static void usage()
{
    final String optionalFlags = "[-h|--help] [-q|--quiet] [-f|--force] [-b|--backup <Path to backup-db>]";
    final String pathArguments = "-i|--input <Path to input-db> [-o|--output <Path to upgraded-db>]";
    System.out.println("usage: BDBStoreUpgrade:\n " + optionalFlags + " " + pathArguments);
}
/**
 * Print the full help text to standard output: first the required options,
 * then the optional ones, each with its description.
 */
private static void help()
{
    System.out.println("usage: BDBStoreUpgrade:");

    System.out.println("Required:");
    for (Object entry : _options.getOptions())
    {
        final Option candidate = (Option) entry;
        if (!candidate.isRequired())
        {
            continue;
        }
        // required options are shown short name first
        final String names = "-" + candidate.getOpt() + "|--" + candidate.getLongOpt();
        System.out.println(names + "\t\t-\t" + candidate.getDescription());
    }

    System.out.println("\nOptions:");
    for (Object entry : _options.getOptions())
    {
        final Option candidate = (Option) entry;
        if (candidate.isRequired())
        {
            continue;
        }
        // optional options are shown long name first
        final String names = "--" + candidate.getLongOpt() + "|-" + candidate.getOpt();
        System.out.println(names + "\t\t-\t" + candidate.getDescription());
    }
}
/**
 * Decide whether a directory looks like a BDB store, i.e. directly contains at least one
 * regular file whose name ends with the BDB file ending.
 *
 * @param directory the location to inspect
 *
 * @return true if a BDB store file was found directly inside the directory; false if the
 *         location is a plain file, cannot be listed, or holds no such file
 */
static boolean isDirectoryAStoreDir(File directory)
{
    if (directory.isFile())
    {
        return false;
    }

    // listFiles() returns null when the path is not a directory or cannot be read.
    final File[] children = directory.listFiles();
    if (children == null)
    {
        return false;
    }

    for (File file : children)
    {
        if (file.isFile() && file.getName().endsWith(BDB_FILE_ENDING))
        {
            return true;
        }
    }
    return false;
}
/**
 * Locate the stores to upgrade below the given path.
 *
 * If the directory itself is a store it is the only result; otherwise every direct
 * sub-directory that is a store is returned (possibly none).
 *
 * @param fromDir the directory given on the command line
 *
 * @return the store directories found; empty if there are none
 *
 * @throws IllegalArgumentException if the path does not exist, is not a directory,
 *                                  or its contents cannot be listed
 */
static File[] discoverDBStores(File fromDir)
{
    if (!fromDir.exists())
    {
        throw new IllegalArgumentException("'" + fromDir + "' does not exist unable to upgrade.");
    }

    // Ensure we are given a directory
    if (fromDir.isFile())
    {
        throw new IllegalArgumentException("'" + fromDir + "' is not a directory unable to upgrade.");
    }

    // listFiles() returns null if the directory cannot be read; report that rather than fail with a NullPointerException.
    final File[] children = fromDir.listFiles();
    if (children == null)
    {
        throw new IllegalArgumentException("'" + fromDir + "' could not be read unable to upgrade.");
    }

    // Check to see if we have been given a single directory
    if (isDirectoryAStoreDir(fromDir))
    {
        return new File[]{fromDir};
    }

    // Check to see if we have been given a directory containing stores.
    final List<File> stores = new ArrayList<File>();
    for (File directory : children)
    {
        if (directory.isDirectory() && isDirectoryAStoreDir(directory))
        {
            stores.add(directory);
        }
    }

    return stores.toArray(new File[stores.size()]);
}
/**
 * Copy the source store recursively to the backup location.
 *
 * @param source the store directory to back up
 * @param backup the directory to copy to
 * @param force  if true an existing backup is removed first, otherwise an existing backup is an error
 *
 * @throws IllegalArgumentException if the backup could not be started: a backup already exists
 *                                  (and force is not set), the existing backup could not be
 *                                  removed, or the source could not be found
 * @throws Exception                if the copy itself failed
 */
private static void performDBBackup(File source, File backup, boolean force) throws Exception
{
    if (backup.exists())
    {
        if (!force)
        {
            throw new IllegalArgumentException("Unable to perform backup a backup already exists.");
        }

        _logger.info("Backup location exists. Forced to remove.");
        // Do not copy on top of a stale backup that could not be removed.
        if (!FileUtils.delete(backup, true))
        {
            throw new IllegalArgumentException("Unable to perform backup as existing backup '" + backup
                                               + "' could not be removed.");
        }
    }

    try
    {
        _logger.info("Backing up '" + source + "' to '" + backup + "'");
        FileUtils.copyRecursive(source, backup);
    }
    catch (FileNotFoundException e)
    {
        //Throwing IAE here as this will be reported as a Backup not started
        throw new IllegalArgumentException("Unable to perform backup:" + e.getMessage(), e);
    }
    catch (FileUtils.UnableToCopyException e)
    {
        //Throwing exception here as this will be reported as a Failed Backup
        throw new Exception("Unable to perform backup due to:" + e.getMessage(), e);
    }
}
/**
 * Command line entry point: parse the options, discover the store(s) below the input
 * path and upgrade each of them.
 *
 * @param args the command line arguments
 *
 * @throws ParseException declared for the command line parsing
 */
public static void main(String[] args) throws ParseException
{
    setOptions(_options);

    final Options helpOnlyOptions = new Options();
    setHelpOptions(helpOnlyOptions);

    // First pass: look for the help flag on its own. A parse failure here is remembered so that
    // help (rather than an error) is shown if the full parse below fails as well.
    boolean helpParseFailed = false;
    try
    {
        final CommandLine helpLine = new PosixParser().parse(helpOnlyOptions, args);
        if (helpLine.hasOption("h"))
        {
            showHelp();
        }
    }
    catch (ParseException pe)
    {
        helpParseFailed = true;
    }

    // Second pass: parse the complete option set, including the required arguments.
    try
    {
        _commandLine = new PosixParser().parse(_options, args);
    }
    catch (ParseException mae)
    {
        if (!helpParseFailed)
        {
            fatalError(mae.getMessage());
        }
        else
        {
            showHelp();
        }
    }

    final String fromDir = _commandLine.getOptionValue(OPTION_INPUT_SHORT);
    final String toDir = _commandLine.getOptionValue(OPTION_OUTPUT_SHORT);

    // The backup option given without a value is represented by the empty string.
    String backupDir = _commandLine.getOptionValue(OPTION_BACKUP_SHORT);
    if (backupDir == null && _commandLine.hasOption(OPTION_BACKUP_SHORT))
    {
        backupDir = "";
    }

    //Attempt to locate possible Store to upgrade on input path
    File[] stores = new File[0];
    try
    {
        stores = discoverDBStores(new File(fromDir));
    }
    catch (IllegalArgumentException iae)
    {
        fatalError(iae.getMessage());
    }

    final boolean interactive = !_commandLine.hasOption(OPTION_QUIET_SHORT);
    final boolean force = _commandLine.hasOption(OPTION_FORCE_SHORT);

    try
    {
        for (File store : stores)
        {
            // toDir is used unchanged when upgrading in place (toDir == null) or when the
            // first discovered store path is as long as fromDir itself.
            final boolean useDestinationAsGiven =
                    toDir == null
                    || (stores.length > 0 && stores[0].toString().length() == fromDir.length());

            // Otherwise the stores are nested below fromDir: append the extra part of the
            // store path to toDir.
            final String destination = useDestinationAsGiven
                    ? toDir
                    : toDir + File.separator + store.toString().substring(fromDir.length());

            upgrade(store, destination, backupDir, interactive, force);
        }
    }
    catch (RuntimeException re)
    {
        if (USER_ABORTED_PROCESS.equals(re.getMessage()))
        {
            _logger.error("Upgrade stopped : User aborted");
        }
        else
        {
            re.printStackTrace();
            _logger.error("Upgrade Failed: " + re.getMessage());
        }
    }
}
/**
 * Register every command line option understood by the tool, help option included.
 * Options are added in the order: input, output, quiet, force, backup, help.
 *
 * @param options the option set to populate
 */
@SuppressWarnings("static-access")
private static void setOptions(Options options)
{
    // required: where the existing store lives
    final Option inputOption = OptionBuilder.isRequired()
                                            .hasArg()
                                            .withDescription("Location (Path) of store to upgrade.")
                                            .withLongOpt(OPTION_INPUT)
                                            .create(OPTION_INPUT_SHORT);
    options.addOption(inputOption);

    // optional: where to write the upgraded store
    final Option outputOption = OptionBuilder.hasArg()
                                             .withDescription("Location (Path) for the upgraded-db to be written.")
                                             .withLongOpt(OPTION_OUTPUT)
                                             .create(OPTION_OUTPUT_SHORT);
    options.addOption(outputOption);

    // simple flags
    options.addOption(new Option(OPTION_QUIET_SHORT, OPTION_QUIET, false, "Disable interactive options."));
    options.addOption(new Option(OPTION_FORCE_SHORT, OPTION_FORCE, false, "Force upgrade removing any existing upgrade target."));

    // optional, and its argument is optional too
    final Option backupOption = OptionBuilder.hasOptionalArg()
                                             .withDescription("Location (Path) for the backup-db to be written.")
                                             .withLongOpt(OPTION_BACKUP)
                                             .create(OPTION_BACKUP_SHORT);
    options.addOption(backupOption);

    setHelpOptions(options);
}
/**
 * Register the -h/--help flag.
 *
 * @param options the option set to add the help flag to
 */
private static void setHelpOptions(Options options)
{
    final Option helpFlag = new Option("h", "help", false, "Show this help.");
    options.addOption(helpFlag);
}
/**
 * Upgrade a single store, logging rather than propagating failures.
 *
 * The store version is detected first; nothing is done when that version cannot be upgraded.
 * The only exception that escapes is the RuntimeException signalling that the user aborted.
 *
 * @param fromDir     the store to upgrade
 * @param toDir       path for the upgraded store, or null to upgrade in place
 * @param backupDir   backup location as accepted by the constructor
 * @param interactive whether the user may be prompted
 * @param force       whether existing targets may be removed without asking
 */
static void upgrade(File fromDir, String toDir, String backupDir, boolean interactive, boolean force)
{
    _logger.info("Running BDB Message Store upgrade tool: v" + VERSION);

    final int version = getStoreVersion(fromDir);
    if (!isVersionUpgradable(version))
    {
        return;
    }

    try
    {
        final BDBStoreUpgrade upgrader =
                new BDBStoreUpgrade(fromDir.toString(), toDir, backupDir, interactive, force);
        upgrader.upgradeFromVersion(version);
        _logger.info("Upgrade complete.");
    }
    catch (IllegalArgumentException iae)
    {
        // raised for conditions detected before any data was migrated
        _logger.error("Upgrade not started due to: " + iae.getMessage());
    }
    catch (DatabaseException de)
    {
        de.printStackTrace();
        _logger.error("Upgrade Failed: " + de.getMessage());
    }
    catch (RuntimeException re)
    {
        // a user abort is passed on to the caller untouched
        if (USER_ABORTED_PROCESS.equals(re.getMessage()))
        {
            throw re;
        }
        re.printStackTrace();
        _logger.error("Upgrade Failed: " + re.getMessage());
    }
    catch (Exception e)
    {
        e.printStackTrace();
        _logger.error("Upgrade Failed: " + e.getMessage());
    }
}
/**
 * Check whether a store of the given version can be upgraded by this tool, logging
 * the reason when it cannot.
 *
 * @param version
 *            store version to verify; 0 means the version could not be determined
 * @return true if store can be upgraded, false otherwise
 */
protected static boolean isVersionUpgradable(int version)
{
    if (version == 0)
    {
        _logger.error("Existing store version is undefined!");
        return false;
    }

    final String versionText = Integer.toString(version);

    if (version < LOWEST_SUPPORTED_STORE_VERSION)
    {
        _logger.error(MessageFormat.format(PREVIOUS_STORE_VERSION_UNSUPPORTED, versionText));
        return false;
    }
    if (version == BDBMessageStore.DATABASE_FORMAT_VERSION)
    {
        _logger.error(MessageFormat.format(STORE_ALREADY_UPGRADED, versionText));
        return false;
    }
    if (version > BDBMessageStore.DATABASE_FORMAT_VERSION)
    {
        _logger.error(MessageFormat.format(FOLLOWING_STORE_VERSION_UNSUPPORTED, versionText));
        return false;
    }

    _logger.info("Existing store version is " + version);
    return true;
}
/**
 * Work out the version of an existing store from the name of its exchange database.
 *
 * The environment is opened read-only. A database named "exchangeDb_v&lt;N&gt;" yields version N,
 * a database whose name merely starts with "exchangeDb" yields version 1. Any failure is
 * logged and leaves the result at 0.
 *
 * @param fromDir
 *            store folder
 * @return the detected version, or 0 if it could not be determined
 */
public static int getStoreVersion(File fromDir)
{
    final EnvironmentConfig readOnlyConfig = new EnvironmentConfig();
    readOnlyConfig.setAllowCreate(false);
    readOnlyConfig.setTransactional(false);
    readOnlyConfig.setReadOnly(true);

    int detectedVersion = 0;
    Environment env = null;
    try
    {
        env = new Environment(fromDir, readOnlyConfig);
        final List<String> databaseNames = env.getDatabaseNames();
        for (String databaseName : databaseNames)
        {
            if (!databaseName.startsWith("exchangeDb"))
            {
                continue;
            }
            // 12 == "exchangeDb_v".length(); the remainder of the name is the version number
            detectedVersion = databaseName.startsWith("exchangeDb_v")
                    ? Integer.parseInt(databaseName.substring(12))
                    : 1;
            break;
        }
    }
    catch (Exception e)
    {
        _logger.error("Failure to open existing database: " + e.getMessage());
    }
    finally
    {
        if (env != null)
        {
            try
            {
                env.close();
            }
            catch (Exception ignored)
            {
                // ignoring. It should never happen.
            }
        }
    }
    return detectedVersion;
}
/**
 * Print the message followed by the usage synopsis, then terminate the JVM with exit status 1.
 *
 * @param message the reason for the failure
 */
private static void fatalError(String message)
{
    final int failureStatus = 1;
    System.out.println(message);
    usage();
    System.exit(failureStatus);
}
/** Print the full help text, then terminate the JVM with exit status 0. */
private static void showHelp()
{
    final int successStatus = 0;
    help();
    System.exit(successStatus);
}
}