blob: 30bdc0a5412fa8f3e7acb9a5b50cd733be7c1f42 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';
const Long = require('long');
import BinaryUtils, { OPERATION } from './internal/BinaryUtils';
import BinaryCommunicator from "./internal/BinaryCommunicator";
import {PRIMITIVE_TYPE} from "./internal/Constants";
import {CompositeType} from "./ObjectType";
import MessageBuffer from "./internal/MessageBuffer";
import {CacheEntry} from "./CacheClient";
export abstract class BaseCursor<T> {
protected _id: Long;
protected _hasNext: boolean;
protected _communicator: BinaryCommunicator;
protected _operation: OPERATION;
protected _buffer: MessageBuffer;
protected _keyType: object;
protected _valueType: object;
protected _values: T[];
protected _valueIndex: number;
/**
* Returns one element (cache entry) from the query results.
*
* Every new call returns the next cache entry from the query results.
* If the method returns null, no more entries are available.
*
* @async
*
* @return {Promise<T>} - a cache entry.
*/
async getValue(): Promise<T> {
if (!this._values || this._valueIndex >= this._values.length) {
await this._getValues();
this._valueIndex = 0;
}
if (this._values && this._values.length > 0) {
const value = this._values[this._valueIndex];
this._valueIndex++;
return value;
}
return null;
}
/**
* Checks if more elements are available in the query results.
*
* @return {boolean} - true if more cache entries are available, false otherwise.
*/
hasMore(): boolean {
return this._hasNext ||
this._values && this._valueIndex < this._values.length;
}
/**
* Returns all elements (cache entries) from the query results.
*
* May be used instead of getValue() method if the number of returned entries
* is relatively small and will not cause memory utilization issues.
*
* @async
*
* @return {Promise<Array<T>>} - all cache entries returned by SQL or Scan query.
*/
async getAll(): Promise<T[]> {
let result: T[] = [];
let values: T[];
do {
values = await this._getValues();
if (values) {
result = result.concat(values);
}
} while (this._hasNext);
return result;
}
/**
* Closes the cursor. Obtaining elements from the results is not possible after this.
*
* This method should be called if no more elements are needed.
* It is not neccessary to call it if all elements have been already obtained.
*
* @async
*/
async close() {
// Close cursor only if the server has more pages: the server closes cursor automatically on last page
if (this._id && this._hasNext) {
await this._communicator.send(
BinaryUtils.OPERATION.RESOURCE_CLOSE,
async (payload) => {
await this._write(payload);
});
}
}
/** Private methods */
/**
* @ignore
*/
constructor(communicator: BinaryCommunicator, operation: OPERATION, buffer: MessageBuffer, keyType = null, valueType = null) {
this._communicator = communicator;
this._operation = operation;
this._buffer = buffer;
this._keyType = keyType;
this._valueType = valueType;
this._id = null;
this._hasNext = false;
this._values = null;
this._valueIndex = 0;
}
/**
* @ignore
*/
async _getNext() {
this._hasNext = false;
this._values = null;
this._buffer = null;
await this._communicator.send(
this._operation,
async (payload) => {
await this._write(payload);
},
async (payload) => {
this._buffer = payload;
});
}
/**
* @ignore
*/
async _getValues(): Promise<T[]> {
if (!this._buffer && this._hasNext) {
await this._getNext();
}
await this._read(this._buffer)
this._buffer = null;
return this._values;
}
/**
* @ignore
*/
async _write(buffer) {
buffer.writeLong(this._id);
}
/**
* @ignore
*/
_readId(buffer) {
this._id = buffer.readLong();
}
/**
* @ignore
*/
abstract _readRow(buffer: MessageBuffer): Promise<T>;
/**
* @ignore
*/
async _read(buffer) {
const rowCount = buffer.readInteger();
this._values = new Array(rowCount);
for (let i = 0; i < rowCount; i++) {
this._values[i] = await this._readRow(buffer);
}
this._hasNext = buffer.readBoolean();
}
}
/**
* Class representing a cursor to obtain results of SQL and Scan query operations.
*
* The class has no public constructor. An instance of this class is obtained
* via query() method of {@link CacheClient} objects.
* One instance of this class returns results of one SQL or Scan query operation.
*
* @hideconstructor
*/
export class Cursor extends BaseCursor<CacheEntry> {
/** Private methods */
/**
* @ignore
*/
constructor(communicator: BinaryCommunicator, operation: OPERATION, buffer: MessageBuffer, keyType = null, valueType = null) {
super(communicator, operation, buffer, keyType, valueType);
}
/**
* @ignore
*/
async _readRow(buffer: MessageBuffer): Promise<CacheEntry> {
return new CacheEntry(
await this._communicator.readObject(buffer, this._keyType),
await this._communicator.readObject(buffer, this._valueType));
}
}
/**
* Class representing a cursor to obtain results of SQL Fields query operation.
*
* The class has no public constructor. An instance of this class is obtained
* via query() method of {@link CacheClient} objects.
* One instance of this class returns results of one SQL Fields query operation.
*
* @hideconstructor
* @extends Cursor
*/
export class SqlFieldsCursor extends BaseCursor<Array<object>> {
private _fieldCount: number;
private _fieldTypes: (PRIMITIVE_TYPE | CompositeType)[];
private _fieldNames: string[];
/**
* Returns one element (array with values of the fields) from the query results.
*
* Every new call returns the next element from the query results.
* If the method returns null, no more elements are available.
*
* @async
*
* @return {Promise<Array<*>>} - array with values of the fields requested by the query.
*
*/
async getValue(): Promise<Array<object>> {
return await super.getValue();
}
/**
* Returns all elements (arrays with values of the fields) from the query results.
*
* May be used instead of getValue() method if the number of returned elements
* is relatively small and will not cause memory utilization issues.
*
* @async
*
* @return {Promise<Array<Array<*>>>} - all results returned by SQL Fields query.
* Every element of the array is an array with values of the fields requested by the query.
*
*/
async getAll(): Promise<Array<object>[]> {
return await super.getAll();
}
/**
* Returns names of the fields which were requested in the SQL Fields query.
*
* Empty array is returned if "include field names" flag was false in the query.
*
* @return {Array<string>} - field names.
* The order of names corresponds to the order of field values returned in the results of the query.
*/
getFieldNames(): string[] {
return this._fieldNames;
}
/**
* Specifies types of the fields returned by the SQL Fields query.
*
* By default, a type of every field is not specified that means during operations the Ignite client
* will try to make automatic mapping between JavaScript types and Ignite object types -
* according to the mapping table defined in the description of the {@link ObjectType} class.
*
* @param {...PRIMITIVE_TYPE | CompositeType} fieldTypes - types of the returned fields.
* The order of types must correspond the order of field values returned in the results of the query.
* A type of every field can be:
* - either a type code of primitive (simple) type
* - or an instance of class representing non-primitive (composite) type
* - or null (means the type is not specified)
*
* @return {SqlFieldsCursor} - the same instance of the SqlFieldsCursor.
*/
setFieldTypes(...fieldTypes: Array<PRIMITIVE_TYPE | CompositeType>) {
this._fieldTypes = fieldTypes;
return this;
}
/** Private methods */
/**
* @ignore
*/
constructor(communicator: BinaryCommunicator, buffer: MessageBuffer) {
super(communicator, OPERATION.QUERY_SQL_FIELDS_CURSOR_GET_PAGE, buffer);
this._fieldNames = [];
}
/**
* @ignore
*/
async _readFieldNames(buffer: MessageBuffer, includeFieldNames: boolean) {
this._id = buffer.readLong();
this._fieldCount = buffer.readInteger();
if (includeFieldNames) {
for (let i = 0; i < this._fieldCount; i++) {
this._fieldNames[i] = await this._communicator.readObject(buffer);
}
}
}
/**
* @ignore
*/
async _readRow(buffer: MessageBuffer): Promise<Array<object>> {
let values: object[] = new Array(this._fieldCount);
let fieldType;
for (let i = 0; i < this._fieldCount; i++) {
fieldType = this._fieldTypes && i < this._fieldTypes.length ? this._fieldTypes[i] : null;
values[i] = await this._communicator.readObject(buffer, fieldType);
}
return values;
}
}