| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package com.datatorrent.contrib.enrich; |
| |
| import java.text.ParseException; |
| import java.text.SimpleDateFormat; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Map; |
| |
| import javax.validation.constraints.NotNull; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.hadoop.classification.InterfaceStability; |
| |
| import com.google.common.collect.Maps; |
| import com.google.common.primitives.Ints; |
| import com.univocity.parsers.fixed.FixedWidthFields; |
| import com.univocity.parsers.fixed.FixedWidthParser; |
| import com.univocity.parsers.fixed.FixedWidthParserSettings; |
| |
| import com.datatorrent.contrib.parser.AbstractCsvParser.FIELD_TYPE; |
| import com.datatorrent.contrib.parser.AbstractCsvParser.Field; |
| |
| /** |
| * This implementation of {@link FSLoader} is used to load data from fixed width |
| * file.User needs to set {@link FixedWidthFSLoader#fieldDescription} to specify |
| * field information. |
| * |
| * @since 3.6.0 |
| */ |
| @InterfaceStability.Evolving |
| public class FixedWidthFSLoader extends FSLoader |
| { |
| |
| private transient List<FixedWidthField> fields; |
| /** |
| * Indicates whether first line of the file is a header. Default is false |
| */ |
| private boolean hasHeader; |
| |
| /** |
| * Specifies information related to fields in fixed-width file. Format is |
| * [NAME]:[FIELD_TYPE]:[WIDTH]:[date format if FIELD_TYPE is DATE] FIELD_TYPE |
| * can take on of the values of {@link FIELD_TYPE} i.e BOOLEAN, DOUBLE, |
| * INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE e.g. |
| * Year:INTEGER:4,Make:STRING:5,Model:STRING:40,Description:STRING:40, |
| * Price:DOUBLE:8,Date:DATE:10:\"dd:mm:yyyy\". Date format needs to be within |
| * quotes (" ") |
| */ |
| @NotNull |
| private String fieldDescription; |
| |
| /** |
| * Array containing headers |
| */ |
| private transient String[] header; |
| /** |
| * Padding character. Default is white space. |
| */ |
| private char padding = ' '; |
| private transient FixedWidthParser fixedWidthParser; |
| private transient boolean initialized; |
| |
| private static final Logger logger = LoggerFactory.getLogger(FixedWidthFSLoader.class); |
| |
| /** |
| * Gets the option if file has header or not. |
| * |
| * @return hasHeader,indicating whether first line of the file is a header. |
| */ |
| public boolean isHasHeader() |
| { |
| return hasHeader; |
| } |
| |
| /** |
| * Set to true if file has header |
| * |
| * @param hasHeader |
| * Indicates whether first line of the file is a header. Default is |
| * false |
| */ |
| public void setHasHeader(boolean hasHeader) |
| { |
| this.hasHeader = hasHeader; |
| } |
| |
| /** |
| * Gets the field description |
| * |
| * @return fieldDescription. String specifying information related to fields |
| * in fixed-width file. |
| */ |
| public String getFieldDescription() |
| { |
| return fieldDescription; |
| } |
| |
| /** |
| * Sets fieldDescription |
| * |
| * @param fieldDescription |
| * a String specifying information related to fields in fixed-width |
| * file. Format is [NAME]:[FIELD_TYPE]:[WIDTH]:[date format if |
| * FIELD_TYPE is DATE] FIELD_TYPE can take on of the values of |
| * {@link FIELD_TYPE} |
| * e.g.Year:INTEGER:4,Make:STRING:5,Model:STRING:40,Description: |
| * STRING:40, Price:DOUBLE:8,Date:DATE:10:\"dd:mm:yyyy\" Date format |
| * needs to be within quotes (" ") |
| */ |
| public void setFieldDescription(String fieldDescription) |
| { |
| this.fieldDescription = fieldDescription; |
| } |
| |
| /** |
| * Gets the character used for padding in the fixed-width file.Default is |
| * white space (' ') |
| * |
| * @return Padding character. Default is white space. |
| */ |
| public char getPadding() |
| { |
| return padding; |
| } |
| |
| /** |
| * Sets the character used for padding in fixed-width file.Default is white |
| * space (' ') |
| * |
| * @param padding |
| * Padding character. Default is white space. |
| */ |
| public void setPadding(char padding) |
| { |
| this.padding = padding; |
| } |
| |
| public static class FixedWidthField extends Field |
| { |
| int width; |
| String dateFormat; |
| |
| public int getWidth() |
| { |
| return width; |
| } |
| |
| public void setWidth(int width) |
| { |
| this.width = width; |
| } |
| |
| public String getDateFormat() |
| { |
| return dateFormat; |
| } |
| |
| public void setDateFormat(String dateFormat) |
| { |
| this.dateFormat = dateFormat; |
| } |
| |
| } |
| |
| /** |
| * Extracts the fields from a fixed width record and returns a map containing |
| * field names and values |
| */ |
| @Override |
| Map<String, Object> extractFields(String line) |
| { |
| if (!initialized) { |
| init(); |
| initialized = true; |
| } |
| String[] values = fixedWidthParser.parseLine(line); |
| if (hasHeader && Arrays.deepEquals(values, header)) { |
| return null; |
| } |
| Map<String, Object> map = Maps.newHashMap(); |
| int i = 0; |
| for (FixedWidthField field : fields) { |
| map.put(field.getName(), getValue(field, values[i++])); |
| } |
| return map; |
| } |
| |
| private void init() |
| { |
| fields = new ArrayList<FixedWidthField>(); |
| List<String> headers = new ArrayList<String>(); |
| List<Integer> fieldWidth = new ArrayList<Integer>(); |
| for (String tmp : fieldDescription.split(",")) { |
| String[] fieldTuple = tmp.split(":(?=([^\"]*\"[^\"]*\")*[^\"]*$)", -1); |
| FixedWidthField field = new FixedWidthField(); |
| field.setName(fieldTuple[0]); |
| field.setType(fieldTuple[1]); |
| field.setWidth(Integer.parseInt(fieldTuple[2])); |
| headers.add(fieldTuple[0]); |
| fieldWidth.add(Integer.parseInt(fieldTuple[2])); |
| if (field.getType() == FIELD_TYPE.DATE) { |
| if (fieldTuple.length > 3) { |
| field.setDateFormat(fieldTuple[3].replace("\"", "")); |
| } else { |
| logger.error("Date format is missing for the field {}", field.getName()); |
| throw new RuntimeException("Missing date format"); |
| } |
| } |
| fields.add(field); |
| } |
| header = headers.toArray(new String[headers.size()]); |
| int[] width = Ints.toArray(fieldWidth); |
| FixedWidthFields lengths = new FixedWidthFields(header, width); |
| FixedWidthParserSettings settings = new FixedWidthParserSettings(lengths); |
| settings.getFormat().setPadding(this.padding); |
| fixedWidthParser = new FixedWidthParser(settings); |
| } |
| |
| private Object getValue(FixedWidthField field, String value) |
| { |
| if (StringUtils.isEmpty(value)) { |
| return null; |
| } |
| switch (field.getType()) { |
| case BOOLEAN: |
| return Boolean.parseBoolean(value); |
| case DOUBLE: |
| return Double.parseDouble(value); |
| case INTEGER: |
| return Integer.parseInt(value); |
| case FLOAT: |
| return Float.parseFloat(value); |
| case LONG: |
| return Long.parseLong(value); |
| case SHORT: |
| return Short.parseShort(value); |
| case CHARACTER: |
| return value.charAt(0); |
| case DATE: |
| try { |
| return new SimpleDateFormat(field.getDateFormat()).parse(value); |
| } catch (ParseException e) { |
| logger.error("Error parsing date for format {} and value {}", field.getDateFormat(), value); |
| throw new RuntimeException(e); |
| } |
| default: |
| return value; |
| } |
| } |
| } |