/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.falcon.hive.mapreduce;

import org.apache.falcon.hive.HiveDRArgs;
import org.apache.falcon.hive.util.EventUtils;
import org.apache.falcon.hive.util.HiveDRUtils;
import org.apache.falcon.hive.util.ReplicationStatus;
import org.apache.falcon.job.ReplicationJobCountersList;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Map class for Hive DR. Each input record carries serialized replication
 * events that the mapper processes via {@link EventUtils}, emitting the
 * resulting replication status keyed by job name.
 */
public class CopyMapper extends Mapper<LongWritable, Text, Text, Text> {

    private static final Logger LOG = LoggerFactory.getLogger(CopyMapper.class);

    private EventUtils eventUtils;
    private ScheduledThreadPoolExecutor timer;
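
    /**
     * Creates the {@link EventUtils} helper, initializes the filesystem
     * handles and opens the database connection used for event replay.
     * Any setup failure is wrapped in an IOException so the task fails fast.
     */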
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        eventUtils = new EventUtils(context.getConfiguration());
        eventUtils.initializeFS();
        try {
            eventUtils.setupConnection();
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
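
    /**
     * Processes one serialized replication event. A scheduled heartbeat
     * reports progress while the event is replayed; on success the per-job
     * {@link ReplicationStatus} records are emitted and, for the export
     * stage, the copy counters are populated.
     */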
    @Override
    protected void map(LongWritable key, Text value,
                       final Context context) throws IOException, InterruptedException {
        LOG.debug("Processing Event value: {}", value.toString());
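
        // Event replay can run for a long time without writing any output, so
        // emit a heartbeat every 30 seconds to keep the framework from
        // killing the task attempt for inactivity.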
        timer = new ScheduledThreadPoolExecutor(1);
        timer.scheduleAtFixedRate(new Runnable() {
            public void run() {
                System.out.println("Hive DR copy mapper progress heart beat");
                context.progress();
            }
        }, 0, 30, TimeUnit.SECONDS);

        try {
            eventUtils.processEvents(value.toString());
        } catch (Exception e) {
            LOG.error("Exception in processing events:", e);
            throw new IOException(e);
        } finally {
            timer.shutdownNow();
            cleanup(context);
        }
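
        // Emit the resulting status of each replication job, keyed by job
        // name, so a downstream reducer can persist the latest status per job.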
        List<ReplicationStatus> replicationStatusList = eventUtils.getListReplicationStatus();
        if (replicationStatusList != null && !replicationStatusList.isEmpty()) {
            for (ReplicationStatus rs : replicationStatusList) {
                context.write(new Text(rs.getJobName()), new Text(rs.toString()));
            }
        }

        // For the export stage, populate the custom replication counters.
        if (context.getConfiguration().get(HiveDRArgs.EXECUTION_STAGE.getName())
                .equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())
                && !eventUtils.isCountersMapEmpty()) {
            context.getCounter(ReplicationJobCountersList.BYTESCOPIED).increment(
                    eventUtils.getCounterValue(ReplicationJobCountersList.BYTESCOPIED.getName()));
            context.getCounter(ReplicationJobCountersList.COPY).increment(
                    eventUtils.getCounterValue(ReplicationJobCountersList.COPY.getName()));
        }
    }
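
    /**
     * Cleans up per-task state: for the import stage the staged events
     * directory is removed, and the database connection is always closed.
     * Failures here are logged rather than rethrown so that cleanup does
     * not mask the outcome of the map phase.
     */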
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        LOG.info("Invoking cleanup process");
        super.cleanup(context);
        try {
            if (context.getConfiguration().get(HiveDRArgs.EXECUTION_STAGE.getName())
                    .equalsIgnoreCase(HiveDRUtils.ExecutionStage.IMPORT.name())) {
                eventUtils.cleanEventsDirectory();
            }
        } catch (IOException e) {
            LOG.error("Cleaning up of events directories failed", e);
        } finally {
            try {
                eventUtils.closeConnection();
            } catch (SQLException e) {
                LOG.error("Closing the connections failed", e);
            }
        }
    }
}
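
// A minimal sketch of a hypothetical driver showing how this mapper might be
// wired into a Hadoop job. CopyReducer, eventsFile and outputDir are
// assumptions for illustration, not part of this file:
//
//   Configuration conf = new Configuration();
//   Job job = Job.getInstance(conf, "hive-dr-copy");
//   job.setJarByClass(CopyMapper.class);
//   job.setMapperClass(CopyMapper.class);
//   job.setReducerClass(CopyReducer.class);
//   job.setOutputKeyClass(Text.class);
//   job.setOutputValueClass(Text.class);
//   FileInputFormat.addInputPath(job, new Path(eventsFile));
//   FileOutputFormat.setOutputPath(job, new Path(outputDir));
//   System.exit(job.waitForCompletion(true) ? 0 : 1);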