| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.fluo.core.impl; |
| |
| import java.net.InetAddress; |
| import java.net.UnknownHostException; |
| import java.nio.charset.StandardCharsets; |
| import java.util.Map.Entry; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import org.apache.accumulo.core.client.AccumuloClient; |
| import org.apache.accumulo.core.client.AccumuloException; |
| import org.apache.accumulo.core.client.TableNotFoundException; |
| import org.apache.accumulo.core.security.Authorizations; |
| import org.apache.curator.framework.CuratorFramework; |
| import org.apache.fluo.accumulo.util.AccumuloProps; |
| import org.apache.fluo.accumulo.util.ZookeeperPath; |
| import org.apache.fluo.api.config.FluoConfiguration; |
| import org.apache.fluo.api.config.SimpleConfiguration; |
| import org.apache.fluo.api.metrics.MetricsReporter; |
| import org.apache.fluo.core.client.FluoAdminImpl; |
| import org.apache.fluo.core.metrics.MetricNames; |
| import org.apache.fluo.core.metrics.MetricsReporterImpl; |
| import org.apache.fluo.core.observer.ObserverUtil; |
| import org.apache.fluo.core.observer.RegisteredObservers; |
| import org.apache.fluo.core.util.AccumuloUtil; |
| import org.apache.fluo.core.util.CuratorUtil; |
| |
| /** |
| * Holds common environment configuration and shared resources |
| */ |
| public class Environment implements AutoCloseable { |
| |
| private String table; |
| private Authorizations auths = new Authorizations(); |
| private String accumuloInstance; |
| private RegisteredObservers observers; |
| private AccumuloClient client; |
| private String accumuloInstanceID; |
| private String fluoApplicationID; |
| private FluoConfiguration config; |
| private SharedResources resources; |
| private MetricNames metricNames; |
| private SimpleConfiguration appConfig; |
| private String metricsReporterID; |
| |
| private void ensureDeletesAreDisabled() { |
| String value = null; |
| Iterable<Entry<String, String>> props; |
| try { |
| props = client.tableOperations().getProperties(table); |
| } catch (AccumuloException | TableNotFoundException e) { |
| throw new IllegalStateException(e); |
| } |
| |
| for (Entry<String, String> entry : props) { |
| if (entry.getKey().equals(AccumuloProps.TABLE_DELETE_BEHAVIOR)) { |
| value = entry.getValue(); |
| } |
| } |
| |
| Preconditions.checkState(AccumuloProps.TABLE_DELETE_BEHAVIOR_VALUE.equals(value), |
| "The Accumulo table %s is not configured correctly. Please set %s=%s for this table in Accumulo.", |
| table, AccumuloProps.TABLE_DELETE_BEHAVIOR, AccumuloProps.TABLE_DELETE_BEHAVIOR_VALUE); |
| } |
| |
| /** |
| * Constructs an environment from given FluoConfiguration |
| * |
| * @param configuration Configuration used to configure environment |
| */ |
| public Environment(FluoConfiguration configuration) { |
| config = configuration; |
| client = AccumuloUtil.getClient(config); |
| |
| readZookeeperConfig(); |
| |
| ensureDeletesAreDisabled(); |
| |
| String instanceName = client.properties().getProperty(AccumuloProps.CLIENT_INSTANCE_NAME); |
| if (!instanceName.equals(accumuloInstance)) { |
| throw new IllegalArgumentException( |
| "unexpected accumulo instance name " + instanceName + " != " + accumuloInstance); |
| } |
| |
| if (!client.getInstanceID().equals(accumuloInstanceID)) { |
| throw new IllegalArgumentException("unexpected accumulo instance id " + client.getInstanceID() |
| + " != " + accumuloInstanceID); |
| } |
| |
| try { |
| resources = new SharedResources(this); |
| } catch (TableNotFoundException e1) { |
| throw new IllegalStateException(e1); |
| } |
| } |
| |
| /** |
| * Constructs an environment from another environment |
| * |
| * @param env Environment |
| */ |
| @VisibleForTesting |
| public Environment(Environment env) throws Exception { |
| this.table = env.table; |
| this.auths = env.auths; |
| this.accumuloInstance = env.accumuloInstance; |
| this.observers = env.observers; |
| this.client = env.client; |
| this.accumuloInstanceID = env.accumuloInstanceID; |
| this.fluoApplicationID = env.fluoApplicationID; |
| this.config = env.config; |
| this.resources = new SharedResources(this); |
| } |
| |
| /** |
| * Read configuration from zookeeper |
| */ |
| private void readZookeeperConfig() { |
| |
| try (CuratorFramework curator = CuratorUtil.newAppCurator(config)) { |
| curator.start(); |
| |
| accumuloInstance = |
| new String(curator.getData().forPath(ZookeeperPath.CONFIG_ACCUMULO_INSTANCE_NAME), |
| StandardCharsets.UTF_8); |
| accumuloInstanceID = |
| new String(curator.getData().forPath(ZookeeperPath.CONFIG_ACCUMULO_INSTANCE_ID), |
| StandardCharsets.UTF_8); |
| fluoApplicationID = |
| new String(curator.getData().forPath(ZookeeperPath.CONFIG_FLUO_APPLICATION_ID), |
| StandardCharsets.UTF_8); |
| |
| table = new String(curator.getData().forPath(ZookeeperPath.CONFIG_ACCUMULO_TABLE), |
| StandardCharsets.UTF_8); |
| |
| observers = ObserverUtil.load(curator); |
| |
| config = FluoAdminImpl.mergeZookeeperConfig(config); |
| |
| // make sure not to include config passed to env, only want config from zookeeper |
| appConfig = config.getAppConfiguration(); |
| } catch (Exception e) { |
| throw new IllegalStateException(e); |
| } |
| } |
| |
| |
| public void setAuthorizations(Authorizations auths) { |
| this.auths = auths; |
| |
| // TODO the following is a big hack, this method is currently not exposed in API |
| resources.close(); |
| try { |
| this.resources = new SharedResources(this); |
| } catch (TableNotFoundException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public Authorizations getAuthorizations() { |
| return auths; |
| } |
| |
| public String getAccumuloInstance() { |
| return accumuloInstance; |
| } |
| |
| public String getAccumuloInstanceID() { |
| return accumuloInstanceID; |
| } |
| |
| public String getFluoApplicationID() { |
| return fluoApplicationID; |
| } |
| |
| public RegisteredObservers getConfiguredObservers() { |
| return observers; |
| } |
| |
| public String getTable() { |
| return table; |
| } |
| |
| public AccumuloClient getAccumuloClient() { |
| return client; |
| } |
| |
| public SharedResources getSharedResources() { |
| return resources; |
| } |
| |
| public FluoConfiguration getConfiguration() { |
| return config; |
| } |
| |
| public synchronized String getMetricsReporterID() { |
| if (metricsReporterID == null) { |
| String mid = System.getProperty(MetricNames.METRICS_REPORTER_ID_PROP); |
| if (mid == null) { |
| try { |
| String hostname = InetAddress.getLocalHost().getHostName(); |
| int idx = hostname.indexOf('.'); |
| if (idx > 0) { |
| hostname = hostname.substring(0, idx); |
| } |
| mid = hostname + "_" + getSharedResources().getTransactorID(); |
| } catch (UnknownHostException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| metricsReporterID = mid.replace('.', '_'); |
| } |
| return metricsReporterID; |
| } |
| |
| public String getMetricsAppName() { |
| return config.getApplicationName().replace('.', '_'); |
| } |
| |
| public synchronized MetricNames getMetricNames() { |
| if (metricNames == null) { |
| metricNames = new MetricNames(getMetricsReporterID(), getMetricsAppName()); |
| } |
| return metricNames; |
| } |
| |
| public MetricsReporter getMetricsReporter() { |
| return new MetricsReporterImpl(getConfiguration(), getSharedResources().getMetricRegistry(), |
| getMetricsReporterID()); |
| } |
| |
| public SimpleConfiguration getAppConfiguration() { |
| // TODO create immutable wrapper |
| return new SimpleConfiguration(appConfig); |
| } |
| |
| @Override |
| public void close() { |
| resources.close(); |
| client.close(); |
| } |
| } |