blob: d5b83201688bc35166b61d32b004f5b9f8ce4380 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.util;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.table.CloudTable;
import com.microsoft.azure.storage.table.CloudTableClient;
import com.microsoft.azure.storage.table.TableOperation;
import com.microsoft.azure.storage.table.TableQuery;
import org.apache.samza.AzureClient;
import org.apache.samza.AzureException;
import org.apache.samza.coordinator.data.ProcessorEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
/**
* Client side class that has a reference to Azure Table Storage.
* Enables the user to add or delete information from the table, make updates to the table and retrieve information from the table.
* Every row in a table is uniquely identified by a combination of the PARTIITON KEY and ROW KEY.
* PARTITION KEY = Group ID = Job Model Version (for this case).
* ROW KEY = Unique entity ID for a group = Processor ID (for this case).
*/
public class TableUtils {
private static final Logger LOG = LoggerFactory.getLogger(TableUtils.class);
private static final String PARTITION_KEY = "PartitionKey";
private static final long LIVENESS_DEBOUNCE_TIME_SEC = 30;
private final String initialState;
private final CloudTable table;
public TableUtils(AzureClient client, String tableName, String initialState) {
this.initialState = initialState;
CloudTableClient tableClient = client.getTableClient();
try {
table = tableClient.getTableReference(tableName);
table.createIfNotExists();
} catch (URISyntaxException e) {
LOG.error("\nConnection string specifies an invalid URI.", e);
throw new AzureException(e);
} catch (StorageException e) {
LOG.error("Azure storage exception.", e);
throw new AzureException(e);
}
}
/**
* Add a row which denotes an active processor to the processor table.
* @param jmVersion Job model version that the processor is operating on.
* @param pid Unique processor ID.
* @param isLeader Denotes whether the processor is a leader or not.
* @throws AzureException If an Azure storage service error occurred.
*/
public void addProcessorEntity(String jmVersion, String pid, boolean isLeader) {
ProcessorEntity entity = new ProcessorEntity(jmVersion, pid);
entity.setIsLeader(isLeader);
entity.updateLiveness();
TableOperation add = TableOperation.insert(entity);
try {
table.execute(add);
} catch (StorageException e) {
LOG.error("Azure storage exception while adding processor entity with job model version: " + jmVersion + "and pid: " + pid, e);
throw new AzureException(e);
}
}
/**
* Retrieve a particular row in the processor table, given the partition key and the row key.
* @param jmVersion Job model version of the processor row to be retrieved.
* @param pid Unique processor ID of the processor row to be retrieved.
* @return An instance of required processor entity. Null if does not exist.
* @throws AzureException If an Azure storage service error occurred.
*/
public ProcessorEntity getEntity(String jmVersion, String pid) {
try {
TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
return table.execute(retrieveEntity).getResultAsType();
} catch (StorageException e) {
LOG.error("Azure storage exception while retrieving processor entity with job model version: " + jmVersion + "and pid: " + pid, e);
throw new AzureException(e);
}
}
/**
* Updates the liveness value of a particular processor with a randomly generated integer, which in turn updates the last modified since timestamp of the row.
* @param jmVersion Job model version of the processor row to be updated.
* @param pid Unique processor ID of the processor row to be updated.
*/
public void updateHeartbeat(String jmVersion, String pid) {
try {
TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
entity.updateLiveness();
TableOperation update = TableOperation.replace(entity);
table.execute(update);
} catch (StorageException e) {
LOG.error("Azure storage exception while updating heartbeat for job model version: " + jmVersion + "and pid: " + pid, e);
}
}
/**
* Updates the isLeader value when the processor starts or stops being a leader.
* @param jmVersion Job model version of the processor row to be updated.
* @param pid Unique processor ID of the processor row to be updated.
* @param isLeader Denotes whether the processor is a leader or not.
* @throws AzureException If an Azure storage service error occurred.
*/
public void updateIsLeader(String jmVersion, String pid, boolean isLeader) {
try {
TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
entity.setIsLeader(isLeader);
TableOperation update = TableOperation.replace(entity);
table.execute(update);
} catch (StorageException e) {
LOG.error("Azure storage exception while updating isLeader value for job model version: " + jmVersion + "and pid: " + pid, e);
throw new AzureException(e);
}
}
/**
* Deletes a specified row in the processor table.
*
* Note: Table service uses optimistic locking by default. Hence, if there is an update after retrieving the entity,
* then the delete operation will fail.
*
* @param jmVersion Job model version of the processor row to be deleted.
* @param pid Unique processor ID of the processor row to be deleted.
* @param force True, to disable optimistic locking on the table. False, otherwise. Setting to false may result in
* AzureException when there is concurrent access to the table.
*
* @throws AzureException If an Azure storage service error occurred.
*/
public void deleteProcessorEntity(String jmVersion, String pid, boolean force) {
try {
TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
if (force) {
entity.setEtag("*");
}
TableOperation remove = TableOperation.delete(entity);
table.execute(remove);
} catch (StorageException e) {
LOG.error("Azure storage exception while deleting processor entity with job model version: " + jmVersion + "and pid: " + pid, e);
throw new AzureException(e);
}
}
/**
* Deletes a specified row in the processor table.
*
* Note: Table service uses optimistic locking by default. In order to disable it, set the ETag on the ProcessorEntity
* to "*" before invoking this method.
*
* @param entity ProcessorEntity that has to be deleted
* @throws AzureException If an Azure storage service error occurred.
*/
public void deleteProcessorEntity(ProcessorEntity entity) {
try {
TableOperation remove = TableOperation.delete(entity);
table.execute(remove);
} catch (StorageException e) {
LOG.error("Azure storage exception while deleting processor entity with job model version: " +
entity.getJobModelVersion() + "and pid: " + entity.getProcessorId(), e);
throw new AzureException(e);
}
}
/**
* Retrieve all rows in a table with the given partition key.
* @param partitionKey Job model version of the processors to be retrieved.
* @return Iterable list of processor entities.
*/
public Iterable<ProcessorEntity> getEntitiesWithPartition(String partitionKey) {
String partitionFilter = TableQuery.generateFilterCondition(PARTITION_KEY, TableQuery.QueryComparisons.EQUAL, partitionKey);
TableQuery<ProcessorEntity> partitionQuery = TableQuery.from(ProcessorEntity.class).where(partitionFilter);
return table.execute(partitionQuery);
}
/**
* Gets the list of all active processors that are heartbeating to the processor table.
* @param currentJMVersion Current job model version that the processors in the application are operating on.
* @return List of ids of currently active processors in the application, retrieved from the processor table.
*/
public Set<String> getActiveProcessorsList(AtomicReference<String> currentJMVersion) {
Iterable<ProcessorEntity> tableList = getEntitiesWithPartition(currentJMVersion.get());
Set<String> activeProcessorsList = new HashSet<>();
for (ProcessorEntity entity: tableList) {
if (System.currentTimeMillis() - entity.getTimestamp().getTime() <= (LIVENESS_DEBOUNCE_TIME_SEC * 1000)) {
activeProcessorsList.add(entity.getRowKey());
}
}
Iterable<ProcessorEntity> unassignedList = getEntitiesWithPartition(initialState);
for (ProcessorEntity entity: unassignedList) {
long temp = System.currentTimeMillis() - entity.getTimestamp().getTime();
LOG.info("Time elapsed since last heartbeat: {}", temp);
if (temp <= (LIVENESS_DEBOUNCE_TIME_SEC * 1000)) {
activeProcessorsList.add(entity.getRowKey());
}
}
LOG.info("Active processors list: {}", activeProcessorsList);
return activeProcessorsList;
}
public CloudTable getTable() {
return table;
}
}