blob: cc37ade2f7d975fd0acdce4dcce46e19a35cda18 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.core.client;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.impl.DurabilityImpl;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils;
/**
* This object holds configuration settings used to instantiate a {@link BatchWriter}
*
* @since 1.5.0
*/
public class BatchWriterConfig implements Writable {
private static final Long DEFAULT_MAX_MEMORY = 50 * 1024 * 1024L;
private Long maxMemory = null;
private static final Long DEFAULT_MAX_LATENCY = 2 * 60 * 1000L;
private Long maxLatency = null;
private static final Long DEFAULT_TIMEOUT = Long.MAX_VALUE;
private Long timeout = null;
private static final Integer DEFAULT_MAX_WRITE_THREADS = 3;
private Integer maxWriteThreads = null;
private Durability durability = Durability.DEFAULT;
private boolean isDurabilitySet = false;
/**
* Sets the maximum memory to batch before writing. The smaller this value, the more frequently
* the {@link BatchWriter} will write.<br>
* If set to a value smaller than a single mutation, then it will {@link BatchWriter#flush()}
* after each added mutation. Must be non-negative.
*
* <p>
* <b>Default:</b> 50M
*
* @param maxMemory
* max size in bytes
* @throws IllegalArgumentException
* if {@code maxMemory} is less than 0
* @return {@code this} to allow chaining of set methods
*/
public BatchWriterConfig setMaxMemory(long maxMemory) {
if (maxMemory < 0)
throw new IllegalArgumentException("Max memory must be non-negative.");
this.maxMemory = maxMemory;
return this;
}
/**
* Sets the maximum amount of time to hold the data in memory before flushing it to servers.<br>
* For no maximum, set to zero, or {@link Long#MAX_VALUE} with {@link TimeUnit#MILLISECONDS}.
*
* <p>
* {@link TimeUnit#MICROSECONDS} or {@link TimeUnit#NANOSECONDS} will be truncated to the nearest
* {@link TimeUnit#MILLISECONDS}.<br>
* If this truncation would result in making the value zero when it was specified as non-zero,
* then a minimum value of one {@link TimeUnit#MILLISECONDS} will be used.
*
* <p>
* <b>Default:</b> 120 seconds
*
* @param maxLatency
* the maximum latency, in the unit specified by the value of {@code timeUnit}
* @param timeUnit
* determines how {@code maxLatency} will be interpreted
* @throws IllegalArgumentException
* if {@code maxLatency} is less than 0
* @return {@code this} to allow chaining of set methods
*/
public BatchWriterConfig setMaxLatency(long maxLatency, TimeUnit timeUnit) {
if (maxLatency < 0)
throw new IllegalArgumentException("Negative max latency not allowed " + maxLatency);
if (maxLatency == 0)
this.maxLatency = Long.MAX_VALUE;
else
// make small, positive values that truncate to 0 when converted use the minimum millis
// instead
this.maxLatency = Math.max(1, timeUnit.toMillis(maxLatency));
return this;
}
/**
* Sets the maximum amount of time an unresponsive server will be re-tried. When this timeout is
* exceeded, the {@link BatchWriter} should throw an exception.<br>
* For no timeout, set to zero, or {@link Long#MAX_VALUE} with {@link TimeUnit#MILLISECONDS}.
*
* <p>
* {@link TimeUnit#MICROSECONDS} or {@link TimeUnit#NANOSECONDS} will be truncated to the nearest
* {@link TimeUnit#MILLISECONDS}.<br>
* If this truncation would result in making the value zero when it was specified as non-zero,
* then a minimum value of one {@link TimeUnit#MILLISECONDS} will be used.
*
* <p>
* <b>Default:</b> {@link Long#MAX_VALUE} (no timeout)
*
* @param timeout
* the timeout, in the unit specified by the value of {@code timeUnit}
* @param timeUnit
* determines how {@code timeout} will be interpreted
* @throws IllegalArgumentException
* if {@code timeout} is less than 0
* @return {@code this} to allow chaining of set methods
*/
public BatchWriterConfig setTimeout(long timeout, TimeUnit timeUnit) {
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout not allowed " + timeout);
if (timeout == 0)
this.timeout = Long.MAX_VALUE;
else
// make small, positive values that truncate to 0 when converted use the minimum millis
// instead
this.timeout = Math.max(1, timeUnit.toMillis(timeout));
return this;
}
/**
* Sets the maximum number of threads to use for writing data to the tablet servers.
*
* <p>
* <b>Default:</b> 3
*
* @param maxWriteThreads
* the maximum threads to use
* @throws IllegalArgumentException
* if {@code maxWriteThreads} is non-positive
* @return {@code this} to allow chaining of set methods
*/
public BatchWriterConfig setMaxWriteThreads(int maxWriteThreads) {
if (maxWriteThreads <= 0)
throw new IllegalArgumentException("Max threads must be positive " + maxWriteThreads);
this.maxWriteThreads = maxWriteThreads;
return this;
}
public long getMaxMemory() {
return maxMemory != null ? maxMemory : DEFAULT_MAX_MEMORY;
}
public long getMaxLatency(TimeUnit timeUnit) {
return timeUnit.convert(maxLatency != null ? maxLatency : DEFAULT_MAX_LATENCY,
TimeUnit.MILLISECONDS);
}
public long getTimeout(TimeUnit timeUnit) {
return timeUnit.convert(timeout != null ? timeout : DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
}
public int getMaxWriteThreads() {
return maxWriteThreads != null ? maxWriteThreads : DEFAULT_MAX_WRITE_THREADS;
}
/**
* @since 1.7.0
* @return the durability to be used by the BatchWriter
*/
public Durability getDurability() {
return durability;
}
/**
* Change the durability for the BatchWriter session. The default durability is "default" which is
* the table's durability setting. If the durability is set to something other than the default,
* it will override the durability setting of the table.
*
* @param durability
* the Durability to be used by the BatchWriter
* @since 1.7.0
*
*/
public BatchWriterConfig setDurability(Durability durability) {
this.durability = durability;
isDurabilitySet = true;
return this;
}
@Override
public void write(DataOutput out) throws IOException {
// write this out in a human-readable way
ArrayList<String> fields = new ArrayList<>();
if (maxMemory != null)
addField(fields, "maxMemory", maxMemory);
if (maxLatency != null)
addField(fields, "maxLatency", maxLatency);
if (maxWriteThreads != null)
addField(fields, "maxWriteThreads", maxWriteThreads);
if (timeout != null)
addField(fields, "timeout", timeout);
if (durability != Durability.DEFAULT)
addField(fields, "durability", durability);
String output = StringUtils.join(",", fields);
byte[] bytes = output.getBytes(UTF_8);
byte[] len = String.format("%6s#", Integer.toString(bytes.length, 36)).getBytes(UTF_8);
if (len.length != 7)
throw new IllegalStateException("encoded length does not match expected value");
out.write(len);
out.write(bytes);
}
private void addField(List<String> fields, String name, Object value) {
String key = StringUtils.escapeString(name, '\\', new char[] {',', '='});
String val = StringUtils.escapeString(String.valueOf(value), '\\', new char[] {',', '='});
fields.add(key + '=' + val);
}
@Override
public void readFields(DataInput in) throws IOException {
byte[] len = new byte[7];
in.readFully(len);
String strLen = new String(len, UTF_8);
if (!strLen.endsWith("#"))
throw new IllegalStateException("length was not encoded correctly");
byte[] bytes = new byte[Integer
.parseInt(strLen.substring(strLen.lastIndexOf(' ') + 1, strLen.length() - 1), 36)];
in.readFully(bytes);
String strFields = new String(bytes, UTF_8);
String[] fields = StringUtils.split(strFields, '\\', ',');
for (String field : fields) {
String[] keyValue = StringUtils.split(field, '\\', '=');
String key = keyValue[0];
String value = keyValue[1];
if ("maxMemory".equals(key)) {
maxMemory = Long.valueOf(value);
} else if ("maxLatency".equals(key)) {
maxLatency = Long.valueOf(value);
} else if ("maxWriteThreads".equals(key)) {
maxWriteThreads = Integer.valueOf(value);
} else if ("timeout".equals(key)) {
timeout = Long.valueOf(value);
} else if ("durability".equals(key)) {
durability = DurabilityImpl.fromString(value);
} else {
/* ignore any other properties */
}
}
}
@Override
public boolean equals(Object o) {
if (o instanceof BatchWriterConfig) {
BatchWriterConfig other = (BatchWriterConfig) o;
if (null != maxMemory) {
if (!maxMemory.equals(other.maxMemory)) {
return false;
}
} else {
if (null != other.maxMemory) {
return false;
}
}
if (null != maxLatency) {
if (!maxLatency.equals(other.maxLatency)) {
return false;
}
} else {
if (null != other.maxLatency) {
return false;
}
}
if (null != maxWriteThreads) {
if (!maxWriteThreads.equals(other.maxWriteThreads)) {
return false;
}
} else {
if (null != other.maxWriteThreads) {
return false;
}
}
if (null != timeout) {
if (!timeout.equals(other.timeout)) {
return false;
}
} else {
if (null != other.timeout) {
return false;
}
}
return durability == other.durability;
}
return false;
}
private static <T> T merge(T o1, T o2) {
if (o1 != null)
return o1;
return o2;
}
/**
* Merge this BatchWriterConfig with another. If config is set in both, preference will be given
* to this config.
*
* @param other
* Another BatchWriterConfig
* @return Merged BatchWriterConfig
* @since 2.0.0
*/
public BatchWriterConfig merge(BatchWriterConfig other) {
BatchWriterConfig result = new BatchWriterConfig();
result.maxMemory = merge(this.maxMemory, other.maxMemory);
result.maxLatency = merge(this.maxLatency, other.maxLatency);
result.timeout = merge(this.timeout, other.timeout);
result.maxWriteThreads = merge(this.maxWriteThreads, other.maxWriteThreads);
if (this.isDurabilitySet) {
result.durability = this.durability;
} else if (other.isDurabilitySet) {
result.durability = other.durability;
}
return result;
}
@Override
public int hashCode() {
HashCodeBuilder hcb = new HashCodeBuilder();
hcb.append(maxMemory).append(maxLatency).append(maxWriteThreads).append(timeout)
.append(durability);
return hcb.toHashCode();
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(32);
sb.append("[maxMemory=").append(getMaxMemory()).append(", maxLatency=")
.append(getMaxLatency(TimeUnit.MILLISECONDS)).append(", maxWriteThreads=")
.append(getMaxWriteThreads()).append(", timeout=").append(getTimeout(TimeUnit.MILLISECONDS))
.append(", durability=").append(durability).append("]");
return sb.toString();
}
}