print(f"Stream ID: {self.stream_id}, Type: {self.stream_type}")
@abstractmethod
- def process_batch(self, data_batch: List[Dict[str, Any]]) -> str:
+ def process_batch(self, data_batch: List[Any]) -> str:
pass
def filter_data(
self,
- data_batch: List[Dict[str, Any]],
+ data_batch: List[Any],
criteria: Optional[str] = None,
- ) -> List[Dict[str, Any]]:
+ ) -> List[Any]:
return data_batch
def get_stats(self) -> Dict[str, Union[str, int, float]]:
errors = [x for x in data_batch if x == "error"]
num_errors = len(errors)
error_str = "error" if num_errors == 1 else "errors"
- return f"Event analysis: {count} events, {num_errors} {error_str} detected"
+ return (
+ f"Event analysis: {count} events, "
+ + f"{num_errors} {error_str} detected"
+ )
def filter_data(
self,
{"type": "pressure", "value": 1013},
]
print(
- "Processing sensor batch: "
- "[temp:22.5, humidity:65, pressure:1013]"
+ "Processing sensor batch: " "[temp:22.5, humidity:65, pressure:1013]"
)
result = sensor_stream.process_batch(sensor_data)
print(result)
f"{len(filtered_transactions)} large transaction"
)
- print("All streams processed successfully. Nexus throughput optimal.")
\ No newline at end of file
+ print("All streams processed successfully. Nexus throughput optimal.")
def process(self, data: Any) -> Any: ...
-class DataAdapter(ProcessingPipeline):
+class JSONAdapter(ProcessingPipeline):
+ def __init__(self, pipeline_id: str) -> None:
+ super().__init__(pipeline_id)
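+ # Wire up the shared three-stage flow: input -> transform -> output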
+ self.add_stage(InputStage())
+ self.add_stage(TransformStage())
+ self.add_stage(OutputStage())
+
def process(self, data: Any) -> Any:
start_time = time.time()
+ print("Processing JSON data through pipeline...")
+ print(f"Input: {data}")
try:
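+ # Decode the raw JSON string before handing it to the stages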
+ parsed_data = json.loads(data)
+ current_data = parsed_data
for stage in self.stages:
- data = stage.process(data)
+ current_data = stage.process(current_data)
end_time = time.time()
processing_time = end_time - start_time
print(
f"Performance: 95% efficiency, "
f"{processing_time:.2f}s total processing time"
)
- return data
+ return current_data
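+ # json.JSONDecodeError subclasses ValueError, so it must be caught first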
+ except json.JSONDecodeError as e:
+ print(f"Error detected in JSON parsing: {e}")
+ print("Recovery initiated: Switching to backup processor")
+ print("Recovery successful: Pipeline restored, processing resumed")
+ return "Error processed - Invalid JSON"
except ValueError as e:
- print(f"Error detected in Stage 1: {e}")
+ print(f"Error detected in adapter stage: {e}")
print("Recovery initiated: Switching to backup processor")
- # In a real scenario, you'd have a backup. Here we just report.
print("Recovery successful: Pipeline restored, processing resumed")
return "Error processed"
-class JSONAdapter(DataAdapter):
- pass
+class CSVAdapter(ProcessingPipeline):
+ def __init__(self, pipeline_id: str) -> None:
+ super().__init__(pipeline_id)
+ self.add_stage(InputStage())
+ self.add_stage(TransformStage())
+ self.add_stage(OutputStage())
+
+ def process(self, data: Any) -> Any:
+ start_time = time.time()
+ print("Processing CSV data through same pipeline...")
+ print(f'Input: "{data}"')
+ try:
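+ # Minimal CSV handling: split one line on commas (no quoting or escaping)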
+ parsed_data = data.split(",")
+ current_data = parsed_data
+ for stage in self.stages:
+ current_data = stage.process(current_data)
+ end_time = time.time()
+ processing_time = end_time - start_time
+ print(
+ f"Performance: 95% efficiency, "
+ f"{processing_time:.2f}s total processing time"
+ )
+ return current_data
+ except ValueError as e:
+ print(f"Error detected in adapter: {e}")
+ print("Recovery initiated: Switching to backup processor")
+ print("Recovery successful: Pipeline restored, processing resumed")
+ return "Error processed"
-class CSVAdapter(DataAdapter):
- pass
+class StreamAdapter(ProcessingPipeline):
+ def __init__(self, pipeline_id: str) -> None:
+ super().__init__(pipeline_id)
+ self.add_stage(InputStage())
+ self.add_stage(TransformStage())
+ self.add_stage(OutputStage())
-class StreamAdapter(DataAdapter):
- pass
+
+ def process(self, data: Any) -> Any:
+ start_time = time.time()
+ print("Processing Stream data through same pipeline...")
+ print("Input: Real-time sensor stream")
+ try:
+ # Stream input already arrives as a list of readings, so it passes straight to the stages
+ current_data = data
+ for stage in self.stages:
+ current_data = stage.process(current_data)
+ end_time = time.time()
+ processing_time = end_time - start_time
+ print(
+ f"Performance: 95% efficiency, "
+ f"{processing_time:.2f}s total processing time"
+ )
+ return current_data
+ except ValueError as e:
+ print(f"Error detected in adapter: {e}")
+ print("Recovery initiated: Switching to backup processor")
+ print("Recovery successful: Pipeline restored, processing resumed")
+ return "Error processed"
class NexusManager:
print("=== CODE NEXUS - ENTERPRISE PIPELINE SYSTEM ===")
manager = NexusManager()
- pipeline = DataAdapter("data_pipeline")
- pipeline.add_stage(InputStage())
- pipeline.add_stage(TransformStage())
- pipeline.add_stage(OutputStage())
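+ # One dedicated adapter pipeline per input format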
+ json_pipeline = JSONAdapter("json_pipeline")
+ csv_pipeline = CSVAdapter("csv_pipeline")
+ stream_pipeline = StreamAdapter("stream_pipeline")
- manager.register_pipeline(pipeline)
+ manager.register_pipeline(json_pipeline)
+ manager.register_pipeline(csv_pipeline)
+ manager.register_pipeline(stream_pipeline)
print("Creating Data Processing Pipeline...")
+ print("Stage 1: Input validation and parsing")
+ print("Stage 2: Data transformation and enrichment")
+ print("Stage 3: Output formatting and delivery")
print("\n=== Multi-Format Data Processing ===")
- print("Processing JSON data through pipeline...")
json_data = '{"sensor": "temp", "value": 23.5, "unit": "C"}'
- print(f"Input: {json_data}")
- result = manager.orchestrate("data_pipeline", json_data)
+ result = manager.orchestrate("json_pipeline", json_data)
print(result)
- print("\nProcessing CSV data through same pipeline...")
csv_data = "user,action,timestamp"
- print(f'Input: "{csv_data}"')
- result = manager.orchestrate("data_pipeline", csv_data)
+ result = manager.orchestrate("csv_pipeline", csv_data)
print(result)
- print("\nProcessing Stream data through same pipeline...")
stream_data = [20.1, 22.5, 23.0, 21.8, 23.2]
- print("Input: Real-time sensor stream")
- result = manager.orchestrate("data_pipeline", stream_data)
+ result = manager.orchestrate("stream_pipeline", stream_data)
print(result)
print("\n=== Pipeline Chaining Demo ===")
print("\n=== Error Recovery Test ===")
print("Simulating pipeline failure...")
invalid_json = '{"sensor": "temp", "value": 23.5, "unit": "C"'
- result = manager.orchestrate("data_pipeline", invalid_json)
+ result = manager.orchestrate("json_pipeline", invalid_json)
print(f"Error recovery result: {result}")
print("\nNexus Integration complete. All systems operational.")