def process_batch(self, data_batch: List[Any]) -> str:
pass
- def filter_data(self, data_batch: List[Any],
- criteria: Optional[str] = None) -> List[Any]:
+ def filter_data(
+ self, data_batch: List[Any], criteria: Optional[str] = None
+ ) -> List[Any]:
return data_batch
def get_stats(self) -> Dict[str, Union[str, int, float]]:
temps.append(item[1])
avg_temp = sum(temps) / len(temps) if temps else 0.0
- return f"Sensor analysis: {count} readings processed, " \
- f"avg temp: {avg_temp}°C"
-
- def filter_data(self, data_batch: List[Any],
- criteria: Optional[str] = None) -> List[Any]:
+ return (
+ f"Sensor analysis: {count} readings processed, "
+ f"avg temp: {avg_temp}°C"
+ )
+
+ def filter_data(
+ self, data_batch: List[Any], criteria: Optional[str] = None
+ ) -> List[Any]:
if criteria == "high_priority":
# Return values > 50? Or specific types?
# Let's assume high values are critical
- return [x for x in data_batch
- if (isinstance(x, (int, float)) and x > 50) or
- (isinstance(x, tuple) and x[1] > 50)]
+ return [
+ x
+ for x in data_batch
+ if (isinstance(x, (int, float)) and x > 50)
+ or (isinstance(x, tuple) and x[1] > 50)
+ ]
return data_batch
net_flow -= amount
sign = "+" if net_flow >= 0 else ""
- return f"Transaction analysis: {count} operations, " \
- f"net flow: {sign}{net_flow} units"
-
- def filter_data(self, data_batch: List[Any],
- criteria: Optional[str] = None) -> List[Any]:
+ return (
+ f"Transaction analysis: {count} operations, "
+ f"net flow: {sign}{net_flow} units"
+ )
+
+ def filter_data(
+ self, data_batch: List[Any], criteria: Optional[str] = None
+ ) -> List[Any]:
if criteria == "large":
- return [x for x in data_batch
- if isinstance(x, tuple) and x[1] > 100]
+ return [
+ x for x in data_batch if isinstance(x, tuple) and x[1] > 100
+ ]
return data_batch
errors = [x for x in data_batch if x == "error"]
return f"Event analysis: {count} events, {len(errors)} error detected"
- def filter_data(self, data_batch: List[Any],
- criteria: Optional[str] = None) -> List[Any]:
+ def filter_data(
+ self, data_batch: List[Any], criteria: Optional[str] = None
+ ) -> List[Any]:
if criteria == "error":
return [x for x in data_batch if x == "error"]
return data_batch
class ProcessingStage(Protocol):
- def process(self, data: Any) -> Any:
- ...
+ def process(self, data: Any) -> Any: ...
class ProcessingPipeline(ABC):
if data.startswith("{") and data.endswith("}"):
# Simulating JSON parsing
import json
+
try:
return json.loads(data)
except json.JSONDecodeError:
def process(self, data: Any) -> Any:
# Formatting
if isinstance(data, dict) and "sensor" in data:
- return f"Processed {data.get('sensor')} reading: " \
- f"{data.get('value')} (Normal range)"
+ return (
+ f"Processed {data.get('sensor')} reading: "
+ f"{data.get('value')} (Normal range)"
+ )
if isinstance(data, list):
return f"User activity logged: {len(data) // 3} actions processed"
# Assuming CSV: user,action,timestamp -> 3 items?