Hone logo
Hone
Problems

Implementing a Custom mergeMap Operator in Angular

This challenge asks you to implement your own version of the mergeMap operator, a fundamental tool in RxJS for handling observable streams. Understanding how mergeMap works is crucial for building complex asynchronous logic in Angular applications, allowing you to transform an outer observable into an inner observable and then flatten the resulting observables into a single output stream.

Problem Description

Your task is to create a function, let's call it customMergeMap, that mimics the behavior of RxJS's mergeMap operator. This function will take an observable as input and a projection function. The projection function will receive each value emitted by the input observable and should return an observable. customMergeMap should then subscribe to each observable returned by the projection function and merge their emissions into a single output observable.

Key Requirements:

  1. Input: The customMergeMap function will accept two arguments:
    • source$: An RxJS Observable.
    • projectFn: A function that takes a value emitted by source$ and returns an RxJS Observable.
  2. Output: The customMergeMap function should return a new RxJS Observable.
  3. Behavior:
    • When source$ emits a value, the projectFn is called with that value.
    • The observable returned by projectFn is subscribed to.
    • All values emitted by these "inner" observables should be emitted by the output observable.
    • The order of emissions from the inner observables does not matter relative to each other; they are merged as they arrive.
    • If source$ completes, the customMergeMap should wait for all active inner observables to complete before completing the output observable.
    • If any inner observable errors, the output observable should error immediately with that error.
  4. Concurrency: Unlike concatMap, mergeMap allows multiple inner observables to be active concurrently.

Edge Cases:

  • Handling empty source observables.
  • Handling source observables that emit null or undefined (though the projection function might not be called in these cases, depending on strictness).
  • Handling inner observables that complete immediately.
  • Handling inner observables that error.

Examples

Example 1:

import { of, Observable } from 'rxjs';
import { delay } from 'rxjs/operators';

// Assume customMergeMap is defined elsewhere
// const customMergeMap = (source$: Observable<any>, projectFn: (value: any) => Observable<any>) => { ... };

const sourceObservable = of(1, 2, 3);

const project = (value: number) => {
  return of(`Processed: ${value}`).pipe(delay(Math.random() * 100)); // Simulate async operation
};

const resultObservable = customMergeMap(sourceObservable, project);

resultObservable.subscribe({
  next: (value) => console.log(value),
  error: (err) => console.error('Error:', err),
  complete: () => console.log('Completed')
});

Expected Output (order may vary due to delay):

Processed: 1
Processed: 2
Processed: 3
Completed

Explanation:

The sourceObservable emits 1, 2, and 3. For each emission, project is called. project(1) returns an observable that emits Processed: 1 after a delay. Similarly for 2 and 3. customMergeMap subscribes to all these inner observables concurrently. As their values are emitted (with potential different delays), they are merged into the resultObservable. Once all inner observables complete, the resultObservable completes.

Example 2:

import { of, throwError, Observable } from 'rxjs';
import { delay } from 'rxjs/operators';

// Assume customMergeMap is defined elsewhere

const sourceObservable = of(1, 'error_trigger', 3);

const project = (value: number | string) => {
  if (value === 'error_trigger') {
    return throwError(() => new Error('Simulated error'));
  }
  return of(`Data: ${value}`).pipe(delay(50));
};

const resultObservable = customMergeMap(sourceObservable, project);

resultObservable.subscribe({
  next: (value) => console.log(value),
  error: (err) => console.error('Error:', err),
  complete: () => console.log('Completed')
});

Expected Output:

Data: 1
Error: Simulated error

Explanation:

The sourceObservable emits 1 and then 'error_trigger'. When 'error_trigger' is processed, the project function returns an observable that immediately throws an error. customMergeMap will then propagate this error to its subscriber and stop processing further emissions. The emission of 3 from the source observable will not be processed.

Constraints

  • Your customMergeMap function should be implemented using only fundamental RxJS concepts and standard TypeScript features. You should not rely on existing RxJS operators like mergeMap, switchMap, concatMap, exhaustMap, etc., for the core logic of customMergeMap itself. You can use operators like pipe, subscribe, next, error, complete from RxJS.
  • The implementation should be efficient and handle a reasonable number of concurrent inner observables without significant performance degradation.
  • The input observables can emit any type of value.
  • The projection function can return any valid RxJS Observable.

Notes

  • Consider how you will manage multiple subscriptions to inner observables and how to properly handle their completion and errors.
  • Think about the state management required within your customMergeMap function to track active inner observables and emitted values.
  • This challenge is a great way to deepen your understanding of how RxJS operators work under the hood, particularly those involving flattening strategies.
  • You might find the Subscription class and its methods useful for managing multiple inner subscriptions.
  • The complete notification for the output observable should only be sent after all inner observables have completed.
Loading editor...
typescript