Module Overview
Estimated Time: 4-5 hours | Difficulty: Advanced | Prerequisites: Module 8
- Observable creation and subscription
- Transformation, filtering, and combination operators
- Subjects and multicasting
- Error handling and recovery
- Testing observables
- Performance optimization
Observable Fundamentals
Copy
┌─────────────────────────────────────────────────────────────────────────┐
│ Observable Stream │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Observable: A lazy push collection of multiple values over time │
│ │
│ ──────(1)────(2)────(3)────(4)────|──────────────────────────▶ │
│ │ │ │ │ │ │
│ emit emit emit emit complete │
│ │
│ Observer: Consumes values delivered by Observable │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ { │ │
│ │ next: (value) => { ... }, // Handle each value │ │
│ │ error: (err) => { ... }, // Handle errors │ │
│ │ complete: () => { ... } // Handle completion │ │
│ │ } │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
│ Subscription: Represents the execution of an Observable │
│ • Call unsubscribe() to cancel and free resources │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Creating Observables
Copy
import {
Observable, of, from, fromEvent, interval, timer,
range, defer, generate, EMPTY, NEVER, throwError
} from 'rxjs';
// Create from scratch
// Emits 1, 2, 3 synchronously on subscribe, then 4 after one second, then completes.
const custom$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  // Async values: keep the timer handle so the teardown below can cancel it.
  const timeoutId = setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
  // Cleanup function: cancel the pending timer so an early unsubscribe
  // does not leave it running in the background.
  return () => {
    clearTimeout(timeoutId);
    console.log('Cleanup on unsubscribe');
  };
});
// From values
const values$ = of(1, 2, 3); // Emits 1, 2, 3 then completes
// From array, promise, iterable
const array$ = from([1, 2, 3]);
// NOTE: fetch() is invoked as soon as this statement runs, not at subscribe time;
// from() only wraps the promise that fetch() has already returned.
const promise$ = from(fetch('/api/data').then(r => r.json()));
const iterable$ = from('hello'); // Emits 'h', 'e', 'l', 'l', 'o'
// From DOM events
const clicks$ = fromEvent(document, 'click');
// NOTE(review): inputElement is not declared in this snippet — presumably a
// reference to an <input> element obtained elsewhere; confirm.
const input$ = fromEvent<KeyboardEvent>(inputElement, 'keyup');
// Time-based
const every1s$ = interval(1000); // Emits 0, 1, 2... every second
const after2s$ = timer(2000); // Emits 0 after 2 seconds
const after2sThen1s$ = timer(2000, 1000); // Emit after 2s, then every 1s
// Lazy creation (new Observable per subscriber)
// The factory runs once per subscription, so Date.now() is read at subscribe time.
const lazy$ = defer(() => {
console.log('Created at subscribe time');
return of(Date.now());
});
// Error observable
// The factory builds a fresh Error instance instead of sharing one object.
const error$ = throwError(() => new Error('Something failed'));
// Special observables
const empty$ = EMPTY; // Completes immediately
const never$ = NEVER; // Never emits or completes
Transformation Operators
Copy
import {
map, pluck, mapTo, scan, reduce, buffer, bufferTime,
concatMap, mergeMap, switchMap, exhaustMap
} from 'rxjs/operators';
// NOTE(review): source$, clicks$, searchTerm$, userId$, logEvents$, submitButton$,
// this.http and data are not defined in this snippet — presumably they come from
// the surrounding component/service; confirm before copying.
// map - transform each value
source$.pipe(
map(x => x * 2)
);
// scan - accumulate values (like reduce, but emits each step)
// The click event itself is ignored; only the running count is carried forward.
clicks$.pipe(
scan((count) => count + 1, 0)
); // Emits: 1, 2, 3, 4...
// reduce - accumulate all values, emit once at complete
source$.pipe(
reduce((acc, val) => acc + val, 0)
);
// bufferTime - collect values into arrays over a fixed time window
clicks$.pipe(
bufferTime(1000) // Emit array of clicks every second
);
// Higher-order mapping (Observable of Observables)
searchTerm$.pipe(
// switchMap: Cancel previous, subscribe to new
switchMap(term => this.http.get(`/api/search?q=${term}`))
);
userId$.pipe(
// concatMap: Queue requests, process sequentially
concatMap(id => this.http.get(`/api/users/${id}`))
);
logEvents$.pipe(
// mergeMap: All requests in parallel
mergeMap(event => this.http.post('/api/logs', event))
);
submitButton$.pipe(
// exhaustMap: Ignore new while processing
exhaustMap(() => this.http.post('/api/submit', data))
);
Flattening Operators Comparison
Copy
┌─────────────────────────────────────────────────────────────────────────┐
│ Higher-Order Mapping Operators │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Source: ──A──────B──────C─────────────| │
│ │
│ switchMap (cancel previous): │
│ ──A──────────B──────────C──────result| │
│ └──(cancelled) │
│ └──(cancelled) │
│ └──────────────| │
│ │
│ concatMap (sequential queue): │
│ ──A──────result──B──result──C──result| │
│ └──────| └──| └──| │
│ │
│ mergeMap (parallel): │
│ ──A────result │
│ └──| B──result │
│ └| C──result │
│ └| │
│ │
│ exhaustMap (ignore during): │
│ ──A──────result─────────C──result| │
│ └──────| (B ignored) └──| │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Filtering Operators
Copy
import {
filter, take, takeLast, takeUntil, takeWhile,
skip, skipUntil, skipWhile, first, last,
distinct, distinctUntilChanged, distinctUntilKeyChanged,
debounceTime, throttleTime, auditTime, sampleTime
} from 'rxjs/operators';
// NOTE(review): numbers$, source$, data$, users$, searchInput$ and scroll$ are
// placeholders that are not defined in this snippet.
// filter - emit values that pass predicate
numbers$.pipe(
filter(n => n % 2 === 0) // Only even numbers
);
// take/skip - limit the stream by position
source$.pipe(take(5)); // First 5 values
source$.pipe(skip(2)); // Skip first 2
source$.pipe(takeLast(3)); // Last 3 values
source$.pipe(first()); // First value, then complete
source$.pipe(last()); // Wait for complete, emit last
// takeUntil - complete when another observable emits
// NOTE(review): this.destroy$ is presumably a Subject that emits when the
// component is destroyed — confirm against the owning class.
data$.pipe(
takeUntil(this.destroy$) // Stop when component destroys
);
// Distinct values
source$.pipe(
distinctUntilChanged() // Only emit when value changes
);
users$.pipe(
distinctUntilKeyChanged('id') // Only when user.id changes
);
// Time-based filtering
searchInput$.pipe(
debounceTime(300) // Wait 300ms of silence before emitting
);
scroll$.pipe(
throttleTime(100) // Emit at most every 100ms
);
Debounce vs Throttle
Copy
┌─────────────────────────────────────────────────────────────────────────┐
│ Debounce vs Throttle │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Input: ──a─b─c─────d─e─f─g─────h───| │
│ │
│ debounceTime(300ms): │
│ ────────c─────────────g─────h| │
│ (wait for pause) │
│ │
│ throttleTime(300ms): │
│ ──a───────d───────g───────h─| │
│ (emit first, then wait) │
│ │
│ Use Cases: │
│ • debounce: Search input, resize events │
│ • throttle: Scroll events, mouse move │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Combination Operators
Copy
import {
merge, concat, combineLatest, forkJoin, zip,
withLatestFrom, startWith, pairwise, race
} from 'rxjs';
// NOTE(review): map, switchMap, of and fromEvent are used below but are not in
// the import list directly above this snippet — confirm they are imported.
// merge - combine multiple streams, emit all
const allClicks$ = merge(
fromEvent(button1, 'click'),
fromEvent(button2, 'click'),
fromEvent(button3, 'click')
);
// concat - subscribe sequentially
const sequential$ = concat(
of(1, 2),
of(3, 4),
of(5, 6)
); // Emits: 1, 2, 3, 4, 5, 6
// combineLatest - emit when any emits, with latest from all
// The map below reshapes the positional tuple into an object with named fields.
const combined$ = combineLatest([
this.user$,
this.settings$,
this.permissions$
]).pipe(
map(([user, settings, permissions]) => ({
user,
settings,
permissions
}))
);
// forkJoin - wait for all to complete, emit last values
// Passing an object yields a result object with the same keys.
const allData$ = forkJoin({
users: this.http.get<User[]>('/api/users'),
products: this.http.get<Product[]>('/api/products'),
orders: this.http.get<Order[]>('/api/orders')
});
// zip - pair values by index
const paired$ = zip(
letters$, // a, b, c
numbers$ // 1, 2, 3
); // [a,1], [b,2], [c,3]
// withLatestFrom - combine with latest from another
// The save event itself is discarded; only the form value is passed on.
save$.pipe(
withLatestFrom(this.form.valueChanges),
map(([_, formValue]) => formValue)
);
// startWith - emit initial value first
search$.pipe(
startWith(''), // Start with empty search
switchMap(term => this.search(term))
);
// pairwise - emit previous and current
// Computes the movement delta between two consecutive positions.
position$.pipe(
pairwise(),
map(([prev, curr]) => ({
dx: curr.x - prev.x,
dy: curr.y - prev.y
}))
);
Subjects
A Subject is both an Observable and an Observer:
Copy
import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from 'rxjs';
// Subject - no initial value, late subscribers miss past emissions
const subject = new Subject<string>();
subject.next('Hello'); // No subscribers yet, missed
subject.subscribe(v => console.log('A:', v));
subject.next('World'); // A: World
subject.subscribe(v => console.log('B:', v));
subject.next('!'); // A: !, B: !
// BehaviorSubject - has initial value, new subscribers get latest
const behavior = new BehaviorSubject<number>(0);
console.log(behavior.value); // 0
behavior.subscribe(v => console.log('A:', v)); // A: 0
behavior.next(1); // A: 1
behavior.subscribe(v => console.log('B:', v)); // B: 1
behavior.next(2); // A: 2, B: 2
// ReplaySubject - replay N values to new subscribers
const replay = new ReplaySubject<number>(3); // Buffer last 3
replay.next(1);
replay.next(2);
replay.next(3);
replay.next(4);
// Four values were pushed into a buffer of three, so the oldest (1) is not replayed.
replay.subscribe(v => console.log(v)); // 2, 3, 4
// AsyncSubject - only emit last value on complete
const async = new AsyncSubject<number>();
// Subscribing before the next() calls logs nothing until complete() is called.
async.subscribe(v => console.log(v));
async.next(1);
async.next(2);
async.next(3);
async.complete(); // Logs: 3
State Management with BehaviorSubject
Copy
/**
 * Root-level state container backed by a single BehaviorSubject.
 *
 * The subject itself stays private; consumers read through `state$`, the
 * derived selector streams, or the synchronous `snapshot` getter, and write
 * through the action methods, each of which publishes a new state object.
 */
@Injectable({ providedIn: 'root' })
export class StateService {
  private state = new BehaviorSubject<AppState>({
    user: null,
    theme: 'light',
    notifications: []
  });

  /** Read-only view of the whole state. */
  readonly state$ = this.state.asObservable();

  /** Emits the current user, only when the reference changes. */
  readonly user$ = this.state$.pipe(
    map(({ user }) => user),
    distinctUntilChanged()
  );

  /** Emits the current theme, only when it changes. */
  readonly theme$ = this.state$.pipe(
    map(({ theme }) => theme),
    distinctUntilChanged()
  );

  /** Current state, read synchronously. */
  get snapshot() {
    return this.state.getValue();
  }

  /** Replaces the current user (or clears it with `null`). */
  setUser(user: User | null) {
    this.patch({ user });
  }

  /** Flips the theme between 'light' and 'dark'. */
  toggleTheme() {
    const nextTheme = this.state.getValue().theme === 'light' ? 'dark' : 'light';
    this.patch({ theme: nextTheme });
  }

  /** Appends a notification to a new copy of the notifications array. */
  addNotification(notification: Notification) {
    const existing = this.state.getValue().notifications;
    this.patch({ notifications: [...existing, notification] });
  }

  /** Publishes a new state object: the current state with `changes` applied on top. */
  private patch(changes: Partial<AppState>) {
    this.state.next({
      ...this.state.getValue(),
      ...changes
    });
  }
}
Error Handling
Copy
import {
catchError, retry, retryWhen, throwError,
EMPTY, of
} from 'rxjs';
// Catch and replace with fallback
data$.pipe(
catchError(error => {
console.error('Error:', error);
return of([]); // Return empty array as fallback
})
);
// Catch and rethrow
// Logs the original error, then surfaces a new Error in its place
// (the original error object is not passed downstream).
data$.pipe(
catchError(error => {
this.logger.error(error);
return throwError(() => new Error('Custom error message'));
})
);
// Catch and complete (swallow error)
// Nothing is logged here: the stream simply completes in place of the error.
data$.pipe(
catchError(() => EMPTY)
);
// Retry immediately
data$.pipe(
retry(3) // Retry up to 3 times
);
// Retry with delay (exponential backoff)
// NOTE(review): scan, delayWhen and timer are used here but are not in the
// import list at the top of this section — confirm they are imported.
data$.pipe(
  retryWhen(errors =>
    errors.pipe(
      // Count failures: emits 1, 2, 3 for the first three errors and
      // rethrows the fourth so the stream finally fails.
      scan((retryCount, error) => {
        if (retryCount >= 3) {
          throw error;
        }
        return retryCount + 1;
      }, 0),
      // retryCount is 1-based here, so subtract 1 to start the backoff at 1s.
      delayWhen(retryCount =>
        timer(Math.pow(2, retryCount - 1) * 1000) // 1s, 2s, 4s
      )
    )
  )
);
// Modern retry with config
// NOTE(review): retry is already named in the import list at the top of this
// section; if both imports live in one file, drop one of them.
import { retry } from 'rxjs';
data$.pipe(
retry({
count: 3,
// Linear backoff: wait retryCount * 1000 ms before the next attempt.
delay: (error, retryCount) => {
console.log(`Retry ${retryCount} after error:`, error);
return timer(retryCount * 1000);
},
resetOnSuccess: true
})
);
Multicasting
Share a single subscription among multiple subscribers:
Copy
import { share, shareReplay, publish, refCount } from 'rxjs';
// NOTE(review): this.http is presumably an Angular HttpClient on the enclosing
// class, which is not shown in this snippet.
// Without sharing (each subscriber = new HTTP request)
const data$ = this.http.get('/api/data');
data$.subscribe(); // HTTP request 1
data$.subscribe(); // HTTP request 2
// With sharing (single HTTP request)
const sharedData$ = this.http.get('/api/data').pipe(
share()
);
sharedData$.subscribe(); // HTTP request
sharedData$.subscribe(); // Uses same request
// shareReplay - cache and replay for late subscribers
const cachedData$ = this.http.get('/api/data').pipe(
shareReplay(1) // Cache last value
);
// First subscriber triggers HTTP
cachedData$.subscribe();
// Later subscriber gets cached value
// (subscribes 5 seconds after the first subscriber)
setTimeout(() => {
cachedData$.subscribe(); // No new HTTP request
}, 5000);
Custom Operators
Copy
import { OperatorFunction, pipe } from 'rxjs';
import { filter, map, tap } from 'rxjs/operators';
// Simple custom operator
/**
 * Operator that multiplies every emitted number by two.
 *
 * @returns an operator function mapping `Observable<number>` to `Observable<number>`
 */
function double(): OperatorFunction<number, number> {
  // A custom operator can simply return an existing operator.
  return map((n: number) => n * 2);
}
// Custom operator with parameters
/**
 * Operator that keeps only the items whose `key` property strictly equals `value`.
 *
 * @param key   property name to inspect on each item
 * @param value value the property must equal (compared with `===`)
 * @returns an operator function that passes matching items through unchanged
 */
function filterByProperty<T, K extends keyof T>(
  key: K,
  value: T[K]
): OperatorFunction<T, T> {
  const matches = (candidate: T): boolean => candidate[key] === value;
  return filter(matches);
}
// Composed operator
/**
 * Operator that logs every notification (next, error, complete) to the console,
 * prefixed with `[tag]`, without altering the stream.
 *
 * @param tag label placed in square brackets at the start of each log line
 * @returns an operator function that passes all values through unchanged
 */
function debugLog<T>(tag: string): OperatorFunction<T, T> {
  const logger = {
    next: (value: T) => console.log(`[${tag}] Next:`, value),
    error: (err: unknown) => console.error(`[${tag}] Error:`, err),
    complete: () => console.log(`[${tag}] Complete`)
  };
  return tap<T>(logger);
}
// Usage
// NOTE(review): numbers$ and users$ are not defined in this snippet.
// Doubles each number, then logs every notification under the "[doubled]" tag.
numbers$.pipe(
double(),
debugLog('doubled')
);
// Keeps only the users whose role property is strictly equal to 'admin'.
users$.pipe(
filterByProperty('role', 'admin')
);
Real-World Patterns
Polling with Pause
Copy
/**
 * Polls a request on a fixed interval, with a service-wide pause switch.
 *
 * The pause flag is one BehaviorSubject on the service instance, so
 * `pause()` and `resume()` affect every stream returned by `poll()`.
 */
@Injectable({ providedIn: 'root' })
export class PollingService {
  private pause$ = new BehaviorSubject<boolean>(false);

  /**
   * Builds a polling stream.
   *
   * @param request    factory called on every tick to create the request observable
   * @param intervalMs milliseconds between ticks; the first tick fires immediately
   * @returns the values emitted by each request; on error the whole pipeline
   *          is re-subscribed after 5 seconds
   */
  poll<T>(
    request: () => Observable<T>,
    intervalMs: number
  ): Observable<T> {
    // While paused there are no ticks at all; resuming starts a fresh timer.
    const ticks$ = this.pause$.pipe(
      switchMap(isPaused => {
        if (isPaused) {
          return EMPTY;
        }
        return timer(0, intervalMs);
      })
    );

    return ticks$.pipe(
      // A new tick replaces any request that is still in flight.
      switchMap(() => request()),
      retry({ delay: 5000 }) // Retry on error after 5s
    );
  }

  /** Stops ticks until `resume()` is called. */
  pause() {
    this.pause$.next(true);
  }

  /** Restarts ticks. */
  resume() {
    this.pause$.next(false);
  }
}
Typeahead Search
Copy
/**
 * Typeahead input: debounces keystrokes, drops unchanged and too-short terms,
 * and shows the results of the latest search only.
 */
@Component({
template: `
<input [formControl]="searchControl" />
<ul>
@for (result of results$ | async; track result.id) {
<li>{{ result.name }}</li>
}
</ul>
`
})
export class TypeaheadComponent {
  searchControl = new FormControl('');

  // NOTE(review): this.searchService is used below but is not declared in this
  // snippet — it must be injected (for example with inject()) before this compiles.
  results$ = this.searchControl.valueChanges.pipe(
    debounceTime(300),
    distinctUntilChanged(),
    // valueChanges is typed string | null (the control emits null after reset()),
    // so rule out null before reading .length; the type guard narrows to string.
    filter((term): term is string => term !== null && term.length >= 2),
    switchMap(term =>
      this.searchService.search(term).pipe(
        // Recover inside the inner stream so one failed search
        // does not terminate the outer valueChanges pipeline.
        catchError(() => of([]))
      )
    )
  );
}
Race Condition Prevention
Copy
// Bad: Race condition possible
userId$.pipe(
mergeMap(id => this.userService.getUser(id))
).subscribe(user => this.user = user);
// If the ID changes quickly (A→B), response B might arrive before response A;
// A's late response then overwrites this.user with data for the stale ID.
// Good: Cancel previous with switchMap
userId$.pipe(
switchMap(id => this.userService.getUser(id))
).subscribe(user => this.user = user);
// The request for ID A is cancelled when B arrives, so only B's response is assigned.
Practice Exercise
Exercise: Build a Live Dashboard
Create a dashboard that:
- Polls multiple APIs every 30 seconds
- Combines data from all sources
- Pauses polling when tab is hidden
- Shows connection status
- Has retry with exponential backoff
Solution
Solution
Copy
/** Raw payload produced by the four parallel dashboard requests. */
type DashboardSources = {
  users: User[];
  orders: Order[];
  revenue: Revenue;
  alerts: Alert[];
};

/**
 * Polls the dashboard APIs every 30 seconds while the tab is visible and
 * reports the connection status.
 *
 * Status values:
 * - 'connected'    — the most recent refresh succeeded
 * - 'error'        — a refresh failed and is being retried with backoff
 * - 'disconnected' — all retries for a refresh failed; waiting for the next poll
 */
@Injectable({ providedIn: 'root' })
export class DashboardService {
  private http = inject(HttpClient);

  // startWith(null) + map reads the real visibility state at subscribe time
  // instead of assuming the tab starts out visible.
  private isVisible$ = fromEvent(document, 'visibilitychange').pipe(
    startWith(null),
    map(() => document.visibilityState === 'visible'),
    distinctUntilChanged()
  );

  private connectionStatus = new BehaviorSubject<'connected' | 'disconnected' | 'error'>('connected');
  readonly connectionStatus$ = this.connectionStatus.asObservable();

  /**
   * Stream of dashboard snapshots.
   *
   * Each poll tick starts one refresh. A failing refresh is retried up to
   * 3 times with exponential backoff (2s, 4s, 8s). If it still fails, the
   * error is logged and the tick is dropped, so the outer stream never
   * errors and the next tick tries again.
   */
  getDashboardData(): Observable<DashboardData> {
    return this.isVisible$.pipe(
      switchMap(visible =>
        visible
          ? timer(0, 30000) // Poll every 30s
          : EMPTY // Stop when hidden
      ),
      switchMap(() =>
        this.fetchAllData().pipe(
          // Report 'connected' only after a refresh has actually succeeded.
          tap(() => this.connectionStatus.next('connected')),
          retry({
            count: 3,
            delay: (error, retryCount) => {
              this.connectionStatus.next('error');
              const delay = Math.pow(2, retryCount) * 1000;
              console.log(`Retrying in ${delay}ms...`);
              return timer(delay);
            }
          }),
          // Retries exhausted: recover inside the inner stream so polling continues.
          catchError(error => {
            console.error('Dashboard refresh failed:', error);
            this.connectionStatus.next('disconnected');
            return EMPTY;
          })
        )
      )
    );
  }

  /** Runs the four API requests in parallel and merges them into one snapshot. */
  private fetchAllData(): Observable<DashboardData> {
    return forkJoin({
      users: this.http.get<User[]>('/api/users'),
      orders: this.http.get<Order[]>('/api/orders'),
      revenue: this.http.get<Revenue>('/api/revenue'),
      alerts: this.http.get<Alert[]>('/api/alerts')
    }).pipe(
      map(data => this.transformData(data))
    );
  }

  /** Adds the derived totals and the list of critical alerts to the raw payload. */
  private transformData(data: DashboardSources): DashboardData {
    return {
      ...data,
      totalUsers: data.users.length,
      totalOrders: data.orders.length,
      criticalAlerts: data.alerts.filter(alert => alert.level === 'critical')
    };
  }
}
/**
 * Renders the connection status bar and the dashboard statistics.
 *
 * Both streams are consumed in the template through the async pipe; the
 * template shows a loading placeholder while dashboard$ has no truthy value.
 */
@Component({
template: `
<div class="status-bar" [class]="connectionStatus$ | async">
{{ connectionStatus$ | async }}
</div>
@if (dashboard$ | async; as data) {
<div class="stats">
<div class="stat">Users: {{ data.totalUsers }}</div>
<div class="stat">Orders: {{ data.totalOrders }}</div>
<div class="stat">Revenue: {{ data.revenue.total | currency }}</div>
</div>
@if (data.criticalAlerts.length > 0) {
<div class="alerts">
@for (alert of data.criticalAlerts; track alert.id) {
<div class="alert critical">{{ alert.message }}</div>
}
</div>
}
} @else {
<div class="loading">Loading dashboard...</div>
}
`
})
export class DashboardComponent {
// Must be declared before the fields below, which read it in their initializers.
private dashboardService = inject(DashboardService);
// Dashboard snapshots from DashboardService.getDashboardData().
dashboard$ = this.dashboardService.getDashboardData();
// Connection status stream exposed by the service; bound twice in the template
// (as a CSS class and as text).
connectionStatus$ = this.dashboardService.connectionStatus$;
}
Summary
1
Creation
Use of, from, fromEvent, interval for creating observables
2
Transformation
map, switchMap, mergeMap, concatMap for data transformation
3
Filtering
filter, debounceTime, distinctUntilChanged for stream control
4
Combination
combineLatest, forkJoin, merge for combining streams
5
Error Handling
catchError, retry for robust error recovery
Next Steps
Next: Change Detection & Performance
Optimize Angular performance with change detection strategies