blob: ce532169fce300913710339e575dcd621680786c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.sql.data;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.commons.lang3.Validate;
import org.apache.samza.sql.SamzaSqlRelRecord;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* Samza sql relational message. Each Samza sql relational message represents a relational row in a table.
* Each row of the relational table consists of a primary key and {@link SamzaSqlRelRecord}, which consists of a list
* of column values and the associated column names. Please note that the primary key itself could be a
* {@link SamzaSqlRelRecord}.
*/
public class SamzaSqlRelMessage implements Serializable {
public static final String KEY_NAME = "__key__";
public static final String OP_NAME = "__op__";
public static final String DELETE_OP = "DELETE";
// key could be a record in itself.
private final Object key;
@JsonProperty("samzaSqlRelRecord")
private final SamzaSqlRelRecord samzaSqlRelRecord;
/**
* hold metadata about the message or event, e.g., the eventTime timestamp
*/
@JsonProperty("samzaSqlRelMsgMetadata")
private SamzaSqlRelMsgMetadata samzaSqlRelMsgMetadata;
/**
* Creates a {@link SamzaSqlRelMessage} from the list of relational fields and values.
* If the field list contains KEY, then it extracts the key out of the fields to create a
* {@link SamzaSqlRelRecord} along with key, otherwise creates a {@link SamzaSqlRelRecord}
* without the key.
* @param fieldNames Ordered list of field names in the row.
* @param fieldValues Ordered list of all the values in the row. Some of the fields can be null, This could be
* result of delete change capture event in the stream or because of the result of the outer join
* or the fields themselves are null in the original stream.
* @param metadata the message/event's metadata
*/
public SamzaSqlRelMessage(List<String> fieldNames, List<Object> fieldValues, SamzaSqlRelMsgMetadata metadata) {
Validate.isTrue(fieldNames.size() == fieldValues.size(), "Field Names and values are not of same length.");
Validate.notNull(metadata, "Message metadata is NULL");
int keyIndex = fieldNames.indexOf(KEY_NAME);
Object key = null;
if (keyIndex != -1) {
key = fieldValues.get(keyIndex);
}
this.key = key;
this.samzaSqlRelRecord = new SamzaSqlRelRecord(fieldNames, fieldValues);
this.samzaSqlRelMsgMetadata = metadata;
}
/**
* Create the SamzaSqlRelMessage, Each rel message represents a row in the table.
* So it can contain a key and a list of fields in the row.
* @param key Represents the key in the row, Key can be null.
* @param fieldNames Ordered list of field names in the row.
* @param fieldValues Ordered list of all the values in the row. Some of the fields can be null, This could be result of
* delete change capture event in the stream or because of the result of the outer join or the fields
* themselves are null in the original stream.
* @param metadata the message/event's metadata
*/
public SamzaSqlRelMessage(Object key, List<String> fieldNames, List<Object> fieldValues, SamzaSqlRelMsgMetadata metadata) {
Validate.isTrue(fieldNames.size() == fieldValues.size(), "Field Names and values are not of same length.");
Validate.notNull(metadata, "Message metadata is NULL");
List<String> tmpFieldNames = new ArrayList<>();
List<Object> tmpFieldValues = new ArrayList<>();
this.key = key;
tmpFieldNames.add(KEY_NAME);
tmpFieldValues.add(key);
tmpFieldNames.addAll(fieldNames);
tmpFieldValues.addAll(fieldValues);
this.samzaSqlRelRecord = new SamzaSqlRelRecord(tmpFieldNames, tmpFieldValues);
this.samzaSqlRelMsgMetadata = metadata;
}
/**
* Creates the SamzaSqlRelMessage from {@link SamzaSqlRelRecord}.
* @param samzaSqlRelRecord represents the rel record.
* @param metadata the message/event's metadata
*/
public SamzaSqlRelMessage(@JsonProperty("samzaSqlRelRecord") SamzaSqlRelRecord samzaSqlRelRecord,
@JsonProperty("samzaSqlRelMsgMetadata") SamzaSqlRelMsgMetadata metadata) {
this(samzaSqlRelRecord.getFieldNames(), samzaSqlRelRecord.getFieldValues(), metadata);
}
@JsonProperty("samzaSqlRelRecord")
public SamzaSqlRelRecord getSamzaSqlRelRecord() {
return samzaSqlRelRecord;
}
@JsonProperty("samzaSqlRelMsgMetadata")
public SamzaSqlRelMsgMetadata getSamzaSqlRelMsgMetadata() {
return samzaSqlRelMsgMetadata;
}
public Object getKey() {
return key;
}
public void setEventTime(long eventTime) {
this.samzaSqlRelMsgMetadata.setEventTime(eventTime);
}
public void setArrivalTime(long arrivalTime) {
this.samzaSqlRelMsgMetadata.setArrivalTime(arrivalTime);
}
public void setScanTime(long scanTimeNs, long scanTimeMs) {
this.samzaSqlRelMsgMetadata.setScanTime(scanTimeNs, scanTimeMs);
}
@Override
public int hashCode() {
return Objects.hash(key, samzaSqlRelRecord);
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
SamzaSqlRelMessage other = (SamzaSqlRelMessage) obj;
return Objects.equals(key, other.key) && Objects.equals(samzaSqlRelRecord, other.samzaSqlRelRecord);
}
@Override
public String toString() {
return "RelMessage: {" + samzaSqlRelRecord + " " + samzaSqlRelMsgMetadata + "}";
}
/**
* Create composite key from the rel message.
* @param message Represents the samza sql rel message to extract the key values from.
* @param keyValueIdx list of key values in the form of field indices within the rel message.
* @param keyPartNames Represents the key field names.
* @return the composite key of the rel message
*/
public static SamzaSqlRelRecord createSamzaSqlCompositeKey(SamzaSqlRelMessage message, List<Integer> keyValueIdx,
List<String> keyPartNames) {
Validate.isTrue(keyValueIdx.size() == keyPartNames.size(), "Key part name and value list sizes are different");
ArrayList<Object> keyPartValues = new ArrayList<>();
for (int idx : keyValueIdx) {
keyPartValues.add(message.getSamzaSqlRelRecord().getFieldValues().get(idx));
}
return new SamzaSqlRelRecord(keyPartNames, keyPartValues);
}
/**
* Create composite key from the rel message.
* @param message Represents the samza sql rel message to extract the key values and names from.
* @param relIdx list of keys in the form of field indices within the rel message.
* @return the composite key of the rel message
*/
public static SamzaSqlRelRecord createSamzaSqlCompositeKey(SamzaSqlRelMessage message, List<Integer> relIdx) {
return createSamzaSqlCompositeKey(message, relIdx,
getSamzaSqlCompositeKeyFieldNames(message.getSamzaSqlRelRecord().getFieldNames(), relIdx));
}
/**
* Get composite key field names.
* @param fieldNames list of field names to extract the key names from.
* @param nameIds indices within the field names.
* @return list of composite key field names
*/
public static List<String> getSamzaSqlCompositeKeyFieldNames(List<String> fieldNames,
List<Integer> nameIds) {
List<String> keyPartNames = new ArrayList<>();
for (int idx : nameIds) {
keyPartNames.add(fieldNames.get(idx));
}
return keyPartNames;
}
}