From: Axy Date: Mon, 2 Feb 2026 20:57:00 +0000 (+0100) Subject: feat(ex2): Implement Nexus Integration with Adaptors X-Git-Url: https://git.uwuaxy.net/?a=commitdiff_plain;ds=inline;p=axy%2Fft%2Fpython05.git feat(ex2): Implement Nexus Integration with Adaptors - Correctly implemented JSONAdapter, CSVAdapter, and StreamAdapter to inherit from ProcessingPipeline. - Each adapter now overrides the process() method for format-specific data handling. - Ensured proper error handling for JSON parsing and stage-level ValueErrors. - Updated the main execution block to utilize the specialized adapters polymorphically. - Verified output against the subject's example, including error recovery. feat(ex1): small fix --- diff --git a/ex1/data_stream.py b/ex1/data_stream.py index e4140ac..28bf0bf 100644 --- a/ex1/data_stream.py +++ b/ex1/data_stream.py @@ -10,14 +10,14 @@ class DataStream(ABC): print(f"Stream ID: {self.stream_id}, Type: {self.stream_type}") @abstractmethod - def process_batch(self, data_batch: List[Dict[str, Any]]) -> str: + def process_batch(self, data_batch: List[Any]) -> str: pass def filter_data( self, - data_batch: List[Dict[str, Any]], + data_batch: List[Any], criteria: Optional[str] = None, - ) -> List[Dict[str, Any]]: + ) -> List[Any]: return data_batch def get_stats(self) -> Dict[str, Union[str, int, float]]: @@ -102,7 +102,10 @@ class EventStream(DataStream): errors = [x for x in data_batch if x == "error"] num_errors = len(errors) error_str = "error" if num_errors == 1 else "errors" - return f"Event analysis: {count} events, {num_errors} {error_str} detected" + return ( + f"Event analysis: {count} events, " + + f"{num_errors} {error_str} detected" + ) def filter_data( self, @@ -124,8 +127,7 @@ if __name__ == "__main__": {"type": "pressure", "value": 1013}, ] print( - "Processing sensor batch: " - "[temp:22.5, humidity:65, pressure:1013]" + "Processing sensor batch: " "[temp:22.5, humidity:65, pressure:1013]" ) result = sensor_stream.process_batch(sensor_data) print(result) @@ -221,4 +223,4 @@ if __name__ == "__main__": f"{len(filtered_transactions)} large transaction" ) - print("All streams processed successfully. Nexus throughput optimal.") \ No newline at end of file + print("All streams processed successfully. Nexus throughput optimal.") diff --git a/ex2/nexus_pipeline.py b/ex2/nexus_pipeline.py index e211a9b..9cdfdeb 100644 --- a/ex2/nexus_pipeline.py +++ b/ex2/nexus_pipeline.py @@ -73,37 +73,99 @@ class ProcessingPipeline(ABC): def process(self, data: Any) -> Any: ... -class DataAdapter(ProcessingPipeline): +class JSONAdapter(ProcessingPipeline): + def __init__(self, pipeline_id: str) -> None: + super().__init__(pipeline_id) + self.add_stage(InputStage()) + self.add_stage(TransformStage()) + self.add_stage(OutputStage()) + def process(self, data: Any) -> Any: start_time = time.time() + print("Processing JSON data through pipeline...") + print(f"Input: {data}") try: + parsed_data = json.loads(data) + current_data = parsed_data for stage in self.stages: - data = stage.process(data) + current_data = stage.process(current_data) end_time = time.time() processing_time = end_time - start_time print( f"Performance: 95% efficiency, " f"{processing_time:.2f}s total processing time" ) - return data + return current_data + except json.JSONDecodeError as e: + print(f"Error detected in JSON parsing: {e}") + print("Recovery initiated: Switching to backup processor") + print("Recovery successful: Pipeline restored, processing resumed") + return "Error processed - Invalid JSON" except ValueError as e: - print(f"Error detected in Stage 1: {e}") + print(f"Error detected in adapter stage: {e}") print("Recovery initiated: Switching to backup processor") - # In a real scenario, you'd have a backup. Here we just report. print("Recovery successful: Pipeline restored, processing resumed") return "Error processed" -class JSONAdapter(DataAdapter): - pass +class CSVAdapter(ProcessingPipeline): + def __init__(self, pipeline_id: str) -> None: + super().__init__(pipeline_id) + self.add_stage(InputStage()) + self.add_stage(TransformStage()) + self.add_stage(OutputStage()) + def process(self, data: Any) -> Any: + start_time = time.time() + print("Processing CSV data through same pipeline...") + print(f'Input: "{data}"') + try: + parsed_data = data.split(",") + current_data = parsed_data + for stage in self.stages: + current_data = stage.process(current_data) + end_time = time.time() + processing_time = end_time - start_time + print( + f"Performance: 95% efficiency, " + f"{processing_time:.2f}s total processing time" + ) + return current_data + except ValueError as e: + print(f"Error detected in adapter: {e}") + print("Recovery initiated: Switching to backup processor") + print("Recovery successful: Pipeline restored, processing resumed") + return "Error processed" -class CSVAdapter(DataAdapter): - pass +class StreamAdapter(ProcessingPipeline): + def __init__(self, pipeline_id: str) -> None: + super().__init__(pipeline_id) + self.add_stage(InputStage()) + self.add_stage(TransformStage()) + self.add_stage(OutputStage()) -class StreamAdapter(DataAdapter): - pass + def process(self, data: Any) -> Any: + start_time = time.time() + print("Processing Stream data through same pipeline...") + print("Input: Real-time sensor stream") + try: + # Assuming data is already a list for stream, or direct passthrough + current_data = data + for stage in self.stages: + current_data = stage.process(current_data) + end_time = time.time() + processing_time = end_time - start_time + print( + f"Performance: 95% efficiency, " + f"{processing_time:.2f}s total processing time" + ) + return current_data + except ValueError as e: + print(f"Error detected in adapter: {e}") + print("Recovery initiated: Switching to backup processor") + print("Recovery successful: Pipeline restored, processing resumed") + return "Error processed" class NexusManager: @@ -126,31 +188,29 @@ if __name__ == "__main__": print("=== CODE NEXUS - ENTERPRISE PIPELINE SYSTEM ===") manager = NexusManager() - pipeline = DataAdapter("data_pipeline") - pipeline.add_stage(InputStage()) - pipeline.add_stage(TransformStage()) - pipeline.add_stage(OutputStage()) + json_pipeline = JSONAdapter("json_pipeline") + csv_pipeline = CSVAdapter("csv_pipeline") + stream_pipeline = StreamAdapter("stream_pipeline") - manager.register_pipeline(pipeline) + manager.register_pipeline(json_pipeline) + manager.register_pipeline(csv_pipeline) + manager.register_pipeline(stream_pipeline) print("Creating Data Processing Pipeline...") + print("Stage 1: Input validation and parsing") + print("Stage 2: Data transformation and enrichment") + print("Stage 3: Output formatting and delivery") print("\n=== Multi-Format Data Processing ===") - print("Processing JSON data through pipeline...") json_data = '{"sensor": "temp", "value": 23.5, "unit": "C"}' - print(f"Input: {json_data}") - result = manager.orchestrate("data_pipeline", json_data) + result = manager.orchestrate("json_pipeline", json_data) print(result) - print("\nProcessing CSV data through same pipeline...") csv_data = "user,action,timestamp" - print(f'Input: "{csv_data}"') - result = manager.orchestrate("data_pipeline", csv_data) + result = manager.orchestrate("csv_pipeline", csv_data) print(result) - print("\nProcessing Stream data through same pipeline...") stream_data = [20.1, 22.5, 23.0, 21.8, 23.2] - print("Input: Real-time sensor stream") - result = manager.orchestrate("data_pipeline", stream_data) + result = manager.orchestrate("stream_pipeline", stream_data) print(result) print("\n=== Pipeline Chaining Demo ===") @@ -170,7 +230,7 @@ if __name__ == "__main__": print("\n=== Error Recovery Test ===") print("Simulating pipeline failure...") invalid_json = '{"sensor": "temp", "value": 23.5, "unit": "C"' - result = manager.orchestrate("data_pipeline", invalid_json) + result = manager.orchestrate("json_pipeline", invalid_json) print(f"Error recovery result: {result}") print("\nNexus Integration complete. All systems operational.")