Skip to content

@xcons/emitter

XCons Emitter - Promise Queue and Helper Tools Package with Concurrency Control

📦 Package Name: @xcons/emitter
Supported Environments: Browser, Node.js, Universal (UMD)

🚀 Installation

Installation with NPM

bash
# with npm
npm install @xcons/emitter

Usage with CDN

html
<!-- UMD Bundle - Accessible globally under XConsEmitter namespace -->
<script src="https://unpkg.com/@xcons/emitter@latest/core.js"></script>

<!-- ES6 Module -->
<script type="module">
  import { XQueue, xTimeout, XCancelable } from 'https://unpkg.com/@xcons/emitter@latest/core.js';
</script>

📋 Key Features

  • ✅ Promise Queue: Promise management with concurrency control
  • ✅ Priority Queue: Execute tasks according to priority order
  • ✅ Sequential Queue: Guaranteed sequential processing in FIFO order
  • ✅ Cancelable Promises: Support for canceling running promises
  • ✅ Timeout Helpers: Add timeout feature to any promise
  • ✅ TypeScript Support: Full TypeScript type definitions
  • ✅ Cross-Platform: Browser and Node.js compatibility
  • ✅ Event System: EventEmitter integration for queue events

🎯 Quick Start

Basic Usage

typescript
import { XQueue, xTimeout, XCancelable } from '@xcons/emitter';

// Create queue with concurrency limit
const queue = new XQueue({ concurrency: 3 });

// Add task to queue
await queue.add(async () => {
  const response = await fetch('https://api.example.com');
  return response.json();
});

// Use timeout helper
const result = await xTimeout(fetchData(), { milliseconds: 5000 });

// Create cancelable promise
const cancelable = new XCancelable((resolve, reject, onCancel) => {
  const timer = setTimeout(() => resolve('Completed!'), 1000);
  onCancel(() => clearTimeout(timer));
});

Advanced Example

typescript
import { XQueue, XPriorityQueue, xTimeout } from '@xcons/emitter';

// With priority queue
const priorityQueue = new XQueue({
  concurrency: 2,
  queueClass: XPriorityQueue,
  timeout: 30000
});

// High priority task
await priorityQueue.add(
  async () => await processImportantTask(),
  { priority: 10, id: 'important-task' }
);

// Low priority task
await priorityQueue.add(
  async () => await processBackgroundTask(),
  { priority: 1, id: 'background-task' }
);

// Update task priority at runtime
priorityQueue.setPriority('background-task', 5);

🔧 API Reference

XQueue - Main Queue Class

Main class that manages promises with concurrency control and event system.

Constructor Parameters

typescript
const queue = new XQueue({
  concurrency: 5,           // Maximum number of concurrent tasks
  autoStart: true,          // Auto-start processing
  timeout: 30000,           // Timeout per task (ms)
  throwOnTimeout: false,    // Throw error on timeout
  intervalCap: 10,          // Rate limiting - max tasks per time period
  interval: 1000,           // Rate limiting time period (ms)
  carryoverConcurrencyCount: false, // Carry over concurrency count
  queueClass: XPriorityQueue // Queue implementation
});

Basic Methods

typescript
// Add task
await queue.add(async (options) => {
  // options.signal - AbortSignal for cancellation
  return await someAsyncOperation();
}, {
  priority: 5,              // Task priority (higher = runs first)
  timeout: 10000,           // Custom timeout for this task
  id: 'unique-task-id'      // Unique task identifier
});

// Add multiple tasks
const results = await queue.addAll([
  () => task1(),
  () => task2(),
  () => task3()
], { priority: 10 });

// Queue control
queue.start();              // Start queue
queue.pause();              // Pause queue
queue.clear();              // Clear queue

// Status information
console.log(queue.stats);   // { size, pending, isPaused, concurrency }
console.log('Queue size:', queue.size);
console.log('Pending tasks:', queue.pending);
console.log('Is paused:', queue.isPaused);

Event System

typescript
// Listen to queue events
queue.on('add', () => console.log('New task added'));
queue.on('active', () => console.log('Task started'));
queue.on('completed', (result) => console.log('Task completed:', result));
queue.on('error', (error) => console.log('Task error:', error));
queue.on('idle', () => console.log('Queue emptied'));
queue.on('empty', () => console.log('Queue completely empty'));

Promise-based Waiting

typescript
// Wait for queue to empty
await queue.onEmpty();

// Wait for queue to be smaller than specific size
await queue.onSizeLessThan(5);

// Wait for queue to completely empty and all tasks to complete
await queue.onIdle();

XPriorityQueue - Priority Queue

Queue implementation that organizes tasks according to priority order.

typescript
import { XQueue, XPriorityQueue } from '@xcons/emitter';

// Priority queue usage
const priorityQueue = new XQueue({
  queueClass: XPriorityQueue,
  concurrency: 3
});

// High priority task (runs first)
await priorityQueue.add(
  async () => await criticalTask(),
  { priority: 100 }
);

// Low priority task (runs later)
await priorityQueue.add(
  async () => await backgroundTask(),
  { priority: 1 }
);

// Change priority at runtime
priorityQueue.setPriority('task-id', 50);

XSimpleSequentialQueue - Simple Sequential Queue

Lightweight queue that processes tasks one by one in FIFO (First In, First Out) order.

typescript
import { XSimpleSequentialQueue } from '@xcons/emitter';

const sequentialQueue = new XSimpleSequentialQueue();

// Tasks will run sequentially
await sequentialQueue.add(() => task1());
await sequentialQueue.add(() => task2());
await sequentialQueue.add(() => task3());

// Add multiple tasks
const results = await sequentialQueue.addAll([
  () => processData1(),
  () => processData2(),
  () => processData3()
]);

// Queue statistics
console.log(sequentialQueue.stats);
// { size: 0, isProcessing: false, totalProcessed: 15, totalFailed: 2 }

// Wait for queue to empty
await sequentialQueue.waitUntilEmpty();

// Clear pending tasks
sequentialQueue.clear(true); // true = reject pending tasks

xTimeout - Timeout Helper

Helper function that adds timeout feature to any promise.

typescript
import { xTimeout, TimeoutError } from '@xcons/emitter';

// Basic timeout usage
try {
  const result = await xTimeout(
    fetch('https://api.example.com'),
    { milliseconds: 5000 }
  );
} catch (error) {
  if (error instanceof TimeoutError) {
    console.log('Operation timed out');
  }
}

// Advanced timeout options
const result = await xTimeout(promise, {
  milliseconds: 10000,                    // 10 second timeout
  message: 'Operation took too long',     // Custom error message
  fallback: () => 'Default value',        // Value to return on timeout
  signal: abortController.signal          // Cancel with AbortSignal
});

// Manually clear timeout
const timeoutPromise = xTimeout(promise, { milliseconds: 5000 });
timeoutPromise.clear(); // Cancel timeout

XCancelable - Cancelable Promises

Custom promise implementation with cancelation and cleanup logic.

typescript
import { XCancelable, CancelError } from '@xcons/emitter';

// Create cancelable promise
const cancelablePromise = new XCancelable((resolve, reject, onCancel) => {
  const controller = new AbortController();
  const timer = setTimeout(() => resolve('Completed'), 5000);
  
  // Cleanup code to run when canceled
  onCancel(() => {
    clearTimeout(timer);
    controller.abort();
    console.log('Promise canceled, resources cleaned up');
  });
  
  onCancel.shouldReject = false; // Don't reject promise on cancel
  
  // Long-running operation
  fetch('/api/data', { signal: controller.signal })
    .then(response => response.json())
    .then(resolve)
    .catch(reject);
});

// Cancel promise
cancelablePromise.cancel('User canceled');

// Check cancellation status
if (cancelablePromise.isCanceled) {
  console.log('Promise has been canceled');
}

Function Wrapper

typescript
// Create cancelable function
const cancelableFetch = XCancelable.fn(async (url, onCancel) => {
  const controller = new AbortController();
  onCancel(() => controller.abort());
  
  const response = await fetch(url, { signal: controller.signal });
  return response.json();
});

// Usage
const promise = cancelableFetch('https://api.example.com');
promise.cancel(); // Request is canceled

🎨 Use Case Scenarios

1. API Rate Limiting

typescript
import { XQueue } from '@xcons/emitter';

// Queue compliant with API rate limit
const apiQueue = new XQueue({
  concurrency: 2,        // Maximum 2 requests at the same time
  intervalCap: 10,       // Maximum 10 requests per second
  interval: 1000,        // 1 second interval
  carryoverConcurrencyCount: false
});

// Add API calls to queue
const users = await apiQueue.addAll([
  () => fetchUser(1),
  () => fetchUser(2),
  () => fetchUser(3),
  () => fetchUser(4),
  () => fetchUser(5)
]);

2. File Processing Pipeline

typescript
import { XQueue, XPriorityQueue } from '@xcons/emitter';

const fileProcessingQueue = new XQueue({
  concurrency: 4,
  queueClass: XPriorityQueue,
  timeout: 60000 // 1 minute timeout
});

// Critical files with high priority
await fileProcessingQueue.add(
  () => processFile('important.pdf'),
  { priority: 100, id: 'important-file' }
);

// Normal files with low priority
await fileProcessingQueue.add(
  () => processFile('regular.pdf'),
  { priority: 10, id: 'regular-file' }
);

// Monitor processing status
fileProcessingQueue.on('completed', (result) => {
  console.log(`File processed: ${result.fileName}`);
});

3. Reliable Processing with Retry Logic

typescript
import { XQueue, xTimeout } from '@xcons/emitter';

const retryQueue = new XQueue({ concurrency: 1 });

async function reliableTask(data) {
  return retryQueue.add(async () => {
    const maxRetries = 3;
    let lastError;
    
    for (let i = 0; i < maxRetries; i++) {
      try {
        return await xTimeout(
          processData(data),
          { milliseconds: 30000 }
        );
      } catch (error) {
        lastError = error;
        if (i < maxRetries - 1) {
          await new Promise(resolve => setTimeout(resolve, 1000 * (i + 1)));
        }
      }
    }
    
    throw lastError;
  });
}

4. Batch Processing System

typescript
import { XSimpleSequentialQueue } from '@xcons/emitter';

class BatchProcessor {
  private queue = new XSimpleSequentialQueue();
  private batch = [];
  private batchSize = 10;
  
  async add(item) {
    this.batch.push(item);
    
    if (this.batch.length >= this.batchSize) {
      return this.flush();
    }
  }
  
  async flush() {
    if (this.batch.length === 0) return;
    
    const currentBatch = [...this.batch];
    this.batch = [];
    
    return this.queue.add(async () => {
      console.log(`Processing ${currentBatch.length} items...`);
      return await processBatch(currentBatch);
    });
  }
}

const processor = new BatchProcessor();
await processor.add(data1);
await processor.add(data2);
// ... automatically processes when 10 items are collected

5. Processing with Resource Management

typescript
import { XCancelable } from '@xcons/emitter';

class ResourceManager {
  async withResource(callback) {
    return new XCancelable((resolve, reject, onCancel) => {
      const resource = this.acquireResource();
      
      onCancel(() => {
        console.log('Operation canceled, releasing resource');
        this.releaseResource(resource);
      });
      
      callback(resource)
        .then(resolve)
        .catch(reject)
        .finally(() => {
          this.releaseResource(resource);
        });
    });
  }
  
  private acquireResource() {
    console.log('Resource acquired');
    return { id: Date.now() };
  }
  
  private releaseResource(resource) {
    console.log('Resource released:', resource.id);
  }
}

const manager = new ResourceManager();
const promise = manager.withResource(async (resource) => {
  // Do work with resource
  await longRunningOperation(resource);
});

// Cancel if needed
promise.cancel();

⚠️ Error Types

The package provides custom error classes:

typescript
import { TimeoutError, CancelError, AbortError } from '@xcons/emitter';

try {
  await someOperation();
} catch (error) {
  if (error instanceof TimeoutError) {
    console.log('Operation timed out');
  } else if (error instanceof CancelError) {
    console.log('Operation canceled:', error.isCanceled);
  } else if (error instanceof AbortError) {
    console.log('Operation aborted');
  }
}

🔧 TypeScript Support

The package provides full TypeScript support:

typescript
import { 
  XQueue, 
  XPriorityQueue,
  XCancelable,
  QueueAddOptions,
  XTimeoutOptions
} from '@xcons/emitter';

// Type-safe queue
const typedQueue = new XQueue<XPriorityQueue, QueueAddOptions>({
  queueClass: XPriorityQueue,
  concurrency: 5
});

// Type-safe task
interface TaskResult {
  success: boolean;
  data: any;
}

const result: TaskResult = await typedQueue.add(
  async (): Promise<TaskResult> => {
    return { success: true, data: 'processed' };
  }
);

📊 Performance Tips

1. Concurrency Optimization

typescript
// For CPU-intensive tasks
const cpuQueue = new XQueue({ concurrency: navigator.hardwareConcurrency || 4 });

// For I/O tasks
const ioQueue = new XQueue({ concurrency: 10 });

// For network requests (with rate limiting)
const networkQueue = new XQueue({ 
  concurrency: 5,
  intervalCap: 100,
  interval: 1000 
});

2. Memory Management

typescript
const queue = new XQueue({ concurrency: 3 });

// Clean up large results
queue.on('completed', (result) => {
  // Clean up large data structures
  if (result && typeof result.cleanup === 'function') {
    result.cleanup();
  }
});

// Cleanup when queue empties
queue.on('idle', () => {
  // Global cleanup operations
  performGarbageCollection();
});

3. Error Recovery

typescript
const robustQueue = new XQueue({ 
  concurrency: 2,
  timeout: 30000,
  throwOnTimeout: false
});

robustQueue.on('error', (error, task) => {
  console.error('Task error:', error);
  
  // Re-queue critical errors
  if (isCriticalError(error)) {
    robustQueue.add(task, { priority: 100 });
  }
});

� Browser and Node.js Compatibility

In Browser Environment

html
<!DOCTYPE html>
<html>
<head>
  <script src="https://unpkg.com/@xcons/emitter@latest/core.js"></script>
</head>
<body>
  <script>
    // Access via global namespace
    const { XQueue, xTimeout, XCancelable } = XConsEmitter;
    
    const queue = new XQueue({ concurrency: 3 });
    
    // Usage with Web Worker
    queue.add(async () => {
      return new Promise(resolve => {
        const worker = new Worker('worker.js');
        worker.onmessage = e => resolve(e.data);
        worker.postMessage('process');
      });
    });
  </script>
</body>
</html>

In Node.js Environment

javascript
const { XQueue, xTimeout } = require('@xcons/emitter');
const fs = require('fs').promises;

const fileQueue = new XQueue({ concurrency: 5 });

// File processing
async function processFiles(filePaths) {
  return fileQueue.addAll(
    filePaths.map(path => async () => {
      const content = await fs.readFile(path, 'utf8');
      return processFileContent(content);
    })
  );
}

📦 Package Information

Access to package information at runtime:

typescript
import { VERSION, BUILD_TIME, PACKAGE_INFO } from '@xcons/emitter';

console.log('Package Version:', VERSION);        // "2.0.1"
console.log('Build Time:', BUILD_TIME);          // ISO timestamp
console.log('Package Info:', PACKAGE_INFO);      // All package metadata

🔍 Debug and Development

Logging in Development Mode

typescript
// Add event listeners for debug
if (process.env.NODE_ENV === 'development') {
  queue.on('add', () => console.log('📄 Task added'));
  queue.on('active', () => console.log('🚀 Task started'));
  queue.on('completed', (result) => console.log('✅ Task completed:', result));
  queue.on('error', (error) => console.log('❌ Task error:', error));
  queue.on('idle', () => console.log('💤 Queue empty'));
}

Performance Monitoring

typescript
class MonitoredQueue extends XQueue {
  constructor(options) {
    super(options);
    
    this.stats = {
      totalTasks: 0,
      completedTasks: 0,
      failedTasks: 0,
      totalTime: 0
    };
    
    this.on('add', () => this.stats.totalTasks++);
    this.on('completed', (result, executionTime) => {
      this.stats.completedTasks++;
      this.stats.totalTime += executionTime;
    });
    this.on('error', () => this.stats.failedTasks++);
  }
  
  getPerformanceReport() {
    return {
      ...this.stats,
      averageTime: this.stats.totalTime / this.stats.completedTasks,
      successRate: this.stats.completedTasks / this.stats.totalTasks
    };
  }
}

💡 Tip: XCons Emitter is designed for high-performance asynchronous operation management. It can be safely used in production environments by offering enterprise features such as rate limiting, priority management, and resource control.