blob: 0af83aec1cc4d957dcf99ad572809d5f7f96fffa [file] [log] [blame]
/*global zip, saveAs*/
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
import Ember from 'ember';
import DS from 'ember-data';
zip.workerScriptsPath = "assets/zip/";
var IO = {
/* Allow queuing of downloads and then get a callback once all the downloads are done.
* sample usage.
* var downloader = IO.fileDownloader();
* downloader.queueItem({
* url: 'http://....',
* onItemFetched: function(data, context) {...},
* context: {}, // context object gets passed back to the callback
* });
* downloader.queueItem({...}); //queue in other items
* downloader.finish(); // once all items are queued. items can be queued from
* // callbacks too. in that case the finish should be called
* // once all items are queued.
* downloader.then(successCallback).catch(failurecallback).finally(callback)
*/
fileDownloader: function(options) {
var itemList = [],
opts = options || {},
numParallel = opts.numParallel || 5,
hasMoreInputs = true,
inProgress = 0,
hasFailed = false,
pendingRequests = {},
pendingRequestID = 0,
failureReason = 'Unknown',
deferredPromise = Ember.RSVP.defer();
function checkForCompletion() {
if (hasFailed) {
if (inProgress === 0) {
deferredPromise.reject("Unknown Error");
}
return;
}
if (hasMoreInputs || itemList.length > 0 || inProgress > 0) {
return;
}
deferredPromise.resolve();
}
function getRequestId() {
return "req_" + pendingRequestID++;
}
function abortPendingRequests() {
Ember.$.each(pendingRequests, function(idx, val) {
try {
val.abort("abort");
} catch(e) {}
});
}
function markFailed(reason) {
if (!hasFailed) {
hasFailed = true;
failureReason = reason;
abortPendingRequests();
}
}
function processNext() {
if (inProgress >= numParallel) {
Ember.Logger.debug(`delaying download as ${inProgress} of ${numParallel} is in progress`);
return;
}
if (itemList.length < 1) {
Ember.Logger.debug("no items to download");
checkForCompletion();
return;
}
inProgress++;
Ember.Logger.debug(`starting download ${inProgress}`);
var item = itemList.shift();
var xhr = Ember.$.ajax({
crossOrigin: true,
url: item.url,
dataType: 'json',
xhrFields: {
withCredentials: true
},
});
var reqID = getRequestId();
pendingRequests[reqID] = xhr;
if(item.onFetch) {
item.onFetch(item.context);
}
xhr.done(function(data/*, statusText, xhr*/) {
delete pendingRequests[reqID];
if (Ember.$.isFunction(item.onItemFetched)) {
try {
item.onItemFetched(data, item.context);
} catch (e) {
markFailed(e || 'failed to process data');
inProgress--;
checkForCompletion();
return;
}
}
inProgress--;
processNext();
}).fail(function(xhr, statusText/*, errorObject*/) {
delete pendingRequests[reqID];
inProgress--;
if(item.retryCount) {
itemList.unshift(item);
item.retryCount--;
if(item.onRetry) {
item.onRetry(item.context);
}
Ember.run.later(processNext, 3000 + Math.random() * 3000);
}
else if(item.onItemFail) {
item.onItemFail(xhr, item.context);
processNext();
}
else {
markFailed(statusText);
checkForCompletion();
}
});
}
return DS.PromiseObject.create({
promise: deferredPromise.promise,
queueItems: function(options) {
options.forEach(this.queueItem);
},
queueItem: function(option) {
itemList.push(option);
processNext();
},
finish: function() {
hasMoreInputs = false;
checkForCompletion();
},
cancel: function() {
markFailed("User cancelled");
checkForCompletion();
}
});
},
/*
* allows to zip files and download that.
* usage:
* zipHelper = IO.zipHelper({
* onProgress: function(filename, current, total) { ...},
* onAdd: function(filename) {...}
* });
* zipHelper.addFile({name: filenameinsidezip, data: data);
* // add all files
* once all files are added call the close
* zipHelper.close(); // or .abort to abort zip
* zipHelper.then(function(zippedBlob) {
* saveAs(filename, zippedBlob);
* }).catch(failureCallback);
*/
zipHelper: function(options) {
var opts = options || {},
zipWriter,
completion = Ember.RSVP.defer(),
fileList = [],
completed = 0,
currentIdx = -1,
numFiles = 0,
hasMoreInputs = true,
inProgress = false,
hasFailed = false;
zip.createWriter(new zip.BlobWriter("application/zip"), function(writer) {
zipWriter = writer;
checkForCompletion();
nextFile();
});
function checkForCompletion() {
if (hasFailed) {
if (zipWriter) {
Ember.Logger.debug("aborting zipping. closing file.");
zipWriter.close(completion.reject);
zipWriter = null;
}
} else {
if (!hasMoreInputs && numFiles === completed) {
Ember.Logger.debug("completed zipping. closing file.");
zipWriter.close(completion.resolve);
}
}
}
function onProgress(current, total) {
if (Ember.$.isFunction(opts.onProgress)) {
opts.onProgress(fileList[currentIdx].name, current, total);
}
}
function onAdd(filename) {
if (Ember.$.isFunction(opts.onAdd)) {
opts.onAdd(filename);
}
}
function nextFile() {
if (hasFailed || completed === numFiles || inProgress) {
return;
}
currentIdx++;
var file = fileList[currentIdx];
inProgress = true;
onAdd(file.name);
zipWriter.add(file.name, new zip.TextReader(file.data), function() {
completed++;
inProgress = false;
if (currentIdx < numFiles - 1) {
nextFile();
}
checkForCompletion();
}, onProgress);
}
return DS.PromiseObject.create({
addFiles: function(files) {
files.forEach(this.addFile);
},
addFile: function(file) {
if (hasFailed) {
Ember.Logger.debug(`Skipping add of file ${file.name} as zip has been aborted`);
return;
}
numFiles++;
fileList.push(file);
if (zipWriter) {
Ember.Logger.debug("adding file from addFile: " + file.name);
nextFile();
}
},
close: function() {
hasMoreInputs = false;
checkForCompletion();
},
promise: completion.promise,
abort: function() {
hasFailed = true;
this.close();
}
});
}
};
export default function downloadDagZip(dag, options) {
var opts = options || {},
batchSize = opts.batchSize || 1000,
dagID = dag.get("entityID"),
baseurl = `${options.timelineHost}/${options.timelineNamespace}`,
itemsToDownload = [
{
url: getUrl('TEZ_APPLICATION', 'tez_' + dag.get("appID")),
context: { name: 'application', type: 'TEZ_APPLICATION' },
onFetch: onFetch,
onRetry: onRetry,
onItemFetched: processSingleItem,
onItemFail: processFailure,
retryCount: 3,
},
{
url: getUrl('TEZ_DAG_EXTRA_INFO', dagID),
context: { name: 'dag-extra-info', type: 'TEZ_DAG_EXTRA_INFO' },
onFetch: onFetch,
onRetry: onRetry,
onItemFetched: processSingleItem,
onItemFail: processFailure,
retryCount: 3,
},
{
url: getUrl('TEZ_DAG_ID', dagID),
context: { name: 'dag', type: 'TEZ_DAG_ID' },
onFetch: onFetch,
onRetry: onRetry,
onItemFetched: processSingleItem,
onItemFail: processFailure,
retryCount: 3,
},
{
url: getUrl('TEZ_VERTEX_ID', null, dagID),
context: { name: 'vertices', type: 'TEZ_VERTEX_ID', part: 0 },
onFetch: onFetch,
onRetry: onRetry,
onItemFetched: processMultipleItems,
onItemFail: processFailure,
retryCount: 3,
},
{
url: getUrl('TEZ_TASK_ID', null, dagID),
context: { name: 'tasks', type: 'TEZ_TASK_ID', part: 0 },
onFetch: onFetch,
onRetry: onRetry,
onItemFetched: processMultipleItems,
onItemFail: processFailure,
retryCount: 3,
},
{
url: getUrl('TEZ_TASK_ATTEMPT_ID', null, dagID),
context: { name: 'task_attempts', type: 'TEZ_TASK_ATTEMPT_ID', part: 0 },
onFetch: onFetch,
onRetry: onRetry,
onItemFetched: processMultipleItems,
onItemFail: processFailure,
retryCount: 3,
}
];
let callerID = options.callerInfo.id,
entityType = options.callerInfo.type;
if(callerID && entityType) {
itemsToDownload.push({
url: getUrl(entityType, callerID),
context: { name: entityType.toLocaleLowerCase(), type: entityType },
onItemFetched: processSingleItem,
onItemFail: checkIfAllDownloaded
});
}
var totalItemsToDownload = itemsToDownload.length,
numItemTypesToDownload = totalItemsToDownload,
downloader = IO.fileDownloader(),
zipHelper = IO.zipHelper({
onProgress: function(filename, current, total) {
Ember.Logger.debug(`${filename}: ${current} of ${total}`);
},
onAdd: function(filename) {
Ember.Logger.debug(`adding ${filename} to Zip`);
}
}),
downloaderProxy = Ember.Object.create({
percent: 0,
message: "",
succeeded: false,
partial: false,
failed: false,
cancel: function() {
downloader.cancel();
}
});
function getUrl(type, id, dagID, fromID) {
var url,
queryBatchSize = batchSize + 1;
if (id) {
url = `${baseurl}/${type}/${id}`;
} else {
url = `${baseurl}/${type}?primaryFilter=TEZ_DAG_ID:${dagID}&limit=${queryBatchSize}`;
if (!!fromID) {
url = `${url}&fromId=${fromID}`;
}
}
return url;
}
function checkIfAllDownloaded() {
numItemTypesToDownload--;
var remainingItems = totalItemsToDownload - numItemTypesToDownload;
downloaderProxy.set("percent", remainingItems / totalItemsToDownload);
if (numItemTypesToDownload === 0) {
downloader.finish();
}
}
function onFetch(context) {
downloaderProxy.set("message", `Fetching ${context.name} data.`);
}
function onRetry(context) {
downloaderProxy.set("message", `Downloading ${context.name} data failed. Retrying!`);
}
function processFailure(data, context) {
var obj = {};
try {
obj[context.name] = JSON.parse(data.responseText);
}
catch(e) {
obj[context.name] = data.responseText;
}
downloaderProxy.set("partial", true);
downloaderProxy.set("message", `Downloading ${context.name} data failed!`);
zipHelper.addFile({name: `error.${context.name}.json`, data: JSON.stringify(obj, null, 2)});
checkIfAllDownloaded();
}
function processSingleItem(data, context) {
var obj = {};
obj[context.name] = data;
zipHelper.addFile({name: `${context.name}.json`, data: JSON.stringify(obj, null, 2)});
checkIfAllDownloaded();
}
function processMultipleItems(data, context) {
var obj = {};
var nextBatchStart;
if (!Ember.$.isArray(data.entities)) {
throw "invalid data";
}
// need to handle no more entries , zero entries
if (data.entities.length > batchSize) {
nextBatchStart = data.entities.pop().entity;
}
obj[context.name] = data.entities;
zipHelper.addFile({name: `${context.name}_part_${context.part}.json`, data: JSON.stringify(obj, null, 2)});
if (!!nextBatchStart) {
context.part++;
downloader.queueItem({
url: getUrl(context.type, null, dagID, nextBatchStart),
context: context,
onItemFetched: processMultipleItems
});
} else {
checkIfAllDownloaded();
}
}
downloader.queueItems(itemsToDownload);
downloader.then(function() {
Ember.Logger.info('Finished download');
zipHelper.close();
}).catch(function(e) {
Ember.Logger.error('Failed to download: ' + e);
zipHelper.abort();
});
zipHelper.then(function(zippedBlob) {
saveAs(zippedBlob, `${dagID}.zip`);
downloaderProxy.set("message", `Download complete.`);
downloaderProxy.set("succeeded", true);
}, function() {
Ember.Logger.error('zip Failed');
downloaderProxy.set("failed", true);
});
return downloaderProxy;
}