From 3f4dadcd052a00bb8e1dbd277ca8157b6e33e913 Mon Sep 17 00:00:00 2001 From: Axy Date: Mon, 2 Feb 2026 16:25:42 +0100 Subject: [PATCH] format --- ex1/data_stream.py | 50 +++++++++++++++++++++++++++---------------- ex2/nexus_pipeline.py | 10 +++++---- 2 files changed, 37 insertions(+), 23 deletions(-) diff --git a/ex1/data_stream.py b/ex1/data_stream.py index 666ccfb..82f7954 100644 --- a/ex1/data_stream.py +++ b/ex1/data_stream.py @@ -11,8 +11,9 @@ class DataStream(ABC): def process_batch(self, data_batch: List[Any]) -> str: pass - def filter_data(self, data_batch: List[Any], - criteria: Optional[str] = None) -> List[Any]: + def filter_data( + self, data_batch: List[Any], criteria: Optional[str] = None + ) -> List[Any]: return data_batch def get_stats(self) -> Dict[str, Union[str, int, float]]: @@ -39,17 +40,23 @@ class SensorStream(DataStream): temps.append(item[1]) avg_temp = sum(temps) / len(temps) if temps else 0.0 - return f"Sensor analysis: {count} readings processed, " \ - f"avg temp: {avg_temp}°C" - - def filter_data(self, data_batch: List[Any], - criteria: Optional[str] = None) -> List[Any]: + return ( + f"Sensor analysis: {count} readings processed, " + f"avg temp: {avg_temp}°C" + ) + + def filter_data( + self, data_batch: List[Any], criteria: Optional[str] = None + ) -> List[Any]: if criteria == "high_priority": # Return values > 50? Or specific types? # Let's assume high values are critical - return [x for x in data_batch - if (isinstance(x, (int, float)) and x > 50) or - (isinstance(x, tuple) and x[1] > 50)] + return [ + x + for x in data_batch + if (isinstance(x, (int, float)) and x > 50) + or (isinstance(x, tuple) and x[1] > 50) + ] return data_batch @@ -70,14 +77,18 @@ class TransactionStream(DataStream): net_flow -= amount sign = "+" if net_flow >= 0 else "" - return f"Transaction analysis: {count} operations, " \ - f"net flow: {sign}{net_flow} units" - - def filter_data(self, data_batch: List[Any], - criteria: Optional[str] = None) -> List[Any]: + return ( + f"Transaction analysis: {count} operations, " + f"net flow: {sign}{net_flow} units" + ) + + def filter_data( + self, data_batch: List[Any], criteria: Optional[str] = None + ) -> List[Any]: if criteria == "large": - return [x for x in data_batch - if isinstance(x, tuple) and x[1] > 100] + return [ + x for x in data_batch if isinstance(x, tuple) and x[1] > 100 + ] return data_batch @@ -91,8 +102,9 @@ class EventStream(DataStream): errors = [x for x in data_batch if x == "error"] return f"Event analysis: {count} events, {len(errors)} error detected" - def filter_data(self, data_batch: List[Any], - criteria: Optional[str] = None) -> List[Any]: + def filter_data( + self, data_batch: List[Any], criteria: Optional[str] = None + ) -> List[Any]: if criteria == "error": return [x for x in data_batch if x == "error"] return data_batch diff --git a/ex2/nexus_pipeline.py b/ex2/nexus_pipeline.py index 6f828e2..cac36dd 100644 --- a/ex2/nexus_pipeline.py +++ b/ex2/nexus_pipeline.py @@ -3,8 +3,7 @@ from abc import ABC, abstractmethod class ProcessingStage(Protocol): - def process(self, data: Any) -> Any: - ... + def process(self, data: Any) -> Any: ... class ProcessingPipeline(ABC): @@ -33,6 +32,7 @@ class InputStage: if data.startswith("{") and data.endswith("}"): # Simulating JSON parsing import json + try: return json.loads(data) except json.JSONDecodeError: @@ -58,8 +58,10 @@ class OutputStage: def process(self, data: Any) -> Any: # Formatting if isinstance(data, dict) and "sensor" in data: - return f"Processed {data.get('sensor')} reading: " \ - f"{data.get('value')} (Normal range)" + return ( + f"Processed {data.get('sensor')} reading: " + f"{data.get('value')} (Normal range)" + ) if isinstance(data, list): return f"User activity logged: {len(data) // 3} actions processed" # Assuming CSV: user,action,timestamp -> 3 items? -- 2.52.0