Development

Development

To get info about new technologies, perspective products and useful services

BigData

BigData

To know more about big data, data analysis techniques, tools and projects

Refactoring

Refactoring

To improve your code quality, speed up development process

Author: Katya Belova

Testing Airflow data pipelines with Catcher end to end

Testing Airflow data pipelines with Catcher end to end

This article is about writing end-to-end test for a data pipeline. It will cover Airflow, as one of the most popular data pipeline scheduler now days and one of the most complicated to test. For impatient – here is the repository with everything set up.

What is data pipeline and why is it important to test it?

It is a development pattern, when we take data from one or several data sources, process it (not always) and move it to another place. It can be real-time or batch. It can be done via different frameworks and tools (like Airflow, Spark, Spring Batch, hand-made).
But it has one common thing:

Any data pipeline is extremely hard to test, as you always need a fully deployed system, prepared in advance data set and mocks of external services.

Imagine you have standard business case: your backend is sending results into Postgres and you need to update merchant’s status in Salesforce for your customer support agent being able to answer customers’ questions on the fly.

To test it you’ll have to pass these complex steps:

  • download data from production & depersonalize it;
  • set up sandboxes or use mocks directly in your code;

What is Catcher and how can it help you?

Catcher is an end-to-end tool, specially designed to test systems containing many components. Initially developed as end-to-end microservices test tool it perfectly fits needs of data pipeline testing.

Main Catcher’s features are:

  • modular architecture. It has lots of modules for different requirements – from Kafka to S3;
  • templates. It fully supports Jinja2 templates which allows you to generate data sets easily;
  • different inventory files for different environments. Write your tests locally and run in cloud environment by just changing inventory file;

You can read about it here.

If you are new to Catcher you can find this article useful.

The pipeline

Imagine, you have a back-end which handles user registration. All newly registered users are stored in MySQL. You also have another back-end which works with GDPR and unsubscriptions. The business requirement is – the second back-end should somehow know about newly created users, as it needs this information for proper unsubscription events matching. The final point is – your back-end developers don’t know about Kafka/Rabbit so the only way to do it – is to write a pipeline which will upload data from MySQL to Postgres.

The pipeline will:

  1. take data from MySQL and load to S3
  2. take data from S3 and put it to Postgres
start >> mysql_to_s3 >> s3_to_psql >> end

In real world, most likely second and third steps of this pipeline would be joined into custom operator MySQLtoPostgresViaS3Operator. But here we divide it to show an example of longer than one actual step pipeline :).

Both ‘start’ and ‘end’ are dummy operators. I added it because it is a good place to have custom notifications to slack or etc.
mysql_to_s3 is a python operator:

mysql_to_s3 = PythonOperator(
    task_id='mysql_to_s3',
    python_callable=mysql_to_s3,
    retries=0,
    dag=dag,
    provide_context=True
)

It just calls mysql_to_s3 function:

def mysql_to_s3(**context):
    mysql_hook = MySqlHook(mysql_conn_id=mysql_conn_id)
    s3_hook = S3Hook(aws_conn_id=aws_conn_id)
    sql = f'Select * from {mysql_tbl_name} order by email'
    df: DataFrame = mysql_hook.get_pandas_df(sql=sql) 
    with NamedTemporaryFile(newline='', mode='w+') as f:
        key_file = f"data/{mysql_tbl_name}/year={datetime.date.today().year}/" \
                   f"month={datetime.date.today().strftime('%m')}/" \
                   f"day={datetime.date.today().strftime('%d')}/" \
                   f"{mysql_tbl_name}.csv"
        df.to_csv(path_or_buf=f,
                  sep=",",
                  columns=df.columns.values.tolist(),
                  index=False
                  )
        f.flush()
        s3_hook.load_file(filename=f.name,
                          key=key_file,
                          bucket_name=bucket_name)
        context["ti"].xcom_push(key=key_str, value=key_file)
        f.close()

In this function, via MySQL hook, we retrieve Pandas Data Frame from given SQL query (be mindful, make sure you don’t read too much data with this query and don’t overload memory, otherwise read in chunks) and store this Data Frame as CSV file on S3.

After S3 file is loaded next task: s3_to_psql is called:

s3_to_psql = PythonOperator(
    task_id='s3_to_psql',
    python_callable=s3_to_psql,
    retries=0,
    dag=dag,
    provide_context=True
)

It is also a python operator which calls s3_to_psql function:

def s3_to_psql(**context):
    ti = context["ti"]
    key_file = ti.xcom_pull(dag_id='simple_example_pipeline',
                            task_ids='mysql_to_s3',
                            key=key_str)
    psql_hook = PostgresHook(postgres_conn_id=postgres_conn_id)
    s3_hook = S3Hook(aws_conn_id=aws_conn_id)
    lines = s3_hook.read_key(key=key_file, bucket_name=bucket_name).split("\n")
    lines = [tuple(line.split(',')) for line in lines if line != '']
    df = DataFrame.from_records(data=lines[1:], columns=lines[0])
    df.to_sql(name=psql_tbl_name,
              con=psql_hook.get_sqlalchemy_engine(),
              if_exists="replace",
              index=False
              )

In this function we read file from S3 into worker memory, build Pandas Data Frame out of it and store it into Postgres.

All the Airflow connection ids are hard-coded at the beginning of the file:

postgres_conn_id = 'psql_conf'
mysql_conn_id = 'mysql_conf'
aws_conn_id = 's3_config'

You do not need to bother and add it into Airflow test environment – Catcher will handle it for your during running test.

That’s all. Now it is a time to test it.

The test itself

Let’s start with defining Catcher’s test-local variables:

variables:
  users:
    - uuid: '{{ random("uuid4") }}'
      email: 'bar@test.com'
    - uuid: '{{ random("uuid4") }}'
      email: 'baz@test.com'
    - uuid: '{{ random("uuid4") }}'
      email: 'foo@test.com'
  pipeline: 'simple_example_pipeline'
  mysql_tbl_name: 'my_table'

We set up Airflow’s pipeline name, mysql table name and 3 users which will be exported.

We provide here two different inventories: one for local run and one for docker run. Which inventories to provide is up to you, depends on particular case.
Local inventory is:

mysql_conf: 'root:test@localhost:3307/test'
psql_conf: 'postgres:postgres@localhost:5432/postgres'
airflow_db: 'airflow:airflow@localhost:5433/airflow'
airflow_web: 'http://127.0.0.1:8080'
s3_config:
    url: http://127.0.0.1:9001
    key_id: minio
    secret_key: minio123

If you have already stage or devenvironment setup, you can add inventory for it the same way as we done for local, but specifying DNS names instead of localhost ip addresses.

Docker inventory is the same, but with domain names instead of localhost.

mysql_conf: 'mysql://root:test@mysql:3306/test'
psql_conf: 'postgresql://postgres:postgres@custom_postgres_1:5432/postgres'
airflow_db: 'airflow:airflow@postgres:5432/airflow'
airflow_web: 'http://webserver:8080'
airflow_fernet: 'zp8kV516l9tKzqq9pJ2Y6cXbM3bgEWIapGwzQs6jio4='
s3_config:
    url: http://minio:9000
    key_id: minio
    secret_key: minio123

Steps

First step should populate test data: it creates MySQL and Postgres tables and generates data. It allows you to avoid monkey labor and simplify your life as a Data Engineer. Forget about test datasets manual building and production data exporting into csv and copying to the test environment. And all sorts of issues related to it: data anonymization, regexp in sql and generators.

For preparing your test data prepare step fits the best:

prepare:
    populate:
      mysql:
        conf: '{{ mysql_conf }}'
        schema: my_table.sql
        data:
          my_table: my_table.csv
      postgres:
        conf: '{{ psql_conf }}'
        schema: psql_tbl.sql

As you can see inside prepare we have defined populate both for MySQL and for Postgres data sources. For both data sources this step follow the same logic: provide configuration and run DDL code from specified SQL files. Both mysql_conf and psql_conf values are taken from the current inventory file (you are running test with).

The only difference, for mysql we specify input data which would be used to fill my_table. We do not specify input data for Postgres as it should be filled in by our Airflow pipeline during the execution. Lets dive deeper into how mysql populate statements are defined.

my_table.sql is a SQL file containing create table statement. In real world you may also have here grant access statement, adding indexes and etc:

CREATE TABLE if not exists test.my_table(
                    user_id      varchar(36)    primary key,
                    email        varchar(36)    NOT NULL
                );

my_table.csv is a data file, main difference with general testing approach is – we don’t specify actual data here. We apply Jinja2 template to generate the csv file based on our users variable from the very beginning. So one of the Catcher’s coolest feature: it supports Jinja2 templates everywhere.

user_id,email
{%- for user in users -%}
{{ user.uuid }},{{ user.email }}
{%- endfor -%}

psql_tbl.sql is almost the same as my_table.sql but with another table name.

When all data is prepared we should trigger our pipeline. It is the second step:

- airflow:
    run:
      config:
        db_conf: '{{ airflow_db }}'
        url: '{{ airflow_web }}'
        populate_connections: true
        fernet_key: '{{ airflow_fernet }}'
      dag_id: '{{ pipeline }}'
      sync: true
      wait_timeout: 150

It will run airflow pipeline simple_example_pipeline and will wait for it to finish (or fail in 150 seconds). And it will also create airflow connections based on your catcher inventory file.

One important thing here – Catcher will create connections in Airflow and name them as they are named in inventory file:

For psql_conf: 'postgres:postgres@localhost:5432/postgres' from inventory it will create connection psql_conf in Airflow. So, in order to have working test connection id in your pipeline should be the same as connection id in inventory file: postgres_conn_id = 'psql_conf'. Name itself does not matter.

Third step is to check if S3 file was created and download it:

- s3:
    get:
      config: '{{ s3_config }}'
      path: 'my_awesome_bucket/data/{{ mysql_tbl_name }}/year={{ now()[:4] }}/month={{ now()[5:7] }}/day={{ now()[8:10] }}/my_table.csv'
    register: {s3_csv: '{{ OUTPUT }}'}

As we said above, Catcher can apply jinja templates everywhere, here you see an example, how to compose path to our S3 resource. Our path in original pipeline build dynamically, depends on execution_date we use built-in function now() which returns current datetime as string, and apply some python string manipulation, like [5:7] to retrieve only part of the string. We pull the resource and register steps’ output as a new variable s3_csv.

Next two steps is to load the content from resource file and compare it with s3_csv (our final step in original airflow pipeline):

- echo: {from_file: 'my_table.csv', register: {expect_csv: '{{ OUTPUT }}'}}
- check:
    equals: {the: '{{ s3_csv.strip() }}', is: '{{ expect_csv.strip() }}'}
    name: 'Check data in s3 expected'

echo step can be also used to write or read from file. Here we read the same resource my_table.csv, which was used to populate MySQL and save step’s output to the variable expect_csv. Echo step will also run Jinja2 template and generate the proper content.

check equals step is used to compare expect_csv and s3_csv variables values. As their content is string we use python’s string strip() method to remove trailing spaces and empty lines.

The last step is to check what was actually written into Postgres. Expect steps fits us the best here:

- expect:
    compare:
      postgres:
        conf: '{{ psql_conf }}'
        data:
          psql_tbl: 'my_table.csv'
        strict: true
    name: 'Postgres data match expected'

Final cleanup

We need to add cleanup after the test to remove any side effects for other tests to run under clean environment.

So, we add block finally to test’s root:

finally:
  - mysql:
      request:
        conf: '{{ mysql_conf }}'
        query: 'drop table my_table'
      name: 'Clean up mysql'
      ignore_errors: true
  - postgres:
      request:
        conf: '{{ psql_conf }}'
        query: 'drop table psql_tbl'
      name: 'Clean up postgres'
      ignore_errors: true
  - s3:
      delete:
        config: '{{ s3_config }}'
        path: 'my_awesome_bucket/data/{{ mysql_tbl_name }}/year={{ now()[:4] }}/month={{ now()[5:7] }}/day={{ now()[8:10] }}/my_table.csv'
      name: 'Clean up s3'
      ignore_errors: true

We remove all data from Mysql and Postgres as well as remove file from S3. ignore_errors means that we don’t care if action fails (if there is no such file on S3 or table in the database). By the way, good approach here would be to move S3 file path into Catcher variable and reuse it in `S3:get` (Step #3) and inside delete step, to reduce code duplication.

How to run

Locally in docker

If you don’t have any environment you can use this docker-compose to start one locally. It is based on puckel-airflow docker repository.

docker-compose up -d

After you started docker-compose you need to run catcher docker image in the same network mounting your tests, resources and specifying inventory:

docker run -it --volume=$(pwd)/test:/opt/catcher/test \
               --volume=$(pwd)/resources:/opt/catcher/resources \
               --volume=$(pwd)/inventory:/opt/catcher/inventory \
               --network catcherairflowexample_default \
           comtihon/catcher -i inventory/docker.yml test

The network is very important, because, if you will run catcher locally, you would probably use local inventory with all services hosts 127.0.0.1. Both you and Catcher would be able to access them, but Catcher will populate Airflow connection with 127.0.0.1 from your local inventory, so your pipeline would fail, because Airflow in docker won’t be able to access database/minio via 127.0.0.1.

See more docker run instructions here.

Remotely in the environment

It is a good point to automate your tests. You can make your CI run catcher after every deployment to every environment. Both Catcher-in-docker or Catcher cli can be used from CI agents. Just use the proper inventory.

The output

If you run your test you’ll see nice colorful output. It will be split on two parts.

First is step-by-step run summary. It helps you to understand how test is running:

INFO:catcher:Step Create table and populate initial data OK
INFO:catcher:Step Trigger pipeline simple_example_pipeline OK
INFO:catcher:Step Get file from s3 OK
INFO:catcher:user_id,email
ea1d710b-0a7b-45f6-a1c4-52a5f7d91bce,bar@test.com
cf0a3043-6958-412d-a7b0-924092e7e95b,baz@test.com
e7a53958-f4aa-45e7-892e-69514990c992,foo@test.com
INFO:catcher:Step echo OK
INFO:catcher:Step Check data in s3 expected OK
INFO:catcher:Step Postgres data match expected OK
INFO:catcher:Test test/test.yml passed.

And following cleanup part:

INFO:catcher:Step Clean up mysql OK
INFO:catcher:Step Clean up postgres OK
INFO:catcher:Step Clean up s3 OK
INFO:catcher:Test test/test.yml [cleanup]  passed.

In case of multiple tests they will follow each other.

Second part is a run summary, one for all tests. It shows you statistics and status of each test run. In case of failure it will show the step number. In our case:

INFO:catcher:Test run 1. Success: 1, Fail: 0. Total: 100%
Test test: pass

Conclusion

When developers hear about end-to-end tests, they usually think about complex BDD frameworks and tons of code which they need to write in order to make everything work. It is not about Catcher. You don’t need to know any programming language to create a test.

When QA engineers hear about end-to-end tests, they usually think about lot’s of manual actions, easy to miss or make an error while testing. It is not about Catcher. Do your manual actions once and put them in Catcher’s script, to repeat every time you do a deploy.

When Data Engineers hear about end-to-end tests, they usually think how cool it would be to have a framework for testing data pipelines. And here we finally have one.

Happy testing!

HowTo: Run Cloudera Quickstart in Docker

HowTo: Run Cloudera Quickstart in Docker

The best way to familiarize yourself with Hadoop ecosystem or to do POC is to play with it in a sandbox. For that Cloudera provides 2 Quick Start options: one is the image for VirtualMashine and another is the Docker image. Both of them contain a small CDH cluster with one DataNode. But this is more than enough for sandbox purpose.

However, most of  data engineering neophytes find it difficult to set it up, since, unfortunately,  official Cloudera installation guide missing some essential parts on twinks after installation. Here you will find quick and easy how to setup QuickStart from Cloudera with Docker image on Linux.

If you have installed docker image already and don’t want to repeat those steps again, just move to the part with the solution or to the fixed docker image.

Run QuickStart according to official documentation

Check if you have docker installed. In terminal type: docker -v to check version. (This HOWTO doesn’t cover docker installation, the manual  from official docker site).

Check status of the docker service on your machine, start it if needed:

sudo systemctl status docker
sudo systemctl start docker

In my case docker was stopped, so I had to start it. Now when docker is installed and ready to work, we need to download QuickStart Image: `sudo docker pull cloudera/quickstart:latest`.  Depends on your connection download operation can take time – be patient :).

After successful download our Cloudera QuickStart image should appear in the list of docker images: `docker images`.  Copy <IMAGE ID> of ‘cloudera/quickstart’, we will use it later. In my case it is: ‘4239cd2958c6’. Finally, main command to run docker image as container is:

sudo docker run  --hostname=quickstart.cloudera --privileged=true -t -i -p 8888 -p 7180 4239cd2958c6 /usr/bin/docker-quickstart -d

/usr/bin/docker-quickstart #Entry point to start all CDH services. Provided by cloudera
--hostname=quickstart.cloudera #Required: pseudo-distributed configuration assumes this hostname
--privileged=true #Required: for HBase, MySQL-backed Hive metastore, Hue, Oozie, Sentry, and Cloudera Manager, and possibly others
-t  #Required: once services are started, a Bash shell takes over and will die without this
-i  #Required: if you want to use the terminal, either immediately or attach later
-p 8888  #Recommended: maps the Hue port in the guest to another port on the host
-p 7180  #Recommended: maps the Cloudera Manager port in the guest (7180) to another port on the host
-p [PORT] #Any other ports you want to remap from guest to free host ports. To make it accessible outside container. 
-d   #Optional: runs the container in the background. I would recommend to use this option if you planning to run container constantly on background.

Fix standard image to get it work

Further starts tuning to make your sandbox operational. By default, cloudera manager is not started in container, so, lets enable it first.

Obtain <CONTAINER_ID> value `docker ps`. In my case it is: ‘5fadd6cb8e0c’

Connect  to QuickStart container shell: `docker attach <CONTAINER_ID>` and run script to enable cloudera manager: `/home/cloudera/cloudera-manager –express`

At the end script prints out an address on which you can access cloudera-manager. Surprisingly, you won’t be able to connect to it. That’s happening because we remaped port earlier. And now we need to get correct host port to which our guest port 7180 was mapped.

Detect new port mapping from guest to host: `sudo docker port <CONTAINER_ID> <guest_port>`. Thus, to connect to Cloudera manager I need to type: ‘0.0.0.0:32771’ and finally we will see this:

 But wait a second, something is red at our `Hosts`. Let’s take a look on our error closer and fix it. It’s clock offset error:

Basically it means that our ntpd service is either not started or can’t connect to services. The solution is:

date      # will show difference between real date and server one
sudo chkconfig --add ntpd
sudo service ntpd restart 
date      # to make sure that ntpd is working and date is sync

Wait a couple of minutes and check cloudera manager again. As you see error has been gone.

Since now, until container won’t be turned off, ntpd service continue to work properly. If you are lazy to run additional command at cloudera docker container every time it was rebooted, you simply can use my docker image, based on Cloudera QuickStart one, or create one yourself.

How to create avro based table in Impala

How to create avro based table in Impala

Consider the following situation: A bundle of .avro files is stored on HDFS. They need to be converted to Impala tables. Schemas are not provided with files, at least not externally (it’s contained as first line of any avro file). But Impala has a known-issue with avro tables, and its usage is pretty limited: we can create avro based table only if all the columns are manually declared with their types in `CREATE TABLE` statement, otherwise it will fail with an error.

(pic).

But what if we have hundreds of columns or just not completely sure in schema and would like to automate process of tables creation?

Disclaimer: you can’t do that directly, but there is  a work around: you have to create temporary avro table in hive, then `create as select` temporary parquet file as select from avro table and finally run `invalidate metadata` in impala to catch up all the changes in tables set into impala.

Step by step algorithm:

  1. Check if you have `file.avsc` along with `file.avro` , if yes, skip step #2
  2. Create external schema avro file from file.avro
    • Download avro-tools-1.7.7.jar – official tool to work with avro files. Pay attention to a version – it should be 1.7.7.
    • Place it somewhere on your server’s local file system
      # parameters you will need
      # <tmp_local_path>=/tmp/get_avro any random path inside /tmp folder of local file system
      # <file_name> name of the file.avro from where you want to extract schema
      # <hdfs_file_path>  absolute path on hdfs to target file.avro
      # <result_schema_file_path>  path on hdfs where you would like to get created schema
      
      # create tmp folder
      mkdir -p <tmp_local_path>
      
      # here we read with cat command first 50Kb of file.avro and store it as file_sample on our local file system
      # depends on amount of columns 50Kb can be not enough. You can try --lines 1, to pick only first line or increase size. 
      hdfs dfs -cat <hdfs_file_path> | head --bytes 50K > <tmp_local_path>/<file_name>_sample
      
      # use avro-tool jar to retrieve schema from sample file and store it the name of origin file.avro with extension .avsc
      java -jar ~/jars/avro-tools-1.7.7.jar getschema <tmp_local_path>/<file_name>_sample > <tmp_local_path>/<file_name>.avsc
      
      # copy from local file system back to hdfs our created file.avsc (avro schema)
      hdfs dfs -put <tmp_local_path>/<file_name>.avsc <result_schema_file_path>
      
      #clean up mess 
      rm -rf <tmp_local_path>
      
      
  3. In Hive:
    CREATE EXTERNAL TABLE IF NOT EXISTS <avro_tmp_tbl_name>
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
    LOCATION 'hdfs://<orig_file_dir_path>/'
    tblproperties ('avro.schema.url'='hdfs://<path_to_avro_schema_file_on_hdfs>');
    
    
    CREATE TABLE IF NOT EXISTS <parq_tmp_tbl_name>
    STORED AS PARQUET
    AS select * from  <avro_tmp_tbl_name>  ;
  4. In impala: Invalidate metadata <parq_tmp_tbl_name>;

That’s all – job is done. This step-by-step algorithm is easy to wrap into any pipeline tool like: oozie, airflow, etc and thus, completely automate routine part of work.