blob: 76230c336c7322d8f1aeabf020ddee3ef5193faa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.core.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
import org.apache.accumulo.core.tabletserver.thrift.TDurability;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.Tracer;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.hadoop.io.Text;
import org.apache.thrift.TException;
import org.apache.thrift.TServiceClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Sends individual mutations for one table to the tablet server that hosts the mutation's row.
 *
 * <p>
 * Each call to {@link #update(Mutation)} issues one blocking RPC per attempt and keeps retrying,
 * pausing between attempts, until the server accepts the mutation or a non-retryable error
 * (constraint violation or security failure) is raised.
 */
public class Writer {

  private static final Logger log = LoggerFactory.getLogger(Writer.class);

  /** Pause, in milliseconds, between attempts to locate a tablet or resend a mutation. */
  private static final long RETRY_DELAY_MS = 500;

  private final ClientContext context;
  private final Table.ID tableId;

  /**
   * Creates a writer bound to a single table.
   *
   * @param context
   *          client context used for tablet location and RPC credentials; must not be null
   * @param tableId
   *          id of the table that mutations are written to; must not be null
   * @throws IllegalArgumentException
   *           if either argument is null
   */
  public Writer(ClientContext context, Table.ID tableId) {
    checkArgument(context != null, "context is null");
    checkArgument(tableId != null, "tableId is null");
    this.context = context;
    this.tableId = tableId;
  }

  /**
   * Performs one attempt to apply a mutation on a specific tablet server.
   *
   * @param context
   *          client context supplying the RPC credentials
   * @param m
   *          mutation to send
   * @param extent
   *          tablet the mutation is addressed to
   * @param server
   *          address of the tablet server to contact
   * @throws NotServingTabletException
   *           propagated from the server call
   * @throws ConstraintViolationException
   *           propagated from the server call
   * @throws AccumuloSecurityException
   *           if the server call raised a {@link ThriftSecurityException}; the Thrift exception is
   *           attached as the cause
   * @throws TException
   *           on any other Thrift failure
   */
  private static void updateServer(ClientContext context, Mutation m, KeyExtent extent,
      HostAndPort server) throws TException, NotServingTabletException,
      ConstraintViolationException, AccumuloSecurityException {
    checkArgument(m != null, "m is null");
    checkArgument(extent != null, "extent is null");
    checkArgument(server != null, "server is null");
    checkArgument(context != null, "context is null");

    TabletClientService.Iface client = null;
    try {
      client = ThriftUtil.getTServerClient(server, context);
      client.update(Tracer.traceInfo(), context.rpcCreds(), extent.toThrift(), m.toThrift(),
          TDurability.DEFAULT);
    } catch (ThriftSecurityException e) {
      // Keep the Thrift exception as the cause so its stack trace is not lost in translation.
      throw new AccumuloSecurityException(e.user, e.code, e);
    } finally {
      // NOTE(review): client is still null here if getTServerClient threw; presumably
      // returnClient tolerates null -- confirm.
      ThriftUtil.returnClient((TServiceClient) client);
    }
  }

  /**
   * Writes a single mutation, blocking until a tablet server accepts it.
   *
   * <p>
   * The tablet for the mutation's row is looked up on every attempt. When no location is found,
   * the server reports it is not serving the tablet, or a Thrift error occurs, the method sleeps
   * and tries again; there is no upper bound on the number of attempts.
   *
   * @param m
   *          mutation to write; must be non-null and non-empty
   * @throws IllegalArgumentException
   *           if {@code m} is null or has size zero
   * @throws ConstraintViolationException
   *           if the server call raised it; it is logged and rethrown without retrying
   * @throws AccumuloSecurityException
   *           if the server call failed with a security error
   * @throws AccumuloException
   *           presumably raised by tablet location -- not thrown directly by this method
   * @throws TableNotFoundException
   *           presumably raised by tablet location -- not thrown directly by this method
   */
  public void update(Mutation m) throws AccumuloException, AccumuloSecurityException,
      ConstraintViolationException, TableNotFoundException {
    checkArgument(m != null, "m is null");
    if (m.size() == 0) {
      throw new IllegalArgumentException("Can not add empty mutations");
    }

    // The row is the same for every attempt, so wrap it once rather than on each retry.
    final Text row = new Text(m.getRow());

    while (true) {
      TabletLocation tabLoc =
          TabletLocator.getLocator(context, tableId).locateTablet(context, row, false, true);

      if (tabLoc == null) {
        // Only decode the row bytes when the message will actually be emitted.
        if (log.isTraceEnabled()) {
          log.trace("No tablet location found for row {}", new String(m.getRow(), UTF_8));
        }
        sleepUninterruptibly(RETRY_DELAY_MS, TimeUnit.MILLISECONDS);
        continue;
      }

      final HostAndPort parsedLocation = HostAndPort.fromString(tabLoc.tablet_location);
      try {
        updateServer(context, m, tabLoc.tablet_extent, parsedLocation);
        return;
      } catch (NotServingTabletException e) {
        log.trace("Not serving tablet, server = {}", parsedLocation);
        TabletLocator.getLocator(context, tableId).invalidateCache(tabLoc.tablet_extent);
      } catch (ConstraintViolationException cve) {
        log.error("error sending update to {}", parsedLocation, cve);
        // probably do not need to invalidate cache, but it does not hurt
        TabletLocator.getLocator(context, tableId).invalidateCache(tabLoc.tablet_extent);
        throw cve;
      } catch (TException e) {
        log.error("error sending update to {}", parsedLocation, e);
        TabletLocator.getLocator(context, tableId).invalidateCache(tabLoc.tablet_extent);
      }

      // Reached only after a retryable failure; back off before locating the tablet again.
      sleepUninterruptibly(RETRY_DELAY_MS, TimeUnit.MILLISECONDS);
    }
  }
}