blob: e947ed7003c064f3f40674cf5aa30ca615cb2727 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.util;
import org.apache.hadoop.hbase.Cell;
import org.apache.phoenix.index.CDCTableInfo;
import org.apache.phoenix.schema.PTable;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import static org.apache.phoenix.query.QueryConstants.CDC_CHANGE_IMAGE;
import static org.apache.phoenix.query.QueryConstants.CDC_DELETE_EVENT_TYPE;
import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE;
import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE;
import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE;
public class CDCChangeBuilder {
private final boolean isChangeImageInScope;
private final boolean isPreImageInScope;
private final boolean isPostImageInScope;
private final CDCTableInfo cdcDataTableInfo;
private String changeType;
private long lastDeletedTimestamp;
private long changeTimestamp;
private Map<String, Object> preImage = null;
private Map<String, Object> changeImage = null;
public CDCChangeBuilder(CDCTableInfo cdcDataTableInfo) {
this.cdcDataTableInfo = cdcDataTableInfo;
Set<PTable.CDCChangeScope> changeScopes = cdcDataTableInfo.getIncludeScopes();
isChangeImageInScope = changeScopes.contains(PTable.CDCChangeScope.CHANGE);
isPreImageInScope = changeScopes.contains(PTable.CDCChangeScope.PRE);
isPostImageInScope = changeScopes.contains(PTable.CDCChangeScope.POST);
}
public void initChange(long ts) {
changeTimestamp = ts;
changeType = null;
lastDeletedTimestamp = 0L;
if (isPreImageInScope || isPostImageInScope) {
preImage = new HashMap<>();
}
if (isChangeImageInScope || isPostImageInScope) {
changeImage = new HashMap<>();
}
}
public long getChangeTimestamp() {
return changeTimestamp;
}
public boolean isDeletionEvent() {
return changeType == CDC_DELETE_EVENT_TYPE;
}
public boolean isNonEmptyEvent() {
return changeType != null;
}
public void markAsDeletionEvent() {
changeType = CDC_DELETE_EVENT_TYPE;
}
public long getLastDeletedTimestamp() {
return lastDeletedTimestamp;
}
public void setLastDeletedTimestamp(long lastDeletedTimestamp) {
this.lastDeletedTimestamp = lastDeletedTimestamp;
}
public boolean isChangeRelevant(Cell cell) {
if (cell.getTimestamp() > changeTimestamp) {
return false;
}
if (cell.getType() != Cell.Type.DeleteFamily && !isOlderThanChange(cell) &&
isDeletionEvent()) {
// We don't need to build the change image in this case.
return false;
}
return true;
}
public void registerChange(Cell cell, int columnNum, Object value) {
if (!isChangeRelevant(cell)) {
return;
}
CDCTableInfo.CDCColumnInfo columnInfo =
cdcDataTableInfo.getColumnInfoList().get(columnNum);
String cdcColumnName = columnInfo.getColumnDisplayName(cdcDataTableInfo);
if (isOlderThanChange(cell)) {
if ((isPreImageInScope || isPostImageInScope) &&
!preImage.containsKey(cdcColumnName)) {
preImage.put(cdcColumnName, value);
}
} else if (cell.getTimestamp() == changeTimestamp) {
assert !isDeletionEvent() : "Not expected to find a change for delete event";
changeType = CDC_UPSERT_EVENT_TYPE;
if (isChangeImageInScope || isPostImageInScope) {
changeImage.put(cdcColumnName, value);
}
}
}
public Map buildCDCEvent() {
assert (changeType != null) : "Not expected when no event was detected";
Map<String, Object> cdcChange = new HashMap<>();
if (isPreImageInScope) {
cdcChange.put(CDC_PRE_IMAGE, preImage);
}
if (changeType == CDC_UPSERT_EVENT_TYPE) {
if (isChangeImageInScope) {
cdcChange.put(CDC_CHANGE_IMAGE, changeImage);
}
if (isPostImageInScope) {
Map<String, Object> postImage = new HashMap<>();
if (!isDeletionEvent()) {
postImage.putAll(preImage);
postImage.putAll(changeImage);
}
cdcChange.put(CDC_POST_IMAGE, postImage);
}
}
cdcChange.put(CDC_EVENT_TYPE, changeType);
return cdcChange;
}
public boolean isOlderThanChange(Cell cell) {
return (cell.getTimestamp() < changeTimestamp &&
cell.getTimestamp() > lastDeletedTimestamp) ? true : false;
}
}