blob: 62ef7ef08fdfb520485a557cc0aa5382b04056ec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query.h2.dml;
import java.lang.reflect.Array;
import java.sql.BatchUpdateException;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.binary.BinaryArray;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.odbc.SqlStateCode;
import org.apache.ignite.internal.processors.query.GridQueryRowDescriptor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.h2.DmlStatementsProcessor;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.UpdateResult;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.transactions.TransactionDuplicateKeyException;
import org.h2.util.DateTimeUtils;
import org.h2.util.LocalDateTimeUtils;
import org.h2.value.Value;
import org.h2.value.ValueDate;
import org.h2.value.ValueTime;
import org.h2.value.ValueTimestamp;
import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.DUPLICATE_KEY;
import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.createJdbcSqlException;
import static org.apache.ignite.internal.processors.tracing.SpanTags.SQL_CACHE_UPDATES;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_CACHE_UPDATE;
/**
* DML utility methods.
*/
public class DmlUtils {
/**
 * Converts a value to the type expected by the target column. A handful of special cases
 * (dates, UUIDs, {@code java.time} types and arrays) are handled manually; everything else
 * is delegated to H2.
 *
 * @param val Source value, may be {@code null}.
 * @param desc Row descriptor.
 * @param expCls Expected value class.
 * @param type Expected column type to convert to.
 * @param columnName Column name, used only to build the error message.
 * @return Converted object, or {@code null} if {@code val} is {@code null}.
 * @throws IgniteSQLException With {@link IgniteQueryErrorCode#CONVERSION_FAILED} code if any
 *      exception is raised during the conversion.
 */
@SuppressWarnings({"ConstantConditions", "SuspiciousSystemArraycopy"})
public static Object convert(Object val, GridQueryRowDescriptor desc, Class<?> expCls,
    int type, String columnName) {
    if (val == null)
        return null;

    Class<?> srcCls = val.getClass();

    try {
        // H2 treats every java.util.Date as a Timestamp, whereas the binary marshaller wants
        // an exact java.util.Date instance - so re-wrap subclasses when a plain Date is expected.
        if (expCls == Date.class && srcCls != Date.class && val instanceof Date)
            return new Date(((Date)val).getTime());

        // A user-given UUID is always serialized by H2 to a byte array, hence manual deserialization.
        if (type == Value.UUID && srcCls == byte[].class) {
            return U.unmarshal(desc.context().marshaller(), (byte[])val,
                U.resolveClassLoader(desc.context().gridConfig()));
        }

        if (val instanceof Timestamp && expCls == LocalDateTimeUtils.LOCAL_DATE_TIME)
            return LocalDateTimeUtils.valueToLocalDateTime(ValueTimestamp.get((Timestamp)val));

        if (val instanceof Date && expCls == LocalDateTimeUtils.LOCAL_DATE) {
            long dateVal = DateTimeUtils.dateValueFromDate(((Date)val).getTime());

            return LocalDateTimeUtils.valueToLocalDate(ValueDate.fromDateValue(dateVal));
        }

        if (val instanceof Time && expCls == LocalDateTimeUtils.LOCAL_TIME)
            return LocalDateTimeUtils.valueToLocalTime(ValueTime.get((Time)val));

        if (type == Value.ARRAY) {
            // Binary arrays are passed through untouched.
            if (val instanceof BinaryArray)
                return val;

            // Arrays of reference types have to be converted manually -
            // see https://issues.apache.org/jira/browse/IGNITE-4327
            // Only narrowing from Object[] to a more precise array type is supported.
            if (srcCls != expCls) {
                if (srcCls != Object[].class) {
                    throw new IgniteCheckedException("Unexpected array type - only conversion from Object[] " +
                        "is assumed");
                }

                // Why would otherwise type be Value.ARRAY?
                assert expCls.isArray();

                Object[] srcArr = (Object[])val;

                Object typedArr = Array.newInstance(expCls.getComponentType(), srcArr.length);

                System.arraycopy(srcArr, 0, typedArr, 0, srcArr.length);

                return typedArr;
            }
        }

        Object converted = H2Utils.convert(val, desc.context().kernalContext().query().objectContext(), type);

        // Converting a String to Date without a query may yield a Timestamp instead of a Date -
        // re-wrap it the same way as above.
        if (expCls == Date.class && converted instanceof Date && converted.getClass() != Date.class)
            return new Date(((Date)converted).getTime());

        return converted;
    }
    catch (Exception e) {
        throw new IgniteSQLException("Value conversion failed [column=" + columnName + ", from=" + srcCls.getName() + ", to=" +
            expCls.getName() + ']', IgniteQueryErrorCode.CONVERSION_FAILED, e);
    }
}
/**
 * Tells whether the given query is a batched one.
 *
 * @param qry Query.
 * @return {@code True} if the query is a {@link SqlFieldsQueryEx} that reports itself as batched,
 *      {@code false} otherwise.
 */
public static boolean isBatched(SqlFieldsQuery qry) {
    if (!(qry instanceof SqlFieldsQueryEx))
        return false;

    return ((SqlFieldsQueryEx)qry).isBatched();
}
/**
 * Applies the rows produced by a SELECT to the cache according to the mode of the update plan.
 * MERGE and INSERT report only the number of affected items (with an empty array of failed keys),
 * while UPDATE and DELETE return whatever their dedicated routines produce.
 *
 * @param plan Update plan.
 * @param cursor Cursor over select results.
 * @param pageSize Page size.
 * @return Pair [number of successfully processed items; keys that have failed to be processed]
 * @throws IgniteCheckedException if failed.
 */
public static UpdateResult processSelectResult(UpdatePlan plan, Iterable<List<?>> cursor,
    int pageSize) throws IgniteCheckedException {
    switch (plan.mode()) {
        case MERGE: {
            long merged = doMerge(plan, cursor, pageSize);

            return new UpdateResult(merged, X.EMPTY_OBJECT_ARRAY);
        }

        case INSERT: {
            long inserted = dmlDoInsert(plan, cursor, pageSize);

            return new UpdateResult(inserted, X.EMPTY_OBJECT_ARRAY);
        }

        case UPDATE:
            return doUpdate(plan, cursor, pageSize);

        case DELETE:
            return doDelete(plan.cacheContext(), cursor, pageSize);

        default:
            throw new IgniteSQLException("Unexpected DML operation [mode=" + plan.mode() + ']',
                IgniteQueryErrorCode.UNEXPECTED_OPERATION);
    }
}
/**
 * Executes an INSERT statement plan. A plan with exactly one row is served by a single
 * {@code putIfAbsent}; otherwise rows are streamed through a {@link DmlBatchSender}.
 *
 * @param plan Plan to execute.
 * @param cursor Cursor to take inserted data from.
 * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
 * @return Number of items affected.
 * @throws IgniteCheckedException if failed, particularly in case of duplicate keys.
 */
@SuppressWarnings({"unchecked"})
private static long dmlDoInsert(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException {
    GridCacheContext cctx = plan.cacheContext();

    // Single row: no need for batching machinery, just try to put it.
    if (plan.rowCount() == 1) {
        IgniteBiTuple entry = plan.processRow(cursor.iterator().next());

        try (
            MTC.TraceSurroundings ignored = MTC.support(cctx.kernalContext().tracing()
                .create(SQL_CACHE_UPDATE, MTC.span())
                .addTag(SQL_CACHE_UPDATES, () -> "1"))
        ) {
            if (!cctx.cache().putIfAbsent(entry.getKey(), entry.getValue()))
                throw new TransactionDuplicateKeyException("Duplicate key during INSERT [key=" + entry.getKey() + ']');

            return 1;
        }
    }

    DmlBatchSender sender = new DmlBatchSender(cctx, pageSize, 1);

    for (List<?> row : cursor) {
        final IgniteBiTuple entry = plan.processRow(row);

        sender.add(entry.getKey(), new DmlStatementsProcessor.InsertEntryProcessor(entry.getValue()), 0);
    }

    sender.flush();

    SQLException err = sender.error();

    // Keys reported as failed by the sender could not be inserted because they already exist.
    if (!F.isEmpty(sender.failedKeys())) {
        String msg = "Failed to INSERT some keys because they are already in cache " +
            "[keys=" + sender.failedKeys() + ']';

        err = chainException(err, new SQLException(msg, SqlStateCode.CONSTRAINT_VIOLATION));
    }

    if (err != null)
        throw new IgniteSQLException(err);

    return sender.updateCount();
}
/**
 * Performs UPDATE operation on top of results of SELECT.
 *
 * @param plan Update plan used to turn each selected row into a [key; old value; new value] triple.
 * @param cursor SELECT results.
 * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
 * @return Pair [cursor corresponding to results of UPDATE (contains number of items affected); keys whose values
 *     had been modified concurrently (arguments for a re-run)].
 * @throws IgniteCheckedException If failed.
 */
private static UpdateResult doUpdate(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize)
    throws IgniteCheckedException {
    GridCacheContext cctx = plan.cacheContext();

    DmlBatchSender sender = new DmlBatchSender(cctx, pageSize, 1);

    for (List<?> row : cursor) {
        T3<Object, Object, Object> triple = plan.processRowForUpdate(row);

        Object key = triple.get1();
        Object expectedVal = triple.get2();
        Object updatedVal = triple.get3();

        sender.add(
            key,
            new DmlStatementsProcessor.ModifyingEntryProcessor(
                expectedVal,
                new DmlStatementsProcessor.EntryValueUpdater(updatedVal)),
            0
        );
    }

    sender.flush();

    SQLException err = sender.error();

    // No errors: hand the failed keys (if any) back to the caller as candidates for a re-run.
    if (err == null) {
        return new UpdateResult(sender.updateCount(), sender.failedKeys().toArray(),
            cursor instanceof QueryCursorImpl ? ((QueryCursorImpl)cursor).partitionResult() : null);
    }

    if (!F.isEmpty(sender.failedKeys())) {
        // Don't go for a re-run if processing of some keys yielded exceptions and report keys that
        // had been modified concurrently right away.
        String msg = "Failed to UPDATE some keys because they had been modified concurrently " +
            "[keys=" + sender.failedKeys() + ']';

        SQLException concurrentEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);

        concurrentEx.setNextException(err);

        err = concurrentEx;
    }

    throw new IgniteSQLException(err);
}
/**
 * Executes a MERGE statement plan. A plan with exactly one row is served by a single {@code put};
 * otherwise rows are accumulated into pages and written with {@code putAll}.
 *
 * @param plan Plan to execute.
 * @param cursor Cursor to take inserted data from.
 * @param pageSize Batch size to stream data from {@code cursor}, anything <= 0 for single page operations.
 * @return Number of items affected. In the paged mode this is the sum of the sizes of the written pages,
 *      so a key repeated within one page is counted once.
 * @throws IgniteCheckedException if failed.
 */
@SuppressWarnings("unchecked")
private static long doMerge(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException {
    GridCacheContext cctx = plan.cacheContext();

    // If we have just one item to put, just do so
    if (plan.rowCount() == 1) {
        IgniteBiTuple t = plan.processRow(cursor.iterator().next());

        try (
            MTC.TraceSurroundings ignored = MTC.support(cctx.kernalContext().tracing()
                .create(SQL_CACHE_UPDATE, MTC.span())
                .addTag(SQL_CACHE_UPDATES, () -> "1"))
        ) {
            cctx.cache().put(t.getKey(), t.getValue());
        }

        return 1;
    }
    else {
        // Accumulate in long: the method returns long, and an int counter would silently wrap
        // once the total number of merged rows exceeds Integer.MAX_VALUE.
        long resCnt = 0;

        // Insertion-ordered page of rows awaiting a putAll.
        Map<Object, Object> rows = new LinkedHashMap<>();

        for (Iterator<List<?>> it = cursor.iterator(); it.hasNext();) {
            List<?> row = it.next();

            IgniteBiTuple t = plan.processRow(row);

            rows.put(t.getKey(), t.getValue());

            // Flush when the page is full (only if paging is enabled) or when the cursor is exhausted.
            if ((pageSize > 0 && rows.size() == pageSize) || !it.hasNext()) {
                try (
                    MTC.TraceSurroundings ignored = MTC.support(cctx.kernalContext().tracing()
                        .create(SQL_CACHE_UPDATE, MTC.span())
                        .addTag(SQL_CACHE_UPDATES, () -> Integer.toString(rows.size())))
                ) {
                    cctx.cache().putAll(rows);

                    resCnt += rows.size();

                    // Start a fresh page only if there is more data to come.
                    if (it.hasNext())
                        rows.clear();
                }
            }
        }

        return resCnt;
    }
}
/**
 * Performs DELETE operation on top of results of SELECT. Each selected row is expected to hold
 * exactly two elements - the key and the value the removal is conditioned on; rows of any other
 * size are skipped.
 *
 * @param cctx Cache context.
 * @param cursor SELECT results.
 * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
 * @return Results of DELETE (number of items affected AND keys that failed to be updated).
 * @throws IgniteCheckedException If failed.
 */
private static UpdateResult doDelete(GridCacheContext cctx, Iterable<List<?>> cursor, int pageSize)
    throws IgniteCheckedException {
    DmlBatchSender sender = new DmlBatchSender(cctx, pageSize, 1);

    for (List<?> row : cursor) {
        if (row.size() == 2) {
            Object key = row.get(0);

            ClusterNode primary = sender.primaryNodeByKey(key);

            IgniteInClosure<MutableEntry<Object, Object>> rmvClo =
                DmlStatementsProcessor.getRemoveClosure(primary, key);

            sender.add(
                key,
                new DmlStatementsProcessor.ModifyingEntryProcessor(row.get(1), rmvClo),
                0
            );
        }
    }

    sender.flush();

    SQLException err = sender.error();

    // No errors: hand the failed keys (if any) back to the caller.
    if (err == null) {
        return new UpdateResult(sender.updateCount(), sender.failedKeys().toArray(),
            cursor instanceof QueryCursorImpl ? ((QueryCursorImpl)cursor).partitionResult() : null);
    }

    if (!F.isEmpty(sender.failedKeys())) {
        // Don't go for a re-run if processing of some keys yielded exceptions and report keys that
        // had been modified concurrently right away.
        String msg = "Failed to DELETE some keys because they had been modified concurrently " +
            "[keys=" + sender.failedKeys() + ']';

        SQLException concurrentEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);

        concurrentEx.setNextException(err);

        err = concurrentEx;
    }

    throw new IgniteSQLException(err);
}
/**
 * Performs the planned update in batched mode. Only INSERT is handled; MERGE is not supported yet
 * and any other mode is rejected as unexpected.
 *
 * @param plan Update plan.
 * @param rows Rows to update, grouped per batched query.
 * @param pageSize Page size.
 * @return {@link List} of update results.
 * @throws IgniteCheckedException If failed.
 */
public static List<UpdateResult> processSelectResultBatched(UpdatePlan plan, List<List<List<?>>> rows, int pageSize)
    throws IgniteCheckedException {
    switch (plan.mode()) {
        case INSERT: {
            return doInsertBatched(plan, rows, pageSize);
        }

        case MERGE: {
            // TODO
            throw new IgniteCheckedException("Unsupported, fix");
        }

        default: {
            throw new IgniteSQLException("Unexpected batched DML operation [mode=" + plan.mode() + ']',
                IgniteQueryErrorCode.UNEXPECTED_OPERATION);
        }
    }
}
/**
 * Executes a batched INSERT statement plan. Errors raised while processing individual rows, while
 * flushing, or reported by the sender are collected into a single exception chain and thrown at
 * the end wrapped into a {@link BatchUpdateException}.
 *
 * @param plan Plan to execute.
 * @param cursor Cursor to take inserted data from. I.e. list of batch arguments for each query.
 * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
 * @return Update results, one per element of the sender's per-row counter array.
 * @throws IgniteCheckedException if failed, particularly in case of duplicate keys.
 */
private static List<UpdateResult> doInsertBatched(UpdatePlan plan, List<List<List<?>>> cursor, int pageSize)
    throws IgniteCheckedException {
    GridCacheContext cctx = plan.cacheContext();

    DmlBatchSender snd = new DmlBatchSender(cctx, pageSize, cursor.size());

    SQLException err = null;

    // Index of the batched query the current rows belong to.
    int qryIdx = 0;

    for (List<List<?>> qryRows : cursor) {
        for (List<?> row : qryRows) {
            try {
                final IgniteBiTuple entry = plan.processRow(row);

                snd.add(entry.getKey(), new DmlStatementsProcessor.InsertEntryProcessor(entry.getValue()), qryIdx);
            }
            catch (Exception e) {
                boolean isSqlEx = e instanceof IgniteSQLException;

                String sqlState = isSqlEx ? ((IgniteSQLException)e).sqlState() : SqlStateCode.INTERNAL_ERROR;
                int code = isSqlEx ? ((IgniteSQLException)e).statusCode() : IgniteQueryErrorCode.UNKNOWN;

                err = chainException(err, new SQLException(e.getMessage(), sqlState, code, e));

                snd.setFailed(qryIdx);
            }
        }

        qryIdx++;
    }

    try {
        snd.flush();
    }
    catch (Exception e) {
        SQLException flushEx = new SQLException(e.getMessage(), SqlStateCode.INTERNAL_ERROR,
            IgniteQueryErrorCode.UNKNOWN, e);

        err = chainException(err, flushEx);
    }

    err = chainException(err, snd.error());

    if (!F.isEmpty(snd.failedKeys())) {
        SQLException dupEx = new SQLException("Failed to INSERT some keys because they are already in cache [keys=" +
            snd.failedKeys() + ']', SqlStateCode.CONSTRAINT_VIOLATION, DUPLICATE_KEY);

        err = chainException(err, dupEx);
    }

    if (err != null) {
        BatchUpdateException batchEx = new BatchUpdateException(err.getMessage(), err.getSQLState(),
            err.getErrorCode(), snd.perRowCounterAsArray(), err);

        throw new IgniteCheckedException(batchEx);
    }

    int[] cntPerRow = snd.perRowCounterAsArray();

    List<UpdateResult> res = new ArrayList<>(cntPerRow.length);

    for (int cnt : cntPerRow)
        res.add(new UpdateResult(cnt, X.EMPTY_OBJECT_ARRAY));

    return res;
}
/**
 * Appends an exception to an exception chain.
 *
 * @param main Head of the chain to append to, may be {@code null}.
 * @param add Exception which should be added to the chain, may be {@code null}.
 * @return {@code main} with {@code add} linked via {@link SQLException#setNextException(SQLException)},
 *      or {@code add} itself (possibly {@code null}) when there is no chain yet.
 */
public static SQLException chainException(SQLException main, SQLException add) {
    // No chain yet: the added exception (if any) becomes its head.
    if (main == null)
        return add;

    main.setNextException(add);

    return main;
}
/**
 * Switches the per-call operation context of the given cache context to keepBinary mode.
 * Nothing is changed when the cache context does not use the binary marshaller or the current
 * operation context is already keepBinary.
 *
 * @param cctx Cache context.
 * @return Operation context that was in effect before the call (may be {@code null}).
 */
public static CacheOperationContext setKeepBinaryContext(GridCacheContext<?, ?> cctx) {
    CacheOperationContext prevCtx = cctx.operationContextPerCall();

    if (!cctx.binaryMarshaller())
        return prevCtx;

    // Force keepBinary for operation context to avoid binary deserialization inside entry processor
    CacheOperationContext forcedCtx;

    if (prevCtx == null) {
        // Mimics behavior of GridCacheAdapter#keepBinary and GridCacheProxyImpl#keepBinary
        forcedCtx = new CacheOperationContext(false, true, null, false, null, false, null, true);
    }
    else if (prevCtx.isKeepBinary())
        forcedCtx = null;
    else
        forcedCtx = prevCtx.keepBinary();

    if (forcedCtx != null)
        cctx.operationContextPerCall(forcedCtx);

    return prevCtx;
}
/**
* Restores a previously saved operation context by installing it as the per-call operation
* context of the given cache context. Meant to be paired with
* {@link #setKeepBinaryContext(GridCacheContext)}, which returns the context to pass here.
*
* @param cctx Cache context.
* @param oldOpCtx Old operation context to reinstall; {@code setKeepBinaryContext} may return
*      {@code null}, in which case {@code null} is passed through as is.
*/
public static void restoreKeepBinaryContext(GridCacheContext<?, ?> cctx, CacheOperationContext oldOpCtx) {
cctx.operationContextPerCall(oldOpCtx);
}
/**
* Private constructor: this class only exposes static utility methods and is not meant
* to be instantiated.
*/
private DmlUtils() {
// No-op.
}
}