blob: 2decc70172188981a34c0ffc4759adc9b1e755d9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.nexmark.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Objects;
import org.joda.time.Instant;
/** A person either creating an auction or making a bid. */
@DefaultSchema(JavaFieldSchema.class)
public class Person implements KnownSize, Serializable {
private static final Coder<Instant> INSTANT_CODER = InstantCoder.of();
private static final Coder<Long> LONG_CODER = VarLongCoder.of();
private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
public static final Coder<Person> CODER =
new CustomCoder<Person>() {
@Override
public void encode(Person value, OutputStream outStream)
throws CoderException, IOException {
LONG_CODER.encode(value.id, outStream);
STRING_CODER.encode(value.name, outStream);
STRING_CODER.encode(value.emailAddress, outStream);
STRING_CODER.encode(value.creditCard, outStream);
STRING_CODER.encode(value.city, outStream);
STRING_CODER.encode(value.state, outStream);
INSTANT_CODER.encode(value.dateTime, outStream);
STRING_CODER.encode(value.extra, outStream);
}
@Override
public Person decode(InputStream inStream) throws CoderException, IOException {
long id = LONG_CODER.decode(inStream);
String name = STRING_CODER.decode(inStream);
String emailAddress = STRING_CODER.decode(inStream);
String creditCard = STRING_CODER.decode(inStream);
String city = STRING_CODER.decode(inStream);
String state = STRING_CODER.decode(inStream);
Instant dateTime = INSTANT_CODER.decode(inStream);
String extra = STRING_CODER.decode(inStream);
return new Person(id, name, emailAddress, creditCard, city, state, dateTime, extra);
}
@Override
public void verifyDeterministic() throws NonDeterministicException {}
@Override
public Object structuralValue(Person v) {
return v;
}
};
/** Id of person. */
@JsonProperty public long id; // primary key
/** Extra person properties. */
@JsonProperty public String name;
@JsonProperty public String emailAddress;
@JsonProperty public String creditCard;
@JsonProperty public String city;
@JsonProperty public String state;
@JsonProperty public Instant dateTime;
/** Additional arbitrary payload for performance testing. */
@JsonProperty public String extra;
// For Avro only.
@SuppressWarnings("unused")
public Person() {
id = 0;
name = null;
emailAddress = null;
creditCard = null;
city = null;
state = null;
dateTime = null;
extra = null;
}
public Person(
long id,
String name,
String emailAddress,
String creditCard,
String city,
String state,
Instant dateTime,
String extra) {
this.id = id;
this.name = name;
this.emailAddress = emailAddress;
this.creditCard = creditCard;
this.city = city;
this.state = state;
this.dateTime = dateTime;
this.extra = extra;
}
/** Return a copy of person which capture the given annotation. (Used for debugging). */
public Person withAnnotation(String annotation) {
return new Person(
id, name, emailAddress, creditCard, city, state, dateTime, annotation + ": " + extra);
}
/** Does person have {@code annotation}? (Used for debugging.) */
public boolean hasAnnotation(String annotation) {
return extra.startsWith(annotation + ": ");
}
/** Remove {@code annotation} from person. (Used for debugging.) */
public Person withoutAnnotation(String annotation) {
if (hasAnnotation(annotation)) {
return new Person(
id,
name,
emailAddress,
creditCard,
city,
state,
dateTime,
extra.substring(annotation.length() + 2));
} else {
return this;
}
}
@Override
public long sizeInBytes() {
return 8L
+ name.length()
+ 1L
+ emailAddress.length()
+ 1L
+ creditCard.length()
+ 1L
+ city.length()
+ 1L
+ state.length()
+ 8L
+ 1L
+ extra.length()
+ 1L;
}
@Override
public String toString() {
try {
return NexmarkUtils.MAPPER.writeValueAsString(this);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Person person = (Person) o;
return id == person.id
&& Objects.equal(dateTime, person.dateTime)
&& Objects.equal(name, person.name)
&& Objects.equal(emailAddress, person.emailAddress)
&& Objects.equal(creditCard, person.creditCard)
&& Objects.equal(city, person.city)
&& Objects.equal(state, person.state)
&& Objects.equal(extra, person.extra);
}
@Override
public int hashCode() {
return Objects.hashCode(id, name, emailAddress, creditCard, city, state, dateTime, extra);
}
}