blob: 18104017744e9dab4f5a731508934c742e3b3469 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.storage.hbase;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.GenericEntityWriter;
import org.apache.eagle.log.entity.HBaseInternalLogHelper;
import org.apache.eagle.log.entity.InternalLog;
import org.apache.eagle.log.entity.index.RowKeyLogReader;
import org.apache.eagle.log.entity.meta.EntityDefinition;
import org.apache.eagle.log.entity.old.GenericDeleter;
import org.apache.eagle.query.GenericQuery;
import org.apache.eagle.storage.DataStorageBase;
import org.apache.eagle.storage.hbase.query.GenericQueryBuilder;
import org.apache.eagle.storage.operation.CompiledQuery;
import org.apache.eagle.storage.result.ModifyResult;
import org.apache.eagle.storage.result.QueryResult;
import org.apache.eagle.common.EagleBase64Wrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import static org.apache.eagle.audit.common.AuditConstants.AUDIT_EVENT_CREATE;
import static org.apache.eagle.audit.common.AuditConstants.AUDIT_EVENT_UPDATE;
import static org.apache.eagle.audit.common.AuditConstants.AUDIT_EVENT_DELETE;
/**
* @since 3/18/15
*/
public class HBaseStorage extends DataStorageBase {
private final static Logger LOG = LoggerFactory.getLogger(HBaseStorage.class);
private HBaseStorageAudit audit = new HBaseStorageAudit();
@Override
public void init() throws IOException {
HBaseEntitySchemaManager.getInstance().init();
LOG.info("Initializing");
}
@Override
public <E extends TaggedLogAPIEntity> ModifyResult<String> update(List<E> entities, EntityDefinition entityDefinition) throws IOException {
ModifyResult<String> result = create(entities, entityDefinition);
audit.auditOperation(AUDIT_EVENT_UPDATE, entities, null, entityDefinition); // added for jira: EAGLE-47
return result;
}
@Override
public <E extends TaggedLogAPIEntity> ModifyResult<String> create(List<E> entities, EntityDefinition entityDefinition) throws IOException {
ModifyResult<String> result = new ModifyResult<>();
try {
GenericEntityWriter entityWriter = new GenericEntityWriter(entityDefinition);
result.setIdentifiers(entityWriter.write(entities));
result.setSuccess(true);
} catch (Exception e) {
LOG.error(e.getMessage(),e);
throw new IOException(e);
}
audit.auditOperation(AUDIT_EVENT_CREATE, entities, null, entityDefinition); // added for jira: EAGLE-47
return result;
}
/**
* @param entities
* @param entityDefinition
* @param <E>
*
* @return ModifyResult
*
* @throws IOException
*/
@Override
public <E extends TaggedLogAPIEntity> ModifyResult<String> delete(List<E> entities, EntityDefinition entityDefinition) throws IOException {
ModifyResult<String> result = new ModifyResult<String>();
try{
GenericDeleter deleter = new GenericDeleter(entityDefinition.getTable(), entityDefinition.getColumnFamily());
result.setIdentifiers(deleter.delete(entities));
}catch(Exception ex){
LOG.error(ex.getMessage(),ex);
result.setSuccess(false);
throw new IOException(ex);
}
audit.auditOperation(AUDIT_EVENT_DELETE, entities, null, entityDefinition); // added for jira: EAGLE-47
result.setSuccess(true);
return result;
}
/**
* TODO:
*
* @param ids
* @param entityDefinition
* @return
* @throws IOException
*/
@Override
public ModifyResult<String> deleteByID(List<String> ids, EntityDefinition entityDefinition) throws IOException {
ModifyResult<String> result = new ModifyResult<String>();
try{
GenericDeleter deleter = new GenericDeleter(entityDefinition.getTable(), entityDefinition.getColumnFamily());
deleter.deleteByEncodedRowkeys(ids);
result.setIdentifiers(ids);
}catch(Exception ex){
LOG.error(ex.getMessage(),ex);
result.setSuccess(false);
throw new IOException(ex);
}
audit.auditOperation(AUDIT_EVENT_DELETE, null, ids, entityDefinition); // added for jira: EAGLE-47
result.setSuccess(true);
return result;
}
/**
* TODO:
*
* @param query
* @param entityDefinition
* @return
* @throws IOException
*/
@Override
public ModifyResult<String> delete(CompiledQuery query, EntityDefinition entityDefinition) throws IOException {
if(query.isHasAgg()){
throw new IOException("delete by aggregation query is not supported");
}
ModifyResult<String> result;
try {
LOG.info("Querying for deleting: "+query);
GenericQuery reader = GenericQueryBuilder
.select(query.getSearchCondition().getOutputFields())
.from(query.getServiceName(),query.getRawQuery().getMetricName()).where(query.getSearchCondition())
.groupBy(query.isHasAgg(), query.getGroupByFields(), query.getAggregateFunctionTypes(), query.getAggregateFields())
.timeSeries(query.getRawQuery().isTimeSeries(),query.getRawQuery().getIntervalmin())
.treeAgg(query.getRawQuery().isTreeAgg())
.orderBy(query.getSortOptions(),query.getSortFunctions(),query.getSortFields())
.top(query.getRawQuery().getTop())
.parallel(query.getRawQuery().getParallel())
.build();
List<? extends TaggedLogAPIEntity> entities = reader.result();
if(entities != null){
LOG.info("Deleting "+entities.size()+" entities");
result = delete(entities,entityDefinition);
}else{
LOG.info("Deleting 0 entities");
result = new ModifyResult<String>();
result.setSuccess(true);
}
} catch (Exception e) {
LOG.error(e.getMessage(),e);
throw new IOException(e);
}
return result;
}
/**
* TODO:
*
* @param query
* @param entityDefinition
* @param <E>
* @return
* @throws IOException
*/
@Override
@SuppressWarnings("unchecked")
public <E extends Object> QueryResult<E> query(CompiledQuery query, EntityDefinition entityDefinition) throws IOException {
QueryResult<E> result = new QueryResult<E>();
try {
GenericQuery reader = GenericQueryBuilder
.select(query.getSearchCondition().getOutputFields())
.from(query.getServiceName(),query.getRawQuery().getMetricName()).where(query.getSearchCondition())
.groupBy(query.isHasAgg(), query.getGroupByFields(), query.getAggregateFunctionTypes(), query.getAggregateFields())
.timeSeries(query.getRawQuery().isTimeSeries(),query.getRawQuery().getIntervalmin())
.treeAgg(query.getRawQuery().isTreeAgg())
.orderBy(query.getSortOptions(),query.getSortFunctions(),query.getSortFields())
.top(query.getRawQuery().getTop())
.parallel(query.getRawQuery().getParallel())
.build();
List<E> entities = reader.result();
result.setData(entities);
result.setFirstTimestamp(reader.getFirstTimeStamp());
result.setLastTimestamp(reader.getLastTimestamp());
result.setSize(entities.size());
if(!query.isHasAgg()) result.setEntityType((Class<E>) entityDefinition.getEntityClass());
result.setSuccess(true);
} catch (Exception e) {
LOG.error(e.getMessage(),e);
throw new IOException(e);
}
return result;
}
/**
* Query by HBase rowkey
*
* @param ids hbase rowkey list
* @param entityDefinition entity definition
* @param <E> entity type
* @return QueryResult with entity type <E>
*
* @throws IOException
*/
@Override
public <E> QueryResult<E> queryById(List<String> ids, EntityDefinition entityDefinition) throws IOException {
List<byte[]> rowkeys = new ArrayList<>(ids.size());
QueryResult<E> result = new QueryResult<E>();
for(String id:ids) rowkeys.add(EagleBase64Wrapper.decode(id));
RowKeyLogReader reader = null;
try {
reader = new RowKeyLogReader(entityDefinition, rowkeys,null);
reader.open();
List<TaggedLogAPIEntity> entities = new LinkedList<>();
while(true) {
InternalLog log = reader.read();
if(log == null) break;
TaggedLogAPIEntity entity = HBaseInternalLogHelper.buildEntity(log, entityDefinition);
entities.add(entity);
}
result.setData((List<E>) entities);
result.setSuccess(true);
result.setSize(entities.size());
return result;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new IOException(e);
} finally{
if(reader != null) reader.close();
}
}
@Override
public void close() throws IOException {
LOG.info("Shutting down");
}
}