RxJS Operators

Modul #F6 - Angular - RxJS Operators

Ziele

  • Du kennst die verschiedenen RxJS-Operatoren und kannst diese anwenden.

Operators

RxJS bietet eine Vielzahl von Operatoren, die auf Observables angewendet werden können, um sie zu transformieren, zu filtern, zu kombinieren und vieles mehr. Folgend werden die wichtigsten vorgestellt.

Piping

Die pipe() ist ein wichtiger Operator, der es ermöglicht, eine Observable-Kette zu erstellen, indem verschiedene Operatoren nacheinander angewendet werden. Der Piping-Operator wird verwendet, um die Lesbarkeit und Wartbarkeit von Observable-Ketten zu verbessern, indem man sie in kleinere und leichter zu verstehende Abschnitte unterteilt. Wie eine map funktioniert, kannst du hier nachlesen.

1
2
3
4
5
6
7
8
9
import { of, map } from "rxjs";

of(1, 2, 3)
  .pipe(map((x) => x * x))
  .subscribe((v) => console.log(`value: ${v}`));

// value: 1
// value: 4
// value: 9

Creation Operators

from

Der from-Operator wandelt eine Reihe von Werten oder ein Iterable in ein Observable um. Er akzeptiert eine Quelle von Werten, wie beispielsweise ein Array oder eine Map, und gibt ein Observable zurück, das die Werte dieser Quelle nacheinander emittiert.

Der from-Operator ist daher primär für Werte geeignet, die aus verschiedenen dynamischen Quellen resultieren. Er splittet die Eingabe, was der of-Operator nicht tut. Mögliche Argumente sind dabei subscribable Objects, Promises, Observable-artige Objekte, Arrays, iterierbare Datenstrukturen oder ein Array-artige Objekte.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import { from } from "rxjs";

const source = from([1, 2, 3, 4, 5]);

source.subscribe((value) => console.log(value));

// 1
// 2
// 3
// 4
// 5

of

Der of-Operator wandelt wie der from-Operator auch eine feste Anzahl von Werten in ein Observable um. Der Operator akzeptiert jedoch eine beliebige Anzahl von Argumenten und gibt ein Observable zurück, das diese Argumente in der Reihenfolge ihres Auftretens emittiert.

Dieser Operator ist besonders nützlich, um eine feste Anzahl von Werten zu emittieren, die bekannt sind, bevor das Observable abonniert wird. Wenn man eine Quelle von Werten hat, die dynamisch generiert werden, ist es wahrscheinlich sinnvoller, den from Operator zu verwenden.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import { of } from "rxjs";

const source = of(1, 2, 3, 4, 5);

source.subscribe((value) => console.log(value));

// 1
// 2
// 3
// 4
// 5

Join Creation Operators

Info: In RxJS bezieht sich “Emission” auf den Wert, der von einem Observable emittiert / ausgegeben wird.

forkJoin

forkJoin ist ein Kombinationsoperator, der ein Array von Observables akzeptiert und wartet, bis alle Observables ihre Emissionen abgeschlossen haben, bevor er ein neues Observable zurückgibt. Das zurückgegebene Observable gibt ein Array von Werten zurück, das den letzten Wert jedes Observables enthält, das dem forkJoin Operator übergeben wurde.

Dieser Operator ist besonders nützlich, wenn man mehrere Quellen von Daten hat, die parallel verarbeitet werden können, und man auf alle Ergebnisse warten muss, bevor man mit der nächsten Phase der Verarbeitung fortfahren kann.

Zu beachten ist, dass der Operator fehlschlägt, wenn eines der übergebenen Observables einen Fehler emittiert, bevor es seine Emissionen abgeschlossen hat.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import { forkJoin, of } from "rxjs";

const source1 = of("Hello");
const source2 = of("World!");

forkJoin([source1, source2]).subscribe(([value1, value2]) => {
  console.log(value1 + " " + value2);
});

// "Hello World!"

concat

Der concat-Operator kombiniert mehrere Observables sequentiell, indem er die Emissionen des ersten Observables vollständig verarbeitet, bevor er mit dem nächsten Observable fortfährt. Das bedeutet, dass das zweite Observable erst dann subscribed wird, wenn das erste Observable seine Emissionen vollständig abgeschlossen hat und so weiter.

Dieser Operator ist besonders nützlich, wenn man sicherstellen muss, dass bestimmte Aktionen in einer bestimmten Reihenfolge ausgeführt werden, oder wenn man die Emissionen von Observables in einer bestimmten Sequenz verarbeiten muss.

Zu beachten ist jedoch, dass der Operator blockierend ist und erst dann zur nächsten Phase der Verarbeitung übergeht, wenn das vorherige Observable abgeschlossen wurde.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import { concat, of } from "rxjs";

const source1 = of("Hello");
const source2 = of("Dragon Warrior!");

concat(source1, source2).subscribe((value) => {
  console.log(value);
});

// "Hello"
// "Dragon Warrior!"

merge

Der merge-Operator kombiniert auch mehrere Observables, indem er die Emissionen aller Observables in einem einzigen Observable zusammenführt. Im Gegensatz zum concat-Operator führt der merge-Operator die Emissionen parallel aus, unabhängig davon, welches Observable die Emissionen zuerst ausgibt.

Beachten muss man jedoch, dass dieser Operator keine Garantie für die Reihenfolge der Emissionen gibt und dass es möglich ist, dass die Emissionen der Observables sich gegenseitig überschneiden.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import { merge, of } from "rxjs";

const source1 = of("Hello");
const source2 = of("Dragon Warrior!");

merge(source1, source2).subscribe((value) => {
  console.log(value);
});

// "Hello"
// "Dragon Warrior!"

zip

Der zip Operator kombiniert die Emissionen mehrerer Observables zu einer einzelnen Emission. Dabei werden die Emissionen jedes Observables zu einer Gruppe kombiniert, und sobald alle Observables eine Emission abgegeben haben, wird diese Gruppe als Emission des resultierenden Observables ausgegeben.

Dieser Operator ist besonders nützlich, wenn man mehrere Datenströme kombiniert und sicherstellen muss, dass alle Daten synchronisiert sind. Man muss jedoch beachten, dass der Operator die Emissionen jedes Observables zu einer Gruppe kombiniert, was bedeutet, dass die Grösse jeder Emission vom Observable mit der geringsten Anzahl an Emissionen begrenzt wird.

Auch zu beachten ist, dass der Operator darauf wartet, dass alle Observables eine Emission abgeben, bevor er eine Emission ausgibt. Wenn ein Observable keine Emission abgibt, wird das resultierende Observable keine Emissionen ausgeben.

Der Hauptunterschied vom zip-Operator zum forkJoin-Operator ist, dass mit zip die reinkommenden Werte Schritt für Schritt kombiniert werden, während forkJoin die Werte erst emittiert, nachdem alle Input-Observables completed wurden.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import { zip, of } from "rxjs";

const source1 = of("Hello");
const source2 = of("Dragon");
const source3 = of("Warrior");

zip(source1, source2, source3).subscribe((value) => {
  console.log(value);
});

// ["Hello", "Dragon", "Warrior"]

Transformation Operators

map

Der meist verwendete RxJS-Operator ist der map Operator. Dieser wird verwendet, um die Werte eines Observables zu transformieren. Es gibt viele Anwendungsfälle für diesen Operator, einschliesslich der Umwandlung von Daten in ein anderes Format, der Extraktion von bestimmten Werten aus einem Datenstrom oder der Anwendung einer Funktion auf jeden Wert in einem Observable.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import { of } from "rxjs";
import { map } from "rxjs/operators";

const source = of(1, 2, 3, 4, 5);

const doubled = source.pipe(map((value) => value * 2));

doubled.subscribe((value) => console.log(value));

// 2
// 4
// 6
// 8
// 10

mergeAll/mergeMap

Es kann passieren, dass ein Observable eine Methode aufruft, die ein weiteres Observable zurückgibt. Dieser Verhalten kommt oft bei API-Abfragen vor. Somit hat man dann ein äusseres und inneres Observable. Um nun an die Daten zu gelangen, müsste man auf beide subscriben, was auch möglich ist:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import { of, from, map } from "rxjs";

const getData = (param) => {
  return of(`retrieved new data with param ${param}`);
};

from([1, 2, 3, 4])
  .pipe(map((param) => getData(param)))
  .subscribe((value) => value.subscribe((data) => console.log(data)));

// retrieved new data with param 1
// retrieved new data with param 2
// retrieved new data with param 3
// retrieved new data with param 4

Das ist aber alles andere als optimal, weshalb man alternativ mergeMap benutzen kann. mergeMap ist eine Kombination von mergeAll und map. mergeAll erledigt die subscription auf das innere Observable, auf diese Weise muss man dann nicht mehr zweimal subscriben. Zudem merged es das innere Observable in das äussere.

Beispiel mit mergeAll:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import { of, from, map, mergeAll } from "rxjs";

const getData = (param) => {
  return of(`retrieved new data with param ${param}`);
};

from([1, 2, 3, 4])
  .pipe(
    map((param) => getData(param)),
    mergeAll(),
  )
  .subscribe((value) => console.log(value));

// retrieved new data with param 1
// retrieved new data with param 2
// retrieved new data with param 3
// retrieved new data with param 4

Beispiel mit mergeMap:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import { of, from, mergeMap } from "rxjs";

const getData = (param) => {
  return of(`retrieved new data with param ${param}`);
};

from([1, 2, 3, 4])
  .pipe(mergeMap((param) => getData(param)))
  .subscribe((value) => console.log(value));

// retrieved new data with param 1
// retrieved new data with param 2
// retrieved new data with param 3
// retrieved new data with param 4

Hinweis: Oft wird flatMap als Alias für mergeMap verwendet. Lass dich also nicht verwirren, wenn die Begriffe teilweise wechseln.

switchAll/switchMap

switchMap funktioniert ähnlich wie mergeMap. Der Operator subscribed auch auf das innere Observable. Auch hier ist switchMap eine Kombination, und zwar von switchAll und map. switchAll canceled die vorherige Subscription und subscribed auf die neue, wenn ein neues Observable reinkommt.

Hier kannst du nachlesen, wie delay funktioniert.

Beispiel mit switchAll:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import { of, from, map, switchAll, delay } from "rxjs";

const getData = (param) => {
  return of(`retrieved new data with param ${param}`).pipe(delay(1000));
};

from([1, 2, 3, 4])
  .pipe(
    map((param) => getData(param)),
    switchAll(),
  )
  .subscribe((value) => console.log(value));

// retrieved new data with param 4

Beispiel mit switchMap:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import { of, from, switchMap, delay } from "rxjs";

const getData = (param) => {
  return of(`retrieved new data with param ${param}`).pipe(delay(1000));
};

from([1, 2, 3, 4])
  .pipe(switchMap((param) => getData(param)))
  .subscribe((value) => console.log(value));

// retrieved new data with param 4

concatMap

Auch concatMap subscribed auf das innere Observable. Der Unterschied zu switchMap ist, dass concatMap nicht die Subscription cancelled, wenn ein neues Observable reinkommt. Es subscribed stattdessen solange nicht auf das nächste Observable, bis das momentane fertig ist.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import { of, from } from "rxjs";
import { delay, concatMap } from "rxjs/operators";

const getData2 = (param) => {
  return of(`2 retrieved new data with param ${param}`).pipe(delay(1000));
};

from([1, 2, 3, 4])
  .pipe(concatMap((param) => getData2(param)))
  .subscribe((value) => console.log(value));

// [wait 1s]
// 2 retrieved new data with param 1
// [wait 1s]
// 2 retrieved new data with param 2
// 2 [wait 1s]
// 2 retrieved new data with param 3
// [wait 1s]
// 2 retrieved new data with param 4

groupBy

Die groupBy()-Methode ermöglicht es, ein Observable in mehrere Observables aufzuteilen, die auf der Grundlage eines bestimmten Schlüssels gruppiert sind.

Der Prozess besteht darin, die ursprünglichen Emissionen in Gruppen aufzuteilen, die jeweils auf einen eindeutigen Schlüssel abgebildet werden. Jede Gruppe ist ein eigenes Observable, das alle Emissionen enthält, die diesem Schlüssel zugeordnet sind.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import { from } from "rxjs";
import { groupBy, mergeMap, toArray } from "rxjs/operators";

interface Person {
  name: string;
  age: number;
}

const people: Person[] = from([
  { name: "John", age: 30 },
  { name: "Jane", age: 25 },
  { name: "Bob", age: 30 },
  { name: "Alice", age: 25 },
]);

people
  .pipe(
    groupBy((person) => person.age),
    mergeMap((group) => group.pipe(toArray())),
  )
  .subscribe((group) => console.log(group));

// [ { name: 'Jane', age: 25 }, { name: 'Alice', age: 25 } ]
// [ { name: 'John', age: 30 }, { name: 'Bob', age: 30 } ]

Filtering Operators

elementAt

elementAt() wird benutzt, um ein Element eines Observable an der angegebenen Indexposition auszugeben und das Observable zu schliessen. Der Operator gibt ein Observable zurück, das nur das angeforderte Element emittiert und dann vollständig geschlossen wird.

Man kann nach der Indexposition auch einen Defaultwert angeben, welcher zurückgegeben wird, wenn das Element an der angegebenen Indexposition nicht gefunden wurde.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import { of } from "rxjs";
import { elementAt } from "rxjs/operators";

const source$ = of("A", "B", "C", "D", "E");

// ohne Defaultwert
source$
  .pipe(
    elementAt(2), // 'C'
  )
  .subscribe(console.log);

// mit Defaultwert
source$
  .pipe(
    elementAt(10, "F"), // 'F'
  )
  .subscribe(console.log);

filter

filter wird verwendet, um Observable-Elemente zu filtern, die nicht den Bedingungen entsprechen, die in der übergebenen Funktion definiert wurden. Die Filterfunktion gibt ein neues Observable zurück, das nur Elemente enthält, die die Bedingungen der Filterfunktion erfüllen.

1
2
3
4
5
6
7
8
import { of } from "rxjs";
import { filter } from "rxjs/operators";

const source = of(1, 2, 3, 4, 5);
const filtered = source.pipe(filter((x) => x % 2 === 0));
filtered.subscribe((x) => console.log(x));
// 2
// 4

first

Der first Operator gibt nur das erste Element aus einem Observable zurück. Man kann den Operator aber auch mittels einer Funktion ergänzen, um eine Bedienung zu schaffen, somit wird dann das erste Element zurückgegeben, welches auf diese Bedingung zutrifft. Nach der Funktion kann man auch einen Defaultwert hinzufügen, welcher zurückgegeben werden soll, wenn kein Element gefunden wird. Gibt man keinen Defaultwert an und die Bedingung trifft nicht zu, so wird ein fehler ausgegeben.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import { of } from "rxjs";
import { first } from "rxjs/operators";

const source = of(1, 2, 3);

source.pipe(first()).subscribe((value) => console.log(value)); // 1

source.pipe(first((x) => x > 2)).subscribe((value) => console.log(value)); // 3

source.pipe(first((x) => x > 3, null)).subscribe((value) => console.log(value)); // null

last

Der last Operator funktioniert genau gleich wie der first Operator. Nur ist es so, dass hier immer das letzte Element zurückgegeben wird.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import { of } from "rxjs";
import { first, last } from "rxjs/operators";

const source = of(1, 2, 3);

source.pipe(last()).subscribe((value) => console.log(value)); // 3

source.pipe(last((x) => x > 2)).subscribe((value) => console.log(value)); // 3

source.pipe(last((x) => x > 3, null)).subscribe((value) => console.log(value)); // null

skip

Der skip Operator gibt an, wie viele Ereignisse in einem Observable übersprungen werden sollen, bevor sie an einen Subscriber weitergegeben werden. Es wird verwendet, um den Startpunkt eines Observables zu verschieben.

1
2
3
4
5
6
7
8
import { of } from "rxjs";
import { skip } from "rxjs/operators";

const source = of(1, 2, 3, 4, 5);
const result = source.pipe(skip(3));
result.subscribe((value) => console.log(value));
// 4
// 5

take

Der take() Operator gibt eine bestimmte Anzahl von Werten eines Observables aus und schliesst es dann ab. Der Operator akzeptiert als Parameter die Anzahl der Werte, die ausgegeben werden sollen. Wenn der Parameter nicht angegeben wird, wird standardmässig nur ein Wert ausgegeben. Sind weniger Werte verfügbar als angegeben, gibt der Operator die Anzahl Werte aus, die er zur Verfügung hat.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import { of } from "rxjs";
import { take } from "rxjs/operators";

const source = of(1, 2, 3, 4, 5);

source.pipe(take(3)).subscribe((value) => console.log(value));
// 1
// 2
// 3

source.pipe(take(10)).subscribe((value) => console.log(value));
// 1
// 2
// 3
// 4
// 5

takeUntil

takeUntil ist ein Operator, der ein Observable subscribed, bis ein anderes Observable ein Ereignis ausgibt. Es wird verwendet, um das Subscriben eines Observables basierend auf einem anderen Observable zu beenden.

Das zweite Observable wird als “Abbruch- oder Trigger-Observable” bezeichnet. Sobald das Trigger-Observable ein Ereignis ausgibt, wird das Subscriben des ersten Observables beendet und das Observable gibt keine weiteren Werte mehr aus.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import { interval, Subject } from "rxjs";
import { takeUntil } from "rxjs/operators";

const stop$: Subject<void> = new Subject<void>();

// ein Observable, das jede Sekunde eine Zahl sendet
const observable$ = interval(1000).pipe(takeUntil(stop$));

observable$.subscribe((val) => console.log(val));

// Observable nach 5 Sekunden stoppen
setTimeout(() => {
  stop$.next();
  stop$.complete();
}, 5000);

// 0
// 1
// 2
// 3
// 4

In der Praxis wird takeUntil häufig in Kombination mit ngOnDestroy verwendet. ngOnDestroy()ist eine Lifecycle-Methode welche aufgerufen wird, bevor eine Komponente aus dem DOM entfernt wird. Somit kann man takeUntil verwenden, um Subscriptions zu beenden und somit Ressourcen freizugeben und Speicherlecks zu vermeiden.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import { Component, OnInit, OnDestroy } from "@angular/core";
import { interval, Subject } from "rxjs";
import { takeUntil } from "rxjs/operators";

@Component({
  // ..
})
export class WeaponComponent implements OnInit, OnDestroy {
  private destroy$: Subject<void> = new Subject<void>(); // Ein Subject erstellen, das das Zerstören des Observable signalisiert

  ngOnInit() {
    interval(1000)
      .pipe(takeUntil(this.destroy$)) // Das Observable subscriben und es beenden, wenn das destroy$-Subject ein Ereignis auslöst
      .subscribe((value) => console.log(value));
  }

  ngOnDestroy() {
    this.destroy$.next(); // Das destroy$-Subject ein Ereignis auslösen, um das Observable zu beenden
    this.destroy$.complete(); // Das destroy$-Subject komplett schliessen, um Speicherlecks zu vermeiden
  }
}

Error Handling Operators

catchError

catchError ist ein Operator, der verwendet wird, um Fehler in Observables zu verarbeiten. Er erlaubt es, ein alternatives Observable zurückzugeben oder eine andere Aktion auszuführen, wenn ein Fehler im ursprünglichen Observable auftritt.

Denn normalerweise ist es so, dass im Falle eines Fehler in einem Observable, das gesamte Observable abgebrochen wird und keine weiteren Werte emittiet werden. Wenn man jedoch catchError verwendet, kann man eine Funktion angeben, die den Fehler verarbeitet und ein alternatives Observable zurückgibt. Dadurch kann man das Observable weiterlaufen lassen, anstatt es abzubrechen.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import { of } from "rxjs";
import { catchError, map } from "rxjs/operators";

const source = of(1, 2, 3, 4, 5);

const result = source.pipe(
  map((value) => {
    if (value === 3) {
      throw new Error("Value cannot be 3");
    }
    return value;
  }),
  catchError((error) => {
    console.log(`Caught error: ${error}`);
    return of("one", "two");
  }),
);

result.subscribe({
  next: (value) => {
    console.log(`Result: ${value}`);
  },
  error: (error) => {
    console.log(`Error: ${error}`);
  },
  complete: () => {
    console.log("Complete");
  },
});

// Result: 1
// Result: 2
// Caught error: Error: Value cannot be 3
// Result: one
// Result: two
// Complete

retry

retry() versucht, das Observable beim ersten Fehler neu zu starten. Dabei wird das gesamte Observable neu subscribed. Wenn das erneute Subscriben wieder einen Fehler erzeugt, wird das Observable erneut neu gestartet und so weiter, bis das erneute Subscriben erfolgreich ist.

Wenn man dies aber nicht unbegrenzt lange machen möchte, bis man erfolgreich ist, kann man die Anzahl der Wiederholungen als Parameter im retry()-Operator mitgeben.

In der Praxis verwendet man retry() oft, um API-Requests zu wiederholen, wenn sie fehlschlagen sollten, da sie eventuell zu lange gebraucht haben.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import { interval } from "rxjs";
import { retry, map } from "rxjs/operators";

const result = interval(1000).pipe(
  map((value) => {
    if (Math.random() < 0.5) {
      throw new Error("Something went wrong");
    }
    return value;
  }),
  retry(2),
);

result.subscribe(
  (value) => console.log("Value: ", value),
  (error) => console.log("Error: ", error),
);

Utility Operators

tap

Das tap Operator ermöglicht es, “side actions” auf den Werten auszuführen, die von einem Observable emittiert werden, ohne die emittierten Werte zu verändern. Man kann es verwenden, um die emittierten Werte zu debuggen und zu untersuchen oder um eine Aktion auf Grundlage dieser Werte auszulösen, ohne die Observable-Pipeline zu beeinflussen.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import { tap } from "rxjs/operators";
import { of } from "rxjs";

const source = of(1, 2, 3);

source
  .pipe(
    tap((value) => console.log(`Value before: ${value}`)),
    tap((value) => value * 2),
    tap((value) => console.log(`Value after: ${value}`)),
  )
  .subscribe((value) => console.log(value));

// Value before: 1
// Value after: 1
// 1
// Value before: 2
// Value after: 2
// 2
// Value before: 3
// Value after: 3
// 3

delay

delay verzögert das Weiterleiten von Werten eines Observables um eine bestimmte Zeitspanne. Dadurch kann man eine Verzögerung in der Ausführung von Aktionen erzielen, was besonders nützlich ist, wenn man z.B. Animationen oder Zeitabhängigkeiten implementiert.

1
2
3
4
5
6
7
import { delay, of } from "rxjs";

const source = of(1, 2, 3, 4, 5);
const delayedSource = source.pipe(delay(1000));

// dauert eine Sekunde bis alle Werte ausgegben werden
delayedSource.subscribe((value) => console.log(value));

Conditional and Boolean Operators

every

Der every Operator prüft, ob alle Werte, die von einem Observable emittiert werden, eine bestimmte Bedingung erfüllen. Wenn die Bedingung für alle Werte true zurückgibt, gibt der Operator true zurück. Wenn die Bedingung für mindestens einen Wert false zurückgibt, gibt auch der Operator false zurück.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import { from } from "rxjs";
import { every } from "rxjs/operators";

const source = from([1, 2, 3, 4, 5]);

const resultTrue = source.pipe(every((value) => value < 6));
const resultFalse = source.pipe(every((value) => value < 5));

resultTrue.subscribe(console.log); // Output: true
resultFalse.subscribe(console.log); // Output: false

find

find ist ein Operator, welcher es ermöglicht, in einem Observable nach dem ersten Element zu suchen, das eine angegebene Bedingung erfüllt. Wenn das Element gefunden wird, wird es in einem Observable zurückgegeben und die Suche wird beendet.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
import { from } from "rxjs";
import { find } from "rxjs/operators";

const numbers = from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

numbers.pipe(find((x) => x > 5)).subscribe(
  (value) => console.log(value), // 6
);

numbers.pipe(find((x) => x > 10)).subscribe(
  (value) => console.log(value), // undefined
);

findIndex

Der findIndex Operator gibt den Index des ersten Elements in eines Observables zurück, das die angegebene Bedingung erfüllt. Sie funktioniert ähnlich wie find, aber gibt den Index des Elements statt des Elements selbst zurück.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import { findIndex, from } from "rxjs";

const numbers = from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

numbers.pipe(findIndex((x) => x > 5)).subscribe(
  (value) => console.log(value), // 5
);

numbers.pipe(findIndex((x) => x > 10)).subscribe(
  (value) => console.log(value), // -1
);

isEmpty

isEmpty ist ein Operator, welcher prüft, ob ein Observable leer ist. Wenn das Observable leer ist, gibt der Operator true zurück, andernfalls false.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import { of } from "rxjs";
import { isEmpty } from "rxjs/operators";

const emptySource = of();
const fullSource = of(1, 2, 3, 4, 5);

const emptyObservable = emptySource.pipe(isEmpty());
const fullObseravle = fullSource.pipe(isEmpty());

emptyObservable.subscribe((value) => console.log(value)); // true
fullObseravle.subscribe((value) => console.log(value)); // false