| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.qjournal.server; |
| |
| import org.apache.hadoop.classification.VisibleForTesting; |
| import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| |
| import org.apache.hadoop.hdfs.DFSUtilClient; |
| import org.apache.hadoop.hdfs.protocolPB.PBHelper; |
| import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol; |
| import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto; |
| import org.apache.hadoop.hdfs.qjournal.protocolPB.InterQJournalProtocolPB; |
| import org.apache.hadoop.hdfs.qjournal.protocolPB.InterQJournalProtocolTranslatorPB; |
| import org.apache.hadoop.hdfs.server.common.Util; |
| import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; |
| import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; |
| import org.apache.hadoop.hdfs.util.DataTransferThrottler; |
| import org.apache.hadoop.ipc.ProtobufRpcEngine2; |
| import org.apache.hadoop.ipc.RPC; |
| import org.apache.hadoop.security.SecurityUtil; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.util.Daemon; |
| import org.apache.hadoop.util.Lists; |
| import org.apache.hadoop.util.Sets; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.net.MalformedURLException; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.net.URL; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.Collection; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| |
/**
 * A Journal Sync thread runs through the lifetime of the JN. It periodically
 * gossips with other journal nodes to compare edit log manifests and, if it
 * detects any missing log segment, downloads it from the other journal node.
 */
| @InterfaceAudience.Private |
| public class JournalNodeSyncer { |
| public static final Logger LOG = LoggerFactory.getLogger( |
| JournalNodeSyncer.class); |
| private final JournalNode jn; |
| private final Journal journal; |
| private final String jid; |
| private String nameServiceId; |
| private final JNStorage jnStorage; |
| private final Configuration conf; |
| private volatile Daemon syncJournalDaemon; |
| private volatile boolean shouldSync = true; |
| |
| private List<JournalNodeProxy> otherJNProxies = Lists.newArrayList(); |
| private int numOtherJNs; |
| private int journalNodeIndexForSync = 0; |
| private final long journalSyncInterval; |
| private final int logSegmentTransferTimeout; |
| private final DataTransferThrottler throttler; |
| private final JournalMetrics metrics; |
| private boolean journalSyncerStarted; |
| |
/**
   * Creates a syncer for one journal. Nothing is started here; the sync
   * interval, transfer timeout and throttler are read from the configuration.
   *
   * @param jouranlNode the JournalNode hosting the journal
   * @param journal the local journal to keep in sync
   * @param jid the journal identifier
   * @param conf configuration to read the sync settings from
   * @param nameServiceId the nameservice id, may be null
   */
  JournalNodeSyncer(JournalNode jouranlNode, Journal journal, String jid,
      Configuration conf, String nameServiceId) {
    this.jn = jouranlNode;
    this.journal = journal;
    this.jid = jid;
    this.nameServiceId = nameServiceId;
    this.conf = conf;
    this.jnStorage = journal.getStorage();
    this.journalSyncInterval = conf.getLong(
        DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY,
        DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_DEFAULT);
    this.logSegmentTransferTimeout = conf.getInt(
        DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY,
        DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT);
    this.throttler = getThrottler(conf);
    this.metrics = journal.getMetrics();
    this.journalSyncerStarted = false;
  }
| |
| void stopSync() { |
| shouldSync = false; |
| // Delete the edits.sync directory |
| File editsSyncDir = journal.getStorage().getEditsSyncDir(); |
| if (editsSyncDir.exists()) { |
| FileUtil.fullyDelete(editsSyncDir); |
| } |
| if (syncJournalDaemon != null) { |
| syncJournalDaemon.interrupt(); |
| } |
| } |
| |
| public void start(String nsId) { |
| if (nsId != null) { |
| this.nameServiceId = nsId; |
| journal.setTriedJournalSyncerStartedwithnsId(true); |
| } |
| if (!journalSyncerStarted && getOtherJournalNodeProxies()) { |
| LOG.info("Starting SyncJournal daemon for journal " + jid); |
| startSyncJournalsDaemon(); |
| journalSyncerStarted = true; |
| } |
| |
| } |
| |
| public boolean isJournalSyncerStarted() { |
| return journalSyncerStarted; |
| } |
| |
| private boolean createEditsSyncDir() { |
| File editsSyncDir = journal.getStorage().getEditsSyncDir(); |
| if (editsSyncDir.exists()) { |
| LOG.info(editsSyncDir + " directory already exists."); |
| return true; |
| } |
| return editsSyncDir.mkdir(); |
| } |
| |
| private boolean getOtherJournalNodeProxies() { |
| List<InetSocketAddress> otherJournalNodes = getOtherJournalNodeAddrs(); |
| if (otherJournalNodes == null || otherJournalNodes.isEmpty()) { |
| LOG.warn("Other JournalNode addresses not available. Journal Syncing " + |
| "cannot be done"); |
| return false; |
| } |
| for (InetSocketAddress addr : otherJournalNodes) { |
| try { |
| otherJNProxies.add(new JournalNodeProxy(addr)); |
| } catch (IOException e) { |
| LOG.warn("Could not add proxy for Journal at addresss " + addr, e); |
| } |
| } |
| // Check if there are any other JournalNodes before starting the sync. Although some proxies |
| // may be unresolved now, the act of attempting to sync will instigate resolution when the |
| // servers become available. |
| if (otherJNProxies.isEmpty()) { |
| LOG.error("Cannot sync as there is no other JN available for sync."); |
| return false; |
| } |
| numOtherJNs = otherJNProxies.size(); |
| return true; |
| } |
| |
| private void startSyncJournalsDaemon() { |
| syncJournalDaemon = new Daemon(() -> { |
| // Wait for journal to be formatted to create edits.sync directory |
| while(!journal.isFormatted()) { |
| try { |
| Thread.sleep(journalSyncInterval); |
| } catch (InterruptedException e) { |
| LOG.error("JournalNodeSyncer daemon received Runtime exception.", e); |
| Thread.currentThread().interrupt(); |
| return; |
| } |
| } |
| if (!createEditsSyncDir()) { |
| LOG.error("Failed to create directory for downloading log " + |
| "segments: {}. Stopping Journal Node Sync.", |
| journal.getStorage().getEditsSyncDir()); |
| return; |
| } |
| while(shouldSync) { |
| try { |
| if (!journal.isFormatted()) { |
| LOG.warn("Journal cannot sync. Not formatted."); |
| } else { |
| syncJournals(); |
| } |
| } catch (Throwable t) { |
| if (!shouldSync) { |
| if (t instanceof InterruptedException) { |
| LOG.info("Stopping JournalNode Sync."); |
| Thread.currentThread().interrupt(); |
| return; |
| } else { |
| LOG.warn("JournalNodeSyncer received an exception while " + |
| "shutting down.", t); |
| } |
| break; |
| } else { |
| if (t instanceof InterruptedException) { |
| LOG.warn("JournalNodeSyncer interrupted", t); |
| Thread.currentThread().interrupt(); |
| return; |
| } |
| } |
| LOG.error( |
| "JournalNodeSyncer daemon received Runtime exception. ", t); |
| } |
| try { |
| Thread.sleep(journalSyncInterval); |
| } catch (InterruptedException e) { |
| if (!shouldSync) { |
| LOG.info("Stopping JournalNode Sync."); |
| } else { |
| LOG.warn("JournalNodeSyncer interrupted", e); |
| } |
| Thread.currentThread().interrupt(); |
| return; |
| } |
| } |
| }); |
| syncJournalDaemon.start(); |
| } |
| |
| private void syncJournals() { |
| syncWithJournalAtIndex(journalNodeIndexForSync); |
| journalNodeIndexForSync = (journalNodeIndexForSync + 1) % numOtherJNs; |
| } |
| |
| private void syncWithJournalAtIndex(int index) { |
| LOG.info("Syncing Journal " + jn.getBoundIpcAddress().getAddress() + ":" |
| + jn.getBoundIpcAddress().getPort() + " with " |
| + otherJNProxies.get(index) + ", journal id: " + jid); |
| final InterQJournalProtocol jnProxy = otherJNProxies.get(index).jnProxy; |
| if (jnProxy == null) { |
| LOG.error("JournalNode Proxy not found."); |
| return; |
| } |
| |
| List<RemoteEditLog> thisJournalEditLogs; |
| try { |
| thisJournalEditLogs = journal.getEditLogManifest(0, false).getLogs(); |
| } catch (IOException e) { |
| LOG.error("Exception in getting local edit log manifest", e); |
| return; |
| } |
| |
| GetEditLogManifestResponseProto editLogManifest; |
| try { |
| editLogManifest = jnProxy.getEditLogManifestFromJournal(jid, |
| nameServiceId, 0, false); |
| } catch (IOException e) { |
| LOG.debug("Could not sync with Journal at {}.", |
| otherJNProxies.get(journalNodeIndexForSync), e); |
| return; |
| } |
| |
| getMissingLogSegments(thisJournalEditLogs, editLogManifest, |
| otherJNProxies.get(index)); |
| } |
| |
| private List<InetSocketAddress> getOtherJournalNodeAddrs() { |
| String uriStr = ""; |
| try { |
| uriStr = conf.getTrimmed(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY); |
| |
| if (uriStr == null || uriStr.isEmpty()) { |
| if (nameServiceId != null) { |
| uriStr = conf.getTrimmed(DFSConfigKeys |
| .DFS_NAMENODE_SHARED_EDITS_DIR_KEY + "." + nameServiceId); |
| } |
| } |
| |
| if (uriStr == null || uriStr.isEmpty()) { |
| HashSet<String> sharedEditsUri = new HashSet<>(); |
| if (nameServiceId != null) { |
| Collection<String> nnIds = DFSUtilClient.getNameNodeIds( |
| conf, nameServiceId); |
| for (String nnId : nnIds) { |
| String suffix = nameServiceId + "." + nnId; |
| uriStr = conf.getTrimmed(DFSConfigKeys |
| .DFS_NAMENODE_SHARED_EDITS_DIR_KEY + "." + suffix); |
| sharedEditsUri.add(uriStr); |
| } |
| if (sharedEditsUri.size() > 1) { |
| uriStr = null; |
| LOG.error("The conf property " + DFSConfigKeys |
| .DFS_NAMENODE_SHARED_EDITS_DIR_KEY + " not set properly, " + |
| "it has been configured with different journalnode values " + |
| sharedEditsUri.toString() + " for a" + |
| " single nameserviceId" + nameServiceId); |
| } |
| } |
| } |
| |
| if (uriStr == null || uriStr.isEmpty()) { |
| LOG.error("Could not construct Shared Edits Uri"); |
| return null; |
| } else { |
| return getJournalAddrList(uriStr); |
| } |
| |
| } catch (URISyntaxException e) { |
| LOG.error("The conf property " + DFSConfigKeys |
| .DFS_NAMENODE_SHARED_EDITS_DIR_KEY + " not set properly."); |
| } catch (IOException e) { |
| LOG.error("Could not parse JournalNode addresses: " + uriStr); |
| } |
| return null; |
| } |
| |
| @VisibleForTesting |
| protected List<InetSocketAddress> getJournalAddrList(String uriStr) throws |
| URISyntaxException, |
| IOException { |
| URI uri = new URI(uriStr); |
| |
| InetSocketAddress boundIpcAddress = jn.getBoundIpcAddress(); |
| Set<InetSocketAddress> excluded = Sets.newHashSet(boundIpcAddress); |
| List<InetSocketAddress> addrList = Util.getLoggerAddresses(uri, excluded, conf); |
| |
| // Exclude the current JournalNode instance (a local address and the same port). If the address |
| // is bound to a local address on the same port, then remove it to handle scenarios where a |
| // wildcard address (e.g. "0.0.0.0") is used. We can't simply exclude all local addresses |
| // since we may be running multiple servers on the same host. |
| addrList.removeIf(addr -> !addr.isUnresolved() && addr.getAddress().isAnyLocalAddress() |
| && boundIpcAddress.getPort() == addr.getPort()); |
| |
| return addrList; |
| } |
| |
| private void getMissingLogSegments(List<RemoteEditLog> thisJournalEditLogs, |
| GetEditLogManifestResponseProto response, |
| JournalNodeProxy remoteJNproxy) { |
| |
| List<RemoteEditLog> otherJournalEditLogs = PBHelper.convert( |
| response.getManifest()).getLogs(); |
| if (otherJournalEditLogs == null || otherJournalEditLogs.isEmpty()) { |
| LOG.warn("Journal at " + remoteJNproxy.jnAddr + " has no edit logs"); |
| return; |
| } |
| List<RemoteEditLog> missingLogs = getMissingLogList(thisJournalEditLogs, |
| otherJournalEditLogs); |
| |
| if (!missingLogs.isEmpty()) { |
| NamespaceInfo nsInfo = jnStorage.getNamespaceInfo(); |
| |
| for (RemoteEditLog missingLog : missingLogs) { |
| URL url = null; |
| boolean success = false; |
| try { |
| if (remoteJNproxy.httpServerUrl == null) { |
| if (response.hasFromURL()) { |
| remoteJNproxy.httpServerUrl = getHttpServerURI( |
| response.getFromURL(), remoteJNproxy.jnAddr.getHostName()); |
| } else { |
| LOG.error("EditLogManifest response does not have fromUrl " + |
| "field set. Aborting current sync attempt"); |
| break; |
| } |
| } |
| |
| String urlPath = GetJournalEditServlet.buildPath(jid, missingLog |
| .getStartTxId(), nsInfo, false); |
| url = new URL(remoteJNproxy.httpServerUrl, urlPath); |
| success = downloadMissingLogSegment(url, missingLog); |
| } catch (URISyntaxException e) { |
| LOG.error("EditLogManifest's fromUrl field syntax incorrect", e); |
| } catch (MalformedURLException e) { |
| LOG.error("MalformedURL when download missing log segment", e); |
| } catch (Exception e) { |
| LOG.error("Exception in downloading missing log segment from url " + |
| url, e); |
| } |
| if (!success) { |
| LOG.error("Aborting current sync attempt."); |
| break; |
| } |
| } |
| } |
| } |
| |
| /** |
| * Returns the logs present in otherJournalEditLogs and missing from |
| * thisJournalEditLogs. |
| */ |
| private List<RemoteEditLog> getMissingLogList( |
| List<RemoteEditLog> thisJournalEditLogs, |
| List<RemoteEditLog> otherJournalEditLogs) { |
| if (thisJournalEditLogs.isEmpty()) { |
| return otherJournalEditLogs; |
| } |
| |
| List<RemoteEditLog> missingEditLogs = Lists.newArrayList(); |
| |
| int localJnIndex = 0, remoteJnIndex = 0; |
| int localJnNumLogs = thisJournalEditLogs.size(); |
| int remoteJnNumLogs = otherJournalEditLogs.size(); |
| |
| while (localJnIndex < localJnNumLogs && remoteJnIndex < remoteJnNumLogs) { |
| long localJNstartTxId = thisJournalEditLogs.get(localJnIndex) |
| .getStartTxId(); |
| long remoteJNstartTxId = otherJournalEditLogs.get(remoteJnIndex) |
| .getStartTxId(); |
| |
| if (localJNstartTxId == remoteJNstartTxId) { |
| localJnIndex++; |
| remoteJnIndex++; |
| } else if (localJNstartTxId > remoteJNstartTxId) { |
| missingEditLogs.add(otherJournalEditLogs.get(remoteJnIndex)); |
| remoteJnIndex++; |
| } else { |
| localJnIndex++; |
| } |
| } |
| |
| if (remoteJnIndex < remoteJnNumLogs) { |
| for (; remoteJnIndex < remoteJnNumLogs; remoteJnIndex++) { |
| missingEditLogs.add(otherJournalEditLogs.get(remoteJnIndex)); |
| } |
| } |
| |
| return missingEditLogs; |
| } |
| |
| private URL getHttpServerURI(String fromUrl, String hostAddr) |
| throws URISyntaxException, MalformedURLException { |
| URI uri = new URI(fromUrl); |
| return new URL(uri.getScheme(), hostAddr, uri.getPort(), ""); |
| } |
| |
| /** |
| * Transfer an edit log from one journal node to another for sync-up. |
| */ |
| private boolean downloadMissingLogSegment(URL url, RemoteEditLog log) |
| throws IOException { |
| LOG.info("Downloading missing Edit Log from " + url + " to " + jnStorage |
| .getRoot()); |
| |
| assert log.getStartTxId() > 0 && log.getEndTxId() > 0 : "bad log: " + log; |
| File finalEditsFile = jnStorage.getFinalizedEditsFile(log.getStartTxId(), |
| log.getEndTxId()); |
| |
| if (finalEditsFile.exists() && FileUtil.canRead(finalEditsFile)) { |
| LOG.info("Skipping download of remote edit log " + log + " since it's" + |
| " already stored locally at " + finalEditsFile); |
| return true; |
| } |
| |
| // Download the log segment to current.tmp directory first. |
| File tmpEditsFile = jnStorage.getTemporaryEditsFile( |
| log.getStartTxId(), log.getEndTxId()); |
| |
| if (!SecurityUtil.doAsLoginUser(() -> { |
| if (UserGroupInformation.isSecurityEnabled()) { |
| UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab(); |
| } |
| try { |
| Util.doGetUrl(url, ImmutableList.of(tmpEditsFile), jnStorage, false, |
| logSegmentTransferTimeout, throttler); |
| } catch (IOException e) { |
| LOG.error("Download of Edit Log file for Syncing failed. Deleting temp " |
| + "file: " + tmpEditsFile, e); |
| if (!tmpEditsFile.delete()) { |
| LOG.warn("Deleting " + tmpEditsFile + " has failed"); |
| } |
| return false; |
| } |
| return true; |
| })) { |
| return false; |
| } |
| LOG.info("Downloaded file " + tmpEditsFile.getName() + " of size " + |
| tmpEditsFile.length() + " bytes."); |
| |
| boolean moveSuccess = false; |
| try { |
| moveSuccess = journal.moveTmpSegmentToCurrent(tmpEditsFile, |
| finalEditsFile, log.getEndTxId()); |
| } catch (IOException e) { |
| LOG.info("Could not move {} to current directory.", tmpEditsFile); |
| } finally { |
| if (tmpEditsFile.exists() && !tmpEditsFile.delete()) { |
| LOG.warn("Deleting " + tmpEditsFile + " has failed"); |
| } |
| } |
| if (moveSuccess) { |
| metrics.incrNumEditLogsSynced(); |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| private static DataTransferThrottler getThrottler(Configuration conf) { |
| long transferBandwidth = |
| conf.getLong(DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_RATE_KEY, |
| DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_RATE_DEFAULT); |
| DataTransferThrottler throttler = null; |
| if (transferBandwidth > 0) { |
| throttler = new DataTransferThrottler(transferBandwidth); |
| } |
| return throttler; |
| } |
| |
| private class JournalNodeProxy { |
| private final InetSocketAddress jnAddr; |
| private final InterQJournalProtocol jnProxy; |
| private URL httpServerUrl; |
| |
| JournalNodeProxy(InetSocketAddress jnAddr) throws IOException { |
| final Configuration confCopy = new Configuration(conf); |
| this.jnAddr = jnAddr; |
| this.jnProxy = SecurityUtil.doAsLoginUser( |
| new PrivilegedExceptionAction<InterQJournalProtocol>() { |
| @Override |
| public InterQJournalProtocol run() throws IOException { |
| RPC.setProtocolEngine(confCopy, InterQJournalProtocolPB.class, |
| ProtobufRpcEngine2.class); |
| InterQJournalProtocolPB interQJournalProtocolPB = RPC.getProxy( |
| InterQJournalProtocolPB.class, |
| RPC.getProtocolVersion(InterQJournalProtocolPB.class), |
| jnAddr, confCopy); |
| return new InterQJournalProtocolTranslatorPB( |
| interQJournalProtocolPB); |
| } |
| }); |
| } |
| |
| @Override |
| public String toString() { |
| return jnAddr.toString(); |
| } |
| } |
| } |