feat: correction for other operators: scan, switchMap, retry and finalize
This commit is contained in:
parent
44f4a97b95
commit
ada576a77a
@ -2,7 +2,31 @@ import { Observable } from '../Observable';
|
||||
import { OperatorFunction } from './types';
|
||||
|
||||
export function finalize<T>(callback: () => void): OperatorFunction<T, T> {
|
||||
void Observable;
|
||||
void callback;
|
||||
throw new Error('TODO');
|
||||
return source => {
|
||||
return new Observable(observer => {
|
||||
let finalized = false;
|
||||
|
||||
const finalizeFn = () => {
|
||||
if (!finalized) {
|
||||
finalized = true;
|
||||
callback();
|
||||
}
|
||||
};
|
||||
|
||||
const complete = () => {
|
||||
observer.complete();
|
||||
finalizeFn();
|
||||
};
|
||||
|
||||
const sub = source.subscribe({
|
||||
next: val => observer.next(val),
|
||||
error: err => observer.error(err),
|
||||
complete: () => complete(),
|
||||
});
|
||||
|
||||
sub.add(finalizeFn);
|
||||
|
||||
return sub;
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
@ -1,8 +1,37 @@
|
||||
import { Observable } from '../Observable';
|
||||
import { Subscription } from '../Subscription';
|
||||
import { OperatorFunction } from './types';
|
||||
|
||||
export function retry<T>(limit?: number): OperatorFunction<T, T> {
|
||||
void Observable;
|
||||
void limit;
|
||||
throw new Error('TODO');
|
||||
return source => {
|
||||
return new Observable(observer => {
|
||||
let count: number | undefined = limit;
|
||||
let sub: Subscription = new Subscription();
|
||||
|
||||
const makeSubscription = (onError: (err: unknown) => void) => {
|
||||
sub = source.subscribe({
|
||||
next: value => observer.next(value),
|
||||
error: onError,
|
||||
complete: () => observer.complete(),
|
||||
});
|
||||
};
|
||||
|
||||
const error = (err: unknown) => {
|
||||
if (count === undefined) {
|
||||
makeSubscription(error);
|
||||
} else if (count > 0) {
|
||||
count = count - 1;
|
||||
makeSubscription(error);
|
||||
} else {
|
||||
observer.error(err);
|
||||
}
|
||||
};
|
||||
|
||||
makeSubscription(error);
|
||||
|
||||
return () => {
|
||||
sub.unsubscribe();
|
||||
};
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
@ -2,7 +2,24 @@ import { Observable } from '../Observable';
|
||||
import { OperatorFunction } from './types';
|
||||
|
||||
export function scan<T, R>(accumulator: (acc: R, value: T) => R, seed: R): OperatorFunction<T, R> {
|
||||
void Observable;
|
||||
void accumulator, seed;
|
||||
throw new Error('TODO');
|
||||
return source => {
|
||||
return new Observable(observer => {
|
||||
let acc: R = seed;
|
||||
|
||||
const next = (value: T) => {
|
||||
try {
|
||||
acc = accumulator(acc, value);
|
||||
observer.next(acc);
|
||||
} catch (e) {
|
||||
observer.error(e);
|
||||
}
|
||||
};
|
||||
|
||||
return source.subscribe({
|
||||
next,
|
||||
error: err => observer.error(err),
|
||||
complete: () => observer.complete(),
|
||||
});
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
@ -1,7 +1,62 @@
|
||||
import { Observable } from '../Observable';
|
||||
import { Subscription } from '../Subscription';
|
||||
import { OperatorFunction } from './types';
|
||||
|
||||
export function switchMap<T, R>(project: (value: T) => Observable<R>): OperatorFunction<T, R> {
|
||||
void project;
|
||||
throw new Error('TODO');
|
||||
return source => {
|
||||
return new Observable(obs => {
|
||||
let sourceIsCompleted = false;
|
||||
let innerSubscription = new Subscription();
|
||||
|
||||
const innerNext = (resultValue: R) => {
|
||||
obs.next(resultValue);
|
||||
};
|
||||
|
||||
const innerError = (err: unknown) => {
|
||||
obs.error(err);
|
||||
};
|
||||
|
||||
const innerComplete = () => {
|
||||
if (sourceIsCompleted) {
|
||||
obs.complete();
|
||||
}
|
||||
};
|
||||
|
||||
const next = (value: T) => {
|
||||
innerSubscription.unsubscribe();
|
||||
try {
|
||||
innerSubscription = project(value).subscribe({
|
||||
next: innerNext,
|
||||
error: innerError,
|
||||
complete: innerComplete,
|
||||
});
|
||||
} catch (e) {
|
||||
obs.error(e);
|
||||
}
|
||||
};
|
||||
|
||||
const error = (err: unknown) => {
|
||||
obs.error(err);
|
||||
};
|
||||
|
||||
const complete = () => {
|
||||
sourceIsCompleted = true;
|
||||
|
||||
if (innerSubscription.closed) {
|
||||
obs.complete();
|
||||
}
|
||||
};
|
||||
|
||||
const mainSub = source.subscribe({
|
||||
next,
|
||||
error,
|
||||
complete,
|
||||
});
|
||||
|
||||
return () => {
|
||||
mainSub.unsubscribe();
|
||||
innerSubscription.unsubscribe();
|
||||
};
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user