RxJS Observables

Module Overview

Estimated Time: 4-5 hours | Difficulty: Advanced | Prerequisites: Module 8

RxJS is the foundation of Angular’s async patterns. This module covers advanced operators, subjects, error handling strategies, and real-world patterns for complex data flows.

What You’ll Learn:
  • Observable creation and subscription
  • Transformation, filtering, and combination operators
  • Subjects and multicasting
  • Error handling and recovery
  • Testing observables
  • Performance optimization

Observable Fundamentals

┌─────────────────────────────────────────────────────────────────────────┐
│                    Observable Stream                                     │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│   Observable: A lazy push collection of multiple values over time       │
│                                                                          │
│   ──────(1)────(2)────(3)────(4)────|──────────────────────────▶        │
│           │      │      │      │     │                                   │
│         emit   emit   emit   emit  complete                              │
│                                                                          │
│   Observer: Consumes values delivered by Observable                     │
│   ┌─────────────────────────────────────────────────────────────────┐   │
│   │  {                                                               │   │
│   │    next: (value) => { ... },     // Handle each value           │   │
│   │    error: (err) => { ... },      // Handle errors               │   │
│   │    complete: () => { ... }       // Handle completion            │   │
│   │  }                                                               │   │
│   └─────────────────────────────────────────────────────────────────┘   │
│                                                                          │
│   Subscription: Represents the execution of an Observable               │
│   • Call unsubscribe() to cancel and free resources                     │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘
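
The three roles in the diagram can be traced in a few lines. The following is a minimal sketch, assuming only the `rxjs` package; the `log` array and the `numbers$` name are illustrative. It shows that the producer does not run until `subscribe()` is called, and that the teardown function runs once the stream completes.

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

// The producer function is lazy: it only runs when someone subscribes
const numbers$ = new Observable<number>(subscriber => {
  log.push('producer started');
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
  // Teardown: runs on completion, error, or unsubscribe
  return () => {
    log.push('teardown');
  };
});

log.push('before subscribe');

// The observer supplies the three callbacks from the diagram
const subscription = numbers$.subscribe({
  next: value => log.push(`next ${value}`),
  error: err => log.push(`error ${err}`),
  complete: () => log.push('complete'),
});

// The stream has already completed, so the subscription is closed
log.push(`closed: ${subscription.closed}`);

console.log(log);
// ['before subscribe', 'producer started', 'next 1', 'next 2',
//  'complete', 'teardown', 'closed: true']
```

For a stream that never completes on its own (such as `interval`), the teardown would instead run when `subscription.unsubscribe()` is called.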

Creating Observables

import { 
  Observable, of, from, fromEvent, interval, timer,
  range, defer, generate, EMPTY, NEVER, throwError
} from 'rxjs';

// Create from scratch
const custom$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  
  // Async values
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
  
  // Cleanup function
  return () => {
    console.log('Cleanup on unsubscribe');
  };
});

// From values
const values$ = of(1, 2, 3);  // Emits 1, 2, 3 then completes

// From array, promise, iterable
const array$ = from([1, 2, 3]);
const promise$ = from(fetch('/api/data').then(r => r.json()));
const iterable$ = from('hello');  // Emits 'h', 'e', 'l', 'l', 'o'

// From DOM events
const clicks$ = fromEvent(document, 'click');
const input$ = fromEvent<KeyboardEvent>(inputElement, 'keyup');

// Time-based
const every1s$ = interval(1000);        // Emits 0, 1, 2... every second
const after2s$ = timer(2000);           // Emits 0 after 2 seconds
const after2sThen1s$ = timer(2000, 1000); // Emit after 2s, then every 1s

// Lazy creation (new Observable per subscriber)
const lazy$ = defer(() => {
  console.log('Created at subscribe time');
  return of(Date.now());
});

// Error observable
const error$ = throwError(() => new Error('Something failed'));

// Special observables
const empty$ = EMPTY;   // Completes immediately
const never$ = NEVER;   // Never emits or completes

Transformation Operators

import { 
  map, scan, reduce, bufferTime,
  concatMap, mergeMap, switchMap, exhaustMap
} from 'rxjs/operators';

// map - transform each value
source$.pipe(
  map(x => x * 2)
);

// scan - accumulate values (like reduce, but emits each step)
clicks$.pipe(
  scan((count) => count + 1, 0)
);  // Emits: 1, 2, 3, 4...

// reduce - accumulate all values, emit once at complete
source$.pipe(
  reduce((acc, val) => acc + val, 0)
);

// bufferTime - collect values into arrays over a time window
clicks$.pipe(
  bufferTime(1000)  // Emit array of clicks every second
);

// Higher-order mapping (Observable of Observables)
searchTerm$.pipe(
  // switchMap: Cancel previous, subscribe to new
  switchMap(term => this.http.get(`/api/search?q=${term}`))
);

userId$.pipe(
  // concatMap: Queue requests, process sequentially
  concatMap(id => this.http.get(`/api/users/${id}`))
);

logEvents$.pipe(
  // mergeMap: All requests in parallel
  mergeMap(event => this.http.post('/api/logs', event))
);

submitButton$.pipe(
  // exhaustMap: Ignore new while processing
  exhaustMap(() => this.http.post('/api/submit', data))
);

Flattening Operators Comparison

┌─────────────────────────────────────────────────────────────────────────┐
│              Higher-Order Mapping Operators                              │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│   Source:  ──A──────B──────C─────────────|                              │
│                                                                          │
│   switchMap (cancel previous):                                          │
│   ──A──────────B──────────C──────result|                                │
│     └──(cancelled)                                                       │
│            └──(cancelled)                                                │
│                   └──────────────|                                       │
│                                                                          │
│   concatMap (sequential queue):                                         │
│   ──A──────result──B──result──C──result|                                │
│     └──────|       └──|       └──|                                       │
│                                                                          │
│   mergeMap (parallel):                                                   │
│   ──A────result                                                          │
│     └──|  B──result                                                      │
│           └|  C──result                                                  │
│               └|                                                         │
│                                                                          │
│   exhaustMap (ignore during):                                           │
│   ──A──────result─────────C──result|                                    │
│     └──────|  (B ignored)  └──|                                         │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘
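
The differences in the diagram can be reproduced under virtual time. The sketch below is one way to do it, assuming RxJS's `TestScheduler` from `rxjs/testing`; the `simulate` helper, the `durations` table, and the timings are hypothetical and chosen so that request A is slow while B arrives before A finishes.

```typescript
import { Observable, OperatorFunction, merge, timer } from 'rxjs';
import { concatMap, exhaustMap, map, mergeMap, switchMap } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';

type Project = (key: string) => Observable<string>;

// Hypothetical request durations in virtual milliseconds: A is slow, B and C are fast
const durations: Record<string, number> = { A: 30, B: 5, C: 5 };

function simulate(
  flatten: (project: Project) => OperatorFunction<string, string>
): string[] {
  // The assertion callback is unused because results are collected manually
  const scheduler = new TestScheduler(() => {});
  const received: string[] = [];

  scheduler.run(() => {
    // Source emits A at 0ms, B at 10ms (while A is still pending), C at 50ms
    const source$ = merge(
      timer(0).pipe(map(() => 'A')),
      timer(10).pipe(map(() => 'B')),
      timer(50).pipe(map(() => 'C'))
    );
    // Each "request" answers after its configured duration
    const request: Project = key =>
      timer(durations[key]).pipe(map(() => `result ${key}`));

    source$.pipe(flatten(request)).subscribe(value => received.push(value));
  }); // run() flushes virtual time before returning

  return received;
}

const outcome = {
  switchMap: simulate(project => switchMap(project)),   // ['result B', 'result C']
  concatMap: simulate(project => concatMap(project)),   // ['result A', 'result B', 'result C']
  mergeMap: simulate(project => mergeMap(project)),     // ['result B', 'result A', 'result C']
  exhaustMap: simulate(project => exhaustMap(project)), // ['result A', 'result C']
};

console.log(outcome);
```

With these timings, `switchMap` drops A, `exhaustMap` drops B, `concatMap` preserves source order, and `mergeMap` delivers results in completion order.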

Filtering Operators

import {
  filter, take, takeLast, takeUntil, takeWhile,
  skip, skipUntil, skipWhile, first, last,
  distinct, distinctUntilChanged, distinctUntilKeyChanged,
  debounceTime, throttleTime, auditTime, sampleTime
} from 'rxjs/operators';

// filter - emit values that pass predicate
numbers$.pipe(
  filter(n => n % 2 === 0)  // Only even numbers
);

// take/skip
source$.pipe(take(5));       // First 5 values
source$.pipe(skip(2));       // Skip first 2
source$.pipe(takeLast(3));   // Last 3 values
source$.pipe(first());       // First value, then complete
source$.pipe(last());        // Wait for complete, emit last

// takeUntil - complete when another observable emits
data$.pipe(
  takeUntil(this.destroy$)   // Stop when component destroys
);

// Distinct values
source$.pipe(
  distinctUntilChanged()     // Only emit when value changes
);

users$.pipe(
  distinctUntilKeyChanged('id')  // Only when user.id changes
);

// Time-based filtering
searchInput$.pipe(
  debounceTime(300)   // Wait 300ms of silence before emitting
);

scroll$.pipe(
  throttleTime(100)   // Emit at most every 100ms
);

Debounce vs Throttle

┌─────────────────────────────────────────────────────────────────────────┐
│              Debounce vs Throttle                                        │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│   Input:     ──a─b─c─────d─e─f─g─────h───|                              │
│                                                                          │
│   debounceTime(300ms):                                                   │
│              ────────c─────────────g─────h|                              │
│              (wait for pause)                                            │
│                                                                          │
│   throttleTime(300ms):                                                   │
│              ──a───────d───────g───────h─|                              │
│              (emit first, then wait)                                     │
│                                                                          │
│   Use Cases:                                                             │
│   • debounce: Search input, resize events                               │
│   • throttle: Scroll events, mouse move                                 │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘
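
The same contrast can be checked in code. This is a small sketch under virtual time, assuming `TestScheduler` from `rxjs/testing`; the `keystrokes` data and the `emitted` helper are made up for illustration and model two bursts of input separated by a pause.

```typescript
import { OperatorFunction, from, timer } from 'rxjs';
import { debounceTime, map, mergeMap, throttleTime } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';

// Hypothetical input as [value, virtual time in ms]: a burst, a pause, another burst
const keystrokes: Array<[string, number]> = [
  ['a', 0], ['b', 100], ['c', 200], ['d', 700], ['e', 800],
];

function emitted(operator: OperatorFunction<string, string>): string[] {
  // The assertion callback is unused because results are collected manually
  const scheduler = new TestScheduler(() => {});
  const received: string[] = [];

  scheduler.run(() => {
    // Schedule every keystroke at its virtual time; completes after the last one
    const input$ = from(keystrokes).pipe(
      mergeMap(([value, at]) => timer(at).pipe(map(() => value)))
    );
    input$.pipe(operator).subscribe(value => received.push(value));
  });

  return received;
}

// debounceTime emits the last value of each burst (the final one is flushed on completion)
const debounced = emitted(debounceTime(300)); // ['c', 'e']

// throttleTime emits the first value of each burst and ignores the rest of the window
const throttled = emitted(throttleTime(300)); // ['a', 'd']

console.log({ debounced, throttled });
```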

Combination Operators

// Operators can be imported from 'rxjs' directly as of RxJS 7.2
import {
  merge, concat, combineLatest, forkJoin, zip, race,
  of, fromEvent, map, switchMap,
  withLatestFrom, startWith, pairwise
} from 'rxjs';

// merge - combine multiple streams, emit all
const allClicks$ = merge(
  fromEvent(button1, 'click'),
  fromEvent(button2, 'click'),
  fromEvent(button3, 'click')
);

// concat - subscribe sequentially
const sequential$ = concat(
  of(1, 2),
  of(3, 4),
  of(5, 6)
);  // Emits: 1, 2, 3, 4, 5, 6

// combineLatest - emit when any emits, with latest from all
const combined$ = combineLatest([
  this.user$,
  this.settings$,
  this.permissions$
]).pipe(
  map(([user, settings, permissions]) => ({
    user,
    settings,
    permissions
  }))
);

// forkJoin - wait for all to complete, emit last values
const allData$ = forkJoin({
  users: this.http.get<User[]>('/api/users'),
  products: this.http.get<Product[]>('/api/products'),
  orders: this.http.get<Order[]>('/api/orders')
});

// zip - pair values by index
const paired$ = zip(
  letters$,    // a, b, c
  numbers$     // 1, 2, 3
);  // [a,1], [b,2], [c,3]

// withLatestFrom - combine with latest from another
save$.pipe(
  withLatestFrom(this.form.valueChanges),
  map(([_, formValue]) => formValue)
);

// startWith - emit initial value first
search$.pipe(
  startWith(''),  // Start with empty search
  switchMap(term => this.search(term))
);

// pairwise - emit previous and current
position$.pipe(
  pairwise(),
  map(([prev, curr]) => ({
    dx: curr.x - prev.x,
    dy: curr.y - prev.y
  }))
);
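
When choosing between `combineLatest`, `zip`, and `withLatestFrom`, it can help to feed all three the same input. The sketch below uses two Subjects as hand-driven sources; the names `letters$`, `numbers$`, and `seen` are illustrative only.

```typescript
import { Subject, combineLatest, zip } from 'rxjs';
import { map, withLatestFrom } from 'rxjs/operators';

const letters$ = new Subject<string>();
const numbers$ = new Subject<number>();

const seen = {
  combineLatest: [] as string[],
  zip: [] as string[],
  withLatestFrom: [] as string[],
};

// Emits whenever either source emits, once both have emitted at least once
combineLatest([letters$, numbers$])
  .pipe(map(([letter, num]) => `${letter}${num}`))
  .subscribe(value => seen.combineLatest.push(value));

// Pairs values strictly by index: first with first, second with second
zip(letters$, numbers$)
  .pipe(map(([letter, num]) => `${letter}${num}`))
  .subscribe(value => seen.zip.push(value));

// Only letters$ triggers output; numbers$ just supplies its latest value
letters$
  .pipe(
    withLatestFrom(numbers$),
    map(([letter, num]) => `${letter}${num}`)
  )
  .subscribe(value => seen.withLatestFrom.push(value));

letters$.next('a');
numbers$.next(1);
numbers$.next(2);
letters$.next('b');

console.log(seen);
// combineLatest:  ['a1', 'a2', 'b2']
// zip:            ['a1', 'b2']
// withLatestFrom: ['b2']  ('a' is dropped because numbers$ had not emitted yet)
```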

Subjects

Subjects are both Observable and Observer:

import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from 'rxjs';

// Subject - no initial value, late subscribers miss past emissions
const subject = new Subject<string>();
subject.next('Hello');  // No subscribers yet, missed
subject.subscribe(v => console.log('A:', v));
subject.next('World');  // A: World
subject.subscribe(v => console.log('B:', v));
subject.next('!');      // A: !, B: !

// BehaviorSubject - has initial value, new subscribers get latest
const behavior = new BehaviorSubject<number>(0);
console.log(behavior.value);  // 0
behavior.subscribe(v => console.log('A:', v));  // A: 0
behavior.next(1);  // A: 1
behavior.subscribe(v => console.log('B:', v));  // B: 1
behavior.next(2);  // A: 2, B: 2

// ReplaySubject - replay N values to new subscribers
const replay = new ReplaySubject<number>(3);  // Buffer last 3
replay.next(1);
replay.next(2);
replay.next(3);
replay.next(4);
replay.subscribe(v => console.log(v));  // 2, 3, 4

// AsyncSubject - only emit last value on complete
const async = new AsyncSubject<number>();
async.subscribe(v => console.log(v));
async.next(1);
async.next(2);
async.next(3);
async.complete();  // Logs: 3

State Management with BehaviorSubject

@Injectable({ providedIn: 'root' })
export class StateService {
  private state = new BehaviorSubject<AppState>({
    user: null,
    theme: 'light',
    notifications: []
  });
  
  // Expose as observable (read-only)
  readonly state$ = this.state.asObservable();
  
  // Selectors
  readonly user$ = this.state$.pipe(
    map(state => state.user),
    distinctUntilChanged()
  );
  
  readonly theme$ = this.state$.pipe(
    map(state => state.theme),
    distinctUntilChanged()
  );
  
  // Current value (synchronous)
  get snapshot() {
    return this.state.value;
  }
  
  // Actions
  setUser(user: User | null) {
    this.state.next({
      ...this.state.value,
      user
    });
  }
  
  toggleTheme() {
    this.state.next({
      ...this.state.value,
      theme: this.state.value.theme === 'light' ? 'dark' : 'light'
    });
  }
  
  addNotification(notification: Notification) {
    this.state.next({
      ...this.state.value,
      notifications: [...this.state.value.notifications, notification]
    });
  }
}

Error Handling

import { 
  catchError, retry, retryWhen, scan, delayWhen,
  throwError, timer, EMPTY, of
} from 'rxjs';

// Catch and replace with fallback
data$.pipe(
  catchError(error => {
    console.error('Error:', error);
    return of([]);  // Return empty array as fallback
  })
);

// Catch and rethrow
data$.pipe(
  catchError(error => {
    this.logger.error(error);
    return throwError(() => new Error('Custom error message'));
  })
);

// Catch and complete (swallow error)
data$.pipe(
  catchError(() => EMPTY)
);

// Retry immediately
data$.pipe(
  retry(3)  // Retry up to 3 times
);

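// Retry with delay (exponential backoff)
// Note: retryWhen is deprecated in RxJS 7; prefer retry with a delay option
data$.pipe(
  retryWhen(errors =>
    errors.pipe(
      scan((retryCount, error) => {
        if (retryCount >= 3) {
          throw error;  // Give up after 3 retries
        }
        return retryCount + 1;
      }, 0),
      // retryCount is 1 on the first retry, so subtract 1 for the exponent
      delayWhen(retryCount => 
        timer(Math.pow(2, retryCount - 1) * 1000)  // 1s, 2s, 4s
      )
    )
  )
);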

// Modern retry with config (RxJS 7+); retry is already imported above
data$.pipe(
  retry({
    count: 3,
    delay: (error, retryCount) => {
      console.log(`Retry ${retryCount} after error:`, error);
      return timer(retryCount * 1000);
    },
    resetOnSuccess: true
  })
);
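
One way to check which delays a backoff function actually produces is to run it under virtual time. The sketch below assumes RxJS 7 (for the `retry` config object and the factory form of `throwError`) and `TestScheduler` from `rxjs/testing`; `failing$` is a hypothetical request that always fails and records when each attempt starts.

```typescript
import { defer, throwError, timer } from 'rxjs';
import { retry } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';

// The assertion callback is unused because results are collected manually
const scheduler = new TestScheduler(() => {});
const attemptTimes: number[] = []; // virtual ms at which each attempt started
let finalError = '';

scheduler.run(() => {
  // Hypothetical request that always fails
  const failing$ = defer(() => {
    attemptTimes.push(scheduler.now());
    return throwError(() => new Error('boom'));
  });

  failing$
    .pipe(
      retry({
        count: 3,
        // retryCount starts at 1, so subtract 1 to get delays of 1s, 2s, 4s
        delay: (_error, retryCount) =>
          timer(Math.pow(2, retryCount - 1) * 1000),
      })
    )
    .subscribe({
      error: (err: Error) => {
        finalError = err.message;
      },
    });
});

console.log(attemptTimes); // [0, 1000, 3000, 7000]
console.log(finalError);   // 'boom'
```

The initial attempt plus three retries gives four attempts in total; after the last one fails, the original error reaches the subscriber.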

Multicasting

Share a single subscription among multiple subscribers:

import { share, shareReplay } from 'rxjs';

// Without sharing (each subscriber = new HTTP request)
const data$ = this.http.get('/api/data');
data$.subscribe();  // HTTP request 1
data$.subscribe();  // HTTP request 2

// With sharing (single HTTP request)
const sharedData$ = this.http.get('/api/data').pipe(
  share()
);
sharedData$.subscribe();  // HTTP request
sharedData$.subscribe();  // Uses same request

// shareReplay - cache and replay for late subscribers
const cachedData$ = this.http.get('/api/data').pipe(
  shareReplay(1)  // Cache last value
);

// First subscriber triggers HTTP
cachedData$.subscribe();

// Later subscriber gets cached value
setTimeout(() => {
  cachedData$.subscribe();  // No new HTTP request
}, 5000);
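
The difference between `share` and `shareReplay` shows up most clearly for subscribers that arrive after the source has completed. The following sketch counts how often a stand-in request is started; `defer` with a counter replaces the real HTTP call, and the `sent`, `pending`, and `cachedValues` names are hypothetical.

```typescript
import { Subject, defer, of } from 'rxjs';
import { share, shareReplay } from 'rxjs/operators';

// Counts how many times each stand-in "request" is started
const sent = { cold: 0, shared: 0, cached: 0 };

// Cold: every subscriber starts its own request
const cold$ = defer(() => {
  sent.cold++;
  return of('payload');
});
cold$.subscribe();
cold$.subscribe();

// share(): subscribers that overlap with a pending request reuse it
const pending = new Subject<string>(); // stands in for an in-flight response
const shared$ = defer(() => {
  sent.shared++;
  return pending;
}).pipe(share());
shared$.subscribe();
shared$.subscribe();
const sentWhilePending = sent.shared; // 1: both subscribers share one request
pending.next('payload');
pending.complete();
shared$.subscribe(); // after completion share() starts over, so a new request begins

// shareReplay(1): the last value stays cached after the source completes
const cachedValues: string[] = [];
const cached$ = defer(() => {
  sent.cached++;
  return of('payload');
}).pipe(shareReplay(1));
cached$.subscribe(value => cachedValues.push(value));
cached$.subscribe(value => cachedValues.push(value)); // served from the cache

console.log(sentWhilePending); // 1
console.log(sent);             // { cold: 2, shared: 2, cached: 1 }
console.log(cachedValues);     // ['payload', 'payload']
```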

Custom Operators

import { OperatorFunction } from 'rxjs';
import { filter, map, tap } from 'rxjs/operators';

// Simple custom operator
function double(): OperatorFunction<number, number> {
  return source$ => source$.pipe(
    map(value => value * 2)
  );
}

// Custom operator with parameters
function filterByProperty<T, K extends keyof T>(
  key: K,
  value: T[K]
): OperatorFunction<T, T> {
  return source$ => source$.pipe(
    filter(item => item[key] === value)
  );
}

// Composed operator
function debugLog<T>(tag: string): OperatorFunction<T, T> {
  return source$ => source$.pipe(
    tap({
      next: value => console.log(`[${tag}] Next:`, value),
      error: err => console.error(`[${tag}] Error:`, err),
      complete: () => console.log(`[${tag}] Complete`)
    })
  );
}

// Usage
numbers$.pipe(
  double(),
  debugLog('doubled')
);

users$.pipe(
  filterByProperty('role', 'admin')
);

Real-World Patterns

Polling with Pause

@Injectable({ providedIn: 'root' })
export class PollingService {
  private pause$ = new BehaviorSubject<boolean>(false);
  
  poll<T>(
    request: () => Observable<T>,
    intervalMs: number
  ): Observable<T> {
    return this.pause$.pipe(
      switchMap(paused => 
        paused ? EMPTY : timer(0, intervalMs)
      ),
      switchMap(() => request()),
      retry({ delay: 5000 })  // Retry on error after 5s
    );
  }
  
  pause() {
    this.pause$.next(true);
  }
  
  resume() {
    this.pause$.next(false);
  }
}

Typeahead Search

@Component({
  template: `
    <input [formControl]="searchControl" />
    <ul>
      @for (result of results$ | async; track result.id) {
        <li>{{ result.name }}</li>
      }
    </ul>
  `
})
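export class TypeaheadComponent {
  // SearchService is assumed to expose search(term) returning an Observable of results
  private searchService = inject(SearchService);

  // nonNullable keeps valueChanges typed as string instead of string | null
  searchControl = new FormControl('', { nonNullable: true });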
  
  results$ = this.searchControl.valueChanges.pipe(
    debounceTime(300),
    distinctUntilChanged(),
    filter(term => term.length >= 2),
    switchMap(term => 
      this.searchService.search(term).pipe(
        catchError(() => of([]))
      )
    )
  );
}

Race Condition Prevention

// Bad: Race condition possible
userId$.pipe(
  mergeMap(id => this.userService.getUser(id))
).subscribe(user => this.user = user);
// If the ID changes quickly (A→B), response A might arrive after B and overwrite the newer user

// Good: Cancel previous with switchMap
userId$.pipe(
  switchMap(id => this.userService.getUser(id))
).subscribe(user => this.user = user);
// The request for ID A is cancelled when B arrives

Practice Exercise

Exercise: Build a Live Dashboard

Create a dashboard that:
  1. Polls multiple APIs every 30 seconds
  2. Combines data from all sources
  3. Pauses polling when tab is hidden
  4. Shows connection status
  5. Has retry with exponential backoff

@Injectable({ providedIn: 'root' })
export class DashboardService {
  private http = inject(HttpClient);
  
  private isVisible$ = fromEvent(document, 'visibilitychange').pipe(
    map(() => document.visibilityState === 'visible'),
    startWith(true)
  );
  
  private connectionStatus = new BehaviorSubject<'connected' | 'disconnected' | 'error'>('connected');
  readonly connectionStatus$ = this.connectionStatus.asObservable();
  
  getDashboardData(): Observable<DashboardData> {
    return this.isVisible$.pipe(
      switchMap(visible => 
        visible 
          ? timer(0, 30000)  // Poll every 30s
          : EMPTY            // Stop when hidden
      ),
      switchMap(() =>
        this.fetchAllData().pipe(
          tap(() => this.connectionStatus.next('connected')),
          // Retry inside the inner pipeline so a failed request does not end polling
          retry({
            count: 3,
            delay: (error, retryCount) => {
              this.connectionStatus.next('error');
              const delay = Math.pow(2, retryCount - 1) * 1000;  // 1s, 2s, 4s
              console.log(`Retrying in ${delay}ms...`);
              return timer(delay);
            }
          }),
          // Retries exhausted: report it and wait for the next poll tick
          catchError(() => {
            this.connectionStatus.next('disconnected');
            return EMPTY;
          })
        )
      )
    );
  }
  
  private fetchAllData(): Observable<DashboardData> {
    return forkJoin({
      users: this.http.get<User[]>('/api/users'),
      orders: this.http.get<Order[]>('/api/orders'),
      revenue: this.http.get<Revenue>('/api/revenue'),
      alerts: this.http.get<Alert[]>('/api/alerts')
    }).pipe(
      map(data => this.transformData(data))
    );
  }
  
  private transformData(data: any): DashboardData {
    return {
      ...data,
      totalUsers: data.users.length,
      totalOrders: data.orders.length,
      criticalAlerts: data.alerts.filter(a => a.level === 'critical')
    };
  }
}

@Component({
  template: `
    <div class="status-bar" [class]="connectionStatus$ | async">
      {{ connectionStatus$ | async }}
    </div>
    
    @if (dashboard$ | async; as data) {
      <div class="stats">
        <div class="stat">Users: {{ data.totalUsers }}</div>
        <div class="stat">Orders: {{ data.totalOrders }}</div>
        <div class="stat">Revenue: {{ data.revenue.total | currency }}</div>
      </div>
      
      @if (data.criticalAlerts.length > 0) {
        <div class="alerts">
          @for (alert of data.criticalAlerts; track alert.id) {
            <div class="alert critical">{{ alert.message }}</div>
          }
        </div>
      }
    } @else {
      <div class="loading">Loading dashboard...</div>
    }
  `
})
export class DashboardComponent {
  private dashboardService = inject(DashboardService);
  
  dashboard$ = this.dashboardService.getDashboardData();
  connectionStatus$ = this.dashboardService.connectionStatus$;
}

Summary

  1. Creation: Use of, from, fromEvent, interval for creating observables
  2. Transformation: map, switchMap, mergeMap, concatMap for data transformation
  3. Filtering: filter, debounceTime, distinctUntilChanged for stream control
  4. Combination: combineLatest, forkJoin, merge for combining streams
  5. Error Handling: catchError, retry for robust error recovery

Next Steps

Next: Change Detection & Performance

Optimize Angular performance with change detection strategies