| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.accumulo.server; |
| |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; |
| |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.net.UnknownHostException; |
| import java.util.Arrays; |
| import java.util.Map.Entry; |
| import java.util.TreeMap; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.accumulo.core.Constants; |
| import org.apache.accumulo.core.client.AccumuloException; |
| import org.apache.accumulo.core.conf.AccumuloConfiguration; |
| import org.apache.accumulo.core.conf.Property; |
| import org.apache.accumulo.core.util.AddressUtil; |
| import org.apache.accumulo.core.util.threads.ThreadPools; |
| import org.apache.accumulo.core.volume.Volume; |
| import org.apache.accumulo.fate.ReadOnlyStore; |
| import org.apache.accumulo.fate.ReadOnlyTStore; |
| import org.apache.accumulo.fate.ZooStore; |
| import org.apache.accumulo.server.fs.VolumeManager; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.zookeeper.KeeperException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class ServerUtil { |
| |
| private static final Logger log = LoggerFactory.getLogger(ServerUtil.class); |
| |
| public static synchronized void updateAccumuloVersion(VolumeManager fs, int oldVersion) { |
| for (Volume volume : fs.getVolumes()) { |
| try { |
| if (getAccumuloPersistentVersion(volume) == oldVersion) { |
| log.debug("Attempting to upgrade {}", volume); |
| Path dataVersionLocation = ServerConstants.getDataVersionLocation(volume); |
| fs.create(new Path(dataVersionLocation, Integer.toString(ServerConstants.DATA_VERSION))) |
| .close(); |
| // TODO document failure mode & recovery if FS permissions cause above to work and below |
| // to fail ACCUMULO-2596 |
| Path prevDataVersionLoc = new Path(dataVersionLocation, Integer.toString(oldVersion)); |
| if (!fs.delete(prevDataVersionLoc)) { |
| throw new RuntimeException("Could not delete previous data version location (" |
| + prevDataVersionLoc + ") for " + volume); |
| } |
| } |
| } catch (IOException e) { |
| throw new RuntimeException("Unable to set accumulo version: an error occurred.", e); |
| } |
| } |
| } |
| |
| public static synchronized int getAccumuloPersistentVersion(FileSystem fs, Path path) { |
| int dataVersion; |
| try { |
| FileStatus[] files = fs.listStatus(path); |
| if (files == null || files.length == 0) { |
| dataVersion = -1; // assume it is 0.5 or earlier |
| } else { |
| dataVersion = Integer.parseInt(files[0].getPath().getName()); |
| } |
| return dataVersion; |
| } catch (IOException e) { |
| throw new RuntimeException("Unable to read accumulo version: an error occurred.", e); |
| } |
| } |
| |
| public static synchronized int getAccumuloPersistentVersion(Volume v) { |
| Path path = ServerConstants.getDataVersionLocation(v); |
| return getAccumuloPersistentVersion(v.getFileSystem(), path); |
| } |
| |
| public static synchronized int getAccumuloPersistentVersion(VolumeManager vm) { |
| // It doesn't matter which Volume is used as they should all have the data version stored |
| return getAccumuloPersistentVersion(vm.getFirst()); |
| } |
| |
| public static synchronized Path getAccumuloInstanceIdPath(VolumeManager vm) { |
| // It doesn't matter which Volume is used as they should all have the instance ID stored |
| return ServerConstants.getInstanceIdLocation(vm.getFirst()); |
| } |
| |
| public static void init(ServerContext context, String application) { |
| final AccumuloConfiguration conf = context.getConfiguration(); |
| |
| log.info("{} starting", application); |
| log.info("Instance {}", context.getInstanceID()); |
| int dataVersion = ServerUtil.getAccumuloPersistentVersion(context.getVolumeManager()); |
| log.info("Data Version {}", dataVersion); |
| ServerUtil.waitForZookeeperAndHdfs(context); |
| |
| ensureDataVersionCompatible(dataVersion); |
| |
| TreeMap<String,String> sortedProps = new TreeMap<>(); |
| for (Entry<String,String> entry : conf) |
| sortedProps.put(entry.getKey(), entry.getValue()); |
| |
| for (Entry<String,String> entry : sortedProps.entrySet()) { |
| String key = entry.getKey(); |
| log.info("{} = {}", key, (Property.isSensitive(key) ? "<hidden>" : entry.getValue())); |
| Property prop = Property.getPropertyByKey(key); |
| if (prop != null && conf.isPropertySet(prop, false)) { |
| if (prop.isDeprecated()) { |
| Property replacedBy = prop.replacedBy(); |
| if (replacedBy != null) { |
| log.warn("{} is deprecated, use {} instead.", prop.getKey(), replacedBy.getKey()); |
| } else { |
| log.warn("{} is deprecated", prop.getKey()); |
| } |
| } |
| } |
| } |
| |
| monitorSwappiness(conf); |
| |
| // Encourage users to configure TLS |
| final String SSL = "SSL"; |
| for (Property sslProtocolProperty : Arrays.asList(Property.RPC_SSL_CLIENT_PROTOCOL, |
| Property.RPC_SSL_ENABLED_PROTOCOLS, Property.MONITOR_SSL_INCLUDE_PROTOCOLS)) { |
| String value = conf.get(sslProtocolProperty); |
| if (value.contains(SSL)) { |
| log.warn("It is recommended that {} only allow TLS", sslProtocolProperty); |
| } |
| } |
| } |
| |
| /** |
| * Check to see if this version of Accumulo can run against or upgrade the passed in data version. |
| */ |
| public static void ensureDataVersionCompatible(int dataVersion) { |
| if (!(ServerConstants.CAN_RUN.contains(dataVersion))) { |
| throw new IllegalStateException("This version of accumulo (" + Constants.VERSION |
| + ") is not compatible with files stored using data version " + dataVersion); |
| } |
| } |
| |
| /** |
| * Does the data version number stored in the backing Volumes indicate we need to upgrade |
| * something? |
| */ |
| public static boolean persistentVersionNeedsUpgrade(final int accumuloPersistentVersion) { |
| return ServerConstants.NEEDS_UPGRADE.contains(accumuloPersistentVersion); |
| } |
| |
| public static void monitorSwappiness(AccumuloConfiguration config) { |
| ThreadPools.createGeneralScheduledExecutorService(config).scheduleWithFixedDelay(() -> { |
| try { |
| String procFile = "/proc/sys/vm/swappiness"; |
| File swappiness = new File(procFile); |
| if (swappiness.exists() && swappiness.canRead()) { |
| try (InputStream is = new FileInputStream(procFile)) { |
| byte[] buffer = new byte[10]; |
| int bytes = is.read(buffer); |
| String setting = new String(buffer, 0, bytes, UTF_8); |
| setting = setting.trim(); |
| if (bytes > 0 && Integer.parseInt(setting) > 10) { |
| log.warn("System swappiness setting is greater than ten ({})" |
| + " which can cause time-sensitive operations to be delayed." |
| + " Accumulo is time sensitive because it needs to maintain" |
| + " distributed lock agreement.", setting); |
| } |
| } |
| } |
| } catch (Exception t) { |
| log.error("", t); |
| } |
| }, 1000, 10 * 60 * 1000, TimeUnit.MILLISECONDS); |
| } |
| |
| public static void waitForZookeeperAndHdfs(ServerContext context) { |
| log.info("Attempting to talk to zookeeper"); |
| while (true) { |
| try { |
| context.getZooReaderWriter().getChildren(Constants.ZROOT); |
| break; |
| } catch (InterruptedException e) { |
| // ignored |
| } catch (KeeperException ex) { |
| log.info("Waiting for accumulo to be initialized"); |
| sleepUninterruptibly(1, TimeUnit.SECONDS); |
| } |
| } |
| log.info("ZooKeeper connected and initialized, attempting to talk to HDFS"); |
| long sleep = 1000; |
| int unknownHostTries = 3; |
| while (true) { |
| try { |
| if (context.getVolumeManager().isReady()) |
| break; |
| log.warn("Waiting for the NameNode to leave safemode"); |
| } catch (IOException ex) { |
| log.warn("Unable to connect to HDFS", ex); |
| } catch (IllegalArgumentException e) { |
| /* Unwrap the UnknownHostException so we can deal with it directly */ |
| if (e.getCause() instanceof UnknownHostException) { |
| if (unknownHostTries > 0) { |
| log.warn("Unable to connect to HDFS, will retry. cause: {}", e.getCause()); |
| /* |
| * We need to make sure our sleep period is long enough to avoid getting a cached |
| * failure of the host lookup. |
| */ |
| int ttl = AddressUtil.getAddressCacheNegativeTtl((UnknownHostException) e.getCause()); |
| sleep = Math.max(sleep, (ttl + 1) * 1000L); |
| } else { |
| log.error("Unable to connect to HDFS and exceeded the maximum number of retries.", e); |
| throw e; |
| } |
| unknownHostTries--; |
| } else { |
| throw e; |
| } |
| } |
| log.info("Backing off due to failure; current sleep period is {} seconds", sleep / 1000.); |
| sleepUninterruptibly(sleep, TimeUnit.MILLISECONDS); |
| /* Back off to give transient failures more time to clear. */ |
| sleep = Math.min(60 * 1000, sleep * 2); |
| } |
| log.info("Connected to HDFS"); |
| } |
| |
| /** |
| * Exit loudly if there are outstanding Fate operations. Since Fate serializes class names, we |
| * need to make sure there are no queued transactions from a previous version before continuing an |
| * upgrade. The status of the operations is irrelevant; those in SUCCESSFUL status cause the same |
| * problem as those just queued. |
| * |
| * Note that the Manager should not allow write access to Fate until after all upgrade steps are |
| * complete. |
| * |
| * Should be called as a guard before performing any upgrade steps, after determining that an |
| * upgrade is needed. |
| * |
| * see ACCUMULO-2519 |
| */ |
| public static void abortIfFateTransactions(ServerContext context) { |
| try { |
| final ReadOnlyTStore<ServerUtil> fate = |
| new ReadOnlyStore<>(new ZooStore<>(context.getZooKeeperRoot() + Constants.ZFATE, |
| context.getZooReaderWriter())); |
| if (!(fate.list().isEmpty())) { |
| throw new AccumuloException("Aborting upgrade because there are" |
| + " outstanding FATE transactions from a previous Accumulo version." |
| + " You can start the tservers and then use the shell to delete completed " |
| + " transactions. If there are uncomplete transactions, you will need to roll" |
| + " back and fix those issues. Please see the following page for more information: " |
| + " https://accumulo.apache.org/docs/2.x/troubleshooting/advanced#upgrade-issues"); |
| } |
| } catch (Exception exception) { |
| log.error("Problem verifying Fate readiness", exception); |
| System.exit(1); |
| } |
| } |
| } |