According to original Documentation
the pipable operator is that function take observables as a input and it returns another observable .previous observable stays unmodified.
pipe(...fns: UnaryFunction<any, any>[]): UnaryFunction<any, any>
Pipes let you combine multiple functions into a single function. The pipe() function takes as its arguments the functions you want to combine, and returns a new function that, when executed, runs the composed functions in sequence.
The purpose of the PIPE() function is to lump together all the functions that take, and return observable. It takes an observable initially, then that observable is used throughout the pipe() function by each function used inside of it.
First function takes the observable, processes it, modify its value, and passes to the next function, then next function takes the output observable of the first function, processes it, and passes to the next function, then it goes on until all the functions inside of pipe() function use that observable, finally you have the processed observable. At the end you can execute the observable with subscribe() function to extract the value out of it. Remember, the values in the original observable are not changed.!!
Observable.pipe(function1(), function2(), function3(), function4())
And remember You need to call subscribe on the observable to execute the request.
Angular and RxJS
First note the difference between concepts pipe in the context of Angular and RxJS
We have pipes concept in Angular and pipe() function in RxJS.
-
Pipes in Angular: A pipe takes in data as input and transforms it to the desired output https://angular.io/guide/pipes
-
pipe() function in RxJS: You can use pipes to link operators together. Pipes let you combine multiple functions into a single function.
The pipe() function takes as its arguments the functions you want to combine, and returns a new function that, when executed, runs the composed functions in sequence. https://angular.io/guide/rx-library (search for pipes in this URL, you can find the same)
pipe() is a function/method that is used to chain multiple RxJS operators while map() and filter() are operators that operate and transform the values of an Observable (sequence of values). They are similar to the map() and filter() methods of JavaScript arrays.
What does this pipe() function exactly mean in this case?
return (
this.http.get <
Hero >
url.pipe(
tap((_) => this.log(`fetched hero id=${id}`)),
catchError(this.handleError < Hero > `getHero id=${id}`),
)
)
The pipe() in above example is the pipe() method of RxJS 5.5 (RxJS is the default for all Angular apps).
The pipe() function takes as its arguments the functions you want to combine, and returns a new function that, when executed, runs the composed functions in sequence.
tap() – RxJS tap operator will look at the Observable value and do something with that value. In other words, after a successful API request, the tap() operator will do any function you want it to perform with the response. In the example, it will just log that string.
catchError() – catchError does exactly the same thing but with error response. If you want to throw an error or want to call some function if you get an error, you can do it here. In the example, it will call handleError() and inside that, it will just log that string.
RxJS Operators are functions that build on the observables foundation to enable sophisticated manipulation of collections.
For example, RxJS defines operators such as map(), filter(), concat(), and flatMap().
You can use pipes to link operators together. Pipes let you combine multiple functions into a single function.
It decouples the streaming operations (map, filter, reduce…) from the core functionality(subscribing, piping). By piping operations instead of chaining, it doesn’t pollute the prototype of Observable making it easier to do tree shaking.
Unlike map, which is an operator, pipe is a method on Observable which is used for composing operators. pipe was introduced to RxJS in v5.5 to take code that looked like this:
of(1, 2, 3)
.map((x) => x + 1)
.filter((x) => x > 2)
// and turn it into this
of(1, 2, 3).pipe(
map((x) => x + 1),
filter((x) => x > 2),
)
pipe offers the following benefits:
It cleans up Observable.prototype by removing operators
It makes the RxJS library more tree-shakeable
It makes it easier to write and use third-party operators (since you don’t have to worry about patching Observable.prototype).
Another example of pipe usage in RxJS.
number$.pipe(
map((n) => n * n),
filter((n) => n % 2 === 0),
)
// We can also write it in a different way:
const { pipe } = rxjs
const transformNumbers = pipe(
map((x) => x * x),
filter((x) => x % 2 === 0),
)
And the result is exactly the same.
The pipe function in RxJS takes a number of functions and composes them by passing the result of a function as an argument to another function. Actually, both map and filter will return functions. We’re not composing map and filter themselves but rather the functions returned by invoking them.
You can check out how RxJS implements pipe function [here]
create an Observable using the of() function from a sequence of 1 to 10 numbers and use the pipe() method to apply the filter() operator on the sequence:
const ob$: Observable<number> = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).pipe(
filter((v) => v % 2 === 0),
map((v) => v * 10),
)
Finally, let’s run this by subscribing to the returned Observable:
ob$.subscribe(
(next) => console.log("next:", next),
(err) => console.log("error:", err),
() => console.log("Completed"),
)
We apply both the filter() and map() operators, filter() will be executed first then map(). This will produce the following output:
next: 20
next: 40
next: 60
next: 80
next: 100
Completed
I want to retry an api call 10 times (waiting one second since it fails until next execution) and if this 10 times it fails, then I will execute a function, this is my aproach:
import { Observable, throwError, EMPTY } from "rxjs"
import { concatMap, delay, retryWhen, take } from "rxjs/operators"
handleErrorWithRetry(error, req: HttpRequest<any>, next: HttpHandler): Observable<any> {
if (error.status === 0) {
return next.handle(req).pipe(
retryWhen((errors) => {
return errors.pipe(
delay(1000),
take(10), // Number of retries
concatMap(throwError), // Let the error bubble up again
)
),
catchError((err) => this.checkConnection(err))),
);
}
}
checkConnection(error): Observable<any> {
console.log(error)
return EMPTY;
}
https://stackoverflow.com/questions/48668701/what-is-pipe-for-in-rxjs/56881298