blob: c6441caf82119e67ca7f9cde60b16574d329d9d3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordName;
import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * {@link StateStoreDriver} driver implementation that uses ZooKeeper as a
 * backend.
 * <p>
 * The structure of the znodes in the ensemble is:
 * PARENT_PATH
 * |--- MOUNT
 * |--- MEMBERSHIP
 * |--- REBALANCER
 * |--- ROUTERS
 * <p>
 * PARENT_PATH is read from {@link #FEDERATION_STORE_ZK_PARENT_PATH} (default
 * {@link #FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT}). Each record type gets one
 * child znode whose name comes from {@code getRecordName(clazz)}; every record
 * of that type is stored as a child of it, named by the record's primary key,
 * with the serialized record as the znode data.
 */
public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {

  private static final Logger LOG =
      LoggerFactory.getLogger(StateStoreZooKeeperImpl.class);

  /** Configuration keys. */
  /** Prefix shared by all ZooKeeper driver configuration keys. */
  public static final String FEDERATION_STORE_ZK_DRIVER_PREFIX =
      RBFConfigKeys.FEDERATION_STORE_PREFIX + "driver.zk.";
  /** Key for the znode under which all State Store data is kept. */
  public static final String FEDERATION_STORE_ZK_PARENT_PATH =
      FEDERATION_STORE_ZK_DRIVER_PREFIX + "parent-path";
  /** Default value for {@link #FEDERATION_STORE_ZK_PARENT_PATH}. */
  public static final String FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT =
      "/hdfs-federation";

  /** Directory to store the state store data. */
  private String baseZNode;

  /** Interface to ZooKeeper. */
  private ZKCuratorManager zkManager;
  /** ACLs for ZooKeeper. */
  private List<ACL> zkAcl;

  /**
   * Reads the parent path from the configuration, creates and starts the
   * ZooKeeper client, and loads the ZooKeeper ACLs.
   *
   * @return true on success; false if an {@link IOException} was thrown while
   *         creating/starting the client or reading the ACLs.
   */
  @Override
  public boolean initDriver() {
    LOG.info("Initializing ZooKeeper connection");
    Configuration conf = getConf();
    baseZNode = conf.get(
        FEDERATION_STORE_ZK_PARENT_PATH,
        FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT);
    try {
      this.zkManager = new ZKCuratorManager(conf);
      this.zkManager.start();
      this.zkAcl = ZKCuratorManager.getZKAcls(conf);
    } catch (IOException e) {
      LOG.error("Cannot initialize the ZK connection", e);
      return false;
    }
    return true;
  }

  /**
   * Ensures the znode for a record type exists under the parent path,
   * creating it (and any missing ancestors) with the configured ACLs.
   *
   * @param className Name of the record type; used as the child znode name.
   * @param clazz Record class (not used by this implementation).
   * @return true if the znode could be created or already existed; false if
   *         any exception was thrown.
   */
  @Override
  public <T extends BaseRecord> boolean initRecordStorage(
      String className, Class<T> clazz) {
    try {
      String checkPath = getNodePath(baseZNode, className);
      zkManager.createRootDirRecursively(checkPath, zkAcl);
      return true;
    } catch (Exception e) {
      // The trailing exception argument is not consumed by a placeholder, so
      // SLF4J logs its stack trace in addition to the formatted message.
      LOG.error("Cannot initialize ZK node for {}: {}",
          className, e.getMessage(), e);
      return false;
    }
  }

  /**
   * Closes the ZooKeeper client if one was created.
   *
   * @throws Exception If closing the client fails.
   */
  @Override
  public void close() throws Exception {
    if (zkManager != null) {
      zkManager.close();
    }
  }

  /**
   * Checks whether the underlying Curator client exists and is in the
   * {@link CuratorFrameworkState#STARTED} state.
   *
   * @return true if the driver can serve requests.
   */
  @Override
  public boolean isDriverReady() {
    if (zkManager == null) {
      return false;
    }
    CuratorFramework curator = zkManager.getCurator();
    if (curator == null) {
      return false;
    }
    return curator.getState() == CuratorFrameworkState.STARTED;
  }

  /**
   * Reads every record of the given type from ZooKeeper.
   * <p>
   * Children whose data is null/empty or cannot be turned into a record are
   * treated as corrupted and deleted. A failure on an individual child is
   * logged and skipped; only failing to list the children fails the call.
   *
   * @param clazz Record class to read.
   * @return All readable records of the type.
   * @throws IOException If the driver is not ready or the children of the
   *           record-type znode cannot be listed.
   */
  @Override
  public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
      throws IOException {
    verifyDriverReady();
    long start = monotonicNow();
    List<T> ret = new ArrayList<>();
    String znode = getZNodeForClass(clazz);
    try {
      List<String> children = zkManager.getChildren(znode);
      for (String child : children) {
        try {
          String path = getNodePath(znode, child);
          Stat stat = new Stat();
          String data = zkManager.getStringData(path, stat);
          boolean corrupted = false;
          if (data == null || data.isEmpty()) {
            // All records should have data, otherwise this is corrupted
            corrupted = true;
          } else {
            try {
              T record = createRecord(data, stat, clazz);
              ret.add(record);
            } catch (IOException e) {
              LOG.error("Cannot create record type \"{}\" from \"{}\": {}",
                  clazz.getSimpleName(), data, e.getMessage());
              corrupted = true;
            }
          }
          if (corrupted) {
            // NOTE(review): the znode is deleted here, so any record that
            // fails to deserialize is permanently removed from the store.
            LOG.error("Cannot get data for {} at {}, cleaning corrupted data",
                child, path);
            zkManager.delete(path);
          }
        } catch (Exception e) {
          // Best effort per child: keep reading the remaining records.
          LOG.error("Cannot get data for {}: {}", child, e.getMessage());
        }
      }
    } catch (Exception e) {
      getMetrics().addFailure(monotonicNow() - start);
      String msg = "Cannot get children for \"" + znode + "\": " +
          e.getMessage();
      LOG.error(msg);
      // Chain the cause so callers retain the original ZooKeeper stack trace.
      throw new IOException(msg, e);
    }
    long end = monotonicNow();
    getMetrics().addRead(end - start);
    return new QueryResult<>(ret, getTime());
  }

  /**
   * Writes a batch of records, one znode per record.
   * <p>
   * The znode parent is derived from the class of the first record, so all
   * records are expected to be of the same type. Every record is attempted
   * even if an earlier one fails.
   *
   * @param records Records to write; an empty list is a no-op.
   * @param update Whether an existing record may be overwritten.
   * @param error Whether an existing record should be reported as a failure
   *          when {@code update} is false.
   * @return true if every record was written.
   * @throws IOException If the driver is not ready or serialization fails.
   */
  @Override
  public <T extends BaseRecord> boolean putAll(
      List<T> records, boolean update, boolean error) throws IOException {
    verifyDriverReady();
    if (records.isEmpty()) {
      return true;
    }

    // All records are assumed to be of the same class as the first one
    T record0 = records.get(0);
    Class<? extends BaseRecord> recordClass = record0.getClass();
    String znode = getZNodeForClass(recordClass);

    long start = monotonicNow();
    boolean status = true;
    for (T record : records) {
      String primaryKey = getPrimaryKey(record);
      String recordZNode = getNodePath(znode, primaryKey);
      byte[] data = serialize(record);
      if (!writeNode(recordZNode, data, update, error)) {
        status = false;
      }
    }
    long end = monotonicNow();
    if (status) {
      getMetrics().addWrite(end - start);
    } else {
      getMetrics().addFailure(end - start);
    }
    return status;
  }

  /**
   * Deletes the records of a type that match a query.
   *
   * @param clazz Record class to remove from.
   * @param query Query selecting the records; null removes nothing.
   * @return Number of znodes actually deleted; 0 if the existing records
   *         could not be read.
   * @throws IOException If the driver is not ready.
   */
  @Override
  public <T extends BaseRecord> int remove(
      Class<T> clazz, Query<T> query) throws IOException {
    verifyDriverReady();
    if (query == null) {
      return 0;
    }

    // Read the current data
    long start = monotonicNow();
    List<T> records = null;
    try {
      QueryResult<T> result = get(clazz);
      records = result.getRecords();
    } catch (IOException ex) {
      LOG.error("Cannot get existing records", ex);
      getMetrics().addFailure(monotonicNow() - start);
      return 0;
    }

    // Check the records to remove
    String znode = getZNodeForClass(clazz);
    List<T> recordsToRemove = filterMultiple(query, records);

    // Remove the records
    int removed = 0;
    for (T existingRecord : recordsToRemove) {
      LOG.info("Removing \"{}\"", existingRecord);
      try {
        String primaryKey = getPrimaryKey(existingRecord);
        String path = getNodePath(znode, primaryKey);
        if (zkManager.delete(path)) {
          removed++;
        } else {
          LOG.error("Did not remove \"{}\"", existingRecord);
        }
      } catch (Exception e) {
        // NOTE(review): a failure metric is recorded once per failing record,
        // and a remove metric may still be recorded below in the same call.
        LOG.error("Cannot remove \"{}\"", existingRecord, e);
        getMetrics().addFailure(monotonicNow() - start);
      }
    }
    long end = monotonicNow();
    if (removed > 0) {
      getMetrics().addRemove(end - start);
    }
    return removed;
  }

  /**
   * Deletes every child znode of a record type. The record-type znode itself
   * is kept.
   * <p>
   * NOTE(review): unlike get/putAll/remove, this does not call
   * verifyDriverReady(); any failure (including an uninitialized client) is
   * caught and reported through the return value.
   *
   * @param clazz Record class whose records are removed.
   * @return true if all children were deleted without an exception.
   * @throws IOException Declared by the interface; not thrown here.
   */
  @Override
  public <T extends BaseRecord> boolean removeAll(Class<T> clazz)
      throws IOException {
    long start = monotonicNow();
    boolean status = true;
    String znode = getZNodeForClass(clazz);
    LOG.info("Deleting all children under {}", znode);
    try {
      List<String> children = zkManager.getChildren(znode);
      for (String child : children) {
        String path = getNodePath(znode, child);
        LOG.info("Deleting {}", path);
        zkManager.delete(path);
      }
    } catch (Exception e) {
      // Trailing exception argument makes SLF4J log the stack trace too.
      LOG.error("Cannot remove {}: {}", znode, e.getMessage(), e);
      status = false;
    }
    long time = monotonicNow() - start;
    if (status) {
      getMetrics().addRemove(time);
    } else {
      getMetrics().addFailure(time);
    }
    return status;
  }

  /**
   * Creates the znode if needed and writes the serialized record to it.
   * <p>
   * Returns false without writing only when the znode already existed (as
   * reported by {@code zkManager.create}), {@code update} is false and
   * {@code error} is true.
   * NOTE(review): when the znode exists and both {@code update} and
   * {@code error} are false, the data is still overwritten — confirm this is
   * the intended contract.
   *
   * @param znode Full path of the record znode.
   * @param bytes Serialized record.
   * @param update Whether an existing record may be overwritten.
   * @param error Whether an existing record is a failure when not updating.
   * @return true if the data was written.
   */
  private boolean writeNode(
      String znode, byte[] bytes, boolean update, boolean error) {
    try {
      // Presumably false when the znode already exists — TODO confirm
      // against ZKCuratorManager#create.
      boolean created = zkManager.create(znode);
      if (!update && !created && error) {
        LOG.info("Cannot write record \"{}\", it already exists", znode);
        return false;
      }

      // Write data; version -1 means "any version" in ZooKeeper's setData
      // (assumes ZKCuratorManager forwards it unchanged).
      zkManager.setData(znode, bytes, -1);
      return true;
    } catch (Exception e) {
      LOG.error("Cannot write record \"{}\": {}", znode, e.getMessage(), e);
    }
    return false;
  }

  /**
   * Get the ZNode for a class.
   *
   * @param clazz Record class to evaluate.
   * @return The ZNode for the class: the parent path joined with the record
   *         name of the class.
   */
  private <T extends BaseRecord> String getZNodeForClass(Class<T> clazz) {
    String className = getRecordName(clazz);
    return getNodePath(baseZNode, className);
  }

  /**
   * Creates a record from a string returned by ZooKeeper and stamps it with
   * the znode's creation and modification times.
   *
   * @param data The serialized record as read from the znode.
   * @param stat Stat of the znode the data was read from.
   * @param clazz The data record type to create.
   * @return The created record.
   * @throws IOException If {@code newRecord} cannot build a record from the
   *           data.
   */
  private <T extends BaseRecord> T createRecord(
      String data, Stat stat, Class<T> clazz) throws IOException {
    T record = newRecord(data, clazz, false);
    // Dates come from ZooKeeper's znode metadata rather than the payload.
    record.setDateCreated(stat.getCtime());
    record.setDateModified(stat.getMtime());
    return record;
  }
}