blob: 5715cb906d42fa2478a65e7fc5337925c2c32087 [file]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import {
executeConcurrent,
executeBatched,
chunkArray,
createSemaphore,
} from '../../src/utils/ConcurrentExecutor';
describe('ConcurrentExecutor', () => {
describe('executeConcurrent', () => {
it('should execute operations concurrently', async () => {
const items = [1, 2, 3, 4, 5];
const results: number[] = [];
const result = await executeConcurrent(
items,
async (item) => {
results.push(item);
await new Promise((resolve) => setTimeout(resolve, 10));
return item * 2;
},
{ concurrency: 3 }
);
expect(result.results).toEqual([2, 4, 6, 8, 10]);
expect(result.successCount).toBe(5);
expect(result.failureCount).toBe(0);
expect(result.errors).toHaveLength(0);
});
it('should handle empty items array', async () => {
const result = await executeConcurrent([], async () => 1);
expect(result.results).toEqual([]);
expect(result.successCount).toBe(0);
expect(result.failureCount).toBe(0);
expect(result.durationMs).toBe(0);
});
it('should handle errors without stopping', async () => {
const items = [1, 2, 3, 4, 5];
const result = await executeConcurrent(
items,
async (item) => {
if (item === 3) {
throw new Error('Test error');
}
return item * 2;
},
{ concurrency: 2 }
);
expect(result.successCount).toBe(4);
expect(result.failureCount).toBe(1);
expect(result.errors).toHaveLength(1);
expect(result.errors[0].index).toBe(2);
expect(result.errors[0].error.message).toBe('Test error');
// Verify that failed operations leave undefined in results array
expect(result.results[2]).toBeUndefined();
// Successful operations have values
expect(result.results[0]).toBe(2);
expect(result.results[1]).toBe(4);
expect(result.results[3]).toBe(8);
expect(result.results[4]).toBe(10);
});
it('should stop on error when configured', async () => {
const items = [1, 2, 3, 4, 5];
let processedCount = 0;
const result = await executeConcurrent(
items,
async (item) => {
processedCount++;
if (item === 2) {
throw new Error('Stop error');
}
await new Promise((resolve) => setTimeout(resolve, 10));
return item;
},
{ concurrency: 1, stopOnError: true }
);
expect(result.failureCount).toBe(1);
// With concurrency 1, it should stop processing after error
expect(processedCount).toBeLessThanOrEqual(items.length);
});
it('should limit concurrency', async () => {
const maxConcurrent = { current: 0, max: 0 };
const items = Array.from({ length: 10 }, (_, i) => i);
await executeConcurrent(
items,
async () => {
maxConcurrent.current++;
maxConcurrent.max = Math.max(maxConcurrent.max, maxConcurrent.current);
await new Promise((resolve) => setTimeout(resolve, 20));
maxConcurrent.current--;
return true;
},
{ concurrency: 3 }
);
expect(maxConcurrent.max).toBeLessThanOrEqual(3);
});
it('should preserve result order', async () => {
const items = [100, 50, 150, 25, 75];
const result = await executeConcurrent(
items,
async (item) => {
await new Promise((resolve) => setTimeout(resolve, item / 10));
return item;
},
{ concurrency: 5 }
);
// Results should be in same order as input, regardless of completion order
expect(result.results).toEqual([100, 50, 150, 25, 75]);
});
});
describe('executeBatched', () => {
it('should execute in batches', async () => {
const items = Array.from({ length: 10 }, (_, i) => i);
const batchBoundaries: number[] = [];
const result = await executeBatched(
items,
3,
async (item, index) => {
if (index % 3 === 0) {
batchBoundaries.push(index);
}
return item * 2;
},
{ concurrency: 3 }
);
expect(result.successCount).toBe(10);
expect(result.results).toHaveLength(10);
});
it('should handle empty items', async () => {
const result = await executeBatched([], 5, async () => 1);
expect(result.results).toEqual([]);
expect(result.successCount).toBe(0);
});
it('should aggregate errors from all batches', async () => {
const items = Array.from({ length: 10 }, (_, i) => i);
const result = await executeBatched(
items,
5,
async (item) => {
if (item === 2 || item === 7) {
throw new Error(`Error at ${item}`);
}
return item;
},
{ concurrency: 2 }
);
expect(result.failureCount).toBe(2);
expect(result.errors).toHaveLength(2);
});
});
describe('chunkArray', () => {
it('should chunk array into equal parts', () => {
const array = [1, 2, 3, 4, 5, 6];
const chunks = chunkArray(array, 2);
expect(chunks).toEqual([[1, 2], [3, 4], [5, 6]]);
});
it('should handle uneven chunks', () => {
const array = [1, 2, 3, 4, 5];
const chunks = chunkArray(array, 2);
expect(chunks).toEqual([[1, 2], [3, 4], [5]]);
});
it('should handle chunk size larger than array', () => {
const array = [1, 2, 3];
const chunks = chunkArray(array, 10);
expect(chunks).toEqual([[1, 2, 3]]);
});
it('should handle empty array', () => {
const chunks = chunkArray([], 5);
expect(chunks).toEqual([]);
});
it('should handle chunk size of 1', () => {
const array = [1, 2, 3];
const chunks = chunkArray(array, 1);
expect(chunks).toEqual([[1], [2], [3]]);
});
});
describe('createSemaphore', () => {
it('should limit concurrent operations', async () => {
const sem = createSemaphore(2);
const running: number[] = [];
const maxRunning = { value: 0 };
const tasks = Array.from({ length: 5 }, (_, i) =>
(async () => {
await sem.acquire();
running.push(i);
maxRunning.value = Math.max(maxRunning.value, running.length);
await new Promise((resolve) => setTimeout(resolve, 20));
running.splice(running.indexOf(i), 1);
sem.release();
})()
);
await Promise.all(tasks);
expect(maxRunning.value).toBeLessThanOrEqual(2);
});
it('should track available and waiting counts', async () => {
const sem = createSemaphore(2);
expect(sem.available).toBe(2);
expect(sem.waiting).toBe(0);
await sem.acquire();
expect(sem.available).toBe(1);
await sem.acquire();
expect(sem.available).toBe(0);
// Start a third acquire that will wait
const waitingPromise = sem.acquire();
// Give it a tick to queue up
await new Promise((resolve) => setTimeout(resolve, 1));
expect(sem.waiting).toBe(1);
sem.release();
await waitingPromise;
expect(sem.waiting).toBe(0);
});
it('should handle immediate acquire when available', async () => {
const sem = createSemaphore(3);
const startTime = Date.now();
await sem.acquire();
await sem.acquire();
const elapsed = Date.now() - startTime;
expect(elapsed).toBeLessThan(10); // Should be nearly instant
expect(sem.available).toBe(1);
});
});
});