Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/ReactiveX/rxjs/llms.txt

Use this file to discover all available pages before exploring further.

Overview

Branches out the source Observable values as nested Observables (“windows”) based on opening and closing signals. When openings emits, a new window starts. For each opening, closingSelector is called to get an Observable that determines when that specific window closes.
windowToggle provides fine-grained control over window lifecycles, allowing multiple concurrent windows with independent durations.

Type Signature

function windowToggle<T, O>(
  openings: ObservableInput<O>,
  closingSelector: (openValue: O) => ObservableInput<any>
): OperatorFunction<T, Observable<T>>

Parameters

openings
ObservableInput<O>
required
An Observable (or Promise, Array, etc.) that signals when to start new windows. Each emission triggers the creation of a new window.
closingSelector
(openValue: O) => ObservableInput<any>
required
A function that receives the value emitted by openings and returns an Observable (or other ObservableInput). When this returned Observable emits its first value, the associated window completes.

Returns

return
OperatorFunction<T, Observable<T>>
A function that returns an Observable of window Observables. Each window collects values between its opening and closing signals.

Usage Examples

Basic Example: Toggle Windows with Intervals

import { fromEvent, interval, windowToggle, EMPTY, mergeAll } from 'rxjs';

const clicks = fromEvent(document, 'click');
const openings = interval(1000);

// Every other second, capture clicks from the next 500ms
const result = clicks.pipe(
  windowToggle(
    openings,
    i => i % 2 ? interval(500) : EMPTY
  ),
  mergeAll()
);

result.subscribe(x => console.log(x));
// At 1s (i=0): EMPTY closes immediately
// At 2s (i=1): window open for 500ms, captures clicks
// At 3s (i=2): EMPTY closes immediately  
// At 4s (i=3): window open for 500ms, captures clicks

Dynamic Window Duration

import { fromEvent, windowToggle, timer, mergeMap, toArray } from 'rxjs';

const mouseMoves = fromEvent<MouseEvent>(document, 'mousemove');
const clicks = fromEvent<MouseEvent>(document, 'click');

let clickCount = 0;

mouseMoves.pipe(
  windowToggle(
    clicks,
    () => {
      clickCount++;
      // First click: 1s window, second: 2s, third: 3s, etc.
      const duration = clickCount * 1000;
      console.log(`Window ${clickCount} will last ${duration}ms`);
      return timer(duration);
    }
  ),
  mergeMap((window$, i) => 
    window$.pipe(
      toArray(),
      map(moves => ({
        window: i,
        moveCount: moves.length,
        duration: clickCount * 1000
      }))
    )
  )
).subscribe(result => {
  console.log('Window result:', result);
});

Multiple Concurrent Windows

import { interval, windowToggle, timer, mergeMap, map } from 'rxjs';

const source = interval(100).pipe(
  map(i => ({ value: i, timestamp: Date.now() }))
);

// Start a new window every 500ms, each lasts 1 second
const openings = interval(500);

source.pipe(
  windowToggle(
    openings,
    (openIndex) => {
      console.log(`Window ${openIndex} opened`);
      return timer(1000);
    }
  ),
  mergeMap((window$, windowId) => 
    window$.pipe(
      toArray(),
      map(items => ({
        windowId,
        itemCount: items.length,
        values: items.map(i => i.value)
      }))
    )
  ),
  take(5)
).subscribe(result => {
  console.log('Window closed:', result);
});

// At ~0.5s: window 0 opens
// At ~1.0s: window 1 opens (window 0 still active)
// At ~1.5s: window 0 closes (1000ms elapsed), window 2 opens
// Multiple windows active simultaneously!

Marble Diagram

Source:   --a--b--c--d--e--f--g--h--i--j--|
Openings: -----O-----------O-----------O--|
Closing:      |--300ms--|  |--300ms--|
Window1:  -----b--c--d|
Window2:                  --g--h--i|
Window3:                            (closes at end)
Result:   ----------W1----------W2----------W3--|

Common Use Cases

  1. User-Controlled Recording: Start/stop recording based on user actions
  2. Conditional Monitoring: Monitor different durations based on conditions
  3. Overlapping Time Windows: Analyze data with multiple active windows
  4. Event Correlation: Collect related events within dynamic windows
  5. Sampling Strategies: Sample data differently based on triggers
  6. A/B Testing Windows: Different window durations for different scenarios
Multiple windows can be active at the same time. All active windows receive all emitted values from the source Observable.

Advanced Example: Performance Profiling

import { fromEvent, windowToggle, mergeMap, reduce, timer } from 'rxjs';

interface PerformanceEvent {
  type: string;
  duration: number;
  timestamp: number;
}

const performanceEvents$ = new Subject<PerformanceEvent>();
const startProfiling$ = new Subject<string>();
const profileDuration = 10000; // 10 seconds

performanceEvents$.pipe(
  windowToggle(
    startProfiling$,
    (profileId) => {
      console.log(`Started profiling: ${profileId}`);
      return timer(profileDuration);
    }
  ),
  mergeMap((window$, index) => {
    const startTime = Date.now();
    
    return window$.pipe(
      reduce((acc, event) => {
        acc.totalEvents++;
        acc.totalDuration += event.duration;
        
        if (!acc.byType[event.type]) {
          acc.byType[event.type] = { count: 0, totalDuration: 0 };
        }
        acc.byType[event.type].count++;
        acc.byType[event.type].totalDuration += event.duration;
        
        acc.maxDuration = Math.max(acc.maxDuration, event.duration);
        
        return acc;
      }, {
        profileIndex: index,
        totalEvents: 0,
        totalDuration: 0,
        maxDuration: 0,
        byType: {} as Record<string, { count: number; totalDuration: number }>
      }),
      map(stats => ({
        ...stats,
        avgDuration: stats.totalDuration / stats.totalEvents,
        profileDuration: Date.now() - startTime,
        byType: Object.entries(stats.byType).map(([type, data]) => ({
          type,
          count: data.count,
          avgDuration: data.totalDuration / data.count
        }))
      }))
    );
  })
).subscribe(report => {
  console.log('Performance report:', report);
  if (report.avgDuration > 100) {
    console.warn('Performance degradation detected!');
  }
});

// Start profiling sessions
startProfiling$.next('session-1');
setTimeout(() => startProfiling$.next('session-2'), 5000);

Traffic Monitoring with Peak Detection

import { interval, windowToggle, timer, mergeMap, scan, last } from 'rxjs';

interface Request {
  endpoint: string;
  duration: number;
  statusCode: number;
}

const requests$ = interval(100).pipe(
  map((): Request => ({
    endpoint: ['/api/users', '/api/posts', '/api/comments'][Math.floor(Math.random() * 3)],
    duration: Math.random() * 500,
    statusCode: Math.random() > 0.9 ? 500 : 200
  }))
);

// Monitor peaks: when error rate is high, open longer monitoring window
const checkInterval$ = interval(5000);
let consecutiveErrors = 0;

requests$.pipe(
  windowToggle(
    checkInterval$,
    () => {
      // If we've seen many errors, monitor for longer
      const duration = consecutiveErrors > 3 ? 10000 : 3000;
      console.log(`Monitoring window: ${duration}ms`);
      return timer(duration);
    }
  ),
  mergeMap(window$ => 
    window$.pipe(
      scan((acc, req) => {
        acc.total++;
        if (req.statusCode >= 500) acc.errors++;
        acc.totalDuration += req.duration;
        return acc;
      }, { total: 0, errors: 0, totalDuration: 0 }),
      last(),
      map(stats => {
        const errorRate = stats.errors / stats.total;
        consecutiveErrors = errorRate > 0.1 ? consecutiveErrors + 1 : 0;
        
        return {
          total: stats.total,
          errors: stats.errors,
          errorRate: (errorRate * 100).toFixed(2) + '%',
          avgDuration: (stats.totalDuration / stats.total).toFixed(2),
          alert: errorRate > 0.1
        };
      })
    )
  )
).subscribe(stats => {
  console.log('Window stats:', stats);
  if (stats.alert) {
    console.error('HIGH ERROR RATE DETECTED!');
  }
});

User Session Analysis

import { merge, windowToggle, mergeMap, groupBy, reduce } from 'rxjs';

const sessionStarts$ = new Subject<string>(); // userId
const sessionEnds$ = new Subject<string>();   // userId
const userActions$ = new Subject<{ userId: string; action: string }>();

const activeSessions = new Map<string, Subject<any>>();

userActions$.pipe(
  windowToggle(
    sessionStarts$,
    userId => {
      console.log(`Session started: ${userId}`);
      activeSessions.set(userId, new Subject());
      
      // Window closes when that specific user's session ends
      return sessionEnds$.pipe(
        filter(endUserId => endUserId === userId),
        take(1),
        tap(() => {
          console.log(`Session ended: ${userId}`);
          activeSessions.delete(userId);
        })
      );
    }
  ),
  mergeMap((window$) => 
    window$.pipe(
      filter(action => activeSessions.has(action.userId)),
      groupBy(action => action.userId),
      mergeMap(userGroup$ => 
        userGroup$.pipe(
          reduce((acc, action) => {
            acc.actions.push(action.action);
            acc.count++;
            return acc;
          }, {
            userId: userGroup$.key,
            actions: [] as string[],
            count: 0
          })
        )
      )
    )
  )
).subscribe(sessionSummary => {
  console.log('Session summary:', sessionSummary);
});

// Usage
sessionStarts$.next('user1');
sessionStarts$.next('user2');
userActions$.next({ userId: 'user1', action: 'view-page' });
userActions$.next({ userId: 'user2', action: 'click-button' });
sessionEnds$.next('user1');

Error Handling

If either the openings Observable or any closing Observable errors, the error is propagated to all active windows and the output Observable.
import { interval, windowToggle, throwError, catchError } from 'rxjs';

const source = interval(100);
const openings = interval(1000);

source.pipe(
  windowToggle(
    openings,
    i => i === 2 ? throwError(() => new Error('Closing error')) : timer(300)
  ),
  mergeMap((window$, i) => 
    window$.pipe(
      toArray(),
      map(values => ({ window: i, values })),
      catchError(error => {
        console.error(`Error in window ${i}:`, error.message);
        return of({ window: i, values: [], error: error.message });
      })
    )
  )
).subscribe(result => console.log(result));