Compare commits

...

6 Commits

Author SHA1 Message Date
Guillaume ARM
ada576a77a feat: correction for other operators: scan, switchMap, retry and finalize 2023-07-05 16:49:40 +02:00
Guillaume ARM
44f4a97b95 feat: map operator correction 2023-07-05 16:28:50 +02:00
Guillaume ARM
346bbd851a feat: Subject correction 2023-07-05 16:28:43 +02:00
Guillaume ARM
4fede0db41 feat: Observable correction 2023-07-05 16:28:33 +02:00
Guillaume ARM
30f215ee87 feat: correction Subscriber 2023-07-05 15:58:39 +02:00
Guillaume ARM
964afddc90 feat: correction Subscription 2023-07-05 15:23:29 +02:00
9 changed files with 241 additions and 43 deletions

View File

@ -3,6 +3,7 @@
*/ */
import { Observer } from './Observer'; import { Observer } from './Observer';
import { Subscriber } from './Subscriber';
import { Subscription, TeardownLogic } from './Subscription'; import { Subscription, TeardownLogic } from './Subscription';
type SubscribeFunction<T> = (obs: Partial<Observer<T>>) => Subscription; type SubscribeFunction<T> = (obs: Partial<Observer<T>>) => Subscription;
@ -21,13 +22,17 @@ export interface ObservableConstructor {
* Implementation * Implementation
*/ */
export class Observable<T> implements IObservable<T> { export class Observable<T> implements IObservable<T> {
constructor(subscriptionFactory: SubscriptionFactory<T>) { constructor(private subscriptionFactory: SubscriptionFactory<T>) {}
void subscriptionFactory;
throw new Error('TODO');
}
subscribe(obs: Partial<Observer<T>>): Subscription { subscribe(obs: Partial<Observer<T>>): Subscription {
void obs; const subscriber = new Subscriber(obs);
throw new Error('TODO');
try {
subscriber.add(this.subscriptionFactory(subscriber));
} catch (e) {
subscriber.error(e);
}
return subscriber;
} }
} }

View File

@ -1,5 +1,6 @@
import { IObservable } from './Observable'; import { IObservable } from './Observable';
import { Observer } from './Observer'; import { Observer } from './Observer';
import { Subscriber } from './Subscriber';
import { Subscription } from './Subscription'; import { Subscription } from './Subscription';
/** /**
@ -16,24 +17,47 @@ export interface SubjectConstructor {
* Implementation * Implementation
*/ */
export class Subject<T> implements IObservable<T>, Observer<T> { export class Subject<T> implements IObservable<T>, Observer<T> {
private closed = false;
private closedByError = false;
private lastError: unknown | undefined;
private subscribers: Subscriber<T>[] = [];
public constructor() {} public constructor() {}
private isClosed(): boolean {
return this.closed || this.closedByError;
}
public next(value: T): void { public next(value: T): void {
void value; if (this.isClosed()) return;
throw new Error('TODO'); this.subscribers.forEach(s => s.next(value));
} }
public error(err: unknown): void { public error(err: unknown): void {
void err; if (this.isClosed()) return;
throw new Error('TODO'); this.closedByError = true;
this.lastError = err;
this.subscribers.forEach(s => s.error(err));
} }
public complete(): void { public complete(): void {
throw new Error('TODO'); if (this.isClosed()) return;
this.closed = true;
this.subscribers.forEach(s => s.complete());
} }
public subscribe(obs: Partial<Observer<T>>): Subscription { public subscribe(obs: Partial<Observer<T>>): Subscription {
void obs; const subscriber = new Subscriber(obs);
throw new Error('TODO');
if (this.closed) {
subscriber.complete();
} else if (this.closedByError) {
subscriber.error(this.lastError);
} else {
this.subscribers.push(subscriber);
}
return subscriber;
} }
} }

View File

@ -14,26 +14,37 @@ export interface SubscriberConstructor {
/** /**
* Implementation * Implementation
*/ */
const noop = () => {};
export class Subscriber<T> extends Subscription implements Observer<T> { export class Subscriber<T> extends Subscription implements Observer<T> {
private observer: Observer<T>;
public constructor(obs: Partial<Observer<T>>) { public constructor(obs: Partial<Observer<T>>) {
super(); super();
void obs; this.observer = {
throw new Error('TODO'); next: obs.next ?? noop,
error: obs.error ?? noop,
complete: obs.complete ?? noop,
};
} }
/* Observer implementation */ /* Observer implementation */
public next(value: T): void { public next(value: T): void {
void value; if (this.closed) return;
throw new Error('TODO'); this.observer.next(value);
} }
public error(err: unknown): void { public error(err: unknown): void {
void err; if (this.closed) return;
throw new Error('TODO'); this.observer.error(err);
this.unsubscribe();
} }
public complete(): void { public complete(): void {
throw new Error('TODO'); if (this.closed) return;
this.observer.complete();
this.unsubscribe();
} }
} }

View File

@ -17,29 +17,47 @@ export interface SubscriptionConstructor {
export type TeardownLogic = Subscription | Unsubscribable | Teardown | void; export type TeardownLogic = Subscription | Unsubscribable | Teardown | void;
const noop = () => {};
const executeTeardownLogic = (tl: TeardownLogic): void => {
if (typeof tl === 'function') {
tl();
return;
}
if (tl) {
tl.unsubscribe();
}
};
/** /**
* Implementation * Implementation
*/ */
export class Subscription implements ISubscription { export class Subscription implements ISubscription {
private teardowns: TeardownLogic[] = [];
public closed = false; public closed = false;
public constructor(action?: Teardown) { public constructor(private action: Teardown = noop) {}
void action;
throw new Error('TODO');
}
public unsubscribe(): void { public unsubscribe(): void {
throw new Error('TODO'); if (this.closed) return;
this.closed = true;
executeTeardownLogic(this.action);
this.teardowns.forEach(executeTeardownLogic);
} }
public add(tl: TeardownLogic): void { public add(tl: TeardownLogic): void {
void tl; if (this.closed) {
throw new Error('TODO'); executeTeardownLogic(tl);
} else {
this.teardowns.push(tl);
}
} }
public remove(tl: TeardownLogic): void { public remove(tl: TeardownLogic): void {
void tl; this.teardowns = this.teardowns.filter(t => t !== tl);
throw new Error('TODO');
} }
} }

View File

@ -2,7 +2,31 @@ import { Observable } from '../Observable';
import { OperatorFunction } from './types'; import { OperatorFunction } from './types';
export function finalize<T>(callback: () => void): OperatorFunction<T, T> { export function finalize<T>(callback: () => void): OperatorFunction<T, T> {
void Observable; return source => {
void callback; return new Observable(observer => {
throw new Error('TODO'); let finalized = false;
const finalizeFn = () => {
if (!finalized) {
finalized = true;
callback();
}
};
const complete = () => {
observer.complete();
finalizeFn();
};
const sub = source.subscribe({
next: val => observer.next(val),
error: err => observer.error(err),
complete: () => complete(),
});
sub.add(finalizeFn);
return sub;
});
};
} }

View File

@ -2,7 +2,22 @@ import { Observable } from '../Observable';
import { OperatorFunction } from './types'; import { OperatorFunction } from './types';
export function map<T, R>(project: (value: T) => R): OperatorFunction<T, R> { export function map<T, R>(project: (value: T) => R): OperatorFunction<T, R> {
void Observable; return source => {
void project; return new Observable(observer => {
throw new Error('TODO'); const next = (value: T) => {
try {
const result = project(value);
observer.next(result);
} catch (e) {
observer.error(e);
}
};
return source.subscribe({
next,
error: err => observer.error(err),
complete: () => observer.complete(),
});
});
};
} }

View File

@ -1,8 +1,37 @@
import { Observable } from '../Observable'; import { Observable } from '../Observable';
import { Subscription } from '../Subscription';
import { OperatorFunction } from './types'; import { OperatorFunction } from './types';
export function retry<T>(limit?: number): OperatorFunction<T, T> { export function retry<T>(limit?: number): OperatorFunction<T, T> {
void Observable; return source => {
void limit; return new Observable(observer => {
throw new Error('TODO'); let count: number | undefined = limit;
let sub: Subscription = new Subscription();
const makeSubscription = (onError: (err: unknown) => void) => {
sub = source.subscribe({
next: value => observer.next(value),
error: onError,
complete: () => observer.complete(),
});
};
const error = (err: unknown) => {
if (count === undefined) {
makeSubscription(error);
} else if (count > 0) {
count = count - 1;
makeSubscription(error);
} else {
observer.error(err);
}
};
makeSubscription(error);
return () => {
sub.unsubscribe();
};
});
};
} }

View File

@ -2,7 +2,24 @@ import { Observable } from '../Observable';
import { OperatorFunction } from './types'; import { OperatorFunction } from './types';
export function scan<T, R>(accumulator: (acc: R, value: T) => R, seed: R): OperatorFunction<T, R> { export function scan<T, R>(accumulator: (acc: R, value: T) => R, seed: R): OperatorFunction<T, R> {
void Observable; return source => {
void accumulator, seed; return new Observable(observer => {
throw new Error('TODO'); let acc: R = seed;
const next = (value: T) => {
try {
acc = accumulator(acc, value);
observer.next(acc);
} catch (e) {
observer.error(e);
}
};
return source.subscribe({
next,
error: err => observer.error(err),
complete: () => observer.complete(),
});
});
};
} }

View File

@ -1,7 +1,62 @@
import { Observable } from '../Observable'; import { Observable } from '../Observable';
import { Subscription } from '../Subscription';
import { OperatorFunction } from './types'; import { OperatorFunction } from './types';
export function switchMap<T, R>(project: (value: T) => Observable<R>): OperatorFunction<T, R> { export function switchMap<T, R>(project: (value: T) => Observable<R>): OperatorFunction<T, R> {
void project; return source => {
throw new Error('TODO'); return new Observable(obs => {
let sourceIsCompleted = false;
let innerSubscription = new Subscription();
const innerNext = (resultValue: R) => {
obs.next(resultValue);
};
const innerError = (err: unknown) => {
obs.error(err);
};
const innerComplete = () => {
if (sourceIsCompleted) {
obs.complete();
}
};
const next = (value: T) => {
innerSubscription.unsubscribe();
try {
innerSubscription = project(value).subscribe({
next: innerNext,
error: innerError,
complete: innerComplete,
});
} catch (e) {
obs.error(e);
}
};
const error = (err: unknown) => {
obs.error(err);
};
const complete = () => {
sourceIsCompleted = true;
if (innerSubscription.closed) {
obs.complete();
}
};
const mainSub = source.subscribe({
next,
error,
complete,
});
return () => {
mainSub.unsubscribe();
innerSubscription.unsubscribe();
};
});
};
} }