Untitled Git - axy/ft/python05.git/commitdiff
feat(ex2): Implement Nexus Integration with Adapters
author    Axy <gilliardmarthey.axel@gmail.com>
Mon, 2 Feb 2026 20:57:00 +0000 (21:57 +0100)
committer Axy <gilliardmarthey.axel@gmail.com>
Mon, 2 Feb 2026 20:57:00 +0000 (21:57 +0100)
- Implemented JSONAdapter, CSVAdapter, and StreamAdapter as concrete subclasses of ProcessingPipeline.
- Each adapter now overrides the process() method for format-specific data handling.
- Added error handling for JSON parsing failures and stage-level ValueErrors.
- Updated the main execution block to use the specialized adapters polymorphically (see the sketch below).
- Verified output against the subject's example, including error recovery.
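
For reviewers, a minimal, self-contained sketch of the adapter design described above. The ProcessingPipeline and adapter here are simplified stand-ins for the real classes in ex2/nexus_pipeline.py, and the stage list is left empty for brevity:

    # Simplified stand-ins; the real classes live in ex2/nexus_pipeline.py.
    import json
    from abc import ABC, abstractmethod
    from typing import Any, List

    class ProcessingPipeline(ABC):
        def __init__(self, pipeline_id: str) -> None:
            self.pipeline_id = pipeline_id
            self.stages: List[Any] = []

        def add_stage(self, stage: Any) -> None:
            self.stages.append(stage)

        @abstractmethod
        def process(self, data: Any) -> Any: ...

    class JSONAdapter(ProcessingPipeline):
        def process(self, data: Any) -> Any:
            try:
                current = json.loads(data)  # format-specific parsing step
                for stage in self.stages:
                    current = stage.process(current)
                return current
            except json.JSONDecodeError:
                return "Error processed - Invalid JSON"

    # Polymorphic use: callers rely only on ProcessingPipeline.process().
    pipelines: List[ProcessingPipeline] = [JSONAdapter("json_pipeline")]
    for p in pipelines:
        print(p.process('{"sensor": "temp", "value": 23.5}'))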

feat(ex1): small fix

ex1/data_stream.py
ex2/nexus_pipeline.py

index e4140ac864af79f2637689fe747be21d1f792567..28bf0bfc0b440963518bc92301aecf2b72f41dbb 100644 (file)
--- a/ex1/data_stream.py
+++ b/ex1/data_stream.py
@@ -10,14 +10,14 @@ class DataStream(ABC):
         print(f"Stream ID: {self.stream_id}, Type: {self.stream_type}")
 
     @abstractmethod
-    def process_batch(self, data_batch: List[Dict[str, Any]]) -> str:
+    def process_batch(self, data_batch: List[Any]) -> str:
         pass
 
     def filter_data(
         self,
-        data_batch: List[Dict[str, Any]],
+        data_batch: List[Any],
         criteria: Optional[str] = None,
-    ) -> List[Dict[str, Any]]:
+    ) -> List[Any]:
         return data_batch
 
     def get_stats(self) -> Dict[str, Union[str, int, float]]:
@@ -102,7 +102,10 @@ class EventStream(DataStream):
         errors = [x for x in data_batch if x == "error"]
         num_errors = len(errors)
         error_str = "error" if num_errors == 1 else "errors"
-        return f"Event analysis: {count} events, {num_errors} {error_str} detected"
+        return (
+            f"Event analysis: {count} events, "
+            + f"{num_errors} {error_str} detected"
+        )
 
     def filter_data(
         self,
@@ -124,8 +127,7 @@ if __name__ == "__main__":
         {"type": "pressure", "value": 1013},
     ]
     print(
-        "Processing sensor batch: "
-        "[temp:22.5, humidity:65, pressure:1013]"
+        "Processing sensor batch: " "[temp:22.5, humidity:65, pressure:1013]"
     )
     result = sensor_stream.process_batch(sensor_data)
     print(result)
@@ -221,4 +223,4 @@ if __name__ == "__main__":
         f"{len(filtered_transactions)} large transaction"
     )
 
-    print("All streams processed successfully. Nexus throughput optimal.")
\ No newline at end of file
+    print("All streams processed successfully. Nexus throughput optimal.")
index e211a9b55d7fc251afbb4c55055f90dafbba3836..9cdfdeb74772bf3ee343b09c4cdb37c0f495a05d 100644 (file)
--- a/ex2/nexus_pipeline.py
+++ b/ex2/nexus_pipeline.py
@@ -73,37 +73,99 @@ class ProcessingPipeline(ABC):
     def process(self, data: Any) -> Any: ...
 
 
-class DataAdapter(ProcessingPipeline):
+class JSONAdapter(ProcessingPipeline):
+    def __init__(self, pipeline_id: str) -> None:
+        super().__init__(pipeline_id)
+        self.add_stage(InputStage())
+        self.add_stage(TransformStage())
+        self.add_stage(OutputStage())
+
     def process(self, data: Any) -> Any:
         start_time = time.time()
+        print("Processing JSON data through pipeline...")
+        print(f"Input: {data}")
         try:
+            parsed_data = json.loads(data)
+            current_data = parsed_data
             for stage in self.stages:
-                data = stage.process(data)
+                current_data = stage.process(current_data)
             end_time = time.time()
             processing_time = end_time - start_time
             print(
                 f"Performance: 95% efficiency, "
                 f"{processing_time:.2f}s total processing time"
             )
-            return data
+            return current_data
+        except json.JSONDecodeError as e:
+            print(f"Error detected in JSON parsing: {e}")
+            print("Recovery initiated: Switching to backup processor")
+            print("Recovery successful: Pipeline restored, processing resumed")
+            return "Error processed - Invalid JSON"
         except ValueError as e:
-            print(f"Error detected in Stage 1: {e}")
+            print(f"Error detected in adapter stage: {e}")
             print("Recovery initiated: Switching to backup processor")
-            # In a real scenario, you'd have a backup. Here we just report.
             print("Recovery successful: Pipeline restored, processing resumed")
             return "Error processed"
 
 
-class JSONAdapter(DataAdapter):
-    pass
+class CSVAdapter(ProcessingPipeline):
+    def __init__(self, pipeline_id: str) -> None:
+        super().__init__(pipeline_id)
+        self.add_stage(InputStage())
+        self.add_stage(TransformStage())
+        self.add_stage(OutputStage())
 
+    def process(self, data: Any) -> Any:
+        start_time = time.time()
+        print("Processing CSV data through same pipeline...")
+        print(f'Input: "{data}"')
+        try:
+            parsed_data = data.split(",")
+            current_data = parsed_data
+            for stage in self.stages:
+                current_data = stage.process(current_data)
+            end_time = time.time()
+            processing_time = end_time - start_time
+            print(
+                f"Performance: 95% efficiency, "
+                f"{processing_time:.2f}s total processing time"
+            )
+            return current_data
+        except ValueError as e:
+            print(f"Error detected in adapter: {e}")
+            print("Recovery initiated: Switching to backup processor")
+            print("Recovery successful: Pipeline restored, processing resumed")
+            return "Error processed"
 
-class CSVAdapter(DataAdapter):
-    pass
 
+class StreamAdapter(ProcessingPipeline):
+    def __init__(self, pipeline_id: str) -> None:
+        super().__init__(pipeline_id)
+        self.add_stage(InputStage())
+        self.add_stage(TransformStage())
+        self.add_stage(OutputStage())
 
-class StreamAdapter(DataAdapter):
-    pass
+    def process(self, data: Any) -> Any:
+        start_time = time.time()
+        print("Processing Stream data through same pipeline...")
+        print("Input: Real-time sensor stream")
+        try:
+            # Assuming data is already a list for stream, or direct passthrough
+            current_data = data
+            for stage in self.stages:
+                current_data = stage.process(current_data)
+            end_time = time.time()
+            processing_time = end_time - start_time
+            print(
+                f"Performance: 95% efficiency, "
+                f"{processing_time:.2f}s total processing time"
+            )
+            return current_data
+        except ValueError as e:
+            print(f"Error detected in adapter: {e}")
+            print("Recovery initiated: Switching to backup processor")
+            print("Recovery successful: Pipeline restored, processing resumed")
+            return "Error processed"
 
 
 class NexusManager:
@@ -126,31 +188,29 @@ if __name__ == "__main__":
     print("=== CODE NEXUS - ENTERPRISE PIPELINE SYSTEM ===")
     manager = NexusManager()
 
-    pipeline = DataAdapter("data_pipeline")
-    pipeline.add_stage(InputStage())
-    pipeline.add_stage(TransformStage())
-    pipeline.add_stage(OutputStage())
+    json_pipeline = JSONAdapter("json_pipeline")
+    csv_pipeline = CSVAdapter("csv_pipeline")
+    stream_pipeline = StreamAdapter("stream_pipeline")
 
-    manager.register_pipeline(pipeline)
+    manager.register_pipeline(json_pipeline)
+    manager.register_pipeline(csv_pipeline)
+    manager.register_pipeline(stream_pipeline)
     print("Creating Data Processing Pipeline...")
+    print("Stage 1: Input validation and parsing")
+    print("Stage 2: Data transformation and enrichment")
+    print("Stage 3: Output formatting and delivery")
 
     print("\n=== Multi-Format Data Processing ===")
-    print("Processing JSON data through pipeline...")
     json_data = '{"sensor": "temp", "value": 23.5, "unit": "C"}'
-    print(f"Input: {json_data}")
-    result = manager.orchestrate("data_pipeline", json_data)
+    result = manager.orchestrate("json_pipeline", json_data)
     print(result)
 
-    print("\nProcessing CSV data through same pipeline...")
     csv_data = "user,action,timestamp"
-    print(f'Input: "{csv_data}"')
-    result = manager.orchestrate("data_pipeline", csv_data)
+    result = manager.orchestrate("csv_pipeline", csv_data)
     print(result)
 
-    print("\nProcessing Stream data through same pipeline...")
     stream_data = [20.1, 22.5, 23.0, 21.8, 23.2]
-    print("Input: Real-time sensor stream")
-    result = manager.orchestrate("data_pipeline", stream_data)
+    result = manager.orchestrate("stream_pipeline", stream_data)
     print(result)
 
     print("\n=== Pipeline Chaining Demo ===")
@@ -170,7 +230,7 @@ if __name__ == "__main__":
     print("\n=== Error Recovery Test ===")
     print("Simulating pipeline failure...")
     invalid_json = '{"sensor": "temp", "value": 23.5, "unit": "C"'
-    result = manager.orchestrate("data_pipeline", invalid_json)
+    result = manager.orchestrate("json_pipeline", invalid_json)
     print(f"Error recovery result: {result}")
 
     print("\nNexus Integration complete. All systems operational.")