blob: bd7e5e084e7ddba5d390cf957c9294c48e487345 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.flume.interceptor;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;

import static org.apache.apex.malhar.flume.interceptor.ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER;
/**
 * A Flume {@link Interceptor} that picks selected columns out of a delimited event body and
 * re-assembles them according to a format string.
 * <p>
 * The format is read from the {@code columnsFormatter} property ({@link Constants#COLUMNS_FORMATTER}).
 * Each {@code {n}} placeholder stands for the n-th (zero-based) column of the incoming body, where
 * columns are delimited by the single byte configured under
 * {@code ColumnFilteringInterceptor.Constants.SRC_SEPARATOR}. Text before the first placeholder is
 * emitted as a prefix, and the text following each placeholder (up to the next placeholder or the
 * end of the format) is emitted right after that column. Columns that do not exist in a body are
 * emitted as empty.
 * </p>
 * <p>
 * Example: with source separator {@code ','} and format {@code "<{2}|{0}>"}, the body
 * {@code "a,b,c"} is rewritten to {@code "<c|a>"}.
 * </p>
 * <p>
 * {@link #intercept(Event)} reuses an internal scratch array, so one instance must not be used by
 * several threads at once.
 * </p>
 *
 * @since 0.9.4
 */
public class ColumnFilteringFormattingInterceptor implements Interceptor
{
  private static final Logger logger = LoggerFactory.getLogger(ColumnFilteringFormattingInterceptor.class);

  /** Byte that delimits columns in the incoming event body. */
  private final byte srcSeparator;
  /** {@code dstSeparators[i]} is written immediately after the i-th selected column. */
  private final byte[][] dstSeparators;
  /** Bytes written at the very start of every output body. */
  private final byte[] prefix;
  /**
   * Largest requested column index plus one: the number of source separators that must be located
   * to know where the last requested column ends.
   */
  private final int maxColumn;
  /** Requested column indices, in output order. */
  private final int[] columns;
  /**
   * Scratch space for {@link #intercept(Event)}: {@code positions[k]} is the offset just past the
   * k-th separator, i.e. the start of column k ({@code positions[0]} is always 0). Every entry written
   * during a call is reset to 0 before that call returns.
   */
  private final int[] positions;

  /**
   * @param columns       requested column indices in output order; must be non-empty and
   *                      non-negative (guaranteed by {@link Builder#configure(Context)})
   * @param srcSeparator  byte delimiting columns in incoming bodies
   * @param dstSeparators bytes to emit after each selected column, one entry per column
   * @param prefix        bytes to emit at the start of every output body
   */
  private ColumnFilteringFormattingInterceptor(int[] columns, byte srcSeparator, byte[][] dstSeparators, byte[] prefix)
  {
    this.columns = columns;
    int largestColumn = Integer.MIN_VALUE;
    for (int column : columns) {
      largestColumn = Math.max(largestColumn, column);
    }
    /* column k ends at the separator following it, so one extra separator must be found */
    maxColumn = largestColumn + 1;
    positions = new int[maxColumn + 1];
    this.srcSeparator = srcSeparator;
    this.dstSeparators = dstSeparators;
    this.prefix = prefix;
  }

  @Override
  public void initialize()
  {
    /* no-op */
  }

  /**
   * Rewrites the body of {@code event} in place according to the configured format.
   *
   * @param event the event to transform; an event whose body is {@code null} is returned untouched
   * @return the same {@code event} instance, with its body replaced
   */
  @Override
  public Event intercept(Event event)
  {
    byte[] body = event.getBody();
    if (body == null) {
      return event;
    }

    final int length = body.length;

    /*
     * Store the offset just past each separator. The end of column k is positions[k + 1], so
     * maxColumn separators are needed to bound the last requested column; stop once we have them.
     */
    int i = 0;
    int index = 0;
    while (i < length) {
      if (body[i++] == srcSeparator) {
        positions[++index] = i;
        if (index >= maxColumn) {
          break;
        }
      }
    }

    /* exclusive upper bound of the positions entries written by this call (used for the reset) */
    int nextVirginIndex;
    if (i == length && index < maxColumn) {
      /* the body ran out before all separators were found: its last column ends at the end of body */
      nextVirginIndex = index + 2;
      positions[nextVirginIndex - 1] = length;
    } else {
      nextVirginIndex = index + 1;
    }

    int newArrayLen = prefix.length;
    for (int c = 0; c < columns.length; c++) {
      newArrayLen += columnLength(body, columns[c]) + dstSeparators[c].length;
    }

    byte[] newBody = new byte[newArrayLen];
    System.arraycopy(prefix, 0, newBody, 0, prefix.length);
    int newOffset = prefix.length;
    for (int c = 0; c < columns.length; c++) {
      int column = columns[c];
      /* copy exactly the sized amount, never the column's trailing source separator */
      int len = columnLength(body, column);
      if (len > 0) {
        System.arraycopy(body, positions[column], newBody, newOffset, len);
        newOffset += len;
      }
      byte[] separator = dstSeparators[c];
      System.arraycopy(separator, 0, newBody, newOffset, separator.length);
      newOffset += separator.length;
    }

    event.setBody(newBody);
    /* leave the scratch array zeroed for the next event */
    Arrays.fill(positions, 1, nextVirginIndex, 0);
    return event;
  }

  /**
   * Returns how many bytes of {@code column} to emit, excluding its trailing source separator.
   * Must be called after {@link #intercept(Event)} has filled {@link #positions} for {@code body}.
   *
   * @param body   the event body being processed
   * @param column a requested column index (at most {@code maxColumn - 1})
   * @return the payload length of the column, or 0 if the column is empty or absent from the body
   */
  private int columnLength(byte[] body, int column)
  {
    int start = positions[column];
    int end = positions[column + 1];
    if (end <= start) {
      /* column not present in this body (end offset was never recorded) or empty */
      return 0;
    }
    return body[end - 1] == srcSeparator ? end - start - 1 : end - start;
  }

  /**
   * Applies {@link #intercept(Event)} to every event of the list.
   *
   * @param events events to transform in place
   * @return the same list instance
   */
  @Override
  public List<Event> intercept(List<Event> events)
  {
    for (Event event : events) {
      intercept(event);
    }
    return events;
  }

  @Override
  public void close()
  {
  }

  /**
   * Builds a {@link ColumnFilteringFormattingInterceptor} from a Flume {@link Context}.
   */
  public static class Builder implements Interceptor.Builder
  {
    /** Matches a single {@code {n}} column placeholder, capturing the digits of n. */
    private static final Pattern COLUMN_PLACEHOLDER = Pattern.compile("\\{(\\d+)\\}");
    private static final byte[] EMPTY_BYTES = new byte[0];

    private int[] columns;
    private byte srcSeparator;
    private byte[][] dstSeparators;
    private byte[] prefix;

    /**
     * @return a new interceptor using the settings parsed by {@link #configure(Context)}
     * @throws IllegalStateException if {@link #configure(Context)} has not completed successfully
     */
    @Override
    public Interceptor build()
    {
      if (columns == null) {
        throw new IllegalStateException("configure(Context) must complete successfully before build()");
      }
      return new ColumnFilteringFormattingInterceptor(columns, srcSeparator, dstSeparators, prefix);
    }

    /**
     * Parses the columns format and the source separator from {@code context}. Builder state is only
     * updated once everything has been parsed, so a failed call leaves previous settings intact.
     *
     * @param context the Flume configuration context
     * @throws IllegalArgumentException if the format is missing, empty, or contains no {@code {n}}
     *                                  placeholder
     */
    @Override
    public void configure(Context context)
    {
      String formatter = context.getString(COLUMNS_FORMATTER);
      if (Strings.isNullOrEmpty(formatter)) {
        throw new IllegalArgumentException("This interceptor requires columns format to be specified!");
      }

      List<String> lSeparators = Lists.newArrayList();
      List<Integer> lColumns = Lists.newArrayList();
      Matcher matcher = COLUMN_PLACEHOLDER.matcher(formatter);
      int separatorStart = 0;
      String lPrefix = "";
      while (matcher.find()) {
        if (lColumns.isEmpty()) {
          /* text before the first placeholder (possibly empty) is the prefix */
          lPrefix = formatter.substring(0, matcher.start());
        } else {
          /* text between two placeholders follows the previous column */
          lSeparators.add(formatter.substring(separatorStart, matcher.start()));
        }
        lColumns.add(Integer.parseInt(matcher.group(1)));
        separatorStart = matcher.end();
      }
      if (lColumns.isEmpty()) {
        throw new IllegalArgumentException("Columns format '" + formatter + "' does not contain any {n} column placeholder!");
      }
      if (separatorStart < formatter.length()) {
        /* trailing text follows the last column */
        lSeparators.add(formatter.substring(separatorStart));
      }

      int[] lColumnArray = Ints.toArray(lColumns);
      byte[][] lDstSeparators = new byte[lColumnArray.length][];
      for (int i = 0; i < lColumnArray.length; i++) {
        /* only the last column can lack a separator (format ends with a placeholder) */
        lDstSeparators[i] = i < lSeparators.size() ? lSeparators.get(i).getBytes(StandardCharsets.UTF_8) : EMPTY_BYTES;
      }
      byte lSrcSeparator = context.getInteger(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR,
          (int) ColumnFilteringInterceptor.Constants.SRC_SEPARATOR_DFLT).byteValue();

      columns = lColumnArray;
      dstSeparators = lDstSeparators;
      srcSeparator = lSrcSeparator;
      prefix = lPrefix.getBytes(StandardCharsets.UTF_8);
    }
  }

  /**
   * Configuration keys understood by this interceptor, in addition to those inherited from
   * {@code ColumnFilteringInterceptor.Constants}.
   */
  public static class Constants extends ColumnFilteringInterceptor.Constants
  {
    /** Key of the format string describing which columns to emit and how to join them. */
    public static final String COLUMNS_FORMATTER = "columnsFormatter";
  }
}