blob: 43de55a8ecbb220fb6b7975ca70d40eb13675df6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.omid.timestamp.storage;
import static com.google.common.base.Charsets.UTF_8;
import java.io.IOException;
import javax.inject.Inject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Stores the max timestamp assigned by the TO in HBase.
* It's always written non-transactionally in the same row and column
*/
public class HBaseTimestampStorage implements TimestampStorage {
private static final long INITIAL_MAX_TS_VALUE = 0;
private static final Logger LOG = LoggerFactory.getLogger(HBaseTimestampStorage.class);
// ROW and COLUMN to write the assigned timestamp
private static final byte[] TSO_ROW = "MAX_TIMESTAMP_R".getBytes(UTF_8);
private static final byte[] TSO_QUALIFIER = "MAX_TIMESTAMP_Q".getBytes(UTF_8);
private final Table table;
private final byte[] cfName;
private final Connection connection;
@Inject
public HBaseTimestampStorage(Configuration hbaseConfig, HBaseTimestampStorageConfig config) throws IOException {
connection = ConnectionFactory.createConnection(hbaseConfig);
this.table = connection.getTable(TableName.valueOf(config.getTableName()));
this.cfName = config.getFamilyName().getBytes(UTF_8);
}
@Override
public void updateMaxTimestamp(long previousMaxTimestamp, long newMaxTimestamp) throws IOException {
if (newMaxTimestamp < 0) {
LOG.error("Negative value received for maxTimestamp: {}", newMaxTimestamp);
throw new IllegalArgumentException("Negative value received for maxTimestamp" + newMaxTimestamp);
}
Put put = new Put(TSO_ROW);
put.addColumn(cfName, TSO_QUALIFIER, Bytes.toBytes(newMaxTimestamp));
byte[] previousVal = null;
if (previousMaxTimestamp != INITIAL_MAX_TS_VALUE) {
previousVal = Bytes.toBytes(previousMaxTimestamp);
}
CheckAndMutate checkAndPut = CheckAndMutate.newBuilder(TSO_ROW)
.ifEquals(cfName, TSO_QUALIFIER, previousVal)
.build(put);
if (!table.checkAndMutate(checkAndPut).isSuccess()) {
throw new IOException("Previous max timestamp is incorrect");
}
}
@Override
public long getMaxTimestamp() throws IOException {
Get get = new Get(TSO_ROW);
get.addColumn(cfName, TSO_QUALIFIER);
Result result = table.get(get);
if (result.containsColumn(cfName, TSO_QUALIFIER)) {
return Bytes.toLong(result.getValue(cfName, TSO_QUALIFIER));
} else {
// This happens for example when a new cluster is created
return INITIAL_MAX_TS_VALUE;
}
}
public void close() throws IOException {
//TODO this is never called
table.close();
connection.close();
}
}