blob: 5063e00cfdfe714735152ef58d41ffd325f717e5 [file] [log] [blame]
/*
*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
*
* contributor license agreements. See the NOTICE file distributed with
*
* this work for additional information regarding copyright ownership.
*
* The ASF licenses this file to You under the Apache License, Version 2.0
*
* (the "License"); you may not use this file except in compliance with
*
* the License. You may obtain a copy of the License at
*
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
*
* Unless required by applicable law or agreed to in writing, software
*
* distributed under the License is distributed on an "AS IS" BASIS,
*
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
* See the License for the specific language governing permissions and
*
* limitations under the License.
*
* /
*/
package org.apache.kylin.storage.hbase.steps;
import com.google.common.collect.Lists;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Put;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.HBaseColumnDesc;
import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
import org.apache.kylin.gridtable.GTRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
/**
*/
public class HBaseCuboidWriter implements ICuboidWriter {
private static final Logger logger = LoggerFactory.getLogger(HBaseCuboidWriter.class);
private static final int BATCH_PUT_THRESHOLD = 10000;
private final List<KeyValueCreator> keyValueCreators;
private final int nColumns;
private final Table hTable;
private final CubeDesc cubeDesc;
private final CubeSegment cubeSegment;
private final Object[] measureValues;
private List<Put> puts = Lists.newArrayList();
private AbstractRowKeyEncoder rowKeyEncoder;
private byte[] keybuf;
public HBaseCuboidWriter(CubeSegment segment, Table hTable) {
this.keyValueCreators = Lists.newArrayList();
this.cubeSegment = segment;
this.cubeDesc = cubeSegment.getCubeDesc();
for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
}
}
this.nColumns = keyValueCreators.size();
this.hTable = hTable;
this.measureValues = new Object[cubeDesc.getMeasures().size()];
}
private byte[] copy(byte[] array, int offset, int length) {
byte[] result = new byte[length];
System.arraycopy(array, offset, result, 0, length);
return result;
}
//TODO:shardingonstreaming
private byte[] createKey(Long cuboidId, GTRecord record) {
if (rowKeyEncoder == null || rowKeyEncoder.getCuboidID() != cuboidId) {
rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, Cuboid.findById(cubeDesc, cuboidId));
keybuf = rowKeyEncoder.createBuf();
}
rowKeyEncoder.encode(record, record.getInfo().getPrimaryKey(), keybuf);
return keybuf;
}
@Override
public void write(long cuboidId, GTRecord record) throws IOException {
byte[] key = createKey(cuboidId, record);
final Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
final int nDims = cuboid.getColumns().size();
final ImmutableBitSet bitSet = new ImmutableBitSet(nDims, nDims + cubeDesc.getMeasures().size());
for (int i = 0; i < nColumns; i++) {
final Object[] values = record.getValues(bitSet, measureValues);
final KeyValue keyValue = keyValueCreators.get(i).create(key, 0, key.length, values);
final Put put = new Put(copy(key, 0, key.length));
byte[] family = copy(keyValue.getFamilyArray(), keyValue.getFamilyOffset(), keyValue.getFamilyLength());
byte[] qualifier = copy(keyValue.getQualifierArray(), keyValue.getQualifierOffset(), keyValue.getQualifierLength());
byte[] value = copy(keyValue.getValueArray(), keyValue.getValueOffset(), keyValue.getValueLength());
put.add(family, qualifier, value);
puts.add(put);
}
if (puts.size() >= BATCH_PUT_THRESHOLD) {
flush();
}
}
@Override
public final void flush() throws IOException {
if (!puts.isEmpty()) {
long t = System.currentTimeMillis();
if (hTable != null) {
hTable.put(puts);
}
logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms");
puts.clear();
}
}
@Override
public void close() throws IOException {
flush();
IOUtils.closeQuietly(hTable);
}
}