blob: 63f692106ee017232f55d5b1f05dbeb86682bdcb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.test;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import java.time.Instant;
import java.util.Objects;
import java.util.StringJoiner;
/**
 * A key/value pair, including timestamp and record headers, to be sent to or received from {@link TopologyTestDriver}.
 * If a record does not contain a timestamp,
 * {@link TestInputTopic} will auto advance its time when the record is piped.
 */
public class TestRecord<K, V> {
    private final Headers headers;
    private final K key;
    private final V value;
    // null means "no timestamp was specified for this record"
    private final Instant recordTime;

    /**
     * Creates a record.
     *
     * @param key The key that will be included in the record
     * @param value The value of the record
     * @param headers The record headers that will be included in the record; they are wrapped in a new
     *                {@link RecordHeaders} instance
     * @param recordTime The timestamp of the record, or {@code null} if the record has no timestamp
     */
    public TestRecord(final K key, final V value, final Headers headers, final Instant recordTime) {
        this.key = key;
        this.value = value;
        this.recordTime = recordTime;
        this.headers = new RecordHeaders(headers);
    }

    /**
     * Creates a record.
     *
     * @param key The key that will be included in the record
     * @param value The value of the record
     * @param headers The record headers that will be included in the record; they are wrapped in a new
     *                {@link RecordHeaders} instance
     * @param timestampMs The timestamp of the record, in milliseconds since the beginning of the epoch,
     *                    or {@code null} if the record has no timestamp
     * @throws IllegalArgumentException if {@code timestampMs} is negative
     */
    public TestRecord(final K key, final V value, final Headers headers, final Long timestampMs) {
        if (timestampMs != null) {
            if (timestampMs < 0) {
                throw new IllegalArgumentException(
                    String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestampMs));
            }
            this.recordTime = Instant.ofEpochMilli(timestampMs);
        } else {
            this.recordTime = null;
        }
        this.key = key;
        this.value = value;
        this.headers = new RecordHeaders(headers);
    }

    /**
     * Creates a record without explicitly supplied headers.
     *
     * @param key The key of the record
     * @param value The value of the record
     * @param recordTime The timestamp of the record as {@link Instant}, or {@code null} if the record has no timestamp
     */
    public TestRecord(final K key, final V value, final Instant recordTime) {
        // no headers supplied: null is handed on to the four-argument constructor
        this(key, value, null, recordTime);
    }

    /**
     * Creates a record without a timestamp.
     *
     * @param key The key of the record
     * @param value The value of the record
     * @param headers The record headers that will be included in the record; they are wrapped in a new
     *                {@link RecordHeaders} instance
     */
    public TestRecord(final K key, final V value, final Headers headers) {
        this.key = key;
        this.value = value;
        this.headers = new RecordHeaders(headers);
        this.recordTime = null;
    }

    /**
     * Creates a record with a fresh {@link RecordHeaders} instance and without a timestamp.
     *
     * @param key The key of the record
     * @param value The value of the record
     */
    public TestRecord(final K key, final V value) {
        this.key = key;
        this.value = value;
        this.headers = new RecordHeaders();
        this.recordTime = null;
    }

    /**
     * Create a record with {@code null} key.
     *
     * @param value The value of the record
     */
    public TestRecord(final V value) {
        this(null, value);
    }

    /**
     * Create a {@code TestRecord} from a {@link ConsumerRecord}.
     * The key, value, headers and timestamp are taken from the given record; the headers object is
     * used as-is (it is not wrapped in a new instance).
     *
     * @param record The consumer record whose contents are used
     * @throws NullPointerException if {@code record} is {@code null}
     */
    public TestRecord(final ConsumerRecord<K, V> record) {
        Objects.requireNonNull(record);
        this.key = record.key();
        this.value = record.value();
        this.headers = record.headers();
        this.recordTime = Instant.ofEpochMilli(record.timestamp());
    }

    /**
     * Create a {@code TestRecord} from a {@link ProducerRecord}.
     * The key, value, headers and timestamp are taken from the given record; the headers object is
     * used as-is (it is not wrapped in a new instance). If the producer record carries no timestamp,
     * the resulting {@code TestRecord} has no timestamp either.
     *
     * @param record The record contents
     * @throws NullPointerException if {@code record} is {@code null}
     */
    public TestRecord(final ProducerRecord<K, V> record) {
        Objects.requireNonNull(record);
        this.key = record.key();
        this.value = record.value();
        this.headers = record.headers();
        // A producer record's timestamp is a nullable Long; unboxing it blindly would throw a
        // NullPointerException, so map "no timestamp" to a null record time instead.
        final Long timestampMs = record.timestamp();
        this.recordTime = timestampMs == null ? null : Instant.ofEpochMilli(timestampMs);
    }

    /**
     * @return The headers.
     */
    public Headers headers() {
        return headers;
    }

    /**
     * @return The key (or {@code null} if no key is specified).
     */
    public K key() {
        return key;
    }

    /**
     * @return The value.
     */
    public V value() {
        return value;
    }

    /**
     * @return The timestamp, which is in milliseconds since epoch (or {@code null} if the record has no timestamp).
     */
    public Long timestamp() {
        return this.recordTime == null ? null : this.recordTime.toEpochMilli();
    }

    /**
     * @return The headers.
     */
    public Headers getHeaders() {
        return headers;
    }

    /**
     * @return The key (or {@code null} if no key is specified).
     */
    public K getKey() {
        return key;
    }

    /**
     * @return The value.
     */
    public V getValue() {
        return value;
    }

    /**
     * @return The timestamp as {@link Instant} (or {@code null} if the record has no timestamp).
     */
    public Instant getRecordTime() {
        return recordTime;
    }

    /**
     * @return A string of the form {@code TestRecord[key=..., value=..., headers=..., recordTime=...]}.
     */
    @Override
    public String toString() {
        return new StringJoiner(", ", TestRecord.class.getSimpleName() + "[", "]")
            .add("key=" + key)
            .add("value=" + value)
            .add("headers=" + headers)
            .add("recordTime=" + recordTime)
            .toString();
    }

    /**
     * Two records are equal when they are of the same class and their headers, key, value and
     * record time are all equal.
     */
    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final TestRecord<?, ?> that = (TestRecord<?, ?>) o;
        return Objects.equals(headers, that.headers) &&
            Objects.equals(key, that.key) &&
            Objects.equals(value, that.value) &&
            Objects.equals(recordTime, that.recordTime);
    }

    /**
     * Hash code computed from the same fields that {@link #equals(Object)} compares.
     */
    @Override
    public int hashCode() {
        return Objects.hash(headers, key, value, recordTime);
    }
}