// blob: ddb6cfd6e06d266343937349e6e9ffba5ff705bd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.work.filter;
import io.netty.buffer.DrillBuf;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.BitData;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
/**
* A binary wire transferable representation of the RuntimeFilter which contains
* the runtime filter definition and its corresponding data.
*/
public class RuntimeFilterWritable implements AutoCloseables.Closeable {

  /** Metadata describing the filter; its bloom-filter size list drives {@link #unwrap()}. */
  private final BitData.RuntimeFilterBDef runtimeFilterBDef;

  /** One buffer per bloom filter; replaceable through {@link #setData(DrillBuf...)}. */
  private DrillBuf[] data;

  /** Fragment/operator identity string; the sole input to equals, hashCode and toString. */
  private final String identifier;

  /**
   * Creates a writable from a filter definition and its backing buffers.
   *
   * @param runtimeFilterBDef the filter definition
   * @param data one buffer per entry of the definition's bloom filter size list
   * @throws IllegalArgumentException if the number of buffers differs from the number of
   *         bloom filter sizes declared in the definition
   */
  public RuntimeFilterWritable(BitData.RuntimeFilterBDef runtimeFilterBDef, DrillBuf... data) {
    List<Integer> bfSizeInBytes = runtimeFilterBDef.getBloomFilterSizeInBytesList();
    int bufArrLen = data.length;
    Preconditions.checkArgument(bfSizeInBytes.size() == bufArrLen, "the input DrillBuf number does not match the metadata definition!");
    this.runtimeFilterBDef = runtimeFilterBDef;
    this.data = data;
    this.identifier = "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId()
        + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId()
        + ", srcOperatorId:" + runtimeFilterBDef.getHjOpId();
  }

  /**
   * @return the filter definition this writable was created with
   */
  public BitData.RuntimeFilterBDef getRuntimeFilterBDef() {
    return runtimeFilterBDef;
  }

  /**
   * @return the internal buffer array itself (not a copy)
   */
  public DrillBuf[] getData() {
    return data;
  }

  /**
   * Replaces the backing buffers. Unlike the constructor, no check is made that the
   * number of buffers matches the filter definition, and the previously held buffers
   * are not released here.
   *
   * @param data the new backing buffers
   */
  public void setData(DrillBuf... data) {
    this.data = data;
  }

  /**
   * Builds one {@link BloomFilter} per size declared in the filter definition. Each
   * filter wraps a slice of the corresponding buffer that starts at offset 0 and spans
   * the declared number of bytes.
   *
   * @return the bloom filters, in the order of the definition's size list
   */
  public List<BloomFilter> unwrap() {
    List<Integer> sizeInBytes = runtimeFilterBDef.getBloomFilterSizeInBytesList();
    List<BloomFilter> bloomFilters = new ArrayList<>(sizeInBytes.size());
    for (int i = 0; i < sizeInBytes.size(); i++) {
      DrillBuf bloomFilterContent = data[i].slice(0, sizeInBytes.get(i));
      bloomFilters.add(new BloomFilter(bloomFilterContent));
    }
    return bloomFilters;
  }

  /**
   * ORs every bloom filter of the given writable into the filter at the same position
   * of this writable, then calls {@code clear()} on the content buffer of each of the
   * other writable's filters.
   *
   * <p>The other writable must unwrap to at least as many filters as this one;
   * otherwise the positional lookup fails with an {@link IndexOutOfBoundsException}.
   *
   * @param runtimeFilterWritable the writable whose filters are merged into this one
   */
  public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
    List<BloomFilter> thisFilters = this.unwrap();
    List<BloomFilter> otherFilters = runtimeFilterWritable.unwrap();
    for (int i = 0; i < thisFilters.size(); i++) {
      BloomFilter thisOne = thisFilters.get(i);
      BloomFilter otherOne = otherFilters.get(i);
      thisOne.or(otherOne);
    }
    for (BloomFilter bloomFilter : otherFilters) {
      bloomFilter.getContent().clear();
    }
  }

  /**
   * Creates a copy of this writable whose buffers are freshly allocated from the given
   * allocator and filled with the readable bytes of the current buffers. The reader
   * index of every source buffer is restored after copying.
   *
   * <p>If allocation, copying or construction of the result fails, every buffer that
   * was already allocated for the copy is released before the failure is rethrown, so
   * a failed call does not leak memory.
   *
   * @param bufferAllocator the allocator the copied buffers are taken from
   * @return a new writable sharing the definition but owning its own buffers
   */
  public RuntimeFilterWritable duplicate(BufferAllocator bufferAllocator) {
    DrillBuf[] source = data;
    DrillBuf[] cloned = new DrillBuf[source.length];
    try {
      for (int i = 0; i < source.length; i++) {
        DrillBuf src = source[i];
        int capacity = src.readableBytes();
        DrillBuf duplicateOne = bufferAllocator.buffer(capacity);
        // Record the copy right away so the failure path below can release it.
        cloned[i] = duplicateOne;
        int readerIndex = src.readerIndex();
        duplicateOne.writeBytes(src);
        // writeBytes(src) advances the source reader index; put it back.
        src.readerIndex(readerIndex);
      }
      return new RuntimeFilterWritable(runtimeFilterBDef, cloned);
    } catch (RuntimeException | Error e) {
      for (DrillBuf allocated : cloned) {
        if (allocated != null) {
          allocated.release();
        }
      }
      throw e;
    }
  }

  /**
   * Retains every backing buffer by the given amount.
   *
   * @param increment the retain count to add to each buffer; values of zero or less
   *        make this call a no-op
   */
  public void retainBuffers(final int increment) {
    if (increment <= 0) {
      return;
    }
    for (final DrillBuf buf : data) {
      buf.retain(increment);
    }
  }

  //TODO: Not used currently because of DRILL-6826
  /**
   * Creates a writable with the same definition whose buffers are the result of
   * transferring ownership of each current buffer to the given allocator.
   *
   * @param bufferAllocator the allocator that takes ownership of the buffers
   * @return a new writable backed by the transferred buffers
   */
  public RuntimeFilterWritable newRuntimeFilterWritable(BufferAllocator bufferAllocator) {
    int bufNum = data.length;
    DrillBuf[] newBufs = new DrillBuf[bufNum];
    int i = 0;
    for (DrillBuf buf : data) {
      DrillBuf transferredBuffer = buf.transferOwnership(bufferAllocator).buffer;
      newBufs[i] = transferredBuffer;
      i++;
    }
    return new RuntimeFilterWritable(this.runtimeFilterBDef, newBufs);
  }

  /**
   * @return the identifier built from the major fragment id, minor fragment id and
   *         hash-join operator id of the filter definition
   */
  @Override
  public String toString() {
    return identifier;
  }

  /**
   * Two writables are equal when their identifiers are equal; buffer contents are not
   * compared.
   */
  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    // instanceof is false for null, so no separate null check is needed.
    if (!(other instanceof RuntimeFilterWritable)) {
      return false;
    }
    RuntimeFilterWritable otherRFW = (RuntimeFilterWritable) other;
    return this.identifier.equals(otherRFW.identifier);
  }

  /**
   * @return the hash of the identifier, consistent with {@link #equals(Object)}
   */
  @Override
  public int hashCode() {
    return identifier.hashCode();
  }

  /**
   * Releases every backing buffer once.
   */
  @Override
  public void close() {
    for (DrillBuf buf : data) {
      buf.release();
    }
  }
}