Integrating DAGs into the test harness
Recall that the new sample DAG is just Python logic. As such, it can be validated by running the test harness.
Note that with the addition of the new sample DAG, Flowz detects the change to the Airflow DagBag, and the test harness fails as a result:
E AssertionError: DagBag to "DAG_TASK_IDS" control list mis-match: check the DAG names defined by DAG_TASK_IDS in fixtures. Or, add to "dag_names_to_skip" in the test_dagbag_set() test to skip the check.
E assert ['ADMIN_BOOTSTRAP_LOCAL', 'ADMIN_SAMPLE_LOCAL'] == ['ADMIN_BOOTSTRAP_LOCAL']
E
E Left contains one more item: 'ADMIN_SAMPLE_LOCAL'
E
E Full diff:
E [
E 'ADMIN_BOOTSTRAP_LOCAL',
E + 'ADMIN_SAMPLE_LOCAL',
E ]
tests/flowz/dags/test_dags.py:35: AssertionError
tests/flowz/dags/test_dags.py::test_dagbag_set ⨯
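The failure above comes down to two lists of DAG names that no longer agree. As a minimal sketch of that comparison (the helper name diff_dag_names is hypothetical and not part of Flowz), the following reproduces the mismatch with the values shown in the assertion output:

```python
from collections.abc import Iterable


def diff_dag_names(
    dagbag_names: Iterable[str], control_names: Iterable[str]
) -> tuple[list[str], list[str]]:
    """Return the names found only in the DagBag and only in the control list."""
    bag, control = set(dagbag_names), set(control_names)
    return sorted(bag - control), sorted(control - bag)


# Values copied from the assertion output above.
dagbag_names = ["ADMIN_BOOTSTRAP_LOCAL", "ADMIN_SAMPLE_LOCAL"]
control_names = ["ADMIN_BOOTSTRAP_LOCAL"]

only_in_dagbag, only_in_control = diff_dag_names(dagbag_names, control_names)
print("Only in DagBag:", only_in_dagbag)
print("Only in control list:", only_in_control)
```

A name that appears only in the DagBag points to a DAG that has not yet been registered in the control list, which is the situation described here.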
If the new DAG will form part of production deployments, consider adding the appropriate coverage to the test. This way, the test harness will safeguard against syntactic errors and incorrect deletions. To do so, you will need to add an entry to DAG_TASK_IDS, which currently looks as follows:
DAG_TASK_IDS = {
    "ADMIN_BOOTSTRAP_LOCAL": [
        "end",
        "load-connections",
        "load-dag-variables",
        "load-task-variables",
        "set-authentication",
        "start",
    ],
}
Note that DAG_TASK_IDS is a dictionary-based data structure that takes the DAG name as the key and the list of task names as the value. Add the new DAG to DAG_TASK_IDS as follows:
DAG_TASK_IDS = {
    "ADMIN_BOOTSTRAP_LOCAL": [
        "end",
        "load-connections",
        "load-dag-variables",
        "load-task-variables",
        "set-authentication",
        "start",
    ],
    "ADMIN_SAMPLE_LOCAL": [
        "end",
        "start",
    ],
}
Subsequent test harness passes should now complete successfully.
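To illustrate how a control structure of this shape can guard against incorrect deletions, here is a minimal sketch of an order-insensitive task comparison. The helper task_ids_match and the sample task lists are hypothetical; the actual checks performed by the test harness may differ:

```python
DAG_TASK_IDS = {
    "ADMIN_SAMPLE_LOCAL": [
        "end",
        "start",
    ],
}


def task_ids_match(dag_name: str, task_ids: list[str]) -> bool:
    """Compare a DAG's task IDs with its DAG_TASK_IDS entry, ignoring order."""
    return sorted(task_ids) == sorted(DAG_TASK_IDS[dag_name])


# Hypothetical task IDs, as they might be read from the sample DAG.
matches = task_ids_match("ADMIN_SAMPLE_LOCAL", ["start", "end"])

# A task that was deleted from the DAG shows up as a mismatch.
matches_after_deletion = task_ids_match("ADMIN_SAMPLE_LOCAL", ["start"])

print(matches, matches_after_deletion)
```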
Alternatively, you can skip the validation of the DAG in the test harness by adding the name of the DAG to the dag_names_to_skip variable in the test. This is an empty list by default, as follows:
@unittest.mock.patch.dict(os.environ, {"AIRFLOW_CUSTOM_ENV": "LOCAL"})
def test_dagbag_set(
    dag_names: Iterable[str],
    dag_id_cntrl: KeysView,
) -> None:
    """Test the dagbag load."""
    # Given a list of DAG names taken from the DagBag
    # dag_names

    # less the DAG names that can be skipped from the check
    dag_names_to_skip: list[str] = []
    received = [x for x in dag_names if x not in dag_names_to_skip]

    frame: FrameType | None = currentframe()
    assert frame is not None
    test_to_skip: str = frame.f_code.co_name
    msg = (
        'DagBag to "DAG_TASK_IDS" control list mis-match: '
        "check the DAG names defined by DAG_TASK_IDS in fixtures. "
        f'Or, add to "dag_names_to_skip" in the {test_to_skip}() '
        "test to skip the check."
    )
    expected = [x for x in dag_id_cntrl if x not in dag_names_to_skip]
    assert sorted(received) == sorted(expected), msg
The following adjustment will suppress the DAG check:
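As a minimal sketch, assuming the DAG to skip is the ADMIN_SAMPLE_LOCAL sample from earlier, the change amounts to listing that name in dag_names_to_skip. The dag_names and dag_id_cntrl values below are hypothetical stand-ins for the test fixtures so that the snippet runs on its own; in the real test only the dag_names_to_skip line changes:

```python
# Hypothetical stand-ins for the dag_names and dag_id_cntrl fixtures.
dag_names = ["ADMIN_BOOTSTRAP_LOCAL", "ADMIN_SAMPLE_LOCAL"]
dag_id_cntrl = {"ADMIN_BOOTSTRAP_LOCAL": []}.keys()  # task lists omitted

# The adjustment: name the DAG that should be excluded from the check.
dag_names_to_skip: list[str] = ["ADMIN_SAMPLE_LOCAL"]

# Same filtering and comparison as in test_dagbag_set().
received = [x for x in dag_names if x not in dag_names_to_skip]
expected = [x for x in dag_id_cntrl if x not in dag_names_to_skip]
assert sorted(received) == sorted(expected)
```

Because the skipped name is filtered out of both lists, the comparison passes even though ADMIN_SAMPLE_LOCAL has no entry in DAG_TASK_IDS.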