]> Untitled Git - axy/ft/python05.git/commitdiff
format
authorAxy <gilliardmarthey.axel@gmail.com>
Mon, 2 Feb 2026 15:25:42 +0000 (16:25 +0100)
committerAxy <gilliardmarthey.axel@gmail.com>
Mon, 2 Feb 2026 15:25:42 +0000 (16:25 +0100)
ex1/data_stream.py
ex2/nexus_pipeline.py

index 666ccfb15ac1a0c00331d46eee81ce7f752e064d..82f7954c3cfd026f75530bde4c092ea459dc012b 100644 (file)
@@ -11,8 +11,9 @@ class DataStream(ABC):
     def process_batch(self, data_batch: List[Any]) -> str:
         pass
 
-    def filter_data(self, data_batch: List[Any],
-                    criteria: Optional[str] = None) -> List[Any]:
+    def filter_data(
+        self, data_batch: List[Any], criteria: Optional[str] = None
+    ) -> List[Any]:
         return data_batch
 
     def get_stats(self) -> Dict[str, Union[str, int, float]]:
@@ -39,17 +40,23 @@ class SensorStream(DataStream):
                     temps.append(item[1])
 
         avg_temp = sum(temps) / len(temps) if temps else 0.0
-        return f"Sensor analysis: {count} readings processed, " \
-               f"avg temp: {avg_temp}°C"
-
-    def filter_data(self, data_batch: List[Any],
-                    criteria: Optional[str] = None) -> List[Any]:
+        return (
+            f"Sensor analysis: {count} readings processed, "
+            f"avg temp: {avg_temp}°C"
+        )
+
+    def filter_data(
+        self, data_batch: List[Any], criteria: Optional[str] = None
+    ) -> List[Any]:
         if criteria == "high_priority":
             # Return values > 50? Or specific types?
             # Let's assume high values are critical
-            return [x for x in data_batch
-                    if (isinstance(x, (int, float)) and x > 50) or
-                    (isinstance(x, tuple) and x[1] > 50)]
+            return [
+                x
+                for x in data_batch
+                if (isinstance(x, (int, float)) and x > 50)
+                or (isinstance(x, tuple) and x[1] > 50)
+            ]
         return data_batch
 
 
@@ -70,14 +77,18 @@ class TransactionStream(DataStream):
                     net_flow -= amount
 
         sign = "+" if net_flow >= 0 else ""
-        return f"Transaction analysis: {count} operations, " \
-               f"net flow: {sign}{net_flow} units"
-
-    def filter_data(self, data_batch: List[Any],
-                    criteria: Optional[str] = None) -> List[Any]:
+        return (
+            f"Transaction analysis: {count} operations, "
+            f"net flow: {sign}{net_flow} units"
+        )
+
+    def filter_data(
+        self, data_batch: List[Any], criteria: Optional[str] = None
+    ) -> List[Any]:
         if criteria == "large":
-            return [x for x in data_batch
-                    if isinstance(x, tuple) and x[1] > 100]
+            return [
+                x for x in data_batch if isinstance(x, tuple) and x[1] > 100
+            ]
         return data_batch
 
 
@@ -91,8 +102,9 @@ class EventStream(DataStream):
         errors = [x for x in data_batch if x == "error"]
         return f"Event analysis: {count} events, {len(errors)} error detected"
 
-    def filter_data(self, data_batch: List[Any],
-                    criteria: Optional[str] = None) -> List[Any]:
+    def filter_data(
+        self, data_batch: List[Any], criteria: Optional[str] = None
+    ) -> List[Any]:
         if criteria == "error":
             return [x for x in data_batch if x == "error"]
         return data_batch
index 6f828e2adc52865da227a2b57a1b40487f8bdf4a..cac36dd882f0acc48a2294c6af0552bc54552547 100644 (file)
@@ -3,8 +3,7 @@ from abc import ABC, abstractmethod
 
 
 class ProcessingStage(Protocol):
-    def process(self, data: Any) -> Any:
-        ...
+    def process(self, data: Any) -> Any: ...
 
 
 class ProcessingPipeline(ABC):
@@ -33,6 +32,7 @@ class InputStage:
             if data.startswith("{") and data.endswith("}"):
                 # Simulating JSON parsing
                 import json
+
                 try:
                     return json.loads(data)
                 except json.JSONDecodeError:
@@ -58,8 +58,10 @@ class OutputStage:
     def process(self, data: Any) -> Any:
         # Formatting
         if isinstance(data, dict) and "sensor" in data:
-            return f"Processed {data.get('sensor')} reading: " \
-                   f"{data.get('value')} (Normal range)"
+            return (
+                f"Processed {data.get('sensor')} reading: "
+                f"{data.get('value')} (Normal range)"
+            )
         if isinstance(data, list):
             return f"User activity logged: {len(data) // 3} actions processed"
             # Assuming CSV: user,action,timestamp -> 3 items?