| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.ozone.client; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.DataInputStream; |
| import java.io.IOException; |
| import java.lang.reflect.Proxy; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hdds.conf.ConfigurationSource; |
| import org.apache.hadoop.hdds.conf.OzoneConfiguration; |
| import org.apache.hadoop.ozone.OmUtils; |
| import org.apache.hadoop.ozone.OzoneConsts; |
| import org.apache.hadoop.ozone.client.protocol.ClientProtocol; |
| import org.apache.hadoop.ozone.client.rpc.RpcClient; |
| import org.apache.hadoop.ozone.security.OzoneTokenIdentifier; |
| import org.apache.hadoop.security.token.Token; |
| |
| import com.google.common.base.Preconditions; |
| import org.apache.commons.lang3.StringUtils; |
| import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; |
| import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Factory class to create OzoneClients. |
| */ |
| public final class OzoneClientFactory { |
| |
| private static final Logger LOG = LoggerFactory.getLogger( |
| OzoneClientFactory.class); |
| |
| /** |
| * Private constructor, class is not meant to be initialized. |
| */ |
| private OzoneClientFactory(){} |
| |
| |
| /** |
| * Constructs and return an OzoneClient with default configuration. |
| * |
| * @return OzoneClient |
| * |
| * @throws IOException |
| */ |
| public static OzoneClient getRpcClient() throws IOException { |
| LOG.info("Creating OzoneClient with default configuration."); |
| return getRpcClient(new OzoneConfiguration()); |
| } |
| |
| /** |
| * Returns an OzoneClient which will use RPC protocol. |
| * |
| * @param omHost |
| * hostname of OzoneManager to connect. |
| * |
| * @param omRpcPort |
| * RPC port of OzoneManager. |
| * |
| * @param config |
| * Configuration to be used for OzoneClient creation |
| * |
| * @return OzoneClient |
| * |
| * @throws IOException |
| */ |
| public static OzoneClient getRpcClient(String omHost, Integer omRpcPort, |
| ConfigurationSource config) |
| throws IOException { |
| Preconditions.checkNotNull(omHost); |
| Preconditions.checkNotNull(omRpcPort); |
| Preconditions.checkNotNull(config); |
| config.set(OZONE_OM_ADDRESS_KEY, omHost + ":" + omRpcPort); |
| return getRpcClient(getClientProtocol(config), config); |
| } |
| |
| /** |
| * Returns an OzoneClient which will use RPC protocol. |
| * |
| * @param omServiceId |
| * Service ID of OzoneManager HA cluster. |
| * |
| * @param config |
| * Configuration to be used for OzoneClient creation |
| * |
| * @return OzoneClient |
| * |
| * @throws IOException |
| */ |
| public static OzoneClient getRpcClient(String omServiceId, |
| ConfigurationSource config) throws IOException { |
| Preconditions.checkNotNull(omServiceId); |
| Preconditions.checkNotNull(config); |
| if (OmUtils.isOmHAServiceId(config, omServiceId)) { |
| return getRpcClient(getClientProtocol(config, omServiceId), config); |
| } else { |
| throw new IOException("Service ID specified " + |
| "does not match with " + OZONE_OM_SERVICE_IDS_KEY + " defined in " + |
| "the configuration. Configured " + OZONE_OM_SERVICE_IDS_KEY + " are" + |
| config.getTrimmedStringCollection(OZONE_OM_SERVICE_IDS_KEY)); |
| } |
| } |
| |
| /** |
| * Returns an OzoneClient which will use RPC protocol. |
| * |
| * @param config |
| * used for OzoneClient creation |
| * |
| * @return OzoneClient |
| * |
| * @throws IOException |
| */ |
| public static OzoneClient getRpcClient(ConfigurationSource config) |
| throws IOException { |
| Preconditions.checkNotNull(config); |
| |
| // Doing this explicitly so that when service ids are defined in the |
| // configuration, we don't fall back to default ozone.om.address defined |
| // in ozone-default.xml. |
| |
| if (OmUtils.isServiceIdsDefined(config)) { |
| throw new IOException("Following ServiceID's " + |
| config.getTrimmedStringCollection(OZONE_OM_SERVICE_IDS_KEY) + " are" + |
| " defined in the configuration. Use the method getRpcClient which " + |
| "takes serviceID and configuration as param"); |
| } |
| return getRpcClient(getClientProtocol(config), config); |
| } |
| |
| /** |
| * Creates OzoneClient with the given ClientProtocol and Configuration. |
| * |
| * @param clientProtocol |
| * Protocol to be used by the OzoneClient |
| * |
| * @param config |
| * Configuration to be used for OzoneClient creation |
| */ |
| private static OzoneClient getRpcClient(ClientProtocol clientProtocol, |
| ConfigurationSource config) { |
| OzoneClientInvocationHandler clientHandler = |
| new OzoneClientInvocationHandler(clientProtocol); |
| ClientProtocol proxy = (ClientProtocol) Proxy.newProxyInstance( |
| OzoneClientInvocationHandler.class.getClassLoader(), |
| new Class<?>[]{ClientProtocol.class}, clientHandler); |
| return new OzoneClient(config, proxy); |
| } |
| |
| /** |
| * Create OzoneClient for token renew/cancel operations. |
| * @param conf Configuration to be used for OzoneCient creation |
| * @param token ozone token is involved |
| * @return |
| * @throws IOException |
| */ |
| public static OzoneClient getOzoneClient(Configuration conf, |
| Token<OzoneTokenIdentifier> token) throws IOException { |
| Preconditions.checkNotNull(token, "Null token is not allowed"); |
| OzoneTokenIdentifier tokenId = new OzoneTokenIdentifier(); |
| ByteArrayInputStream buf = new ByteArrayInputStream( |
| token.getIdentifier()); |
| DataInputStream in = new DataInputStream(buf); |
| tokenId.readFields(in); |
| String omServiceId = tokenId.getOmServiceId(); |
| OzoneConfiguration ozoneConf = OzoneConfiguration.of(conf); |
| // Must check with OzoneConfiguration so that ozone-site.xml is loaded. |
| if (StringUtils.isNotEmpty(omServiceId)) { |
| // new OM should always issue token with omServiceId |
| if (!OmUtils.isServiceIdsDefined(ozoneConf) |
| && omServiceId.equals(OzoneConsts.OM_SERVICE_ID_DEFAULT)) { |
| // Non-HA or single-node Ratis HA |
| return OzoneClientFactory.getRpcClient(ozoneConf); |
| } else if (OmUtils.isOmHAServiceId(ozoneConf, omServiceId)) { |
| // HA with matching service id |
| return OzoneClientFactory.getRpcClient(omServiceId, ozoneConf); |
| } else { |
| // HA with mismatched service id |
| throw new IOException("Service ID specified " + omServiceId + |
| " does not match" + " with " + OZONE_OM_SERVICE_IDS_KEY + |
| " defined in the " + "configuration. Configured " + |
| OZONE_OM_SERVICE_IDS_KEY + " are" + |
| ozoneConf.getTrimmedStringCollection(OZONE_OM_SERVICE_IDS_KEY)); |
| } |
| } else { |
| // Old OM may issue token without omServiceId that should work |
| // with non-HA case |
| if (!OmUtils.isServiceIdsDefined(ozoneConf)) { |
| return OzoneClientFactory.getRpcClient(ozoneConf); |
| } else { |
| throw new IOException("OzoneToken with no service ID can't " |
| + "be renewed or canceled with local OM HA setup because we " |
| + "don't know if the token is issued from local OM HA cluster " |
| + "or not."); |
| } |
| } |
| } |
| |
| /** |
| * Returns an instance of Protocol class. |
| * |
| * |
| * @param config |
| * Configuration used to initialize ClientProtocol. |
| * |
| * @return ClientProtocol |
| * |
| * @throws IOException |
| */ |
| private static ClientProtocol getClientProtocol(ConfigurationSource config) |
| throws IOException { |
| return getClientProtocol(config, null); |
| } |
| |
| /** |
| * Returns an instance of Protocol class. |
| * |
| * |
| * @param config |
| * Configuration used to initialize ClientProtocol. |
| * |
| * @return ClientProtocol |
| * |
| * @throws IOException |
| */ |
| private static ClientProtocol getClientProtocol(ConfigurationSource config, |
| String omServiceId) throws IOException { |
| try { |
| return new RpcClient(config, omServiceId); |
| } catch (Exception e) { |
| final String message = "Couldn't create RpcClient protocol"; |
| LOG.error(message + " exception: ", e); |
| if (e.getCause() instanceof IOException) { |
| throw (IOException) e.getCause(); |
| } else { |
| throw new IOException(message, e); |
| } |
| } |
| } |
| |
| } |