blob: fd74f0d76095cf019a68ee7643521cfd87210aef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.backend.store.cassandra;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.id.IdGenerator;
import org.apache.hugegraph.backend.id.IdUtil;
import org.apache.hugegraph.backend.serializer.BytesBuffer;
import org.apache.hugegraph.backend.serializer.TableBackendEntry;
import org.apache.hugegraph.backend.serializer.TableSerializer;
import org.apache.hugegraph.backend.store.BackendEntry;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.schema.PropertyKey;
import org.apache.hugegraph.schema.SchemaElement;
import org.apache.hugegraph.structure.HugeElement;
import org.apache.hugegraph.structure.HugeIndex;
import org.apache.hugegraph.structure.HugeProperty;
import org.apache.hugegraph.structure.HugeVertex;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.DataType;
import org.apache.hugegraph.type.define.HugeKeys;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.InsertionOrderUtil;
import org.apache.hugegraph.util.JsonUtil;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
public class CassandraSerializer extends TableSerializer {
public CassandraSerializer(HugeConfig config) {
super(config);
}
@Override
public CassandraBackendEntry newBackendEntry(HugeType type, Id id) {
return new CassandraBackendEntry(type, id);
}
@Override
protected TableBackendEntry newBackendEntry(TableBackendEntry.Row row) {
return new CassandraBackendEntry(row);
}
@Override
protected TableBackendEntry newBackendEntry(HugeIndex index) {
TableBackendEntry backendEntry = newBackendEntry(index.type(),
index.id());
if (index.indexLabel().olap()) {
backendEntry.olap(true);
}
return backendEntry;
}
@Override
protected CassandraBackendEntry convertEntry(BackendEntry backendEntry) {
if (!(backendEntry instanceof CassandraBackendEntry)) {
throw new BackendException("Not supported by CassandraSerializer");
}
return (CassandraBackendEntry) backendEntry;
}
@Override
protected Set<Object> parseIndexElemIds(TableBackendEntry entry) {
return ImmutableSet.of(entry.column(HugeKeys.ELEMENT_IDS));
}
@Override
protected Id toId(Number number) {
return IdGenerator.of(number.longValue());
}
@Override
protected Id[] toIdArray(Object object) {
assert object instanceof Collection;
@SuppressWarnings("unchecked")
Collection<Number> numbers = (Collection<Number>) object;
Id[] ids = new Id[numbers.size()];
int i = 0;
for (Number number : numbers) {
ids[i++] = toId(number);
}
return ids;
}
@Override
protected Object toLongSet(Collection<Id> ids) {
Set<Long> results = InsertionOrderUtil.newSet();
for (Id id : ids) {
results.add(id.asLong());
}
return results;
}
@Override
protected Object toLongList(Collection<Id> ids) {
List<Long> results = new ArrayList<>(ids.size());
for (Id id : ids) {
results.add(id.asLong());
}
return results;
}
@Override
protected void formatProperties(HugeElement element,
TableBackendEntry.Row row) {
if (!element.hasProperties() && !element.removed()) {
row.column(HugeKeys.PROPERTIES, ImmutableMap.of());
} else {
// Format properties
for (HugeProperty<?> prop : element.getProperties()) {
this.formatProperty(prop, row);
}
}
}
@Override
protected void parseProperties(HugeElement element,
TableBackendEntry.Row row) {
Map<Number, Object> props = row.column(HugeKeys.PROPERTIES);
for (Map.Entry<Number, Object> prop : props.entrySet()) {
Id pkeyId = this.toId(prop.getKey());
this.parseProperty(pkeyId, prop.getValue(), element);
}
}
@Override
public BackendEntry writeOlapVertex(HugeVertex vertex) {
CassandraBackendEntry entry = newBackendEntry(HugeType.OLAP,
vertex.id());
entry.column(HugeKeys.ID, this.writeId(vertex.id()));
Collection<HugeProperty<?>> properties = vertex.getProperties();
E.checkArgument(properties.size() == 1,
"Expect only 1 property for olap vertex, but got %s",
properties.size());
HugeProperty<?> property = properties.iterator().next();
PropertyKey pk = property.propertyKey();
entry.subId(pk.id());
entry.column(HugeKeys.PROPERTY_VALUE,
this.writeProperty(pk, property.value()));
entry.olap(true);
return entry;
}
@Override
protected Object writeProperty(PropertyKey propertyKey, Object value) {
BytesBuffer buffer = BytesBuffer.allocate(BytesBuffer.BUF_PROPERTY);
if (propertyKey == null) {
/*
* Since we can't know the type of the property value in some
* scenarios so need to construct a fake property key to
* serialize to reuse code.
*/
propertyKey = new PropertyKey(null, IdGenerator.of(0L), "fake");
propertyKey.dataType(DataType.fromClass(value.getClass()));
}
buffer.writeProperty(propertyKey, value);
buffer.forReadWritten();
return buffer.asByteBuffer();
}
@Override
@SuppressWarnings("unchecked")
protected <T> T readProperty(PropertyKey pkey, Object value) {
BytesBuffer buffer = BytesBuffer.wrap((ByteBuffer) value);
return (T) buffer.readProperty(pkey);
}
@Override
protected Object writeId(Id id) {
return IdUtil.writeBinString(id);
}
@Override
protected Id readId(Object id) {
return IdUtil.readBinString(id);
}
@Override
protected void writeUserdata(SchemaElement schema,
TableBackendEntry entry) {
assert entry instanceof CassandraBackendEntry;
for (Map.Entry<String, Object> e : schema.userdata().entrySet()) {
entry.column(HugeKeys.USER_DATA, e.getKey(),
JsonUtil.toJson(e.getValue()));
}
}
@Override
protected void readUserdata(SchemaElement schema,
TableBackendEntry entry) {
assert entry instanceof CassandraBackendEntry;
// Parse all user data of a schema element
Map<String, String> userdata = entry.column(HugeKeys.USER_DATA);
for (Map.Entry<String, String> e : userdata.entrySet()) {
String key = e.getKey();
Object value = JsonUtil.fromJson(e.getValue(), Object.class);
schema.userdata(key, value);
}
}
}