--- /dev/null
+from typing import Any
+from abc import ABC, abstractmethod
+
+
+class DataProcessor(ABC):
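+    """Base class for processors that validate and transform raw input.
+
+    Subclasses implement validate() and process(); format_output() is a
+    shared hook they may override.
+    """
+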
+ @abstractmethod
+ def process(self, data: Any) -> str:
+ pass
+
+ @abstractmethod
+ def validate(self, data: Any) -> bool:
+ pass
+
+ def format_output(self, result: str) -> str:
+ return result
+
+
+class NumericProcessor(DataProcessor):
+ def validate(self, data: Any) -> bool:
+ if not isinstance(data, list):
+ return False
+ return all(isinstance(x, (int, float)) for x in data)
+
+ def process(self, data: Any) -> str:
+ if not self.validate(data):
+ raise ValueError("Invalid data for NumericProcessor")
+ count = len(data)
+ total = sum(data)
+ avg = total / count if count > 0 else 0.0
+        return f"Processed {count} numeric values, sum={total}, avg={avg:.2f}"
+
+
+class TextProcessor(DataProcessor):
+ def validate(self, data: Any) -> bool:
+ return isinstance(data, str)
+
+ def process(self, data: Any) -> str:
+ if not self.validate(data):
+ raise ValueError("Invalid data for TextProcessor")
+ char_count = len(data)
+ word_count = len(data.split())
+ return f"Processed text: {char_count} characters, {word_count} words"
+
+
+class LogProcessor(DataProcessor):
+ def validate(self, data: Any) -> bool:
+ return isinstance(data, str) and ": " in data
+
+ def process(self, data: Any) -> str:
+ if not self.validate(data):
+ raise ValueError("Invalid data for LogProcessor")
+ parts = data.split(": ", 1)
+ level = parts[0]
+ message = parts[1]
+        return f"[{level}] {message}"
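+
+
+# Minimal usage sketch (illustrative only; the sample inputs below are
+# assumptions, not fixtures from this repo). It shows polymorphic
+# dispatch over the shared DataProcessor interface.
+if __name__ == "__main__":
+    samples = [
+        (NumericProcessor(), [1, 2, 3.5]),
+        (TextProcessor(), "hello polymorphic world"),
+        (LogProcessor(), "ERROR: disk full"),
+    ]
+    for processor, sample in samples:
+        # Each concrete class supplies its own validate()/process().
+        print(processor.format_output(processor.process(sample)))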
--- /dev/null
+from typing import Any, List, Dict, Union, Optional
+from abc import ABC, abstractmethod
+
+
+class DataStream(ABC):
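+    """Base class for typed data streams processed in batches.
+
+    Concrete streams implement process_batch(); filter_data() and
+    get_stats() provide overridable defaults.
+    """
+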
+ def __init__(self, stream_id: str) -> None:
+ self.stream_id = stream_id
+ self.stream_type = "Generic"
+
+ @abstractmethod
+ def process_batch(self, data_batch: List[Any]) -> str:
+ pass
+
+ def filter_data(self, data_batch: List[Any],
+ criteria: Optional[str] = None) -> List[Any]:
+ return data_batch
+
+ def get_stats(self) -> Dict[str, Union[str, int, float]]:
+ return {"id": self.stream_id, "type": self.stream_type}
+
+
+class SensorStream(DataStream):
+ def __init__(self, stream_id: str) -> None:
+ super().__init__(stream_id)
+ self.stream_type = "Environmental Data"
+
+ def process_batch(self, data_batch: List[Any]) -> str:
+        # Accept either bare numeric readings or ("temp", value) tuples;
+        # only temperature readings contribute to the average.
+ count = len(data_batch)
+ temps = []
+ for item in data_batch:
+ if isinstance(item, (int, float)):
+ temps.append(item)
+ elif isinstance(item, tuple) and len(item) == 2:
+ if item[0] == "temp":
+ temps.append(item[1])
+
+        avg_temp = sum(temps) / len(temps) if temps else 0.0
+        return f"Sensor analysis: {count} readings processed, " \
+               f"avg temp: {avg_temp:.1f}°C"
+
+ def filter_data(self, data_batch: List[Any],
+ criteria: Optional[str] = None) -> List[Any]:
+        if criteria == "high_priority":
+            # Treat readings above 50 as high priority, whether they are
+            # bare numbers or (type, value) tuples.
+            return [x for x in data_batch
+                    if (isinstance(x, (int, float)) and x > 50) or
+                    (isinstance(x, tuple) and len(x) == 2 and
+                     isinstance(x[1], (int, float)) and x[1] > 50)]
+ return data_batch
+
+
+class TransactionStream(DataStream):
+ def __init__(self, stream_id: str) -> None:
+ super().__init__(stream_id)
+ self.stream_type = "Financial Data"
+
+ def process_batch(self, data_batch: List[Any]) -> str:
+ count = len(data_batch)
+ net_flow = 0
+ for item in data_batch:
+ if isinstance(item, tuple) and len(item) == 2:
+ action, amount = item
+ if action == "buy":
+ net_flow += amount
+ elif action == "sell":
+ net_flow -= amount
+
+ sign = "+" if net_flow >= 0 else ""
+ return f"Transaction analysis: {count} operations, " \
+ f"net flow: {sign}{net_flow} units"
+
+ def filter_data(self, data_batch: List[Any],
+ criteria: Optional[str] = None) -> List[Any]:
+        if criteria == "large":
+            # A transaction is "large" when its amount exceeds 100 units.
+            return [x for x in data_batch
+                    if isinstance(x, tuple) and len(x) == 2 and
+                    isinstance(x[1], (int, float)) and x[1] > 100]
+ return data_batch
+
+
+class EventStream(DataStream):
+ def __init__(self, stream_id: str) -> None:
+ super().__init__(stream_id)
+ self.stream_type = "System Events"
+
+ def process_batch(self, data_batch: List[Any]) -> str:
+ count = len(data_batch)
+ errors = [x for x in data_batch if x == "error"]
+        noun = "error" if len(errors) == 1 else "errors"
+        return f"Event analysis: {count} events, " \
+               f"{len(errors)} {noun} detected"
+
+ def filter_data(self, data_batch: List[Any],
+ criteria: Optional[str] = None) -> List[Any]:
+ if criteria == "error":
+ return [x for x in data_batch if x == "error"]
+ return data_batch
+
+
+class StreamProcessor:
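+    """Dispatches a batch to any DataStream polymorphically and converts
+    processing failures into error strings."""
+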
+ def process_stream(self, stream: DataStream, batch: List[Any]) -> str:
+ try:
+ return stream.process_batch(batch)
+ except Exception as e:
+ return f"Error processing stream {stream.stream_id}: {str(e)}"
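+
+
+# Minimal usage sketch (illustrative only; stream ids and batches are
+# assumptions). StreamProcessor dispatches each batch through the
+# stream's own process_batch() override and traps failures.
+if __name__ == "__main__":
+    runner = StreamProcessor()
+    print(runner.process_stream(SensorStream("s1"), [("temp", 21.5), 60]))
+    print(runner.process_stream(TransactionStream("t1"),
+                                [("buy", 150), ("sell", 40)]))
+    print(runner.process_stream(EventStream("e1"), ["error", "info"]))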
--- /dev/null
+import json
+from typing import Any, List, Dict, Union, Optional, Protocol
+from abc import ABC, abstractmethod
+
+
+class ProcessingStage(Protocol):
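+    """Structural interface: any object exposing process(data) counts as
+    a pipeline stage."""
+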
+ def process(self, data: Any) -> Any:
+ ...
+
+
+class ProcessingPipeline(ABC):
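+    """Base pipeline that runs its registered stages in order.
+
+    Adapters implement process() to wrap _run_stages() with
+    format-specific error handling.
+    """
+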
+ def __init__(self, pipeline_id: str) -> None:
+ self.pipeline_id = pipeline_id
+ self.stages: List[ProcessingStage] = []
+
+ def add_stage(self, stage: ProcessingStage) -> None:
+ self.stages.append(stage)
+
+ def _run_stages(self, data: Any) -> Any:
+ result = data
+ for stage in self.stages:
+ result = stage.process(result)
+ return result
+
+ @abstractmethod
+ def process(self, data: Any) -> Union[str, Any]:
+ pass
+
+
+class InputStage:
+    def process(self, data: Any) -> Any:
+        # Parse structured string input: JSON-like strings are decoded,
+        # comma-separated strings are split into fields; anything else
+        # passes through unchanged.
+        if isinstance(data, str):
+            if data.startswith("{") and data.endswith("}"):
+                try:
+                    return json.loads(data)
+                except json.JSONDecodeError:
+                    # Fall through and return the raw string unchanged.
+                    pass
+            elif "," in data:
+                return data.split(",")
+        return data
+
+
+class TransformStage:
+    def process(self, data: Any) -> Any:
+        # Enrich dict payloads with a marker flag; other payload types
+        # pass through unchanged.
+        if isinstance(data, dict):
+            data["enriched"] = True
+        return data
+
+
+class OutputStage:
+    def process(self, data: Any) -> Any:
+        # Format the final result for display.
+        if isinstance(data, dict) and "sensor" in data:
+            return f"Processed {data.get('sensor')} reading: " \
+                   f"{data.get('value')} (Normal range)"
+        if isinstance(data, list):
+            # CSV input arrives as a flat list of fields; each
+            # (user, action, timestamp) triple counts as one action.
+            return f"User activity logged: {len(data) // 3} actions processed"
+        return f"Processed data: {data}"
+
+
+class JSONAdapter(ProcessingPipeline):
+ def process(self, data: Any) -> Union[str, Any]:
+ try:
+ return self._run_stages(data)
+ except Exception as e:
+ return f"JSON Pipeline Error: {str(e)}"
+
+
+class CSVAdapter(ProcessingPipeline):
+ def process(self, data: Any) -> Union[str, Any]:
+ try:
+ return self._run_stages(data)
+ except Exception as e:
+ return f"CSV Pipeline Error: {str(e)}"
+
+
+class StreamAdapter(ProcessingPipeline):
+ def process(self, data: Any) -> Union[str, Any]:
+ try:
+ return self._run_stages(data)
+ except Exception as e:
+ return f"Stream Pipeline Error: {str(e)}"
+
+
+class NexusManager:
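+    """Registry that stores and looks up pipelines by pipeline_id."""
+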
+ def __init__(self) -> None:
+ self.pipelines: Dict[str, ProcessingPipeline] = {}
+
+ def register_pipeline(self, pipeline: ProcessingPipeline) -> None:
+ self.pipelines[pipeline.pipeline_id] = pipeline
+
+ def get_pipeline(self, pipeline_id: str) -> Optional[ProcessingPipeline]:
+ return self.pipelines.get(pipeline_id)
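+
+
+# Minimal wiring sketch (illustrative only; the pipeline id and sample
+# payload are assumptions). A JSONAdapter is composed from the three
+# stages and looked up through NexusManager before use.
+if __name__ == "__main__":
+    manager = NexusManager()
+    json_pipeline = JSONAdapter("json")
+    for stage in (InputStage(), TransformStage(), OutputStage()):
+        json_pipeline.add_stage(stage)
+    manager.register_pipeline(json_pipeline)
+
+    pipeline = manager.get_pipeline("json")
+    if pipeline is not None:
+        print(pipeline.process('{"sensor": "temp", "value": 21.5}'))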