
Integrating DAGs into the test harness

Recall that the new sample DAG is just Python logic. As such, it can be validated by running the test harness:

make tests

With the new sample DAG added, Flowz detects the change to the Airflow DagBag, and the test harness fails as a result:

New DAG fails the test harness.
E       AssertionError: DagBag to "DAG_TASK_IDS" control list mis-match: check the DAG names defined by DAG_TASK_IDS in fixtures. Or, add to "dag_names_to_skip" in the test_dagbag_set() test to skip the check.
E       assert ['ADMIN_BOOTSTRAP_LOCAL', 'ADMIN_SAMPLE_LOCAL'] == ['ADMIN_BOOTSTRAP_LOCAL']
E
E         Left contains one more item: 'ADMIN_SAMPLE_LOCAL'
E
E         Full diff:
E           [
E               'ADMIN_BOOTSTRAP_LOCAL',
E         +     'ADMIN_SAMPLE_LOCAL',
E           ]

tests/flowz/dags/test_dags.py:35: AssertionError

 tests/flowz/dags/test_dags.py::test_dagbag_set ⨯
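To see which side of the comparison is out of step, you can reproduce it outside Airflow. The following is a minimal sketch that assumes the DagBag yields its DAG IDs as a plain list and that the control list is the key view of DAG_TASK_IDS (the KeysView annotation on dag_id_cntrl in the test suggests this). The helper name report_mismatch is hypothetical.

```python
from collections.abc import Iterable


def report_mismatch(
    received: Iterable[str], expected: Iterable[str]
) -> dict[str, list[str]]:
    """Return the DAG IDs that appear on only one side (hypothetical helper)."""
    received_set, expected_set = set(received), set(expected)
    return {
        # Loaded by the DagBag but absent from DAG_TASK_IDS: add an entry or skip it.
        "missing_from_control": sorted(received_set - expected_set),
        # Listed in DAG_TASK_IDS but not loaded: possibly a deleted or broken DAG.
        "missing_from_dagbag": sorted(expected_set - received_set),
    }


# Stand-ins for the failing run above (task lists abbreviated).
dagbag_names = ["ADMIN_BOOTSTRAP_LOCAL", "ADMIN_SAMPLE_LOCAL"]
control = {"ADMIN_BOOTSTRAP_LOCAL": ["end", "start"]}.keys()

print(report_mismatch(dagbag_names, control))
# {'missing_from_control': ['ADMIN_SAMPLE_LOCAL'], 'missing_from_dagbag': []}
```

The two directions map to the two cases the harness protects against: a new DAG that has no coverage yet, and a covered DAG that has disappeared.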

If the new DAG will form part of production deployments, consider adding coverage for it in the test. The test harness will then guard against syntax errors and accidental deletions. To do so, add an entry to DAG_TASK_IDS:

DAG_TASK_IDS in tests/flowz/dags/conftest.py
DAG_TASK_IDS = {
    "ADMIN_BOOTSTRAP_LOCAL": [
        "end",
        "load-connections",
        "load-dag-variables",
        "load-task-variables",
        "set-authentication",
        "start",
    ],
}

DAG_TASK_IDS is a dictionary that maps each DAG name (the key) to the list of its task names (the value). Add the following entry for the sample DAG:

Adding new DAG to DAG_TASK_IDS
DAG_TASK_IDS = {
    "ADMIN_BOOTSTRAP_LOCAL": [
        "end",
        "load-connections",
        "load-dag-variables",
        "load-task-variables",
        "set-authentication",
        "start",
    ],
    "ADMIN_SAMPLE_LOCAL": [
        "end",
        "start",
    ],
}
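Every task name needs a trailing comma. Python joins adjacent string literals, so a list written as "end" followed directly by "start" quietly becomes the single task name "endstart", and the per-DAG check then fails on a name that no DAG defines. The following sketch shows the pitfall. The actual task IDs are a hand-written stand-in for what the DagBag would report for ADMIN_SAMPLE_LOCAL.

```python
# Adjacent string literals are concatenated at compile time.
broken = [
    "end"  # <- missing comma
    "start",
]
fixed = [
    "end",
    "start",
]

print(broken)  # ['endstart']
print(fixed)   # ['end', 'start']

# Stand-in for the task IDs the DagBag would report for the sample DAG.
actual_task_ids = ["start", "end"]
print(sorted(broken) == sorted(actual_task_ids))  # False
print(sorted(fixed) == sorted(actual_task_ids))   # True
```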

With the entry in place, subsequent test harness runs should pass.

Alternatively, you can skip validation of the DAG in the test harness by adding its name to the dag_names_to_skip variable in the test. By default, this is an empty list:

Skip DAG definition in the DagBag set validation.
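import os
import unittest.mock
from collections.abc import Iterable, KeysView
from inspect import currentframe
from types import FrameType


@unittest.mock.patch.dict(os.environ, {"AIRFLOW_CUSTOM_ENV": "LOCAL"})
def test_dagbag_set(
    dag_names: Iterable[str],  # fixture: DAG names loaded from the DagBag
    dag_id_cntrl: KeysView,  # fixture: DAG names defined by DAG_TASK_IDS
) -> None: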
    """Test the dagbag load."""
    # Given a list of DAG names taken from the DagBag
    # dag_names

    # less the DAG names that can be skipped from the check
    dag_names_to_skip: list[str] = []
    received = [x for x in dag_names if x not in dag_names_to_skip]

    frame: FrameType | None = currentframe()
    assert frame is not None
    test_to_skip: str = frame.f_code.co_name
    msg = (
        'DagBag to "DAG_TASK_IDS" control list mis-match: '
        "check the DAG names defined by DAG_TASK_IDS in fixtures. "
        f'Or, add to "dag_names_to_skip" in the {test_to_skip}() '
        "test to skip the check."
    )
    expected = [x for x in dag_id_cntrl if x not in dag_names_to_skip]
    assert sorted(received) == sorted(expected), msg
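The failure message gets the test's own name from the current stack frame, so the hint stays accurate if the test is renamed. The following is a minimal standalone sketch of the same technique. The function name check_dagbag_example is hypothetical, and it is deliberately not prefixed with test_ so that pytest will not collect it.

```python
from inspect import currentframe
from types import FrameType


def check_dagbag_example() -> str:
    """Build a hint that embeds the name of this function."""
    frame: FrameType | None = currentframe()
    # currentframe() can return None on Python implementations without
    # stack frame support, hence the guard (as in test_dagbag_set).
    assert frame is not None
    name = frame.f_code.co_name  # name of the function that owns this frame
    return f'Or, add to "dag_names_to_skip" in the {name}() test to skip the check.'


print(check_dagbag_example())
# Or, add to "dag_names_to_skip" in the check_dagbag_example() test to skip the check.
```

currentframe() has to be called directly inside the test body. Moving the call into a shared helper would report the helper's name instead.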

The following adjustment will suppress the DAG check:

    ...
    # less the DAG names that can be skipped from the check
    dag_names_to_skip: list[str] = ["ADMIN_SAMPLE_LOCAL"]
    ...
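The skip list filters both sides of the comparison, so a skipped DAG is ignored whether it is missing from DAG_TASK_IDS or from the DagBag. The following sketch reproduces the filter-then-compare logic of test_dagbag_set. Plain lists stand in for the dag_names and dag_id_cntrl fixtures, and compare_with_skips is a hypothetical wrapper.

```python
from collections.abc import Iterable


def compare_with_skips(
    dag_names: Iterable[str],
    dag_id_cntrl: Iterable[str],
    dag_names_to_skip: list[str],
) -> bool:
    """Mirror the filtering and sorted comparison in test_dagbag_set."""
    received = [x for x in dag_names if x not in dag_names_to_skip]
    expected = [x for x in dag_id_cntrl if x not in dag_names_to_skip]
    return sorted(received) == sorted(expected)


dagbag = ["ADMIN_BOOTSTRAP_LOCAL", "ADMIN_SAMPLE_LOCAL"]
control = ["ADMIN_BOOTSTRAP_LOCAL"]  # DAG_TASK_IDS without the new entry

print(compare_with_skips(dagbag, control, []))                      # False: check fails
print(compare_with_skips(dagbag, control, ["ADMIN_SAMPLE_LOCAL"]))  # True: DAG skipped
```

The trade-off is that a skipped DAG no longer benefits from the deletion safeguard. For DAGs bound for production, adding a DAG_TASK_IDS entry is usually the better choice.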