This article is about writing end-to-end test for a data pipeline. It will cover Airflow, as one of the most popular data pipeline scheduler now days and one of the most complicated to test. For impatient – here is the repository with everything set up.

What is data pipeline and why is it important to test it?

It is a development pattern, when we take data from one or several data sources, process it (not always) and move it to another place. It can be real-time or batch. It can be done via different frameworks and tools (like Airflow, Spark, Spring Batch, hand-made).
But it has one common thing:

Any data pipeline is extremely hard to test, as you always need a fully deployed system, prepared in advance data set and mocks of external services.

Imagine you have standard business case: your backend is sending results into Postgres and you need to update merchant’s status in Salesforce for your customer support agent being able to answer customers’ questions on the fly.

To test it you’ll have to pass these complex steps:

  • download data from production & depersonalize it;
  • set up sandboxes or use mocks directly in your code;

What is Catcher and how can it help you?

Catcher is an end-to-end tool, specially designed to test systems containing many components. Initially developed as end-to-end microservices test tool it perfectly fits needs of data pipeline testing.

Main Catcher’s features are:

  • modular architecture. It has lots of modules for different requirements – from Kafka to S3;
  • templates. It fully supports Jinja2 templates which allows you to generate data sets easily;
  • different inventory files for different environments. Write your tests locally and run in cloud environment by just changing inventory file;

You can read about it here.

Catcher e2e tests tool for beginners

If you are new to Catcher you can find this article useful.

The pipeline

Imagine, you have a back-end which handles user registration. All newly registered users are stored in MySQL. You also have another back-end which works with GDPR and unsubscriptions. The business requirement is – the second back-end should somehow know about newly created users, as it needs this information for proper unsubscription events matching. The final point is – your back-end developers don’t know about Kafka/Rabbit so the only way to do it – is to write a pipeline which will upload data from MySQL to Postgres.

The pipeline will:

  1. take data from MySQL and load to S3
  2. take data from S3 and put it to Postgres
start >> mysql_to_s3 >> s3_to_psql >> end

In real world, most likely second and third steps of this pipeline would be joined into custom operator MySQLtoPostgresViaS3Operator. But here we divide it to show an example of longer than one actual step pipeline :).

Both ‘start’ and ‘end’ are dummy operators. I added it because it is a good place to have custom notifications to slack or etc.
mysql_to_s3 is a python operator:

mysql_to_s3 = PythonOperator(
    task_id='mysql_to_s3',
    python_callable=mysql_to_s3,
    retries=0,
    dag=dag,
    provide_context=True
)

It just calls mysql_to_s3 function:

def mysql_to_s3(**context):
    mysql_hook = MySqlHook(mysql_conn_id=mysql_conn_id)
    s3_hook = S3Hook(aws_conn_id=aws_conn_id)
    sql = f'Select * from {mysql_tbl_name} order by email'
    df: DataFrame = mysql_hook.get_pandas_df(sql=sql) 
    with NamedTemporaryFile(newline='', mode='w+') as f:
        key_file = f"data/{mysql_tbl_name}/year={datetime.date.today().year}/" \
                   f"month={datetime.date.today().strftime('%m')}/" \
                   f"day={datetime.date.today().strftime('%d')}/" \
                   f"{mysql_tbl_name}.csv"
        df.to_csv(path_or_buf=f,
                  sep=",",
                  columns=df.columns.values.tolist(),
                  index=False
                  )
        f.flush()
        s3_hook.load_file(filename=f.name,
                          key=key_file,
                          bucket_name=bucket_name)
        context["ti"].xcom_push(key=key_str, value=key_file)
        f.close()

In this function, via MySQL hook, we retrieve Pandas Data Frame from given SQL query (be mindful, make sure you don’t read too much data with this query and don’t overload memory, otherwise read in chunks) and store this Data Frame as CSV file on S3.

After S3 file is loaded next task: s3_to_psql is called:

s3_to_psql = PythonOperator(
    task_id='s3_to_psql',
    python_callable=s3_to_psql,
    retries=0,
    dag=dag,
    provide_context=True
)

It is also a python operator which calls s3_to_psql function:

def s3_to_psql(**context):
    ti = context["ti"]
    key_file = ti.xcom_pull(dag_id='simple_example_pipeline',
                            task_ids='mysql_to_s3',
                            key=key_str)
    psql_hook = PostgresHook(postgres_conn_id=postgres_conn_id)
    s3_hook = S3Hook(aws_conn_id=aws_conn_id)
    lines = s3_hook.read_key(key=key_file, bucket_name=bucket_name).split("\n")
    lines = [tuple(line.split(',')) for line in lines if line != '']
    df = DataFrame.from_records(data=lines[1:], columns=lines[0])
    df.to_sql(name=psql_tbl_name,
              con=psql_hook.get_sqlalchemy_engine(),
              if_exists="replace",
              index=False
              )

In this function we read file from S3 into worker memory, build Pandas Data Frame out of it and store it into Postgres.

All the Airflow connection ids are hard-coded at the beginning of the file:

postgres_conn_id = 'psql_conf'
mysql_conn_id = 'mysql_conf'
aws_conn_id = 's3_config'

You do not need to bother and add it into Airflow test environment – Catcher will handle it for your during running test.

That’s all. Now it is a time to test it.

The test itself

Let’s start with defining Catcher’s test-local variables:

variables:
  users:
    - uuid: '{{ random("uuid4") }}'
      email: 'bar@test.com'
    - uuid: '{{ random("uuid4") }}'
      email: 'baz@test.com'
    - uuid: '{{ random("uuid4") }}'
      email: 'foo@test.com'
  pipeline: 'simple_example_pipeline'
  mysql_tbl_name: 'my_table'

We set up Airflow’s pipeline name, mysql table name and 3 users which will be exported.

We provide here two different inventories: one for local run and one for docker run. Which inventories to provide is up to you, depends on particular case.
Local inventory is:

mysql_conf: 'root:test@localhost:3307/test'
psql_conf: 'postgres:postgres@localhost:5432/postgres'
airflow_db: 'airflow:airflow@localhost:5433/airflow'
airflow_web: 'http://127.0.0.1:8080'
s3_config:
    url: http://127.0.0.1:9001
    key_id: minio
    secret_key: minio123

If you have already stage or devenvironment setup, you can add inventory for it the same way as we done for local, but specifying DNS names instead of localhost ip addresses.

Docker inventory is the same, but with domain names instead of localhost.

mysql_conf: 'mysql://root:test@mysql:3306/test'
psql_conf: 'postgresql://postgres:postgres@custom_postgres_1:5432/postgres'
airflow_db: 'airflow:airflow@postgres:5432/airflow'
airflow_web: 'http://webserver:8080'
airflow_fernet: 'zp8kV516l9tKzqq9pJ2Y6cXbM3bgEWIapGwzQs6jio4='
s3_config:
    url: http://minio:9000
    key_id: minio
    secret_key: minio123

Steps

First step should populate test data: it creates MySQL and Postgres tables and generates data. It allows you to avoid monkey labor and simplify your life as a Data Engineer. Forget about test datasets manual building and production data exporting into csv and copying to the test environment. And all sorts of issues related to it: data anonymization, regexp in sql and generators.

For preparing your test data prepare step fits the best:

prepare:
    populate:
      mysql:
        conf: '{{ mysql_conf }}'
        schema: my_table.sql
        data:
          my_table: my_table.csv
      postgres:
        conf: '{{ psql_conf }}'
        schema: psql_tbl.sql

As you can see inside prepare we have defined populate both for MySQL and for Postgres data sources. For both data sources this step follow the same logic: provide configuration and run DDL code from specified SQL files. Both mysql_conf and psql_conf values are taken from the current inventory file (you are running test with).

The only difference, for mysql we specify input data which would be used to fill my_table. We do not specify input data for Postgres as it should be filled in by our Airflow pipeline during the execution. Lets dive deeper into how mysql populate statements are defined.

my_table.sql is a SQL file containing create table statement. In real world you may also have here grant access statement, adding indexes and etc:

CREATE TABLE if not exists test.my_table(
                    user_id      varchar(36)    primary key,
                    email        varchar(36)    NOT NULL
                );

my_table.csv is a data file, main difference with general testing approach is – we don’t specify actual data here. We apply Jinja2 template to generate the csv file based on our users variable from the very beginning. So one of the Catcher’s coolest feature: it supports Jinja2 templates everywhere.

user_id,email
{%- for user in users -%}
{{ user.uuid }},{{ user.email }}
{%- endfor -%}

psql_tbl.sql is almost the same as my_table.sql but with another table name.

When all data is prepared we should trigger our pipeline. It is the second step:

- airflow:
    run:
      config:
        db_conf: '{{ airflow_db }}'
        url: '{{ airflow_web }}'
        populate_connections: true
        fernet_key: '{{ airflow_fernet }}'
      dag_id: '{{ pipeline }}'
      sync: true
      wait_timeout: 150

It will run airflow pipeline simple_example_pipeline and will wait for it to finish (or fail in 150 seconds). And it will also create airflow connections based on your catcher inventory file.

One important thing here – Catcher will create connections in Airflow and name them as they are named in inventory file:

For psql_conf: 'postgres:postgres@localhost:5432/postgres' from inventory it will create connection psql_conf in Airflow. So, in order to have working test connection id in your pipeline should be the same as connection id in inventory file: postgres_conn_id = 'psql_conf'. Name itself does not matter.

Third step is to check if S3 file was created and download it:

- s3:
    get:
      config: '{{ s3_config }}'
      path: 'my_awesome_bucket/data/{{ mysql_tbl_name }}/year={{ now()[:4] }}/month={{ now()[5:7] }}/day={{ now()[8:10] }}/my_table.csv'
    register: {s3_csv: '{{ OUTPUT }}'}

As we said above, Catcher can apply jinja templates everywhere, here you see an example, how to compose path to our S3 resource. Our path in original pipeline build dynamically, depends on execution_date we use built-in function now() which returns current datetime as string, and apply some python string manipulation, like [5:7] to retrieve only part of the string. We pull the resource and register steps’ output as a new variable s3_csv.

Next two steps is to load the content from resource file and compare it with s3_csv (our final step in original airflow pipeline):

- echo: {from_file: 'my_table.csv', register: {expect_csv: '{{ OUTPUT }}'}}
- check:
    equals: {the: '{{ s3_csv.strip() }}', is: '{{ expect_csv.strip() }}'}
    name: 'Check data in s3 expected'

echo step can be also used to write or read from file. Here we read the same resource my_table.csv, which was used to populate MySQL and save step’s output to the variable expect_csv. Echo step will also run Jinja2 template and generate the proper content.

check equals step is used to compare expect_csv and s3_csv variables values. As their content is string we use python’s string strip() method to remove trailing spaces and empty lines.

The last step is to check what was actually written into Postgres. Expect steps fits us the best here:

- expect:
    compare:
      postgres:
        conf: '{{ psql_conf }}'
        data:
          psql_tbl: 'my_table.csv'
        strict: true
    name: 'Postgres data match expected'

Final cleanup

We need to add cleanup after the test to remove any side effects for other tests to run under clean environment.

So, we add block finally to test’s root:

finally:
  - mysql:
      request:
        conf: '{{ mysql_conf }}'
        query: 'drop table my_table'
      name: 'Clean up mysql'
      ignore_errors: true
  - postgres:
      request:
        conf: '{{ psql_conf }}'
        query: 'drop table psql_tbl'
      name: 'Clean up postgres'
      ignore_errors: true
  - s3:
      delete:
        config: '{{ s3_config }}'
        path: 'my_awesome_bucket/data/{{ mysql_tbl_name }}/year={{ now()[:4] }}/month={{ now()[5:7] }}/day={{ now()[8:10] }}/my_table.csv'
      name: 'Clean up s3'
      ignore_errors: true

We remove all data from Mysql and Postgres as well as remove file from S3. ignore_errors means that we don’t care if action fails (if there is no such file on S3 or table in the database). By the way, good approach here would be to move S3 file path into Catcher variable and reuse it in `S3:get` (Step #3) and inside delete step, to reduce code duplication.

How to run

Locally in docker

If you don’t have any environment you can use this docker-compose to start one locally. It is based on puckel-airflow docker repository.

docker-compose up -d

After you started docker-compose you need to run catcher docker image in the same network mounting your tests, resources and specifying inventory:

docker run -it --volume=$(pwd)/test:/opt/catcher/test \
               --volume=$(pwd)/resources:/opt/catcher/resources \
               --volume=$(pwd)/inventory:/opt/catcher/inventory \
               --network catcherairflowexample_default \
           comtihon/catcher -i inventory/docker.yml test

The network is very important, because, if you will run catcher locally, you would probably use local inventory with all services hosts 127.0.0.1. Both you and Catcher would be able to access them, but Catcher will populate Airflow connection with 127.0.0.1 from your local inventory, so your pipeline would fail, because Airflow in docker won’t be able to access database/minio via 127.0.0.1.

See more docker run instructions here.

Remotely in the environment

It is a good point to automate your tests. You can make your CI run catcher after every deployment to every environment. Both Catcher-in-docker or Catcher cli can be used from CI agents. Just use the proper inventory.

The output

If you run your test you’ll see nice colorful output. It will be split on two parts.

First is step-by-step run summary. It helps you to understand how test is running:

INFO:catcher:Step Create table and populate initial data OK
INFO:catcher:Step Trigger pipeline simple_example_pipeline OK
INFO:catcher:Step Get file from s3 OK
INFO:catcher:user_id,email
ea1d710b-0a7b-45f6-a1c4-52a5f7d91bce,bar@test.com
cf0a3043-6958-412d-a7b0-924092e7e95b,baz@test.com
e7a53958-f4aa-45e7-892e-69514990c992,foo@test.com
INFO:catcher:Step echo OK
INFO:catcher:Step Check data in s3 expected OK
INFO:catcher:Step Postgres data match expected OK
INFO:catcher:Test test/test.yml passed.

And following cleanup part:

INFO:catcher:Step Clean up mysql OK
INFO:catcher:Step Clean up postgres OK
INFO:catcher:Step Clean up s3 OK
INFO:catcher:Test test/test.yml [cleanup]  passed.

In case of multiple tests they will follow each other.

Second part is a run summary, one for all tests. It shows you statistics and status of each test run. In case of failure it will show the step number. In our case:

INFO:catcher:Test run 1. Success: 1, Fail: 0. Total: 100%
Test test: pass

Conclusion

When developers hear about end-to-end tests, they usually think about complex BDD frameworks and tons of code which they need to write in order to make everything work. It is not about Catcher. You don’t need to know any programming language to create a test.

When QA engineers hear about end-to-end tests, they usually think about lot’s of manual actions, easy to miss or make an error while testing. It is not about Catcher. Do your manual actions once and put them in Catcher’s script, to repeat every time you do a deploy.

When Data Engineers hear about end-to-end tests, they usually think how cool it would be to have a framework for testing data pipelines. And here we finally have one.

Happy testing!

Leave a Reply