Implement a Custom exhaustMap Operator in Angular
This challenge asks you to understand and implement a custom version of the RxJS exhaustMap operator within an Angular application. exhaustMap is useful for managing asynchronous operations without overlapping work: while a request triggered by an event is still in progress, further events are ignored, so only one request is processed at any given time.
Problem Description
Your task is to create a custom RxJS operator that mimics the behavior of the built-in exhaustMap operator. This custom operator will be used within an Angular service to handle incoming user requests for data.
When an observable emits a value, your custom exhaustMap should:
- Start a new inner observable based on the emitted value.
- While an inner observable is active (i.e., has not completed or errored), ignore any new emissions from the source observable.
- Once the active inner observable completes, then and only then, let the next emission from the source observable start a new inner observable. If the inner observable errors, propagate the error and stop processing.
- The custom operator should be designed to be reusable and integrate seamlessly into an Angular application, likely within a service that deals with HTTP requests.
Key Requirements:
- Implement a function that returns an RxJS OperatorFunction.
- The operator should accept a project function as an argument. This function takes the source observable's value and returns an observable (the inner observable).
- The operator must correctly handle the "exhaust" logic: ignore subsequent source emissions while an inner observable is running.
- The operator should emit values from the inner observable.
- The operator should propagate errors from both the source observable and the inner observables.
- The operator should complete when the source observable has completed and the active inner observable, if any, has also completed.
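The requirements above can be sketched with manual subscription management. This is one possible approach under stated assumptions, not a reference solution: the name customExhaustMap is taken from the examples in this challenge, the flags innerActive and sourceCompleted are illustrative, and RxJS 7 is assumed.

```typescript
import { Observable, OperatorFunction, Subject, Subscription, of } from 'rxjs';

// Sketch of a custom exhaustMap; the internal names are illustrative assumptions.
export function customExhaustMap<T, R>(
  project: (value: T) => Observable<R>
): OperatorFunction<T, R> {
  return (source: Observable<T>) =>
    new Observable<R>((subscriber) => {
      let innerActive = false; // true while an inner observable is running
      let sourceCompleted = false; // true once the source has completed
      const subscriptions = new Subscription();

      subscriptions.add(
        source.subscribe({
          next: (value) => {
            // Exhaust logic: drop source values while an inner observable is active.
            if (innerActive || subscriber.closed) {
              return;
            }
            let inner$: Observable<R>;
            try {
              inner$ = project(value);
            } catch (err) {
              subscriber.error(err); // errors thrown by project are propagated
              return;
            }
            innerActive = true; // set before subscribing so synchronous inners work
            subscriptions.add(
              inner$.subscribe({
                next: (result) => subscriber.next(result),
                error: (err) => subscriber.error(err),
                complete: () => {
                  innerActive = false;
                  if (sourceCompleted) {
                    subscriber.complete();
                  }
                },
              })
            );
          },
          error: (err) => subscriber.error(err),
          complete: () => {
            sourceCompleted = true;
            if (!innerActive) {
              subscriber.complete();
            }
          },
        })
      );

      // Teardown: unsubscribe from the source and from any active inner observable.
      return () => subscriptions.unsubscribe();
    });
}

// Usage: of(1, 2, 3) emits synchronously, so 2 and 3 arrive while the inner for 1 is pending.
const started: number[] = [];
const events: string[] = [];
const pending$ = new Subject<string>();

of(1, 2, 3)
  .pipe(customExhaustMap((n) => { started.push(n); return pending$; }))
  .subscribe({ next: (v) => events.push(v), complete: () => events.push('complete') });

pending$.next('result for 1');
pending$.complete();

console.log(started); // [1]
console.log(events); // ['result for 1', 'complete']
```

Setting innerActive before subscribing to the inner observable matters: an inner observable that completes synchronously resets the flag inside the subscribe call, so the next source value is not dropped by mistake.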
Expected Behavior:
Imagine a scenario where a user rapidly clicks a button that triggers an API call. exhaustMap ensures that only the first click's API call is initiated. Subsequent clicks, while the first call is in progress, are ignored. Once the first call finishes, the next click (if any) would trigger a new call.
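The click scenario can be simulated without a real button or HTTP call. The sketch below uses the built-in exhaustMap purely as the reference behaviour that a custom operator should match; clicks$, fakeApiCall, and pendingResponses are hypothetical stand-ins for a click stream and an API request.

```typescript
import { Observable, Subject } from 'rxjs';
import { exhaustMap } from 'rxjs/operators';

// Hypothetical stand-ins: clicks$ models button clicks, fakeApiCall models an HTTP request.
const clicks$ = new Subject<string>();
const pendingResponses: Subject<string>[] = [];
const apiCalls: string[] = [];
const responses: string[] = [];

function fakeApiCall(clickId: string): Observable<string> {
  apiCalls.push(clickId); // record that a request was started
  const response$ = new Subject<string>();
  pendingResponses.push(response$); // resolved manually further down
  return response$;
}

clicks$
  .pipe(exhaustMap((clickId) => fakeApiCall(clickId)))
  .subscribe((response) => responses.push(response));

clicks$.next('click-1'); // starts the first request
clicks$.next('click-2'); // ignored: the first request is still in progress
clicks$.next('click-3'); // ignored as well

// The first request finishes.
pendingResponses[0].next('response for click-1');
pendingResponses[0].complete();

clicks$.next('click-4'); // starts a new request

console.log(apiCalls); // ['click-1', 'click-4']
console.log(responses); // ['response for click-1']
```

Replacing exhaustMap with your own operator in the pipe call should leave both logged arrays unchanged.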
Edge Cases:
- Source observable completes immediately: The operator should complete without errors.
- Inner observable errors: The operator should propagate the error from the inner observable.
- Source observable errors: The operator should propagate the error from the source observable.
- project function returns an empty observable: This should be treated as a completed inner observable.
- Rapid succession of source emissions: Verify that only one inner observable is active at a time.
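Several of these edge cases can be checked with synchronous observables. The sketch below records what the built-in exhaustMap delivers in each case, as a baseline to compare a custom operator against; record is a hypothetical helper and RxJS 7 is assumed for the factory form of throwError.

```typescript
import { EMPTY, Observable, of, throwError } from 'rxjs';
import { exhaustMap } from 'rxjs/operators';

// Hypothetical helper: collects every notification of a synchronous observable.
function record<T>(obs$: Observable<T>): string[] {
  const events: string[] = [];
  obs$.subscribe({
    next: (v) => events.push(`next:${String(v)}`),
    error: (e) => events.push(`error:${(e as Error).message}`),
    complete: () => events.push('complete'),
  });
  return events;
}

// Source completes immediately: the result completes without errors.
const sourceEmpty = record(EMPTY.pipe(exhaustMap(() => of('x'))));

// Inner observable errors: the error is propagated.
const innerError = record(
  of(1).pipe(exhaustMap(() => throwError(() => new Error('inner failed'))))
);

// Source observable errors: the error is propagated.
const sourceError = record(
  throwError(() => new Error('source failed')).pipe(exhaustMap(() => of('x')))
);

// project returns an empty observable for 1: it counts as a completed inner,
// so the value 2 is processed normally.
const emptyInner = record(
  of(1, 2).pipe(exhaustMap((n) => (n === 1 ? EMPTY : of(`value ${n}`))))
);

console.log(sourceEmpty); // ['complete']
console.log(innerError); // ['error:inner failed']
console.log(sourceError); // ['error:source failed']
console.log(emptyInner); // ['next:value 2', 'complete']
```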
Examples
Example 1: Basic Usage
import { of, interval } from 'rxjs';
import { take, tap } from 'rxjs/operators';
import { customExhaustMap } from './custom-exhaust-map'; // Assuming your operator is here
// Simulate a source observable emitting values with a delay
const source$ = interval(500).pipe(take(4), tap(val => console.log(`Source emitted: ${val}`)));
// Project function that creates an inner observable
const projectFn = (value: number) => {
  console.log(`Starting inner observable for: ${value}`);
  return of(`Result for ${value}`).pipe(
    tap(result => console.log(`Inner observable for ${value} emitted: ${result}`))
  );
};
source$.pipe(
  customExhaustMap(projectFn)
).subscribe({
  next: value => console.log(`Output: ${value}`),
  error: err => console.error(`Error: ${err}`),
  complete: () => console.log('Completed')
});
// Expected Console Output:
// (of() emits and completes synchronously, so each inner observable has finished
// long before the next source value arrives 500ms later; nothing is ignored here)
// Source emitted: 0
// Starting inner observable for: 0
// Inner observable for 0 emitted: Result for 0
// Output: Result for 0
// Source emitted: 1
// Starting inner observable for: 1
// Inner observable for 1 emitted: Result for 1
// Output: Result for 1
// ... the same four lines for 2 and 3 ...
// Completed
Example 2: Rapid Emissions and Completion
import { of, timer } from 'rxjs';
import { map, tap } from 'rxjs/operators';
import { customExhaustMap } from './custom-exhaust-map';
// Source observable emitting rapidly
const source$ = of(1, 2, 3, 4, 5).pipe(tap(val => console.log(`Source emitted: ${val}`)));
// Inner observable with a delay
const projectFn = (value: number) => {
  console.log(`Starting inner for ${value}`);
  return timer(1000).pipe(
    tap(() => console.log(`Inner for ${value} completed`)),
    map(() => `Processed: ${value}`)
  );
};
source$.pipe(
  customExhaustMap(projectFn)
).subscribe({
  next: value => console.log(`Output: ${value}`),
  error: err => console.error(`Error: ${err}`),
  complete: () => console.log('Completed')
});
// Expected Console Output:
// Source emitted: 1
// Starting inner for 1
// Source emitted: 2 (ignored)
// Source emitted: 3 (ignored)
// Source emitted: 4 (ignored)
// Source emitted: 5 (ignored)
// ... after 1 second ...
// Inner for 1 completed
// Output: Processed: 1
// Completed (since source has no more emissions)
Example 3: Error Handling
import { of, throwError, timer } from 'rxjs';
import { tap, map } from 'rxjs/operators';
import { customExhaustMap } from './custom-exhaust-map';
const source$ = of(1, 2, 3).pipe(tap(val => console.log(`Source emitted: ${val}`)));
const projectFn = (value: number) => {
  if (value === 2) {
    console.log(`Inner for ${value} will throw error`);
    return throwError(() => new Error(`Error in inner for ${value}`));
  }
  console.log(`Starting inner for ${value}`);
  return timer(500).pipe(
    tap(() => console.log(`Inner for ${value} completed`)),
    map(() => `Processed: ${value}`)
  );
};
source$.pipe(
  customExhaustMap(projectFn)
).subscribe({
  next: value => console.log(`Output: ${value}`),
  error: err => console.error(`Error: ${err.message}`),
  complete: () => console.log('Completed')
});
// Expected Console Output:
// Source emitted: 1
// Starting inner for 1
// Source emitted: 2 (ignored)
// Source emitted: 3 (ignored)
// ... after 500ms ...
// Inner for 1 completed
// Output: Processed: 1
// Completed
// Because of(1, 2, 3) emits synchronously, the value 2 arrives while the inner observable for 1 is still active, so the error branch is never reached.
// Scenario for error propagation (the value 2 has to arrive after the inner observable for 1 has completed):
// timer(0, 1000) emits 0, 1, 2, ... one second apart, so each value arrives after
// the previous 500ms inner observable has completed. The stream is unbounded,
// but the error for value 2 terminates the subscription.
const sourceWithError$ = timer(0, 1000).pipe(
  map(i => i + 1),
  tap(val => console.log(`Source emitted: ${val}`))
);
const projectFnWithError = (value: number) => {
  if (value === 2) {
    console.log(`Inner for ${value} will throw error`);
    return throwError(() => new Error(`Error in inner for ${value}`));
  }
  console.log(`Starting inner for ${value}`);
  return timer(500).pipe(
    tap(() => console.log(`Inner for ${value} completed`)),
    map(() => `Processed: ${value}`)
  );
};
sourceWithError$.pipe(
  customExhaustMap(projectFnWithError)
).subscribe({
  next: value => console.log(`Output: ${value}`),
  error: err => console.error(`Error: ${err.message}`),
  complete: () => console.log('Completed')
});
// Expected Console Output for the error propagation scenario:
// Source emitted: 1
// Starting inner for 1
// ... after 500ms ...
// Inner for 1 completed
// Output: Processed: 1
// ... 1000ms after the first emission ...
// Source emitted: 2
// Inner for 2 will throw error
// Error: Error in inner for 2
Constraints
- The custom exhaustMap operator must be implemented using RxJS primitives and operators available in rxjs and rxjs/operators.
- The implementation should aim for efficiency and avoid unnecessary subscriptions or complex state management.
- The solution should be written in TypeScript.
- You should not use the built-in exhaustMap operator in your implementation.
Notes
- Think about the internal state management required to track whether an inner observable is currently active.
- Consider how to handle the completion and error signals from both the source and inner observables.
- This exercise is a great way to deepen your understanding of how RxJS operators work internally and how to build your own.
- You'll likely need to use operators like switchMap or mergeMap internally as building blocks, or manage subscriptions manually using Observable's subscribe method and Subscription objects. However, the goal is to achieve exhaustMap's behavior.
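As an illustration of the building-block route mentioned in the last note, the sketch below combines mergeMap with a busy flag. It is one possible design under stated assumptions, not the expected solution: exhaustViaMergeMap and busy are hypothetical names, and defer is used so that each subscription gets its own flag.

```typescript
import { defer, Observable, OperatorFunction, Subject } from 'rxjs';
import { filter, finalize, mergeMap, tap } from 'rxjs/operators';

// Hypothetical variant: exhaust behaviour built from mergeMap plus a "busy" flag.
function exhaustViaMergeMap<T, R>(
  project: (value: T) => Observable<R>
): OperatorFunction<T, R> {
  return (source) =>
    defer(() => {
      let busy = false; // per-subscription state, created on each subscribe
      return source.pipe(
        filter(() => !busy), // drop source values while an inner is running
        tap(() => { busy = true; }),
        mergeMap((value) =>
          // finalize resets the flag when the inner completes, errors, or is unsubscribed
          project(value).pipe(finalize(() => { busy = false; }))
        )
      );
    });
}

// Usage with manually controlled inner observables.
const source$ = new Subject<number>();
const pending: Subject<string>[] = [];
const startedValues: number[] = [];
const output: string[] = [];

source$
  .pipe(
    exhaustViaMergeMap((n) => {
      startedValues.push(n);
      const inner$ = new Subject<string>();
      pending.push(inner$);
      return inner$;
    })
  )
  .subscribe((v) => output.push(v));

source$.next(1); // starts an inner observable
source$.next(2); // dropped: the inner for 1 is still active
pending[0].next('one');
pending[0].complete();
source$.next(3); // starts a new inner observable
pending[1].next('three');

console.log(startedValues); // [1, 3]
console.log(output); // ['one', 'three']
```

The manual-subscription route keeps all state explicit, while this variant leans on existing operators for subscription bookkeeping; either can satisfy the constraint of not using the built-in exhaustMap.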