Implement combineLatest in Angular
RxJS's combineLatest operator is a powerful tool for reacting to changes across multiple observable streams. In Angular applications, it's frequently used to combine data from different sources, such as user input, API responses, or route parameters, to derive a single, unified state. This challenge asks you to recreate the core functionality of combineLatest within an Angular component, demonstrating your understanding of RxJS observables and their integration.
Problem Description
Your task is to create an Angular component that simulates the behavior of RxJS's combineLatest. The component takes an array of RxJS Observables as input and, once every input observable has emitted at least one value, emits a new value whenever any input observable emits. Each emitted value is an array containing the latest value from each input observable, in the same order the observables were provided.
Key Requirements:
- Initialization: The combineLatest functionality should only start emitting values after all input observables have emitted at least one value.
- Emission on Change: When any of the input observables emit a new value, the combined observable should immediately emit a new array containing the latest values from all input observables.
- Order Preservation: The emitted array must maintain the order of the latest values corresponding to the order of the input observables.
- Completion: The combined observable should complete when all of its source observables complete. If any source observable errors, the combined observable should also error.
- No External RxJS combineLatest: You must not use the built-in RxJS combineLatest operator directly in your implementation. You should build the logic yourself.
Expected Behavior:
- If you have three input observables: obsA, obsB, and obsC.
- Initially, no output will be emitted.
- Once obsA emits a1, obsB emits b1, and obsC emits c1: the output will be [a1, b1, c1].
- If obsA then emits a2: the output will be [a2, b1, c1].
- If obsB then emits b2: the output will be [a2, b2, c1].
- And so on.
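The trace above can be reproduced with a small piece of state tracking. The following is a minimal sketch, not a full solution: `LatestValueTracker` is a hypothetical helper name, and it models only the "wait for every source, then emit on each change" rule, with sources identified by their index in the input array.

```typescript
// Hypothetical helper (not part of RxJS): remembers the latest value per source.
class LatestValueTracker {
  private readonly values: unknown[];
  private readonly seen: boolean[];
  private remaining: number; // sources that have not emitted yet

  constructor(sourceCount: number) {
    this.values = new Array<unknown>(sourceCount);
    this.seen = new Array<boolean>(sourceCount);
    this.remaining = sourceCount;
  }

  // Records `value` for the source at `index`. Returns a copy of all latest
  // values once every source has emitted at least once, otherwise undefined.
  update(index: number, value: unknown): unknown[] | undefined {
    if (!this.seen[index]) {
      this.seen[index] = true;
      this.remaining--;
    }
    this.values[index] = value;
    return this.remaining === 0 ? [...this.values] : undefined;
  }
}

// Replay the obsA / obsB / obsC trace: indexes 0, 1, 2 stand for the sources.
const tracker = new LatestValueTracker(3);
const outputs: unknown[][] = [];
const steps: Array<[number, string]> = [
  [0, 'a1'], [1, 'b1'], [2, 'c1'], [0, 'a2'], [1, 'b2'],
];
for (const [index, value] of steps) {
  const combined = tracker.update(index, value);
  if (combined !== undefined) {
    outputs.push(combined);
  }
}
console.log(outputs);
// [['a1','b1','c1'], ['a2','b1','c1'], ['a2','b2','c1']]
```

Returning a copy of the array rather than the internal one means a consumer cannot accidentally mutate the tracker's state between emissions.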
Edge Cases:
- Empty Input Array: If an empty array of observables is provided, the resulting observable should complete immediately.
- Error Handling: If any of the source observables throw an error, the combined observable should propagate that error.
- Completion Handling: If a source observable completes, it emits no further values, but its last value stays in the combined array and the combined observable should continue as long as other observables are active. It should only complete when all source observables have completed.
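One way to cover these edge cases together is sketched below. To keep the example runnable without dependencies, it uses minimal structural types (`Observer`, `Unsubscribable`, `Source`) and a tiny `createSubject` stand-in rather than RxJS itself; these names, and `combineLatestSketch`, are assumptions made for illustration. In an Angular project the same body would typically live inside `new Observable(subscriber => ...)`.

```typescript
// Minimal structural types so the sketch runs without dependencies.
// Assumption: they mirror the observer/subscription shapes used by RxJS.
interface Observer<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}
interface Unsubscribable { unsubscribe(): void; }
interface Source<T> { subscribe(observer: Observer<T>): Unsubscribable; }

// Hypothetical name; this is not the RxJS operator.
function combineLatestSketch(sources: Source<unknown>[]): Source<unknown[]> {
  return {
    subscribe(observer: Observer<unknown[]>): Unsubscribable {
      const subscriptions: Unsubscribable[] = [];
      let closed = false;
      const teardown = (): void => {
        closed = true;
        // splice(0) empties the list, so repeated calls are harmless.
        subscriptions.splice(0).forEach(s => s.unsubscribe());
      };
      // Edge case: nothing to combine, so complete immediately.
      if (sources.length === 0) {
        observer.complete();
        return { unsubscribe: teardown };
      }
      const latest: unknown[] = new Array(sources.length);
      const hasEmitted: boolean[] = sources.map(() => false);
      let waitingFor = sources.length; // sources with no value yet
      let active = sources.length;     // sources that have not completed
      sources.forEach((source, index) => {
        if (closed) return; // an earlier source already ended the stream
        const sub = source.subscribe({
          next(value: unknown): void {
            if (closed) return;
            latest[index] = value;
            if (!hasEmitted[index]) { hasEmitted[index] = true; waitingFor--; }
            if (waitingFor === 0) observer.next([...latest]);
          },
          error(err: unknown): void {
            if (closed) return;
            teardown();
            observer.error(err); // any source error ends the combined stream
          },
          complete(): void {
            if (closed) return;
            active--;
            if (active === 0) { teardown(); observer.complete(); }
          },
        });
        if (closed) sub.unsubscribe(); // stream ended during subscribe()
        else subscriptions.push(sub);
      });
      return { unsubscribe: teardown };
    },
  };
}

// Tiny stand-in for an RxJS Subject, used only to drive the example.
function createSubject<T>() {
  let observers: Observer<T>[] = [];
  return {
    subscribe(observer: Observer<T>): Unsubscribable {
      observers.push(observer);
      return { unsubscribe: () => { observers = observers.filter(o => o !== observer); } };
    },
    next: (value: T): void => observers.slice().forEach(o => o.next(value)),
    error: (err: unknown): void => observers.slice().forEach(o => o.error(err)),
    complete: (): void => observers.slice().forEach(o => o.complete()),
  };
}

const a = createSubject<number>();
const b = createSubject<string>();
const events: string[] = [];
const subscription = combineLatestSketch([a, b]).subscribe({
  next: values => { events.push(JSON.stringify(values)); },
  error: err => { events.push(`error: ${String(err)}`); },
  complete: () => { events.push('complete'); },
});
a.next(1);    // nothing yet: b has not emitted
b.next('x');  // [1,"x"]
a.next(2);    // [2,"x"]
a.complete(); // b is still active, so no completion yet
b.next('y');  // [2,"y"], a's last value is retained
b.complete(); // every source is done: complete
subscription.unsubscribe(); // safe to call after completion
console.log(events);
```

The `closed` flag guards against a source that errors or completes synchronously while it is being subscribed, which would otherwise leave later sources subscribed with nobody listening.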
Examples
Example 1:
Input Observables:
obs1 = new BehaviorSubject(1)
obs2 = new BehaviorSubject('a')
Initial State:
obs1 emits 1
obs2 emits 'a'
Output after both have emitted: [1, 'a']
Next Emission:
obs1 emits 2
Output: [2, 'a']
Next Emission:
obs2 emits 'b'
Output: [2, 'b']
Example 2:
Input Observables:
obs1 = new Subject<number>()
obs2 = new Subject<string>()
obs3 = new Subject<boolean>()
Initial State: (No emissions yet)
First Emission Set:
obs1 emits 10
obs2 emits 'hello'
obs3 emits true
Output: [10, 'hello', true]
Next Emission:
obs1 emits 20
Output: [20, 'hello', true]
Next Emission:
obs3 emits false
Output: [20, 'hello', false]
Next Emission:
obs2 emits 'world'
Output: [20, 'world', false]
Example 3 (Completion):
Input Observables:
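obs1 = of(1, 2, 3) // Completes after emitting 3. The steps below assume obs1's values arrive one at a time after obs2 has been subscribed; a plain of() emits all of its values synchronously on subscribe.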
obs2 = new BehaviorSubject('x')
Initial State:
obs1 emits 1
obs2 emits 'x'
Output: [1, 'x']
Next Emission:
obs1 emits 2
Output: [2, 'x']
Next Emission:
obs1 emits 3 (obs1 completes)
Output: [3, 'x']
Next Emission:
obs2 emits 'y'
Output: [3, 'y']
Now, obs1 is complete. obs2 will continue to emit.
If obs2 completes next, the combined observable will also complete.
Constraints
- The implementation must be written in TypeScript.
- The solution should be structured as an Angular component.
- You are encouraged to use RxJS Observable, Subscription, Subject, and BehaviorSubject for building your internal logic.
- Performance: The solution should be reasonably efficient, avoiding unnecessary computations or subscriptions. The subscription management should be robust.
Notes
- Consider how you will store the latest values from each observable.
- Think about the state management required to know when all observables have emitted their first value.
- Pay close attention to how you handle subscriptions and unsubscriptions to prevent memory leaks.
- You might need to use internal Subject or BehaviorSubject to emit the combined values from your custom combineLatest logic.
- This challenge focuses on the core logic of combining latest values. You do not need to worry about complex scheduling or throttling, but robust error and completion handling are essential.
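For the note on preventing memory leaks, the sketch below shows one possible cleanup pattern: collect every inner subscription and release them all when the component is destroyed. `SubscriptionBag`, `fakeSubscribe`, and `CombinedValuesSketch` are hypothetical names, and the `@Component` decorator is omitted so the example runs without Angular. RxJS's own `Subscription` class offers `add()` for the same purpose.

```typescript
interface Unsubscribable { unsubscribe(): void; }

// Hypothetical helper: collects child subscriptions and releases them together.
class SubscriptionBag implements Unsubscribable {
  private children: Unsubscribable[] = [];
  private closed = false;

  add(child: Unsubscribable): void {
    if (this.closed) {
      child.unsubscribe(); // bag already released: do not keep the child alive
    } else {
      this.children.push(child);
    }
  }

  unsubscribe(): void {
    if (this.closed) return; // idempotent: a second call does nothing
    this.closed = true;
    this.children.splice(0).forEach(child => child.unsubscribe());
  }
}

// Fake source that counts live subscriptions, so a leak would be visible.
let activeSubscriptions = 0;
function fakeSubscribe(): Unsubscribable {
  activeSubscriptions++;
  let done = false;
  return {
    unsubscribe(): void {
      if (!done) {
        done = true;
        activeSubscriptions--;
      }
    },
  };
}

// Component-shaped class. The decorator and template are left out; only the
// Angular lifecycle method names (ngOnInit, ngOnDestroy) are kept.
class CombinedValuesSketch {
  private readonly bag = new SubscriptionBag();

  ngOnInit(): void {
    // One inner subscription per input observable (three in this example).
    for (let i = 0; i < 3; i++) {
      this.bag.add(fakeSubscribe());
    }
  }

  ngOnDestroy(): void {
    this.bag.unsubscribe(); // releases every inner subscription at once
  }
}

const component = new CombinedValuesSketch();
component.ngOnInit();
const liveAfterInit = activeSubscriptions;
component.ngOnDestroy();
const liveAfterDestroy = activeSubscriptions;
console.log(liveAfterInit, liveAfterDestroy); // 3 0
```

Keeping the teardown in one place makes it harder to forget a subscription when the number of input observables changes.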