blob: 874a44bd56b3f13a3b46df72468d982d402886b6 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.ignite.internal.processors.query.h2.dml;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.binary.BinaryObjectImpl;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteClusterReadOnlyException;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.odbc.SqlStateCode;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.createJdbcSqlException;
import static org.apache.ignite.internal.processors.tracing.SpanTags.ERROR;
import static org.apache.ignite.internal.processors.tracing.SpanTags.SQL_CACHE_UPDATES;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_CACHE_UPDATE;
* Batch sender class.
public class DmlBatchSender {
/** Comparator. */
private static final BatchEntryComparator COMP = new BatchEntryComparator();
/** Cache context. */
private final GridCacheContext cctx;
/** Batch size. */
private final int size;
/** Batches. */
private final Map<UUID, Batch> batches = new HashMap<>();
/** Result count. */
private long updateCnt;
/** Failed keys. */
private List<Object> failedKeys;
/** Exception. */
private SQLException err;
/** Per row updates counter */
private int[] cntPerRow;
* Constructor.
* @param cctx Cache context.
* @param size Batch.
* @param qryNum Number of queries.
public DmlBatchSender(GridCacheContext cctx, int size, int qryNum) {
this.cctx = cctx;
this.size = size;
cntPerRow = new int[qryNum];
* Add entry to batch.
* @param key Key.
* @param proc Processor.
* @param rowNum Row number.
* @throws IgniteCheckedException If failed.
public void add(Object key, EntryProcessor<Object, Object, Boolean> proc, int rowNum)
throws IgniteCheckedException {
assert key != null;
assert proc != null;
assert rowNum < cntPerRow.length;
ClusterNode node = primaryNodeByKey(key);
UUID nodeId =;
Batch batch = batches.get(nodeId);
if (batch == null) {
batch = new Batch();
batches.put(nodeId, batch);
if (batch.containsKey(key)) { // Force cache update if duplicates found.
batch.put(key, rowNum, proc);
if (batch.size() >= size)
* @param key Key.
* @return Primary node for given key.
* @throws IgniteCheckedException If primary node is not found.
public ClusterNode primaryNodeByKey(Object key) throws IgniteCheckedException {
ClusterNode node = cctx.affinity().primaryByKey(key, AffinityTopologyVersion.NONE);
if (node == null)
throw new IgniteCheckedException("Failed to map key to node.");
return node;
* Flush any remaining entries.
public void flush() {
for (Batch batch : batches.values()) {
if (!batch.isEmpty())
* @return Update count.
public long updateCount() {
return updateCnt;
* @return Failed keys.
public List<Object> failedKeys() {
return failedKeys != null ? failedKeys : Collections.emptyList();
* @return Error.
public SQLException error() {
return err;
* Returns per row updates counter as array.
* @return Per row updates counter as array.
public int[] perRowCounterAsArray() {
return cntPerRow;
* Sets row as failed.
* @param rowNum Row number.
public void setFailed(int rowNum) {
cntPerRow[rowNum] = Statement.EXECUTE_FAILED;
* Send the batch.
* @param batch Batch.
private void sendBatch(Batch batch) {
try (
TraceSurroundings ignored =
.create(SQL_CACHE_UPDATE, MTC.span())
.addTag(SQL_CACHE_UPDATES, () -> Integer.toString(batch.size())))
) {
DmlPageProcessingResult pageRes = processPage(cctx, batch);
updateCnt += pageRes.count();
if (failedKeys == null)
failedKeys = new ArrayList<>();
if (pageRes.error() != null) {
MTC.span().addTag(ERROR, pageRes.error()::getMessage);
if (err == null)
err = pageRes.error();
* Execute given entry processors and collect errors, if any.
* @param cctx Cache context.
* @param batch Rows to process.
* @return Triple [number of rows actually changed; keys that failed to update (duplicates or concurrently
* updated ones); chain of exceptions for all keys whose processing resulted in error, or null for no errors].
private DmlPageProcessingResult processPage(GridCacheContext cctx, Batch batch) {
Map<Object, EntryProcessorResult<Boolean>> res;
try {
res = cctx.cache().invokeAll(batch.rowProcessors());
catch (IgniteCheckedException e) {
for (Integer rowNum : batch.rowNumbers().values()) {
assert rowNum != null;
cntPerRow[rowNum] = Statement.EXECUTE_FAILED;
if (X.hasCause(e, IgniteClusterReadOnlyException.class)) {
SQLException sqlEx = new SQLException(
return new DmlPageProcessingResult(0, null, sqlEx);
return new DmlPageProcessingResult(0, null,
new SQLException(e.getMessage(), SqlStateCode.INTERNAL_ERROR, IgniteQueryErrorCode.UNKNOWN, e));
if (F.isEmpty(res)) {
return new DmlPageProcessingResult(batch.size(), null, null);
DmlPageProcessingErrorResult splitRes = splitErrors(res, batch);
int keysCnt = splitRes.errorKeys().length;
return new DmlPageProcessingResult(batch.size() - keysCnt - splitRes.errorCount(), splitRes.errorKeys(),
* Process errors of entry processor - split the keys into duplicated/concurrently modified and those whose
* processing yielded an exception.
* @param res Result of {@link GridCacheAdapter#invokeAll)}
* @param batch Batch.
* @return pair [array of duplicated/concurrently modified keys, SQL exception for erroneous keys] (exception is
* null if all keys are duplicates/concurrently modified ones).
private DmlPageProcessingErrorResult splitErrors(Map<Object, EntryProcessorResult<Boolean>> res, Batch batch) {
Set<Object> errKeys = new LinkedHashSet<>(res.keySet());
SQLException currSqlEx = null;
SQLException firstSqlEx = null;
int errors = 0;
// Let's form a chain of SQL exceptions
for (Map.Entry<Object, EntryProcessorResult<Boolean>> e : res.entrySet()) {
try {
catch (EntryProcessorException ex) {
SQLException next = createJdbcSqlException("Failed to process key '" + e.getKey() + '\'',
if (currSqlEx != null)
firstSqlEx = next;
currSqlEx = next;
finally {
Object key = e.getKey();
Integer rowNum = batch.rowNumbers().get(key);
assert rowNum != null;
cntPerRow[rowNum] = Statement.EXECUTE_FAILED;
return new DmlPageProcessingErrorResult(errKeys.toArray(), firstSqlEx, errors);
* Updates counters as if all rowNums were successfully processed.
* @param rowNums Rows.
private void countAllRows(Collection<Integer> rowNums) {
for (Integer rowNum : rowNums) {
assert rowNum != null;
if (cntPerRow[rowNum] > -1)
* Batch for update.
private static class Batch {
/** Map from keys to row numbers. */
private Map<Object, Integer> rowNums = new HashMap<>();
/** Map from keys to entry processors. */
private Map<Object, EntryProcessor<Object, Object, Boolean>> rowProcs = new TreeMap<>(COMP);
* Checks if batch contains key.
* @param key Key.
* @return {@code True} if contains.
public boolean containsKey(Object key) {
boolean res = rowNums.containsKey(key);
assert res == rowProcs.containsKey(key);
return res;
* Returns batch size.
* @return Batch size.
public int size() {
int res = rowNums.size();
assert res == rowProcs.size();
return res;
* Adds row to batch.
* @param key Key.
* @param rowNum Row number.
* @param proc Entry processor.
* @return {@code True} if there was an entry associated with the given key.
public boolean put(Object key, Integer rowNum, EntryProcessor<Object, Object, Boolean> proc) {
Integer prevNum = rowNums.put(key, rowNum);
EntryProcessor prevProc = rowProcs.put(key, proc);
assert (prevNum == null) == (prevProc == null);
return prevNum != null;
* Clears batch.
public void clear() {
assert rowNums.size() == rowProcs.size();
* Checks if batch is empty.
* @return {@code True} if empty.
public boolean isEmpty() {
assert rowNums.size() == rowProcs.size();
return rowNums.isEmpty();
* Row numbers map getter.
* @return Row numbers map.
public Map<Object, Integer> rowNumbers() {
return rowNums;
* Row processors map getter.
* @return Row processors map.
public Map<Object, EntryProcessor<Object, Object, Boolean>> rowProcessors() {
return rowProcs;
* Batch entries comparator.
private static final class BatchEntryComparator implements Comparator<Object> {
/** {@inheritDoc} */
@Override public int compare(Object first, Object second) {
// We assume that only simple types or BinaryObjectImpl are possible. The latter comes from the fact
// that we use BinaryObjectBuilder which produces only on-heap binary objects.
return BinaryObjectImpl.compareForDml(first, second);