{ "cells": [ { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "# Data Streams Management – Anfrageoperatoren: Eine Einführung\n", "\n", "In der vergangenen Woche haben wir angefangen, uns mit Datenströmen auseinanderzusetzen. Dabei haben wir gesagt, dass Datenströme potenziell unendlich lang sind und dass die Gültigkeit dieser Daten nur kurzlebig ist. Wenn wir jedoch Anfragen stellen wollen, bevor die Daten ihre Gültigkeit verlieren, ein Datenstrom aber potenziell unendlich lang ist, so müssen wir diesen unterteilen und Anfragen auf einzelnen Abschnitten eines Datenstroms stellen. Für diese Partitionierung führen wir heute das sogenannte Windowing ein, welches anhand der Window Definition den Datenstrom partitioniert. Die Anfragen werden dann auf einer pro Window Basis ausgeführt. Deswegen ist das Datenstrom-Paradigma durch wiederkehrende Anfragen geprägt. Somit werden im Data Streaming die Rollen der Daten und Anfragen vertauscht. Während in einer relationalen Datenbank die Daten ruhen und sich generell nicht viel verändern, sind die Anfragen \"in Bewegung\", da diese nur ad-hoc/spontan gestellt werden. Im Data Streaming bewegen sich stattdessen die Daten mit einer hohen Geschwindigkeit, aber die Anfragen ruhen, da diese einmal gestellt werden und dann wiederkehrend ausgeführt werden. \n", "\n", "Des Weiteren haben wir in der vergangenen Woche nur Tuple basierte Streams kennengelernt. In dieser Woche haben wir dies durch zeitbasierte Streams ergänzt, welche wir in diesem Tutorium entsprechend wiederholen. Zuletzt wurden dann auch noch approximative Methoden (Synopsen) vorgestellt, welche ebenfalls in diesem Tutorium wiederholt und geübt werden. " ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "## Einführung \n", "\n", "Wir verwenden erneut die eigens definierte Streaming-API der letzten Woche. Dieses Mal ergänzen wir die imports um die zuvor erwähnten zeitbasierten Streams, als auch die Synopsen (engl. Synopses). Für eine tatsächliche Echtzeitverarbeitung bräuchte man dedizierte Systeme wie z.B. Apache Flink, die mit einer größeren Komplexität einhergehen.\n", "\n", "Die Klassen und Methoden der API sind ausführlich dokumentiert. Siehe: https://dima.gitlab-pages.tu-berlin.de/isda/isda-streaming/isda_streaming.html" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "import random\n", "\n", "from isda_streaming.data_stream import DataStream, TimedStream\n", "from isda_streaming.synopsis import BloomFilter, CountMinSketch, ReservoirSample" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "### Windowing\n", "\n", "In den folgenden Beispielen wollen wir uns mit Windowing beschäftigen. Als Windowing bezeichnen wir den Prozess der Diskretisierung eines Datenstroms mit Fenstern. Aber wieso benötigen wir dies überhaupt?" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "Als erstes initialisieren wir einen Datenstrom." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "ds = DataStream().from_collection([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12])\n", "print(ds)" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "Nun wollen wir den Stream in einzelne Windows unterteilen. Es gibt viele unterschiedliche Windowarten, aber in der Vorlesung haben Sie die folgenden kennengelernt:\n", "- Landmark Windows: Alle Windows haben einen gemeinsamen Startpunkt, jedoch wird das Window Ende späterer Windows linear um den übergebenen Dauer Parameter länger. \n", "- Tumbling Windows: Erneut wird eine Dauer als Parameter übergeben. Dieses Mal ist dies die gesamte Länge des Windows, wobei das Ende eines Windows der Beginn des nächsten ist. Windows überlappen sich hier also nie. \n", "- Sliding Windows: Beim Sliding Window gibt es zwei Parameter: die Dauer und den Slide Faktor, welcher angibt, um wie viel sich ein Window Anfang bzw. Ende verschiebt. Wenn der Slide der Dauer entspricht, so entspricht das Sliding Window dem Tumbling Window.\n", "\n", "In der unten stehenden Codezelle können Sie das Verhalten genauer analysieren. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "w1 = ds.landmark_window(4)\n", "print(\"Landmark Windows:\\n\", w1)\n", "w2 = ds.tumbling_window(4)\n", "print(\"Tumbling Windows:\\n\", w2)\n", "w3 = ds.sliding_window(4, 2)\n", "print(\"Sliding Windows:\\n\", w3)" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "Es ist uns nun möglich, mittels `reduce`, `aggregate` oder `apply` auf dem Windowed Stream weiterzuarbeiten. Mehr über deren Funktionsweise findet man in der `isda_streaming` Dokumentation." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Example: reduce functions on windowed streams\n", "def sum_all(value1, value2):\n", " return value1 + value2\n", "\n", "\n", "result = w1.reduce(sum_all)\n", "print(result)\n", "result = w2.reduce(sum_all)\n", "print(result)\n", "result = w3.reduce(sum_all)\n", "print(result)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Example: aggregate on windowed streams\n", "result = w1.aggregate(\"mean\")\n", "print(result)\n", "result = w2.aggregate(\"mean\")\n", "print(result)\n", "result = w3.aggregate(\"mean\")\n", "print(result)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Example: custom apply functions on windowed streams\n", "def get_first_element(window: list):\n", " return window[0]\n", "\n", "\n", "result = w1.apply(get_first_element)\n", "print(result)\n", "result = w2.apply(get_first_element)\n", "print(result)\n", "result = w3.apply(get_first_element)\n", "print(result)" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "Es ist auch möglich, einen Stream zeitbasiert zu unterteilen. Hierzu brauchen wir eine Instanz von `TimedStream`, bei der jeder Wert mit einem zugehörigen Timestamp versehen ist." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "data = [0, 1, 2, 3, 4, 5]\n", "timestamps = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]\n", "timed_stream = TimedStream().from_collection(data, timestamps)\n", "print(timed_stream)\n", "\n", "w1 = timed_stream.tumbling_time_window(3.0)\n", "print(w1)" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "Hierbei hängt die Einteilung der Fenster nun nicht mehr von der Anzahl der Tupel ab, sondern von den Zeitstempeln der Tupel, Zeitintervall und den Timestamps ab. Um dies zu verdeutlichen, definieren wir einen zweiten Zeit basierten Datenstrom mit den gleichen Daten wie zuvor, aber mit anderen Zeitstempeln." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "data = [0, 1, 2, 3, 4, 5]\n", "timestamps = [0.0, 2.0, 4.0, 6.0, 8.0, 10.0]\n", "timed_stream = TimedStream().from_collection(data, timestamps)\n", "print(timed_stream)\n", "\n", "w1 = timed_stream.tumbling_tuple_window(3)\n", "print(\"Tuple-based Tumbling Windows:\\n\", w1)\n", "w2 = timed_stream.tumbling_time_window(3.0)\n", "print(\"Time-based Tumbling Windows:\\n\", w2)" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "### Synopsen" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "Im Folgenden beschäftigen wir uns mit approximativen Datenmethoden, die im Englischen als Synopses bezeichnet werden. Konkret sind diese Methoden in vier Klassen unterteilt: Histogramme, Samples, Sketches und Wavelets. Diese Methoden sind sehr beliebt und finden vor allem im Bereich des Data Streamings viele Anwendungen, in denen die Datenmenge so massiv sein kann, dass Genauigkeit zugunsten von Rechenleistung geopfert wird. Des Weiteren haben sie einen sehr geringen Speicherbedarf, da sie die Daten komprimieren. Im Folgenden wiederholen wir drei solcher Datenstrukturen, die Sie bereits in der Vorlesung kennengelernt haben. Dabei gehören der **Bloom-Filter** und der **Count-Min Sketch** zur Klasse der Sketches, während das **Reservoir Sample** zur Klasse der Samples gehört. Wir betrachten also explizit keine Histogramme oder Wavelets.\n", "\n", "Bei weiterem Interesse zu diesem Thema können Sie das Buch \"Synopses for Massive Data: Samples, Histograms, Wavelets, Sketches\" konsultieren [[1]](https://dsf.berkeley.edu/cs286/papers/synopses-fntdb2012.pdf)." ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "#### Count-Min Sketch" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "Wir gucken uns zuerst den Count-Min Sketch an. Ein Count-Min Sketch besteht aus einem zweidimensionalen Array, welches über seine Breite und Höhe definiert ist. Alle Einträge des Arrays werden bei der Initialisierung des Count-Min Sketches auf 0 initialisiert. \n", "\n", "Zuerst erstellen wir solch einen Count-Min Sketch und geben diesen aus. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "cm = CountMinSketch(width=10, depth=3)\n", "print(cm)" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "Zusätzlich zum Array enthält ein Count-Min Sketch auch eine Menge von Hash-Funktionen*. Diese Hash-Funktionen bilden die Eingabe auf den Bereich der Sketch-Breite ab, d.h. auf das Intervall $[0, \\text{Breite}-1]$. Die Anzahl der Hash-Funktionen entspricht der Anzahl der Reihen im Sketch, da wir später unterschiedliche Hash-Funktionen pro Reihe verwenden möchten.\n", "\n", "Im Folgenden betrachten wir das `hashFunctions` Objekt, das beim Initialisieren des Sketches bereits erstellt wurde, und wenden es auf ein paar Eingaben an.\n", "\n", "*Eine mathematische Funktion, die eine Eingabe beliebiger Größe auf eine feste Ausgabegröße abbildet." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "hashFunctions = cm.get_hash_functions()\n", "print(hashFunctions.hash(\"Hello World\"))\n", "print(hashFunctions.hash(\"Hello World Studis\"))\n", "print(hashFunctions.hash(\"Werder Bremen\"))" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "Wir sehen, dass wir immer 3 Hashwerte erhalten, welche im Bereich zwischen 0 und 9 liegen. Die Werte zweier Hash-Funktionen können dabei per Zufall auch identisch sein.\n", "\n", "Wenn wir die gleichen Eingaben nun an die `update`-Funktion unseres Count-Min Sketch übergeben, dann werden genau diese Einträge jeweils um eins erhöht, da wir diese Eingaben nun einmal mehr als zuvor gesehen haben. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "cm.update(\"Hello World\")\n", "print(cm)\n", "print(\"\\n\")\n", "cm.update(\"Hello World Studis\")\n", "print(cm)\n", "print(\"\\n\")\n", "cm.update(\"Werder Bremen\")\n", "print(cm)" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "Da es in einer Reihe zu mehreren Kollisionen zwischen Hash-Werten kommen kann, kann dies das Ergebnis beeinträchtigen. Deswegen haben wir mehrere Reihen mit unterschiedlichen Hash-Funktionen, um die Wahrscheinlichkeit zu erhöhen, dass in einer dieser Reihen die Anzahl der Kollisionen möglichst minimal ist.\n", "\n", "Wenn wir Werte approximieren möchten, die wir bereits gesehen oder beobachtet haben, verwenden wir erneut die Hash-Funktion, betrachten die Indizes und wählen den kleinsten Wert aus, da hier die minimale Anzahl an Kollisionen vorliegt. Wir können somit nur über-approximieren. \n", "\n", "Wenn wir den Wert \"Werder Bremen\" hinzufügen, gibt es zwar in der dritten Reihe eine Kollision, aber in den anderen Reihen nicht. Indem wir den Index mit der geringsten Anzahl von Kollisionen wählen, approximieren wir die Häufigkeit von \"Werder Bremen\" immer noch genau mit 1." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "cm.query(\"Werder Bremen\")" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "#### Bloom-Filter" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "Wir kommen nun zum Bloom-Filter. Im Gegensatz zum Count-Min Sketch besteht der Bloom-Filter nur aus einem eindimensionalen Array, das bei der Initialisierung, genauso wie der Count-Min Sketch, in jedem Eintrag auf 0 gesetzt wird. Oft wird dies auch als Bitvektor (oder Bitmap auf Englisch) bezeichnet und entsprechend implementiert, da der Wertebereich nur aus 0 und 1 besteht. Zusätzlich dazu hat der Bloom-Filter eine oder mehrere Hash-Funktionen, die ähnlich wie beim Count-Min Sketch hashen, jedoch im Gegensatz zum Count-Min Sketch ist die Anzahl der Hash-Funktionen ein Parameter.\n", "\n", "Wir starten, indem wir einen Bloom-Filter mit 10 Bits und 2 Hash-Funktionen initialisieren." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "bf = BloomFilter(n_bits=10, n_hash_functions=2)\n", "print(bf)" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "Nun fügen wir ein paar Werte ein. Genau wie beim Count-Min Sketch wird jeder Eingabewert pro Hash-Funktion einmal gehasht. Allerdings haben wir beim Bloom-Filter keinen Counter, sondern ein Bit pro Eintrag. Wir prüfen einfach, ob das Bit derzeit auf Null gesetzt ist, und falls ja, setzen wir es auf 1. Wenn das Bit bereits auf eins gesetzt ist, lassen wir es unverändert und fahren fort.\n", "\n", "Lassen Sie uns also ein paar Fußballvereine der 1. Bundesliga als Datenstrom wie in den vorherigen Aufgaben hinzufügen." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "ds = DataStream().from_collection([\"Werder Bremen\", \"1. FC Union Berlin\", \"SC Freiburg\"])\n", "for tupel in ds:\n", " bf.update(tupel)\n", "print(bf)" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "Während der Count-Min Sketch unter anderem die Häufigkeit von einzelnen Instanzen approximiert (Point-Query), löst der Bloom-Filter das \"Set-Problem\" approximativ. Der Bloom-Filter kann zwar nie garantiert sagen, ob er ein konkretes Element schon einmal gesehen hat, kann aber mit absoluter Garantie sagen, dass er ein Element noch nie gesehen hat. Testen wir diese Eigenschaft mit ein paar Anfragen. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "print(bf.query(\"Werder Bremen\"))\n", "print(bf.query(\"Hertha\"))" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "Wir sehen, dass Werder Bremen zu den Mannschaften der 1. Bundesliga gehört, die unser Bloom-Filter möglicherweise schon einmal gesehen hat. Das Gleiche gilt jedoch auch für Hertha BSC, obwohl wir anhand unserer Daten konkret wissen, dass wir diesen Wert noch nicht gesehen haben. Dieses Verhalten liegt erneut an den Kollisionen im Array. Wenn wir uns die Hashwerte von Hertha BSC anschauen, sehen wir, dass die korrespondierenden Bits durch andere Vereine bereits auf 1 gesetzt wurden, die wir tatsächlich beobachtet haben. Somit können wir nur garantieren, dass wir etwas **möglicherweise** schon einmal gesehen haben." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "hashFunctionsbf = bf.get_hash_functions()\n", "print(hashFunctionsbf.hash(\"Hertha\"))" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "Was wir jedoch definitiv ausschließen können, ist, dass der Hamburger Sport Verein zu den Vereinen der 1. Bundesliga gehört, die wir bereits gesehen haben." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "print(hashFunctionsbf.hash(\"Hamburger Sport Verein\"))\n", "bf.query(\"Hamburger Sport Verein\")" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "Da nicht alle Bits des Hamburger Sport-Verein auf 1 gesetzt sind, können wir garantieren, dass dieser noch nie beobachtet worden ist. Wäre der Hamburger Sport-Verein jemals gesehen worden, dann wäre dies der Fall." ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "#### Reservoir Sample" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "Zuletzt schauen wir uns das Reservoir Sample an, das zur Klasse der Samples, also der Stichproben, gehört. Im Gegensatz zu den zuvor präsentierten Datenstrukturen arbeitet das Reservoir Sample im Hintergrund nicht mit Hashfunktionen, sondern mit Zufallszahlen. Da es sich bei Samples um eine Stichprobe aus dem gesamten Datensatz handelt, sind Samples eindimensionale Arrays. Solange das Reservoir Sample noch nicht vollständig gefüllt ist, wird jedes Element einfach in das Array eingetragen.\n", "\n", "Wir erstellen ein Reservoir Sample mit einem Array, das 20 Einträge hat, und füllen es vorerst mit genau 20 Werten." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "rs = ReservoirSample(20)\n", "\n", "for i in range(20):\n", " rs.update(i)\n", "\n", "print(rs)" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "Sobald das Array gefüllt ist, wird per Zufall entschieden, ob ein Element noch hinzugefügt wird und welcher Wert dafür weichen muss. Im Folgenden fügen wir weitere Werte ein, setzen dieses Mal jedoch den Debug-Parameter der Update-Funktion, um Informationen während des Updates zu erhalten." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "rs.update(20, debug=True)\n", "print(rs)" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "Jedes Element soll die gleiche Wahrscheinlichkeit haben, im Sample aufzutauchen. Da wir nun 21 Elemente gesehen haben, bestimmen wir die Wahrscheinlichkeit, dass das 21. Element ins Sample aufgenommen wird. Diese beträgt 95 %. Zusätzlich dazu generieren wir eine Zufallszahl im Intervall von 0 bis 1, um zu bestimmen, ob es einer der fünf Prozent der Fälle ist, in denen wir diesen Wert nicht aufnehmen sollten, oder ob wir das Element in das Sample aufnehmen sollen. Wenn wir es aufnehmen sollen, wird eine weitere Zufallszahl im Intervall [0,Breite - 1] erzeugt, um zu bestimmen, welches Element im Sample ersetzt wird." ] } ], "metadata": {}, "nbformat": 4, "nbformat_minor": 4 }