/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.drill.exec.store.pcapng;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
import org.apache.drill.exec.store.pcap.plugin.PcapFormatConfig;
import org.apache.drill.exec.util.Utilities;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import fr.bmartel.pcapdecoder.PcapDecoder;
import fr.bmartel.pcapdecoder.structure.types.IPcapngType;
import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
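
/**
* Scan-framework (EVF v3) batch reader for pcapng files: it decodes the
* file into pcapng blocks and, depending on the format config's `stat`
* option, emits either one row per Enhanced Packet Block or per-file
* statistics rows. An instance is created for each file split.
*/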
public class PcapngBatchReader implements ManagedReader {
private static final Logger logger = LoggerFactory.getLogger(PcapngBatchReader.class);
private final PcapFormatConfig config;
private final EasySubScan scan;
private final FileDescrip file;
private CustomErrorContext errorContext;
private List<SchemaPath> columns;
private List<ColumnDefn> projectedColumns;
private Iterator<IPcapngType> pcapIterator;
private IPcapngType block;
private RowSetLoader loader;
private InputStream in;
private Path path;

public PcapngBatchReader(final PcapFormatConfig config, final EasySubScan scan,
FileSchemaNegotiator negotiator) {
this.config = config;
this.scan = scan;
this.columns = scan.getColumns();
this.file = negotiator.file();
try {
// open an input stream for the pcapng file
errorContext = negotiator.parentErrorContext();
DrillFileSystem dfs = file.fileSystem();
path = dfs.makeQualified(file.split().getPath());
in = dfs.openPossiblyCompressedStream(path);
// decode the pcapng file (the entire file is buffered in memory before decoding)
PcapDecoder decoder = new PcapDecoder(IOUtils.toByteArray(in));
decoder.decode();
pcapIterator = decoder.getSectionList().iterator();
logger.debug("The config is {}, root is {}, columns has {}", config, scan.getSelectionRoot(), columns);
} catch (IOException e) {
throw UserException
.dataReadError(e)
.message("Failure in initial pcapng inputstream. " + e.getMessage())
.addContext(errorContext)
.build(logger);
} catch (Exception e) {
throw UserException
.dataReadError(e)
.message("Failed to decode the pcapng file. " + e.getMessage())
.addContext(errorContext)
.build(logger);
}
// define the schema
negotiator.tableSchema(defineMetadata(), true);
ResultSetLoader resultSetLoader = negotiator.build();
loader = resultSetLoader.writer();
// bind the writer for columns
bindColumns(loader);
}

/**
* The `stat` parameter defaults to false, which means that the packet
* data is parsed and returned. If set to true, only the statistics data
* about each pcapng file is returned (consisting of information about the
* capture devices and a summary of the packet data described above).
*
* In addition, a pcapng file contains a single Section Header Block (SHB),
* a single Interface Description Block (IDB) and a number of Enhanced Packet Blocks (EPB).
* <pre>
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* | SHB | IDB | EPB | EPB | ... | EPB |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* </pre>
* https://pcapng.github.io/pcapng/draft-tuexen-opsawg-pcapng.html#name-physical-file-layout
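*
* For example (the file path is illustrative), with `stat` left at its
* default of false, each Enhanced Packet Block becomes one row:
* <pre>
* SELECT * FROM dfs.`/tmp/capture.pcapng`;
* </pre>
* With `stat` set to true, the same query instead returns the per-file
* summary rows built from the non-packet blocks.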
*/
@Override
public boolean next() {
while (!loader.isFull()) {
if (!pcapIterator.hasNext()) {
return false;
} else if (config.getStat() && isIEnhancedPacketBlock()) {
// stat mode: skip packet blocks
continue;
} else if (!config.getStat() && !isIEnhancedPacketBlock()) {
// packet mode: skip blocks that are not Enhanced Packet Blocks
continue;
}
processBlock();
}
return true;
}

@Override
public void close() {
AutoCloseables.closeSilently(in);
}
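
/**
* Advances the iterator to the next pcapng block, keeping it in
* {@code block} for {@link #processBlock()}, and reports whether it is
* an Enhanced Packet Block.
*/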
private boolean isIEnhancedPacketBlock() {
block = pcapIterator.next();
return block instanceof IEnhancedPacketBLock;
}
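
/**
* Writes one row: the file path column is filled with the file name, and
* every other projected column is populated from the current block.
*/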
private void processBlock() {
loader.start();
for (ColumnDefn columnDefn : projectedColumns) {
// pcapng file name
if (columnDefn.getName().equals(PcapColumn.PATH_NAME)) {
columnDefn.load(path.getName());
} else {
// pcapng block data
columnDefn.load(block);
}
}
loader.save();
}

private boolean isSkipQuery() {
return columns.isEmpty();
}

private boolean isStarQuery() {
return Utilities.isStarQuery(columns);
}
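
/**
* Builds the table schema by resolving the projected columns and letting
* each column definition register itself with the schema builder.
*/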
private TupleMetadata defineMetadata() {
SchemaBuilder builder = new SchemaBuilder();
processProjected(columns);
for (ColumnDefn columnDefn : projectedColumns) {
columnDefn.define(builder);
}
return builder.buildSchema();
}

/**
* <b> Define the schema based on projected </b><br/>
* 1. SkipQuery: no field specified, such as count(*) <br/>
* 2. StarQuery: select * <br/>
* 3. ProjectPushdownQuery: select a,b,c <br/>
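*
* For example (paths and column names are illustrative):
* <pre>
* SELECT count(*) FROM dfs.`/tmp/capture.pcapng`; -- SkipQuery
* SELECT * FROM dfs.`/tmp/capture.pcapng`; -- StarQuery
* SELECT src_ip, dst_ip FROM dfs.`/tmp/capture.pcapng`; -- ProjectPushdownQuery
* </pre>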
*/
private void processProjected(List<SchemaPath> columns) {
projectedColumns = new ArrayList<>();
if (isSkipQuery()) {
projectedColumns.add(new ColumnDefn(PcapColumn.DUMMY_NAME, new PcapColumn.PcapDummy()));
} else if (isStarQuery()) {
Set<Map.Entry<String, PcapColumn>> pcapColumns;
if (config.getStat()) {
pcapColumns = PcapColumn.getSummaryColumns().entrySet();
} else {
pcapColumns = PcapColumn.getColumns().entrySet();
}
for (Map.Entry<String, PcapColumn> pcapColumn : pcapColumns) {
makePcapColumns(projectedColumns, pcapColumn.getKey(), pcapColumn.getValue());
}
} else {
for (SchemaPath schemaPath : columns) {
// column names are matched case-insensitively
String projectedName = schemaPath.rootName().toLowerCase();
PcapColumn pcapColumn;
if (config.getStat()) {
pcapColumn = PcapColumn.getSummaryColumns().get(projectedName);
} else {
pcapColumn = PcapColumn.getColumns().get(projectedName);
}
if (pcapColumn != null) {
makePcapColumns(projectedColumns, projectedName, pcapColumn);
} else {
makePcapColumns(projectedColumns, projectedName, new PcapColumn.PcapDummy());
logger.debug("{} missing the PcapColumn implement class.", projectedName);
}
}
}
projectedColumns = Collections.unmodifiableList(projectedColumns);
}

private void makePcapColumns(List<ColumnDefn> projectedColumns, String name, PcapColumn column) {
projectedColumns.add(new ColumnDefn(name, column));
}

private void bindColumns(RowSetLoader loader) {
for (ColumnDefn columnDefn : projectedColumns) {
columnDefn.bind(loader);
}
}
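
/**
* Binds a projected column name to its {@link PcapColumn} processor and
* to the scalar writer through which values are written.
*/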
private static class ColumnDefn {

private final String name;
private final PcapColumn processor;
private ScalarWriter writer;

public ColumnDefn(String name, PcapColumn column) {
this.name = name;
this.processor = column;
}

public String getName() {
return name;
}

public PcapColumn getProcessor() {
return processor;
}

public void bind(RowSetLoader loader) {
writer = loader.scalar(getName());
}

public void define(SchemaBuilder builder) {
if (getProcessor().getType().getMode() == DataMode.REQUIRED) {
builder.add(getName(), getProcessor().getType().getMinorType());
} else {
builder.addNullable(getName(), getProcessor().getType().getMinorType());
}
}

public void load(IPcapngType block) {
getProcessor().process(block, writer);
}

public void load(String value) {
writer.setString(value);
}
}
}