Data Streams Management – Transformationsoperatoren: Aufgaben (Lösungen)

Data Streams Management – Transformationsoperatoren: Aufgaben (Lösungen)#

from isda_streaming.data_stream import DataStream
import random

random.seed(84)  # wird benötigt, um bei allen die gleichen zufälligen Zahlen zu generieren

Aufgabe 1#

Es soll ein Datenstrom initialisiert werden, der 10 zufällige Ganze Zahlen aus im Intervall [-10,10] enthält.

Dafür soll die Funktion random.randint(startwert, stoppwert) genutzt werden, welche eine eine zufällige Zahl aus dem Intervall [startwert, stoppwert] generiert.

# Your code here
ds = DataStream()

print(ds)

# wird benötigt um das Ergebnis mit dem erwarteten Ergebnis zu vergleichen
result_ds = DataStream().from_collection([-1, -9, 5, -10, 6, 0, -4, 4, 5, 7])
print("Does the result match the expected one?", ds == result_ds)
DataStream([])
Does the result match the expected one? False

Hide code cell content

#### Musterlösung

random.seed(84)  # wird benötigt, um bei allen die gleichen zufälligen Zahlen zu generieren

# Your code here
random_numbers = []
for i in range(10):
    random_numbers.append(random.randint(-10, 10))

ds = DataStream().from_collection(random_numbers)

print(ds)

# wird benötigt um das Ergebnis mit dem erwarteten Ergebnis zu vergleichen
result_ds = DataStream().from_collection([-1, -9, 5, -10, 6, 0, -4, 4, 5, 7])
print("Does the result match the expected one?", ds == result_ds)
DataStream([-1, -9, 5, -10, 6, 0, -4, 4, 5, 7])
Does the result match the expected one? True

Aufgabe 2#

Ersetzen Sie jede 0 im Datenstrom durch -1 und 1.

ds = DataStream().from_collection([-1, -9, 5, -10, 6, 0, -4, 4, 5, 7])

# Your code here
replace_zero_ds = ds

print(replace_zero_ds)

# wird benötigt um das Ergebnis mit dem erwarteten Ergebnis zu vergleichen
result_ds = DataStream().from_collection([-1, -9, 5, -10, 6, -1, 1, -4, 4, 5, 7])
print("Does the result match the expected one?", replace_zero_ds == result_ds)
DataStream([-1, -9, 5, -10, 6, 0, -4, 4, 5, 7])
Does the result match the expected one? False

Hide code cell content

#### Musterlösung

ds = DataStream().from_collection([-1, -9, 5, -10, 6, 0, -4, 4, 5, 7])


def replace_zero(x: int):
    if x == 0:
        return [-1, 1]
    return [x]


replace_zero_ds = ds.flat_map(replace_zero)

print(replace_zero_ds)

# wird benötigt um das Ergebnis mit dem erwarteten Ergebnis zu vergleichen
result_ds = DataStream().from_collection([-1, -9, 5, -10, 6, -1, 1, -4, 4, 5, 7])
print("Does the result match the expected one?", replace_zero_ds == result_ds)
DataStream([-1, -9, 5, -10, 6, -1, 1, -4, 4, 5, 7])
Does the result match the expected one? True

Aufgabe 3#

Erstellen Sie aus dem gegebenen Datenstrom zwei neue Datenströme, wobei der erste Datenstrom alle geraden ganzen Zahlen beinhaltet und der andere alle ungeraden.

ds = DataStream().from_collection([-1, -9, 5, -10, 6, -1, 1, -4, 4, 5, 7])

# Your code here
even_ds = ds
odd_ds = ds

print("Datastream with even numbers:", even_ds)
print("Datastream with odd numbers:", odd_ds)

# wird benötigt um das Ergebnis mit dem erwarteten Ergebnis zu vergleichen
result_ds = DataStream().from_collection([-10, 6, -4, 4])
print("Does the even datastream match the expected one?", even_ds == result_ds)

result_ds = DataStream().from_collection([-1, -9, 5, -1, 1, 5, 7])
print("Does the odd datastream match the expected one?", odd_ds == result_ds)
Datastream with even numbers: DataStream([-1, -9, 5, -10, 6, -1, 1, -4, 4, 5, 7])
Datastream with odd numbers: DataStream([-1, -9, 5, -10, 6, -1, 1, -4, 4, 5, 7])
Does the even datastream match the expected one? False
Does the odd datastream match the expected one? False

Hide code cell content

#### Musterlösung

ds = DataStream().from_collection([-1, -9, 5, -10, 6, -1, 1, -4, 4, 5, 7])


def keep_even(x):
    if x % 2 == 0:
        return True
    return False


def keep_odd(x):
    return not keep_even(x)


even_ds = ds.filter(keep_even)
odd_ds = ds.filter(keep_odd)

print("Datastream with even numbers:", even_ds)
print("Datastream with odd numbers:", odd_ds)

# wird benötigt um das Ergebnis mit dem erwarteten Ergebnis zu vergleichen
result_ds = DataStream().from_collection([-10, 6, -4, 4])
print("Does the even datastream match the expected one?", even_ds == result_ds)

result_ds = DataStream().from_collection([-1, -9, 5, -1, 1, 5, 7])
print("Does the odd datastream match the expected one?", odd_ds == result_ds)
Datastream with even numbers: DataStream([-10, 6, -4, 4])
Datastream with odd numbers: DataStream([-1, -9, 5, -1, 1, 5, 7])
Does the even datastream match the expected one? True
Does the odd datastream match the expected one? True

Aufgabe 4#

Kategorisieren Sie die Werte des Datenstroms. Ersetzen Sie dabei alle atomaren Werte des Datenstroms mit einem Tupel. Dabei gibt das erste Feld des Tuples an ob der Wert positiv (‘p’) oder negativ (‘n’) ist und das zweite Feld beinhaltet nur noch den absoluten Betrag des ursprünglichen Wertes.

Beispiel:
Input-Stream = [2, -4, 6]
Output-Stream = [(‘p’, 2), (‘n’, 4), (‘p’, 6)]

even_ds = DataStream().from_collection([-10, 6, -4, 4])
odd_ds = DataStream().from_collection([-1, -9, 5, -1, 1, 5, 7])

# Your code here
even_tuple_ds = even_ds
odd_tuple_ds = odd_ds

print("even_tuple_ds = ", even_tuple_ds)
print("odd_tuple_ds = ", odd_tuple_ds)

# wird benötigt um das Ergebnis mit dem erwarteten Ergebnis zu vergleichen
result_ds = DataStream().from_collection([("n", 10), ("p", 6), ("n", 4), ("p", 4)])
print(
    "Does the even datastream with tuples match the expected one?",
    even_tuple_ds == result_ds,
)

result_ds = DataStream().from_collection(
    [("n", 1), ("n", 9), ("p", 5), ("n", 1), ("p", 1), ("p", 5), ("p", 7)]
)
print(
    "Does the odd datastream with tuples match the expected one?",
    odd_tuple_ds == result_ds,
)
even_tuple_ds =  DataStream([-10, 6, -4, 4])
odd_tuple_ds =  DataStream([-1, -9, 5, -1, 1, 5, 7])
Does the even datastream with tuples match the expected one? False
Does the odd datastream with tuples match the expected one? False

Hide code cell content

#### Musterlösung

even_ds = DataStream().from_collection([-10, 6, -4, 4])
odd_ds = DataStream().from_collection([-1, -9, 5, -1, 1, 5, 7])


def get_tuple(x: int):
    if x > 0:
        return ("p", abs(x))
    return ("n", abs(x))


even_tuple_ds = even_ds.map(get_tuple)
odd_tuple_ds = odd_ds.map(get_tuple)

print(even_tuple_ds)
print(odd_tuple_ds)

# wird benötigt um das Ergebnis mit dem erwarteten Ergebnis zu vergleichen
result_ds = DataStream().from_collection([("n", 10), ("p", 6), ("n", 4), ("p", 4)])
print(
    "Does the even datastream with tuples match the expected one?",
    even_tuple_ds == result_ds,
)

result_ds = DataStream().from_collection(
    [("n", 1), ("n", 9), ("p", 5), ("n", 1), ("p", 1), ("p", 5), ("p", 7)]
)
print(
    "Does the odd datastream with tuples match the expected one?",
    odd_tuple_ds == result_ds,
)
DataStream([('n', 10), ('p', 6), ('n', 4), ('p', 4)])
DataStream([('n', 1), ('n', 9), ('p', 5), ('n', 1), ('p', 1), ('p', 5), ('p', 7)])
Does the even datastream with tuples match the expected one? True
Does the odd datastream with tuples match the expected one? True

Aufgabe 5#

Bauen Sie nun eine Dataflow-Pipeline, welche die Ergebnisse der vorherigen Aufgaben kombiniert:

Ersetzen Sie zunächst alle 0 Einträge im Datenstrom mit -1 und 1. Filtern Sie danach alle gerade Zahlen heraus um die übrigen Werte dann zu kategorisieren. Positive Werte sollen dabei erneut als Tupel mit ‘p’ kategorisiert werden und nur noch den absoluten Wert (zweites Feld) haben.

ds = DataStream().from_collection([-1, -9, 5, -10, 6, 0, -4, 4, 5, 7])

# Your code here
transformed_ds = ds

print(transformed_ds)

# wird benötigt um das Ergebnis mit dem erwarteten Ergebnis zu vergleichen
result_ds = DataStream().from_collection(
    [("n", 1), ("n", 9), ("p", 5), ("n", 1), ("p", 1), ("p", 5), ("p", 7)]
)
print(
    "Does the odd datastream with tuples match the expected one?",
    transformed_ds == result_ds,
)
DataStream([-1, -9, 5, -10, 6, 0, -4, 4, 5, 7])
Does the odd datastream with tuples match the expected one? False

Hide code cell content

#### Musterlösung

ds = DataStream().from_collection([-1, -9, 5, -10, 6, 0, -4, 4, 5, 7])


def pipeline_aufgabe_2(data_stream: DataStream):
    return data_stream.flat_map(replace_zero).filter(keep_odd).map(get_tuple)


transformed_ds = pipeline_aufgabe_2(ds)

print(transformed_ds)

# wird benötigt um das Ergebnis mit dem erwarteten Ergebnis zu vergleichen
result_ds = DataStream().from_collection(
    [("n", 1), ("n", 9), ("p", 5), ("n", 1), ("p", 1), ("p", 5), ("p", 7)]
)
print(
    "Does the odd datastream with tuples match the expected one?",
    transformed_ds == result_ds,
)
DataStream([('n', 1), ('n', 9), ('p', 5), ('n', 1), ('p', 1), ('p', 5), ('p', 7)])
Does the odd datastream with tuples match the expected one? True

Aufgabe 6: Bluthochdruck#

Es steht ein Datenstrom zur Verfügung, der Informationen über den Blutdruck und den Puls eines Menschen beinhaltet. Unsere Sensoren messen dabei:

  • die Anzahl der Herzschläge pro 10 Sekunden und

  • den Blutdruck in Pascal, welcher zwei Werte annehmen kann:

    • den systolischen Wert (wenn das Herz Blut in die Gefäße pumpt und sich zusammenzieht)

    • den diastolischen Wert (wenn der Herzmuskel erschlafft)

Weiterhin wissen wir, dass bei einem gesunden Erwachsenen:

  • Der Ruhepuls ungefähr zwischen 60 und 90 Schlägen pro Minute,

  • der systolische Wert zwischen 110 und 130 mmHg und

  • der diastolische Wert zwischen 80 und 90 mmHg liegt.

Leider liefern uns die Sensoren die Daten über Puls und Blutdruck in einem schlechten Format.

  • Der Blutdruck wird üblicherweise in mmHg (Millimeter Quecksilbersäulen) angegeben, 1 mmHg entspricht dabei ungefähr 133,3 Pascal.

  • Der Puls entspricht in der Regel den Herzschlägen pro Minute, hier jedoch erfolgt die Messung pro 10s.

Die Daten aus dem Datenstrom sollen zuerst in das herkömmliche Format transformiert werden und danach soll jeder Wert eine der folgenden Kategorien bekommen:

  • “puls” für Puls-Werte im gesunden Rahmen

  • “sys” für systolische Werte im gesunden Rahmen

  • “dia” für diastolische Werte im gesunden Rahmen

  • “anomaly” für Blutdruck-Werte, die höher als 130mmHg sind

  • “error” für sonstige Werte

Da der beobachtete Patient an Bluthochdruck leidet, soll der Datenstrom nur noch den Durchschnitt aller bisher gesehenen Blutdruckwerte enthalten, die über 130mmHg liegen.
Anhand dieses Datenstroms könnte man nun Alarm schlagen, wenn der Durchschnitt auf einen zu hohen Wert ansteigt.

Hinweis: Kategorisieren Sie zunächst die Daten, ansonsten kann es zu Überschneidungen bei den Wertebereichen kommen.

random.seed(42)
data = [random.randint(10, 15) for _ in range(100)]
data.extend(
    [random.randint(14663, 17329) for _ in range(100)]
    + [random.randint(17329, 21328) for _ in range(20)]
)
data.extend([random.randint(10664, 11997) for _ in range(100)])

random.shuffle(data)

ds = DataStream().from_collection(data)

# 1) transform the measurements into the common format and assigns them a category
categorized_ds = ds

# comparison with expected result
result_ds = DataStream().from_csv("../resources/09_data_streams/blutwerte_1.csv")
print(
    "Does the categorized datastream match the expected one?",
    categorized_ds == result_ds,
)

# 2) filter the anomaly data
filtered_ds = ds

# comparison with expected result
result_ds = DataStream().from_csv("../resources/09_data_streams/blutwerte_2.csv")
print("Does the filtered datastream match the expected one?", filtered_ds == result_ds)

# 3) calculate the average
avg_ds = ds

# comparison with expected result
result_ds = DataStream().from_csv("../resources/09_data_streams/blutwerte_3.csv")
print(
    "Does the datastream with the average values match the expected one?",
    avg_ds == result_ds,
)

# print the latest average value in the datastream
last_avg = list(avg_ds)[-1]
print("The average value of all seen anomalies is", round(last_avg, 2))
Does the categorized datastream match the expected one? False
Does the filtered datastream match the expected one? False
Does the datastream with the average values match the expected one? False
The average value of all seen anomalies is 11151

Hide code cell content

#### Musterlösung

random.seed(42)
data = [random.randint(10, 15) for _ in range(100)]
data.extend(
    [random.randint(14663, 17329) for _ in range(100)]
    + [random.randint(17329, 21328) for _ in range(20)]
)
data.extend([random.randint(10664, 11997) for _ in range(100)])

random.shuffle(data)

ds = DataStream().from_collection(data)

# 1) transform the measurements into the common format and assigns them a category


def transform_data(x: int):
    # transforms the measurements into the common format and assigns them a category
    # input: (value)
    # output: (category, value)
    if x >= 10 and x <= 15:
        return ("puls", x * 6)
    elif x >= 10664 and x <= 11997:
        return ("dia", x // 133.3)
    elif x >= 14663 and x <= 17329:
        return ("sys", x // 133.3)
    elif x > 17329:
        return ("anomaly", x // 133.3)

    return ("error", x)


categorized_ds = ds.map(transform_data)

result_ds = DataStream().from_csv("../resources/09_data_streams/blutwerte_1.csv")
print(
    "Does the categorized datastream match the expected one?",
    categorized_ds == result_ds,
)

# 2) filter the anomaly data


def filter_anomaly(x):
    # only data from the category "anomaly" will be selected
    if x[0] == "anomaly":
        return True
    return False


filtered_ds = ds.map(transform_data).filter(filter_anomaly)

result_ds = DataStream().from_csv("../resources/09_data_streams/blutwerte_2.csv")
print("Does the filtered datastream match the expected one?", filtered_ds == result_ds)

# 3) calculate the average


def make_tuple(x):
    # transforms every (category, value) into (value, 1)
    return (x[1], 1)


def sum_count(x0, x1):
    # counts the elements in the datastream and calculates the rolling sum of all values
    # input:  x0 = (value1, number_of_elements1)
    #         x1 = (value2, number_of_elements2)
    #         where value1 and value2 are the rolling sums of all values seen so far
    #         and number_of_elements1 and number_of_elements2 are the counts of all elements seen so far
    # output: (value1 + value2, number of element + 1)
    return (x0[0] + x1[0], x0[1] + x1[1])


def calc_avg(x):
    # calculates the rolling average over the rolling sum of all elements
    return x[0] / x[1]


avg_ds = (
    ds.map(transform_data).filter(filter_anomaly).map(make_tuple).reduce(sum_count).map(calc_avg)
)

result_ds = DataStream().from_csv("../resources/09_data_streams/blutwerte_3.csv")
print(
    "Does the datastream with the average values match the expected one?",
    avg_ds == result_ds,
)

last_avg = list(avg_ds)[-1]
print("The average value of all seen anomalies is", round(last_avg, 2))
Does the categorized datastream match the expected one? True
Does the filtered datastream match the expected one? True
Does the datastream with the average values match the expected one? True
The average value of all seen anomalies is 146.45