blob: 149943936ee7848ff8fc51374db3c5984535105e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.core.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.data.Mutation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.UncheckedExecutionException;
public class MultiTableBatchWriterImpl implements MultiTableBatchWriter {
private static final Logger log = LoggerFactory.getLogger(MultiTableBatchWriterImpl.class);
private AtomicBoolean closed;
private class TableBatchWriter implements BatchWriter {
private Table.ID tableId;
TableBatchWriter(Table.ID tableId) {
this.tableId = tableId;
}
@Override
public void addMutation(Mutation m) throws MutationsRejectedException {
checkArgument(m != null, "m is null");
bw.addMutation(tableId, m);
}
@Override
public void addMutations(Iterable<Mutation> iterable) throws MutationsRejectedException {
bw.addMutation(tableId, iterable.iterator());
}
@Override
public void close() {
throw new UnsupportedOperationException(
"Must close all tables, can not close an individual table");
}
@Override
public void flush() {
throw new UnsupportedOperationException(
"Must flush all tables, can not flush an individual table");
}
}
private TabletServerBatchWriter bw;
private ConcurrentHashMap<Table.ID,BatchWriter> tableWriters;
private final ClientContext context;
public MultiTableBatchWriterImpl(ClientContext context, BatchWriterConfig config) {
checkArgument(context != null, "context is null");
checkArgument(config != null, "config is null");
this.context = context;
this.bw = new TabletServerBatchWriter(context, config);
tableWriters = new ConcurrentHashMap<>();
this.closed = new AtomicBoolean(false);
}
@Override
public boolean isClosed() {
return this.closed.get();
}
@Override
public void close() throws MutationsRejectedException {
this.closed.set(true);
bw.close();
}
// WARNING: do not rely upon finalize to close this class. Finalize is not guaranteed to be
// called.
@Override
protected void finalize() {
if (!closed.get()) {
log.warn("{} not shutdown; did you forget to call close()?",
MultiTableBatchWriterImpl.class.getSimpleName());
try {
close();
} catch (MutationsRejectedException mre) {
log.error(MultiTableBatchWriterImpl.class.getSimpleName() + " internal error.", mre);
throw new RuntimeException(
"Exception when closing " + MultiTableBatchWriterImpl.class.getSimpleName(), mre);
}
}
}
/**
* Returns the table ID for the given table name.
*
* @param tableName
* The name of the table which to find the ID for
* @return The table ID, or null if the table name doesn't exist
*/
private Table.ID getId(String tableName) throws TableNotFoundException {
try {
return Tables.getTableId(context, tableName);
} catch (UncheckedExecutionException e) {
Throwable cause = e.getCause();
log.error("Unexpected exception when fetching table id for {}", tableName);
if (null == cause) {
throw new RuntimeException(e);
} else if (cause instanceof TableNotFoundException) {
throw (TableNotFoundException) cause;
} else if (cause instanceof TableOfflineException) {
throw (TableOfflineException) cause;
}
throw e;
}
}
@Override
public BatchWriter getBatchWriter(String tableName)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
checkArgument(tableName != null, "tableName is null");
Table.ID tableId = getId(tableName);
BatchWriter tbw = tableWriters.get(tableId);
if (tbw == null) {
tbw = new TableBatchWriter(tableId);
BatchWriter current = tableWriters.putIfAbsent(tableId, tbw);
// return the current one if another thread created one first
return current != null ? current : tbw;
} else {
return tbw;
}
}
@Override
public void flush() throws MutationsRejectedException {
bw.flush();
}
}