diff --git a/src/Subject.ts b/src/Subject.ts index 16b8627..3cbf249 100644 --- a/src/Subject.ts +++ b/src/Subject.ts @@ -1,5 +1,6 @@ import { IObservable } from './Observable'; import { Observer } from './Observer'; +import { Subscriber } from './Subscriber'; import { Subscription } from './Subscription'; /** @@ -16,24 +17,47 @@ export interface SubjectConstructor { * Implementation */ export class Subject implements IObservable, Observer { + private closed = false; + private closedByError = false; + private lastError: unknown | undefined; + + private subscribers: Subscriber[] = []; + public constructor() {} + private isClosed(): boolean { + return this.closed || this.closedByError; + } + public next(value: T): void { - void value; - throw new Error('TODO'); + if (this.isClosed()) return; + this.subscribers.forEach(s => s.next(value)); } public error(err: unknown): void { - void err; - throw new Error('TODO'); + if (this.isClosed()) return; + this.closedByError = true; + this.lastError = err; + this.subscribers.forEach(s => s.error(err)); } public complete(): void { - throw new Error('TODO'); + if (this.isClosed()) return; + this.closed = true; + this.subscribers.forEach(s => s.complete()); } public subscribe(obs: Partial>): Subscription { - void obs; - throw new Error('TODO'); + const subscriber = new Subscriber(obs); + + if (this.closed) { + subscriber.complete(); + } else if (this.closedByError) { + subscriber.error(this.lastError); + } else { + this.subscribers.push(subscriber); + } + + return subscriber; } }