blob: c551d42463ceb6e094c20e732575faeee4ff9421 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.cql3;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.service.pager.QueryPager;
import org.apache.cassandra.utils.FBUtilities;
/** a utility for doing internal cql-based queries */
public abstract class UntypedResultSet implements Iterable<UntypedResultSet.Row>
{
public static UntypedResultSet create(ResultSet rs)
{
return new FromResultSet(rs);
}
public static UntypedResultSet create(List<Map<String, ByteBuffer>> results)
{
return new FromResultList(results);
}
public static UntypedResultSet create(SelectStatement select, QueryPager pager, int pageSize)
{
return new FromPager(select, pager, pageSize);
}
/**
* This method is intended for testing purposes, since it executes query on cluster
* and not on the local node only.
*/
@VisibleForTesting
public static UntypedResultSet create(SelectStatement select,
ConsistencyLevel cl,
ClientState clientState,
QueryPager pager,
int pageSize)
{
return new FromDistributedPager(select, cl, clientState, pager, pageSize);
}
public boolean isEmpty()
{
return size() == 0;
}
public abstract int size();
public abstract Row one();
// No implemented by all subclasses, but we use it when we know it's there (for tests)
public abstract List<ColumnSpecification> metadata();
private static class FromResultSet extends UntypedResultSet
{
private final ResultSet cqlRows;
private FromResultSet(ResultSet cqlRows)
{
this.cqlRows = cqlRows;
}
public int size()
{
return cqlRows.size();
}
public Row one()
{
if (cqlRows.size() != 1)
throw new IllegalStateException("One row required, " + cqlRows.size() + " found");
return new Row(cqlRows.metadata.requestNames(), cqlRows.rows.get(0));
}
public Iterator<Row> iterator()
{
return new AbstractIterator<Row>()
{
Iterator<List<ByteBuffer>> iter = cqlRows.rows.iterator();
protected Row computeNext()
{
if (!iter.hasNext())
return endOfData();
return new Row(cqlRows.metadata.requestNames(), iter.next());
}
};
}
public List<ColumnSpecification> metadata()
{
return cqlRows.metadata.requestNames();
}
}
/**
* Pager that calls `execute` rather than `executeInternal`
*/
private static class FromDistributedPager extends UntypedResultSet
{
private final SelectStatement select;
private final ConsistencyLevel cl;
private final ClientState clientState;
private final QueryPager pager;
private final int pageSize;
private final List<ColumnSpecification> metadata;
private FromDistributedPager(SelectStatement select,
ConsistencyLevel cl,
ClientState clientState,
QueryPager pager, int pageSize)
{
this.select = select;
this.cl = cl;
this.clientState = clientState;
this.pager = pager;
this.pageSize = pageSize;
this.metadata = select.getResultMetadata().requestNames();
}
public int size()
{
throw new UnsupportedOperationException();
}
public Row one()
{
throw new UnsupportedOperationException();
}
public Iterator<Row> iterator()
{
return new AbstractIterator<Row>()
{
private Iterator<List<ByteBuffer>> currentPage;
protected Row computeNext()
{
int nowInSec = FBUtilities.nowInSeconds();
while (currentPage == null || !currentPage.hasNext())
{
if (pager.isExhausted())
return endOfData();
try (PartitionIterator iter = pager.fetchPage(pageSize, cl, clientState, System.nanoTime()))
{
currentPage = select.process(iter, nowInSec).rows.iterator();
}
}
return new Row(metadata, currentPage.next());
}
};
}
public List<ColumnSpecification> metadata()
{
return metadata;
}
}
private static class FromResultList extends UntypedResultSet
{
private final List<Map<String, ByteBuffer>> cqlRows;
private FromResultList(List<Map<String, ByteBuffer>> cqlRows)
{
this.cqlRows = cqlRows;
}
public int size()
{
return cqlRows.size();
}
public Row one()
{
if (cqlRows.size() != 1)
throw new IllegalStateException("One row required, " + cqlRows.size() + " found");
return new Row(cqlRows.get(0));
}
public Iterator<Row> iterator()
{
return new AbstractIterator<Row>()
{
Iterator<Map<String, ByteBuffer>> iter = cqlRows.iterator();
protected Row computeNext()
{
if (!iter.hasNext())
return endOfData();
return new Row(iter.next());
}
};
}
public List<ColumnSpecification> metadata()
{
throw new UnsupportedOperationException();
}
}
private static class FromPager extends UntypedResultSet
{
private final SelectStatement select;
private final QueryPager pager;
private final int pageSize;
private final List<ColumnSpecification> metadata;
private FromPager(SelectStatement select, QueryPager pager, int pageSize)
{
this.select = select;
this.pager = pager;
this.pageSize = pageSize;
this.metadata = select.getResultMetadata().requestNames();
}
public int size()
{
throw new UnsupportedOperationException();
}
public Row one()
{
throw new UnsupportedOperationException();
}
public Iterator<Row> iterator()
{
return new AbstractIterator<Row>()
{
private Iterator<List<ByteBuffer>> currentPage;
protected Row computeNext()
{
int nowInSec = FBUtilities.nowInSeconds();
while (currentPage == null || !currentPage.hasNext())
{
if (pager.isExhausted())
return endOfData();
try (ReadExecutionController executionController = pager.executionController();
PartitionIterator iter = pager.fetchPageInternal(pageSize, executionController))
{
currentPage = select.process(iter, nowInSec).rows.iterator();
}
}
return new Row(metadata, currentPage.next());
}
};
}
public List<ColumnSpecification> metadata()
{
return metadata;
}
}
public static class Row
{
private final Map<String, ByteBuffer> data = new HashMap<>();
private final List<ColumnSpecification> columns = new ArrayList<>();
public Row(Map<String, ByteBuffer> data)
{
this.data.putAll(data);
}
public Row(List<ColumnSpecification> names, List<ByteBuffer> columns)
{
this.columns.addAll(names);
for (int i = 0; i < names.size(); i++)
data.put(names.get(i).name.toString(), columns.get(i));
}
public static Row fromInternalRow(CFMetaData metadata, DecoratedKey key, org.apache.cassandra.db.rows.Row row)
{
Map<String, ByteBuffer> data = new HashMap<>();
ByteBuffer[] keyComponents = SelectStatement.getComponents(metadata, key);
for (ColumnDefinition def : metadata.partitionKeyColumns())
data.put(def.name.toString(), keyComponents[def.position()]);
Clustering clustering = row.clustering();
for (ColumnDefinition def : metadata.clusteringColumns())
data.put(def.name.toString(), clustering.get(def.position()));
for (ColumnDefinition def : metadata.partitionColumns())
{
if (def.isSimple())
{
Cell cell = row.getCell(def);
if (cell != null)
data.put(def.name.toString(), cell.value());
}
else
{
ComplexColumnData complexData = row.getComplexColumnData(def);
if (complexData != null)
data.put(def.name.toString(), ((CollectionType)def.type).serializeForNativeProtocol(complexData.iterator(), ProtocolVersion.V3));
}
}
return new Row(data);
}
public boolean has(String column)
{
// Note that containsKey won't work because we may have null values
return data.get(column) != null;
}
public ByteBuffer getBlob(String column)
{
return data.get(column);
}
public String getString(String column)
{
return UTF8Type.instance.compose(data.get(column));
}
public boolean getBoolean(String column)
{
return BooleanType.instance.compose(data.get(column));
}
public byte getByte(String column)
{
return ByteType.instance.compose(data.get(column));
}
public short getShort(String column)
{
return ShortType.instance.compose(data.get(column));
}
public int getInt(String column)
{
return Int32Type.instance.compose(data.get(column));
}
public double getDouble(String column)
{
return DoubleType.instance.compose(data.get(column));
}
public ByteBuffer getBytes(String column)
{
return data.get(column);
}
public InetAddress getInetAddress(String column)
{
return InetAddressType.instance.compose(data.get(column));
}
public UUID getUUID(String column)
{
return UUIDType.instance.compose(data.get(column));
}
public Date getTimestamp(String column)
{
return TimestampType.instance.compose(data.get(column));
}
public long getLong(String column)
{
return LongType.instance.compose(data.get(column));
}
public <T> Set<T> getSet(String column, AbstractType<T> type)
{
ByteBuffer raw = data.get(column);
return raw == null ? null : SetType.getInstance(type, true).compose(raw);
}
public <T> List<T> getList(String column, AbstractType<T> type)
{
ByteBuffer raw = data.get(column);
return raw == null ? null : ListType.getInstance(type, true).compose(raw);
}
public <K, V> Map<K, V> getMap(String column, AbstractType<K> keyType, AbstractType<V> valueType)
{
ByteBuffer raw = data.get(column);
return raw == null ? null : MapType.getInstance(keyType, valueType, true).compose(raw);
}
public Map<String, String> getTextMap(String column)
{
return getMap(column, UTF8Type.instance, UTF8Type.instance);
}
public <T> Set<T> getFrozenSet(String column, AbstractType<T> type)
{
ByteBuffer raw = data.get(column);
return raw == null ? null : SetType.getInstance(type, false).compose(raw);
}
public <T> List<T> getFrozenList(String column, AbstractType<T> type)
{
ByteBuffer raw = data.get(column);
return raw == null ? null : ListType.getInstance(type, false).compose(raw);
}
public <K, V> Map<K, V> getFrozenMap(String column, AbstractType<K> keyType, AbstractType<V> valueType)
{
ByteBuffer raw = data.get(column);
return raw == null ? null : MapType.getInstance(keyType, valueType, false).compose(raw);
}
public Map<String, String> getFrozenTextMap(String column)
{
return getFrozenMap(column, UTF8Type.instance, UTF8Type.instance);
}
public List<ColumnSpecification> getColumns()
{
return columns;
}
@Override
public String toString()
{
return data.toString();
}
}
}