diff --git a/src/operators/finalize.ts b/src/operators/finalize.ts index 02c9ce6..7a17616 100644 --- a/src/operators/finalize.ts +++ b/src/operators/finalize.ts @@ -2,7 +2,31 @@ import { Observable } from '../Observable'; import { OperatorFunction } from './types'; export function finalize(callback: () => void): OperatorFunction { - void Observable; - void callback; - throw new Error('TODO'); + return source => { + return new Observable(observer => { + let finalized = false; + + const finalizeFn = () => { + if (!finalized) { + finalized = true; + callback(); + } + }; + + const complete = () => { + observer.complete(); + finalizeFn(); + }; + + const sub = source.subscribe({ + next: val => observer.next(val), + error: err => observer.error(err), + complete: () => complete(), + }); + + sub.add(finalizeFn); + + return sub; + }); + }; } diff --git a/src/operators/retry.ts b/src/operators/retry.ts index ed26d09..725f8fc 100644 --- a/src/operators/retry.ts +++ b/src/operators/retry.ts @@ -1,8 +1,37 @@ import { Observable } from '../Observable'; +import { Subscription } from '../Subscription'; import { OperatorFunction } from './types'; export function retry(limit?: number): OperatorFunction { - void Observable; - void limit; - throw new Error('TODO'); + return source => { + return new Observable(observer => { + let count: number | undefined = limit; + let sub: Subscription = new Subscription(); + + const makeSubscription = (onError: (err: unknown) => void) => { + sub = source.subscribe({ + next: value => observer.next(value), + error: onError, + complete: () => observer.complete(), + }); + }; + + const error = (err: unknown) => { + if (count === undefined) { + makeSubscription(error); + } else if (count > 0) { + count = count - 1; + makeSubscription(error); + } else { + observer.error(err); + } + }; + + makeSubscription(error); + + return () => { + sub.unsubscribe(); + }; + }); + }; } diff --git a/src/operators/scan.ts b/src/operators/scan.ts index a3f65e4..78384e3 100644 --- a/src/operators/scan.ts +++ b/src/operators/scan.ts @@ -2,7 +2,24 @@ import { Observable } from '../Observable'; import { OperatorFunction } from './types'; export function scan(accumulator: (acc: R, value: T) => R, seed: R): OperatorFunction { - void Observable; - void accumulator, seed; - throw new Error('TODO'); + return source => { + return new Observable(observer => { + let acc: R = seed; + + const next = (value: T) => { + try { + acc = accumulator(acc, value); + observer.next(acc); + } catch (e) { + observer.error(e); + } + }; + + return source.subscribe({ + next, + error: err => observer.error(err), + complete: () => observer.complete(), + }); + }); + }; } diff --git a/src/operators/switchMap.ts b/src/operators/switchMap.ts index 12e4c6e..f5dbde1 100644 --- a/src/operators/switchMap.ts +++ b/src/operators/switchMap.ts @@ -1,7 +1,62 @@ import { Observable } from '../Observable'; +import { Subscription } from '../Subscription'; import { OperatorFunction } from './types'; export function switchMap(project: (value: T) => Observable): OperatorFunction { - void project; - throw new Error('TODO'); + return source => { + return new Observable(obs => { + let sourceIsCompleted = false; + let innerSubscription = new Subscription(); + + const innerNext = (resultValue: R) => { + obs.next(resultValue); + }; + + const innerError = (err: unknown) => { + obs.error(err); + }; + + const innerComplete = () => { + if (sourceIsCompleted) { + obs.complete(); + } + }; + + const next = (value: T) => { + innerSubscription.unsubscribe(); + try { + innerSubscription = project(value).subscribe({ + next: innerNext, + error: innerError, + complete: innerComplete, + }); + } catch (e) { + obs.error(e); + } + }; + + const error = (err: unknown) => { + obs.error(err); + }; + + const complete = () => { + sourceIsCompleted = true; + + if (innerSubscription.closed) { + obs.complete(); + } + }; + + const mainSub = source.subscribe({ + next, + error, + complete, + }); + + return () => { + mainSub.unsubscribe(); + innerSubscription.unsubscribe(); + }; + }); + }; }