Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ada576a77a | ||
|
|
44f4a97b95 | ||
|
|
346bbd851a | ||
|
|
4fede0db41 | ||
|
|
30f215ee87 | ||
|
|
964afddc90 |
@ -3,6 +3,7 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
import { Observer } from './Observer';
|
import { Observer } from './Observer';
|
||||||
|
import { Subscriber } from './Subscriber';
|
||||||
import { Subscription, TeardownLogic } from './Subscription';
|
import { Subscription, TeardownLogic } from './Subscription';
|
||||||
|
|
||||||
type SubscribeFunction<T> = (obs: Partial<Observer<T>>) => Subscription;
|
type SubscribeFunction<T> = (obs: Partial<Observer<T>>) => Subscription;
|
||||||
@ -21,13 +22,17 @@ export interface ObservableConstructor {
|
|||||||
* Implementation
|
* Implementation
|
||||||
*/
|
*/
|
||||||
export class Observable<T> implements IObservable<T> {
|
export class Observable<T> implements IObservable<T> {
|
||||||
constructor(subscriptionFactory: SubscriptionFactory<T>) {
|
constructor(private subscriptionFactory: SubscriptionFactory<T>) {}
|
||||||
void subscriptionFactory;
|
|
||||||
throw new Error('TODO');
|
|
||||||
}
|
|
||||||
|
|
||||||
subscribe(obs: Partial<Observer<T>>): Subscription {
|
subscribe(obs: Partial<Observer<T>>): Subscription {
|
||||||
void obs;
|
const subscriber = new Subscriber(obs);
|
||||||
throw new Error('TODO');
|
|
||||||
|
try {
|
||||||
|
subscriber.add(this.subscriptionFactory(subscriber));
|
||||||
|
} catch (e) {
|
||||||
|
subscriber.error(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
return subscriber;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
import { IObservable } from './Observable';
|
import { IObservable } from './Observable';
|
||||||
import { Observer } from './Observer';
|
import { Observer } from './Observer';
|
||||||
|
import { Subscriber } from './Subscriber';
|
||||||
import { Subscription } from './Subscription';
|
import { Subscription } from './Subscription';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -16,24 +17,47 @@ export interface SubjectConstructor {
|
|||||||
* Implementation
|
* Implementation
|
||||||
*/
|
*/
|
||||||
export class Subject<T> implements IObservable<T>, Observer<T> {
|
export class Subject<T> implements IObservable<T>, Observer<T> {
|
||||||
|
private closed = false;
|
||||||
|
private closedByError = false;
|
||||||
|
private lastError: unknown | undefined;
|
||||||
|
|
||||||
|
private subscribers: Subscriber<T>[] = [];
|
||||||
|
|
||||||
public constructor() {}
|
public constructor() {}
|
||||||
|
|
||||||
|
private isClosed(): boolean {
|
||||||
|
return this.closed || this.closedByError;
|
||||||
|
}
|
||||||
|
|
||||||
public next(value: T): void {
|
public next(value: T): void {
|
||||||
void value;
|
if (this.isClosed()) return;
|
||||||
throw new Error('TODO');
|
this.subscribers.forEach(s => s.next(value));
|
||||||
}
|
}
|
||||||
|
|
||||||
public error(err: unknown): void {
|
public error(err: unknown): void {
|
||||||
void err;
|
if (this.isClosed()) return;
|
||||||
throw new Error('TODO');
|
this.closedByError = true;
|
||||||
|
this.lastError = err;
|
||||||
|
this.subscribers.forEach(s => s.error(err));
|
||||||
}
|
}
|
||||||
|
|
||||||
public complete(): void {
|
public complete(): void {
|
||||||
throw new Error('TODO');
|
if (this.isClosed()) return;
|
||||||
|
this.closed = true;
|
||||||
|
this.subscribers.forEach(s => s.complete());
|
||||||
}
|
}
|
||||||
|
|
||||||
public subscribe(obs: Partial<Observer<T>>): Subscription {
|
public subscribe(obs: Partial<Observer<T>>): Subscription {
|
||||||
void obs;
|
const subscriber = new Subscriber(obs);
|
||||||
throw new Error('TODO');
|
|
||||||
|
if (this.closed) {
|
||||||
|
subscriber.complete();
|
||||||
|
} else if (this.closedByError) {
|
||||||
|
subscriber.error(this.lastError);
|
||||||
|
} else {
|
||||||
|
this.subscribers.push(subscriber);
|
||||||
|
}
|
||||||
|
|
||||||
|
return subscriber;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -14,26 +14,37 @@ export interface SubscriberConstructor {
|
|||||||
/**
|
/**
|
||||||
* Implementation
|
* Implementation
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
const noop = () => {};
|
||||||
|
|
||||||
export class Subscriber<T> extends Subscription implements Observer<T> {
|
export class Subscriber<T> extends Subscription implements Observer<T> {
|
||||||
|
private observer: Observer<T>;
|
||||||
|
|
||||||
public constructor(obs: Partial<Observer<T>>) {
|
public constructor(obs: Partial<Observer<T>>) {
|
||||||
super();
|
super();
|
||||||
|
|
||||||
void obs;
|
this.observer = {
|
||||||
throw new Error('TODO');
|
next: obs.next ?? noop,
|
||||||
|
error: obs.error ?? noop,
|
||||||
|
complete: obs.complete ?? noop,
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Observer implementation */
|
/* Observer implementation */
|
||||||
public next(value: T): void {
|
public next(value: T): void {
|
||||||
void value;
|
if (this.closed) return;
|
||||||
throw new Error('TODO');
|
this.observer.next(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
public error(err: unknown): void {
|
public error(err: unknown): void {
|
||||||
void err;
|
if (this.closed) return;
|
||||||
throw new Error('TODO');
|
this.observer.error(err);
|
||||||
|
this.unsubscribe();
|
||||||
}
|
}
|
||||||
|
|
||||||
public complete(): void {
|
public complete(): void {
|
||||||
throw new Error('TODO');
|
if (this.closed) return;
|
||||||
|
this.observer.complete();
|
||||||
|
this.unsubscribe();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,29 +17,47 @@ export interface SubscriptionConstructor {
|
|||||||
|
|
||||||
export type TeardownLogic = Subscription | Unsubscribable | Teardown | void;
|
export type TeardownLogic = Subscription | Unsubscribable | Teardown | void;
|
||||||
|
|
||||||
|
const noop = () => {};
|
||||||
|
|
||||||
|
const executeTeardownLogic = (tl: TeardownLogic): void => {
|
||||||
|
if (typeof tl === 'function') {
|
||||||
|
tl();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tl) {
|
||||||
|
tl.unsubscribe();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implementation
|
* Implementation
|
||||||
*/
|
*/
|
||||||
|
|
||||||
export class Subscription implements ISubscription {
|
export class Subscription implements ISubscription {
|
||||||
|
private teardowns: TeardownLogic[] = [];
|
||||||
public closed = false;
|
public closed = false;
|
||||||
|
|
||||||
public constructor(action?: Teardown) {
|
public constructor(private action: Teardown = noop) {}
|
||||||
void action;
|
|
||||||
throw new Error('TODO');
|
|
||||||
}
|
|
||||||
|
|
||||||
public unsubscribe(): void {
|
public unsubscribe(): void {
|
||||||
throw new Error('TODO');
|
if (this.closed) return;
|
||||||
|
|
||||||
|
this.closed = true;
|
||||||
|
|
||||||
|
executeTeardownLogic(this.action);
|
||||||
|
this.teardowns.forEach(executeTeardownLogic);
|
||||||
}
|
}
|
||||||
|
|
||||||
public add(tl: TeardownLogic): void {
|
public add(tl: TeardownLogic): void {
|
||||||
void tl;
|
if (this.closed) {
|
||||||
throw new Error('TODO');
|
executeTeardownLogic(tl);
|
||||||
|
} else {
|
||||||
|
this.teardowns.push(tl);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public remove(tl: TeardownLogic): void {
|
public remove(tl: TeardownLogic): void {
|
||||||
void tl;
|
this.teardowns = this.teardowns.filter(t => t !== tl);
|
||||||
throw new Error('TODO');
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,7 +2,31 @@ import { Observable } from '../Observable';
|
|||||||
import { OperatorFunction } from './types';
|
import { OperatorFunction } from './types';
|
||||||
|
|
||||||
export function finalize<T>(callback: () => void): OperatorFunction<T, T> {
|
export function finalize<T>(callback: () => void): OperatorFunction<T, T> {
|
||||||
void Observable;
|
return source => {
|
||||||
void callback;
|
return new Observable(observer => {
|
||||||
throw new Error('TODO');
|
let finalized = false;
|
||||||
|
|
||||||
|
const finalizeFn = () => {
|
||||||
|
if (!finalized) {
|
||||||
|
finalized = true;
|
||||||
|
callback();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const complete = () => {
|
||||||
|
observer.complete();
|
||||||
|
finalizeFn();
|
||||||
|
};
|
||||||
|
|
||||||
|
const sub = source.subscribe({
|
||||||
|
next: val => observer.next(val),
|
||||||
|
error: err => observer.error(err),
|
||||||
|
complete: () => complete(),
|
||||||
|
});
|
||||||
|
|
||||||
|
sub.add(finalizeFn);
|
||||||
|
|
||||||
|
return sub;
|
||||||
|
});
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,7 +2,22 @@ import { Observable } from '../Observable';
|
|||||||
import { OperatorFunction } from './types';
|
import { OperatorFunction } from './types';
|
||||||
|
|
||||||
export function map<T, R>(project: (value: T) => R): OperatorFunction<T, R> {
|
export function map<T, R>(project: (value: T) => R): OperatorFunction<T, R> {
|
||||||
void Observable;
|
return source => {
|
||||||
void project;
|
return new Observable(observer => {
|
||||||
throw new Error('TODO');
|
const next = (value: T) => {
|
||||||
|
try {
|
||||||
|
const result = project(value);
|
||||||
|
observer.next(result);
|
||||||
|
} catch (e) {
|
||||||
|
observer.error(e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
return source.subscribe({
|
||||||
|
next,
|
||||||
|
error: err => observer.error(err),
|
||||||
|
complete: () => observer.complete(),
|
||||||
|
});
|
||||||
|
});
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,8 +1,37 @@
|
|||||||
import { Observable } from '../Observable';
|
import { Observable } from '../Observable';
|
||||||
|
import { Subscription } from '../Subscription';
|
||||||
import { OperatorFunction } from './types';
|
import { OperatorFunction } from './types';
|
||||||
|
|
||||||
export function retry<T>(limit?: number): OperatorFunction<T, T> {
|
export function retry<T>(limit?: number): OperatorFunction<T, T> {
|
||||||
void Observable;
|
return source => {
|
||||||
void limit;
|
return new Observable(observer => {
|
||||||
throw new Error('TODO');
|
let count: number | undefined = limit;
|
||||||
|
let sub: Subscription = new Subscription();
|
||||||
|
|
||||||
|
const makeSubscription = (onError: (err: unknown) => void) => {
|
||||||
|
sub = source.subscribe({
|
||||||
|
next: value => observer.next(value),
|
||||||
|
error: onError,
|
||||||
|
complete: () => observer.complete(),
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
const error = (err: unknown) => {
|
||||||
|
if (count === undefined) {
|
||||||
|
makeSubscription(error);
|
||||||
|
} else if (count > 0) {
|
||||||
|
count = count - 1;
|
||||||
|
makeSubscription(error);
|
||||||
|
} else {
|
||||||
|
observer.error(err);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
makeSubscription(error);
|
||||||
|
|
||||||
|
return () => {
|
||||||
|
sub.unsubscribe();
|
||||||
|
};
|
||||||
|
});
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,7 +2,24 @@ import { Observable } from '../Observable';
|
|||||||
import { OperatorFunction } from './types';
|
import { OperatorFunction } from './types';
|
||||||
|
|
||||||
export function scan<T, R>(accumulator: (acc: R, value: T) => R, seed: R): OperatorFunction<T, R> {
|
export function scan<T, R>(accumulator: (acc: R, value: T) => R, seed: R): OperatorFunction<T, R> {
|
||||||
void Observable;
|
return source => {
|
||||||
void accumulator, seed;
|
return new Observable(observer => {
|
||||||
throw new Error('TODO');
|
let acc: R = seed;
|
||||||
|
|
||||||
|
const next = (value: T) => {
|
||||||
|
try {
|
||||||
|
acc = accumulator(acc, value);
|
||||||
|
observer.next(acc);
|
||||||
|
} catch (e) {
|
||||||
|
observer.error(e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
return source.subscribe({
|
||||||
|
next,
|
||||||
|
error: err => observer.error(err),
|
||||||
|
complete: () => observer.complete(),
|
||||||
|
});
|
||||||
|
});
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,7 +1,62 @@
|
|||||||
import { Observable } from '../Observable';
|
import { Observable } from '../Observable';
|
||||||
|
import { Subscription } from '../Subscription';
|
||||||
import { OperatorFunction } from './types';
|
import { OperatorFunction } from './types';
|
||||||
|
|
||||||
export function switchMap<T, R>(project: (value: T) => Observable<R>): OperatorFunction<T, R> {
|
export function switchMap<T, R>(project: (value: T) => Observable<R>): OperatorFunction<T, R> {
|
||||||
void project;
|
return source => {
|
||||||
throw new Error('TODO');
|
return new Observable(obs => {
|
||||||
|
let sourceIsCompleted = false;
|
||||||
|
let innerSubscription = new Subscription();
|
||||||
|
|
||||||
|
const innerNext = (resultValue: R) => {
|
||||||
|
obs.next(resultValue);
|
||||||
|
};
|
||||||
|
|
||||||
|
const innerError = (err: unknown) => {
|
||||||
|
obs.error(err);
|
||||||
|
};
|
||||||
|
|
||||||
|
const innerComplete = () => {
|
||||||
|
if (sourceIsCompleted) {
|
||||||
|
obs.complete();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const next = (value: T) => {
|
||||||
|
innerSubscription.unsubscribe();
|
||||||
|
try {
|
||||||
|
innerSubscription = project(value).subscribe({
|
||||||
|
next: innerNext,
|
||||||
|
error: innerError,
|
||||||
|
complete: innerComplete,
|
||||||
|
});
|
||||||
|
} catch (e) {
|
||||||
|
obs.error(e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const error = (err: unknown) => {
|
||||||
|
obs.error(err);
|
||||||
|
};
|
||||||
|
|
||||||
|
const complete = () => {
|
||||||
|
sourceIsCompleted = true;
|
||||||
|
|
||||||
|
if (innerSubscription.closed) {
|
||||||
|
obs.complete();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const mainSub = source.subscribe({
|
||||||
|
next,
|
||||||
|
error,
|
||||||
|
complete,
|
||||||
|
});
|
||||||
|
|
||||||
|
return () => {
|
||||||
|
mainSub.unsubscribe();
|
||||||
|
innerSubscription.unsubscribe();
|
||||||
|
};
|
||||||
|
});
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user