blob: 9a48b02cc04408b70dcbb87d05e49f733eb57c33 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.serialization;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
public class SimpleRecordSchema implements RecordSchema {
private List<RecordField> fields = null;
private Map<String, RecordField> fieldMap = null;
private final boolean textAvailable;
private final AtomicReference<String> text = new AtomicReference<>();
private final String schemaFormat;
private final SchemaIdentifier schemaIdentifier;
private String schemaName;
private String schemaNamespace;
private volatile int hashCode;
public SimpleRecordSchema(final List<RecordField> fields) {
this(fields, null, null, false, SchemaIdentifier.EMPTY);
}
public SimpleRecordSchema(final List<RecordField> fields, final SchemaIdentifier id) {
this(fields, null, null, false, id);
}
public SimpleRecordSchema(final String text, final String schemaFormat, final SchemaIdentifier id) {
this(text, schemaFormat, true, id);
}
public SimpleRecordSchema(final SchemaIdentifier id) {
this(null, null, false, id);
}
public SimpleRecordSchema(final List<RecordField> fields, final String text, final String schemaFormat, final SchemaIdentifier id) {
this(fields, text, schemaFormat, true, id);
}
private SimpleRecordSchema(final List<RecordField> fields, final String text, final String schemaFormat, final boolean textAvailable, final SchemaIdentifier id) {
this(text, schemaFormat, textAvailable, id);
setFields(fields);
}
private SimpleRecordSchema(final String text, final String schemaFormat, final boolean textAvailable, final SchemaIdentifier id) {
this.text.set(text);
this.schemaFormat = schemaFormat;
this.schemaIdentifier = id;
this.textAvailable = textAvailable;
}
@Override
public Optional<String> getSchemaText() {
if (textAvailable) {
return Optional.ofNullable(text.get());
} else {
return Optional.empty();
}
}
@Override
public Optional<String> getSchemaFormat() {
return Optional.ofNullable(schemaFormat);
}
@Override
public List<RecordField> getFields() {
return fields;
}
public void setFields(final List<RecordField> fields) {
if (this.fields != null) {
throw new IllegalArgumentException("Fields have already been set.");
}
this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
this.fieldMap = new HashMap<>(fields.size() * 2);
for (final RecordField field : fields) {
RecordField previousValue = fieldMap.put(field.getFieldName(), field);
if (previousValue != null) {
throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + field.getFieldName() + "'");
}
for (final String alias : field.getAliases()) {
previousValue = fieldMap.put(alias, field);
if (previousValue != null) {
throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + field.getFieldName() + "'");
}
}
}
}
@Override
public int getFieldCount() {
return fields.size();
}
@Override
public RecordField getField(final int index) {
return fields.get(index);
}
@Override
public List<DataType> getDataTypes() {
return getFields().stream().map(RecordField::getDataType)
.collect(Collectors.toList());
}
@Override
public List<String> getFieldNames() {
return getFields().stream().map(RecordField::getFieldName)
.collect(Collectors.toList());
}
@Override
public Optional<DataType> getDataType(final String fieldName) {
final RecordField field = fieldMap.get(fieldName);
if (field == null) {
return Optional.empty();
}
return Optional.of(field.getDataType());
}
@Override
public Optional<RecordField> getField(final String fieldName) {
return Optional.ofNullable(fieldMap.get(fieldName));
}
@Override
public boolean equals(final Object obj) {
if (obj == null) {
return false;
}
if (obj == this) {
return true;
}
if (!(obj instanceof RecordSchema)) {
return false;
}
final RecordSchema other = (RecordSchema) obj;
if (getSchemaNamespace().isPresent() && getSchemaNamespace().equals(other.getSchemaNamespace())
&& getSchemaName().isPresent() && getSchemaName().equals(other.getSchemaName())) {
return true;
}
return fields.equals(other.getFields());
}
@Override
public int hashCode() {
int computed = this.hashCode;
if (computed == 0) {
computed = this.hashCode = 143 + 3 * fields.hashCode();
}
return computed;
}
private static String createText(final List<RecordField> fields) {
final StringBuilder sb = new StringBuilder("[");
for (int i = 0; i < fields.size(); i++) {
final RecordField field = fields.get(i);
sb.append("\"");
sb.append(field.getFieldName());
sb.append("\" : \"");
sb.append(field.getDataType());
sb.append("\"");
if (i < fields.size() - 1) {
sb.append(", ");
}
}
sb.append("]");
return sb.toString();
}
@Override
public String toString() {
String textValue = text.get();
if (textValue != null) {
return textValue;
}
textValue = createText(fields);
final boolean updated = text.compareAndSet(null, textValue);
if (updated) {
return textValue;
} else {
return text.get();
}
}
@Override
public SchemaIdentifier getIdentifier() {
return schemaIdentifier;
}
/**
* Set schema name.
* @param schemaName schema name as defined in a root record.
*/
public void setSchemaName(String schemaName) {
this.schemaName = schemaName;
}
@Override
public Optional<String> getSchemaName() {
return Optional.ofNullable(schemaName);
}
/**
* Set schema namespace.
* @param schemaNamespace schema namespace as defined in a root record.
*/
public void setSchemaNamespace(String schemaNamespace) {
this.schemaNamespace = schemaNamespace;
}
@Override
public Optional<String> getSchemaNamespace() {
return Optional.ofNullable(schemaNamespace);
}
}