Implement shareReplay in Angular

The shareReplay operator in RxJS lets multiple subscribers share a single subscription to a source Observable, while replaying a specified number of past emissions to new subscribers. This is useful when an expensive operation or data fetch should not be repeated for every component that subscribes. Your challenge is to implement a custom version of this functionality within an Angular context.

Problem Description

You need to create a function, let's call it customShareReplay, that mimics the behavior of RxJS's shareReplay operator. This function takes a source Observable and a configuration object with a bufferSize property (as in the examples below), and returns a new Observable.

Key Requirements:

  1. Multicasting: All subscribers to the returned Observable should share a single subscription to the original source Observable.
  2. Replay: New subscribers should receive the last bufferSize emissions from the source Observable, even if they subscribe after the source has already emitted them.
  3. Ref Counting: The subscription to the source Observable should be established only when the first subscriber subscribes and should be torn down when the last subscriber unsubscribes.
  4. Error Handling: If the source Observable errors, all current subscribers should receive the error.
  5. Completion Handling: If the source Observable completes, all current subscribers should receive the completion notification.

Expected Behavior:

  • When the first subscriber subscribes, the source Observable is subscribed to.
  • Subsequent subscribers do not trigger a new subscription to the source.
  • If a subscriber subscribes after the source has emitted some values, they should immediately receive the last bufferSize values.
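  • When all subscribers have unsubscribed before the source has completed, the subscription to the source is closed.
  • If the source has already completed, a late subscriber still receives the last bufferSize values, followed immediately by the completion notification.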
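
As a reference point for the behavior listed above, here is a small sketch that instruments a source and runs it through the built-in shareReplay operator with refCount: true. It shows the target behavior, not the custom implementation; the events log, the input Subject, and the subscriber labels A and B are names made up for this illustration.

```typescript
import { Observable, Subject } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

const events: string[] = []; // hypothetical log, used only for this illustration
const input = new Subject<number>();

// Wrap the Subject so that every source subscription and teardown is recorded.
const source$ = new Observable<number>((observer) => {
  events.push('source subscribed');
  const inner = input.subscribe(observer);
  return () => {
    inner.unsubscribe();
    events.push('source torn down');
  };
});

const shared$ = source$.pipe(shareReplay({ bufferSize: 2, refCount: true }));

const subA = shared$.subscribe((v) => events.push(`A: ${v}`)); // first subscriber: connects to the source
input.next(1);
input.next(2);
input.next(3);
const subB = shared$.subscribe((v) => events.push(`B: ${v}`)); // no new connection; replays 2 and 3
input.next(4);
subA.unsubscribe(); // B is still subscribed, so the source stays open
subB.unsubscribe(); // last subscriber gone: the source is torn down

console.log(events);
// [ 'source subscribed', 'A: 1', 'A: 2', 'A: 3', 'B: 2', 'B: 3',
//   'A: 4', 'B: 4', 'source torn down' ]
```

A customShareReplay that meets the requirements should produce the same log when substituted for the built-in operator in this scenario.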

Edge Cases:

  • bufferSize of 0: No past emissions are replayed.
  • refCount set to false: the source subscription stays open even after the last subscriber unsubscribes. For simplicity, this challenge focuses on the ref-counted behavior (refCount: true).
  • Subscribing and unsubscribing rapidly.
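
To make the refCount: false edge case concrete, the sketch below uses the built-in shareReplay operator (not the custom function) with an instrumented source. The counters and the input Subject are hypothetical names introduced for the illustration.

```typescript
import { Observable, Subject } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

let sourceSubscriptions = 0;
let sourceTeardowns = 0;
const input = new Subject<number>();

// Hypothetical instrumented source: counts subscriptions and teardowns.
const source$ = new Observable<number>((observer) => {
  sourceSubscriptions++;
  const inner = input.subscribe(observer);
  return () => {
    inner.unsubscribe();
    sourceTeardowns++;
  };
});

const kept$ = source$.pipe(shareReplay({ bufferSize: 1, refCount: false }));

const firstSub = kept$.subscribe();
input.next(1);
firstSub.unsubscribe(); // no subscribers left, but the source subscription stays open
input.next(2);          // still reaches the shared replay buffer

const replayed: number[] = [];
kept$.subscribe((v) => replayed.push(v)); // replays 2 without resubscribing to the source

console.log(sourceSubscriptions, sourceTeardowns, replayed); // 1 0 [ 2 ]
```

With refCount: true, the same sequence would tear the source down at firstSub.unsubscribe(), and the later subscriber would start a fresh source subscription with an empty buffer.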

Examples

Let's assume we have a service that fetches data, and we want to share this data across multiple components without refetching.

Scenario: A service emits a sequence of numbers and completes. We want to share this and replay the last 2 numbers.

Example 1:

import { of } from 'rxjs';

// Assume customShareReplay is implemented

const source$ = of(1, 2, 3, 4, 5);
const shared$ = customShareReplay(source$, { bufferSize: 2 });

// Subscriber 1 subscribes
const sub1 = shared$.subscribe(val => console.log('Sub 1:', val));
// Expected Output (logged synchronously here, because of() emits synchronously):
// Sub 1: 1
// Sub 1: 2
// Sub 1: 3
// Sub 1: 4
// Sub 1: 5

// Subscriber 2 subscribes shortly after
setTimeout(() => {
  const sub2 = shared$.subscribe(val => console.log('Sub 2:', val));
  // Expected Output:
  // Sub 2: 4  (replays last 2)
  // Sub 2: 5
  sub1.unsubscribe(); // No-op here: sub1 was already closed when the source completed
}, 100);

// Subscriber 3 subscribes even later
setTimeout(() => {
  const sub3 = shared$.subscribe(val => console.log('Sub 3:', val));
  // Expected Output:
  // Sub 3: 4  (replays last 2)
  // Sub 3: 5
}, 200);

// Each subscriber receives the completion notification right after its last value.
// Nothing is logged for it here because no complete handler is passed to subscribe().

Example 2:

import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

// Assume customShareReplay is implemented

const source$ = interval(100).pipe(take(5)); // Emits 0, 1, 2, 3, 4 at 100ms, 200ms, ..., 500ms, then completes
const shared$ = customShareReplay(source$, { bufferSize: 3 });

// Subscriber 1 subscribes
const sub1 = shared$.subscribe(val => console.log('Sub 1:', val));
// Expected Output:
// Sub 1: 0
// Sub 1: 1
// Sub 1: 2
// Sub 1: 3
// Sub 1: 4 (then completes)

// Subscriber 2 subscribes after the source has completed
setTimeout(() => {
  const sub2 = shared$.subscribe(val => console.log('Sub 2:', val));
  // Expected Output:
  // Sub 2: 2 (replays last 3)
  // Sub 2: 3
  // Sub 2: 4 (then completes)
  sub1.unsubscribe(); // No-ops: both subscriptions are already closed by completion
  sub2.unsubscribe();
}, 550); // The last value (4) is emitted at 500ms, so by 550ms values 0, 1, 2, 3, 4 have all been emitted.

Constraints

  • bufferSize will be a non-negative integer.
  • The input source will always be a valid RxJS Observable.
  • The function should be implemented in TypeScript.
  • The implementation should be efficient: keep at most bufferSize values in the replay buffer and release the source subscription once it is no longer needed.
  • Focus on the core shareReplay logic with refCount: true.

Notes

  • You will need to manage the subscription lifecycle carefully.
  • Consider using a Subject or a similar mechanism to broadcast emissions to all current subscribers.
  • A mechanism to store past emissions will be necessary to fulfill the bufferSize requirement.
  • The refCount aspect is crucial for managing the connection to the source observable.
  • You can use existing RxJS operators within your implementation to help manage the stream, but the core shareReplay logic should be your own.
  • Think about how to handle the complete and error notifications from the source and propagate them to all subscribers.
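
The notes above can be combined into one possible shape for the solution. The following is a minimal sketch rather than a reference answer. It assumes a config object of the form { bufferSize } (inferred from the examples; the CustomShareReplayConfig name is made up), keeps the replay buffer in a plain array instead of a Subject, and assumes that an error should clear the shared state so that a later subscriber triggers a fresh source subscription.

```typescript
import { Observable, Subscriber, Subscription, of } from 'rxjs';

// Hypothetical config shape, inferred from the `{ bufferSize: 2 }` argument in the examples.
interface CustomShareReplayConfig {
  bufferSize: number;
}

function customShareReplay<T>(
  source$: Observable<T>,
  config: CustomShareReplayConfig
): Observable<T> {
  const { bufferSize } = config;
  let buffer: T[] = [];                     // most recent `bufferSize` values
  let subscribers: Subscriber<T>[] = [];    // currently attached subscribers
  let connection: Subscription | undefined; // the single shared source subscription
  let connected = false;
  let completed = false;

  // Forget everything so that the next subscriber starts a fresh source subscription.
  const reset = (): void => {
    buffer = [];
    subscribers = [];
    connection = undefined;
    connected = false;
  };

  return new Observable<T>((subscriber) => {
    // Replay buffered values first (iterate over a copy in case the buffer changes meanwhile).
    buffer.slice().forEach((value) => subscriber.next(value));

    if (completed) {
      // Source already finished: late subscribers get the replay, then complete.
      subscriber.complete();
      return;
    }

    subscribers.push(subscriber);

    if (!connected) {
      connected = true;
      // First subscriber: open the one shared subscription to the source.
      connection = source$.subscribe({
        next: (value) => {
          if (bufferSize > 0) {
            buffer.push(value);
            if (buffer.length > bufferSize) buffer.shift();
          }
          subscribers.slice().forEach((s) => s.next(value));
        },
        error: (err) => {
          const current = subscribers;
          reset(); // assumption: an error clears state so a later subscriber can retry
          current.forEach((s) => s.error(err));
        },
        complete: () => {
          completed = true; // the buffer is kept for late subscribers
          const current = subscribers;
          subscribers = [];
          current.forEach((s) => s.complete());
        },
      });
    }

    // Teardown: runs when this subscriber unsubscribes (or after it completes or errors).
    return () => {
      subscribers = subscribers.filter((s) => s !== subscriber);
      if (subscribers.length === 0 && connected && !completed) {
        // Last subscriber left before completion: close the source and drop the buffer.
        const active = connection;
        reset();
        active?.unsubscribe();
      }
    };
  });
}

// Usage with a synchronous source, mirroring Example 1.
const shared$ = customShareReplay(of(1, 2, 3, 4, 5), { bufferSize: 2 });
const first: number[] = [];
const late: number[] = [];
shared$.subscribe((v) => first.push(v)); // first is now [1, 2, 3, 4, 5]
shared$.subscribe((v) => late.push(v));  // late is now [4, 5]
```

One known gap in this sketch: if a subscriber unsubscribes while a synchronous source is still emitting during the initial connection, the source keeps running until it finishes, because the teardown is only registered after the subscribe callback returns.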