blob: ccf3e251e539e4a385e5c9c2db9cab826f69f914 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.bulkload;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteIllegalStateException;
import org.apache.ignite.internal.util.lang.IgniteClosureX;
import org.apache.ignite.lang.IgniteBiTuple;
import java.util.List;
/**
* Bulk load (COPY) command processor used on server to keep various context data and process portions of input
* received from the client side.
*/
public class BulkLoadProcessor implements AutoCloseable {
    /** Parses raw input bytes into records. */
    private final BulkLoadParser inputParser;

    /**
     * Turns a single parsed record (a list of values) into the key+value entry that is handed to the
     * output streamer.
     */
    private final IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter;

    /** Receives the converted key/value entries. */
    private final BulkLoadCacheWriter outputStreamer;

    /** Flag raised by {@link #close()}; stays {@code false} until then. */
    private boolean isClosed;

    /**
     * Creates a bulk load processor in the open (not closed) state.
     *
     * @param inputParser Parser of the input bytes.
     * @param dataConverter Converter from a parsed record to the key+value entry.
     * @param outputStreamer Writer that receives the key+value entries.
     */
    public BulkLoadProcessor(BulkLoadParser inputParser, IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter,
        BulkLoadCacheWriter outputStreamer) {
        // isClosed keeps its default value (false), i.e. the processor starts open.
        this.inputParser = inputParser;
        this.dataConverter = dataConverter;
        this.outputStreamer = outputStreamer;
    }

    /**
     * Gives access to the writer that receives the converted entries.
     *
     * @return The output streamer passed to the constructor.
     */
    public BulkLoadCacheWriter outputStreamer() {
        return outputStreamer;
    }

    /**
     * Parses the given batch and, for every record produced by the parser, converts it to a key+value entry
     * and passes that entry to the output streamer. Records are handled one by one, in the order the parser
     * returns them.
     *
     * @param batchData Data from the current batch.
     * @param isLastBatch {@code true} if this is the last batch.
     * @throws IgniteIllegalStateException If called after {@link #close()}.
     * @throws IgniteCheckedException If processing of the batch fails.
     */
    public void processBatch(byte[] batchData, boolean isLastBatch) throws IgniteCheckedException {
        if (isClosed)
            throw new IgniteIllegalStateException("Attempt to process a batch on a closed BulkLoadProcessor");

        Iterable<List<Object>> parsedRows = inputParser.parseBatch(batchData, isLastBatch);

        // Convert first, then write: the entry is fully built before it reaches the streamer.
        for (List<Object> row : parsedRows)
            outputStreamer.apply(dataConverter.apply(row));
    }

    /**
     * Marks this processor as closed and closes the output streamer (per the original documentation, the
     * underlying object is an {@link IgniteDataStreamer}). Repeated calls are no-ops.
     *
     * @throws Exception If closing the output streamer fails.
     */
    @Override public void close() throws Exception {
        if (!isClosed) {
            // The flag is raised before delegating, so a failed close is not retried on the next call.
            isClosed = true;

            outputStreamer.close();
        }
    }
}