
Building a Data Processing Pipeline in Python

This challenge involves creating a robust and modular data processing pipeline in Python. You'll be responsible for ingesting data, performing transformations, and outputting processed results. This is a fundamental skill in data engineering and analysis, enabling efficient handling of large datasets and complex workflows.

Problem Description

Your task is to implement a Python class, DataPipeline, that orchestrates a series of data processing steps. The pipeline should be flexible enough to accept different types of input data and apply a configurable sequence of transformation functions.

Key Requirements:

  1. Initialization: The DataPipeline class should be initialized with a list of transformation functions.
  2. Processing: A process method should accept input data and apply each transformation function in the order they were provided during initialization.
  3. Data Flow: Each transformation function should take the output of the previous function as its input. The first function receives the initial input data.
  4. Return Value: The process method should return the final processed data after all transformations have been applied.
  5. Error Handling: The pipeline should gracefully handle potential errors during transformation. If a transformation function raises an exception, the pipeline should catch it, log an informative message (e.g., using Python's logging module), and stop further processing for that input, returning None or a designated error indicator.
  6. Modularity: Transformation functions should be independent and reusable.
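The six requirements above can be sketched as a minimal implementation. This is one reasonable shape, not the only correct one; returning `None` on failure is one of the allowed error indicators, and the logger setup is illustrative:

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DataPipeline:
    """Applies a configurable sequence of transformation functions to input data."""

    def __init__(self, transformations):
        # Requirement 1: initialized with a list of transformation functions.
        self.transformations = list(transformations)

    def process(self, data):
        # Requirements 2-3: each function receives the previous function's
        # output; the first function receives the initial input data.
        for func in self.transformations:
            try:
                data = func(data)
            except Exception as exc:
                # Requirement 5: log an informative message and stop processing.
                logger.error("Error during transformation %r: %s",
                             getattr(func, "__name__", repr(func)), exc)
                return None
        # Requirement 4: return the final processed data.
        return data
```

Note that with an empty transformation list, `process` simply returns its input unchanged, which covers one of the edge cases below.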

Expected Behavior:

  • When process is called with valid input and a list of valid transformation functions, it should return the transformed data.
  • When a transformation function fails, the pipeline should not crash and should indicate that an error occurred.

Edge Cases to Consider:

  • Empty input data.
  • An empty list of transformation functions.
  • Transformation functions that expect specific data types and receive incompatible types.
  • Transformation functions that might return None intentionally.
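Because a transformation may return `None` intentionally (the last edge case above), a dedicated sentinel object is one way to implement the "designated error indicator" mentioned in requirement 5. A sketch of that pattern, with `PIPELINE_ERROR` and `run_transforms` as illustrative names:

```python
# A unique sentinel distinguishes "a transformation failed" from
# "a transformation intentionally returned None".
PIPELINE_ERROR = object()


def run_transforms(data, transformations):
    """Illustrative helper: apply each function in order, returning the
    sentinel (rather than None) if any transformation raises."""
    for func in transformations:
        try:
            data = func(data)
        except Exception:
            return PIPELINE_ERROR
    return data


result = run_transforms([1, 2, 3], [lambda d: None])    # intentional None
failed = run_transforms([1, 2, 3], [lambda d: d["x"]])  # raises TypeError
assert result is None
assert failed is PIPELINE_ERROR
```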

Examples

Example 1:

Input Data: [1, 2, 3, 4, 5]

Transformation Functions:
    1. `add_one(data)`: Returns `[x + 1 for x in data]`
    2. `square_elements(data)`: Returns `[x * x for x in data]`

Expected Output: [4, 9, 16, 25, 36]

Explanation:
1. Input: [1, 2, 3, 4, 5]
2. After `add_one`: [2, 3, 4, 5, 6]
3. After `square_elements`: [4, 9, 16, 25, 36]
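The data flow in Example 1 can be reproduced with plain function chaining, independent of any pipeline class:

```python
def add_one(data):
    # Step 1: [1, 2, 3, 4, 5] -> [2, 3, 4, 5, 6]
    return [x + 1 for x in data]


def square_elements(data):
    # Step 2: [2, 3, 4, 5, 6] -> [4, 9, 16, 25, 36]
    return [x * x for x in data]


result = square_elements(add_one([1, 2, 3, 4, 5]))
print(result)  # [4, 9, 16, 25, 36]
```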

Example 2:

Input Data: {"name": "Alice", "age": 30}

Transformation Functions:
    1. `uppercase_name(data)`: If "name" in data, returns `{"name": data["name"].upper(), "age": data["age"]}`. Otherwise, returns data.
    2. `increment_age(data)`: If "age" in data, returns `{"name": data["name"], "age": data["age"] + 1}`. Otherwise, returns data.

Expected Output: {"name": "ALICE", "age": 31}

Explanation:
1. Input: {"name": "Alice", "age": 30}
2. After `uppercase_name`: {"name": "ALICE", "age": 30}
3. After `increment_age`: {"name": "ALICE", "age": 31}
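The same chaining works for Example 2's dictionary input, showing that the pipeline design is agnostic to the data type:

```python
def uppercase_name(data):
    # Uppercase the "name" value if present; otherwise pass data through.
    if "name" in data:
        return {"name": data["name"].upper(), "age": data["age"]}
    return data


def increment_age(data):
    # Add one to the "age" value if present; otherwise pass data through.
    if "age" in data:
        return {"name": data["name"], "age": data["age"] + 1}
    return data


result = increment_age(uppercase_name({"name": "Alice", "age": 30}))
print(result)  # {'name': 'ALICE', 'age': 31}
```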

Example 3 (Error Handling):

Input Data: [1, 2, "three", 4, 5]

Transformation Functions:
    1. `increment_elements(data)`: Returns `[x + 1 for x in data]`
    2. `filter_even(data)`: Returns `[x for x in data if x % 2 == 0]`

Expected Output: None (and an error logged)

Explanation:
1. Input: [1, 2, "three", 4, 5]
2. `increment_elements` is called. When it reaches "three", it raises a `TypeError`, because a string and an integer cannot be added.
3. The pipeline catches this error and logs a message like "Error during transformation 'increment_elements': can only concatenate str (not \"int\") to str".
4. Processing stops, and `None` is returned. Note that `filter_even` never runs.
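The error path can be sketched in isolation. Note that multiplying a string by an integer is valid Python (repetition), so an addition-based transform is used here to actually trigger the `TypeError`; the `process_one` helper name is illustrative:

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def increment_elements(data):
    return [x + 1 for x in data]  # "three" + 1 raises TypeError


def process_one(data, func):
    """Apply a single transformation, logging and returning None on failure."""
    try:
        return func(data)
    except Exception as exc:
        logger.error("Error during transformation %r: %s", func.__name__, exc)
        return None


result = process_one([1, 2, "three", 4, 5], increment_elements)
print(result)  # None
```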

Constraints

  • The input data can be any Python object (list, dictionary, string, integer, etc.) that the transformation functions can handle.
  • Transformation functions must be callable and accept a single argument (the data from the previous step).
  • The number of transformation functions can range from 0 to 100.
  • The pipeline should be efficient enough to process moderate-sized datasets (up to ~10,000 items per input) within a few seconds.

Notes

  • You will need to import the logging module for error reporting. Configure basic logging to print to the console for this challenge.
  • Consider how to handle the case where no transformation functions are provided.
  • Think about the order of operations and how data flows between functions.
  • Your solution should be a single Python class. You can define helper functions or classes outside of it if needed, but the core pipeline logic resides within DataPipeline.