Reactive Extensions – Alles ist ein Event!

Seite: 2/2

Anbieter zum Thema

Eventstreams orchestrieren und filtern

Richtig nützlich wird die Library aber durch die Komposition und Filterung solcher Eventstreams. Es existiert eine große Anzahl an Operatoren bzw. Methoden um den Stream auf die eigenen Bedürfnisse anzupassen. Der eigentliche Eventstream, die echte Quelle der Events, bleibt dabei allerdings unverändert. So kann ein Eventstream mit einer hohen Eventfrequenz durch entsprechende Operatoren entschärft werden.

Liefert beispielsweise ein Sensor alle 10 ms einen Wert, so kann der Operator Sample() einen neuen Eventstream erstellen, der nur alle 100 ms den aktuellen Wert meldet. Nützlich könnte hier aber auch eine Eventstream sein, der mit jedem neuen Wert auch den bisherigen Durchschnitt berechnet. Sinnvoll ist es in diesem Fall den bereits entschärften Eventstream als Basis zu nutzen. Somit wird ein zweiter Eventstream erstellt, der als Event den Durschnitt aller bisher aufgetretenen Event-Werte berechnet. Bild 1 verdeutlicht den Verlauf der Events aller drei Eventstreams. Diese Form von Diagrammen wird als Marble-Diagramm bezeichnet.

Die Reactive Extensions beinhalten eine große Zahl an Methoden, die aus einem Stream weitere Streams ableiten können. Eine weitere Stärke der Bibliothek liegt in der Kombinationsmöglichkeit von Eventstreams. Neben dem Erstellen von neuen Streams ist es genauso möglich auch Streams miteinander zu kombinieren.

Dies kann vor allem sinnvoll sein, wenn es darum geht, mehrere Eventstreams in einem zusammenzufassen. Mit der Methode Merge() wird in einem vorhandenen Eventstream ein zweiter Stream eingefügt. Ein typisches Beispiel sind Eventstreams von Sensoren, die durch einen dedizierten Thread verarbeitet werden. Dabei werden die Streams der einzelnen Sensoren in einem neuen zentralen Stream zusammengeführt, welcher dann in einem exklusiven Thread verarbeitet wird.

Daneben gibt es noch weitere nützliche Methoden welche Eventstreams miteinander vereinen. Die Methode Zip() generiert aus zwei Streams einen neuen Stream, der erst ein Event auslöst, wenn die beiden zugrunde liegenden Streams jeweils ein Event auslösen.

Neben der Möglichkeit, echte Events zu modellieren, bietet das Modell des Observable/Observer einen anderen sehr interessanten Aspekt. Mit Hilfe von Observables ist es möglich, Arrays/Collections über die Zeit abzubilden. Über die Zeit heißt in diesem Sinne das die Daten in aller Regel asynchron dem Konsumenten zur Verfügung gestellt werden.

Ein Vergleich zwischen dem klassischen Iterator-Pattern und einem Observable sollte den Unterschied klar machen. Der Iterator ist von einem Anbieter (Aggregat) abfragbar und ermöglicht das sequentielle Abfragen der Daten. Mit Hilfe von Methoden wie z.B. MoveNext() wird zum nächsten sequentiellen Element gesprungen.

An dieser Stelle kann es ein Problem geben, denn MoveNext() würde so lange blockieren bis das nächste Element bereit ist. Dies kann aber z.B. bei I/O-basierten Quellen, wie z.B. einer Internetverbindung, eine lange Wartezeit mit sich bringen.

Bei großen Anwendungen oder Services kann dies zu einem Problem werden, da viele Threads einfach mit blockierenden Methoden ungenutzt Ressourcen verbrauchen. Diese könnten an andere Stelle sinnvoller eingesetzt werden.

Das Push- und das Pull-Modell

Aus Sicht des Konsumenten müssen die Daten aktiv aus der Quelle geholt werden. Daher wird dieses Modell auch als Pull-Modell bezeichnet. Wäre es nicht einfacher, der Konsument bekommt die angefragten Daten einfach zugestellt, sobald diese vorhanden sind? Genau an dieser Stelle kommt das Push-Modell ins Spiel. Neben der Zustellung der Daten wird der Konsument auch informiert, wenn keine Daten mehr vorhanden sind. Sollte die Quelle einen Fehler bzw. Exception melden, wird diese ebenso gemeldet.

Genau dieses Push-Modell wird durch die Observables repräsentiert. Durch dieses Konzept sollte der Konsument nicht mehr in die Lage kommen blockierend auf das nächste Element einer Sequenz zu warten. Durch die Verwendung von Schedulern ist auch das Koordinieren von nebenläufigen Operationen etwas entschärft. Viele Operatoren der Reactive Extensions ermöglichen eine einfache und komfortable Verarbeitung.

Da solche Datenquellen aber letztendlich auch nur Eventstreams repräsentieren, können diese beliebig mit anderen Streams kombiniert und verbunden werden.

Dem Entwickler wird damit ein Werkzeug in die Hand gegeben, mit dem er Eventstreams einfach und effizient verarbeiten, kombinieren und (asynchron) koordinieren kann.

Somit bietet sich dieses Konzept vor allem bei Software an, die z.B. mit sehr vielen Events arbeitet oder so designt wurde, Stichwort „Event Based Components“. Aber auch bei der (asynchronen) Verarbeitung großer Datenmengen kann diese Library überzeugen.

Quellen

  • Introduction to RX – auch als kostenloses eBook (http://www.introtorx.com/)
  • Übersicht für eine Vielzahl von Programmiersprachen (http://reactivex.io/)
  • Das reaktive Manifesto (http://www.reactivemanifesto.org/)
  • The Cloud Programmability Group at Microsoft (https://github.com/Reactive-Extensions/)

* Marko Beelmann ist bei Philips Medizinsysteme Böblingen tätig. Seine Funktion ist die aktive Produktentwicklung im Bereich Patientenüberwachung mit dem Schwerpunkt PC basierte Erweiterungen und Systemlösungen für Echtzeit-Patientenmonitore.

(ID:44308945)