blob: 2e4a27f0b11ebb4ee37c69d8def0192aac821ab1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.util;
import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudPageBlob;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.List;
import org.apache.samza.AzureClient;
import org.apache.samza.AzureException;
import org.apache.samza.coordinator.data.JobModelBundle;
import org.apache.samza.SamzaException;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.eclipse.jetty.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Client side class that has reference to Azure blob storage.
* Used for writing and reading from the blob.
* Every write requires a valid lease ID.
*/
public class BlobUtils {

  private static final Logger LOG = LoggerFactory.getLogger(BlobUtils.class);

  // Fixed layout of the page blob: [job model bundle | barrier state | live processor list].
  // Offsets of the latter two regions are derived from the sizes of the regions before them.
  private static final long JOB_MODEL_BLOCK_SIZE = 1024000;
  private static final long BARRIER_STATE_BLOCK_SIZE = 1024;
  private static final long PROCESSOR_LIST_BLOCK_SIZE = 1024;

  private CloudBlobClient blobClient;
  private CloudBlobContainer container;
  private CloudPageBlob blob;

  /**
   * Creates an object of BlobUtils. It creates the container and page blob if they don't exist already.
   * @param client Client handle for access to Azure Storage account.
   * @param containerName Name of container inside which we want the blob to reside.
   * @param blobName Name of the blob to be managed.
   * @param length Length of the page blob.
   * @throws AzureException If an Azure storage service error occurred, or when the container name or blob name is invalid.
   */
  public BlobUtils(AzureClient client, String containerName, String blobName, long length) {
    this.blobClient = client.getBlobClient();
    try {
      this.container = blobClient.getContainerReference(containerName);
      container.createIfNotExists();
      this.blob = container.getPageBlobReference(blobName);
      if (!blob.exists()) {
        blob.create(length, AccessCondition.generateIfNotExistsCondition(), null, null);
      }
    } catch (URISyntaxException e) {
      LOG.error("Container name: " + containerName + " or blob name: " + blobName + " invalid.", e);
      throw new AzureException(e);
    } catch (StorageException e) {
      int httpStatusCode = e.getHttpStatusCode();
      if (httpStatusCode == HttpStatus.CONFLICT_409) {
        // A concurrent processor created the blob between our exists() check and create(); benign.
        LOG.info("The blob you're trying to create exists already.", e);
      } else {
        LOG.error("Azure Storage Exception!", e);
        throw new AzureException(e);
      }
    }
  }

  /**
   * Writes the job model to the blob.
   * Write is successful only if the lease ID passed is valid and the processor holds the lease.
   * Called by the leader.
   * @param prevJM Previous job model that the processor was operating on.
   * @param currJM Current job model that the processor is operating on.
   * @param prevJMV Previous job model version that the processor was operating on.
   * @param currJMV Current job model version that the processor is operating on.
   * @param leaseId LeaseID of the lease that the processor holds on the blob. Null if there is no lease.
   * @return true if write to the blob is successful, false if leaseID is null, the serialized data does not
   *         fit in the job model block, or an Azure storage service error or IO exception occurred.
   */
  public boolean publishJobModel(JobModel prevJM, JobModel currJM, String prevJMV, String currJMV, String leaseId) {
    if (leaseId == null) {
      return false;
    }
    try {
      JobModelBundle bundle = new JobModelBundle(prevJM, currJM, prevJMV, currJMV);
      byte[] data = SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(bundle);
      writeBlock(data, 0, JOB_MODEL_BLOCK_SIZE, leaseId);
      LOG.info("Uploaded {} jobModel to blob", bundle.getCurrJobModel());
      return true;
    } catch (StorageException | IOException e) {
      LOG.error("JobModel publish failed for version = " + currJMV, e);
      return false;
    }
  }

  /**
   * Reads the current job model from the blob.
   * @return The current job model published on the blob. Returns null when job model details not found on the blob.
   * @throws AzureException in getJobModelBundle() if an Azure storage service error occurred.
   * @throws SamzaException in getJobModelBundle() if data retrieved from blob could not be parsed by SamzaObjectMapper.
   */
  public JobModel getJobModel() {
    LOG.info("Reading the job model from blob.");
    JobModelBundle jmBundle = getJobModelBundle();
    if (jmBundle == null) {
      LOG.error("Job Model details don't exist on the blob.");
      return null;
    }
    return jmBundle.getCurrJobModel();
  }

  /**
   * Reads the current job model version from the blob.
   * @return Current job model version published on the blob. Returns null when job model details not found on the blob.
   * @throws AzureException in getJobModelBundle() if an Azure storage service error occurred.
   * @throws SamzaException in getJobModelBundle() if data retrieved from blob could not be parsed by SamzaObjectMapper.
   */
  public String getJobModelVersion() {
    LOG.info("Reading the job model version from blob.");
    JobModelBundle jmBundle = getJobModelBundle();
    if (jmBundle == null) {
      LOG.error("Job Model details don't exist on the blob.");
      return null;
    }
    return jmBundle.getCurrJobModelVersion();
  }

  /**
   * Writes the barrier state to the blob.
   * Write is successful only if the lease ID passed is valid and the processor holds the lease.
   * Called only by the leader.
   * @param state Barrier state to be published to the blob.
   * @param leaseId LeaseID of the valid lease that the processor holds on the blob. Null if there is no lease.
   * @return true if write to the blob is successful, false if leaseID is null, the serialized data does not
   *         fit in the barrier state block, or an Azure storage service error or IO exception occurred.
   */
  public boolean publishBarrierState(String state, String leaseId) {
    if (leaseId == null) {
      return false;
    }
    try {
      byte[] data = SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(state);
      writeBlock(data, JOB_MODEL_BLOCK_SIZE, BARRIER_STATE_BLOCK_SIZE, leaseId);
      LOG.info("Uploaded barrier state {} to blob", state);
      return true;
    } catch (StorageException | IOException e) {
      LOG.error("Barrier state " + state + " publish failed", e);
      return false;
    }
  }

  /**
   * Reads the current barrier state from the blob.
   * @return Barrier state published on the blob.
   * @throws AzureException If an Azure storage service error occurred.
   * @throws SamzaException If data retrieved from blob could not be parsed by SamzaObjectMapper.
   */
  public String getBarrierState() {
    LOG.info("Reading the barrier state from blob.");
    byte[] data = readBlock(JOB_MODEL_BLOCK_SIZE, BARRIER_STATE_BLOCK_SIZE, "barrier state");
    try {
      return SamzaObjectMapper.getObjectMapper().readValue(data, String.class);
    } catch (IOException e) {
      LOG.error("Failed to parse byte data: " + Arrays.toString(data) + " for barrier state retrieved from the blob.", e);
      throw new SamzaException(e);
    }
  }

  /**
   * Writes the list of live processors in the system to the blob.
   * Write is successful only if the lease ID passed is valid and the processor holds the lease.
   * Called only by the leader.
   * @param processors List of live processors to be published on the blob.
   * @param leaseId LeaseID of the valid lease that the processor holds on the blob. Null if there is no lease.
   * @return true if write to the blob is successful, false if leaseID is null, the serialized data does not
   *         fit in the processor list block, or an Azure storage service error or IO exception occurred.
   */
  public boolean publishLiveProcessorList(List<String> processors, String leaseId) {
    if (leaseId == null) {
      return false;
    }
    try {
      byte[] data = SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(processors);
      // Pad/validate against the processor list block size (the original used BARRIER_STATE_BLOCK_SIZE here;
      // the two constants are equal, so behavior is unchanged, but this names the intent correctly).
      writeBlock(data, JOB_MODEL_BLOCK_SIZE + BARRIER_STATE_BLOCK_SIZE, PROCESSOR_LIST_BLOCK_SIZE, leaseId);
      LOG.info("Uploaded list of live processors to blob.");
      return true;
    } catch (StorageException | IOException e) {
      LOG.error("Processor list: " + processors + " publish failed", e);
      return false;
    }
  }

  /**
   * Reads the list of live processors published on the blob.
   * @return String list of live processors.
   * @throws AzureException If an Azure storage service error occurred.
   * @throws SamzaException If data retrieved from blob could not be parsed by SamzaObjectMapper.
   */
  public List<String> getLiveProcessorList() {
    LOG.info("Read the the list of live processors from blob.");
    byte[] data = readBlock(JOB_MODEL_BLOCK_SIZE + BARRIER_STATE_BLOCK_SIZE, PROCESSOR_LIST_BLOCK_SIZE,
        "list of live processors");
    try {
      return SamzaObjectMapper.getObjectMapper().readValue(data, List.class);
    } catch (IOException e) {
      LOG.error("Failed to parse byte data: " + Arrays.toString(data) + " for live processor list retrieved from the blob", e);
      throw new SamzaException(e);
    }
  }

  public CloudBlobClient getBlobClient() {
    return this.blobClient;
  }

  public CloudBlobContainer getBlobContainer() {
    return this.container;
  }

  public CloudPageBlob getBlob() {
    return this.blob;
  }

  /**
   * Reads and deserializes the job model bundle stored at the start of the blob.
   * @return Deserialized {@link JobModelBundle}; may be null if the blob region was all zeros.
   * @throws AzureException If an Azure storage service error occurred.
   * @throws SamzaException If data retrieved from blob could not be parsed by SamzaObjectMapper.
   */
  private JobModelBundle getJobModelBundle() {
    byte[] data = readBlock(0, JOB_MODEL_BLOCK_SIZE, "JobModel details");
    try {
      return SamzaObjectMapper.getObjectMapper().readValue(data, JobModelBundle.class);
    } catch (IOException e) {
      LOG.error("Failed to parse byte data: " + Arrays.toString(data) + " for JobModel details retrieved from the blob", e);
      throw new SamzaException(e);
    }
  }

  /**
   * Zero-pads {@code data} to {@code blockSize} and uploads it to the blob at {@code blobOffset}.
   * uploadPages succeeds only when the AccessCondition provided has an active and valid lease ID; it fails otherwise.
   * @param data Serialized payload to store; must fit within the block.
   * @param blobOffset Byte offset of the target block within the page blob.
   * @param blockSize Size of the target block; the payload is zero-padded to this length.
   * @param leaseId Valid lease ID held on the blob.
   * @throws IOException If the payload is larger than the block (the original code silently truncated,
   *         which corrupts the stored JSON).
   * @throws StorageException If the upload fails, e.g. because the lease is invalid.
   */
  private void writeBlock(byte[] data, long blobOffset, long blockSize, String leaseId)
      throws StorageException, IOException {
    if (data.length > blockSize) {
      throw new IOException("Serialized data size " + data.length + " exceeds blob block size " + blockSize);
    }
    byte[] pageData = Arrays.copyOf(data, (int) blockSize);
    InputStream is = new ByteArrayInputStream(pageData);
    blob.uploadPages(is, blobOffset, blockSize, AccessCondition.generateLeaseCondition(leaseId), null, null);
  }

  /**
   * Downloads {@code blockSize} bytes starting at {@code blobOffset} from the blob.
   * @param blobOffset Byte offset of the block within the page blob.
   * @param blockSize Number of bytes to read.
   * @param what Human-readable description of the block, used in error logging.
   * @return Raw (possibly zero-padded) bytes of the block.
   * @throws AzureException If an Azure storage service error occurred.
   */
  private byte[] readBlock(long blobOffset, long blockSize, String what) {
    byte[] data = new byte[(int) blockSize];
    try {
      blob.downloadRangeToByteArray(blobOffset, blockSize, data, 0);
    } catch (StorageException e) {
      LOG.error("Failed to read " + what + " from the blob.", e);
      throw new AzureException(e);
    }
    return data;
  }
}