blob: 43be542b686d2760f66f1b9cffcf7c3d2ca15fc7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.spark.data;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.commons.io.IOUtils;
import org.apache.cassandra.clients.Sidecar;
import org.apache.cassandra.clients.SidecarStreamConsumerAdapter;
import org.apache.cassandra.sidecar.client.SidecarClient;
import org.apache.cassandra.sidecar.client.SidecarInstance;
import org.apache.cassandra.sidecar.common.data.ListSnapshotFilesResponse;
import org.apache.cassandra.sidecar.common.utils.HttpRange;
import org.apache.cassandra.spark.stats.Stats;
import org.apache.cassandra.spark.utils.ThrowableUtils;
import org.apache.cassandra.spark.utils.streaming.SSTableInputStream;
import org.apache.cassandra.spark.utils.streaming.SSTableSource;
import org.apache.cassandra.spark.utils.streaming.StreamConsumer;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
 * An SSTable that is streamed from Sidecar
 *
 * <p>Each instance describes the snapshot file components of one SSTable on one {@link SidecarInstance}
 * and opens streams over them by issuing range requests through the {@link SidecarClient}.
 * Two instances are considered equal when they share the same Sidecar instance and data file name.
 */
public class SidecarProvisionedSSTable extends SSTable
{
    private static final long serialVersionUID = 6452703925812602832L;

    // Static cache of fully-read CompressionInfo components, shared by every SSTable in this class loader.
    // Entries expire one hour after last access and the cache holds at most 2048 entries.
    private static final Cache<String, byte[]> COMPRESSION_CACHE = CacheBuilder.newBuilder()
                                                                               .expireAfterAccess(1, TimeUnit.HOURS)
                                                                               .maximumSize(2048)
                                                                               .build();

    // Minimum number of '-' separated parts needed to build the data file name
    private static final int DATA_FILE_NAME_PARTS = 4;

    private final SidecarClient sidecar;
    private final SidecarInstance instance;
    private final Sidecar.ClientConfig sidecarClientConfig;
    private final String keyspace;
    private final String table;
    private final String snapshotName;
    @NotNull
    private final String dataFileName;
    @NotNull
    private final Map<FileType, ListSnapshotFilesResponse.FileInfo> components;
    private final int partitionId;
    private final Stats stats;

    /**
     * Creates an SSTable backed by snapshot files served by a Sidecar instance.
     *
     * @param sidecar             client used to stream SSTable components
     * @param sidecarClientConfig client configuration supplying buffer sizes and the request timeout
     * @param instance            the Sidecar instance that serves the snapshot files
     * @param keyspace            keyspace name
     * @param table               table name
     * @param snapshotName        snapshot name
     * @param components          snapshot file info keyed by file type; must contain {@code FileType.DATA}
     * @param partitionId         partition id associated with this SSTable
     * @param stats               stats instance handed to the input streams opened by this SSTable
     * @throws NullPointerException     if {@code components} has no {@code FileType.DATA} entry
     * @throws IllegalArgumentException if the data file name cannot be parsed by the default
     *                                  {@link #parseDataFileName(String[])}
     */
    // CHECKSTYLE IGNORE: Constructor with many parameters
    protected SidecarProvisionedSSTable(SidecarClient sidecar,
                                        Sidecar.ClientConfig sidecarClientConfig,
                                        SidecarInstance instance,
                                        String keyspace,
                                        String table,
                                        String snapshotName,
                                        @NotNull Map<FileType, ListSnapshotFilesResponse.FileInfo> components,
                                        int partitionId,
                                        Stats stats)
    {
        this.sidecar = sidecar;
        this.sidecarClientConfig = sidecarClientConfig;
        this.instance = instance;
        this.keyspace = keyspace;
        this.table = table;
        this.snapshotName = snapshotName;
        this.components = components;
        this.partitionId = partitionId;
        this.stats = stats;
        String fileName = Objects.requireNonNull(components.get(FileType.DATA), "Data.db SSTable file component must exist").fileName;
        String[] ssTableNameParts = fileName.split("-");
        // NOTE: overridable call from the constructor; subclasses overriding parseDataFileName
        // must not rely on their own fields being initialized at this point
        this.dataFileName = parseDataFileName(ssTableNameParts);
    }

    /**
     * Builds the data file name from the first four '-' separated parts of the Data.db file name.
     *
     * @param ssTableNameParts the Data.db file name split on '-'
     * @return the first four parts joined back together with '-'
     * @throws IllegalArgumentException if fewer than four parts are supplied
     */
    protected String parseDataFileName(String[] ssTableNameParts)
    {
        if (ssTableNameParts == null || ssTableNameParts.length < DATA_FILE_NAME_PARTS)
        {
            // Fail with a message naming the offending file rather than a bare index-out-of-bounds
            String actual = ssTableNameParts == null ? "null" : String.join("-", ssTableNameParts);
            throw new IllegalArgumentException("Data.db SSTable file name must have at least " + DATA_FILE_NAME_PARTS
                                               + " '-' separated parts: " + actual);
        }
        return String.join("-", ssTableNameParts[0], ssTableNameParts[1], ssTableNameParts[2], ssTableNameParts[3]);
    }

    /**
     * @return the Sidecar instance that serves this SSTable
     */
    public SidecarInstance instance()
    {
        return instance;
    }

    /**
     * @return the partition id supplied at construction
     */
    public int partitionId()
    {
        return partitionId;
    }

    /**
     * @return the data file name derived from the Data.db component at construction
     */
    @NotNull
    @Override
    public String getDataFileName()
    {
        return dataFileName;
    }

    /**
     * Opens a stream over the component of the given file type.
     *
     * @param fileType SSTable file type
     * @return an input stream over the component, or null if this SSTable has no such component
     */
    @Nullable
    @Override
    protected InputStream openInputStream(FileType fileType)
    {
        ListSnapshotFilesResponse.FileInfo snapshotFile = components.get(fileType);
        if (snapshotFile == null)
        {
            return null;
        }
        return openStream(snapshotFile.fileName, snapshotFile.size, fileType);
    }

    /**
     * Returns the size reported by Sidecar for the component of the given file type.
     *
     * @param fileType SSTable file type
     * @return component size in bytes
     * @throws IncompleteSSTableException if this SSTable has no component of that type
     */
    public long length(FileType fileType)
    {
        ListSnapshotFilesResponse.FileInfo snapshotFile = components.get(fileType);
        if (snapshotFile == null)
        {
            throw new IncompleteSSTableException(fileType);
        }
        return snapshotFile.size;
    }

    /**
     * @param fileType SSTable file type
     * @return true if this SSTable has no component of the given type
     */
    @Override
    public boolean isMissing(FileType fileType)
    {
        return !components.containsKey(fileType);
    }

    /**
     * Opens a stream over a component, serving {@code FileType.COMPRESSION_INFO} from the shared cache.
     *
     * @param component component file name, may be null
     * @param size      component size in bytes
     * @param fileType  SSTable file type
     * @return an input stream over the component bytes, or null if {@code component} is null
     * @throws RuntimeException wrapping the root cause if loading the compression info fails
     */
    @Nullable
    private InputStream openStream(String component, long size, FileType fileType)
    {
        if (component == null)
        {
            return null;
        }
        if (fileType == FileType.COMPRESSION_INFO)
        {
            // The cache is static, so the key must identify the instance unambiguously: include the port
            // so that two instances sharing a hostname never read each other's cached bytes
            String key = String.format("%s:%d/%s/%s/%s/%s",
                                       instance.hostname(), instance.port(), keyspace, table, snapshotName, component);
            byte[] bytes;
            try
            {
                bytes = COMPRESSION_CACHE.get(key, () -> {
                    // Close the stream once it has been drained; the cached byte[] is all that is kept
                    try (InputStream stream = open(component, fileType, size))
                    {
                        return IOUtils.toByteArray(stream);
                    }
                });
            }
            catch (ExecutionException exception)
            {
                throw new RuntimeException(ThrowableUtils.rootCause(exception));
            }
            return new ByteArrayInputStream(bytes);
        }
        return open(component, fileType, size);
    }

    /**
     * Opens an uncached stream over a component; the caller is responsible for closing it.
     *
     * @param component component file name
     * @param fileType  SSTable file type
     * @param size      component size in bytes
     * @return an input stream fed by range requests to Sidecar
     */
    public InputStream open(String component, FileType fileType, long size)
    {
        SSTableSource<SidecarProvisionedSSTable> ssTableSource = source(component, fileType, size);
        return new SSTableInputStream<>(ssTableSource, stats);
    }

    /**
     * Build an SSTableSource to async provide the bytes
     *
     * @param componentName the SSTable component to stream
     * @param fileType      SSTable file type
     * @param size          file size in bytes
     * @return an SSTableSource implementation that uses Sidecar client to request bytes
     */
    private SSTableSource<SidecarProvisionedSSTable> source(String componentName, FileType fileType, long size)
    {
        SidecarProvisionedSSTable thisSSTable = this;
        return new SSTableSource<SidecarProvisionedSSTable>()
        {
            @Override
            public void request(long start, long end, StreamConsumer consumer)
            {
                // Delegate the byte range to Sidecar; results are delivered to the consumer via the adapter
                sidecar.streamSSTableComponent(instance,
                                               keyspace,
                                               table,
                                               snapshotName,
                                               componentName,
                                               HttpRange.of(start, end),
                                               new SidecarStreamConsumerAdapter(consumer));
            }

            @Override
            public long maxBufferSize()
            {
                return sidecarClientConfig.maxBufferSize(fileType);
            }

            @Override
            public long chunkBufferSize()
            {
                return sidecarClientConfig.chunkBufferSize(fileType);
            }

            @Nullable
            @Override
            public Duration timeout()
            {
                // A non-positive configured timeout is reported as null
                int timeout = sidecarClientConfig.timeoutSeconds();
                return timeout > 0 ? Duration.ofSeconds(timeout) : null;
            }

            @Override
            public SidecarProvisionedSSTable sstable()
            {
                return thisSSTable;
            }

            @Override
            public FileType fileType()
            {
                return fileType;
            }

            @Override
            public long size()
            {
                return size;
            }
        };
    }

    @Override
    public String toString()
    {
        return String.format("{\"hostname\"=\"%s\", \"port\"=\"%d\", \"dataFileName\"=\"%s\", \"partitionId\"=\"%d\"}",
                             instance.hostname(), instance.port(), dataFileName, partitionId);
    }

    // Identity is the (instance, dataFileName) pair only; keep hashCode and equals in sync
    @Override
    public int hashCode()
    {
        return Objects.hash(instance, dataFileName);
    }

    @Override
    public boolean equals(Object other)
    {
        if (this == other)
        {
            return true;
        }
        if (other == null || this.getClass() != other.getClass())
        {
            return false;
        }
        SidecarProvisionedSSTable that = (SidecarProvisionedSSTable) other;
        return this.instance.equals(that.instance)
               && this.dataFileName.equals(that.dataFileName);
    }
}