// blob: 45c5dd6e7c0a4e1c80f471114924b40b73c51e18
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
import java.io.IOException;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.protobuf.Message;
/**
 * Protobuf implementation of the State Store serializer.
 *
 * <p>A record class is mapped to its protobuf-backed implementation by naming
 * convention: for a record {@code a.b.Foo} the implementation is looked up as
 * {@code a.b.impl.pb.FooPBImpl}. Serialized payloads are the Base64 encoding
 * (single line, not chunked) of the record's protobuf message bytes.
 */
public final class StateStoreSerializerPBImpl extends StateStoreSerializer {

  /** Sub-package, relative to the record's package, of the PB classes. */
  private static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb";
  /** Suffix appended to the record class name to form the PB class name. */
  private static final String PB_IMPL_CLASS_SUFFIX = "PBImpl";

  /** Configuration used to load and instantiate the PB classes. */
  private final Configuration localConf = new Configuration();

  /**
   * Private constructor; nothing in this class calls it.
   * NOTE(review): presumably the serializer is created reflectively by its
   * owner - confirm against the caller.
   */
  private StateStoreSerializerPBImpl() {
  }

  /**
   * Creates a new instance of the protobuf implementation of a record class.
   *
   * @param clazz Record class (not the PB implementation itself).
   * @param <T> Type of the record.
   * @return New instance of the matching {@code *PBImpl} class.
   * @throws RuntimeException If the derived implementation class cannot be
   *           found; the {@link ClassNotFoundException} is kept as the cause.
   */
  @Override
  public <T> T newRecordInstance(Class<T> clazz) {
    String clazzPBImpl = getPBImplClassName(clazz);
    try {
      Class<?> pbClazz = localConf.getClassByName(clazzPBImpl);
      // The naming convention, not the type system, ties pbClazz to T, so the
      // cast cannot be checked by the compiler.
      @SuppressWarnings("unchecked")
      T record = (T) ReflectionUtils.newInstance(pbClazz, localConf);
      return record;
    } catch (ClassNotFoundException e) {
      throw new RuntimeException("Cannot find protobuf implementation "
          + clazzPBImpl + " for record " + clazz.getName(), e);
    }
  }

  /**
   * Builds the fully-qualified name of the PB implementation of a record.
   *
   * @param clazz Record class.
   * @return {@code <package>.impl.pb.<ClassName>PBImpl}.
   */
  private String getPBImplClassName(Class<?> clazz) {
    String destPackagePart =
        getPackageName(clazz) + "." + PB_IMPL_PACKAGE_SUFFIX;
    String destClassPart = getClassName(clazz) + PB_IMPL_CLASS_SUFFIX;
    return destPackagePart + "." + destClassPart;
  }

  /**
   * Gets the class name without its package.
   *
   * @param clazz Class to inspect.
   * @return Everything after the last dot of {@link Class#getName()}.
   */
  private String getClassName(Class<?> clazz) {
    String fqName = clazz.getName();
    return fqName.substring(fqName.lastIndexOf('.') + 1);
  }

  /**
   * Gets the package name of a class.
   *
   * <p>The name is derived from {@link Class#getName()} rather than from
   * {@link Class#getPackage()} because the latter may return {@code null}
   * when the class loader did not create a package object.
   *
   * @param clazz Class to inspect.
   * @return Everything before the last dot of the class name, or an empty
   *         string if the name contains no dot.
   */
  private String getPackageName(Class<?> clazz) {
    String fqName = clazz.getName();
    int lastDot = fqName.lastIndexOf('.');
    return lastDot < 0 ? "" : fqName.substring(0, lastDot);
  }

  /**
   * Serializes a record into the Base64 encoding of its protobuf bytes.
   *
   * @param record Record to serialize.
   * @return Base64-encoded bytes, or {@code null} if the record is not a
   *         {@link PBRecord}.
   */
  @Override
  public byte[] serialize(BaseRecord record) {
    if (!(record instanceof PBRecord)) {
      // Only protobuf-backed records can be serialized by this class.
      return null;
    }
    Message msg = ((PBRecord) record).getProto();
    return Base64.encodeBase64(msg.toByteArray(), false);
  }

  /**
   * Serializes a record into a Base64 string.
   *
   * @param record Record to serialize.
   * @return Result of {@link #serialize(BaseRecord)} converted to a string
   *         as UTF-8.
   */
  @Override
  public String serializeString(BaseRecord record) {
    byte[] byteArray64 = serialize(record);
    return StringUtils.newStringUtf8(byteArray64);
  }

  /**
   * Creates a record of the given class and populates it from bytes.
   *
   * <p>NOTE: the bytes are Base64-encoded here before being handed to
   * {@link PBRecord#readInstance}, so this method takes the <em>raw</em>
   * bytes, not the Base64 bytes returned by {@link #serialize(BaseRecord)}.
   *
   * @param byteArray Raw (not Base64-encoded) bytes of the record.
   * @param clazz Record class to create.
   * @param <T> Type of the record.
   * @return New record; it is left unpopulated if it is not a
   *         {@link PBRecord}.
   * @throws IOException Declared by the overridden method.
   */
  @Override
  public <T extends BaseRecord> T deserialize(
      byte[] byteArray, Class<T> clazz) throws IOException {

    T record = newRecord(clazz);
    if (record instanceof PBRecord) {
      PBRecord pbRecord = (PBRecord) record;
      byte[] byteArray64 = Base64.encodeBase64(byteArray, false);
      String base64Encoded = StringUtils.newStringUtf8(byteArray64);
      pbRecord.readInstance(base64Encoded);
    }
    return record;
  }

  /**
   * Creates a record of the given class and populates it from a Base64
   * string, such as the one produced by {@link #serializeString(BaseRecord)}.
   *
   * @param data Base64-encoded record.
   * @param clazz Record class to create.
   * @param <T> Type of the record.
   * @return New record populated from the decoded data.
   * @throws IOException Declared by the overridden method.
   */
  @Override
  public <T extends BaseRecord> T deserialize(String data, Class<T> clazz)
      throws IOException {
    // Decode to raw bytes; the byte[] overload expects unencoded input.
    byte[] rawBytes = Base64.decodeBase64(data);
    return deserialize(rawBytes, clazz);
  }
}