| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.giraph.zk; |
| |
| import com.google.common.util.concurrent.Uninterruptibles; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.giraph.conf.GiraphConstants; |
| import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; |
| import org.apache.giraph.time.SystemTime; |
| import org.apache.giraph.time.Time; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.mapreduce.Mapper; |
| import org.apache.log4j.Logger; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.ConnectException; |
| import java.net.InetSocketAddress; |
| import java.net.Socket; |
| import java.net.SocketTimeoutException; |
| import java.util.Arrays; |
| import java.util.concurrent.TimeUnit; |
| |
| import static com.google.common.base.Preconditions.checkState; |
| import static org.apache.giraph.conf.GiraphConstants.BASE_ZNODE_KEY; |
| import static org.apache.giraph.conf.GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY; |
| |
| |
| /** |
| * Manages the election of ZooKeeper servers, starting/stopping the services, |
| * etc. |
| */ |
| public class ZooKeeperManager { |
| /** Class logger */ |
| private static final Logger LOG = Logger.getLogger(ZooKeeperManager.class); |
| /** Separates the hostname and task in the candidate stamp */ |
| private static final String HOSTNAME_TASK_SEPARATOR = " "; |
| /** The ZooKeeperString filename prefix */ |
| private static final String ZOOKEEPER_SERVER_LIST_FILE_PREFIX = |
| "zkServerList_"; |
| /** Job context (mainly for progress) */ |
| private Mapper<?, ?, ?, ?>.Context context; |
| /** Hadoop configuration */ |
| private final ImmutableClassesGiraphConfiguration conf; |
| /** Task partition, to ensure uniqueness */ |
| private final int taskPartition; |
| /** HDFS base directory for all file-based coordination */ |
| private final Path baseDirectory; |
| /** |
| * HDFS task ZooKeeper candidate/completed |
| * directory for all file-based coordination |
| */ |
| private final Path taskDirectory; |
| /** |
| * HDFS ZooKeeper server ready/done directory |
| * for all file-based coordination |
| */ |
| private final Path serverDirectory; |
| /** HDFS path to whether the task is done */ |
| private final Path myClosedPath; |
| /** Polling msecs timeout */ |
| private final int pollMsecs; |
| /** File system */ |
| private final FileSystem fs; |
| /** Zookeeper wrapper */ |
| private ZooKeeperRunner zkRunner; |
| /** ZooKeeper local file system directory */ |
| private final String zkDir; |
| /** ZooKeeper config file path */ |
| private final ZookeeperConfig config; |
| /** ZooKeeper server host */ |
| private String zkServerHost; |
| /** ZooKeeper server task */ |
| private int zkServerTask; |
| /** ZooKeeper base port */ |
| private int zkBasePort; |
| /** Final ZooKeeper server port list (for clients) */ |
| private String zkServerPortString; |
| /** My hostname */ |
| private String myHostname = null; |
| /** Job id, to ensure uniqueness */ |
| private final String jobId; |
| /** Time object for tracking timeouts */ |
| private final Time time = SystemTime.get(); |
| |
/** Final state of the application, as reported when going offline */
public enum State {
  /** The application ended with a failure */
  FAILED,
  /** The application ran to completion */
  FINISHED
}
| |
| /** |
| * Constructor with context. |
| * |
| * @param context Context to be stored internally |
| * @param configuration Configuration |
| * @throws IOException |
| */ |
| public ZooKeeperManager(Mapper<?, ?, ?, ?>.Context context, |
| ImmutableClassesGiraphConfiguration configuration) |
| throws IOException { |
| this.context = context; |
| this.conf = configuration; |
| taskPartition = conf.getTaskPartition(); |
| jobId = conf.get("mapred.job.id", "Unknown Job"); |
| baseDirectory = |
| new Path(ZOOKEEPER_MANAGER_DIRECTORY.getWithDefault(conf, |
| getFinalZooKeeperPath())); |
| taskDirectory = new Path(baseDirectory, |
| "_task"); |
| serverDirectory = new Path(baseDirectory, |
| "_zkServer"); |
| myClosedPath = new Path(taskDirectory, |
| (new ComputationDoneName(taskPartition)).getName()); |
| pollMsecs = GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.get(conf); |
| String jobLocalDir = conf.get("job.local.dir"); |
| String zkDirDefault; |
| if (jobLocalDir != null) { // for non-local jobs |
| zkDirDefault = jobLocalDir + |
| "/_bspZooKeeper"; |
| } else { |
| zkDirDefault = System.getProperty("user.dir") + "/" + |
| ZOOKEEPER_MANAGER_DIRECTORY.getDefaultValue(); |
| } |
| zkDir = conf.get(GiraphConstants.ZOOKEEPER_DIR, zkDirDefault); |
| config = new ZookeeperConfig(); |
| zkBasePort = GiraphConstants.ZOOKEEPER_SERVER_PORT.get(conf); |
| |
| myHostname = conf.getLocalHostname(); |
| fs = FileSystem.get(conf); |
| } |
| |
| /** |
| * Generate the final ZooKeeper coordination directory on HDFS |
| * |
| * @return directory path with job id |
| */ |
| private String getFinalZooKeeperPath() { |
| return ZOOKEEPER_MANAGER_DIRECTORY.getDefaultValue() + "/" + jobId; |
| } |
| |
| /** |
| * Return the base ZooKeeper ZNode from which all other ZNodes Giraph creates |
| * should be sited, for instance in a multi-tenant ZooKeeper, the znode |
| * reserved for Giraph |
| * |
| * @param conf Necessary to access user-provided values |
| * @return String of path without trailing slash |
| */ |
| public static String getBasePath(Configuration conf) { |
| String result = conf.get(BASE_ZNODE_KEY, ""); |
| if (!result.equals("") && !result.startsWith("/")) { |
| throw new IllegalArgumentException("Value for " + |
| BASE_ZNODE_KEY + " must start with /: " + result); |
| } |
| |
| return result; |
| } |
| |
| /** |
| * Create the candidate stamps and decide on the servers to start if |
| * you are partition 0. |
| * |
| * @throws IOException |
| * @throws InterruptedException |
| */ |
| public void setup() throws IOException, InterruptedException { |
| createCandidateStamp(); |
| getZooKeeperServerList(); |
| } |
| |
| /** |
| * Create a HDFS stamp for this task. If another task already |
| * created it, then this one will fail, which is fine. |
| */ |
| public void createCandidateStamp() { |
| try { |
| fs.mkdirs(baseDirectory); |
| LOG.info("createCandidateStamp: Made the directory " + |
| baseDirectory); |
| } catch (IOException e) { |
| LOG.error("createCandidateStamp: Failed to mkdirs " + |
| baseDirectory); |
| } |
| try { |
| fs.mkdirs(serverDirectory); |
| LOG.info("createCandidateStamp: Made the directory " + |
| serverDirectory); |
| } catch (IOException e) { |
| LOG.error("createCandidateStamp: Failed to mkdirs " + |
| serverDirectory); |
| } |
| // Check that the base directory exists and is a directory |
| try { |
| if (!fs.getFileStatus(baseDirectory).isDir()) { |
| throw new IllegalArgumentException( |
| "createCandidateStamp: " + baseDirectory + |
| " is not a directory, but should be."); |
| } |
| } catch (IOException e) { |
| throw new IllegalArgumentException( |
| "createCandidateStamp: Couldn't get file status " + |
| "for base directory " + baseDirectory + ". If there is an " + |
| "issue with this directory, please set an accesible " + |
| "base directory with the Hadoop configuration option " + |
| ZOOKEEPER_MANAGER_DIRECTORY.getKey(), e); |
| } |
| |
| Path myCandidacyPath = new Path( |
| taskDirectory, myHostname + |
| HOSTNAME_TASK_SEPARATOR + taskPartition); |
| try { |
| if (LOG.isInfoEnabled()) { |
| LOG.info("createCandidateStamp: Creating my filestamp " + |
| myCandidacyPath); |
| } |
| fs.createNewFile(myCandidacyPath); |
| } catch (IOException e) { |
| LOG.error("createCandidateStamp: Failed (maybe previous task " + |
| "failed) to create filestamp " + myCandidacyPath, e); |
| } |
| } |
| |
| /** |
| * Create a new file with retries if it fails. |
| * |
| * @param fs File system where the new file is created |
| * @param path Path of the new file |
| * @param maxAttempts Maximum number of attempts |
| * @param retryWaitMsecs Milliseconds to wait before retrying |
| */ |
| private static void createNewFileWithRetries( |
| FileSystem fs, Path path, int maxAttempts, int retryWaitMsecs) { |
| int attempt = 0; |
| while (attempt < maxAttempts) { |
| try { |
| fs.createNewFile(path); |
| return; |
| } catch (IOException e) { |
| LOG.warn("createNewFileWithRetries: Failed to create file at path " + |
| path + " on attempt " + attempt + " of " + maxAttempts + ".", e); |
| } |
| ++attempt; |
| Uninterruptibles.sleepUninterruptibly( |
| retryWaitMsecs, TimeUnit.MILLISECONDS); |
| } |
| throw new IllegalStateException( |
| "createNewFileWithRetries: Failed to create file at path " + |
| path + " after " + attempt + " attempts"); |
| } |
| |
| /** |
| * Every task must create a stamp to let the ZooKeeper servers know that |
| * they can shutdown. This also lets the task know that it was already |
| * completed. |
| */ |
| private void createZooKeeperClosedStamp() { |
| LOG.info("createZooKeeperClosedStamp: Creating my filestamp " + |
| myClosedPath); |
| createNewFileWithRetries(fs, myClosedPath, |
| conf.getHdfsFileCreationRetries(), |
| conf.getHdfsFileCreationRetryWaitMs()); |
| } |
| |
| /** |
| * Check if all the computation is done. |
| * @return true if all computation is done. |
| */ |
| public boolean computationDone() { |
| try { |
| return fs.exists(myClosedPath); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| /** |
| * Task 0 will call this to create the ZooKeeper server list. The result is |
| * a file that describes the ZooKeeper servers through the filename. |
| * |
| * @throws IOException |
| * @throws InterruptedException |
| */ |
| private void createZooKeeperServerList() throws IOException, |
| InterruptedException { |
| String host; |
| String task; |
| while (true) { |
| FileStatus [] fileStatusArray = fs.listStatus(taskDirectory); |
| if (fileStatusArray.length > 0) { |
| FileStatus fileStatus = fileStatusArray[0]; |
| String[] hostnameTaskArray = |
| fileStatus.getPath().getName().split( |
| HOSTNAME_TASK_SEPARATOR); |
| checkState(hostnameTaskArray.length == 2, |
| "createZooKeeperServerList: Task 0 failed " + |
| "to parse " + fileStatus.getPath().getName()); |
| host = hostnameTaskArray[0]; |
| task = hostnameTaskArray[1]; |
| break; |
| } |
| Thread.sleep(pollMsecs); |
| } |
| String serverListFile = |
| ZOOKEEPER_SERVER_LIST_FILE_PREFIX + host + |
| HOSTNAME_TASK_SEPARATOR + task; |
| Path serverListPath = |
| new Path(baseDirectory, serverListFile); |
| if (LOG.isInfoEnabled()) { |
| LOG.info("createZooKeeperServerList: Creating the final " + |
| "ZooKeeper file '" + serverListPath + "'"); |
| } |
| fs.createNewFile(serverListPath); |
| } |
| |
| /** |
| * Make an attempt to get the server list file by looking for a file in |
| * the appropriate directory with the prefix |
| * ZOOKEEPER_SERVER_LIST_FILE_PREFIX. |
| * @return null if not found or the filename if found |
| * @throws IOException |
| */ |
| private String getServerListFile() throws IOException { |
| String serverListFile = null; |
| FileStatus [] fileStatusArray = fs.listStatus(baseDirectory); |
| for (FileStatus fileStatus : fileStatusArray) { |
| if (fileStatus.getPath().getName().startsWith( |
| ZOOKEEPER_SERVER_LIST_FILE_PREFIX)) { |
| serverListFile = fileStatus.getPath().getName(); |
| break; |
| } |
| } |
| return serverListFile; |
| } |
| |
| /** |
| * Task 0 is the designated master and will generate the server list |
| * (unless it has already done so). Other |
| * tasks will consume the file after it is created (just the filename). |
| * @throws IOException |
| * @throws InterruptedException |
| */ |
| private void getZooKeeperServerList() throws IOException, |
| InterruptedException { |
| String serverListFile; |
| |
| if (taskPartition == 0) { |
| serverListFile = getServerListFile(); |
| if (serverListFile == null) { |
| createZooKeeperServerList(); |
| } |
| } |
| |
| while (true) { |
| serverListFile = getServerListFile(); |
| if (LOG.isInfoEnabled()) { |
| LOG.info("getZooKeeperServerList: For task " + taskPartition + |
| ", got file '" + serverListFile + |
| "' (polling period is " + |
| pollMsecs + ")"); |
| } |
| if (serverListFile != null) { |
| break; |
| } |
| try { |
| Thread.sleep(pollMsecs); |
| } catch (InterruptedException e) { |
| LOG.warn("getZooKeeperServerList: Strange interrupted " + |
| "exception " + e.getMessage()); |
| } |
| |
| } |
| |
| String[] serverHostList = serverListFile.substring( |
| ZOOKEEPER_SERVER_LIST_FILE_PREFIX.length()).split( |
| HOSTNAME_TASK_SEPARATOR); |
| if (LOG.isInfoEnabled()) { |
| LOG.info("getZooKeeperServerList: Found " + |
| Arrays.toString(serverHostList) + |
| " hosts in filename '" + serverListFile + "'"); |
| } |
| |
| zkServerHost = serverHostList[0]; |
| zkServerTask = Integer.parseInt(serverHostList[1]); |
| updateZkPortString(); |
| } |
| |
| /** |
| * Update zookeeper host:port string. |
| */ |
| private void updateZkPortString() { |
| zkServerPortString = zkServerHost + ":" + zkBasePort; |
| } |
| |
| /** |
| * Users can get the server port string to connect to ZooKeeper |
| * @return server port string - comma separated |
| */ |
| public String getZooKeeperServerPortString() { |
| return zkServerPortString; |
| } |
| |
| /** |
| * Whoever is elected to be a ZooKeeper server must generate a config file |
| * locally. |
| * |
| */ |
| private void generateZooKeeperConfig() { |
| if (LOG.isInfoEnabled()) { |
| LOG.info("generateZooKeeperConfig: with base port " + |
| zkBasePort); |
| } |
| File zkDirFile = new File(this.zkDir); |
| boolean mkDirRet = zkDirFile.mkdirs(); |
| if (LOG.isInfoEnabled()) { |
| LOG.info("generateZooKeeperConfigFile: Make directory of " + |
| zkDirFile.getName() + " = " + mkDirRet); |
| } |
| |
| config.setDataDir(zkDir); |
| config.setDataLogDir(zkDir); |
| config.setClientPortAddress(new InetSocketAddress(zkBasePort)); |
| config.setMinSessionTimeout(conf.getZooKeeperMinSessionTimeout()); |
| config.setMaxSessionTimeout(conf.getZooKeeperMaxSessionTimeout()); |
| } |
| |
| /** |
| * If this task has been selected, online a ZooKeeper server. Otherwise, |
| * wait until this task knows that the ZooKeeper servers have been onlined. |
| */ |
| public void onlineZooKeeperServer() { |
| if (zkServerTask == taskPartition) { |
| File zkDirFile = new File(this.zkDir); |
| try { |
| if (LOG.isInfoEnabled()) { |
| LOG.info("onlineZooKeeperServers: Trying to delete old " + |
| "directory " + this.zkDir); |
| } |
| FileUtils.deleteDirectory(zkDirFile); |
| } catch (IOException e) { |
| LOG.warn("onlineZooKeeperServers: Failed to delete " + |
| "directory " + this.zkDir, e); |
| } |
| generateZooKeeperConfig(); |
| synchronized (this) { |
| zkRunner = createRunner(); |
| int port = zkRunner.start(zkDir, config); |
| if (port > 0) { |
| zkBasePort = port; |
| updateZkPortString(); |
| } |
| } |
| |
| // Once the server is up and running, notify that this server is up |
| // and running by dropping a ready stamp. |
| int connectAttempts = 0; |
| final int maxConnectAttempts = |
| conf.getZookeeperConnectionAttempts(); |
| while (connectAttempts < maxConnectAttempts) { |
| try { |
| if (LOG.isInfoEnabled()) { |
| LOG.info("onlineZooKeeperServers: Connect attempt " + |
| connectAttempts + " of " + |
| maxConnectAttempts + |
| " max trying to connect to " + |
| myHostname + ":" + zkBasePort + |
| " with poll msecs = " + pollMsecs); |
| } |
| InetSocketAddress zkServerAddress = |
| new InetSocketAddress(myHostname, zkBasePort); |
| Socket testServerSock = new Socket(); |
| testServerSock.connect(zkServerAddress, 5000); |
| if (LOG.isInfoEnabled()) { |
| LOG.info("onlineZooKeeperServers: Connected to " + |
| zkServerAddress + "!"); |
| } |
| break; |
| } catch (SocketTimeoutException e) { |
| LOG.warn("onlineZooKeeperServers: Got " + |
| "SocketTimeoutException", e); |
| } catch (ConnectException e) { |
| LOG.warn("onlineZooKeeperServers: Got " + |
| "ConnectException", e); |
| } catch (IOException e) { |
| LOG.warn("onlineZooKeeperServers: Got " + |
| "IOException", e); |
| } |
| |
| ++connectAttempts; |
| try { |
| Thread.sleep(pollMsecs); |
| } catch (InterruptedException e) { |
| LOG.warn("onlineZooKeeperServers: Sleep of " + pollMsecs + |
| " interrupted - " + e.getMessage()); |
| } |
| } |
| if (connectAttempts == maxConnectAttempts) { |
| throw new IllegalStateException( |
| "onlineZooKeeperServers: Failed to connect in " + |
| connectAttempts + " tries!"); |
| } |
| Path myReadyPath = new Path( |
| serverDirectory, myHostname + |
| HOSTNAME_TASK_SEPARATOR + taskPartition + |
| HOSTNAME_TASK_SEPARATOR + zkBasePort); |
| try { |
| if (LOG.isInfoEnabled()) { |
| LOG.info("onlineZooKeeperServers: Creating my filestamp " + |
| myReadyPath); |
| } |
| fs.createNewFile(myReadyPath); |
| } catch (IOException e) { |
| LOG.error("onlineZooKeeperServers: Failed (maybe previous " + |
| "task failed) to create filestamp " + myReadyPath, e); |
| } |
| } else { |
| int readyRetrievalAttempt = 0; |
| String foundServer = null; |
| while (true) { |
| try { |
| FileStatus [] fileStatusArray = |
| fs.listStatus(serverDirectory); |
| if ((fileStatusArray != null) && |
| (fileStatusArray.length > 0)) { |
| for (int i = 0; i < fileStatusArray.length; ++i) { |
| String[] hostnameTaskArray = |
| fileStatusArray[i].getPath().getName().split( |
| HOSTNAME_TASK_SEPARATOR); |
| if (hostnameTaskArray.length != 3) { |
| throw new RuntimeException( |
| "getZooKeeperServerList: Task 0 failed " + |
| "to parse " + |
| fileStatusArray[i].getPath().getName()); |
| } |
| foundServer = hostnameTaskArray[0]; |
| zkBasePort = Integer.parseInt(hostnameTaskArray[2]); |
| updateZkPortString(); |
| } |
| if (LOG.isInfoEnabled()) { |
| LOG.info("onlineZooKeeperServers: Got " + |
| foundServer + " on port " + |
| zkBasePort + |
| " (polling period is " + |
| pollMsecs + ") on attempt " + |
| readyRetrievalAttempt); |
| } |
| if (zkServerHost.equals(foundServer)) { |
| break; |
| } |
| } else { |
| if (LOG.isInfoEnabled()) { |
| LOG.info("onlineZooKeeperServers: Empty " + |
| "directory " + serverDirectory + |
| ", waiting " + pollMsecs + " msecs."); |
| } |
| } |
| Thread.sleep(pollMsecs); |
| ++readyRetrievalAttempt; |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } catch (InterruptedException e) { |
| LOG.warn("onlineZooKeeperServers: Strange interrupt from " + |
| e.getMessage(), e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Wait for all workers to signal completion. Will wait up to |
| * WAIT_TASK_DONE_TIMEOUT_MS milliseconds for this to complete before |
| * reporting an error. |
| * |
| * @param totalWorkers Number of workers to wait for |
| */ |
| private void waitUntilAllTasksDone(int totalWorkers) { |
| int attempt = 0; |
| long maxMs = time.getMilliseconds() + |
| conf.getWaitTaskDoneTimeoutMs(); |
| while (true) { |
| boolean[] taskDoneArray = new boolean[totalWorkers]; |
| try { |
| FileStatus [] fileStatusArray = |
| fs.listStatus(taskDirectory); |
| int totalDone = 0; |
| if (fileStatusArray.length > 0) { |
| for (FileStatus fileStatus : fileStatusArray) { |
| String name = fileStatus.getPath().getName(); |
| if (ComputationDoneName.isName(name)) { |
| ++totalDone; |
| taskDoneArray[ComputationDoneName.fromName( |
| name).getWorkerId()] = true; |
| } |
| } |
| } |
| if (LOG.isInfoEnabled()) { |
| LOG.info("waitUntilAllTasksDone: Got " + totalDone + |
| " and " + totalWorkers + |
| " desired (polling period is " + |
| pollMsecs + ") on attempt " + |
| attempt); |
| } |
| if (totalDone >= totalWorkers) { |
| break; |
| } else { |
| StringBuilder sb = new StringBuilder(); |
| for (int i = 0; i < taskDoneArray.length; ++i) { |
| if (!taskDoneArray[i]) { |
| sb.append(i).append(", "); |
| } |
| } |
| LOG.info("waitUntilAllTasksDone: Still waiting on tasks " + |
| sb.toString()); |
| } |
| ++attempt; |
| Thread.sleep(pollMsecs); |
| context.progress(); |
| } catch (IOException e) { |
| LOG.warn("waitUntilAllTasksDone: Got IOException.", e); |
| } catch (InterruptedException e) { |
| LOG.warn("waitUntilAllTasksDone: Got InterruptedException", e); |
| } |
| |
| if (time.getMilliseconds() > maxMs) { |
| throw new IllegalStateException("waitUntilAllTasksDone: Tasks " + |
| "did not finish by the maximum time of " + |
| conf.getWaitTaskDoneTimeoutMs() + " milliseconds"); |
| } |
| } |
| } |
| |
| /** |
| * Notify the ZooKeeper servers that this partition is done with all |
| * ZooKeeper communication. If this task is running a ZooKeeper server, |
| * kill it when all partitions are done and wait for |
| * completion. Clean up the ZooKeeper local directory as well. |
| * |
| * @param state State of the application |
| */ |
| public void offlineZooKeeperServers(State state) { |
| if (state == State.FINISHED) { |
| createZooKeeperClosedStamp(); |
| } |
| synchronized (this) { |
| if (zkRunner != null) { |
| boolean isYarnJob = GiraphConstants.IS_PURE_YARN_JOB.get(conf); |
| int totalWorkers = conf.getMapTasks(); |
| // A Yarn job always spawns MAX_WORKERS + 1 containers |
| if (isYarnJob) { |
| totalWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, 0) + 1; |
| } |
| LOG.info("offlineZooKeeperServers: Will wait for " + |
| totalWorkers + " tasks"); |
| waitUntilAllTasksDone(totalWorkers); |
| zkRunner.stop(); |
| File zkDirFile; |
| try { |
| zkDirFile = new File(zkDir); |
| FileUtils.deleteDirectory(zkDirFile); |
| } catch (IOException e) { |
| LOG.warn("offlineZooKeeperSevers: " + |
| "IOException, but continuing", |
| e); |
| } |
| if (LOG.isInfoEnabled()) { |
| LOG.info("offlineZooKeeperServers: deleted directory " + zkDir); |
| } |
| zkRunner = null; |
| } |
| } |
| } |
| |
| /** |
| * Create appropriate zookeeper wrapper depending on configuration. |
| * Zookeeper can run in master process or outside as a separate |
| * java process. |
| * |
| * @return either in process or out of process wrapper. |
| */ |
| private ZooKeeperRunner createRunner() { |
| ZooKeeperRunner runner = new InProcessZooKeeperRunner(); |
| runner.setConf(conf); |
| return runner; |
| } |
| |
| /** |
| * Is this task running a ZooKeeper server? Only could be true if called |
| * after onlineZooKeeperServers(). |
| * |
| * @return true if running a ZooKeeper server, false otherwise |
| */ |
| public boolean runsZooKeeper() { |
| synchronized (this) { |
| return zkRunner != null; |
| } |
| } |
| |
| /** |
| * Do necessary cleanup in zookeeper wrapper. |
| */ |
| public void cleanup() { |
| synchronized (this) { |
| if (zkRunner != null) { |
| zkRunner.cleanup(); |
| } |
| } |
| } |
| } |