Data Streams Management – Anfrageoperatoren: Aufgaben#

Aufgabe 1: Windowing#

In den folgenden Aufgaben wollen wir die Konzepte des Windowing vertiefen. Gegeben ist ein Timed Stream auf dem wir Tupel- sowie Zeit-basierte Fenster erstellen, auf denen wir Anfragen stellen.

import random

from isda_streaming.data_stream import DataStream, TimedStream
from isda_streaming.synopsis import BloomFilter, CountMinSketch, ReservoirSample

random.seed(42)

data = [5, 10, 7, 8, 12, 2, 6, 7, 11, 15, 7, 11, 1, 10, 21, 3, 4, 9, 8, 2]
timestamps = [
    0.0,
    1.0,
    2.0,
    3.0,
    5.0,
    6.0,
    7.0,
    9.0,
    14.0,
    15.0,
    16.0,
    17.0,
    18.0,
    18.5,
    19.0,
    19.5,
    21.0,
    22.5,
    23.5,
    25.0,
]
timed_stream = TimedStream().from_collection(data, timestamps)
print(timed_stream)
TimedStream([(5, 0.0), (10, 1.0), (7, 2.0), (8, 3.0), (12, 5.0), (2, 6.0), (6, 7.0), (7, 9.0), (11, 14.0), (15, 15.0), (7, 16.0), (11, 17.0), (1, 18.0), (10, 18.5), (21, 19.0), (3, 19.5), (4, 21.0), (9, 22.5), (8, 23.5), (2, 25.0)])

Aufgabe 1.1: Landmark Windows#

Aufgabe 1.1.1: Landmark Windows - Tuple Basiert#

Es soll ein Tupel-basiertes Landmark-Window (Größe 5) erstellt werden. Bestimmen Sie pro Fenster den maximalen Wert. Geben Sie sich zusätzlich die Windows selbst aus.

# your code here

Aufgabe 1.1.2: Landmark Windows - Zeit Basiert#

Es soll ein Zeit-basiertes Landmark-Window alle 10 Sekunden erstellt werden. Bestimmen Sie pro Fenster den vorletzten Wert. Geben Sie sich zusätzlich die Windows selbst aus.

# your code here

Aufgabe 1.2: Tumbling Windows#

Aufgabe 1.2.1: Tumbling Windows - Tuple Basiert#

Erstellen Sie ein Tupel-basiertes (Größe 5) Tumbling Window und lassen Sie sich die Durchschnittswerte pro Fenster ausgeben.

# your code here

Aufgabe 1.2.2: Tumbling Windows - Zeit Basiert*#

Erstellen Sie ein Zeit basiertes (10 Sekunden) Tumbling Window und lassen Sie sich die Durchschnittswerte pro Fenster ausgeben.

# your code here

Aufgabe 1.3: Sliding Windows#

Aufgabe 1.3.1: Sliding Windows - Tuple Basiert*#

Erstellen Sie ein Tupel-basiertes Window mit 8 Elementen und einem Slide-Faktor von 4. Zusätzlich sollen Sie eine Methode erstellen, die für alle Werte eines Fensters die Summe bildet. Wenden Sie diese dann auf Ihre Windows an. Benutzen Sie in diesem Fall nicht .aggregate('sum')!

# your code here

Aufgabe 1.3.2: Sliding Windows - Zeit Basiert#

Teilen Sie den Stream nun in Zeit-basierte Sliding Windows von 10 Sekunden mit einer Verschiebung von 5 Sekunden auf. Lassen Sie sich zusätzlich die Anzahl an Elementen in jedem Fenster ausgeben.

# your code here

Aufgabe 1.4: Dataflow-Pipeline#

In dieser Aufgabe möchten wir eine vollständige Dataflow-Pipeline bauen. Gegeben ist ein Finanzticker, der uns die Daten einiger Automobilhersteller zurückgibt. Diese haben wir hier der Einfachheit wegen als Tupel mit dem Markennamen in Kleinbuchstaben und einem int-Wert als aktuellem Aktienkurs. Zudem hat jedes Tupel in unserem TimedStream auch noch einen zugehörigen Zeitstempel, der uns sagt, wann das Update zum jeweiligen Aktienkurs eingetroffen ist.

timed_stream = TimedStream().from_csv("../resources/09_data_streams/finanzticker_1.csv")
print(timed_stream)
TimedStream([(('toyota', 28.0), 0.0), (('bmw', 220.0), 0.5), (('mercedes', 150.0), 1.5), (('hyundai', 76.0), 2.0), (('ferrari', 564.0), 3.0), (('suzuki', 64.0), 5.0), (('bmw', 224.0), 5.5), (('mercedes', 156.0), 7.0), (('suzuki', 66.0), 7.5), (('ferrari', 560.0), 8.0), (('hyundai', 70.0), 9.0), (('toyota', 32.0), 10.0), (('suzuki', 64.0), 11.5), (('ferrari', 550.0), 12.5), (('mercedes', 158.0), 13.0), (('toyota', 30.0), 14.5), (('bmw', 230.0), 16.0), (('hyundai', 80.0), 18.5), (('suzuki', 70.0), 20.5), (('hyundai', 68.0), 21.0), (('ferrari', 540.0), 21.5), (('toyota', 36.0), 22.0), (('mercedes', 156.0), 22.5), (('bmw', 230.0), 23.5), (('suzuki', 66.0), 25.5)])

Aufgabe 1.4.1#

Unser Finanzticker gibt uns momentan die Werte für die Marken BMW, Ferrari, Hyundai, Mercedes, Suzuki & Toyota aus. Unser Chef ändert jedoch oft seine Meinung und möchte nun von den gennanten Herstellern nur noch die aus Asien haben. Schreiben Sie eine weitere Methode, die checkt, ob eines unserer Tupel von einem asiatischen Fahrzeughersteller ist.

# your code here

Aufgabe 1.4.2#

Diesen Finanzticker haben wir uns selbst erstellt, nun ist uns jedoch aufgefallen, dass uns bei der Implementierung ein Fehler passiert sein muss. Die Werte für den aktuellen Aktienkurs sind doppelt so groß. Schreiben Sie nun eine Methode, die für unsere Tupel den Wert des Aktienkurses halbiert.

# your code here

Aufgabe 1.4.3#

Zuletzt wollen wir noch eine Methode schreiben, die wir einer reduce-Funktion übergeben können und die uns das Tupel mit dem höchsten Aktienkurs zurückgibt.

# your code here

Aufgabe 1.4.4#

Kombinieren Sie die vorherigen Lösungen zu einer Pipeline. Bestimmen Sie alle 5 Sekunden den maximalen Aktienkurs asiatischer Automobilhersteller in den letzten 5 Sekunden.

Hinweis: Vergessen Sie nicht, die Kurse zu korrigieren.

def pipeline_windowing_1(input_stream: TimedStream):
    return  # your pipeline here


result = pipeline_windowing_1(timed_stream)
print(result)

# comparison with expected result
expected = DataStream().from_collection(
    [
        (("hyundai", 38), 0.0, 5.0),
        (("hyundai", 35), 5.0, 10.0),
        (("suzuki", 32), 10.0, 15.0),
        (("hyundai", 40), 15.0, 20.0),
        (("suzuki", 35), 20.0, 25.0),
        (("suzuki", 33), 25.0, 30.0),
    ]
)

print("Does the output stream match the expected one?", result == expected)
None
Does the output stream match the expected one? False

Aufgabe 1.4.5*#

Kombinieren Sie die vorherigen Lösungen zu einer Pipeline. Bestimmen Sie alle 5 Sekunden den durchschnittlichen Aktienkurs asiatischer Automobilhersteller in den letzten 10 Sekunden.

Hinweis: Vergessen Sie nicht, die Kurse zu korrigieren.

Hinweis: Beachten Sie, dass wir auf Tupeln arbeiten. Die Hersteller der Aktienkurs Updates sind für uns bei der Berechnung des Durchschnittspreises aller Kursupdates nicht von Bedeutung. Möglicherweise müssen Sie die Daten noch vor der Einteilung in Fenster verändern.

# your code here


def pipeline_windowing_2(input_stream: TimedStream):
    return  # your pipeline here


result = pipeline_windowing_2(timed_stream)
print(result)

# comparison with expected result
expected = DataStream().from_collection(
    [
        (30.4, 0.0, 10.0),
        (27.166666666666668, 5.0, 15.0),
        (25.75, 10.0, 20.0),
        (31.75, 15.0, 25.0),
        (30.0, 20.0, 30.0),
    ]
)

print("Does the output stream match the expected one?", result == expected)
None
Does the output stream match the expected one? False

Aufgabe 2: Synopsen#

In den folgenden Teilaufgaben arbeiten Sie mit Synopsen.

Aufgabe 2.1: Count-Min Sketch - Auswirkung der Datenverteilung bei Count-Min Sketches#

Im Folgenden betrachten wir, wie sich der Count-Min Sketch verhält, wenn wir eine größere Anzahl von Elementen hinzufügen. Wenn die Anzahl der einzigartigen Elemente größer ist als die Anzahl der Einträge im Array, wird deutlich, wie der Count-Min Sketch mit Kollisionen umgeht und wie gut er die Häufigkeiten approximiert.

Konkret sollten wir mindestens mehr einzigartige Elemente einfügen, als wir Einträge im Array haben, da sonst auch ein Array von Zählern genügen würde und wir jedes Element ohne den Mehraufwand des Hashing und der wiederholten Updates pro Reihe genau zählen könnten.

Erstellen Sie zwei neue Count-Min Sketches. Initialisieren Sie diese mit einer Breite von 16 und mit 3 Reihen. Fügen Sie dann jeweils 500 Elemente ein, wobei es genau 40 einzigartige Elemente geben sollte. Betrachten Sie, wie sich eine unterschiedliche Verteilung auf die Qualität der Approximierung auswirkt. Approximieren Sie jeweils konkret die Schlüssel 0 und 1.

Diskutieren Sie, was Ihre Erwartungen sind, wie sich die Verteilung auf die Genauigkeit der Approximierung auswirken könnte.

# Uniform Verteilung
data_stream = DataStream().from_collection([i % 40 for i in range(500)])
real_counts = [0 for x in range(40)]
# your code here
print(
    "Genaues Ergebnis für Key 0: ",
    real_counts[0],
    "- Approximation für Key 0: ",
    cm2.query(0),
    "-> Absoluter Fehler: ",
    abs(real_counts[0] - cm2.query(0)),
)

print(
    "Genaues Ergebnis für Key 1: ",
    real_counts[1],
    "- Approximation für Key 1: ",
    cm2.query(1),
    "-> Absoluter Fehler: ",
    abs(real_counts[1] - cm2.query(1)),
)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[16], line 5
      1 print(
      2     "Genaues Ergebnis für Key 0: ",
      3     real_counts[0],
      4     "- Approximation für Key 0: ",
----> 5     cm2.query(0),
      6     "-> Absoluter Fehler: ",
      7     abs(real_counts[0] - cm2.query(0)),
      8 )
     10 print(
     11     "Genaues Ergebnis für Key 1: ",
     12     real_counts[1],
   (...)     16     abs(real_counts[1] - cm2.query(1)),
     17 )

NameError: name 'cm2' is not defined
# Zipf Verteilung
data_stream2 = DataStream().from_collection([i if i < 40 else 0 for i in range(500)])
real_counts = [0 for x in range(40)]
# your code here
print(
    "Genaues Ergebnis für Key 0: ",
    real_counts[0],
    "- Approximation für Key 0: ",
    cm3.query(0),
    "-> Absoluter Fehler: ",
    abs(real_counts[0] - cm3.query(0)),
)

print(
    "Genaues Ergebnis für Key 1: ",
    real_counts[1],
    "- Approximation für Key 1: ",
    cm3.query(1),
    "-> Absoluter Fehler: ",
    abs(real_counts[1] - cm3.query(1)),
)

Aufgabe 2.2: Count-Min Sketch - Durchschnittlicher Absoluter Fehler#

Zuvor haben wir das Verhalten der Verteilung auf die Approximierung nur anhand von einzelnen Werten bewertet. Das Verhalten könnte somit nur per Zufall so ausgefallen sein. Nun bestimmen wir den durchschnittlichen absoluten Fehler pro Verteilung. Um diesen Fehler zu berechnen, bestimmen wir die Summe der absoluten Differenzen zwischen der Approximation und dem erwarteten Ergebnis und teilen dies durch die Gesamtanzahl der einzigartig beobachteten Elemente. Die Formel lautet:$\(Fehler = \frac{\sum_{n=1}|{query(x_n)}-y_n|}{|Elemente|}\)$

Dabei ist \(query(x_n)\) die Approximation für das Element \(x_n\) und \(y_n\) das erwartete Ergebnis. Die Summe wird über alle beobachteten Elemente durchgeführt und der Wert wird durch die Anzahl der einzigartigen Elemente geteilt.

# Uniformverteilung
# your code here
# Zipfverteilung
# your code here

Aufgabe 2.3: Bloom-Filter#

Erstellen Sie nun einen generellen Inhaltschecker. Erstellen Sie dafür einen Bloom-Filter mit 1500 Bits und 3 Hash-Funktionen. Erzeugen Sie dann einen Datenstrom. Fragen (query) Sie danach den Bloom-Filter ab, mit den Keys: Trump, Hello, SVW und interpretieren Sie die Ergebnisse.

Artikel = """
Anklageschrift veröffentlicht
Schwere Vorwürfe gegen Trump
Der frühere US-Präsident Trump ist erneut angeklagt worden. Die Justiz beschuldigt ihn in 37 Punkten - darunter sind Vergehen, für die ihm bis zu zehn Jahre Haft drohen. Es ist die erste Anklage gegen einen Ex-Präsidenten auf Bundesebene. Der frühere US-Präsident Donald Trump wird im Zusammenhang mit dem Fund von Geheimdokumenten, die sich nach Ende seiner Amtszeit in seinem Besitz befanden, in 37 Punkten angeklagt. Die US-Justiz wirft Trump laut der jetzt veröffentlichten Anklageschrift unter anderem auch Verschwörung zur Behinderung der Ermittlungen vor.
Insgesamt werden sieben Kategorien von Vergehen aufgeführt. Trump wird unter anderem die vorsätzliche Aufbewahrung von Informationen der nationalen Verteidigung vorgeworfen. Dieser Punkt fällt unter das US-Spionagegesetz und kann mit bis zu zehn Jahren Haft bestraft werden.
Geheime Akten in der Dusche und im Lagerraum Hintergrund ist die Affäre um Trumps Umgang mit geheimen Regierungsunterlagen nach seinem Abschied aus dem Weißen Haus. In der Anklageschrift heißt es, dass einige der Kisten mit Geheimdienstdokumenten zeitweise in einem Raum in Trumps Privatanwesen Mar-a-Lago gelagert worden seien, in dem öffentliche Veranstaltungen stattgefunden hätten.
"Trump bewahrte seine Kartons mit Geheimdokumenten an verschiedenen Orten im Mar-a-Lago-Club auf - unter anderem in einem Ballsaal, in einem Badezimmer und einer Dusche, einem Büro, seinem Schlafzimmer und einem Lagerraum", heißt es weiter. Entsprechende Fotos wurden der Anklage beigefügt. In mindestens zwei Fällen habe Trump zudem Geheimdokumente anderen gezeigt. Ein bei Trump beschlagnahmtes Dokument aus dem Juni 2020 enthielt darüber hinaus Informationen zu den nuklearen Fähigkeiten eines anderen Landes.
Sonderermittler verspricht "zügiges Gerichtsverfahren" In einer Pressekonferenz stellte der zuständige Sonderermittler Jack Smith ein "zügiges Gerichtsverfahren" in Aussicht. "Beachten Sie, dass die Angeklagten in diesem Fall als unschuldig zu gelten haben, bis ihre Schuld vor Gericht zweifelsfrei bewiesen ist", sagte Smith. Die Anklageschrift zeige aber den "Umfang und die Schwere" der Vorwürfe gegen Trump. Auch Trumps persönlicher Assistent wurde angeklagt. Smith betonte, dass der Schutz von Informationen der Landesverteidigung von entscheidender Bedeutung für die Sicherheit der USA sei. "Verstöße gegen diese Gesetze bringen unser Land in Gefahr", sagte Smith. Erste Anklage gegen eines Ex-Präsidenten auf Bundesebene Die Bundespolizei FBI hatte im August Trumps Anwesen in Florida untersucht und dort zahlreiche Verschlusssachen beschlagnahmt, einige mit höchster Geheimhaltungsstufe. Weil der Ex-Präsident die Unterlagen lange nach seinem Abschied aus dem Amt in seinem Privathaus aufbewahrt hatte, könnte er sich strafbar gemacht haben. Nachdem Trump im November offiziell verkündete, bei der Wahl 2024 erneut anzutreten, setzte das Justizministerium den unabhängigen Sonderermittler Smith ein, um die politisch heiklen Ermittlungen gegen Trump auszulagern. Es ist das erste Mal, dass gegen einen Ex-Präsidenten der USA auf Bundesebene Anklage erhoben wurde. Biden: Habe nicht mit Justizminister gesprochen Amtsinhaber Joe Biden sagte am Freitag, er habe keinen Kontakt mit Justizminister Merrick Garland gehabt. "Ich habe überhaupt nicht mit ihm gesprochen und werde auch nicht mit ihm sprechen. Und dazu habe ich keinen Kommentar". Damit spielte Biden auf die Vorwürfe von Trump und seinen Unterstützerinnen und Unterstützern an, die Macht seines Amtes zu missbrauchen, um Trump als politischen Gegner loszuwerden. Biden hatte in der Vergangenheit immer wieder gesagt, dass er dem Justizministerium weder Anweisungen gebe noch Kontakt in der Sache suche.
Trump drohen weitere Verfahren Trump reagierte auf die erneute Anklage in seinem Social-Media-Dienst Truth Social. Dort griff er Sonderermittler Smith persönlich an: Dessen Frau sei ein "Trump-Hasser", schrieb der Ex-Präsident. Trump war im April bereits im Zusammenhang mit Schweigegeldzahlungen an einen Pornostar auf Bundesstaaten-Ebene in New York angeklagt worden. In einem Zivilverfahren wurde er vor wenigen Wochen dann vor Gericht für einen sexuellen Übergriff verantwortlich gemacht. Bislang wiegen die Vorwürfe im Zusammenhang mit den Dokumenten juristisch am schwersten. Es wird aber noch in anderen Fällen gegen Trump ermittelt, im Zusammenhang mit seinen Versuchen, den Ausgang der Präsidentenwahl 2020 zu kippen. Es könnten also womöglich weitere Anklagen folgen - und die könnten ebenfalls gefährlich für ihn werden.
"""
# Quelle:https://www.tagesschau.de/ausland/amerika/usa-trump-anklageschrift-100.html#:~:text=Anklageschrift%20veröffentlicht%20Schwere%20Vorwürfe%20gegen%20Trump&text=Der%20frühere%20US%2DPräsident%20Trump,einen%20Ex%2DPräsidenten%20auf%20Bundesebene.

words = (
    Artikel.replace(" - ", " ")
    .replace("-", " ")
    .replace(".", "")
    .replace(",", "")
    .replace("/", "")
    .replace('"', "")
    .split()
)
# your code here

Aufgabe 2.4: Reservoir Sample#

Wir erstellen einen Datenstrom mit 100.000 Elementen. Der Wertebereich sollte dabei im Intervall \([0, 100]\) liegen, wobei der Wert 0 50% der Zeit vorkommen sollte und die restlichen Werte 1 bis 100 jeweils 0,5% .

# der weights Vektor enthält die Wahrscheinlichkeiten des Vorkommens der Werte 0 (50%) sowie 1-100 (jeweils 0,5%)
weights = [100] + 100 * [1]
items = [i for i in range(101)]

data = random.choices(items, weights=weights, k=100000)

ds = DataStream().from_collection(data)

Aufgabe 2.4.1: Reservoir Sample - Updates#

Erstellen Sie nun ein Reservoir Sample mit 50 Einträgen und fügen Sie die Elemente des Datenstroms ein.

# your code here

Aufgabe 2.4.2: Reservoir Sample - Anfrage#

Diskutieren Sie nun was eine gute Methode zur Approximierung ist, implementieren sie diese und testen Sie diese aus. Wenn ein Element nicht gefunden wird, dann wird bei Query Optimizern z.B. oftmals eine Uniformverteilung angenommen und 1 % zurückgegeben.

Können Elemente, die nicht im Sample auftauchen, gut oder überhaupt approximiert werden? Frage die Keys 0 und 1 ab und interpretiere das Ergebnis.

def query(sample, key):
    # your code here
    return