blob: b41b6945906b7611c19ad3956796f9f514b15d64 [file] [log] [blame]
/*
* Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.demos.dimensions.generic;
import java.util.Map;
import com.datatorrent.api.BaseOperator;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
/**
*
*/
public class SchemaConverter extends BaseOperator
{
public final DefaultOutputPort<GenericEvent> output = new DefaultOutputPort<GenericEvent>();
public final DefaultInputPort<Map<String, Object>> input = new DefaultInputPort<Map<String, Object>>()
{
@Override
public void process(Map<String, Object> tuple)
{
output.emit(eventSchema.convertMapToGenericEvent(tuple));
}
};
private String eventSchemaJSON;
private transient EventSchema eventSchema;
public SchemaConverter()
{
this.eventSchemaJSON = EventSchema.DEFAULT_SCHEMA_SALES;
getEventSchema();
}
public String getEventSchemaJSON()
{
return eventSchemaJSON;
}
public final EventSchema getEventSchema()
{
if (eventSchema == null) {
try {
eventSchema = EventSchema.createFromJSON(eventSchemaJSON);
}
catch (Exception e) {
throw new IllegalArgumentException("Failed to parse JSON input: " + eventSchemaJSON, e);
}
}
return eventSchema;
}
public void setEventSchemaJSON(String eventSchemaJSON)
{
this.eventSchemaJSON = eventSchemaJSON;
try {
eventSchema = EventSchema.createFromJSON(eventSchemaJSON);
}
catch (Exception e) {
throw new IllegalArgumentException("Failed to parse JSON input: " + eventSchemaJSON, e);
}
}
@Override
public void setup(OperatorContext context)
{
setEventSchemaJSON(eventSchemaJSON);
}
}