blob: c9b779f237bc32ac097b96c71be20c4e7e878447 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.odbc.odbc;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.binary.BinaryThreadLocalContext;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.odbc.ClientListenerMessageParser;
import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
import org.apache.ignite.internal.processors.odbc.ClientMessage;
import org.apache.ignite.internal.processors.odbc.SqlListenerUtils;
import org.jetbrains.annotations.NotNull;
/**
 * ODBC message parser.
 * <p>
 * Decodes binary request payloads into ODBC request objects and encodes ODBC responses into binary
 * messages. Several fields are only read or written depending on the protocol version the parser
 * was created with.
 */
public class OdbcMessageParser implements ClientListenerMessageParser {
    /** Marshaller. */
    private final GridBinaryMarshaller marsh;

    /** Initial output stream capacity. */
    protected static final int INIT_CAP = 1024;

    /** Kernal context. */
    protected final GridKernalContext ctx;

    /** Logger. */
    private final IgniteLogger log;

    /** Protocol version. */
    private final ClientListenerProtocolVersion ver;

    /**
     * @param ctx Context.
     * @param ver Protocol version.
     * @throws IgniteException If the cache object processor of the context is not
     *      a {@link CacheObjectBinaryProcessorImpl}.
     */
    public OdbcMessageParser(GridKernalContext ctx, ClientListenerProtocolVersion ver) {
        this.ctx = ctx;
        this.ver = ver;

        log = ctx.log(getClass());

        if (ctx.cacheObjects() instanceof CacheObjectBinaryProcessorImpl) {
            CacheObjectBinaryProcessorImpl cacheObjProc = (CacheObjectBinaryProcessorImpl)ctx.cacheObjects();

            marsh = cacheObjProc.marshaller();
        }
        else {
            throw new IgniteException("ODBC can only be used with BinaryMarshaller (please set it " +
                "through IgniteConfiguration.setMarshaller())");
        }
    }

    /**
     * {@inheritDoc}
     *
     * @throws IgniteException If the command byte is unknown or an element count in the payload is negative.
     */
    @Override public ClientListenerRequest decode(ClientMessage msg) {
        assert msg != null;

        BinaryInputStream stream = new BinaryHeapInputStream(msg.payload());

        BinaryReaderExImpl reader =
            new BinaryReaderExImpl(marsh.context(), stream, ctx.config().getClassLoader(), true);

        // The first byte of the payload identifies the command.
        byte cmd = reader.readByte();

        ClientListenerRequest res;

        switch (cmd) {
            case OdbcRequest.QRY_EXEC: {
                String schema = reader.readString();
                String sql = reader.readString();
                int paramNum = reader.readInt();

                Object[] params = readParameterRow(reader, paramNum);

                // Order matters: timeout precedes the auto-commit flag on the wire.
                int timeout = readTimeout(reader);
                boolean autoCommit = readAutoCommit(reader);

                res = new OdbcQueryExecuteRequest(schema, sql, params, timeout, autoCommit);

                break;
            }

            case OdbcRequest.QRY_EXEC_BATCH: {
                String schema = reader.readString();
                String sql = reader.readString();
                int paramRowLen = reader.readInt();
                int rowNum = reader.readInt();
                boolean last = reader.readBoolean();

                // The count comes from the client; reject a negative value before using it as an array size.
                checkNonNegative(rowNum, "rowNum");

                Object[][] params = new Object[rowNum][];

                for (int i = 0; i < rowNum; ++i)
                    params[i] = readParameterRow(reader, paramRowLen);

                // Order matters: timeout precedes the auto-commit flag on the wire.
                int timeout = readTimeout(reader);
                boolean autoCommit = readAutoCommit(reader);

                res = new OdbcQueryExecuteBatchRequest(schema, sql, last, params, timeout, autoCommit);

                break;
            }

            case OdbcRequest.STREAMING_BATCH: {
                String schema = reader.readString();
                int num = reader.readInt();

                // The count comes from the client; reject a negative value before using it as a capacity.
                checkNonNegative(num, "num");

                ArrayList<OdbcQuery> queries = new ArrayList<>(num);

                for (int i = 0; i < num; ++i) {
                    OdbcQuery qry = new OdbcQuery();

                    qry.readBinary(reader);

                    queries.add(qry);
                }

                boolean last = reader.readBoolean();
                long order = reader.readLong();

                res = new OdbcStreamingBatchRequest(schema, queries, last, order);

                break;
            }

            case OdbcRequest.QRY_FETCH: {
                long queryId = reader.readLong();
                int pageSize = reader.readInt();

                res = new OdbcQueryFetchRequest(queryId, pageSize);

                break;
            }

            case OdbcRequest.QRY_CLOSE: {
                long queryId = reader.readLong();

                res = new OdbcQueryCloseRequest(queryId);

                break;
            }

            case OdbcRequest.META_COLS: {
                String schema = reader.readString();
                String table = reader.readString();
                String column = reader.readString();

                res = new OdbcQueryGetColumnsMetaRequest(schema, table, column);

                break;
            }

            case OdbcRequest.META_TBLS: {
                String catalog = reader.readString();
                String schema = reader.readString();
                String table = reader.readString();
                String tableType = reader.readString();

                res = new OdbcQueryGetTablesMetaRequest(catalog, schema, table, tableType);

                break;
            }

            case OdbcRequest.META_PARAMS: {
                String schema = reader.readString();
                String sqlQuery = reader.readString();

                res = new OdbcQueryGetParamsMetaRequest(schema, sqlQuery);

                break;
            }

            case OdbcRequest.MORE_RESULTS: {
                long queryId = reader.readLong();
                int pageSize = reader.readInt();

                res = new OdbcQueryMoreResultsRequest(queryId, pageSize);

                break;
            }

            default:
                throw new IgniteException("Unknown ODBC command: [cmd=" + cmd + ']');
        }

        return res;
    }

    /**
     * Reads the query timeout, which is present on the wire only since protocol version 2.3.2.
     *
     * @param reader Reader.
     * @return Timeout read from the stream, or {@code 0} for older protocol versions.
     */
    private int readTimeout(BinaryReaderExImpl reader) {
        return ver.compareTo(OdbcConnectionContext.VER_2_3_2) >= 0 ? reader.readInt() : 0;
    }

    /**
     * Reads the auto-commit flag, which is present on the wire only since protocol version 2.7.0.
     *
     * @param reader Reader.
     * @return Flag read from the stream, or {@code true} for older protocol versions.
     */
    private boolean readAutoCommit(BinaryReaderExImpl reader) {
        return ver.compareTo(OdbcConnectionContext.VER_2_7_0) < 0 || reader.readBoolean();
    }

    /**
     * Ensures that an element count received from the client is not negative.
     *
     * @param val Value to check.
     * @param name Field name used in the error message.
     * @throws IgniteException If the value is negative.
     */
    private static void checkNonNegative(int val, String name) {
        if (val < 0)
            throw new IgniteException("Malformed ODBC request, negative count: [" + name + '=' + val + ']');
    }

    /**
     * Read row of parameters using reader.
     *
     * @param reader Reader.
     * @param paramNum Number of parameters in a row.
     * @return Parameters array.
     * @throws IgniteException If {@code paramNum} is negative.
     */
    @NotNull private static Object[] readParameterRow(BinaryReaderExImpl reader, int paramNum) {
        checkNonNegative(paramNum, "paramNum");

        Object[] params = new Object[paramNum];

        for (int i = 0; i < paramNum; ++i)
            params[i] = SqlListenerUtils.readObject(reader, true);

        return params;
    }

    /** {@inheritDoc} */
    @Override public ClientMessage encode(ClientListenerResponse msg0) {
        assert msg0 != null;
        assert msg0 instanceof OdbcResponse;

        OdbcResponse msg = (OdbcResponse)msg0;

        // Creating new binary writer.
        BinaryWriterExImpl writer = new BinaryWriterExImpl(marsh.context(), new BinaryHeapOutputStream(INIT_CAP),
            BinaryThreadLocalContext.get().schemaHolder(), null);

        // Writing status: protocol versions before 2.1.5 get a single byte collapsed to success/failed,
        // later versions get the status as an int.
        if (ver.compareTo(OdbcConnectionContext.VER_2_1_5) < 0) {
            writer.writeByte((byte)(msg.status() == ClientListenerResponse.STATUS_SUCCESS ?
                ClientListenerResponse.STATUS_SUCCESS : ClientListenerResponse.STATUS_FAILED));
        }
        else
            writer.writeInt(msg.status());

        // A failed response carries only the error message after the status.
        if (msg.status() != ClientListenerResponse.STATUS_SUCCESS) {
            writer.writeString(msg.error());

            return new ClientMessage(writer.array());
        }

        Object res0 = msg.response();

        if (res0 == null)
            return new ClientMessage(writer.array());
        else if (res0 instanceof OdbcQueryExecuteResult) {
            OdbcQueryExecuteResult res = (OdbcQueryExecuteResult)res0;

            if (log.isDebugEnabled())
                log.debug("Resulting query ID: " + res.queryId());

            writer.writeLong(res.queryId());

            Collection<OdbcColumnMeta> metas = res.columnsMetadata();

            assert metas != null;

            writer.writeInt(metas.size());

            for (OdbcColumnMeta meta : metas)
                meta.write(writer);

            writeAffectedRows(writer, res.affectedRows());
        }
        else if (res0 instanceof OdbcQueryExecuteBatchResult) {
            OdbcQueryExecuteBatchResult res = (OdbcQueryExecuteBatchResult)res0;

            // Success flag first, then affected rows, then error details if any.
            writer.writeBoolean(res.errorMessage() == null);

            writeAffectedRows(writer, res.affectedRows());

            if (res.errorMessage() != null) {
                writer.writeLong(res.errorSetIdx());
                writer.writeString(res.errorMessage());

                // The error code is written only since protocol version 2.1.5.
                if (ver.compareTo(OdbcConnectionContext.VER_2_1_5) >= 0)
                    writer.writeInt(res.errorCode());
            }
        }
        else if (res0 instanceof OdbcStreamingBatchResult) {
            OdbcStreamingBatchResult res = (OdbcStreamingBatchResult)res0;

            writer.writeString(res.error());
            writer.writeInt(res.status());
            writer.writeLong(res.order());
        }
        else if (res0 instanceof OdbcQueryFetchResult) {
            OdbcQueryFetchResult res = (OdbcQueryFetchResult)res0;

            writeResultPage(writer, res.queryId(), res.last(), res.items());
        }
        else if (res0 instanceof OdbcQueryMoreResultsResult) {
            OdbcQueryMoreResultsResult res = (OdbcQueryMoreResultsResult)res0;

            writeResultPage(writer, res.queryId(), res.last(), res.items());
        }
        else if (res0 instanceof OdbcQueryCloseResult) {
            OdbcQueryCloseResult res = (OdbcQueryCloseResult)res0;

            if (log.isDebugEnabled())
                log.debug("Resulting query ID: " + res.getQueryId());

            writer.writeLong(res.getQueryId());
        }
        else if (res0 instanceof OdbcQueryGetColumnsMetaResult) {
            OdbcQueryGetColumnsMetaResult res = (OdbcQueryGetColumnsMetaResult)res0;

            Collection<OdbcColumnMeta> columnsMeta = res.meta();

            assert columnsMeta != null;

            writer.writeInt(columnsMeta.size());

            for (OdbcColumnMeta columnMeta : columnsMeta)
                columnMeta.write(writer);
        }
        else if (res0 instanceof OdbcQueryGetTablesMetaResult) {
            OdbcQueryGetTablesMetaResult res = (OdbcQueryGetTablesMetaResult)res0;

            Collection<OdbcTableMeta> tablesMeta = res.meta();

            assert tablesMeta != null;

            writer.writeInt(tablesMeta.size());

            for (OdbcTableMeta tableMeta : tablesMeta)
                tableMeta.writeBinary(writer);
        }
        else if (res0 instanceof OdbcQueryGetParamsMetaResult) {
            OdbcQueryGetParamsMetaResult res = (OdbcQueryGetParamsMetaResult)res0;

            byte[] typeIds = res.typeIds();

            SqlListenerUtils.writeObject(writer, typeIds, true);
        }
        else
            assert false : "Should not reach here.";

        return new ClientMessage(writer.array());
    }

    /**
     * Writes a page of result rows: query ID, "last page" flag, row count and then each row as
     * its column count followed by the column values.
     *
     * @param writer Writer to use.
     * @param qryId Query ID.
     * @param last Whether this is the last page.
     * @param items Rows of the page, each row being a collection of column values.
     */
    private void writeResultPage(BinaryWriterExImpl writer, long qryId, boolean last, Collection<?> items) {
        if (log.isDebugEnabled())
            log.debug("Resulting query ID: " + qryId);

        writer.writeLong(qryId);

        assert items != null;

        writer.writeBoolean(last);
        writer.writeInt(items.size());

        for (Object row0 : items) {
            // NOTE(review): a null row is skipped although it is counted in the row count written above;
            // confirm rows are never null here.
            if (row0 != null) {
                Collection<?> row = (Collection<?>)row0;

                writer.writeInt(row.size());

                for (Object obj : row)
                    SqlListenerUtils.writeObject(writer, obj, true);
            }
        }
    }

    /** {@inheritDoc} */
    @Override public int decodeCommandType(ClientMessage msg) {
        assert msg != null;

        // The command type is the first byte of the payload.
        return msg.payload()[0];
    }

    /**
     * {@inheritDoc}
     * <p>
     * This implementation always returns {@code 0}.
     */
    @Override public long decodeRequestId(ClientMessage msg) {
        return 0;
    }

    /**
     * Writes affected rows. Protocol versions before 2.3.2 receive a single total; later versions
     * receive the array length followed by every element.
     *
     * @param writer Writer to use.
     * @param affectedRows Affected rows.
     */
    private void writeAffectedRows(BinaryWriterExImpl writer, long[] affectedRows) {
        if (ver.compareTo(OdbcConnectionContext.VER_2_3_2) < 0) {
            long summ = 0;

            // Primitive iteration: elements of a long[] can never be null, so no boxing or null check is needed.
            for (long value : affectedRows)
                summ += value;

            writer.writeLong(summ);
        }
        else {
            writer.writeInt(affectedRows.length);

            for (long value : affectedRows)
                writer.writeLong(value);
        }
    }
}