| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.db.query.reader.universal; |
| |
| import java.io.IOException; |
| import java.util.List; |
| import java.util.PriorityQueue; |
| import org.apache.iotdb.tsfile.read.TimeValuePair; |
| import org.apache.iotdb.tsfile.read.reader.IPointReader; |
| |
| /** |
| * This class implements {@link IPointReader} for data sources with different priorities. |
| */ |
| public class PriorityMergeReader implements IPointReader { |
| |
| // max time of all added readers in PriorityMergeReader |
| // or min time of all added readers in DescPriorityMergeReader |
| protected long currentReadStopTime; |
| |
| protected PriorityQueue<Element> heap; |
| |
| public PriorityMergeReader() { |
| heap = new PriorityQueue<>((o1, o2) -> { |
| int timeCompare = Long.compare(o1.timeValuePair.getTimestamp(), |
| o2.timeValuePair.getTimestamp()); |
| return timeCompare != 0 ? timeCompare : Long.compare(o2.priority, o1.priority); |
| }); |
| } |
| |
| // only used in external sort, need to refactor later |
| public PriorityMergeReader(List<IPointReader> prioritySeriesReaders, int startPriority) |
| throws IOException { |
| heap = new PriorityQueue<>((o1, o2) -> { |
| int timeCompare = Long.compare(o1.timeValuePair.getTimestamp(), |
| o2.timeValuePair.getTimestamp()); |
| return timeCompare != 0 ? timeCompare : Long.compare(o2.priority, o1.priority); |
| }); |
| for (IPointReader reader : prioritySeriesReaders) { |
| addReader(reader, startPriority++); |
| } |
| } |
| |
| public void addReader(IPointReader reader, long priority) throws IOException { |
| if (reader.hasNextTimeValuePair()) { |
| heap.add(new Element(reader, reader.nextTimeValuePair(), priority)); |
| } else { |
| reader.close(); |
| } |
| } |
| |
| public void addReader(IPointReader reader, long priority, long endTime) throws IOException { |
| if (reader.hasNextTimeValuePair()) { |
| heap.add(new Element(reader, reader.nextTimeValuePair(), priority)); |
| currentReadStopTime = Math.max(currentReadStopTime, endTime); |
| } else { |
| reader.close(); |
| } |
| } |
| |
| public long getCurrentReadStopTime() { |
| return currentReadStopTime; |
| } |
| |
| @Override |
| public boolean hasNextTimeValuePair() { |
| return !heap.isEmpty(); |
| } |
| |
| @Override |
| public TimeValuePair nextTimeValuePair() throws IOException { |
| Element top = heap.poll(); |
| TimeValuePair ret = top.timeValuePair; |
| TimeValuePair topNext = null; |
| if (top.hasNext()) { |
| top.next(); |
| topNext = top.currPair(); |
| } |
| long topNextTime = topNext == null ? Long.MAX_VALUE : topNext.getTimestamp(); |
| updateHeap(ret.getTimestamp(), topNextTime); |
| if (topNext != null) { |
| top.timeValuePair = topNext; |
| heap.add(top); |
| } |
| return ret; |
| } |
| |
| @Override |
| public TimeValuePair currentTimeValuePair() throws IOException { |
| return heap.peek().timeValuePair; |
| } |
| |
| private void updateHeap(long topTime, long topNextTime) throws IOException { |
| while (!heap.isEmpty() && heap.peek().currTime() == topTime) { |
| Element e = heap.poll(); |
| if (!e.hasNext()) { |
| e.reader.close(); |
| continue; |
| } |
| |
| e.next(); |
| if (e.currTime() == topNextTime) { |
| // if the next value of the peek will be overwritten by the next of the top, skip it |
| if (e.hasNext()) { |
| e.next(); |
| heap.add(e); |
| } else { |
| // the chunk is end |
| e.close(); |
| } |
| } else { |
| heap.add(e); |
| } |
| } |
| } |
| |
| @Override |
| public void close() throws IOException { |
| while (!heap.isEmpty()) { |
| Element e = heap.poll(); |
| e.close(); |
| } |
| } |
| |
| static class Element { |
| |
| IPointReader reader; |
| TimeValuePair timeValuePair; |
| long priority; |
| |
| Element(IPointReader reader, TimeValuePair timeValuePair, long priority) { |
| this.reader = reader; |
| this.timeValuePair = timeValuePair; |
| this.priority = priority; |
| } |
| |
| long currTime() { |
| return timeValuePair.getTimestamp(); |
| } |
| |
| TimeValuePair currPair() { |
| return timeValuePair; |
| } |
| |
| boolean hasNext() throws IOException { |
| return reader.hasNextTimeValuePair(); |
| } |
| |
| void next() throws IOException { |
| timeValuePair = reader.nextTimeValuePair(); |
| } |
| |
| void close() throws IOException { |
| reader.close(); |
| } |
| } |
| } |