feat: Subject correction
This commit is contained in:
parent
4fede0db41
commit
346bbd851a
@ -1,5 +1,6 @@
|
|||||||
import { IObservable } from './Observable';
|
import { IObservable } from './Observable';
|
||||||
import { Observer } from './Observer';
|
import { Observer } from './Observer';
|
||||||
|
import { Subscriber } from './Subscriber';
|
||||||
import { Subscription } from './Subscription';
|
import { Subscription } from './Subscription';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -16,24 +17,47 @@ export interface SubjectConstructor {
|
|||||||
* Implementation
|
* Implementation
|
||||||
*/
|
*/
|
||||||
export class Subject<T> implements IObservable<T>, Observer<T> {
|
export class Subject<T> implements IObservable<T>, Observer<T> {
|
||||||
|
private closed = false;
|
||||||
|
private closedByError = false;
|
||||||
|
private lastError: unknown | undefined;
|
||||||
|
|
||||||
|
private subscribers: Subscriber<T>[] = [];
|
||||||
|
|
||||||
public constructor() {}
|
public constructor() {}
|
||||||
|
|
||||||
|
private isClosed(): boolean {
|
||||||
|
return this.closed || this.closedByError;
|
||||||
|
}
|
||||||
|
|
||||||
public next(value: T): void {
|
public next(value: T): void {
|
||||||
void value;
|
if (this.isClosed()) return;
|
||||||
throw new Error('TODO');
|
this.subscribers.forEach(s => s.next(value));
|
||||||
}
|
}
|
||||||
|
|
||||||
public error(err: unknown): void {
|
public error(err: unknown): void {
|
||||||
void err;
|
if (this.isClosed()) return;
|
||||||
throw new Error('TODO');
|
this.closedByError = true;
|
||||||
|
this.lastError = err;
|
||||||
|
this.subscribers.forEach(s => s.error(err));
|
||||||
}
|
}
|
||||||
|
|
||||||
public complete(): void {
|
public complete(): void {
|
||||||
throw new Error('TODO');
|
if (this.isClosed()) return;
|
||||||
|
this.closed = true;
|
||||||
|
this.subscribers.forEach(s => s.complete());
|
||||||
}
|
}
|
||||||
|
|
||||||
public subscribe(obs: Partial<Observer<T>>): Subscription {
|
public subscribe(obs: Partial<Observer<T>>): Subscription {
|
||||||
void obs;
|
const subscriber = new Subscriber(obs);
|
||||||
throw new Error('TODO');
|
|
||||||
|
if (this.closed) {
|
||||||
|
subscriber.complete();
|
||||||
|
} else if (this.closedByError) {
|
||||||
|
subscriber.error(this.lastError);
|
||||||
|
} else {
|
||||||
|
this.subscribers.push(subscriber);
|
||||||
|
}
|
||||||
|
|
||||||
|
return subscriber;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user