blob: bd502e0deadcecca9b547599c2a887dd0d48087f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.jackson;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.junit.Rule;
import org.junit.Test;
/** Test Jackson transforms {@link ParseJsons} and {@link AsJsons}. */
public class JacksonTransformsTest {
private static final List<String> VALID_JSONS =
Arrays.asList("{\"myString\":\"abc\",\"myInt\":3}", "{\"myString\":\"def\",\"myInt\":4}");
private static final List<String> INVALID_JSONS =
Arrays.asList("{myString:\"abc\",\"myInt\":3,\"other\":1}", "{", "");
private static final List<String> EMPTY_JSONS = Arrays.asList("{}", "{}");
private static final List<String> EXTRA_PROPERTIES_JSONS =
Arrays.asList(
"{\"myString\":\"abc\",\"myInt\":3,\"other\":1}", "{\"myString\":\"def\",\"myInt\":4}");
private static final List<MyPojo> POJOS =
Arrays.asList(new MyPojo("abc", 3), new MyPojo("def", 4));
private static final List<MyEmptyBean> EMPTY_BEANS =
Arrays.asList(new MyEmptyBean("abc", 3), new MyEmptyBean("def", 4));
@Rule public final transient TestPipeline pipeline = TestPipeline.create();
@Test
public void parseValidJsons() {
PCollection<MyPojo> output =
pipeline
.apply(Create.of(VALID_JSONS))
.apply(ParseJsons.of(MyPojo.class))
.setCoder(SerializableCoder.of(MyPojo.class));
PAssert.that(output).containsInAnyOrder(POJOS);
pipeline.run();
}
@Test(expected = Pipeline.PipelineExecutionException.class)
public void failParsingInvalidJsons() {
PCollection<MyPojo> output =
pipeline
.apply(Create.of(Iterables.concat(VALID_JSONS, INVALID_JSONS)))
.apply(ParseJsons.of(MyPojo.class))
.setCoder(SerializableCoder.of(MyPojo.class));
PAssert.that(output).containsInAnyOrder(POJOS);
pipeline.run();
}
@Test(expected = Pipeline.PipelineExecutionException.class)
public void failParsingWithoutCustomMapper() {
PCollection<MyPojo> output =
pipeline
.apply(Create.of(EXTRA_PROPERTIES_JSONS))
.apply(ParseJsons.of(MyPojo.class))
.setCoder(SerializableCoder.of(MyPojo.class));
PAssert.that(output).empty();
pipeline.run();
}
@Test
public void parseUsingCustomMapper() {
ObjectMapper customMapper =
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
PCollection<MyPojo> output =
pipeline
.apply(Create.of(EXTRA_PROPERTIES_JSONS))
.apply(ParseJsons.of(MyPojo.class).withMapper(customMapper))
.setCoder(SerializableCoder.of(MyPojo.class));
PAssert.that(output).containsInAnyOrder(POJOS);
pipeline.run();
}
@Test
public void writeValidObjects() {
PCollection<String> output =
pipeline
.apply(Create.of(POJOS))
.apply(AsJsons.of(MyPojo.class))
.setCoder(StringUtf8Coder.of());
PAssert.that(output).containsInAnyOrder(VALID_JSONS);
pipeline.run();
}
@Test(expected = Pipeline.PipelineExecutionException.class)
public void failWritingWithoutCustomMapper() {
pipeline
.apply(Create.of(EMPTY_BEANS))
.apply(AsJsons.of(MyEmptyBean.class))
.setCoder(StringUtf8Coder.of());
pipeline.run();
}
@Test
public void writeUsingCustomMapper() {
ObjectMapper customMapper =
new ObjectMapper().configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
PCollection<String> output =
pipeline
.apply(Create.of(EMPTY_BEANS))
.apply(AsJsons.of(MyEmptyBean.class).withMapper(customMapper))
.setCoder(StringUtf8Coder.of());
PAssert.that(output).containsInAnyOrder(EMPTY_JSONS);
pipeline.run();
}
/** Pojo for tests. */
@SuppressWarnings({"WeakerAccess", "unused"})
public static class MyPojo implements Serializable {
private String myString;
private int myInt;
public MyPojo() {}
public MyPojo(String myString, int myInt) {
this.myString = myString;
this.myInt = myInt;
}
public String getMyString() {
return myString;
}
public void setMyString(String myString) {
this.myString = myString;
}
public int getMyInt() {
return myInt;
}
public void setMyInt(int myInt) {
this.myInt = myInt;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof MyPojo)) {
return false;
}
MyPojo myPojo = (MyPojo) o;
return myInt == myPojo.myInt
&& (myString != null ? myString.equals(myPojo.myString) : myPojo.myString == null);
}
@Override
public int hashCode() {
int result = myString != null ? myString.hashCode() : 0;
result = 31 * result + myInt;
return result;
}
}
/** Pojo for tests. */
@SuppressWarnings({"WeakerAccess", "unused"})
public static class MyEmptyBean implements Serializable {
private String myString;
private int myInt;
public MyEmptyBean(String myString, int myInt) {
this.myString = myString;
this.myInt = myInt;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MyEmptyBean that = (MyEmptyBean) o;
if (myInt != that.myInt) {
return false;
}
return myString != null ? myString.equals(that.myString) : that.myString == null;
}
@Override
public int hashCode() {
int result = myString != null ? myString.hashCode() : 0;
result = 31 * result + myInt;
return result;
}
}
}