blob: 4987cbd91dec6115aae6148b04fd5f6d02537b73 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.core.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchDeleter;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.ClientInfo;
import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.ConditionalWriterConfig;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.admin.InstanceOperations;
import org.apache.accumulo.core.client.admin.NamespaceOperations;
import org.apache.accumulo.core.client.admin.ReplicationOperations;
import org.apache.accumulo.core.client.admin.SecurityOperations;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.trace.Tracer;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
public class AccumuloClientImpl extends org.apache.accumulo.core.client.Connector
implements AccumuloClient {
private static final String SYSTEM_TOKEN_NAME = "org.apache.accumulo.server.security."
+ "SystemCredentials$SystemToken";
private final ClientContext context;
private final String instanceID;
private SecurityOperations secops = null;
private TableOperationsImpl tableops = null;
private NamespaceOperations namespaceops = null;
private InstanceOperations instanceops = null;
private ReplicationOperations replicationops = null;
public AccumuloClientImpl(final ClientContext context)
throws AccumuloSecurityException, AccumuloException {
checkArgument(context != null, "Context is null");
checkArgument(context.getCredentials() != null, "Credentials are null");
checkArgument(context.getCredentials().getToken() != null, "Authentication token is null");
if (context.getCredentials().getToken().isDestroyed())
throw new AccumuloSecurityException(context.getCredentials().getPrincipal(),
SecurityErrorCode.TOKEN_EXPIRED);
this.context = context;
instanceID = context.getInstanceID();
// Skip fail fast for system services; string literal for class name, to avoid dependency on
// server jar
final String tokenClassName = context.getCredentials().getToken().getClass().getName();
if (!SYSTEM_TOKEN_NAME.equals(tokenClassName)) {
ServerClient.executeVoid(context, iface -> {
if (!iface.authenticate(Tracer.traceInfo(), context.rpcCreds()))
throw new AccumuloSecurityException("Authentication failed, access denied",
SecurityErrorCode.BAD_CREDENTIALS);
});
}
this.tableops = new TableOperationsImpl(context);
this.namespaceops = new NamespaceOperationsImpl(context, tableops);
}
private Table.ID getTableId(String tableName) throws TableNotFoundException {
Table.ID tableId = Tables.getTableId(context, tableName);
if (Tables.getTableState(context, tableId) == TableState.OFFLINE)
throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId));
return tableId;
}
@Override
@Deprecated
public org.apache.accumulo.core.client.Instance getInstance() {
return context.getDeprecatedInstance();
}
@Override
public BatchScanner createBatchScanner(String tableName, Authorizations authorizations,
int numQueryThreads) throws TableNotFoundException {
checkArgument(tableName != null, "tableName is null");
checkArgument(authorizations != null, "authorizations is null");
return new TabletServerBatchReader(context, getTableId(tableName), authorizations,
numQueryThreads);
}
@Override
public BatchScanner createBatchScanner(String tableName, Authorizations authorizations)
throws TableNotFoundException {
Integer numQueryThreads = ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS
.getInteger(context.getClientInfo().getProperties());
Objects.requireNonNull(numQueryThreads);
return createBatchScanner(tableName, authorizations, numQueryThreads);
}
@Deprecated
@Override
public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations,
int numQueryThreads, long maxMemory, long maxLatency, int maxWriteThreads)
throws TableNotFoundException {
checkArgument(tableName != null, "tableName is null");
checkArgument(authorizations != null, "authorizations is null");
return new TabletServerBatchDeleter(context, getTableId(tableName), authorizations,
numQueryThreads, new BatchWriterConfig().setMaxMemory(maxMemory)
.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads));
}
@Override
public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations,
int numQueryThreads, BatchWriterConfig config) throws TableNotFoundException {
checkArgument(tableName != null, "tableName is null");
checkArgument(authorizations != null, "authorizations is null");
return new TabletServerBatchDeleter(context, getTableId(tableName), authorizations,
numQueryThreads, config.merge(context.getBatchWriterConfig()));
}
@Override
public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations,
int numQueryThreads) throws TableNotFoundException {
return createBatchDeleter(tableName, authorizations, numQueryThreads, new BatchWriterConfig());
}
@Deprecated
@Override
public BatchWriter createBatchWriter(String tableName, long maxMemory, long maxLatency,
int maxWriteThreads) throws TableNotFoundException {
checkArgument(tableName != null, "tableName is null");
return new BatchWriterImpl(context, getTableId(tableName),
new BatchWriterConfig().setMaxMemory(maxMemory)
.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads));
}
@Override
public BatchWriter createBatchWriter(String tableName, BatchWriterConfig config)
throws TableNotFoundException {
checkArgument(tableName != null, "tableName is null");
// we used to allow null inputs for bw config
if (config == null) {
config = new BatchWriterConfig();
}
return new BatchWriterImpl(context, getTableId(tableName),
config.merge(context.getBatchWriterConfig()));
}
@Override
public BatchWriter createBatchWriter(String tableName) throws TableNotFoundException {
return createBatchWriter(tableName, new BatchWriterConfig());
}
@Deprecated
@Override
public MultiTableBatchWriter createMultiTableBatchWriter(long maxMemory, long maxLatency,
int maxWriteThreads) {
return new MultiTableBatchWriterImpl(context, new BatchWriterConfig().setMaxMemory(maxMemory)
.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads));
}
@Override
public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config) {
return new MultiTableBatchWriterImpl(context, config.merge(context.getBatchWriterConfig()));
}
@Override
public MultiTableBatchWriter createMultiTableBatchWriter() {
return createMultiTableBatchWriter(new BatchWriterConfig());
}
@Override
public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config)
throws TableNotFoundException {
return new ConditionalWriterImpl(context, getTableId(tableName), config);
}
@Override
public Scanner createScanner(String tableName, Authorizations authorizations)
throws TableNotFoundException {
checkArgument(tableName != null, "tableName is null");
checkArgument(authorizations != null, "authorizations is null");
Scanner scanner = new ScannerImpl(context, getTableId(tableName), authorizations);
Integer batchSize = ClientProperty.SCANNER_BATCH_SIZE
.getInteger(context.getClientInfo().getProperties());
if (batchSize != null) {
scanner.setBatchSize(batchSize);
}
return scanner;
}
@Override
public String whoami() {
return context.getCredentials().getPrincipal();
}
@Override
public String getInstanceID() {
return instanceID;
}
@Override
public synchronized TableOperations tableOperations() {
return tableops;
}
@Override
public synchronized NamespaceOperations namespaceOperations() {
return namespaceops;
}
@Override
public synchronized SecurityOperations securityOperations() {
if (secops == null)
secops = new SecurityOperationsImpl(context);
return secops;
}
@Override
public synchronized InstanceOperations instanceOperations() {
if (instanceops == null)
instanceops = new InstanceOperationsImpl(context);
return instanceops;
}
@Override
public synchronized ReplicationOperations replicationOperations() {
if (null == replicationops) {
replicationops = new ReplicationOperationsImpl(context);
}
return replicationops;
}
@Override
public ClientInfo info() {
return this.context.getClientInfo();
}
@Override
public AccumuloClient changeUser(String principal, AuthenticationToken token)
throws AccumuloSecurityException, AccumuloException {
return new AccumuloClientBuilderImpl().usingClientInfo(info()).usingToken(principal, token)
.build();
}
public static class AccumuloClientBuilderImpl
implements InstanceArgs, PropertyOptions, ClientInfoOptions, AuthenticationArgs,
ConnectionOptions, SslOptions, SaslOptions, AccumuloClientFactory, FromOptions {
private Properties properties = new Properties();
private AuthenticationToken token = null;
private ClientInfo getClientInfo() {
if (token != null) {
return new ClientInfoImpl(properties, token);
}
return new ClientInfoImpl(properties);
}
@Override
public AccumuloClient build() throws AccumuloException, AccumuloSecurityException {
return org.apache.accumulo.core.client.impl.ClientInfoFactory.getClient(getClientInfo());
}
@Override
public ClientInfo info() {
return getClientInfo();
}
@Override
public AuthenticationArgs forInstance(String instanceName, String zookeepers) {
setProperty(ClientProperty.INSTANCE_NAME, instanceName);
setProperty(ClientProperty.INSTANCE_ZOOKEEPERS, zookeepers);
return this;
}
@Override
public SslOptions withTruststore(String path) {
setProperty(ClientProperty.SSL_TRUSTSTORE_PATH, path);
return this;
}
@Override
public SslOptions withTruststore(String path, String password, String type) {
setProperty(ClientProperty.SSL_TRUSTSTORE_PATH, path);
setProperty(ClientProperty.SSL_TRUSTSTORE_PASSWORD, password);
setProperty(ClientProperty.SSL_TRUSTSTORE_TYPE, type);
return this;
}
@Override
public SslOptions withKeystore(String path) {
setProperty(ClientProperty.SSL_KEYSTORE_PATH, path);
return this;
}
@Override
public SslOptions withKeystore(String path, String password, String type) {
setProperty(ClientProperty.SSL_KEYSTORE_PATH, path);
setProperty(ClientProperty.SSL_KEYSTORE_PASSWORD, password);
setProperty(ClientProperty.SSL_KEYSTORE_TYPE, type);
return this;
}
@Override
public SslOptions useJsse() {
setProperty(ClientProperty.SSL_USE_JSSE, "true");
return this;
}
@Override
public ConnectionOptions withZkTimeout(int timeout) {
setProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT, Integer.toString(timeout) + "ms");
return this;
}
@Override
public SslOptions withSsl() {
setProperty(ClientProperty.SSL_ENABLED, "true");
return this;
}
@Override
public SaslOptions withSasl() {
setProperty(ClientProperty.SASL_ENABLED, "true");
return this;
}
@Override
public ConnectionOptions withBatchWriterConfig(BatchWriterConfig batchWriterConfig) {
setProperty(ClientProperty.BATCH_WRITER_MAX_MEMORY_BYTES, batchWriterConfig.getMaxMemory());
setProperty(ClientProperty.BATCH_WRITER_MAX_LATENCY_SEC,
batchWriterConfig.getMaxLatency(TimeUnit.SECONDS));
setProperty(ClientProperty.BATCH_WRITER_MAX_TIMEOUT_SEC,
batchWriterConfig.getTimeout(TimeUnit.SECONDS));
setProperty(ClientProperty.BATCH_WRITER_MAX_WRITE_THREADS,
batchWriterConfig.getMaxWriteThreads());
setProperty(ClientProperty.BATCH_WRITER_DURABILITY,
batchWriterConfig.getDurability().toString());
return this;
}
@Override
public ConnectionOptions withBatchScannerQueryThreads(int numQueryThreads) {
setProperty(ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS, numQueryThreads);
return this;
}
@Override
public ConnectionOptions withScannerBatchSize(int batchSize) {
setProperty(ClientProperty.SCANNER_BATCH_SIZE, batchSize);
return this;
}
@Override
public SaslOptions withPrimary(String kerberosServerPrimary) {
setProperty(ClientProperty.SASL_KERBEROS_SERVER_PRIMARY, kerberosServerPrimary);
return this;
}
@Override
public SaslOptions withQop(String qualityOfProtection) {
setProperty(ClientProperty.SASL_QOP, qualityOfProtection);
return this;
}
@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN",
justification = "code runs in same security context as user who provided configFile")
@Override
public AccumuloClientFactory usingProperties(String configFile) {
Properties properties = new Properties();
try (InputStream is = new FileInputStream(configFile)) {
properties.load(is);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
return usingProperties(properties);
}
@Override
public AccumuloClientFactory usingProperties(Properties properties) {
this.properties = properties;
return this;
}
@Override
public ConnectionOptions usingPassword(String principal, CharSequence password) {
setProperty(ClientProperty.AUTH_PRINCIPAL, principal);
ClientProperty.setPassword(properties, password.toString());
return this;
}
@Override
public ConnectionOptions usingKerberos(String principal, String keyTabFile) {
setProperty(ClientProperty.AUTH_PRINCIPAL, principal);
ClientProperty.setKerberosKeytab(properties, keyTabFile);
return this;
}
@Override
public ConnectionOptions usingToken(String principal, AuthenticationToken token) {
setProperty(ClientProperty.AUTH_PRINCIPAL, principal);
this.token = token;
return this;
}
@Override
public FromOptions usingClientInfo(ClientInfo clientInfo) {
this.properties = clientInfo.getProperties();
return this;
}
public void setProperty(ClientProperty property, String value) {
properties.setProperty(property.getKey(), value);
}
public void setProperty(ClientProperty property, Long value) {
setProperty(property, Long.toString(value));
}
public void setProperty(ClientProperty property, Integer value) {
setProperty(property, Integer.toString(value));
}
}
}