Building a Data Processing Pipeline in Python
This challenge involves creating a robust and modular data processing pipeline in Python. You'll be responsible for ingesting data, performing transformations, and outputting processed results. This is a fundamental skill in data engineering and analysis, enabling efficient handling of large datasets and complex workflows.
Problem Description
Your task is to implement a Python class, DataPipeline, that orchestrates a series of data processing steps. The pipeline should be flexible enough to accept different types of input data and apply a configurable sequence of transformation functions.
Key Requirements:
- Initialization: The `DataPipeline` class should be initialized with a list of transformation functions.
- Processing: A `process` method should accept input data and apply each transformation function in the order provided during initialization.
- Data Flow: Each transformation function takes the output of the previous function as its input; the first function receives the initial input data.
- Return Value: The `process` method should return the final processed data after all transformations have been applied.
- Error Handling: The pipeline should gracefully handle errors during transformation. If a transformation function raises an exception, the pipeline should catch it, log an informative message (e.g., using Python's `logging` module), stop further processing for that input, and return `None` or a designated error indicator.
- Modularity: Transformation functions should be independent and reusable.
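One way to satisfy these requirements is sketched below. The class name and `process` signature come from the problem statement; returning `None` on failure is one of the allowed error-indicator options, chosen here for simplicity:

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class DataPipeline:
    """Applies a configurable sequence of transformation functions to input data."""

    def __init__(self, transformations):
        # Transformations are applied in the order given here.
        self.transformations = list(transformations)

    def process(self, data):
        for transform in self.transformations:
            try:
                # Each step consumes the previous step's output.
                data = transform(data)
            except Exception as exc:
                # Log an informative message and stop; None signals failure.
                logger.error(
                    "Error during transformation %r: %s",
                    getattr(transform, "__name__", repr(transform)),
                    exc,
                )
                return None
        return data
```

With an empty transformation list, the loop body never runs and the input is returned unchanged, which covers one of the edge cases below.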
Expected Behavior:
- When `process` is called with valid input and a list of valid transformation functions, it should return the transformed data.
- When a transformation function fails, the pipeline should not crash and should indicate that an error occurred.
Edge Cases to Consider:
- Empty input data.
- An empty list of transformation functions.
- Transformation functions that expect specific data types and receive incompatible types.
- Transformation functions that might return `None` intentionally.
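Two of these edge cases can be pinned down with a bare fold loop (the helper name `run_steps` is hypothetical, used only for illustration): an empty function list should hand the input back unchanged, and a step that returns `None` on purpose must be passed along to the next step rather than treated as an error:

```python
def run_steps(data, steps):
    # A bare fold over the step list: just the data-flow contract, no error handling.
    for step in steps:
        data = step(data)
    return data

# Empty step list: the input passes through untouched.
assert run_steps([1, 2, 3], []) == [1, 2, 3]

# A step may legitimately return None; the next step receives it as input.
drop = lambda data: None
label = lambda data: "empty" if data is None else "full"
assert run_steps([1, 2, 3], [drop, label]) == "empty"
```

This is why the error-handling requirement is phrased in terms of exceptions: a `None` return value alone is not enough to distinguish failure from an intentional result.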
Examples
Example 1:
Input Data: [1, 2, 3, 4, 5]
Transformation Functions:
1. `add_one(data)`: Returns `[x + 1 for x in data]`
2. `square_elements(data)`: Returns `[x * x for x in data]`
Expected Output: [4, 9, 16, 25, 36]
Explanation:
1. Input: [1, 2, 3, 4, 5]
2. After `add_one`: [2, 3, 4, 5, 6]
3. After `square_elements`: [4, 9, 16, 25, 36]
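The walkthrough above is a left fold over the function list, which `functools.reduce` expresses directly (this is a sketch of the data flow, not a full pipeline implementation):

```python
from functools import reduce

def add_one(data):
    return [x + 1 for x in data]

def square_elements(data):
    return [x * x for x in data]

# Feed each function the previous function's output, starting from the input.
result = reduce(lambda acc, fn: fn(acc), [add_one, square_elements], [1, 2, 3, 4, 5])
print(result)  # [4, 9, 16, 25, 36]
```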
Example 2:
Input Data: {"name": "Alice", "age": 30}
Transformation Functions:
1. `uppercase_name(data)`: If "name" in data, returns `{"name": data["name"].upper(), "age": data["age"]}`. Otherwise, returns data.
2. `increment_age(data)`: If "age" in data, returns `{"name": data["name"], "age": data["age"] + 1}`. Otherwise, returns data.
Expected Output: {"name": "ALICE", "age": 31}
Explanation:
1. Input: {"name": "Alice", "age": 30}
2. After `uppercase_name`: {"name": "ALICE", "age": 30}
3. After `increment_age`: {"name": "ALICE", "age": 31}
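The two dictionary transforms can be sketched as follows; `{**data, ...}` is equivalent to rebuilding the dict as the problem statement describes, but also keeps the functions reusable for dicts with extra keys:

```python
def uppercase_name(data):
    if "name" in data:
        return {**data, "name": data["name"].upper()}
    return data

def increment_age(data):
    if "age" in data:
        return {**data, "age": data["age"] + 1}
    return data

# Apply the transforms in order, threading the output through.
record = {"name": "Alice", "age": 30}
for step in (uppercase_name, increment_age):
    record = step(record)
print(record)  # {'name': 'ALICE', 'age': 31}
```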
Example 3 (Error Handling):
Input Data: [1, 2, "three", 4, 5]
Transformation Functions:
1. `add_one(data)`: Returns `[x + 1 for x in data]`
2. `filter_even(data)`: Returns `[x for x in data if x % 2 == 0]`
Expected Output: None (and an error logged)
Explanation:
1. Input: [1, 2, "three", 4, 5]
2. `add_one` is called. When it reaches `"three"`, evaluating `"three" + 1` raises a `TypeError`, because a string and an integer cannot be added. (Note that multiplying a string by an integer, as in `"three" * 2`, would not raise; it repeats the string.)
3. The pipeline catches this error and logs a message like "Error during transformation 'add_one': can only concatenate str (not "int") to str".
4. Processing stops, and `None` is returned.
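The catch-and-log behavior can be sketched in isolation; `apply_safely` is a hypothetical helper, and the failing step adds 1 to each element so that the string element raises a `TypeError`:

```python
import logging

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)

def add_one(data):
    return [x + 1 for x in data]

def apply_safely(transform, data):
    try:
        return transform(data)
    except Exception as exc:
        # Log an informative message identifying the failing transformation.
        logger.error("Error during transformation %r: %s", transform.__name__, exc)
        return None

print(apply_safely(add_one, [1, 2, "three", 4, 5]))  # None
```

Catching `Exception` (rather than a specific type) keeps the pipeline robust against arbitrary user-supplied transformations, at the cost of also swallowing bugs; logging the exception message preserves the diagnostic detail.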
Constraints
- The input data can be any Python object (list, dictionary, string, integer, etc.) that the transformation functions can handle.
- Transformation functions must be callable and accept a single argument (the data from the previous step).
- The number of transformation functions can range from 0 to 100.
- The pipeline should be efficient enough to process moderate-sized datasets (up to ~10,000 items per input) within a few seconds.
Notes
- You will need to import the `logging` module for error reporting. Configure basic logging to print to the console for this challenge.
- Consider how to handle the case where no transformation functions are provided.
- Think about the order of operations and how data flows between functions.
- Your solution should be a single Python class. You may define helper functions or classes outside of it if needed, but the core pipeline logic must reside within `DataPipeline`.