| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.rya.accumulo.mr.merge; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.text.ParseException; |
| import java.text.SimpleDateFormat; |
| import java.util.ArrayList; |
| import java.util.Date; |
| import java.util.List; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| |
| import org.apache.accumulo.core.client.AccumuloException; |
| import org.apache.accumulo.core.client.AccumuloSecurityException; |
| import org.apache.accumulo.core.client.ClientConfiguration; |
| import org.apache.accumulo.core.client.Connector; |
| import org.apache.accumulo.core.client.IteratorSetting; |
| import org.apache.accumulo.core.client.TableExistsException; |
| import org.apache.accumulo.core.client.admin.SecurityOperations; |
| import org.apache.accumulo.core.client.admin.TableOperations; |
| import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat; |
| import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; |
| import org.apache.accumulo.core.client.mapreduce.InputFormatBase; |
| import org.apache.accumulo.core.client.security.tokens.PasswordToken; |
| import org.apache.accumulo.core.data.Mutation; |
| import org.apache.accumulo.core.iterators.user.AgeOffFilter; |
| import org.apache.accumulo.core.security.Authorizations; |
| import org.apache.accumulo.core.security.TablePermission; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsAction; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.Reducer; |
| import org.apache.hadoop.util.ToolRunner; |
| import org.apache.log4j.BasicConfigurator; |
| import org.apache.log4j.Logger; |
| import org.apache.log4j.xml.DOMConfigurator; |
| import org.apache.rya.accumulo.AccumuloRdfConfiguration; |
| import org.apache.rya.accumulo.mr.AccumuloHDFSFileInputFormat; |
| import org.apache.rya.accumulo.mr.MRUtils; |
| import org.apache.rya.accumulo.mr.merge.mappers.MergeToolMapper; |
| import org.apache.rya.accumulo.mr.merge.util.AccumuloRyaUtils; |
| import org.apache.rya.accumulo.mr.merge.util.TimeUtils; |
| import org.apache.rya.accumulo.mr.merge.util.ToolConfigUtils; |
| import org.apache.rya.api.RdfCloudTripleStoreConfiguration; |
| import org.apache.rya.api.RdfCloudTripleStoreConstants; |
| import org.apache.rya.api.RdfCloudTripleStoreUtils; |
| import org.apache.rya.api.path.PathUtils; |
| import org.apache.rya.indexing.accumulo.ConfigUtils; |
| |
| import com.google.common.base.Joiner; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| |
| /** |
| * Handles merging a child accumulo instance's data back into its parent's |
| * instance. |
| */ |
| public class MergeTool extends AbstractDualInstanceAccumuloMRTool { |
| private static final Logger log = Logger.getLogger(MergeTool.class); |
| |
| public static final SimpleDateFormat START_TIME_FORMATTER = new SimpleDateFormat("yyyyMMddHHmmssSSSz"); |
| |
| /** |
| * Appended to certain config property names to indicate that the property is for the child instance. |
| */ |
| public static final String CHILD_SUFFIX = ".child"; |
| |
| /** |
| * Suffix added to a child table when it is temporarily being imported into the parent instance when |
| * being read from file and before the tables are merged together. |
| */ |
| public static final String TEMP_SUFFIX = "_temp_child"; |
| |
| /** |
| * The time of the data to be included in the copy/merge process. |
| */ |
| public static final String START_TIME_PROP = "tool.start.time"; |
| |
| /** |
| * The name of the table to process for the map reduce job. |
| */ |
| public static final String TABLE_NAME_PROP = "tool.table.name"; |
| |
| /** |
| * "true" to use file input. "false" to use Accumulo output. |
| */ |
| public static final String USE_MERGE_FILE_INPUT = "use.merge.file.input"; |
| |
| /** |
| * The file path to the child data input to merge in. |
| */ |
| public static final String MERGE_FILE_INPUT_PATH = "merge.file.input.path"; |
| |
| // startTime is the time of the data to merge. Only data modified AFTER the selected time will be merged. |
| private String startTime = null; |
| private String tempDir = null; |
| private boolean useMergeFileInput = false; |
| private String localMergeFileImportDir = null; |
| private String baseImportDir = null; |
| |
| private String tempChildAuths = null; |
| |
| private final List<String> tables = new ArrayList<>(); |
| |
| /** |
| * Map of keys that are supposed to use the same values. |
| */ |
| public static final ImmutableMap<String, List<String>> DUPLICATE_KEY_MAP = ImmutableMap.<String, List<String>>builder() |
| .put(MRUtils.AC_MOCK_PROP, ImmutableList.of(ConfigUtils.USE_MOCK_INSTANCE)) |
| .put(MRUtils.AC_INSTANCE_PROP, ImmutableList.of(ConfigUtils.CLOUDBASE_INSTANCE)) |
| .put(MRUtils.AC_USERNAME_PROP, ImmutableList.of(ConfigUtils.CLOUDBASE_USER)) |
| .put(MRUtils.AC_PWD_PROP, ImmutableList.of(ConfigUtils.CLOUDBASE_PASSWORD)) |
| .put(MRUtils.AC_AUTH_PROP, ImmutableList.of(ConfigUtils.CLOUDBASE_AUTHS, RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH)) |
| .put(MRUtils.AC_ZK_PROP, ImmutableList.of(ConfigUtils.CLOUDBASE_ZOOKEEPERS)) |
| .put(MRUtils.TABLE_PREFIX_PROPERTY, ImmutableList.of(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX)) |
| .put(MRUtils.AC_MOCK_PROP + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.USE_MOCK_INSTANCE + CHILD_SUFFIX)) |
| .put(MRUtils.AC_INSTANCE_PROP + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.CLOUDBASE_INSTANCE + CHILD_SUFFIX)) |
| .put(MRUtils.AC_USERNAME_PROP + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.CLOUDBASE_USER + CHILD_SUFFIX)) |
| .put(MRUtils.AC_PWD_PROP + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.CLOUDBASE_PASSWORD + CHILD_SUFFIX)) |
| .put(MRUtils.AC_AUTH_PROP + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.CLOUDBASE_AUTHS + CHILD_SUFFIX, RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH + CHILD_SUFFIX)) |
| .put(MRUtils.AC_ZK_PROP + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.CLOUDBASE_ZOOKEEPERS + CHILD_SUFFIX)) |
| .put(MRUtils.TABLE_PREFIX_PROPERTY + CHILD_SUFFIX, ImmutableList.of(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX + CHILD_SUFFIX)) |
| .build(); |
| |
| |
| /** |
| * Sets duplicate keys in the config. |
| * @param config the {@link Configuration}. |
| */ |
| public static void setDuplicateKeys(final Configuration config) { |
| for (final Entry<String, List<String>> entry : DUPLICATE_KEY_MAP.entrySet()) { |
| final String key = entry.getKey(); |
| final List<String> duplicateKeys = entry.getValue(); |
| final String value = config.get(key); |
| if (value != null) { |
| for (final String duplicateKey : duplicateKeys) { |
| config.set(duplicateKey, value); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Sets all duplicate keys for the property in the config to the specified value. |
| * @param config the {@link Configuration}. |
| * @param property the property to set and all its duplicates. |
| * @param value the value to set the property to. |
| */ |
| public static void setDuplicateKeysForProperty(final Configuration config, final String property, final String value) { |
| final List<String> duplicateKeys = DUPLICATE_KEY_MAP.get(property); |
| config.set(property, value); |
| if (duplicateKeys != null) { |
| for (final String key : duplicateKeys) { |
| config.set(key, value); |
| } |
| } |
| } |
| |
| /** |
| * Sets up and initializes the merge tool's configuration. |
| * @throws Exception |
| */ |
| public void setup() throws Exception { |
| super.init(); |
| |
| tempDir = conf.get("hadoop.tmp.dir", null); |
| if (tempDir == null) { |
| throw new Exception("Invalid hadoop temp directory. \"hadoop.tmp.dir\" could not be found in the configuration."); |
| } |
| |
| useMergeFileInput = conf.getBoolean(USE_MERGE_FILE_INPUT, false); |
| localMergeFileImportDir = conf.get(MERGE_FILE_INPUT_PATH, null); |
| baseImportDir = tempDir + "/merge_tool_file_input/"; |
| |
| startTime = conf.get(START_TIME_PROP, null); |
| |
| if (!useMergeFileInput) { |
| if (startTime != null) { |
| try { |
| final Date date = START_TIME_FORMATTER.parse(startTime); |
| log.info("Will merge all data after " + date); |
| } catch (final ParseException e) { |
| throw new Exception("Unable to parse the provided start time: " + startTime, e); |
| } |
| } |
| |
| final boolean useTimeSync = conf.getBoolean(CopyTool.USE_NTP_SERVER_PROP, false); |
| if (useTimeSync) { |
| final String tomcatUrl = conf.get(CopyTool.CHILD_TOMCAT_URL_PROP, null); |
| final String ntpServerHost = conf.get(CopyTool.NTP_SERVER_HOST_PROP, null); |
| Long timeOffset = null; |
| try { |
| log.info("Comparing child machine's time to NTP server time..."); |
| timeOffset = TimeUtils.getNtpServerAndMachineTimeDifference(ntpServerHost, tomcatUrl); |
| } catch (IOException | ParseException e) { |
| throw new Exception("Unable to get time difference between machine and NTP server.", e); |
| } |
| if (timeOffset != null) { |
| conf.set(CopyTool.CHILD_TIME_OFFSET_PROP, "" + timeOffset); |
| } |
| } |
| } |
| |
| setDuplicateKeys(conf); |
| |
| tables.add(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); |
| } |
| |
| @Override |
| public int run(final String[] strings) throws Exception { |
| useMergeFileInput = conf.getBoolean(USE_MERGE_FILE_INPUT, false); |
| |
| log.info("Setting up Merge Tool..."); |
| setup(); |
| |
| if (useMergeFileInput) { |
| // When using file input mode the child instance will use a temporary table in the parent instance to |
| // store the child table data. The two tables will then be merged together. |
| copyParentPropertiesToChild(conf); |
| } |
| |
| for (final String table : tables) { |
| final String childTable = table.replaceFirst(tablePrefix, childTablePrefix); |
| final String jobName = "Merge Tool, merging Child Table: " + childTable + ", into Parent Table: " + table + ", " + System.currentTimeMillis(); |
| log.info("Initializing job: " + jobName); |
| conf.set(MRUtils.JOB_NAME_PROP, jobName); |
| conf.set(TABLE_NAME_PROP, table); |
| |
| final Job job = Job.getInstance(conf); |
| job.setJarByClass(MergeTool.class); |
| |
| if (useMergeFileInput) { |
| importChildFilesToTempParentTable(childTable); |
| } |
| |
| setupAccumuloInput(job); |
| |
| InputFormatBase.setInputTableName(job, table); |
| |
| // Set input output of the particular job |
| job.setMapOutputKeyClass(Text.class); |
| job.setMapOutputValueClass(Mutation.class); |
| job.setOutputKeyClass(Text.class); |
| job.setOutputValueClass(Mutation.class); |
| |
| setupAccumuloOutput(job, table); |
| |
| // Set mapper and reducer classes |
| job.setMapperClass(MergeToolMapper.class); |
| job.setReducerClass(Reducer.class); |
| |
| // Submit the job |
| final Date beginTime = new Date(); |
| log.info("Job for table \"" + table + "\" started: " + beginTime); |
| final int exitCode = job.waitForCompletion(true) ? 0 : 1; |
| |
| if (useMergeFileInput && StringUtils.isNotBlank(tempChildAuths)) { |
| // Clear any of the temporary child auths given to the parent |
| final AccumuloRdfConfiguration parentAccumuloRdfConfiguration = new AccumuloRdfConfiguration(conf); |
| parentAccumuloRdfConfiguration.setTablePrefix(tablePrefix); |
| final Connector parentConnector = AccumuloRyaUtils.setupConnector(parentAccumuloRdfConfiguration); |
| final SecurityOperations secOps = parentConnector.securityOperations(); |
| |
| AccumuloRyaUtils.removeUserAuths(userName, secOps, tempChildAuths); |
| } |
| |
| if (exitCode == 0) { |
| final Date endTime = new Date(); |
| log.info("Job for table \"" + table + "\" finished: " + endTime); |
| log.info("The job took " + (endTime.getTime() - beginTime.getTime()) / 1000 + " seconds."); |
| } else { |
| log.error("Job for table \"" + table + "\" Failed!!!"); |
| return exitCode; |
| } |
| } |
| |
| return 0; |
| } |
| |
| /** |
| * Creates the temp child table if it doesn't already exist in the parent. |
| * @param childTableName the name of the child table. |
| * @throws IOException |
| */ |
| public void createTempTableIfNeeded(final String childTableName) throws IOException { |
| try { |
| final AccumuloRdfConfiguration accumuloRdfConfiguration = new AccumuloRdfConfiguration(conf); |
| accumuloRdfConfiguration.setTablePrefix(childTablePrefix); |
| final Connector connector = AccumuloRyaUtils.setupConnector(accumuloRdfConfiguration); |
| if (!connector.tableOperations().exists(childTableName)) { |
| log.info("Creating table: " + childTableName); |
| connector.tableOperations().create(childTableName); |
| log.info("Created table: " + childTableName); |
| log.info("Granting authorizations to table: " + childTableName); |
| final SecurityOperations secOps = connector.securityOperations(); |
| secOps.grantTablePermission(userName, childTableName, TablePermission.WRITE); |
| log.info("Granted authorizations to table: " + childTableName); |
| |
| final Authorizations parentAuths = secOps.getUserAuthorizations(userName); |
| // Add child authorizations so the temp parent table can be accessed. |
| if (!parentAuths.equals(childAuthorizations)) { |
| final List<String> childAuthList = findUniqueAuthsFromChild(parentAuths.toString(), childAuthorizations.toString()); |
| tempChildAuths = Joiner.on(",").join(childAuthList); |
| log.info("Adding the authorization, \"" + tempChildAuths + "\", to the parent user, \"" + userName + "\""); |
| final Authorizations newAuths = AccumuloRyaUtils.addUserAuths(userName, secOps, new Authorizations(tempChildAuths)); |
| secOps.changeUserAuthorizations(userName, newAuths); |
| } |
| } |
| } catch (TableExistsException | AccumuloException | AccumuloSecurityException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| /** |
| * Gets any unique user auths that the child has that the parent does not. |
| * @param parentAuths the comma-separated string of parent authorizations. |
| * @param childAuths the comma-separated string of parent authorizations. |
| * @return the unique child authorizations that are not in the parent. |
| */ |
| private static List<String> findUniqueAuthsFromChild(final String parentAuths, final String childAuths) { |
| final List<String> parentAuthList = AccumuloRyaUtils.convertAuthStringToList(parentAuths); |
| final List<String> childAuthList = AccumuloRyaUtils.convertAuthStringToList(childAuths); |
| |
| childAuthList.removeAll(parentAuthList); |
| |
| return childAuthList; |
| } |
| |
| /** |
| * Imports the child files that hold the table data into the parent instance as a temporary table. |
| * @param childTableName the name of the child table to import into a temporary parent table. |
| * @throws Exception |
| */ |
| public void importChildFilesToTempParentTable(final String childTableName) throws Exception { |
| // Create a temporary table in the parent instance to import the child files to. Then run the merge process on the parent table and temp child table. |
| final String tempChildTable = childTableName + TEMP_SUFFIX; |
| |
| createTempTableIfNeeded(tempChildTable); |
| |
| final AccumuloRdfConfiguration parentAccumuloRdfConfiguration = new AccumuloRdfConfiguration(conf); |
| parentAccumuloRdfConfiguration.setTablePrefix(childTablePrefix); |
| final Connector parentConnector = AccumuloRyaUtils.setupConnector(parentAccumuloRdfConfiguration); |
| final TableOperations parentTableOperations = parentConnector.tableOperations(); |
| |
| final Path localWorkDir = CopyTool.getPath(localMergeFileImportDir, childTableName); |
| final Path hdfsBaseWorkDir = CopyTool.getPath(baseImportDir, childTableName); |
| |
| CopyTool.copyLocalToHdfs(localWorkDir, hdfsBaseWorkDir, conf); |
| |
| final Path files = CopyTool.getPath(hdfsBaseWorkDir.toString(), "files"); |
| final Path failures = CopyTool.getPath(hdfsBaseWorkDir.toString(), "failures"); |
| final FileSystem fs = FileSystem.get(conf); |
| // With HDFS permissions on, we need to make sure the Accumulo user can read/move the files |
| fs.setPermission(hdfsBaseWorkDir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); |
| if (fs.exists(failures)) { |
| fs.delete(failures, true); |
| } |
| fs.mkdirs(failures); |
| |
| parentTableOperations.importDirectory(tempChildTable, files.toString(), failures.toString(), false); |
| |
| AccumuloRyaUtils.printTablePretty(tempChildTable, conf); |
| } |
| |
| /** |
| * Copies all the relevant parent instance config properties to the corresponding child properties. |
| * @param config the {@link Configuration} to use. |
| */ |
| public static void copyParentPropertiesToChild(final Configuration config) { |
| // Copy the parent properties for the child to use. |
| copyParentPropToChild(config, MRUtils.AC_MOCK_PROP); |
| copyParentPropToChild(config, MRUtils.AC_INSTANCE_PROP); |
| copyParentPropToChild(config, MRUtils.AC_USERNAME_PROP); |
| copyParentPropToChild(config, MRUtils.AC_PWD_PROP); |
| //copyParentPropToChild(config, MRUtils.TABLE_PREFIX_PROPERTY); |
| //copyParentPropToChild(config, MRUtils.AC_AUTH_PROP); |
| //copyParentPropToChild(config, RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH); |
| copyParentPropToChild(config, MRUtils.AC_ZK_PROP); |
| |
| MergeTool.setDuplicateKeys(config); |
| } |
| |
| /** |
| * Copies the parent config property to the corresponding child property. |
| * @param config the {@link Configuration} to use. |
| * @param parentPropertyName the parent property name to use. |
| */ |
| public static void copyParentPropToChild(final Configuration config, final String parentPropertyName) { |
| final String parentValue = config.get(parentPropertyName, ""); |
| config.set(parentPropertyName + MergeTool.CHILD_SUFFIX, parentValue); |
| } |
| |
| @Override |
| protected void setupAccumuloInput(final Job job) throws AccumuloSecurityException { |
| // set up accumulo input |
| if (!hdfsInput) { |
| job.setInputFormatClass(AccumuloInputFormat.class); |
| } else { |
| job.setInputFormatClass(AccumuloHDFSFileInputFormat.class); |
| } |
| AbstractInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd)); |
| InputFormatBase.setInputTableName(job, RdfCloudTripleStoreUtils.layoutPrefixToTable(rdfTableLayout, tablePrefix)); |
| AbstractInputFormat.setScanAuthorizations(job, authorizations); |
| if (!mock) { |
| AbstractInputFormat.setZooKeeperInstance(job, new ClientConfiguration().withInstance(instance).withZkHosts(zk)); |
| } else { |
| AbstractInputFormat.setMockInstance(job, instance); |
| } |
| if (ttl != null) { |
| final IteratorSetting setting = new IteratorSetting(1, "fi", AgeOffFilter.class); |
| AgeOffFilter.setTTL(setting, Long.valueOf(ttl)); |
| InputFormatBase.addIterator(job, setting); |
| } |
| for (final IteratorSetting iteratorSetting : AccumuloRyaUtils.COMMON_REG_EX_FILTER_SETTINGS) { |
| InputFormatBase.addIterator(job, iteratorSetting); |
| } |
| } |
| |
| /** |
| * Sets up and runs the merge tool with the provided args. |
| * @param args the arguments list. |
| * @return the execution result. |
| */ |
| public static int setupAndRun(final String[] args) { |
| int returnCode = -1; |
| try { |
| final Configuration conf = new Configuration(); |
| final Set<String> toolArgs = ToolConfigUtils.getUserArguments(conf, args); |
| if (!toolArgs.isEmpty()) { |
| final String parameters = Joiner.on("\r\n\t").join(toolArgs); |
| log.info("Running Merge Tool with the following parameters...\r\n\t" + parameters); |
| } |
| |
| returnCode = ToolRunner.run(conf, new MergeTool(), args); |
| } catch (final Exception e) { |
| log.error("Error running merge tool", e); |
| } |
| return returnCode; |
| } |
| |
| public static void main(final String[] args) { |
| final String log4jConfiguration = System.getProperties().getProperty("log4j.configuration"); |
| if (StringUtils.isNotBlank(log4jConfiguration)) { |
| final String parsedConfiguration = PathUtils.clean(StringUtils.removeStart(log4jConfiguration, "file:")); |
| final File configFile = new File(parsedConfiguration); |
| if (configFile.exists()) { |
| DOMConfigurator.configure(parsedConfiguration); |
| } else { |
| BasicConfigurator.configure(); |
| } |
| } |
| log.info("Starting Merge Tool"); |
| |
| Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { |
| @Override |
| public void uncaughtException(final Thread thread, final Throwable throwable) { |
| log.error("Uncaught exception in " + thread.getName(), throwable); |
| } |
| }); |
| |
| final int returnCode = setupAndRun(args); |
| |
| log.info("Finished running Merge Tool"); |
| |
| System.exit(returnCode); |
| } |
| |
| /** |
| * Creates a formatted string for the start time based on the specified date. |
| * @param startDate the start {@link Date} to format. |
| * @return the formatted start time string. |
| */ |
| public static String getStartTimeString(final Date startDate) { |
| return convertDateToStartTimeString(startDate); |
| } |
| |
| /** |
| * Converts the specified date into a string to use as the start time for the timestamp filter. |
| * @param date the start {@link Date} of the filter that will be formatted as a string. |
| * @return the formatted start time string. |
| */ |
| public static String convertDateToStartTimeString(final Date date) { |
| final String startTimeString = START_TIME_FORMATTER.format(date); |
| return startTimeString; |
| } |
| |
| /** |
| * Converts the specified string into a date to use as the start time for the timestamp filter. |
| * @param startTimeString the formatted time string. |
| * @return the start {@link Date}. |
| */ |
| public static Date convertStartTimeStringToDate(final String startTimeString) { |
| Date date; |
| try { |
| date = START_TIME_FORMATTER.parse(startTimeString); |
| } catch (final ParseException e) { |
| log.error("Could not parse date", e); |
| return null; |
| } |
| return date; |
| } |
| } |