| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.processing.loading.iterator; |
| |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.carbondata.common.CarbonIterator; |
| import org.apache.carbondata.common.logging.LogServiceFactory; |
| import org.apache.carbondata.core.util.CarbonProperties; |
| |
| import org.apache.log4j.Logger; |
| |
| /** |
| * It is wrapper class to hold the rows in batches when record writer writes the data and allows |
| * to iterate on it during data load. It uses blocking queue to coordinate between read and write. |
| */ |
| public class CarbonOutputIteratorWrapper extends CarbonIterator<Object[]> { |
| |
| private static final Logger LOG = |
| LogServiceFactory.getLogService(CarbonOutputIteratorWrapper.class.getName()); |
| |
| private boolean close; |
| |
| /** |
| * Number of rows kept in memory at most will be batchSize * queue size |
| */ |
| private int batchSize = CarbonProperties.getInstance().getBatchSize(); |
| |
| private RowBatch loadBatch = new RowBatch(batchSize); |
| |
| private RowBatch readBatch; |
| |
| private ArrayBlockingQueue<RowBatch> queue = new ArrayBlockingQueue<>(10); |
| |
| public void write(Object[] row) throws InterruptedException { |
| if (close) { |
| // already might be closed forcefully |
| return; |
| } |
| if (!loadBatch.addRow(row)) { |
| loadBatch.readyRead(); |
| queue.put(loadBatch); |
| loadBatch = new RowBatch(batchSize); |
| } |
| } |
| |
| @Override |
| public boolean hasNext() { |
| if (readBatch == null || !readBatch.hasNext()) { |
| // if readBatch don't have next row, set it to null |
| readBatch = null; |
| try { |
| // if the output is not closed, poll a batch from the queue in a loop. |
| // if the queue is always empty, it will wait for the last default element of the output, |
| // after that, the output will be closed and the loop will be finished. |
| while (!close) { |
| readBatch = queue.poll(5, TimeUnit.MINUTES); |
| if (readBatch == null) { |
| LOG.warn("try to poll a row batch again."); |
| } else { |
| // if readBatch is not null, break this loop to continue |
| break; |
| } |
| } |
| |
| // when the output is closed and readBatch is null, should poll a batch immediately. |
| // it is a double-checking also of the last poll operation. in some cases, the output is |
| // closed and the readBatch is null, but the queue is not empty, contain the last load |
| // batch or the last default batch. |
| if (close && readBatch == null) { |
| LOG.warn("try to poll a row batch one more time."); |
| readBatch = queue.poll(); |
| } |
| // checking again whether readBatch is null or not |
| if (readBatch == null) { |
| return false; |
| } |
| } catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| return readBatch.hasNext(); |
| } |
| |
| @Override |
| public Object[] next() { |
| return readBatch.next(); |
| } |
| |
| public void closeWriter(boolean isForceClose) { |
| if (close) { |
| // already might be closed forcefully |
| return; |
| } |
| try { |
| if (isForceClose) { |
| // first make close is set to true, when force close happens because of dead consumer. |
| // so that, write() method will stop taking input rows. |
| close = true; |
| // once write() method stops taking input rows, clear the queue. |
| // If queue is cleared before close is set to true, then queue will be again filled |
| // by .write() and it can go to blocking put() forever as consumer is dead. |
| queue.clear(); |
| return; |
| } |
| // below code will ensure that the last RowBatch is consumed properly |
| loadBatch.readyRead(); |
| if (loadBatch.size > 0) { |
| queue.put(loadBatch); |
| } |
| } catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } |
| // It is required if the thread waits for take. |
| if (queue.isEmpty()) { |
| if (!queue.offer(new RowBatch(0))) { |
| LOG.warn("The default last element is not added to queue"); |
| } |
| } |
| // after try to add the default last element, close the output. |
| close = true; |
| } |
| |
| private static class RowBatch extends CarbonIterator<Object[]> { |
| |
| private int counter; |
| |
| private Object[][] batch; |
| |
| private int size; |
| |
| private RowBatch(int size) { |
| batch = new Object[size][]; |
| this.size = size; |
| } |
| |
| /** |
| * Add row to the batch, it can hold rows till the batch size. |
| * @param row |
| * @return false if the row cannot be added as batch is full. |
| */ |
| public boolean addRow(Object[] row) { |
| batch[counter++] = row; |
| return counter < size; |
| } |
| |
| public void readyRead() { |
| size = counter; |
| counter = 0; |
| } |
| |
| @Override |
| public boolean hasNext() { |
| return counter < size; |
| } |
| |
| @Override |
| public Object[] next() { |
| assert (counter < size); |
| return batch[counter++]; |
| } |
| } |
| |
| } |