-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathIntermediateStage.ts
66 lines (58 loc) · 1.93 KB
/
IntermediateStage.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import { PipelineEvent } from '../PipelineEvent';
import { Stage } from './Stage';
/**
 * Represents an intermediate operation on each element in the pipeline. Intermediate
 * stages perform some action on each element and then forward the resulting element
 * to the next stage in line until the element reaches the terminal stage.
 *
 * All of the built-in intermediate stages are located inside the `operators` module.
 *
 * @template IN The type of each incoming element in the pipeline.
 * @template OUT The type of each outgoing element forwarded to the downstream stage.
 */
export abstract class IntermediateStage<IN, OUT> extends Stage<IN> {
  /**
   * The downstream stage to forward elements to. Stays `undefined` until
   * `pipeTo` has been called.
   */
  protected _downstream: Stage<OUT> | undefined;

  /**
   * Indicates whether this stage should not be run.
   */
  private _detached = false;

  /**
   * Set the downstream stage to forward elements to. Calling this again
   * replaces the previously stored downstream stage.
   *
   * @param downstream The downstream stage to forward elements to.
   */
  pipeTo(downstream: Stage<OUT>): void {
    this._downstream = downstream;
  }

  /**
   * Whether this stage is currently detached: `true` after `_detach()` has
   * been called, `false` initially and again after `resume()`.
   */
  get isDetached(): boolean {
    return this._detached;
  }

  /**
   * Resume this stage and its downstream stage as well. If a subclass overrides
   * this method, ensure that it also calls `super.resume()` so that the next stages
   * are resumed as well; otherwise the stages downstream will not be resumed.
   *
   * When no downstream stage has been set yet, only this stage's detached
   * flag is cleared.
   */
  override resume(): void {
    // `_downstream` is undefined until `pipeTo` runs; optional chaining keeps
    // this call from throwing and guarantees the flag below is still reset.
    this._downstream?.resume();
    this._detached = false;
  }

  /**
   * Detach this stage from the pipeline, preventing it from running.
   */
  protected _detach(): void {
    this._detached = true;
  }

  /**
   * Forward the given pipeline event to the downstream stage.
   *
   * This base implementation only forwards when the downstream stage is itself
   * an `IntermediateStage`; any other downstream stage (or a missing one) is
   * not notified by this method.
   *
   * @param event The event to forward to the downstream stage.
   */
  protected _cascadeEvent(event: PipelineEvent): void {
    const next = this._downstream;
    if (next instanceof IntermediateStage) {
      next._cascadeEvent(event);
    }
  }
}