Documentation Index Fetch the complete documentation index at: https://mintlify.com/ReactiveX/rxjs/llms.txt
Use this file to discover all available pages before exploring further.
Overview
Branches out the source Observable values as nested Observables (“windows”) based on opening and closing signals. When openings emits, a new window starts. For each opening, closingSelector is called to get an Observable that determines when that specific window closes.
windowToggle provides fine-grained control over window lifecycles, allowing multiple concurrent windows with independent durations.
Type Signature
function windowToggle < T , O >(
openings : ObservableInput < O >,
closingSelector : ( openValue : O ) => ObservableInput < any >
) : OperatorFunction < T , Observable < T >>
Parameters
openings
ObservableInput<O>
required
An Observable (or Promise, Array, etc.) that signals when to start new windows. Each emission triggers the creation of a new window.
closingSelector
(openValue: O) => ObservableInput<any>
required
A function that receives the value emitted by openings and returns an Observable (or other ObservableInput). When this returned Observable emits its first value, the associated window completes.
Returns
return
OperatorFunction<T, Observable<T>>
A function that returns an Observable of window Observables. Each window collects values between its opening and closing signals.
Usage Examples
Basic Example: Toggle Windows with Intervals
Simple Toggle
Count Window Items
import { fromEvent , interval , windowToggle , EMPTY , mergeAll } from 'rxjs' ;
const clicks = fromEvent ( document , 'click' );
const openings = interval ( 1000 );
// Every other second, capture clicks from the next 500ms
const result = clicks . pipe (
windowToggle (
openings ,
i => i % 2 ? interval ( 500 ) : EMPTY
),
mergeAll ()
);
result . subscribe ( x => console . log ( x ));
// At 1s (i=0): EMPTY closes immediately
// At 2s (i=1): window open for 500ms, captures clicks
// At 3s (i=2): EMPTY closes immediately
// At 4s (i=3): window open for 500ms, captures clicks
Dynamic Window Duration
import { fromEvent , windowToggle , timer , mergeMap , toArray } from 'rxjs' ;
const mouseMoves = fromEvent < MouseEvent >( document , 'mousemove' );
const clicks = fromEvent < MouseEvent >( document , 'click' );
let clickCount = 0 ;
mouseMoves . pipe (
windowToggle (
clicks ,
() => {
clickCount ++ ;
// First click: 1s window, second: 2s, third: 3s, etc.
const duration = clickCount * 1000 ;
console . log ( `Window ${ clickCount } will last ${ duration } ms` );
return timer ( duration );
}
),
mergeMap (( window$ , i ) =>
window$ . pipe (
toArray (),
map ( moves => ({
window: i ,
moveCount: moves . length ,
duration: clickCount * 1000
}))
)
)
). subscribe ( result => {
console . log ( 'Window result:' , result );
});
Multiple Concurrent Windows
import { interval , windowToggle , timer , mergeMap , map } from 'rxjs' ;
const source = interval ( 100 ). pipe (
map ( i => ({ value: i , timestamp: Date . now () }))
);
// Start a new window every 500ms, each lasts 1 second
const openings = interval ( 500 );
source . pipe (
windowToggle (
openings ,
( openIndex ) => {
console . log ( `Window ${ openIndex } opened` );
return timer ( 1000 );
}
),
mergeMap (( window$ , windowId ) =>
window$ . pipe (
toArray (),
map ( items => ({
windowId ,
itemCount: items . length ,
values: items . map ( i => i . value )
}))
)
),
take ( 5 )
). subscribe ( result => {
console . log ( 'Window closed:' , result );
});
// At ~0.5s: window 0 opens
// At ~1.0s: window 1 opens (window 0 still active)
// At ~1.5s: window 0 closes (1000ms elapsed), window 2 opens
// Multiple windows active simultaneously!
Marble Diagram
Source: --a--b--c--d--e--f--g--h--i--j--|
Openings: -----O-----------O-----------O--|
Closing: |--300ms--| |--300ms--|
Window1: -----b--c--d|
Window2: --g--h--i|
Window3: (closes at end)
Result: ----------W1----------W2----------W3--|
Common Use Cases
User-Controlled Recording : Start/stop recording based on user actions
Conditional Monitoring : Monitor different durations based on conditions
Overlapping Time Windows : Analyze data with multiple active windows
Event Correlation : Collect related events within dynamic windows
Sampling Strategies : Sample data differently based on triggers
A/B Testing Windows : Different window durations for different scenarios
Multiple windows can be active at the same time. All active windows receive all emitted values from the source Observable.
import { fromEvent , windowToggle , mergeMap , reduce , timer } from 'rxjs' ;
interface PerformanceEvent {
type : string ;
duration : number ;
timestamp : number ;
}
const performanceEvents$ = new Subject < PerformanceEvent >();
const startProfiling$ = new Subject < string >();
const profileDuration = 10000 ; // 10 seconds
performanceEvents$ . pipe (
windowToggle (
startProfiling$ ,
( profileId ) => {
console . log ( `Started profiling: ${ profileId } ` );
return timer ( profileDuration );
}
),
mergeMap (( window$ , index ) => {
const startTime = Date . now ();
return window$ . pipe (
reduce (( acc , event ) => {
acc . totalEvents ++ ;
acc . totalDuration += event . duration ;
if ( ! acc . byType [ event . type ]) {
acc . byType [ event . type ] = { count: 0 , totalDuration: 0 };
}
acc . byType [ event . type ]. count ++ ;
acc . byType [ event . type ]. totalDuration += event . duration ;
acc . maxDuration = Math . max ( acc . maxDuration , event . duration );
return acc ;
}, {
profileIndex: index ,
totalEvents: 0 ,
totalDuration: 0 ,
maxDuration: 0 ,
byType: {} as Record < string , { count : number ; totalDuration : number }>
}),
map ( stats => ({
... stats ,
avgDuration: stats . totalDuration / stats . totalEvents ,
profileDuration: Date . now () - startTime ,
byType: Object . entries ( stats . byType ). map (([ type , data ]) => ({
type ,
count: data . count ,
avgDuration: data . totalDuration / data . count
}))
}))
);
})
). subscribe ( report => {
console . log ( 'Performance report:' , report );
if ( report . avgDuration > 100 ) {
console . warn ( 'Performance degradation detected!' );
}
});
// Start profiling sessions
startProfiling$ . next ( 'session-1' );
setTimeout (() => startProfiling$ . next ( 'session-2' ), 5000 );
Traffic Monitoring with Peak Detection
import { interval , windowToggle , timer , mergeMap , scan , last } from 'rxjs' ;
interface Request {
endpoint : string ;
duration : number ;
statusCode : number ;
}
const requests$ = interval ( 100 ). pipe (
map (() : Request => ({
endpoint: [ '/api/users' , '/api/posts' , '/api/comments' ][ Math . floor ( Math . random () * 3 )],
duration: Math . random () * 500 ,
statusCode: Math . random () > 0.9 ? 500 : 200
}))
);
// Monitor peaks: when error rate is high, open longer monitoring window
const checkInterval$ = interval ( 5000 );
let consecutiveErrors = 0 ;
requests$ . pipe (
windowToggle (
checkInterval$ ,
() => {
// If we've seen many errors, monitor for longer
const duration = consecutiveErrors > 3 ? 10000 : 3000 ;
console . log ( `Monitoring window: ${ duration } ms` );
return timer ( duration );
}
),
mergeMap ( window$ =>
window$ . pipe (
scan (( acc , req ) => {
acc . total ++ ;
if ( req . statusCode >= 500 ) acc . errors ++ ;
acc . totalDuration += req . duration ;
return acc ;
}, { total: 0 , errors: 0 , totalDuration: 0 }),
last (),
map ( stats => {
const errorRate = stats . errors / stats . total ;
consecutiveErrors = errorRate > 0.1 ? consecutiveErrors + 1 : 0 ;
return {
total: stats . total ,
errors: stats . errors ,
errorRate: ( errorRate * 100 ). toFixed ( 2 ) + '%' ,
avgDuration: ( stats . totalDuration / stats . total ). toFixed ( 2 ),
alert: errorRate > 0.1
};
})
)
)
). subscribe ( stats => {
console . log ( 'Window stats:' , stats );
if ( stats . alert ) {
console . error ( 'HIGH ERROR RATE DETECTED!' );
}
});
User Session Analysis
import { merge , windowToggle , mergeMap , groupBy , reduce } from 'rxjs' ;
const sessionStarts$ = new Subject < string >(); // userId
const sessionEnds$ = new Subject < string >(); // userId
const userActions$ = new Subject <{ userId : string ; action : string }>();
const activeSessions = new Map < string , Subject < any >>();
userActions$ . pipe (
windowToggle (
sessionStarts$ ,
userId => {
console . log ( `Session started: ${ userId } ` );
activeSessions . set ( userId , new Subject ());
// Window closes when that specific user's session ends
return sessionEnds$ . pipe (
filter ( endUserId => endUserId === userId ),
take ( 1 ),
tap (() => {
console . log ( `Session ended: ${ userId } ` );
activeSessions . delete ( userId );
})
);
}
),
mergeMap (( window$ ) =>
window$ . pipe (
filter ( action => activeSessions . has ( action . userId )),
groupBy ( action => action . userId ),
mergeMap ( userGroup$ =>
userGroup$ . pipe (
reduce (( acc , action ) => {
acc . actions . push ( action . action );
acc . count ++ ;
return acc ;
}, {
userId: userGroup$ . key ,
actions: [] as string [],
count: 0
})
)
)
)
)
). subscribe ( sessionSummary => {
console . log ( 'Session summary:' , sessionSummary );
});
// Usage
sessionStarts$ . next ( 'user1' );
sessionStarts$ . next ( 'user2' );
userActions$ . next ({ userId: 'user1' , action: 'view-page' });
userActions$ . next ({ userId: 'user2' , action: 'click-button' });
sessionEnds$ . next ( 'user1' );
Error Handling
If either the openings Observable or any closing Observable errors, the error is propagated to all active windows and the output Observable.
import { interval , windowToggle , throwError , catchError } from 'rxjs' ;
const source = interval ( 100 );
const openings = interval ( 1000 );
source . pipe (
windowToggle (
openings ,
i => i === 2 ? throwError (() => new Error ( 'Closing error' )) : timer ( 300 )
),
mergeMap (( window$ , i ) =>
window$ . pipe (
toArray (),
map ( values => ({ window: i , values })),
catchError ( error => {
console . error ( `Error in window ${ i } :` , error . message );
return of ({ window: i , values: [], error: error . message });
})
)
)
). subscribe ( result => console . log ( result ));