From fd884bdfeb9cb1fc64f2790a05dc1d439d3d34e8 Mon Sep 17 00:00:00 2001
From: Axy
Date: Mon, 19 Jan 2026 17:02:17 +0100
Subject: [PATCH 1/1] feat: implement polymorphic data processing streams

---
 ex0/stream_processor.py |  69 ++++++++++++++++++++++++
 ex1/data_stream.py      | 117 ++++++++++++++++++++++++++++++++++++++++++++
 ex2/nexus_pipeline.py   | 110 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 296 insertions(+)
 create mode 100644 ex0/stream_processor.py
 create mode 100644 ex1/data_stream.py
 create mode 100644 ex2/nexus_pipeline.py

diff --git a/ex0/stream_processor.py b/ex0/stream_processor.py
new file mode 100644
index 0000000..3cecc9d
--- /dev/null
+++ b/ex0/stream_processor.py
@@ -0,0 +1,69 @@
+from typing import Any
+from abc import ABC, abstractmethod
+
+
+class DataProcessor(ABC):
+    @abstractmethod
+    def process(self, data: Any) -> str:
+        pass
+
+    @abstractmethod
+    def validate(self, data: Any) -> bool:
+        pass
+
+    def format_output(self, result: str) -> str:
+        return result
+
+
+class NumericProcessor(DataProcessor):
+    def validate(self, data: Any) -> bool:
+        if not isinstance(data, list):
+            return False
+        return all(isinstance(x, (int, float)) for x in data)
+
+    def process(self, data: Any) -> str:
+        if not self.validate(data):
+            raise ValueError("Invalid data for NumericProcessor")
+        count = len(data)
+        total = sum(data)
+        avg = total / count if count > 0 else 0.0
+        return f"Processed {count} numeric values, sum={total}, avg={avg}"
+
+
+class TextProcessor(DataProcessor):
+    def validate(self, data: Any) -> bool:
+        return isinstance(data, str)
+
+    def process(self, data: Any) -> str:
+        if not self.validate(data):
+            raise ValueError("Invalid data for TextProcessor")
+        char_count = len(data)
+        word_count = len(data.split())
+        return f"Processed text: {char_count} characters, {word_count} words"
+
+
+class LogProcessor(DataProcessor):
+    def validate(self, data: Any) -> bool:
+        return isinstance(data, str) and ": " in data
+
+    def process(self, data: Any) -> str:
+        if not self.validate(data):
+            raise ValueError("Invalid data for LogProcessor")
+        parts = data.split(": ", 1)
+        level = parts[0]
+        message = parts[1]
+        return f"[{level}] {level} level detected: {message}"
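+
+
+if __name__ == "__main__":
+    # Minimal usage sketch (illustrative only); the sample inputs are
+    # assumptions, not part of the exercise data.
+    pairs = [
+        (NumericProcessor(), [1, 2, 3.5]),
+        (TextProcessor(), "hello polymorphic world"),
+        (LogProcessor(), "ERROR: disk full"),
+    ]
+    for processor, sample in pairs:
+        # validate() guards process(), which raises on mismatched input.
+        if processor.validate(sample):
+            print(processor.format_output(processor.process(sample)))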
diff --git a/ex1/data_stream.py b/ex1/data_stream.py
new file mode 100644
index 0000000..666ccfb
--- /dev/null
+++ b/ex1/data_stream.py
@@ -0,0 +1,117 @@
+from typing import Any, List, Dict, Union, Optional
+from abc import ABC, abstractmethod
+
+
+class DataStream(ABC):
+    def __init__(self, stream_id: str) -> None:
+        self.stream_id = stream_id
+        self.stream_type = "Generic"
+
+    @abstractmethod
+    def process_batch(self, data_batch: List[Any]) -> str:
+        pass
+
+    def filter_data(self, data_batch: List[Any],
+                    criteria: Optional[str] = None) -> List[Any]:
+        return data_batch
+
+    def get_stats(self) -> Dict[str, Union[str, int, float]]:
+        return {"id": self.stream_id, "type": self.stream_type}
+
+
+class SensorStream(DataStream):
+    def __init__(self, stream_id: str) -> None:
+        super().__init__(stream_id)
+        self.stream_type = "Environmental Data"
+
+    def process_batch(self, data_batch: List[Any]) -> str:
+        # Readings may be bare numbers or ("temp", value) tuples;
+        # bare numbers are treated as temperatures.
+        count = len(data_batch)
+        temps = []
+        for item in data_batch:
+            if isinstance(item, (int, float)):
+                temps.append(item)
+            elif isinstance(item, tuple) and len(item) == 2:
+                if item[0] == "temp":
+                    temps.append(item[1])
+
+        avg_temp = sum(temps) / len(temps) if temps else 0.0
+        return f"Sensor analysis: {count} readings processed, " \
+               f"avg temp: {avg_temp}°C"
+
+    def filter_data(self, data_batch: List[Any],
+                    criteria: Optional[str] = None) -> List[Any]:
+        if criteria == "high_priority":
+            # Readings above 50 are treated as high priority.
+            return [x for x in data_batch
+                    if (isinstance(x, (int, float)) and x > 50) or
+                    (isinstance(x, tuple) and len(x) == 2 and x[1] > 50)]
+        return data_batch
+
+
+class TransactionStream(DataStream):
+    def __init__(self, stream_id: str) -> None:
+        super().__init__(stream_id)
+        self.stream_type = "Financial Data"
+
+    def process_batch(self, data_batch: List[Any]) -> str:
+        count = len(data_batch)
+        net_flow = 0
+        for item in data_batch:
+            if isinstance(item, tuple) and len(item) == 2:
+                action, amount = item
+                if action == "buy":
+                    net_flow += amount
+                elif action == "sell":
+                    net_flow -= amount
+
+        sign = "+" if net_flow >= 0 else ""
+        return f"Transaction analysis: {count} operations, " \
+               f"net flow: {sign}{net_flow} units"
+
+    def filter_data(self, data_batch: List[Any],
+                    criteria: Optional[str] = None) -> List[Any]:
+        if criteria == "large":
+            return [x for x in data_batch
+                    if isinstance(x, tuple) and len(x) == 2 and x[1] > 100]
+        return data_batch
+
+
+class EventStream(DataStream):
+    def __init__(self, stream_id: str) -> None:
+        super().__init__(stream_id)
+        self.stream_type = "System Events"
+
+    def process_batch(self, data_batch: List[Any]) -> str:
+        count = len(data_batch)
+        errors = [x for x in data_batch if x == "error"]
+        noun = "error" if len(errors) == 1 else "errors"
+        return f"Event analysis: {count} events, " \
+               f"{len(errors)} {noun} detected"
+
+    def filter_data(self, data_batch: List[Any],
+                    criteria: Optional[str] = None) -> List[Any]:
+        if criteria == "error":
+            return [x for x in data_batch if x == "error"]
+        return data_batch
+
+
+class StreamProcessor:
+    def process_stream(self, stream: DataStream, batch: List[Any]) -> str:
+        try:
+            return stream.process_batch(batch)
+        except Exception as e:
+            return f"Error processing stream {stream.stream_id}: {str(e)}"
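+
+
+if __name__ == "__main__":
+    # Minimal usage sketch (illustrative only); the stream ids and
+    # sample batches are assumptions, not part of the exercise data.
+    processor = StreamProcessor()
+    sensors = SensorStream("sensor-01")
+    trades = TransactionStream("trades-01")
+    readings = [("temp", 21.5), ("temp", 23.0)]
+    trades_batch = [("buy", 150), ("sell", 40)]
+    print(processor.process_stream(sensors, readings))
+    print(processor.process_stream(trades, trades_batch))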
diff --git a/ex2/nexus_pipeline.py b/ex2/nexus_pipeline.py
new file mode 100644
index 0000000..6f828e2
--- /dev/null
+++ b/ex2/nexus_pipeline.py
@@ -0,0 +1,110 @@
+import json
+from typing import Any, List, Dict, Union, Optional, Protocol
+from abc import ABC, abstractmethod
+
+
+class ProcessingStage(Protocol):
+    def process(self, data: Any) -> Any:
+        ...
+
+
+class ProcessingPipeline(ABC):
+    def __init__(self, pipeline_id: str) -> None:
+        self.pipeline_id = pipeline_id
+        self.stages: List[ProcessingStage] = []
+
+    def add_stage(self, stage: ProcessingStage) -> None:
+        self.stages.append(stage)
+
+    def _run_stages(self, data: Any) -> Any:
+        result = data
+        for stage in self.stages:
+            result = stage.process(result)
+        return result
+
+    @abstractmethod
+    def process(self, data: Any) -> Union[str, Any]:
+        pass
+
+
+class InputStage:
+    def process(self, data: Any) -> Any:
+        # Parse JSON-looking strings into dicts and comma-separated
+        # strings into field lists; anything else passes through.
+        if isinstance(data, str):
+            if data.startswith("{") and data.endswith("}"):
+                try:
+                    return json.loads(data)
+                except json.JSONDecodeError:
+                    pass
+            elif "," in data:
+                return data.split(",")
+        return data
+
+
+class TransformStage:
+    def process(self, data: Any) -> Any:
+        # Mark parsed records as enriched; other types pass through.
+        if isinstance(data, dict):
+            data["enriched"] = True
+        return data
+
+
+class OutputStage:
+    def process(self, data: Any) -> Any:
+        if isinstance(data, dict) and "sensor" in data:
+            return f"Processed {data.get('sensor')} reading: " \
+                   f"{data.get('value')} (Normal range)"
+        if isinstance(data, list):
+            # CSV input flattens to user,action,timestamp fields,
+            # so every three fields describe one logged action.
+            return f"User activity logged: {len(data) // 3} actions processed"
+        return f"Processed data: {data}"
+
+
+class JSONAdapter(ProcessingPipeline):
+    def process(self, data: Any) -> Union[str, Any]:
+        try:
+            return self._run_stages(data)
+        except Exception as e:
+            return f"JSON Pipeline Error: {str(e)}"
+
+
+class CSVAdapter(ProcessingPipeline):
+    def process(self, data: Any) -> Union[str, Any]:
+        try:
+            return self._run_stages(data)
+        except Exception as e:
+            return f"CSV Pipeline Error: {str(e)}"
+
+
+class StreamAdapter(ProcessingPipeline):
+    def process(self, data: Any) -> Union[str, Any]:
+        try:
+            return self._run_stages(data)
+        except Exception as e:
+            return f"Stream Pipeline Error: {str(e)}"
+
+
+class NexusManager:
+    def __init__(self) -> None:
+        self.pipelines: Dict[str, ProcessingPipeline] = {}
+
+    def register_pipeline(self, pipeline: ProcessingPipeline) -> None:
+        self.pipelines[pipeline.pipeline_id] = pipeline
+
+    def get_pipeline(self, pipeline_id: str) -> Optional[ProcessingPipeline]:
+        return self.pipelines.get(pipeline_id)
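+
+
+if __name__ == "__main__":
+    # Minimal usage sketch (illustrative only); the pipeline id, stage
+    # wiring, and sample payload are assumptions about how the adapters
+    # are meant to be composed.
+    manager = NexusManager()
+    pipeline = JSONAdapter("json-main")
+    for stage in (InputStage(), TransformStage(), OutputStage()):
+        pipeline.add_stage(stage)
+    manager.register_pipeline(pipeline)
+    registered = manager.get_pipeline("json-main")
+    if registered is not None:
+        print(registered.process('{"sensor": "temp", "value": 22.5}'))
-- 
2.52.0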