Untitled Git - axy/ft/python05.git/commitdiff
feat: implement polymorphic data processing streams
author Axy <gilliardmarthey.axel@gmail.com>
Mon, 19 Jan 2026 16:02:17 +0000 (17:02 +0100)
committer Axy <gilliardmarthey.axel@gmail.com>
Mon, 19 Jan 2026 16:02:17 +0000 (17:02 +0100)
ex0/stream_processor.py [new file with mode: 0644]
ex1/data_stream.py [new file with mode: 0644]
ex2/nexus_pipeline.py [new file with mode: 0644]

diff --git a/ex0/stream_processor.py b/ex0/stream_processor.py
new file mode 100644 (file)
index 0000000..3cecc9d
--- /dev/null
@@ -0,0 +1,55 @@
+from typing import Any
+from abc import ABC, abstractmethod
+
+
+class DataProcessor(ABC):
+    @abstractmethod
+    def process(self, data: Any) -> str:
+        pass
+
+    @abstractmethod
+    def validate(self, data: Any) -> bool:
+        pass
+
+    def format_output(self, result: str) -> str:
+        return result
+
+
+class NumericProcessor(DataProcessor):
+    def validate(self, data: Any) -> bool:
+        if not isinstance(data, list):
+            return False
+        return all(isinstance(x, (int, float)) for x in data)
+
+    def process(self, data: Any) -> str:
+        if not self.validate(data):
+            raise ValueError("Invalid data for NumericProcessor")
+        count = len(data)
+        total = sum(data)
+        avg = total / count if count > 0 else 0.0
+        return f"Processed {count} numeric values, sum={total}, avg={avg}"
+
+
+class TextProcessor(DataProcessor):
+    def validate(self, data: Any) -> bool:
+        return isinstance(data, str)
+
+    def process(self, data: Any) -> str:
+        if not self.validate(data):
+            raise ValueError("Invalid data for TextProcessor")
+        char_count = len(data)
+        word_count = len(data.split())
+        return f"Processed text: {char_count} characters, {word_count} words"
+
+
+class LogProcessor(DataProcessor):
+    def validate(self, data: Any) -> bool:
+        return isinstance(data, str) and ": " in data
+
+    def process(self, data: Any) -> str:
+        if not self.validate(data):
+            raise ValueError("Invalid data for LogProcessor")
+        parts = data.split(": ", 1)
+        level = parts[0]
+        message = parts[1]
+        return f"[{level}] {level} level detected: {message}"
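
A minimal usage sketch for these processors (illustrative, not part of the commit; the sample values, and the import path that assumes the exercise directory is importable from the repository root, are my own assumptions):

    from ex0.stream_processor import (LogProcessor, NumericProcessor,
                                      TextProcessor)

    # Each processor validates its own input shape before processing.
    processors = [NumericProcessor(), TextProcessor(), LogProcessor()]
    samples = [[1, 2, 3.5], "hello world", "ERROR: disk full"]

    for proc, sample in zip(processors, samples):
        if proc.validate(sample):
            # format_output is the shared base-class hook; by default
            # it returns the result unchanged.
            print(proc.format_output(proc.process(sample)))
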
diff --git a/ex1/data_stream.py b/ex1/data_stream.py
new file mode 100644 (file)
index 0000000..666ccfb
--- /dev/null
@@ -0,0 +1,106 @@
+from typing import Any, List, Dict, Union, Optional
+from abc import ABC, abstractmethod
+
+
+class DataStream(ABC):
+    def __init__(self, stream_id: str) -> None:
+        self.stream_id = stream_id
+        self.stream_type = "Generic"
+
+    @abstractmethod
+    def process_batch(self, data_batch: List[Any]) -> str:
+        pass
+
+    def filter_data(self, data_batch: List[Any],
+                    criteria: Optional[str] = None) -> List[Any]:
+        return data_batch
+
+    def get_stats(self) -> Dict[str, Union[str, int, float]]:
+        return {"id": self.stream_id, "type": self.stream_type}
+
+
+class SensorStream(DataStream):
+    def __init__(self, stream_id: str) -> None:
+        super().__init__(stream_id)
+        self.stream_type = "Environmental Data"
+
+    def process_batch(self, data_batch: List[Any]) -> str:
+        # Readings may arrive as plain numbers or as (type, value)
+        # tuples. Plain numbers are always counted as temperatures,
+        # while tuples contribute to the average only when tagged
+        # "temp", so mixed batch formats stay supported.
+        count = len(data_batch)
+        temps = []
+        for item in data_batch:
+            if isinstance(item, (int, float)):
+                temps.append(item)
+            elif isinstance(item, tuple) and len(item) == 2:
+                if item[0] == "temp":
+                    temps.append(item[1])
+
+        avg_temp = sum(temps) / len(temps) if temps else 0.0
+        return f"Sensor analysis: {count} readings processed, " \
+               f"avg temp: {avg_temp}°C"
+
+    def filter_data(self, data_batch: List[Any],
+                    criteria: Optional[str] = None) -> List[Any]:
+        if criteria == "high_priority":
+            # Treat readings above 50 as high priority (plain or tuple).
+            return [x for x in data_batch
+                    if (isinstance(x, (int, float)) and x > 50) or
+                    (isinstance(x, tuple) and len(x) == 2
+                     and isinstance(x[1], (int, float)) and x[1] > 50)]
+        return data_batch
+
+
+class TransactionStream(DataStream):
+    def __init__(self, stream_id: str) -> None:
+        super().__init__(stream_id)
+        self.stream_type = "Financial Data"
+
+    def process_batch(self, data_batch: List[Any]) -> str:
+        count = len(data_batch)
+        net_flow = 0
+        for item in data_batch:
+            if isinstance(item, tuple) and len(item) == 2:
+                action, amount = item
+                if action == "buy":
+                    net_flow += amount
+                elif action == "sell":
+                    net_flow -= amount
+
+        sign = "+" if net_flow >= 0 else ""
+        return f"Transaction analysis: {count} operations, " \
+               f"net flow: {sign}{net_flow} units"
+
+    def filter_data(self, data_batch: List[Any],
+                    criteria: Optional[str] = None) -> List[Any]:
+        if criteria == "large":
+            return [x for x in data_batch if isinstance(x, tuple)
+                    and len(x) == 2 and x[1] > 100]
+        return data_batch
+
+
+class EventStream(DataStream):
+    def __init__(self, stream_id: str) -> None:
+        super().__init__(stream_id)
+        self.stream_type = "System Events"
+
+    def process_batch(self, data_batch: List[Any]) -> str:
+        count = len(data_batch)
+        errors = [x for x in data_batch if x == "error"]
+        return f"Event analysis: {count} events, {len(errors)} error detected"
+
+    def filter_data(self, data_batch: List[Any],
+                    criteria: Optional[str] = None) -> List[Any]:
+        if criteria == "error":
+            return [x for x in data_batch if x == "error"]
+        return data_batch
+
+
+class StreamProcessor:
+    def process_stream(self, stream: DataStream, batch: List[Any]) -> str:
+        try:
+            return stream.process_batch(batch)
+        except Exception as e:
+            return f"Error processing stream {stream.stream_id}: {str(e)}"
diff --git a/ex2/nexus_pipeline.py b/ex2/nexus_pipeline.py
new file mode 100644 (file)
index 0000000..6f828e2
--- /dev/null
@@ -0,0 +1,102 @@
+from typing import Any, List, Dict, Union, Optional, Protocol
+from abc import ABC, abstractmethod
+import json
+
+
+class ProcessingStage(Protocol):
+    def process(self, data: Any) -> Any:
+        ...
+
+
+class ProcessingPipeline(ABC):
+    def __init__(self, pipeline_id: str) -> None:
+        self.pipeline_id = pipeline_id
+        self.stages: List[ProcessingStage] = []
+
+    def add_stage(self, stage: ProcessingStage) -> None:
+        self.stages.append(stage)
+
+    def _run_stages(self, data: Any) -> Any:
+        result = data
+        for stage in self.stages:
+            result = stage.process(result)
+        return result
+
+    @abstractmethod
+    def process(self, data: Any) -> Union[str, Any]:
+        pass
+
+
+class InputStage:
+    def process(self, data: Any) -> Any:
+        # Generic validation/parsing
+        if isinstance(data, str):
+            if data.startswith("{") and data.endswith("}"):
+                # Looks like JSON: parse it, fall through on failure.
+                try:
+                    return json.loads(data)
+                except json.JSONDecodeError:
+                    pass
+            elif "," in data:
+                # Simulating CSV parsing
+                return data.split(",")
+        return data
+
+
+class TransformStage:
+    def process(self, data: Any) -> Any:
+        # Generic enrichment step: tag dicts with a marker field.
+        if isinstance(data, dict):
+            data["enriched"] = True
+        elif isinstance(data, list):
+            # Lists pass through unchanged for now.
+            pass
+        return data
+
+
+class OutputStage:
+    def process(self, data: Any) -> Any:
+        # Format the final result for each supported payload shape.
+        if isinstance(data, dict) and "sensor" in data:
+            return f"Processed {data.get('sensor')} reading: " \
+                   f"{data.get('value')} (Normal range)"
+        if isinstance(data, list):
+            # CSV input arrives as a flat user,action,timestamp list,
+            # so every three fields count as one logged action.
+            return f"User activity logged: {len(data) // 3} actions processed"
+        return f"Processed data: {data}"
+
+
+class JSONAdapter(ProcessingPipeline):
+    def process(self, data: Any) -> Union[str, Any]:
+        try:
+            return self._run_stages(data)
+        except Exception as e:
+            return f"JSON Pipeline Error: {str(e)}"
+
+
+class CSVAdapter(ProcessingPipeline):
+    def process(self, data: Any) -> Union[str, Any]:
+        try:
+            return self._run_stages(data)
+        except Exception as e:
+            return f"CSV Pipeline Error: {str(e)}"
+
+
+class StreamAdapter(ProcessingPipeline):
+    def process(self, data: Any) -> Union[str, Any]:
+        try:
+            return self._run_stages(data)
+        except Exception as e:
+            return f"Stream Pipeline Error: {str(e)}"
+
+
+class NexusManager:
+    def __init__(self) -> None:
+        self.pipelines: Dict[str, ProcessingPipeline] = {}
+
+    def register_pipeline(self, pipeline: ProcessingPipeline) -> None:
+        self.pipelines[pipeline.pipeline_id] = pipeline
+
+    def get_pipeline(self, pipeline_id: str) -> Optional[ProcessingPipeline]:
+        return self.pipelines.get(pipeline_id)
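
An end-to-end sketch of the pipeline assembly (the pipeline id and sample payload are invented; only classes defined above are used):

    from ex2.nexus_pipeline import (InputStage, JSONAdapter, NexusManager,
                                    OutputStage, TransformStage)

    # Stages run in insertion order: parse, enrich, then format.
    pipeline = JSONAdapter("json-01")
    for stage in (InputStage(), TransformStage(), OutputStage()):
        pipeline.add_stage(stage)

    manager = NexusManager()
    manager.register_pipeline(pipeline)

    registered = manager.get_pipeline("json-01")
    if registered is not None:
        print(registered.process('{"sensor": "temp", "value": 22.5}'))
        # Processed temp reading: 22.5 (Normal range)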