blob: ddb3587958de8deaa8df06371184c1e77e466e7a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.adapter.redis;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.util.Pair;
import com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* Table mapped onto a redis table.
*/
public class RedisTable extends AbstractTable
implements ScannableTable {
final RedisSchema schema;
final String tableName;
final RelProtoDataType protoRowType;
final ImmutableMap<String, Object> allFields;
final String dataFormat;
final RedisConfig redisConfig;
RedisEnumerator redisEnumerator;
public RedisTable(
RedisSchema schema,
String tableName,
RelProtoDataType protoRowType,
Map<String, Object> allFields,
String dataFormat,
RedisConfig redisConfig) {
this.schema = schema;
this.tableName = tableName;
this.protoRowType = protoRowType;
this.allFields = allFields == null ? ImmutableMap.of()
: ImmutableMap.copyOf(allFields);
this.dataFormat = dataFormat;
this.redisConfig = redisConfig;
}
@Override public RelDataType getRowType(RelDataTypeFactory typeFactory) {
if (protoRowType != null) {
return protoRowType.apply(typeFactory);
}
final List<RelDataType> types = new ArrayList<RelDataType>(allFields.size());
final List<String> names = new ArrayList<String>(allFields.size());
for (Object key : allFields.keySet()) {
final RelDataType type = typeFactory.createJavaType(allFields.get(key).getClass());
names.add(key.toString());
types.add(type);
}
return typeFactory.createStructType(Pair.zip(names, types));
}
static Table create(
RedisSchema schema,
String tableName,
RedisConfig redisConfig,
RelProtoDataType protoRowType) {
RedisTableFieldInfo tableFieldInfo = schema.getTableFieldInfo(tableName);
Map<String, Object> allFields = RedisEnumerator.deduceRowType(tableFieldInfo);
return new RedisTable(schema, tableName, protoRowType,
allFields, tableFieldInfo.getDataFormat(), redisConfig);
}
static Table create(
RedisSchema schema,
String tableName,
Map operand,
RelProtoDataType protoRowType) {
RedisConfig redisConfig = new RedisConfig(schema.host, schema.port,
schema.database, schema.password);
return create(schema, tableName, redisConfig, protoRowType);
}
@Override public Enumerable<@Nullable Object[]> scan(DataContext root) {
return new AbstractEnumerable<Object[]>() {
@Override public Enumerator<Object[]> enumerator() {
return new RedisEnumerator(redisConfig, schema, tableName);
}
};
}
}