@xcons/emitter
XCons Emitter - Promise Queue and Helper Tools Package with Concurrency Control
📦 Package Name: @xcons/emitter
Supported Environments: Browser, Node.js, Universal (UMD)
🚀 Installation
Installation with NPM
bash
# with npm
npm install @xcons/emitter
Usage with CDN
html
<!-- UMD Bundle - Accessible globally under XConsEmitter namespace -->
<script src="https://unpkg.com/@xcons/emitter@latest/core.js"></script>
<!-- ES6 Module -->
<script type="module">
import { XQueue, xTimeout, XCancelable } from 'https://unpkg.com/@xcons/emitter@latest/core.js';
</script>
📋 Key Features
- ✅ Promise Queue: Promise management with concurrency control
- ✅ Priority Queue: Execute tasks according to priority order
- ✅ Sequential Queue: Guaranteed sequential processing in FIFO order
- ✅ Cancelable Promises: Support for canceling running promises
- ✅ Timeout Helpers: Add a timeout to any promise
- ✅ TypeScript Support: Full TypeScript type definitions
- ✅ Cross-Platform: Browser and Node.js compatibility
- ✅ Event System: EventEmitter integration for queue events
🎯 Quick Start
Basic Usage
typescript
import { XQueue, xTimeout, XCancelable } from '@xcons/emitter';
// Create queue with concurrency limit
const queue = new XQueue({ concurrency: 3 });
// Add task to queue
await queue.add(async () => {
const response = await fetch('https://api.example.com');
return response.json();
});
// Use timeout helper
const result = await xTimeout(fetchData(), { milliseconds: 5000 });
// Create cancelable promise
const cancelable = new XCancelable((resolve, reject, onCancel) => {
const timer = setTimeout(() => resolve('Completed!'), 1000);
onCancel(() => clearTimeout(timer));
});
Advanced Example
typescript
import { XQueue, XPriorityQueue, xTimeout } from '@xcons/emitter';
// With priority queue
const priorityQueue = new XQueue({
concurrency: 2,
queueClass: XPriorityQueue,
timeout: 30000
});
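// Queue both tasks without awaiting each one in turn; awaiting each add()
// serializes the tasks and leaves nothing for the priority order to decide
// High priority task
const important = priorityQueue.add(
async () => await processImportantTask(),
{ priority: 10, id: 'important-task' }
);
// Low priority task
const background = priorityQueue.add(
async () => await processBackgroundTask(),
{ priority: 1, id: 'background-task' }
);
// Update task priority at runtime (only meaningful before the task starts running)
priorityQueue.setPriority('background-task', 5);
// Wait for both tasks to finish
await Promise.all([important, background]);
🔧 API Reference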
XQueue - Main Queue Class
The main class. It runs promise-returning tasks under a concurrency limit and emits events as tasks move through the queue.
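To make "concurrency control" concrete, here is a minimal sketch of a limiter that never runs more than `limit` tasks at once. It is not the XQueue implementation; `createLimiter` and `run` are hypothetical names used only for illustration.

```typescript
// Hypothetical helper, not part of @xcons/emitter: shows the concurrency-limit idea only.
type Task<T> = () => Promise<T>;

function createLimiter(limit: number) {
  let active = 0;                          // tasks currently running
  const waiting: Array<() => void> = [];   // starters for tasks waiting for a free slot

  const next = (): void => {
    // Start waiting tasks while a slot is free.
    while (active < limit && waiting.length > 0) {
      const start = waiting.shift()!;
      start();
    }
  };

  return function run<T>(task: Task<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      waiting.push(() => {
        active++;
        // Free the slot and start the next waiting task once this one settles.
        const done = (): void => {
          active--;
          next();
        };
        Promise.resolve()
          .then(task)
          .then(
            (value) => { done(); resolve(value); },
            (error) => { done(); reject(error); }
          );
      });
      next();
    });
  };
}
```

A full queue layers priorities, pausing, timeouts, and events on top of a loop like this one.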
Constructor Parameters
typescript
const queue = new XQueue({
concurrency: 5, // Maximum number of concurrent tasks
autoStart: true, // Auto-start processing
timeout: 30000, // Timeout per task (ms)
throwOnTimeout: false, // Throw error on timeout
intervalCap: 10, // Rate limiting - max tasks per time period
interval: 1000, // Rate limiting time period (ms)
carryoverConcurrencyCount: false, // Carry over concurrency count
queueClass: XPriorityQueue // Queue implementation
});
Basic Methods
typescript
// Add task
await queue.add(async (options) => {
// options.signal - AbortSignal for cancellation
return await someAsyncOperation();
}, {
priority: 5, // Task priority (higher = runs first)
timeout: 10000, // Custom timeout for this task
id: 'unique-task-id' // Unique task identifier
});
// Add multiple tasks
const results = await queue.addAll([
() => task1(),
() => task2(),
() => task3()
], { priority: 10 });
// Queue control
queue.start(); // Start queue
queue.pause(); // Pause queue
queue.clear(); // Clear queue
// Status information
console.log(queue.stats); // { size, pending, isPaused, concurrency }
console.log('Queue size:', queue.size);
console.log('Pending tasks:', queue.pending);
console.log('Is paused:', queue.isPaused);
Event System
typescript
// Listen to queue events
queue.on('add', () => console.log('New task added'));
queue.on('active', () => console.log('Task started'));
queue.on('completed', (result) => console.log('Task completed:', result));
queue.on('error', (error) => console.log('Task error:', error));
queue.on('idle', () => console.log('Queue empty and all tasks completed'));
queue.on('empty', () => console.log('No tasks waiting in queue'));
Promise-based Waiting
typescript
// Wait for queue to empty
await queue.onEmpty();
// Wait for queue to be smaller than specific size
await queue.onSizeLessThan(5);
// Wait for queue to completely empty and all tasks to complete
await queue.onIdle();
XPriorityQueue - Priority Queue
Queue implementation that organizes tasks according to priority order.
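As a rough illustration of what "priority order" means here (higher number runs first, equal priorities keep insertion order), the sketch below keeps entries sorted on insert. `SketchPriorityList` is a hypothetical class written for this page, not the XPriorityQueue source, and the real class may be implemented differently.

```typescript
// Hypothetical sketch, not the XPriorityQueue source.
interface Entry<T> {
  priority: number;
  value: T;
}

class SketchPriorityList<T> {
  private entries: Entry<T>[] = [];

  // Insert so that higher priorities sit first and equal priorities stay FIFO.
  enqueue(value: T, priority = 0): void {
    let lo = 0;
    let hi = this.entries.length;
    // Binary search for the first entry with a strictly lower priority.
    while (lo < hi) {
      const mid = (lo + hi) >>> 1;
      if (this.entries[mid].priority >= priority) {
        lo = mid + 1;
      } else {
        hi = mid;
      }
    }
    this.entries.splice(lo, 0, { priority, value });
  }

  // Remove and return the entry that should run next.
  dequeue(): T | undefined {
    const first = this.entries.shift();
    return first === undefined ? undefined : first.value;
  }

  get size(): number {
    return this.entries.length;
  }
}
```

Sorting on insert keeps `dequeue` trivial, at the cost of an array splice per insert.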
typescript
import { XQueue, XPriorityQueue } from '@xcons/emitter';
// Priority queue usage
const priorityQueue = new XQueue({
queueClass: XPriorityQueue,
concurrency: 3
});
// High priority task (runs first)
await priorityQueue.add(
async () => await criticalTask(),
{ priority: 100 }
);
// Low priority task (runs later)
await priorityQueue.add(
async () => await backgroundTask(),
{ priority: 1 }
);
// Change priority at runtime (the task must have been added with a matching id)
priorityQueue.setPriority('task-id', 50);
XSimpleSequentialQueue - Simple Sequential Queue
Lightweight queue that processes tasks one by one in FIFO (First In, First Out) order.
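One common way to get strict FIFO execution is to chain each task onto the previous one. The sketch below shows that technique under stated assumptions: `ChainQueue` is a hypothetical name, and this is not how XSimpleSequentialQueue is necessarily built.

```typescript
// Hypothetical sketch, not the XSimpleSequentialQueue source.
class ChainQueue {
  // Settles when the most recently added task has finished; never rejects.
  private tail: Promise<unknown> = Promise.resolve();

  add<T>(task: () => Promise<T> | T): Promise<T> {
    // Start this task only after the previous one has settled.
    const result = this.tail.then(() => task());
    // Swallow failures on the chain itself so one failing task does not block later ones.
    this.tail = result.then(
      () => undefined,
      () => undefined
    );
    return result;
  }
}
```

Each caller still sees its own task's result or error through the promise returned by `add`; only the internal chain ignores failures.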
typescript
import { XSimpleSequentialQueue } from '@xcons/emitter';
const sequentialQueue = new XSimpleSequentialQueue();
// Tasks will run sequentially
await sequentialQueue.add(() => task1());
await sequentialQueue.add(() => task2());
await sequentialQueue.add(() => task3());
// Add multiple tasks
const results = await sequentialQueue.addAll([
() => processData1(),
() => processData2(),
() => processData3()
]);
// Queue statistics
console.log(sequentialQueue.stats);
// { size: 0, isProcessing: false, totalProcessed: 15, totalFailed: 2 }
// Wait for queue to empty
await sequentialQueue.waitUntilEmpty();
// Clear pending tasks
sequentialQueue.clear(true); // true = reject pending tasks
xTimeout - Timeout Helper
Helper function that adds a timeout to any promise.
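The underlying idea can be sketched as a race between the wrapped promise and a timer. `withTimeout` and `SketchTimeoutError` below are hypothetical names for illustration; they are not the `xTimeout` or `TimeoutError` exports, which may behave differently.

```typescript
// Hypothetical sketch, not the xTimeout source.
class SketchTimeoutError extends Error {
  constructor(message = 'Promise timed out') {
    super(message);
    this.name = 'SketchTimeoutError';
  }
}

function withTimeout<T>(promise: Promise<T>, milliseconds: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    // Reject if the timer fires before the wrapped promise settles.
    const timer = setTimeout(
      () => reject(new SketchTimeoutError(`Timed out after ${milliseconds} ms`)),
      milliseconds
    );
    // Whichever settles first wins; the timer is cleared once the promise settles.
    promise.then(
      (value) => { clearTimeout(timer); resolve(value); },
      (error) => { clearTimeout(timer); reject(error); }
    );
  });
}
```

Note that a timeout built this way only stops waiting; the underlying operation keeps running unless it is cancelled separately, for example through an AbortSignal.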
typescript
import { xTimeout, TimeoutError } from '@xcons/emitter';
// Basic timeout usage
try {
const result = await xTimeout(
fetch('https://api.example.com'),
{ milliseconds: 5000 }
);
} catch (error) {
if (error instanceof TimeoutError) {
console.log('Operation timed out');
}
}
// Advanced timeout options
const result = await xTimeout(promise, {
milliseconds: 10000, // 10 second timeout
message: 'Operation took too long', // Custom error message
fallback: () => 'Default value', // Value to return on timeout
signal: abortController.signal // Cancel with AbortSignal
});
// Manually clear timeout
const timeoutPromise = xTimeout(promise, { milliseconds: 5000 });
timeoutPromise.clear(); // Cancel timeout
XCancelable - Cancelable Promises
Custom promise implementation with cancellation and cleanup logic.
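The executor-with-`onCancel` pattern can be sketched in a few lines. `makeCancelable` is a hypothetical function written for this page, not the XCancelable source; it always rejects on cancel and leaves out options such as `shouldReject`.

```typescript
// Hypothetical sketch, not the XCancelable source.
interface Cancelable<T> {
  promise: Promise<T>;
  cancel(reason?: string): void;
}

function makeCancelable<T>(
  executor: (
    resolve: (value: T) => void,
    reject: (error: unknown) => void,
    onCancel: (cleanup: () => void) => void
  ) => void
): Cancelable<T> {
  const cleanups: Array<() => void> = [];
  let settled = false;
  let rejectOuter: (error: unknown) => void = () => undefined;

  const promise = new Promise<T>((resolve, reject) => {
    rejectOuter = reject;
    executor(
      (value) => { settled = true; resolve(value); },
      (error) => { settled = true; reject(error); },
      (cleanup) => { cleanups.push(cleanup); }
    );
  });

  return {
    promise,
    cancel(reason = 'Canceled'): void {
      if (settled) return;              // nothing to cancel once settled
      settled = true;
      cleanups.forEach((fn) => fn());   // release timers, requests, and similar resources
      rejectOuter(new Error(reason));
    },
  };
}
```

The key design point is that cleanup callbacks are registered by the code that owns the resources, so the caller of `cancel` does not need to know what has to be released.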
typescript
import { XCancelable, CancelError } from '@xcons/emitter';
// Create cancelable promise
const cancelablePromise = new XCancelable((resolve, reject, onCancel) => {
const controller = new AbortController();
const timer = setTimeout(() => resolve('Completed'), 5000);
// Cleanup code to run when canceled
onCancel(() => {
clearTimeout(timer);
controller.abort();
console.log('Promise canceled, resources cleaned up');
});
onCancel.shouldReject = false; // Don't reject promise on cancel
// Long-running operation
fetch('/api/data', { signal: controller.signal })
.then(response => response.json())
.then(resolve)
.catch(reject);
});
// Cancel promise
cancelablePromise.cancel('User canceled');
// Check cancellation status
if (cancelablePromise.isCanceled) {
console.log('Promise has been canceled');
}
Function Wrapper
typescript
// Create cancelable function
const cancelableFetch = XCancelable.fn(async (url, onCancel) => {
const controller = new AbortController();
onCancel(() => controller.abort());
const response = await fetch(url, { signal: controller.signal });
return response.json();
});
// Usage
const promise = cancelableFetch('https://api.example.com');
promise.cancel(); // Request is canceled
🎨 Use Case Scenarios
1. API Rate Limiting
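The `intervalCap` and `interval` options used in this scenario describe a cap on how many tasks may start per time window. As a rough mental model, the sketch below implements a fixed-window counter with an injected clock. `FixedWindowLimiter` is a hypothetical class; the library's actual rate limiter may use a different algorithm.

```typescript
// Hypothetical sketch, not the XQueue rate limiter.
class FixedWindowLimiter {
  private readonly cap: number;
  private readonly intervalMs: number;
  private windowStart = 0; // start time of the current window, in ms
  private used = 0;        // tasks started in the current window

  constructor(cap: number, intervalMs: number) {
    this.cap = cap;
    this.intervalMs = intervalMs;
  }

  // Returns true if a task may start at time `now` (ms), false if the cap is spent.
  tryStart(now: number): boolean {
    const elapsed = now - this.windowStart;
    if (elapsed >= this.intervalMs) {
      // A new window has begun: align its start and reset the counter.
      this.windowStart = now - (elapsed % this.intervalMs);
      this.used = 0;
    }
    if (this.used >= this.cap) {
      return false;
    }
    this.used++;
    return true;
  }
}
```

With a cap of 10 per 1000 ms, the eleventh call inside the same window returns `false`, and the caller would wait for the next window before starting the task.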
typescript
import { XQueue } from '@xcons/emitter';
// Queue compliant with API rate limit
const apiQueue = new XQueue({
concurrency: 2, // Maximum 2 requests at the same time
intervalCap: 10, // Maximum 10 requests per second
interval: 1000, // 1 second interval
carryoverConcurrencyCount: false
});
// Add API calls to queue
const users = await apiQueue.addAll([
() => fetchUser(1),
() => fetchUser(2),
() => fetchUser(3),
() => fetchUser(4),
() => fetchUser(5)
]);
2. File Processing Pipeline
typescript
import { XQueue, XPriorityQueue } from '@xcons/emitter';
const fileProcessingQueue = new XQueue({
concurrency: 4,
queueClass: XPriorityQueue,
timeout: 60000 // 1 minute timeout
});
// Critical files with high priority
await fileProcessingQueue.add(
() => processFile('important.pdf'),
{ priority: 100, id: 'important-file' }
);
// Normal files with low priority
await fileProcessingQueue.add(
() => processFile('regular.pdf'),
{ priority: 10, id: 'regular-file' }
);
// Monitor processing status
fileProcessingQueue.on('completed', (result) => {
console.log(`File processed: ${result.fileName}`);
});
3. Reliable Processing with Retry Logic
typescript
import { XQueue, xTimeout } from '@xcons/emitter';
const retryQueue = new XQueue({ concurrency: 1 });
async function reliableTask(data) {
return retryQueue.add(async () => {
const maxRetries = 3;
let lastError;
for (let i = 0; i < maxRetries; i++) {
try {
return await xTimeout(
processData(data),
{ milliseconds: 30000 }
);
} catch (error) {
lastError = error;
if (i < maxRetries - 1) {
await new Promise(resolve => setTimeout(resolve, 1000 * (i + 1)));
}
}
}
throw lastError;
});
}
4. Batch Processing System
typescript
import { XSimpleSequentialQueue } from '@xcons/emitter';
class BatchProcessor {
private queue = new XSimpleSequentialQueue();
private batch = [];
private batchSize = 10;
async add(item) {
this.batch.push(item);
if (this.batch.length >= this.batchSize) {
return this.flush();
}
}
async flush() {
if (this.batch.length === 0) return;
const currentBatch = [...this.batch];
this.batch = [];
return this.queue.add(async () => {
console.log(`Processing ${currentBatch.length} items...`);
return await processBatch(currentBatch);
});
}
}
const processor = new BatchProcessor();
await processor.add(data1);
await processor.add(data2);
// ... automatically processes when 10 items are collected
5. Processing with Resource Management
typescript
import { XCancelable } from '@xcons/emitter';
class ResourceManager {
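// Not declared async: an async method would wrap the XCancelable in a plain
// Promise, and the caller would lose access to cancel()
withResource(callback) {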
return new XCancelable((resolve, reject, onCancel) => {
const resource = this.acquireResource();
onCancel(() => {
console.log('Operation canceled, releasing resource');
this.releaseResource(resource);
});
callback(resource)
.then(resolve)
.catch(reject)
.finally(() => {
this.releaseResource(resource);
});
});
}
private acquireResource() {
console.log('Resource acquired');
return { id: Date.now() };
}
private releaseResource(resource) {
console.log('Resource released:', resource.id);
}
}
const manager = new ResourceManager();
const promise = manager.withResource(async (resource) => {
// Do work with resource
await longRunningOperation(resource);
});
// Cancel if needed
promise.cancel();
⚠️ Error Types
The package provides custom error classes:
typescript
import { TimeoutError, CancelError, AbortError } from '@xcons/emitter';
try {
await someOperation();
} catch (error) {
if (error instanceof TimeoutError) {
console.log('Operation timed out');
} else if (error instanceof CancelError) {
console.log('Operation canceled:', error.isCanceled);
} else if (error instanceof AbortError) {
console.log('Operation aborted');
}
}
🔧 TypeScript Support
The package provides full TypeScript support:
typescript
import {
XQueue,
XPriorityQueue,
XCancelable,
QueueAddOptions,
XTimeoutOptions
} from '@xcons/emitter';
// Type-safe queue
const typedQueue = new XQueue<XPriorityQueue, QueueAddOptions>({
queueClass: XPriorityQueue,
concurrency: 5
});
// Type-safe task
interface TaskResult {
success: boolean;
data: any;
}
const result: TaskResult = await typedQueue.add(
async (): Promise<TaskResult> => {
return { success: true, data: 'processed' };
}
);
📊 Performance Tips
1. Concurrency Optimization
typescript
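// For CPU-intensive tasks (navigator is not defined in every runtime, so guard the lookup)
const cores = typeof navigator !== 'undefined' ? navigator.hardwareConcurrency : undefined;
const cpuQueue = new XQueue({ concurrency: cores || 4 });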
// For I/O tasks
const ioQueue = new XQueue({ concurrency: 10 });
// For network requests (with rate limiting)
const networkQueue = new XQueue({
concurrency: 5,
intervalCap: 100,
interval: 1000
});
2. Memory Management
typescript
const queue = new XQueue({ concurrency: 3 });
// Clean up large results
queue.on('completed', (result) => {
// Clean up large data structures
if (result && typeof result.cleanup === 'function') {
result.cleanup();
}
});
// Cleanup when queue empties
queue.on('idle', () => {
// Global cleanup operations
performGarbageCollection();
});
3. Error Recovery
typescript
const robustQueue = new XQueue({
concurrency: 2,
timeout: 30000,
throwOnTimeout: false
});
robustQueue.on('error', (error, task) => {
console.error('Task error:', error);
// Re-queue tasks that failed with a critical error
if (isCriticalError(error)) {
robustQueue.add(task, { priority: 100 });
}
});
Browser and Node.js Compatibility
In Browser Environment
html
<!DOCTYPE html>
<html>
<head>
<script src="https://unpkg.com/@xcons/emitter@latest/core.js"></script>
</head>
<body>
<script>
// Access via global namespace
const { XQueue, xTimeout, XCancelable } = XConsEmitter;
const queue = new XQueue({ concurrency: 3 });
// Usage with Web Worker
queue.add(async () => {
return new Promise(resolve => {
const worker = new Worker('worker.js');
worker.onmessage = e => resolve(e.data);
worker.postMessage('process');
});
});
</script>
</body>
</html>
In Node.js Environment
javascript
const { XQueue, xTimeout } = require('@xcons/emitter');
const fs = require('fs').promises;
const fileQueue = new XQueue({ concurrency: 5 });
// File processing
async function processFiles(filePaths) {
return fileQueue.addAll(
filePaths.map(path => async () => {
const content = await fs.readFile(path, 'utf8');
return processFileContent(content);
})
);
}
📦 Package Information
Package metadata is available at runtime:
typescript
import { VERSION, BUILD_TIME, PACKAGE_INFO } from '@xcons/emitter';
console.log('Package Version:', VERSION); // "2.0.1"
console.log('Build Time:', BUILD_TIME); // ISO timestamp
console.log('Package Info:', PACKAGE_INFO); // All package metadata
🔍 Debug and Development
Logging in Development Mode
typescript
// Add event listeners for debug
if (process.env.NODE_ENV === 'development') {
queue.on('add', () => console.log('📄 Task added'));
queue.on('active', () => console.log('🚀 Task started'));
queue.on('completed', (result) => console.log('✅ Task completed:', result));
queue.on('error', (error) => console.log('❌ Task error:', error));
queue.on('idle', () => console.log('💤 Queue empty'));
}
Performance Monitoring
typescript
class MonitoredQueue extends XQueue {
constructor(options) {
super(options);
// Named `metrics` so it does not clash with the queue's own `stats` property
this.metrics = {
totalTasks: 0,
completedTasks: 0,
failedTasks: 0,
totalTime: 0
};
this.on('add', () => this.metrics.totalTasks++);
this.on('completed', (result, executionTime) => {
this.metrics.completedTasks++;
this.metrics.totalTime += executionTime;
});
this.on('error', () => this.metrics.failedTasks++);
}
getPerformanceReport() {
const { totalTasks, completedTasks, totalTime } = this.metrics;
return {
...this.metrics,
// Guard against division by zero before any task has been added or completed
averageTime: completedTasks > 0 ? totalTime / completedTasks : 0,
successRate: totalTasks > 0 ? completedTasks / totalTasks : 0
};
}
}
💡 Tip: XCons Emitter is designed for managing high-volume asynchronous work. Its rate limiting, priority management, and resource control features make it suitable for production use.