| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.recovery; |
| |
| import static org.fusesource.leveldbjni.JniDBFactory.asString; |
| import static org.fusesource.leveldbjni.JniDBFactory.bytes; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.File; |
| import java.io.IOException; |
| import java.security.PrivateKey; |
| import java.security.cert.X509Certificate; |
| import java.util.HashMap; |
| import java.util.Map.Entry; |
| |
| import org.apache.hadoop.yarn.server.resourcemanager.DBManager; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.security.token.delegation.DelegationKey; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ReservationId; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; |
| import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto; |
| import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto; |
| import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto; |
| import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto; |
| import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; |
| import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; |
| import org.apache.hadoop.yarn.server.records.Version; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDelegationTokenIdentifierData; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; |
| import org.apache.hadoop.yarn.server.utils.LeveldbIterator; |
| import org.iq80.leveldb.DB; |
| import org.iq80.leveldb.DBException; |
| import org.iq80.leveldb.Options; |
| import org.iq80.leveldb.WriteBatch; |
| |
| import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; |
| |
| /** |
| * Changes from 1.0 to 1.1, Addition of ReservationSystem state. |
| */ |
| public class LeveldbRMStateStore extends RMStateStore { |
| |
| public static final Logger LOG = |
| LoggerFactory.getLogger(LeveldbRMStateStore.class); |
| |
| private static final String SEPARATOR = "/"; |
| private static final String DB_NAME = "yarn-rm-state"; |
| private static final String RM_DT_MASTER_KEY_KEY_PREFIX = |
| RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + DELEGATION_KEY_PREFIX; |
| private static final String RM_DT_TOKEN_KEY_PREFIX = |
| RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + DELEGATION_TOKEN_PREFIX; |
| private static final String RM_DT_SEQUENCE_NUMBER_KEY = |
| RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + "RMDTSequentialNumber"; |
| private static final String RM_APP_KEY_PREFIX = |
| RM_APP_ROOT + SEPARATOR + ApplicationId.appIdStrPrefix; |
| private static final String RM_RESERVATION_KEY_PREFIX = |
| RESERVATION_SYSTEM_ROOT + SEPARATOR; |
| |
| private static final Version CURRENT_VERSION_INFO = Version |
| .newInstance(1, 1); |
| |
| private DB db; |
| private DBManager dbManager = new DBManager(); |
| private long compactionIntervalMsec; |
| |
| private String getApplicationNodeKey(ApplicationId appId) { |
| return RM_APP_ROOT + SEPARATOR + appId; |
| } |
| |
| private String getApplicationAttemptNodeKey(ApplicationAttemptId attemptId) { |
| return getApplicationAttemptNodeKey( |
| getApplicationNodeKey(attemptId.getApplicationId()), attemptId); |
| } |
| |
| private String getApplicationAttemptNodeKey(String appNodeKey, |
| ApplicationAttemptId attemptId) { |
| return appNodeKey + SEPARATOR + attemptId; |
| } |
| |
| private String getRMDTMasterKeyNodeKey(DelegationKey masterKey) { |
| return RM_DT_MASTER_KEY_KEY_PREFIX + masterKey.getKeyId(); |
| } |
| |
| private String getRMDTTokenNodeKey(RMDelegationTokenIdentifier tokenId) { |
| return RM_DT_TOKEN_KEY_PREFIX + tokenId.getSequenceNumber(); |
| } |
| |
| private String getReservationNodeKey(String planName, |
| String reservationId) { |
| return RESERVATION_SYSTEM_ROOT + SEPARATOR + planName + SEPARATOR |
| + reservationId; |
| } |
| |
| private String getProxyCACertNodeKey() { |
| return PROXY_CA_ROOT + SEPARATOR + PROXY_CA_CERT_NODE; |
| } |
| |
| private String getProxyCAPrivateKeyNodeKey() { |
| return PROXY_CA_ROOT + SEPARATOR + PROXY_CA_PRIVATE_KEY_NODE; |
| } |
| |
| @Override |
| protected void initInternal(Configuration conf) { |
| compactionIntervalMsec = conf.getLong( |
| YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS, |
| YarnConfiguration.DEFAULT_RM_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000; |
| } |
| |
| private Path getStorageDir() throws IOException { |
| Configuration conf = getConfig(); |
| String storePath = conf.get(YarnConfiguration.RM_LEVELDB_STORE_PATH); |
| if (storePath == null) { |
| throw new IOException("No store location directory configured in " + |
| YarnConfiguration.RM_LEVELDB_STORE_PATH); |
| } |
| return new Path(storePath, DB_NAME); |
| } |
| |
| private Path createStorageDir() throws IOException { |
| Path root = getStorageDir(); |
| FileSystem fs = FileSystem.getLocal(getConfig()); |
| fs.mkdirs(root, new FsPermission((short)0700)); |
| return root; |
| } |
| |
| @Override |
| protected void startInternal() throws Exception { |
| Path storeRoot = createStorageDir(); |
| Options options = new Options(); |
| options.createIfMissing(false); |
| LOG.info("Using state database at " + storeRoot + " for recovery"); |
| File dbfile = new File(storeRoot.toString()); |
| db = dbManager.initDatabase(dbfile, options, (database) -> |
| storeVersion(CURRENT_VERSION_INFO)); |
| dbManager.startCompactionTimer(compactionIntervalMsec, |
| this.getClass().getSimpleName()); |
| } |
| |
| @Override |
| protected void closeInternal() throws Exception { |
| dbManager.close(); |
| } |
| |
| @VisibleForTesting |
| boolean isClosed() { |
| return db == null; |
| } |
| |
| @VisibleForTesting |
| DB getDatabase() { |
| return db; |
| } |
| |
| @Override |
| protected Version loadVersion() throws Exception { |
| return dbManager.loadVersion(VERSION_NODE); |
| } |
| |
| @Override |
| protected void storeVersion() throws Exception { |
| try { |
| storeVersion(CURRENT_VERSION_INFO); |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| protected void storeVersion(Version version) { |
| dbManager.storeVersion(VERSION_NODE, version); |
| } |
| |
| @Override |
| protected Version getCurrentVersion() { |
| return CURRENT_VERSION_INFO; |
| } |
| |
| @Override |
| public synchronized long getAndIncrementEpoch() throws Exception { |
| long currentEpoch = baseEpoch; |
| byte[] dbKeyBytes = bytes(EPOCH_NODE); |
| try { |
| byte[] data = db.get(dbKeyBytes); |
| if (data != null) { |
| currentEpoch = EpochProto.parseFrom(data).getEpoch(); |
| } |
| EpochProto proto = Epoch.newInstance(nextEpoch(currentEpoch)).getProto(); |
| db.put(dbKeyBytes, proto.toByteArray()); |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| return currentEpoch; |
| } |
| |
| @Override |
| public RMState loadState() throws Exception { |
| RMState rmState = new RMState(); |
| loadRMDTSecretManagerState(rmState); |
| loadRMApps(rmState); |
| loadAMRMTokenSecretManagerState(rmState); |
| loadReservationState(rmState); |
| loadProxyCAManagerState(rmState); |
| return rmState; |
| } |
| |
| private void loadReservationState(RMState rmState) throws IOException { |
| int numReservations = 0; |
| try (LeveldbIterator iter = new LeveldbIterator(db)) { |
| iter.seek(bytes(RM_RESERVATION_KEY_PREFIX)); |
| while (iter.hasNext()) { |
| Entry<byte[],byte[]> entry = iter.next(); |
| String key = asString(entry.getKey()); |
| if (!key.startsWith(RM_RESERVATION_KEY_PREFIX)) { |
| break; |
| } |
| |
| String planReservationString = |
| key.substring(RM_RESERVATION_KEY_PREFIX.length()); |
| String[] parts = planReservationString.split(SEPARATOR); |
| if (parts.length != 2) { |
| LOG.warn("Incorrect reservation state key " + key); |
| continue; |
| } |
| String planName = parts[0]; |
| String reservationName = parts[1]; |
| ReservationAllocationStateProto allocationState = |
| ReservationAllocationStateProto.parseFrom(entry.getValue()); |
| if (!rmState.getReservationState().containsKey(planName)) { |
| rmState.getReservationState().put(planName, |
| new HashMap<ReservationId, ReservationAllocationStateProto>()); |
| } |
| ReservationId reservationId = |
| ReservationId.parseReservationId(reservationName); |
| rmState.getReservationState().get(planName).put(reservationId, |
| allocationState); |
| numReservations++; |
| } |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| LOG.info("Recovered " + numReservations + " reservations"); |
| } |
| |
| private void loadRMDTSecretManagerState(RMState state) throws IOException { |
| int numKeys = loadRMDTSecretManagerKeys(state); |
| LOG.info("Recovered " + numKeys + " RM delegation token master keys"); |
| int numTokens = loadRMDTSecretManagerTokens(state); |
| LOG.info("Recovered " + numTokens + " RM delegation tokens"); |
| loadRMDTSecretManagerTokenSequenceNumber(state); |
| } |
| |
| private int loadRMDTSecretManagerKeys(RMState state) throws IOException { |
| int numKeys = 0; |
| try (LeveldbIterator iter = new LeveldbIterator(db)) { |
| iter.seek(bytes(RM_DT_MASTER_KEY_KEY_PREFIX)); |
| while (iter.hasNext()) { |
| Entry<byte[],byte[]> entry = iter.next(); |
| String key = asString(entry.getKey()); |
| if (!key.startsWith(RM_DT_MASTER_KEY_KEY_PREFIX)) { |
| break; |
| } |
| DelegationKey masterKey = loadDelegationKey(entry.getValue()); |
| state.rmSecretManagerState.masterKeyState.add(masterKey); |
| ++numKeys; |
| LOG.debug("Loaded RM delegation key from {}: keyId={}," |
| + " expirationDate={}", key, masterKey.getKeyId(), |
| masterKey.getExpiryDate()); |
| } |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| return numKeys; |
| } |
| |
| private DelegationKey loadDelegationKey(byte[] data) throws IOException { |
| DelegationKey key = new DelegationKey(); |
| DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)); |
| try { |
| key.readFields(in); |
| } finally { |
| IOUtils.cleanupWithLogger(LOG, in); |
| } |
| return key; |
| } |
| |
| private int loadRMDTSecretManagerTokens(RMState state) throws IOException { |
| int numTokens = 0; |
| try (LeveldbIterator iter = new LeveldbIterator(db)) { |
| iter.seek(bytes(RM_DT_TOKEN_KEY_PREFIX)); |
| while (iter.hasNext()) { |
| Entry<byte[],byte[]> entry = iter.next(); |
| String key = asString(entry.getKey()); |
| if (!key.startsWith(RM_DT_TOKEN_KEY_PREFIX)) { |
| break; |
| } |
| RMDelegationTokenIdentifierData tokenData = loadDelegationToken( |
| entry.getValue()); |
| RMDelegationTokenIdentifier tokenId = tokenData.getTokenIdentifier(); |
| long renewDate = tokenData.getRenewDate(); |
| state.rmSecretManagerState.delegationTokenState.put(tokenId, |
| renewDate); |
| ++numTokens; |
| LOG.debug("Loaded RM delegation token from {}: tokenId={}," |
| + " renewDate={}", key, tokenId, renewDate); |
| } |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| return numTokens; |
| } |
| |
| private RMDelegationTokenIdentifierData loadDelegationToken(byte[] data) |
| throws IOException { |
| RMDelegationTokenIdentifierData tokenData; |
| DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)); |
| try { |
| tokenData = RMStateStoreUtils.readRMDelegationTokenIdentifierData(in); |
| } finally { |
| IOUtils.cleanupWithLogger(LOG, in); |
| } |
| return tokenData; |
| } |
| |
| private void loadRMDTSecretManagerTokenSequenceNumber(RMState state) |
| throws IOException { |
| byte[] data; |
| try { |
| data = db.get(bytes(RM_DT_SEQUENCE_NUMBER_KEY)); |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| if (data != null) { |
| DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)); |
| try { |
| state.rmSecretManagerState.dtSequenceNumber = in.readInt(); |
| } finally { |
| IOUtils.cleanupWithLogger(LOG, in); |
| } |
| } |
| } |
| |
| private void loadRMApps(RMState state) throws IOException { |
| int numApps = 0; |
| int numAppAttempts = 0; |
| try (LeveldbIterator iter = new LeveldbIterator(db)) { |
| iter.seek(bytes(RM_APP_KEY_PREFIX)); |
| while (iter.hasNext()) { |
| Entry<byte[],byte[]> entry = iter.next(); |
| String key = asString(entry.getKey()); |
| if (!key.startsWith(RM_APP_KEY_PREFIX)) { |
| break; |
| } |
| |
| String appIdStr = key.substring(RM_APP_ROOT.length() + 1); |
| if (appIdStr.contains(SEPARATOR)) { |
| LOG.warn("Skipping extraneous data " + key); |
| continue; |
| } |
| |
| numAppAttempts += loadRMApp(state, iter, appIdStr, entry.getValue()); |
| ++numApps; |
| } |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| LOG.info("Recovered " + numApps + " applications and " + numAppAttempts |
| + " application attempts"); |
| } |
| |
| private int loadRMApp(RMState rmState, LeveldbIterator iter, String appIdStr, |
| byte[] appData) throws IOException { |
| ApplicationStateData appState = createApplicationState(appIdStr, appData); |
| ApplicationId appId = |
| appState.getApplicationSubmissionContext().getApplicationId(); |
| rmState.appState.put(appId, appState); |
| String attemptNodePrefix = getApplicationNodeKey(appId) + SEPARATOR; |
| while (iter.hasNext()) { |
| Entry<byte[],byte[]> entry = iter.peekNext(); |
| String key = asString(entry.getKey()); |
| if (!key.startsWith(attemptNodePrefix)) { |
| break; |
| } |
| |
| String attemptId = key.substring(attemptNodePrefix.length()); |
| if (attemptId.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { |
| ApplicationAttemptStateData attemptState = |
| createAttemptState(attemptId, entry.getValue()); |
| appState.attempts.put(attemptState.getAttemptId(), attemptState); |
| } else { |
| LOG.warn("Ignoring unknown application key: " + key); |
| } |
| iter.next(); |
| } |
| int numAttempts = appState.attempts.size(); |
| LOG.debug("Loaded application {} with {} attempts", appId, numAttempts); |
| return numAttempts; |
| } |
| |
| private ApplicationStateData createApplicationState(String appIdStr, |
| byte[] data) throws IOException { |
| ApplicationId appId = ApplicationId.fromString(appIdStr); |
| ApplicationStateDataPBImpl appState = |
| new ApplicationStateDataPBImpl( |
| ApplicationStateDataProto.parseFrom(data)); |
| if (!appId.equals( |
| appState.getApplicationSubmissionContext().getApplicationId())) { |
| throw new YarnRuntimeException("The database entry for " + appId |
| + " contains data for " |
| + appState.getApplicationSubmissionContext().getApplicationId()); |
| } |
| return appState; |
| } |
| |
| @VisibleForTesting |
| ApplicationStateData loadRMAppState(ApplicationId appId) throws IOException { |
| String appKey = getApplicationNodeKey(appId); |
| byte[] data; |
| try { |
| data = db.get(bytes(appKey)); |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| if (data == null) { |
| return null; |
| } |
| return createApplicationState(appId.toString(), data); |
| } |
| |
| @VisibleForTesting |
| ApplicationAttemptStateData loadRMAppAttemptState( |
| ApplicationAttemptId attemptId) throws IOException { |
| String attemptKey = getApplicationAttemptNodeKey(attemptId); |
| byte[] data; |
| try { |
| data = db.get(bytes(attemptKey)); |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| if (data == null) { |
| return null; |
| } |
| return createAttemptState(attemptId.toString(), data); |
| } |
| |
| private ApplicationAttemptStateData createAttemptState(String itemName, |
| byte[] data) throws IOException { |
| ApplicationAttemptId attemptId = ApplicationAttemptId.fromString(itemName); |
| ApplicationAttemptStateDataPBImpl attemptState = |
| new ApplicationAttemptStateDataPBImpl( |
| ApplicationAttemptStateDataProto.parseFrom(data)); |
| if (!attemptId.equals(attemptState.getAttemptId())) { |
| throw new YarnRuntimeException("The database entry for " + attemptId |
| + " contains data for " + attemptState.getAttemptId()); |
| } |
| return attemptState; |
| } |
| |
| private void loadAMRMTokenSecretManagerState(RMState rmState) |
| throws IOException { |
| try { |
| byte[] data = db.get(bytes(AMRMTOKEN_SECRET_MANAGER_ROOT)); |
| if (data != null) { |
| AMRMTokenSecretManagerStatePBImpl stateData = |
| new AMRMTokenSecretManagerStatePBImpl( |
| AMRMTokenSecretManagerStateProto.parseFrom(data)); |
| rmState.amrmTokenSecretManagerState = |
| AMRMTokenSecretManagerState.newInstance( |
| stateData.getCurrentMasterKey(), |
| stateData.getNextMasterKey()); |
| } |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| private void loadProxyCAManagerState(RMState rmState) throws Exception { |
| byte[] caCertData; |
| byte[] caPrivateKeyData; |
| |
| String caCertKey = getProxyCACertNodeKey(); |
| String caPrivateKeyKey = getProxyCAPrivateKeyNodeKey(); |
| |
| try { |
| caCertData = db.get(bytes(caCertKey)); |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| |
| try { |
| caPrivateKeyData = db.get(bytes(caPrivateKeyKey)); |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| |
| if (caCertData == null || caPrivateKeyData == null) { |
| LOG.warn("Couldn't find Proxy CA data"); |
| return; |
| } |
| |
| rmState.proxyCAState.setCaCert(caCertData); |
| rmState.proxyCAState.setCaPrivateKey(caPrivateKeyData); |
| } |
| |
| @Override |
| protected void storeApplicationStateInternal(ApplicationId appId, |
| ApplicationStateData appStateData) throws IOException { |
| String key = getApplicationNodeKey(appId); |
| LOG.debug("Storing state for app {} at {}", appId, key); |
| try { |
| db.put(bytes(key), appStateData.getProto().toByteArray()); |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| @Override |
| protected void updateApplicationStateInternal(ApplicationId appId, |
| ApplicationStateData appStateData) throws IOException { |
| storeApplicationStateInternal(appId, appStateData); |
| } |
| |
| @Override |
| protected void storeApplicationAttemptStateInternal( |
| ApplicationAttemptId attemptId, |
| ApplicationAttemptStateData attemptStateData) throws IOException { |
| String key = getApplicationAttemptNodeKey(attemptId); |
| LOG.debug("Storing state for attempt {} at {}", attemptId, key); |
| try { |
| db.put(bytes(key), attemptStateData.getProto().toByteArray()); |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| @Override |
| protected void updateApplicationAttemptStateInternal( |
| ApplicationAttemptId attemptId, |
| ApplicationAttemptStateData attemptStateData) throws IOException { |
| storeApplicationAttemptStateInternal(attemptId, attemptStateData); |
| } |
| |
| @Override |
| public synchronized void removeApplicationAttemptInternal( |
| ApplicationAttemptId attemptId) |
| throws IOException { |
| String attemptKey = getApplicationAttemptNodeKey(attemptId); |
| LOG.debug("Removing state for attempt {} at {}", attemptId, attemptKey); |
| try { |
| db.delete(bytes(attemptKey)); |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| @Override |
| protected void removeApplicationStateInternal(ApplicationStateData appState) |
| throws IOException { |
| ApplicationId appId = |
| appState.getApplicationSubmissionContext().getApplicationId(); |
| String appKey = getApplicationNodeKey(appId); |
| try { |
| try (WriteBatch batch = db.createWriteBatch()) { |
| batch.delete(bytes(appKey)); |
| for (ApplicationAttemptId attemptId : appState.attempts.keySet()) { |
| String attemptKey = getApplicationAttemptNodeKey(appKey, attemptId); |
| batch.delete(bytes(attemptKey)); |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Removing state for app " + appId + " and " |
| + appState.attempts.size() + " attempts" + " at " + appKey); |
| } |
| db.write(batch); |
| } |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| @Override |
| protected void storeReservationState( |
| ReservationAllocationStateProto reservationAllocation, String planName, |
| String reservationIdName) throws Exception { |
| try { |
| try (WriteBatch batch = db.createWriteBatch()) { |
| String key = getReservationNodeKey(planName, reservationIdName); |
| LOG.debug("Storing state for reservation {} plan {} at {}", |
| reservationIdName, planName, key); |
| |
| batch.put(bytes(key), reservationAllocation.toByteArray()); |
| db.write(batch); |
| } |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| @Override |
| protected void removeReservationState(String planName, |
| String reservationIdName) throws Exception { |
| try { |
| try (WriteBatch batch = db.createWriteBatch()) { |
| String reservationKey = |
| getReservationNodeKey(planName, reservationIdName); |
| batch.delete(bytes(reservationKey)); |
| LOG.debug("Removing state for reservation {} plan {} at {}", |
| reservationIdName, planName, reservationKey); |
| db.write(batch); |
| } |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| private void storeOrUpdateRMDT(RMDelegationTokenIdentifier tokenId, |
| Long renewDate, boolean isUpdate) throws IOException { |
| String tokenKey = getRMDTTokenNodeKey(tokenId); |
| RMDelegationTokenIdentifierData tokenData = |
| new RMDelegationTokenIdentifierData(tokenId, renewDate); |
| LOG.debug("Storing token to {}", tokenKey); |
| try { |
| try (WriteBatch batch = db.createWriteBatch()) { |
| batch.put(bytes(tokenKey), tokenData.toByteArray()); |
| if (!isUpdate) { |
| ByteArrayOutputStream bs = new ByteArrayOutputStream(); |
| try (DataOutputStream ds = new DataOutputStream(bs)) { |
| ds.writeInt(tokenId.getSequenceNumber()); |
| } |
| LOG.debug("Storing {} to {}", tokenId.getSequenceNumber(), |
| RM_DT_SEQUENCE_NUMBER_KEY); |
| batch.put(bytes(RM_DT_SEQUENCE_NUMBER_KEY), bs.toByteArray()); |
| } |
| db.write(batch); |
| } |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| @Override |
| protected void storeRMDelegationTokenState( |
| RMDelegationTokenIdentifier tokenId, Long renewDate) |
| throws IOException { |
| storeOrUpdateRMDT(tokenId, renewDate, false); |
| } |
| |
| @Override |
| protected void updateRMDelegationTokenState( |
| RMDelegationTokenIdentifier tokenId, Long renewDate) |
| throws IOException { |
| storeOrUpdateRMDT(tokenId, renewDate, true); |
| } |
| |
| @Override |
| protected void removeRMDelegationTokenState( |
| RMDelegationTokenIdentifier tokenId) throws IOException { |
| String tokenKey = getRMDTTokenNodeKey(tokenId); |
| LOG.debug("Removing token at {}", tokenKey); |
| try { |
| db.delete(bytes(tokenKey)); |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| @Override |
| protected void storeRMDTMasterKeyState(DelegationKey masterKey) |
| throws IOException { |
| String dbKey = getRMDTMasterKeyNodeKey(masterKey); |
| LOG.debug("Storing token master key to {}", dbKey); |
| ByteArrayOutputStream os = new ByteArrayOutputStream(); |
| try (DataOutputStream out = new DataOutputStream(os)) { |
| masterKey.write(out); |
| } |
| try { |
| db.put(bytes(dbKey), os.toByteArray()); |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| @Override |
| protected void removeRMDTMasterKeyState(DelegationKey masterKey) |
| throws IOException { |
| String dbKey = getRMDTMasterKeyNodeKey(masterKey); |
| LOG.debug("Removing token master key at {}", dbKey); |
| try { |
| db.delete(bytes(dbKey)); |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| @Override |
| public void storeOrUpdateAMRMTokenSecretManagerState( |
| AMRMTokenSecretManagerState state, boolean isUpdate) { |
| AMRMTokenSecretManagerState data = |
| AMRMTokenSecretManagerState.newInstance(state); |
| byte[] stateData = data.getProto().toByteArray(); |
| db.put(bytes(AMRMTOKEN_SECRET_MANAGER_ROOT), stateData); |
| } |
| |
| @Override |
| protected void storeProxyCACertState( |
| X509Certificate caCert, PrivateKey caPrivateKey) throws Exception { |
| byte[] caCertData = caCert.getEncoded(); |
| byte[] caPrivateKeyData = caPrivateKey.getEncoded(); |
| |
| String caCertKey = getProxyCACertNodeKey(); |
| String caPrivateKeyKey = getProxyCAPrivateKeyNodeKey(); |
| |
| try { |
| try (WriteBatch batch = db.createWriteBatch()) { |
| batch.put(bytes(caCertKey), caCertData); |
| batch.put(bytes(caPrivateKeyKey), caPrivateKeyData); |
| db.write(batch); |
| } |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| @Override |
| public void deleteStore() throws IOException { |
| Path root = getStorageDir(); |
| LOG.info("Deleting state database at " + root); |
| db.close(); |
| db = null; |
| FileSystem fs = FileSystem.getLocal(getConfig()); |
| fs.delete(root, true); |
| } |
| |
| @Override |
| public synchronized void removeApplication(ApplicationId removeAppId) |
| throws IOException { |
| String appKey = getApplicationNodeKey(removeAppId); |
| LOG.info("Removing state for app " + removeAppId); |
| try { |
| db.delete(bytes(appKey)); |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| @VisibleForTesting |
| int getNumEntriesInDatabase() throws IOException { |
| int numEntries = 0; |
| try (LeveldbIterator iter = new LeveldbIterator(db)) { |
| iter.seekToFirst(); |
| while (iter.hasNext()) { |
| Entry<byte[], byte[]> entry = iter.next(); |
| LOG.info("entry: " + asString(entry.getKey())); |
| ++numEntries; |
| } |
| } catch (DBException e) { |
| throw new IOException(e); |
| } |
| return numEntries; |
| } |
| |
| @VisibleForTesting |
| protected void setDbManager(DBManager dbManager) { |
| this.dbManager = dbManager; |
| } |
| } |