{ "cells": [ { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "# Data Streams Management – Transformationsoperatoren: Eine Einführung\n", "\n", "**Datenströme** oder auch im Englischen **Data Streams**, sind ein weiteres Paradigma, mittels dessen wir Daten verarbeiten können. Konzeptuell ist ein Datenstrom ein potenziell endloser Strom an Daten, der in ein System hineinfließt, dort verarbeitet wird, um Wissen zu extrahieren und dann wieder herausfließt. Das Paradigma des Data Streaming kommt unter anderem dann zum Einsatz, wenn wir sehr viele Daten haben, welche wir in Echtzeit verarbeiten wollen, da sie mit der Zeit ihre Gültigkeit oder Bedeutung verlieren. Dies kann z.B. die live Auswertung von Sensoren in einem Werk oder in einem Krankenhaus beinhalten, mittels dessen man die Maschinen oder den/die Patient\\*in kontrolliert. \n", "Da die Daten schnell ihre Gültigkeit verlieren, speichern Systeme, welche auf Datenströmen arbeiten (Stream Processing Engine (SPE)) die Daten nicht ab. Dies bedeutet nicht, dass Sie die Daten gar nicht mehr abspeichern können/sollten. Immerhin könnten manche dieser doch historisch von Interesse sein, z.B. um herauszufinden, weshalb eine Maschine kaputtgegangen ist oder welche Werte ein/eine Patient*in hatte, bevor sich sein/ihr Zustand verschlechtert hat. Stattdessen ergänzen sich **Relationale Datenbanken** und das **Data Streaming** gegenseitig, dass auch keines der beiden wichtiger ist oder ähnliches. \n", "\n", "Da uns keine Möglichkeit bekannt ist, eine bestehende Stream Processing Engine leicht in Python einzubinden, wie es für Relationale Datenbank Management Systeme (RDBMS) der Fall war, arbeiten wir in dieser und der kommenden Woche mit einer eigenen, in Python geschriebenen [Streaming API](https://pypi.org/project/isda-streaming/). Diese API wird benutzt, um Konzepte von Datenströmen zu üben. Sie simuliert eine Echtzeitverarbeitung lediglich. Für eine tatsächliche Echtzeitverarbeitung bräuchte man dedizierte Systeme wie z.B. [Apache Flink](https://flink.apache.org), die mit einer größeren Komplexität einhergehen.\n", "\n", "Die Klassen und Methoden der API sind ausführlich dokumentiert. Siehe: [ISDA Streaming Dokumentation](https://dima.gitlab-pages.tu-berlin.de/isda/isda-streaming/isda_streaming.html)\n" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "## Einführung \n", "\n", "Zuerst importieren wir die API. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "from isda_streaming.data_stream import DataStream" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "Nun definieren wir uns einen ersten Datenstrom und geben uns diesen aus. Dabei verwenden wir die *from_collection* Funktion, welche die eingegebene Liste in einen Datenstrom umwandelt. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "ds = DataStream().from_collection([1, 2, 3, 4, 5])\n", "print(ds)" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "Auch mit einer Schleife können wir über den Datenstrom iterieren. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "for x in ds:\n", " print(x)" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "### Map\n", "\n", "Als Nächstes wenden wir den in der Vorlesung vorgestellten **map**-Operator auf dem Datenstrom an. Dieser ist eine sogenannte \"second order function\". Sie erhält als Argument selbst eine Funktion und wendet diese auf jedes Element des Datenstroms an. Dabei wird für jeden Eingabewert ein Ausgabewert produziert. \n", "Somit gibt es die Einschränkung, dass die Funktion, welche an **map** übergeben wird, nur einen **einzelnen Eingabewert** erhalten, als auch nur einen **einzelnen Ausgabewert** produzieren darf. Der **map**-Operator verändert somit nicht die Kardinalität des Datenstroms. \n", "\n", "In diesem Beispiel wird jedes Element des Datenstroms mit 2 multipliziert." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "def times_two(x):\n", " return x * 2\n", "\n", "\n", "output_stream = ds.map(times_two)\n", "print(output_stream)" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "### Flat Map\n", "\n", "Als Nächstes verwenden wir die **flat_map**. Diese unterscheidet sich von der **map** nur in der Hinsicht, dass sie für jedes Eingabeelement null, ein oder mehrere Elemente erzeugt. Daher kann sich die Anzahl der Elemente im Eingabe- und Ausgabe-Datenstrom beim **flat_map**-Operator unterscheiden, je nachdem, wie die jeweils übergebene Funktion definiert ist. \n", "\n", "Um die Funktionalität zu demonstrieren, definieren wir die Funktion *split_words*, die einen übergebenen String anhand seiner Leerzeichen in einzelne Wörter aufteilt. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "def split_words(x: str):\n", " if x == \"\":\n", " return []\n", " return x.split(\" \")\n", "\n", "\n", "input_stream = DataStream().from_collection([\"Hello world!\", \"ISDA\", \"\"])\n", "output_stream = input_stream.flat_map(split_words)\n", "print(output_stream)" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "### Filter\n", "\n", "Der **filter** ist ebenfalls eine \"second order function\". Dabei gibt es erneut eine Einschränkung, die zu beachten ist: Der Rückgabewert muss ein Boolean sein. **Filter** wendet die Funktion dann auf jedes Element im Datenstrom an und sortiert Elemente aus dem Eingabedatenstrom anhand der selber implementierten booleschen Logik aus. Zurückgegeben wird von dem **Filter** dann ein Datenstrom, welcher nur Elemente beinhaltet, für die die boolesche Funktion *True* ausgibt. \n", "\n", "Um die Funktionalität zu demonstrieren, definieren wir die boolesche Funktion *keep_positives*. Diese Funktion gibt für alle Zahlen größer als 0 *True* zurück und ansonsten *False*." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "def keep_positives(x):\n", " if x > 0:\n", " return True\n", " return False\n", "\n", "\n", "input_stream = DataStream().from_collection([-1, 7, 0, 4, 5, -6])\n", "output_stream = input_stream.filter(keep_positives)\n", "print(output_stream)" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "### Reduce\n", "\n", "Der **reduce**-Operator kombiniert jedes Element eines Datenstroms anhand der übergebenen \"second order function\" mit dem letzten reduzierten Wert und speichert das Ergebnis in einem neuen Datenstrom. Dabei wird das erste Element des Eingabe-Datenstroms als initialer reduzierter Wert betrachtet.\n", "\n", "Um die Funktionalität zu demonstrieren, definieren wir eine Funktion namens *sum_all*, welche das aktuelle Element mit dem letzten reduzierten Wert summiert." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "def sum_all(value1, value2):\n", " return value1 + value2\n", "\n", "\n", "input_stream = DataStream().from_collection([-1, 7, 0, 4, 5, -6])\n", "output_stream = input_stream.reduce(sum_all)\n", "print(output_stream)" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "### Key By \n", "\n", "Das **key_by** dient dem gleichen Zweck wie das **GROUP BY** in SQL und gruppiert die Elemente des Datenstroms basierend auf dem Rückgabewert der übergebenen Funktion.\n", "\n", "Um die Funktionalität zu demonstrieren definieren wir zuerst einen Datenstrom von Tupeln. Diesen gruppieren wir dann anhand des **key_by**, mittels der Funktion *key_first_value*, welche Werte anhand des ersten Feldes gruppiert. Es ist wichtig zu beachten, dass die Ausgabe des **key_by**-Operators kein \"normaler\" Datenstrom mehr ist, sondern ein *KeyedStream*." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "def key_first_value(x):\n", " return x[0]\n", "\n", "\n", "input_stream = DataStream().from_collection([(\"a\", 1), (\"a\", 7), (\"b\", 4), (\"a\", 8)])\n", "keyed_stream = input_stream.key_by(key_first_value)\n", "print(keyed_stream)" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "Nachdem wir eine Gruppierung mithilfe des **key_by** durchgeführt haben, können wir die Daten der jeweiligen Gruppierungen mittels des **reduce**-Operators aggregieren. Da der Eingabe-Datenstrom ganze Tupel beinhaltet (welche mehrere Felder bzw. Attribute haben), müssen wir dem **reduce**-Operator auch eine Funktion namens *sum_all_tuples* übergeben, die korrekt mit den Feldern der Tupel im Datenstrom arbeitet. \n", "\n", "Beachte bitte auch, dass der **reduce**-Operator auf allen Partitionen des Datenstroms angewendet wird." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "def key_first_value(x):\n", " return x[0]\n", "\n", "\n", "def sum_all_tuples(tuple1, tuple2):\n", " return (tuple1[0], tuple1[1] + tuple2[1])\n", "\n", "\n", "input_stream = DataStream().from_collection([(\"a\", 1), (\"a\", 7), (\"b\", 4), (\"a\", 8)])\n", "keyed_stream = input_stream.key_by(key_first_value)\n", "print(keyed_stream)\n", "output_stream = keyed_stream.reduce(sum_all_tuples)\n", "print(output_stream)" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "Alternativ kann man auch eine Abfolge von Operatoren hintereinander ausführen." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "output_stream = input_stream.key_by(key_first_value).reduce(sum_all_tuples)\n", "print(output_stream)" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "### Dataflow-Pipeline\n", "\n", "Eine Abfolge von Operatoren wird als **Dataflow-Pipeline** bezeichnet. Hier ist ein Beispiel, wie man eine **Dataflow-Pipeline** als Funktion definiert (*pipeline*) und diese auf den Eingabe-Datenstrom anwendet." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "def pipeline_aufgabe_1(data_stream: DataStream):\n", " return data_stream.key_by(key_first_value).reduce(sum_all_tuples)\n", "\n", "\n", "output_stream = pipeline_aufgabe_1(input_stream)\n", "print(output_stream)" ] } ], "metadata": {}, "nbformat": 4, "nbformat_minor": 4 }