Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ada576a77a | ||
|
|
44f4a97b95 | ||
|
|
346bbd851a | ||
|
|
4fede0db41 | ||
|
|
30f215ee87 | ||
|
|
964afddc90 |
@ -3,6 +3,7 @@
|
||||
*/
|
||||
|
||||
import { Observer } from './Observer';
|
||||
import { Subscriber } from './Subscriber';
|
||||
import { Subscription, TeardownLogic } from './Subscription';
|
||||
|
||||
type SubscribeFunction<T> = (obs: Partial<Observer<T>>) => Subscription;
|
||||
@ -21,13 +22,17 @@ export interface ObservableConstructor {
|
||||
* Implementation
|
||||
*/
|
||||
export class Observable<T> implements IObservable<T> {
|
||||
constructor(subscriptionFactory: SubscriptionFactory<T>) {
|
||||
void subscriptionFactory;
|
||||
throw new Error('TODO');
|
||||
}
|
||||
constructor(private subscriptionFactory: SubscriptionFactory<T>) {}
|
||||
|
||||
subscribe(obs: Partial<Observer<T>>): Subscription {
|
||||
void obs;
|
||||
throw new Error('TODO');
|
||||
const subscriber = new Subscriber(obs);
|
||||
|
||||
try {
|
||||
subscriber.add(this.subscriptionFactory(subscriber));
|
||||
} catch (e) {
|
||||
subscriber.error(e);
|
||||
}
|
||||
|
||||
return subscriber;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import { IObservable } from './Observable';
|
||||
import { Observer } from './Observer';
|
||||
import { Subscriber } from './Subscriber';
|
||||
import { Subscription } from './Subscription';
|
||||
|
||||
/**
|
||||
@ -16,24 +17,47 @@ export interface SubjectConstructor {
|
||||
* Implementation
|
||||
*/
|
||||
export class Subject<T> implements IObservable<T>, Observer<T> {
|
||||
private closed = false;
|
||||
private closedByError = false;
|
||||
private lastError: unknown | undefined;
|
||||
|
||||
private subscribers: Subscriber<T>[] = [];
|
||||
|
||||
public constructor() {}
|
||||
|
||||
private isClosed(): boolean {
|
||||
return this.closed || this.closedByError;
|
||||
}
|
||||
|
||||
public next(value: T): void {
|
||||
void value;
|
||||
throw new Error('TODO');
|
||||
if (this.isClosed()) return;
|
||||
this.subscribers.forEach(s => s.next(value));
|
||||
}
|
||||
|
||||
public error(err: unknown): void {
|
||||
void err;
|
||||
throw new Error('TODO');
|
||||
if (this.isClosed()) return;
|
||||
this.closedByError = true;
|
||||
this.lastError = err;
|
||||
this.subscribers.forEach(s => s.error(err));
|
||||
}
|
||||
|
||||
public complete(): void {
|
||||
throw new Error('TODO');
|
||||
if (this.isClosed()) return;
|
||||
this.closed = true;
|
||||
this.subscribers.forEach(s => s.complete());
|
||||
}
|
||||
|
||||
public subscribe(obs: Partial<Observer<T>>): Subscription {
|
||||
void obs;
|
||||
throw new Error('TODO');
|
||||
const subscriber = new Subscriber(obs);
|
||||
|
||||
if (this.closed) {
|
||||
subscriber.complete();
|
||||
} else if (this.closedByError) {
|
||||
subscriber.error(this.lastError);
|
||||
} else {
|
||||
this.subscribers.push(subscriber);
|
||||
}
|
||||
|
||||
return subscriber;
|
||||
}
|
||||
}
|
||||
|
||||
@ -14,26 +14,37 @@ export interface SubscriberConstructor {
|
||||
/**
|
||||
* Implementation
|
||||
*/
|
||||
|
||||
const noop = () => {};
|
||||
|
||||
export class Subscriber<T> extends Subscription implements Observer<T> {
|
||||
private observer: Observer<T>;
|
||||
|
||||
public constructor(obs: Partial<Observer<T>>) {
|
||||
super();
|
||||
|
||||
void obs;
|
||||
throw new Error('TODO');
|
||||
this.observer = {
|
||||
next: obs.next ?? noop,
|
||||
error: obs.error ?? noop,
|
||||
complete: obs.complete ?? noop,
|
||||
};
|
||||
}
|
||||
|
||||
/* Observer implementation */
|
||||
public next(value: T): void {
|
||||
void value;
|
||||
throw new Error('TODO');
|
||||
if (this.closed) return;
|
||||
this.observer.next(value);
|
||||
}
|
||||
|
||||
public error(err: unknown): void {
|
||||
void err;
|
||||
throw new Error('TODO');
|
||||
if (this.closed) return;
|
||||
this.observer.error(err);
|
||||
this.unsubscribe();
|
||||
}
|
||||
|
||||
public complete(): void {
|
||||
throw new Error('TODO');
|
||||
if (this.closed) return;
|
||||
this.observer.complete();
|
||||
this.unsubscribe();
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,29 +17,47 @@ export interface SubscriptionConstructor {
|
||||
|
||||
export type TeardownLogic = Subscription | Unsubscribable | Teardown | void;
|
||||
|
||||
const noop = () => {};
|
||||
|
||||
const executeTeardownLogic = (tl: TeardownLogic): void => {
|
||||
if (typeof tl === 'function') {
|
||||
tl();
|
||||
return;
|
||||
}
|
||||
|
||||
if (tl) {
|
||||
tl.unsubscribe();
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Implementation
|
||||
*/
|
||||
|
||||
export class Subscription implements ISubscription {
|
||||
private teardowns: TeardownLogic[] = [];
|
||||
public closed = false;
|
||||
|
||||
public constructor(action?: Teardown) {
|
||||
void action;
|
||||
throw new Error('TODO');
|
||||
}
|
||||
public constructor(private action: Teardown = noop) {}
|
||||
|
||||
public unsubscribe(): void {
|
||||
throw new Error('TODO');
|
||||
if (this.closed) return;
|
||||
|
||||
this.closed = true;
|
||||
|
||||
executeTeardownLogic(this.action);
|
||||
this.teardowns.forEach(executeTeardownLogic);
|
||||
}
|
||||
|
||||
public add(tl: TeardownLogic): void {
|
||||
void tl;
|
||||
throw new Error('TODO');
|
||||
if (this.closed) {
|
||||
executeTeardownLogic(tl);
|
||||
} else {
|
||||
this.teardowns.push(tl);
|
||||
}
|
||||
}
|
||||
|
||||
public remove(tl: TeardownLogic): void {
|
||||
void tl;
|
||||
throw new Error('TODO');
|
||||
this.teardowns = this.teardowns.filter(t => t !== tl);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,7 +2,31 @@ import { Observable } from '../Observable';
|
||||
import { OperatorFunction } from './types';
|
||||
|
||||
export function finalize<T>(callback: () => void): OperatorFunction<T, T> {
|
||||
void Observable;
|
||||
void callback;
|
||||
throw new Error('TODO');
|
||||
return source => {
|
||||
return new Observable(observer => {
|
||||
let finalized = false;
|
||||
|
||||
const finalizeFn = () => {
|
||||
if (!finalized) {
|
||||
finalized = true;
|
||||
callback();
|
||||
}
|
||||
};
|
||||
|
||||
const complete = () => {
|
||||
observer.complete();
|
||||
finalizeFn();
|
||||
};
|
||||
|
||||
const sub = source.subscribe({
|
||||
next: val => observer.next(val),
|
||||
error: err => observer.error(err),
|
||||
complete: () => complete(),
|
||||
});
|
||||
|
||||
sub.add(finalizeFn);
|
||||
|
||||
return sub;
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
@ -2,7 +2,22 @@ import { Observable } from '../Observable';
|
||||
import { OperatorFunction } from './types';
|
||||
|
||||
export function map<T, R>(project: (value: T) => R): OperatorFunction<T, R> {
|
||||
void Observable;
|
||||
void project;
|
||||
throw new Error('TODO');
|
||||
return source => {
|
||||
return new Observable(observer => {
|
||||
const next = (value: T) => {
|
||||
try {
|
||||
const result = project(value);
|
||||
observer.next(result);
|
||||
} catch (e) {
|
||||
observer.error(e);
|
||||
}
|
||||
};
|
||||
|
||||
return source.subscribe({
|
||||
next,
|
||||
error: err => observer.error(err),
|
||||
complete: () => observer.complete(),
|
||||
});
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
@ -1,8 +1,37 @@
|
||||
import { Observable } from '../Observable';
|
||||
import { Subscription } from '../Subscription';
|
||||
import { OperatorFunction } from './types';
|
||||
|
||||
export function retry<T>(limit?: number): OperatorFunction<T, T> {
|
||||
void Observable;
|
||||
void limit;
|
||||
throw new Error('TODO');
|
||||
return source => {
|
||||
return new Observable(observer => {
|
||||
let count: number | undefined = limit;
|
||||
let sub: Subscription = new Subscription();
|
||||
|
||||
const makeSubscription = (onError: (err: unknown) => void) => {
|
||||
sub = source.subscribe({
|
||||
next: value => observer.next(value),
|
||||
error: onError,
|
||||
complete: () => observer.complete(),
|
||||
});
|
||||
};
|
||||
|
||||
const error = (err: unknown) => {
|
||||
if (count === undefined) {
|
||||
makeSubscription(error);
|
||||
} else if (count > 0) {
|
||||
count = count - 1;
|
||||
makeSubscription(error);
|
||||
} else {
|
||||
observer.error(err);
|
||||
}
|
||||
};
|
||||
|
||||
makeSubscription(error);
|
||||
|
||||
return () => {
|
||||
sub.unsubscribe();
|
||||
};
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
@ -2,7 +2,24 @@ import { Observable } from '../Observable';
|
||||
import { OperatorFunction } from './types';
|
||||
|
||||
export function scan<T, R>(accumulator: (acc: R, value: T) => R, seed: R): OperatorFunction<T, R> {
|
||||
void Observable;
|
||||
void accumulator, seed;
|
||||
throw new Error('TODO');
|
||||
return source => {
|
||||
return new Observable(observer => {
|
||||
let acc: R = seed;
|
||||
|
||||
const next = (value: T) => {
|
||||
try {
|
||||
acc = accumulator(acc, value);
|
||||
observer.next(acc);
|
||||
} catch (e) {
|
||||
observer.error(e);
|
||||
}
|
||||
};
|
||||
|
||||
return source.subscribe({
|
||||
next,
|
||||
error: err => observer.error(err),
|
||||
complete: () => observer.complete(),
|
||||
});
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
@ -1,7 +1,62 @@
|
||||
import { Observable } from '../Observable';
|
||||
import { Subscription } from '../Subscription';
|
||||
import { OperatorFunction } from './types';
|
||||
|
||||
export function switchMap<T, R>(project: (value: T) => Observable<R>): OperatorFunction<T, R> {
|
||||
void project;
|
||||
throw new Error('TODO');
|
||||
return source => {
|
||||
return new Observable(obs => {
|
||||
let sourceIsCompleted = false;
|
||||
let innerSubscription = new Subscription();
|
||||
|
||||
const innerNext = (resultValue: R) => {
|
||||
obs.next(resultValue);
|
||||
};
|
||||
|
||||
const innerError = (err: unknown) => {
|
||||
obs.error(err);
|
||||
};
|
||||
|
||||
const innerComplete = () => {
|
||||
if (sourceIsCompleted) {
|
||||
obs.complete();
|
||||
}
|
||||
};
|
||||
|
||||
const next = (value: T) => {
|
||||
innerSubscription.unsubscribe();
|
||||
try {
|
||||
innerSubscription = project(value).subscribe({
|
||||
next: innerNext,
|
||||
error: innerError,
|
||||
complete: innerComplete,
|
||||
});
|
||||
} catch (e) {
|
||||
obs.error(e);
|
||||
}
|
||||
};
|
||||
|
||||
const error = (err: unknown) => {
|
||||
obs.error(err);
|
||||
};
|
||||
|
||||
const complete = () => {
|
||||
sourceIsCompleted = true;
|
||||
|
||||
if (innerSubscription.closed) {
|
||||
obs.complete();
|
||||
}
|
||||
};
|
||||
|
||||
const mainSub = source.subscribe({
|
||||
next,
|
||||
error,
|
||||
complete,
|
||||
});
|
||||
|
||||
return () => {
|
||||
mainSub.unsubscribe();
|
||||
innerSubscription.unsubscribe();
|
||||
};
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user