blob: 9135705ccaa4ad59a4ded1601de538697839d56c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.recon.tasks;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_THREAD_COUNT_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_THREAD_COUNT_KEY;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
import org.jooq.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
/**
 * Implementation of ReconTaskController.
 *
 * <p>Keeps a registry of {@link ReconDBUpdateTask}s keyed by task name and
 * runs them on a fixed-size thread pool whose size is read from
 * {@code OZONE_RECON_TASK_THREAD_COUNT_KEY}. A single-permit semaphore
 * ensures that {@link #consumeOMEvents} and {@link #reInitializeTasks}
 * never run concurrently with each other.
 */
public class ReconTaskControllerImpl implements ReconTaskController {

  private static final Logger LOG =
      LoggerFactory.getLogger(ReconTaskControllerImpl.class);

  /**
   * Number of failed reprocess attempts a task may accumulate before it is
   * removed from the registry.
   */
  private static final int TASK_FAILURE_THRESHOLD = 2;

  /** Registered tasks, keyed by task name. */
  private final Map<String, ReconDBUpdateTask> reconDBUpdateTasks;
  private final ExecutorService executorService;
  private final int threadCount;
  /** Single permit: only one batch / re-initialization runs at a time. */
  private final Semaphore taskSemaphore = new Semaphore(1);
  /** Per-task count of reprocess failures; reset on any success. */
  private final Map<String, AtomicInteger> taskFailureCounter =
      new HashMap<>();
  private final ReconTaskStatusDao reconTaskStatusDao;

  /**
   * Creates the controller, its worker pool and the task status DAO, then
   * registers every task in {@code tasks}.
   *
   * @param configuration source of the worker thread count.
   * @param sqlConfiguration jOOQ configuration used to build the status DAO.
   * @param tasks tasks to register at construction time.
   */
  @Inject
  public ReconTaskControllerImpl(OzoneConfiguration configuration,
                                 Configuration sqlConfiguration,
                                 Set<ReconDBUpdateTask> tasks) {
    reconDBUpdateTasks = new HashMap<>();
    threadCount = configuration.getInt(OZONE_RECON_TASK_THREAD_COUNT_KEY,
        OZONE_RECON_TASK_THREAD_COUNT_DEFAULT);
    executorService = Executors.newFixedThreadPool(threadCount);
    reconTaskStatusDao = new ReconTaskStatusDao(sqlConfiguration);
    for (ReconDBUpdateTask task : tasks) {
      registerTask(task);
    }
  }

  /**
   * Adds the task to the registry, starts its failure counter at zero and
   * inserts a status row for it if none exists yet.
   *
   * @param task task to register.
   */
  @Override
  public void registerTask(ReconDBUpdateTask task) {
    String taskName = task.getTaskName();
    LOG.info("Registered task {} with controller.", taskName);

    // Store task in Task Map.
    reconDBUpdateTasks.put(taskName, task);
    // Store Task in Task failure tracker.
    taskFailureCounter.put(taskName, new AtomicInteger(0));
    // Create DB record for the task, only if it is not already there.
    if (!reconTaskStatusDao.existsById(taskName)) {
      reconTaskStatusDao.insert(new ReconTaskStatus(taskName, 0L, 0L));
    }
  }

  /**
   * For every registered task, we try the process step twice and then
   * reprocess once (only for tasks whose process step failed both times) to
   * absorb the events. If a task has failed the reprocess call more than
   * {@code TASK_FAILURE_THRESHOLD} times across events, it is unregistered
   * (blacklisted).
   *
   * <p>An {@link ExecutionException} raised by any task is logged and ends
   * the handling of this batch; it is not propagated to the caller.
   *
   * @param events set of events
   * @param omMetadataManager OM metadata manager handed to reprocess.
   * @throws InterruptedException if the calling thread is interrupted while
   *     waiting for the semaphore or for the tasks to finish.
   */
  @Override
  public void consumeOMEvents(OMUpdateEventBatch events,
                              OMMetadataManager omMetadataManager)
      throws InterruptedException {
    taskSemaphore.acquire();
    try {
      if (events.isEmpty()) {
        return;
      }

      // First attempt: every registered task processes the batch.
      List<Future<Pair>> results = executorService.invokeAll(
          toProcessCalls(reconDBUpdateTasks.values(), events));
      List<String> failedTasks = processTaskResults(results, events);
      if (failedTasks.isEmpty()) {
        return;
      }

      // Retry: only the tasks that failed the first attempt.
      results = executorService.invokeAll(
          toProcessCalls(lookupTasks(failedTasks), events));
      List<String> retryFailedTasks = processTaskResults(results, events);
      if (retryFailedTasks.isEmpty()) {
        return;
      }

      // Reprocess only the tasks that failed BOTH process attempts. Tasks
      // that recovered on retry have already absorbed the events.
      // TODO Move to a separate task queue since reprocess may be a heavy
      // operation for large OM DB instances
      Collection<Callable<Pair>> reprocessCalls = new ArrayList<>();
      for (ReconDBUpdateTask task : lookupTasks(retryFailedTasks)) {
        reprocessCalls.add(() -> task.reprocess(omMetadataManager));
      }
      results = executorService.invokeAll(reprocessCalls);
      List<String> reprocessFailedTasks =
          processTaskResults(results, events);

      for (String taskName : reprocessFailedTasks) {
        LOG.info("Reprocess step failed for task : {}", taskName);
        if (taskFailureCounter.get(taskName).incrementAndGet() >
            TASK_FAILURE_THRESHOLD) {
          LOG.info("Blacklisting Task since it failed retry and " +
              "reprocess more than {} times.", TASK_FAILURE_THRESHOLD);
          reconDBUpdateTasks.remove(taskName);
        }
      }
    } catch (ExecutionException e) {
      LOG.error("Unexpected error : ", e);
    } finally {
      taskSemaphore.release();
    }
  }

  /**
   * Runs reprocess on every registered task and logs the ones that report
   * failure. An {@link ExecutionException} raised by any task is logged and
   * not propagated.
   *
   * @param omMetadataManager OM metadata manager handed to reprocess.
   * @throws InterruptedException if the calling thread is interrupted while
   *     waiting for the semaphore or for the tasks to finish.
   */
  @Override
  public void reInitializeTasks(OMMetadataManager omMetadataManager)
      throws InterruptedException {
    taskSemaphore.acquire();
    try {
      Collection<Callable<Pair>> tasks = new ArrayList<>();
      for (ReconDBUpdateTask task : reconDBUpdateTasks.values()) {
        tasks.add(() -> task.reprocess(omMetadataManager));
      }

      List<Future<Pair>> results = executorService.invokeAll(tasks);
      for (Future<Pair> f : results) {
        Pair outcome = f.get();
        String taskName = outcome.getLeft().toString();
        if (!(Boolean) outcome.getRight()) {
          LOG.info("Init failed for task : {}", taskName);
        }
      }
    } catch (ExecutionException e) {
      LOG.error("Unexpected error : ", e);
    } finally {
      taskSemaphore.release();
    }
  }

  /**
   * Store the last completed event sequence number and timestamp to the DB
   * for that task.
   * @param taskName taskname to be updated.
   * @param lastSequenceNumber contains the new sequence number.
   */
  private void storeLastCompletedTransaction(
      String taskName, long lastSequenceNumber) {
    ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(taskName,
        System.currentTimeMillis(), lastSequenceNumber);
    reconTaskStatusDao.update(reconTaskStatusRecord);
  }

  /**
   * @return the controller's own task registry (the live map, not a copy).
   */
  @Override
  public Map<String, ReconDBUpdateTask> getRegisteredTasks() {
    return reconDBUpdateTasks;
  }

  /**
   * @return the DAO used to persist task status records.
   */
  @Override
  public ReconTaskStatusDao getReconTaskStatusDao() {
    return reconTaskStatusDao;
  }

  /**
   * Shuts the worker pool down immediately via
   * {@link ExecutorService#shutdownNow()}.
   */
  @Override
  public void stop() {
    this.executorService.shutdownNow();
  }

  /**
   * Wait on results of all tasks. Successful tasks get their failure counter
   * reset and their last completed sequence number stored.
   * @param results Set of Futures.
   * @param events Events.
   * @return List of failed task names
   * @throws ExecutionException execution Exception
   * @throws InterruptedException Interrupted Exception
   */
  private List<String> processTaskResults(List<Future<Pair>> results,
                                          OMUpdateEventBatch events)
      throws ExecutionException, InterruptedException {
    List<String> failedTasks = new ArrayList<>();
    for (Future<Pair> f : results) {
      // Read the future once; the pair is (task name, success flag).
      Pair outcome = f.get();
      String taskName = outcome.getLeft().toString();
      if (!(Boolean) outcome.getRight()) {
        LOG.info("Failed task : {}", taskName);
        failedTasks.add(taskName);
      } else {
        taskFailureCounter.get(taskName).set(0);
        storeLastCompletedTransaction(taskName,
            events.getLastSequenceNumber());
      }
    }
    return failedTasks;
  }

  /**
   * Resolves task names to the registered task instances, in the given order.
   */
  private List<ReconDBUpdateTask> lookupTasks(Collection<String> taskNames) {
    List<ReconDBUpdateTask> found = new ArrayList<>(taskNames.size());
    for (String taskName : taskNames) {
      found.add(reconDBUpdateTasks.get(taskName));
    }
    return found;
  }

  /**
   * Builds one callable per task that processes the events filtered down to
   * that task's tables.
   */
  private static Collection<Callable<Pair>> toProcessCalls(
      Collection<ReconDBUpdateTask> tasks, OMUpdateEventBatch events) {
    Collection<Callable<Pair>> calls = new ArrayList<>(tasks.size());
    for (ReconDBUpdateTask task : tasks) {
      Collection<String> tables = task.getTaskTables();
      calls.add(() -> task.process(events.filter(tables)));
    }
    return calls;
  }
}