blob: 874a44bd56b3f13a3b46df72468d982d402886b6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query.h2.dml;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.binary.BinaryObjectImpl;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteClusterReadOnlyException;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.odbc.SqlStateCode;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.createJdbcSqlException;
import static org.apache.ignite.internal.processors.tracing.SpanTags.ERROR;
import static org.apache.ignite.internal.processors.tracing.SpanTags.SQL_CACHE_UPDATES;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_CACHE_UPDATE;
/**
* Batch sender class.
*/
public class DmlBatchSender {
/** Comparator. */
private static final BatchEntryComparator COMP = new BatchEntryComparator();
/** Cache context. */
private final GridCacheContext cctx;
/** Batch size. */
private final int size;
/** Batches. */
private final Map<UUID, Batch> batches = new HashMap<>();
/** Result count. */
private long updateCnt;
/** Failed keys. */
private List<Object> failedKeys;
/** Exception. */
private SQLException err;
/** Per row updates counter */
private int[] cntPerRow;
/**
* Constructor.
*
* @param cctx Cache context.
* @param size Batch.
* @param qryNum Number of queries.
*/
public DmlBatchSender(GridCacheContext cctx, int size, int qryNum) {
this.cctx = cctx;
this.size = size;
cntPerRow = new int[qryNum];
}
/**
* Add entry to batch.
*
* @param key Key.
* @param proc Processor.
* @param rowNum Row number.
* @throws IgniteCheckedException If failed.
*/
public void add(Object key, EntryProcessor<Object, Object, Boolean> proc, int rowNum)
throws IgniteCheckedException {
assert key != null;
assert proc != null;
assert rowNum < cntPerRow.length;
ClusterNode node = primaryNodeByKey(key);
UUID nodeId = node.id();
Batch batch = batches.get(nodeId);
if (batch == null) {
batch = new Batch();
batches.put(nodeId, batch);
}
if (batch.containsKey(key)) { // Force cache update if duplicates found.
sendBatch(batch);
}
batch.put(key, rowNum, proc);
if (batch.size() >= size)
sendBatch(batch);
}
/**
* @param key Key.
* @return Primary node for given key.
* @throws IgniteCheckedException If primary node is not found.
*/
public ClusterNode primaryNodeByKey(Object key) throws IgniteCheckedException {
ClusterNode node = cctx.affinity().primaryByKey(key, AffinityTopologyVersion.NONE);
if (node == null)
throw new IgniteCheckedException("Failed to map key to node.");
return node;
}
/**
* Flush any remaining entries.
*/
public void flush() {
for (Batch batch : batches.values()) {
if (!batch.isEmpty())
sendBatch(batch);
}
}
/**
* @return Update count.
*/
public long updateCount() {
return updateCnt;
}
/**
* @return Failed keys.
*/
public List<Object> failedKeys() {
return failedKeys != null ? failedKeys : Collections.emptyList();
}
/**
* @return Error.
*/
public SQLException error() {
return err;
}
/**
* Returns per row updates counter as array.
*
* @return Per row updates counter as array.
*/
public int[] perRowCounterAsArray() {
return cntPerRow;
}
/**
* Sets row as failed.
*
* @param rowNum Row number.
*/
public void setFailed(int rowNum) {
cntPerRow[rowNum] = Statement.EXECUTE_FAILED;
}
/**
* Send the batch.
*
* @param batch Batch.
*/
private void sendBatch(Batch batch) {
try (
TraceSurroundings ignored = MTC.support(cctx.kernalContext().tracing()
.create(SQL_CACHE_UPDATE, MTC.span())
.addTag(SQL_CACHE_UPDATES, () -> Integer.toString(batch.size())))
) {
DmlPageProcessingResult pageRes = processPage(cctx, batch);
batch.clear();
updateCnt += pageRes.count();
if (failedKeys == null)
failedKeys = new ArrayList<>();
failedKeys.addAll(F.asList(pageRes.errorKeys()));
if (pageRes.error() != null) {
MTC.span().addTag(ERROR, pageRes.error()::getMessage);
if (err == null)
err = pageRes.error();
else
err.setNextException(pageRes.error());
}
}
}
/**
* Execute given entry processors and collect errors, if any.
* @param cctx Cache context.
* @param batch Rows to process.
* @return Triple [number of rows actually changed; keys that failed to update (duplicates or concurrently
* updated ones); chain of exceptions for all keys whose processing resulted in error, or null for no errors].
*/
@SuppressWarnings({"unchecked"})
private DmlPageProcessingResult processPage(GridCacheContext cctx, Batch batch) {
Map<Object, EntryProcessorResult<Boolean>> res;
try {
res = cctx.cache().invokeAll(batch.rowProcessors());
}
catch (IgniteCheckedException e) {
for (Integer rowNum : batch.rowNumbers().values()) {
assert rowNum != null;
cntPerRow[rowNum] = Statement.EXECUTE_FAILED;
}
if (X.hasCause(e, IgniteClusterReadOnlyException.class)) {
SQLException sqlEx = new SQLException(
e.getMessage(),
SqlStateCode.CLUSTER_READ_ONLY_MODE_ENABLED,
IgniteQueryErrorCode.CLUSTER_READ_ONLY_MODE_ENABLED,
e
);
return new DmlPageProcessingResult(0, null, sqlEx);
}
return new DmlPageProcessingResult(0, null,
new SQLException(e.getMessage(), SqlStateCode.INTERNAL_ERROR, IgniteQueryErrorCode.UNKNOWN, e));
}
if (F.isEmpty(res)) {
countAllRows(batch.rowNumbers().values());
return new DmlPageProcessingResult(batch.size(), null, null);
}
DmlPageProcessingErrorResult splitRes = splitErrors(res, batch);
int keysCnt = splitRes.errorKeys().length;
return new DmlPageProcessingResult(batch.size() - keysCnt - splitRes.errorCount(), splitRes.errorKeys(),
splitRes.error());
}
/**
* Process errors of entry processor - split the keys into duplicated/concurrently modified and those whose
* processing yielded an exception.
*
* @param res Result of {@link GridCacheAdapter#invokeAll)}
* @param batch Batch.
* @return pair [array of duplicated/concurrently modified keys, SQL exception for erroneous keys] (exception is
* null if all keys are duplicates/concurrently modified ones).
*/
private DmlPageProcessingErrorResult splitErrors(Map<Object, EntryProcessorResult<Boolean>> res, Batch batch) {
Set<Object> errKeys = new LinkedHashSet<>(res.keySet());
countAllRows(batch.rowNumbers().values());
SQLException currSqlEx = null;
SQLException firstSqlEx = null;
int errors = 0;
// Let's form a chain of SQL exceptions
for (Map.Entry<Object, EntryProcessorResult<Boolean>> e : res.entrySet()) {
try {
e.getValue().get();
}
catch (EntryProcessorException ex) {
SQLException next = createJdbcSqlException("Failed to process key '" + e.getKey() + '\'',
IgniteQueryErrorCode.ENTRY_PROCESSING);
next.initCause(ex);
if (currSqlEx != null)
currSqlEx.setNextException(next);
else
firstSqlEx = next;
currSqlEx = next;
errKeys.remove(e.getKey());
errors++;
}
finally {
Object key = e.getKey();
Integer rowNum = batch.rowNumbers().get(key);
assert rowNum != null;
cntPerRow[rowNum] = Statement.EXECUTE_FAILED;
}
}
return new DmlPageProcessingErrorResult(errKeys.toArray(), firstSqlEx, errors);
}
/**
* Updates counters as if all rowNums were successfully processed.
*
* @param rowNums Rows.
*/
private void countAllRows(Collection<Integer> rowNums) {
for (Integer rowNum : rowNums) {
assert rowNum != null;
if (cntPerRow[rowNum] > -1)
cntPerRow[rowNum]++;
}
}
/**
* Batch for update.
*/
private static class Batch {
/** Map from keys to row numbers. */
private Map<Object, Integer> rowNums = new HashMap<>();
/** Map from keys to entry processors. */
private Map<Object, EntryProcessor<Object, Object, Boolean>> rowProcs = new TreeMap<>(COMP);
/**
* Checks if batch contains key.
*
* @param key Key.
* @return {@code True} if contains.
*/
public boolean containsKey(Object key) {
boolean res = rowNums.containsKey(key);
assert res == rowProcs.containsKey(key);
return res;
}
/**
* Returns batch size.
*
* @return Batch size.
*/
public int size() {
int res = rowNums.size();
assert res == rowProcs.size();
return res;
}
/**
* Adds row to batch.
*
* @param key Key.
* @param rowNum Row number.
* @param proc Entry processor.
* @return {@code True} if there was an entry associated with the given key.
*/
public boolean put(Object key, Integer rowNum, EntryProcessor<Object, Object, Boolean> proc) {
Integer prevNum = rowNums.put(key, rowNum);
EntryProcessor prevProc = rowProcs.put(key, proc);
assert (prevNum == null) == (prevProc == null);
return prevNum != null;
}
/**
* Clears batch.
*/
public void clear() {
assert rowNums.size() == rowProcs.size();
rowNums.clear();
rowProcs.clear();
}
/**
* Checks if batch is empty.
*
* @return {@code True} if empty.
*/
public boolean isEmpty() {
assert rowNums.size() == rowProcs.size();
return rowNums.isEmpty();
}
/**
* Row numbers map getter.
*
* @return Row numbers map.
*/
public Map<Object, Integer> rowNumbers() {
return rowNums;
}
/**
* Row processors map getter.
*
* @return Row processors map.
*/
public Map<Object, EntryProcessor<Object, Object, Boolean>> rowProcessors() {
return rowProcs;
}
}
/**
* Batch entries comparator.
*/
private static final class BatchEntryComparator implements Comparator<Object> {
/** {@inheritDoc} */
@Override public int compare(Object first, Object second) {
// We assume that only simple types or BinaryObjectImpl are possible. The latter comes from the fact
// that we use BinaryObjectBuilder which produces only on-heap binary objects.
return BinaryObjectImpl.compareForDml(first, second);
}
}
}