blob: 1f400f5c12b061a7f92eec3603e8ac90daf45245 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl.async;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Map;
import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
import org.apache.activemq.util.ByteSequence;
/**
* Optimized Store reader and updater. Single threaded and synchronous. Use in
* conjunction with the DataFileAccessorPool of concurrent use.
*
*
*/
final class DataFileAccessor {
private final DataFile dataFile;
private final Map<WriteKey, WriteCommand> inflightWrites;
private final RandomAccessFile file;
private boolean disposed;
/**
* Construct a Store reader
*
* @param fileId
* @throws IOException
*/
public DataFileAccessor(AsyncDataManager dataManager, DataFile dataFile) throws IOException {
this.dataFile = dataFile;
this.inflightWrites = dataManager.getInflightWrites();
this.file = dataFile.openRandomAccessFile(false);
}
public DataFile getDataFile() {
return dataFile;
}
public void dispose() {
if (disposed) {
return;
}
disposed = true;
try {
dataFile.closeRandomAccessFile(file);
} catch (IOException e) {
e.printStackTrace();
}
}
public ByteSequence readRecord(Location location) throws IOException {
if (!location.isValid()) {
throw new IOException("Invalid location: " + location);
}
WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
if (asyncWrite != null) {
return asyncWrite.data;
}
try {
if (location.getSize() == Location.NOT_SET) {
file.seek(location.getOffset());
location.setSize(file.readInt());
file.seek(location.getOffset() + AsyncDataManager.ITEM_HEAD_SPACE);
} else {
file.seek(location.getOffset() + AsyncDataManager.ITEM_HEAD_SPACE);
}
byte[] data = new byte[location.getSize() - AsyncDataManager.ITEM_HEAD_FOOT_SPACE];
file.readFully(data);
return new ByteSequence(data, 0, data.length);
} catch (RuntimeException e) {
throw new IOException("Invalid location: " + location + ", : " + e);
}
}
public void readLocationDetails(Location location) throws IOException {
WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
if (asyncWrite != null) {
location.setSize(asyncWrite.location.getSize());
location.setType(asyncWrite.location.getType());
} else {
file.seek(location.getOffset());
location.setSize(file.readInt());
location.setType(file.readByte());
}
}
public boolean readLocationDetailsAndValidate(Location location) {
try {
WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
if (asyncWrite != null) {
location.setSize(asyncWrite.location.getSize());
location.setType(asyncWrite.location.getType());
} else {
file.seek(location.getOffset());
location.setSize(file.readInt());
location.setType(file.readByte());
byte data[] = new byte[3];
file.seek(location.getOffset() + AsyncDataManager.ITEM_HEAD_OFFSET_TO_SOR);
file.readFully(data);
if (data[0] != AsyncDataManager.ITEM_HEAD_SOR[0]
|| data[1] != AsyncDataManager.ITEM_HEAD_SOR[1]
|| data[2] != AsyncDataManager.ITEM_HEAD_SOR[2]) {
return false;
}
file.seek(location.getOffset() + location.getSize() - AsyncDataManager.ITEM_FOOT_SPACE);
file.readFully(data);
if (data[0] != AsyncDataManager.ITEM_HEAD_EOR[0]
|| data[1] != AsyncDataManager.ITEM_HEAD_EOR[1]
|| data[2] != AsyncDataManager.ITEM_HEAD_EOR[2]) {
return false;
}
}
} catch (IOException e) {
return false;
}
return true;
}
public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException {
file.seek(location.getOffset() + AsyncDataManager.ITEM_HEAD_SPACE);
int size = Math.min(data.getLength(), location.getSize());
file.write(data.getData(), data.getOffset(), size);
if (sync) {
file.getFD().sync();
}
}
}