/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.repository;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
public class ByteArrayContentRepository implements ContentRepository {
private ResourceClaimManager resourceClaimManager;
@Override
public void initialize(final ResourceClaimManager claimManager) {
resourceClaimManager = claimManager;
}
@Override
public void shutdown() {
}
@Override
public Set<String> getContainerNames() {
return Collections.singleton("container");
}
@Override
public long getContainerCapacity(final String containerName) {
return 0;
}
@Override
public long getContainerUsableSpace(final String containerName) {
return 0;
}
@Override
public String getContainerFileStoreName(final String containerName) {
return "container";
}
@Override
public ContentClaim create(final boolean lossTolerant) {
final ContentClaim contentClaim = new ByteArrayContentClaim();
resourceClaimManager.incrementClaimantCount(contentClaim.getResourceClaim());
return contentClaim;
}
@Override
public int incrementClaimaintCount(final ContentClaim claim) {
if (claim == null) {
return 0;
}
return resourceClaimManager.incrementClaimantCount(claim.getResourceClaim());
}
@Override
public int getClaimantCount(final ContentClaim claim) {
if (claim == null) {
return 0;
}
return resourceClaimManager.getClaimantCount(claim.getResourceClaim());
}
@Override
public int decrementClaimantCount(final ContentClaim claim) {
if (claim == null) {
return 0;
}
return resourceClaimManager.decrementClaimantCount(claim.getResourceClaim());
}
@Override
public boolean remove(final ContentClaim claim) {
return true;
}
@Override
public ContentClaim clone(final ContentClaim original, final boolean lossTolerant) throws IOException {
final ContentClaim clone = create(lossTolerant);
try (final InputStream in = read(original);
final OutputStream out = write(clone)) {
StreamUtils.copy(in, out);
}
return clone;
}
@Override
public long merge(final Collection<ContentClaim> claims, final ContentClaim destination, final byte[] header, final byte[] footer, final byte[] demarcator) throws IOException {
if (claims.contains(destination)) {
throw new IllegalArgumentException("destination cannot be within claims");
}
try (final ByteCountingOutputStream out = new ByteCountingOutputStream(write(destination))) {
if (header != null) {
out.write(header);
}
int i = 0;
for (final ContentClaim claim : claims) {
try (final InputStream in = read(claim)) {
StreamUtils.copy(in, out);
}
if (++i < claims.size() && demarcator != null) {
out.write(demarcator);
}
}
if (footer != null) {
out.write(footer);
}
return out.getBytesWritten();
}
}
@Override
public long importFrom(final Path content, final ContentClaim claim) throws IOException {
try (final InputStream in = Files.newInputStream(content, StandardOpenOption.READ)) {
return importFrom(in, claim);
}
}
@Override
public long importFrom(final InputStream content, final ContentClaim claim) throws IOException {
try (final OutputStream out = write(claim)) {
return StreamUtils.copy(content, out);
}
}
@Override
public long exportTo(final ContentClaim claim, final Path destination, final boolean append) throws IOException {
final OpenOption[] openOptions = append ? new StandardOpenOption[] {StandardOpenOption.CREATE, StandardOpenOption.APPEND} :
new StandardOpenOption[] {StandardOpenOption.CREATE};
try (final OutputStream out = Files.newOutputStream(destination, openOptions)) {
return exportTo(claim, out);
}
}
@Override
public long exportTo(final ContentClaim claim, final Path destination, final boolean append, final long offset, final long length) throws IOException {
final OpenOption[] openOptions = append ? new StandardOpenOption[] {StandardOpenOption.CREATE, StandardOpenOption.APPEND} :
new StandardOpenOption[] {StandardOpenOption.CREATE};
try (final OutputStream out = Files.newOutputStream(destination, openOptions)) {
return exportTo(claim, out, offset, length);
}
}
@Override
public long exportTo(final ContentClaim claim, final OutputStream destination) throws IOException {
try (final InputStream in = read(claim)) {
return StreamUtils.copy(in, destination);
}
}
@Override
public long exportTo(final ContentClaim claim, final OutputStream destination, final long offset, final long length) throws IOException {
try (final InputStream in = read(claim)) {
StreamUtils.skip(in, offset);
StreamUtils.copy(in, destination, length);
}
return length;
}
@Override
public long size(final ContentClaim claim) {
return claim.getLength();
}
@Override
public InputStream read(final ContentClaim claim) {
final ByteArrayContentClaim byteArrayContentClaim = verifyClaim(claim);
return byteArrayContentClaim.read();
}
@Override
public InputStream read(final ResourceClaim claim) throws IOException {
if (claim == null) {
return new ByteArrayInputStream(new byte[0]);
}
if (!(claim instanceof ByteArrayResourceClaim)) {
throw new IllegalArgumentException("Cannot access Resource Claim " + claim + " because the Resource Claim does not belong to this Content Repository");
}
final ByteArrayResourceClaim byteArrayResourceClaim = (ByteArrayResourceClaim) claim;
return byteArrayResourceClaim.read();
}
@Override
public OutputStream write(final ContentClaim claim) {
final ByteArrayContentClaim byteArrayContentClaim = verifyClaim(claim);
return byteArrayContentClaim.writeTo();
}
private ByteArrayContentClaim verifyClaim(final ContentClaim claim) {
Objects.requireNonNull(claim);
if (!(claim instanceof ByteArrayContentClaim)) {
throw new IllegalArgumentException("Cannot access Content Claim " + claim + " because the Content Claim does not belong to this Content Repository");
}
return (ByteArrayContentClaim) claim;
}
@Override
public void purge() {
}
@Override
public void cleanup() {
}
public byte[] getBytes(final ContentClaim contentClaim) {
final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
if (!(resourceClaim instanceof ByteArrayResourceClaim)) {
throw new IllegalArgumentException("Given ContentClaim was not created by this Repository");
}
return ((ByteArrayResourceClaim) resourceClaim).contents;
}
@Override
public boolean isAccessible(final ContentClaim contentClaim) {
return false;
}
private static class ByteArrayContentClaim implements ContentClaim {
private final ByteArrayResourceClaim resourceClaim = new ByteArrayResourceClaim();
@Override
public ResourceClaim getResourceClaim() {
return resourceClaim;
}
@Override
public long getOffset() {
return 0;
}
@Override
public long getLength() {
return resourceClaim.getLength();
}
@Override
public int compareTo(final ContentClaim o) {
return resourceClaim.compareTo(o.getResourceClaim());
}
public OutputStream writeTo() {
return resourceClaim.writeTo();
}
public InputStream read() {
return resourceClaim.read();
}
}
private static class ByteArrayResourceClaim implements ResourceClaim {
private static final AtomicLong idCounter = new AtomicLong(0L);
private final String id = String.valueOf(idCounter.getAndIncrement());
private byte[] contents;
@Override
public String getId() {
return id;
}
@Override
public String getContainer() {
return "container";
}
@Override
public String getSection() {
return "section";
}
@Override
public boolean isLossTolerant() {
return true;
}
@Override
public boolean isWritable() {
return contents == null;
}
@Override
public boolean isInUse() {
return true;
}
public long getLength() {
return contents == null ? 0L : contents.length;
}
public OutputStream writeTo() {
return new ByteArrayOutputStream() {
@Override
public void close() throws IOException {
super.close();
ByteArrayResourceClaim.this.contents = toByteArray();
}
};
}
public InputStream read() {
if (contents == null) {
return new ByteArrayInputStream(new byte[0]);
}
return new ByteArrayInputStream(contents);
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()){
return false;
}
final ByteArrayResourceClaim that = (ByteArrayResourceClaim) o;
return Objects.equals(id, that.id);
}
@Override
public int hashCode() {
return Objects.hash(id);
}
}
}