| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package com.datatorrent.lib.appdata.gpo; |
| |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.codehaus.jettison.json.JSONObject; |
| import org.junit.Assert; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.commons.lang3.mutable.MutableInt; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| |
| import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; |
| import com.datatorrent.lib.appdata.schemas.SchemaUtils; |
| import com.datatorrent.lib.appdata.schemas.Type; |
| |
| public class GPOUtilsTest |
| { |
| private static final Logger logger = LoggerFactory.getLogger(GPOUtilsTest.class); |
| |
| @Test |
| public void testSerializationLength() |
| { |
| final Map<String, Type> fieldToType = Maps.newHashMap(); |
| |
| final String tboolean = "tboolean"; |
| final Boolean tbooleanv = true; |
| final String tchar = "tchar"; |
| final Character tcharv = 'A'; |
| final String tstring = "tstring"; |
| final String tstringv = "hello"; |
| final String tfloat = "tfloat"; |
| final Float tfloatv = 1.0f; |
| final String tdouble = "tdouble"; |
| final Double tdoublev = 2.0; |
| final String tbyte = "tbyte"; |
| final Byte tbytev = 50; |
| final String tshort = "tshort"; |
| final Short tshortv = 1000; |
| final String tinteger = "tinteger"; |
| final Integer tintegerv = 100000; |
| final String tlong = "tlong"; |
| final Long tlongv = 10000000000L; |
| |
| int totalBytes = 1 //boolean |
| + 2 //char |
| + 4 + tstringv.getBytes().length //string |
| + 4 //float |
| + 8 //double |
| + 1 //byte |
| + 2 //short |
| + 4 //int |
| + 8; //long |
| |
| fieldToType.put(tboolean, Type.BOOLEAN); |
| fieldToType.put(tchar, Type.CHAR); |
| fieldToType.put(tstring, Type.STRING); |
| fieldToType.put(tfloat, Type.FLOAT); |
| fieldToType.put(tdouble, Type.DOUBLE); |
| fieldToType.put(tbyte, Type.BYTE); |
| fieldToType.put(tshort, Type.SHORT); |
| fieldToType.put(tinteger, Type.INTEGER); |
| fieldToType.put(tlong, Type.LONG); |
| |
| FieldsDescriptor fd = new FieldsDescriptor(fieldToType); |
| |
| GPOMutable gpo = new GPOMutable(fd); |
| |
| gpo.setFieldGeneric(tboolean, tbooleanv); |
| gpo.setFieldGeneric(tchar, tcharv); |
| gpo.setField(tstring, tstringv); |
| gpo.setFieldGeneric(tfloat, tfloatv); |
| gpo.setFieldGeneric(tdouble, tdoublev); |
| gpo.setFieldGeneric(tbyte, tbytev); |
| gpo.setFieldGeneric(tshort, tshortv); |
| gpo.setFieldGeneric(tinteger, tintegerv); |
| gpo.setFieldGeneric(tlong, tlongv); |
| |
| int serializeLength = GPOUtils.serializedLength(gpo); |
| |
| Assert.assertEquals("The serialized byte length is incorrect.", totalBytes, serializeLength); |
| } |
| |
| @Test |
| public void testCornerCaseSerializationLegnth() |
| { |
| final Map<String, Type> fieldToType = Maps.newHashMap(); |
| fieldToType.put("a", Type.OBJECT); |
| fieldToType.put("b", Type.OBJECT); |
| |
| GPOMutable gpo = new GPOMutable(new FieldsDescriptor(fieldToType)); |
| |
| int serializeLength = GPOUtils.serializedLength(gpo); |
| |
| Assert.assertEquals(0, serializeLength); |
| } |
| |
| @Test |
| public void simpleSerializeDeserializeTest() |
| { |
| final Map<String, Type> fieldToType = Maps.newHashMap(); |
| |
| final String tboolean = "tboolean"; |
| final Boolean tbooleanv = true; |
| final String tchar = "tchar"; |
| final Character tcharv = 'A'; |
| final String tstring = "tstring"; |
| final String tstringv = "hello"; |
| final String tfloat = "tfloat"; |
| final Float tfloatv = 1.0f; |
| final String tdouble = "tdouble"; |
| final Double tdoublev = 2.0; |
| final String tbyte = "tbyte"; |
| final Byte tbytev = 50; |
| final String tshort = "tshort"; |
| final Short tshortv = 1000; |
| final String tinteger = "tinteger"; |
| final Integer tintegerv = 100000; |
| final String tlong = "tlong"; |
| final Long tlongv = 10000000000L; |
| |
| int totalBytes = 1 //boolean |
| + 2 //char |
| + 4 + tstringv.getBytes().length //string |
| + 4 //float |
| + 8 //double |
| + 1 //byte |
| + 2 //short |
| + 4 //int |
| + 8; //long |
| |
| logger.debug("Correct total bytes {}.", totalBytes); |
| |
| fieldToType.put(tboolean, Type.BOOLEAN); |
| fieldToType.put(tchar, Type.CHAR); |
| fieldToType.put(tstring, Type.STRING); |
| fieldToType.put(tfloat, Type.FLOAT); |
| fieldToType.put(tdouble, Type.DOUBLE); |
| fieldToType.put(tbyte, Type.BYTE); |
| fieldToType.put(tshort, Type.SHORT); |
| fieldToType.put(tinteger, Type.INTEGER); |
| fieldToType.put(tlong, Type.LONG); |
| |
| FieldsDescriptor fd = new FieldsDescriptor(fieldToType); |
| |
| GPOMutable gpo = new GPOMutable(fd); |
| |
| gpo.setFieldGeneric(tboolean, tbooleanv); |
| gpo.setFieldGeneric(tchar, tcharv); |
| gpo.setField(tstring, tstringv); |
| gpo.setFieldGeneric(tfloat, tfloatv); |
| gpo.setFieldGeneric(tdouble, tdoublev); |
| gpo.setFieldGeneric(tbyte, tbytev); |
| gpo.setFieldGeneric(tshort, tshortv); |
| gpo.setFieldGeneric(tinteger, tintegerv); |
| gpo.setFieldGeneric(tlong, tlongv); |
| |
| GPOByteArrayList byteArrayList = new GPOByteArrayList(); |
| byte[] gpoByte = GPOUtils.serialize(gpo, byteArrayList); |
| logger.debug("GPO num bytes: {}", gpoByte.length); |
| GPOMutable dgpo = GPOUtils.deserialize(fd, gpoByte, new MutableInt(0)); |
| |
| Assert.assertEquals("Values must equal", tbooleanv, dgpo.getField(tboolean)); |
| Assert.assertEquals("Values must equal", tcharv, dgpo.getField(tchar)); |
| Assert.assertEquals("Values must equal", tstringv, dgpo.getField(tstring)); |
| Assert.assertEquals("Values must equal", tfloatv, dgpo.getField(tfloat)); |
| Assert.assertEquals("Values must equal", tdoublev, dgpo.getField(tdouble)); |
| Assert.assertEquals("Values must equal", tbytev, dgpo.getField(tbyte)); |
| Assert.assertEquals("Values must equal", tshortv, dgpo.getField(tshort)); |
| Assert.assertEquals("Values must equal", tintegerv, dgpo.getField(tinteger)); |
| Assert.assertEquals("Values must equal", tlongv, dgpo.getField(tlong)); |
| } |
| |
| @Test |
| public void validDeserializeTest() throws Exception |
| { |
| final Map<String, Type> fieldToType = Maps.newHashMap(); |
| |
| final String tboolean = "tboolean"; |
| final Boolean tbooleanv = true; |
| final String tchar = "tchar"; |
| final Character tcharv = 'A'; |
| final String tstring = "tstring"; |
| final String tstringv = "hello"; |
| final String tfloat = "tfloat"; |
| final Float tfloatv = 1.0f; |
| final String tdouble = "tdouble"; |
| final Double tdoublev = 2.0; |
| final String tbyte = "tbyte"; |
| final Byte tbytev = 50; |
| final String tshort = "tshort"; |
| final Short tshortv = 1000; |
| final String tinteger = "tinteger"; |
| final Integer tintegerv = 100000; |
| final String tlong = "tlong"; |
| final Long tlongv = 10000000000L; |
| |
| fieldToType.put(tboolean, Type.BOOLEAN); |
| fieldToType.put(tchar, Type.CHAR); |
| fieldToType.put(tstring, Type.STRING); |
| fieldToType.put(tfloat, Type.FLOAT); |
| fieldToType.put(tdouble, Type.DOUBLE); |
| fieldToType.put(tbyte, Type.BYTE); |
| fieldToType.put(tshort, Type.SHORT); |
| fieldToType.put(tinteger, Type.INTEGER); |
| fieldToType.put(tlong, Type.LONG); |
| |
| JSONObject jo = new JSONObject(); |
| jo.put(tboolean, tbooleanv); |
| jo.put(tchar, tcharv); |
| jo.put(tstring, tstringv); |
| jo.put(tfloat, tfloatv); |
| jo.put(tdouble, tdoublev); |
| jo.put(tbyte, tbytev); |
| jo.put(tshort, tshortv); |
| jo.put(tinteger, tintegerv); |
| jo.put(tlong, tlongv); |
| |
| String json = jo.toString(2); |
| logger.debug("Input json: {}", json); |
| |
| GPOMutable gpom = GPOUtils.deserialize(new FieldsDescriptor(fieldToType), jo); |
| |
| Assert.assertEquals("Results must equal", tbooleanv, gpom.getField(tboolean)); |
| Assert.assertEquals("Results must equal", tcharv, gpom.getField(tchar)); |
| Assert.assertEquals("Results must equal", tfloatv, gpom.getField(tfloat)); |
| Assert.assertEquals("Results must equal", tdoublev, gpom.getField(tdouble)); |
| Assert.assertEquals("Results must equal", tbytev, gpom.getField(tbyte)); |
| Assert.assertEquals("Results must equal", tshortv, gpom.getField(tshort)); |
| Assert.assertEquals("Results must equal", tintegerv, gpom.getField(tinteger)); |
| Assert.assertEquals("Results must equal", tlongv, gpom.getField(tlong)); |
| } |
| |
| @Test |
| @SuppressWarnings("AssertEqualsBetweenInconvertibleTypes") |
| public void testDeserializeToMap() throws Exception |
| { |
| Map<String, Type> fieldToType = Maps.newHashMap(); |
| fieldToType.put("name", Type.STRING); |
| fieldToType.put("longvals", Type.LONG); |
| fieldToType.put("stringvals", Type.STRING); |
| |
| FieldsDescriptor fd = new FieldsDescriptor(fieldToType); |
| String json = SchemaUtils.jarResourceFileToString("deserializeToMapTest.json"); |
| |
| Map<String, Set<Object>> resultMap = GPOUtils.deserializeToMap(fd, new JSONObject(json)); |
| |
| Set<Object> names = resultMap.get("name"); |
| Set<Object> longvals = resultMap.get("longvals"); |
| Set<Object> stringvals = resultMap.get("stringvals"); |
| |
| Assert.assertEquals(Sets.newHashSet("tim"), names); |
| Assert.assertEquals(Sets.newHashSet(1L, 2L, 3L), longvals); |
| Assert.assertEquals(Sets.newHashSet("a", "b", "c"), stringvals); |
| } |
| |
| @Test |
| public void objectSerdeTest() |
| { |
| Map<String, Type> fieldToTypeKey = Maps.newHashMap(); |
| fieldToTypeKey.put("publisher", Type.STRING); |
| fieldToTypeKey.put("advertiser", Type.STRING); |
| FieldsDescriptor fdkey = new FieldsDescriptor(fieldToTypeKey); |
| |
| Map<String, Type> fieldToTypeAgg = Maps.newHashMap(); |
| fieldToTypeAgg.put("clicks", Type.LONG); |
| fieldToTypeAgg.put("impressions", Type.LONG); |
| FieldsDescriptor fdagg = new FieldsDescriptor(fieldToTypeAgg); |
| |
| Map<String, Type> fieldToType = Maps.newHashMap(); |
| fieldToType.put("fdkeys", Type.OBJECT); |
| fieldToType.put("fdvalues", Type.OBJECT); |
| fieldToType.put("keys", Type.OBJECT); |
| fieldToType.put("values", Type.OBJECT); |
| |
| Map<String, Serde> fieldToSerde = Maps.newHashMap(); |
| fieldToSerde.put("fdkeys", SerdeFieldsDescriptor.INSTANCE); |
| fieldToSerde.put("fdvalues", SerdeFieldsDescriptor.INSTANCE); |
| fieldToSerde.put("keys", SerdeListGPOMutable.INSTANCE); |
| fieldToSerde.put("values", SerdeListGPOMutable.INSTANCE); |
| |
| FieldsDescriptor metaDataFD = new FieldsDescriptor(fieldToType, fieldToSerde, new PayloadFix()); |
| |
| GPOMutable gpo = new GPOMutable(metaDataFD); |
| |
| GPOMutable key1 = new GPOMutable(fdkey); |
| key1.setField("publisher", "google"); |
| key1.setField("advertiser", "safeway"); |
| GPOMutable key2 = new GPOMutable(fdkey); |
| key2.setField("publisher", "twitter"); |
| key2.setField("advertiser", "blockbuster"); |
| |
| GPOMutable agg1 = new GPOMutable(fdagg); |
| agg1.setField("clicks", 10L); |
| agg1.setField("impressions", 11L); |
| GPOMutable agg2 = new GPOMutable(fdagg); |
| agg2.setField("clicks", 5L); |
| agg2.setField("impressions", 4L); |
| |
| List<GPOMutable> keys = Lists.newArrayList(key1, key2); |
| List<GPOMutable> aggs = Lists.newArrayList(agg1, agg2); |
| |
| gpo.getFieldsObject()[0] = fdkey; |
| gpo.getFieldsObject()[1] = fdagg; |
| gpo.getFieldsObject()[2] = keys; |
| gpo.getFieldsObject()[3] = aggs; |
| |
| GPOByteArrayList bal = new GPOByteArrayList(); |
| byte[] serialized = GPOUtils.serialize(gpo, bal); |
| |
| GPOMutable newGPO = GPOUtils.deserialize(metaDataFD, serialized, new MutableInt(0)); |
| newGPO.setFieldDescriptor(metaDataFD); |
| |
| Assert.assertEquals(gpo, newGPO); |
| Assert.assertArrayEquals(gpo.getFieldsObject(), newGPO.getFieldsObject()); |
| } |
| |
| private static final Logger LOG = LoggerFactory.getLogger(GPOUtilsTest.class); |
| |
| public static class PayloadFix implements SerdeObjectPayloadFix |
| { |
| @Override |
| public void fix(Object[] objects) |
| { |
| FieldsDescriptor keyfd = (FieldsDescriptor)objects[0]; |
| FieldsDescriptor valuefd = (FieldsDescriptor)objects[1]; |
| |
| @SuppressWarnings("unchecked") |
| List<GPOMutable> keyMutables = (List<GPOMutable>)objects[2]; |
| @SuppressWarnings("unchecked") |
| List<GPOMutable> aggregateMutables = (List<GPOMutable>)objects[3]; |
| |
| fix(keyfd, keyMutables); |
| fix(valuefd, aggregateMutables); |
| } |
| |
| private void fix(FieldsDescriptor fd, List<GPOMutable> mutables) |
| { |
| for (int index = 0; index < mutables.size(); index++) { |
| mutables.get(index).setFieldDescriptor(fd); |
| } |
| } |
| } |
| } |