Data Streams Management – Transformationsoperatoren: Eine Einführung

Data Streams Management – Transformationsoperatoren: Eine Einführung#

Datenströme oder auch im Englischen Data Streams, sind ein weiteres Paradigma, mittels dessen wir Daten verarbeiten können. Konzeptuell ist ein Datenstrom ein potenziell endloser Strom an Daten, der in ein System hineinfließt, dort verarbeitet wird, um Wissen zu extrahieren und dann wieder herausfließt. Das Paradigma des Data Streaming kommt unter anderem dann zum Einsatz, wenn wir sehr viele Daten haben, welche wir in Echtzeit verarbeiten wollen, da sie mit der Zeit ihre Gültigkeit oder Bedeutung verlieren. Dies kann z.B. die live Auswertung von Sensoren in einem Werk oder in einem Krankenhaus beinhalten, mittels dessen man die Maschinen oder den/die Patient*in kontrolliert.
Da die Daten schnell ihre Gültigkeit verlieren, speichern Systeme, welche auf Datenströmen arbeiten (Stream Processing Engine (SPE)) die Daten nicht ab. Dies bedeutet nicht, dass Sie die Daten gar nicht mehr abspeichern können/sollten. Immerhin könnten manche dieser doch historisch von Interesse sein, z.B. um herauszufinden, weshalb eine Maschine kaputtgegangen ist oder welche Werte ein/eine Patient*in hatte, bevor sich sein/ihr Zustand verschlechtert hat. Stattdessen ergänzen sich Relationale Datenbanken und das Data Streaming gegenseitig, dass auch keines der beiden wichtiger ist oder ähnliches.

Da uns keine Möglichkeit bekannt ist, eine bestehende Stream Processing Engine leicht in Python einzubinden, wie es für Relationale Datenbank Management Systeme (RDBMS) der Fall war, arbeiten wir in dieser und der kommenden Woche mit einer eigenen, in Python geschriebenen Streaming API. Diese API wird benutzt, um Konzepte von Datenströmen zu üben. Sie simuliert eine Echtzeitverarbeitung lediglich. Für eine tatsächliche Echtzeitverarbeitung bräuchte man dedizierte Systeme wie z.B. Apache Flink, die mit einer größeren Komplexität einhergehen.

Die Klassen und Methoden der API sind ausführlich dokumentiert. Siehe: ISDA Streaming Dokumentation

Einführung#

Zuerst importieren wir die API.

from isda_streaming.data_stream import DataStream

Nun definieren wir uns einen ersten Datenstrom und geben uns diesen aus. Dabei verwenden wir die from_collection Funktion, welche die eingegebene Liste in einen Datenstrom umwandelt.

ds = DataStream().from_collection([1, 2, 3, 4, 5])
print(ds)
DataStream([1, 2, 3, 4, 5])

Auch mit einer Schleife können wir über den Datenstrom iterieren.

for x in ds:
    print(x)
1
2
3
4
5

Map#

Als Nächstes wenden wir den in der Vorlesung vorgestellten map-Operator auf dem Datenstrom an. Dieser ist eine sogenannte “second order function”. Sie erhält als Argument selbst eine Funktion und wendet diese auf jedes Element des Datenstroms an. Dabei wird für jeden Eingabewert ein Ausgabewert produziert.
Somit gibt es die Einschränkung, dass die Funktion, welche an map übergeben wird, nur einen einzelnen Eingabewert erhalten, als auch nur einen einzelnen Ausgabewert produzieren darf. Der map-Operator verändert somit nicht die Kardinalität des Datenstroms.

In diesem Beispiel wird jedes Element des Datenstroms mit 2 multipliziert.

def times_two(x):
    return x * 2


output_stream = ds.map(times_two)
print(output_stream)
DataStream([2, 4, 6, 8, 10])

Flat Map#

Als Nächstes verwenden wir die flat_map. Diese unterscheidet sich von der map nur in der Hinsicht, dass sie für jedes Eingabeelement null, ein oder mehrere Elemente erzeugt. Daher kann sich die Anzahl der Elemente im Eingabe- und Ausgabe-Datenstrom beim flat_map-Operator unterscheiden, je nachdem, wie die jeweils übergebene Funktion definiert ist.

Um die Funktionalität zu demonstrieren, definieren wir die Funktion split_words, die einen übergebenen String anhand seiner Leerzeichen in einzelne Wörter aufteilt.

def split_words(x: str):
    if x == "":
        return []
    return x.split(" ")


input_stream = DataStream().from_collection(["Hello world!", "ISDA", ""])
output_stream = input_stream.flat_map(split_words)
print(output_stream)
DataStream(['Hello', 'world!', 'ISDA'])

Filter#

Der filter ist ebenfalls eine “second order function”. Dabei gibt es erneut eine Einschränkung, die zu beachten ist: Der Rückgabewert muss ein Boolean sein. Filter wendet die Funktion dann auf jedes Element im Datenstrom an und sortiert Elemente aus dem Eingabedatenstrom anhand der selber implementierten booleschen Logik aus. Zurückgegeben wird von dem Filter dann ein Datenstrom, welcher nur Elemente beinhaltet, für die die boolesche Funktion True ausgibt.

Um die Funktionalität zu demonstrieren, definieren wir die boolesche Funktion keep_positives. Diese Funktion gibt für alle Zahlen größer als 0 True zurück und ansonsten False.

def keep_positives(x):
    if x > 0:
        return True
    return False


input_stream = DataStream().from_collection([-1, 7, 0, 4, 5, -6])
output_stream = input_stream.filter(keep_positives)
print(output_stream)
DataStream([7, 4, 5])

Reduce#

Der reduce-Operator kombiniert jedes Element eines Datenstroms anhand der übergebenen “second order function” mit dem letzten reduzierten Wert und speichert das Ergebnis in einem neuen Datenstrom. Dabei wird das erste Element des Eingabe-Datenstroms als initialer reduzierter Wert betrachtet.

Um die Funktionalität zu demonstrieren, definieren wir eine Funktion namens sum_all, welche das aktuelle Element mit dem letzten reduzierten Wert summiert.

def sum_all(value1, value2):
    return value1 + value2


input_stream = DataStream().from_collection([-1, 7, 0, 4, 5, -6])
output_stream = input_stream.reduce(sum_all)
print(output_stream)
DataStream([-1, 6, 6, 10, 15, 9])

Key By#

Das key_by dient dem gleichen Zweck wie das GROUP BY in SQL und gruppiert die Elemente des Datenstroms basierend auf dem Rückgabewert der übergebenen Funktion.

Um die Funktionalität zu demonstrieren definieren wir zuerst einen Datenstrom von Tupeln. Diesen gruppieren wir dann anhand des key_by, mittels der Funktion key_first_value, welche Werte anhand des ersten Feldes gruppiert. Es ist wichtig zu beachten, dass die Ausgabe des key_by-Operators kein “normaler” Datenstrom mehr ist, sondern ein KeyedStream.

def key_first_value(x):
    return x[0]


input_stream = DataStream().from_collection([("a", 1), ("a", 7), ("b", 4), ("a", 8)])
keyed_stream = input_stream.key_by(key_first_value)
print(keyed_stream)
KeyedStream({'a': [('a', 1), ('a', 7), ('a', 8)], 'b': [('b', 4)]})

Nachdem wir eine Gruppierung mithilfe des key_by durchgeführt haben, können wir die Daten der jeweiligen Gruppierungen mittels des reduce-Operators aggregieren. Da der Eingabe-Datenstrom ganze Tupel beinhaltet (welche mehrere Felder bzw. Attribute haben), müssen wir dem reduce-Operator auch eine Funktion namens sum_all_tuples übergeben, die korrekt mit den Feldern der Tupel im Datenstrom arbeitet.

Beachte bitte auch, dass der reduce-Operator auf allen Partitionen des Datenstroms angewendet wird.

def key_first_value(x):
    return x[0]


def sum_all_tuples(tuple1, tuple2):
    return (tuple1[0], tuple1[1] + tuple2[1])


input_stream = DataStream().from_collection([("a", 1), ("a", 7), ("b", 4), ("a", 8)])
keyed_stream = input_stream.key_by(key_first_value)
print(keyed_stream)
output_stream = keyed_stream.reduce(sum_all_tuples)
print(output_stream)
KeyedStream({'a': [('a', 1), ('a', 7), ('a', 8)], 'b': [('b', 4)]})
KeyedStream({'a': [('a', 1), ('a', 8), ('a', 16)], 'b': [('b', 4)]})

Alternativ kann man auch eine Abfolge von Operatoren hintereinander ausführen.

output_stream = input_stream.key_by(key_first_value).reduce(sum_all_tuples)
print(output_stream)
KeyedStream({'a': [('a', 1), ('a', 8), ('a', 16)], 'b': [('b', 4)]})

Dataflow-Pipeline#

Eine Abfolge von Operatoren wird als Dataflow-Pipeline bezeichnet. Hier ist ein Beispiel, wie man eine Dataflow-Pipeline als Funktion definiert (pipeline) und diese auf den Eingabe-Datenstrom anwendet.

def pipeline_aufgabe_1(data_stream: DataStream):
    return data_stream.key_by(key_first_value).reduce(sum_all_tuples)


output_stream = pipeline_aufgabe_1(input_stream)
print(output_stream)
KeyedStream({'a': [('a', 1), ('a', 8), ('a', 16)], 'b': [('b', 4)]})