Category: BigData

Articles about big data and data analysis.

End-to-end from front-end to back-end with Catcher

Catcher's external modules 5.1.0 were finally released today. This is great news, as it enables the Selenium step for front-end testing!

What should a proper e2e test look like?

Imagine you have a user service with a nice UI that lets you look up information about users registered in your system. Deep in the back-end you also have an audit log which saves all actions.

Before 5.1.0 you could use HTTP calls to mimic front-end behavior and trigger actions on the back-end side.

Your test probably looked like this:
– call the HTTP endpoint to search for a user
– check that the search event was saved to the database
– compare the found user with the search event saved in the database

This test checks 100% of the back-end functionality. But the front-end is most likely part of your system too! So a proper end-to-end test should start at the front-end application and end up in the back-end.

Without touching the front-end you can get false-positive results in e2e tests. For example: a user has special symbols in his name. All back-end tests pass and you deploy your application to production. After the deploy your users start to complain that the front-end part of the application crashes. The reason: the front-end can't handle the back-end's response when rendering user details with special symbols in the name.

With the new Catcher version you can include the front-end in your test. So, instead of calling HTTP directly, you can use the selenium step.

The test

Let's write a test which will search for a user and check that our search attempt was logged.

Every test starts with variables. To rule out false positives we need to save multiple users and then check that only the correct one is returned. Let's compose our users. Every user will have a random email and a random name thanks to the random built-in function.

variables:
    users:
        - name: '{{ random("name") }}'
          email:  '{{ random("email") }}'
        - name: '{{ random("name") }}'
          email:  '{{ random("email") }}'
        - name: '{{ random("name") }}'
          email:  '{{ random("email") }}'

Now we are ready to write our steps.

Populate the data

The first thing we need to do is populate the data with the prepare step.

Let's prepare a users.sql which will create all the back-end tables (in case of a clean run we don't have them yet).

 CREATE TABLE if not exists users_table( 
                     email varchar(36) primary key,
                     name varchar(36) NOT NULL 
                     );

Next, we need to fill our table with test data. users.csv will use our users variable to prepare the data for our step.

email,name
{%- for user in users -%}
{{ user.email }},{{ user.name }}
{%- endfor -%}

The step itself will take the SQL schema and create the database tables if needed. Then it will populate them using users.csv, based on the users variable.

steps:
  - prepare:
      populate:
          postgres:
              conf: '{{ postgres }}'
              schema: users_table.sql
              data:
                  users: users.csv
      name: Populate postgres with {{ users|length }} users

Select a user to search for

The next (small) step is to select a user for our search. The echo step will randomly select a user from the users variable and register its email as a new variable.

- echo: 
    from: '{{ random_choice(users).email }}'
    register: {search_for: '{{ OUTPUT }}'}
    name: 'Select {{ search_for }} for search'

Search front-end for our user

With the selenium step we can use our front-end to search for the user. The selenium step runs a JS/Java/Jar/Python script from the resources directory.

It passes Catcher's variables as environment variables to the script, so you can access them within Selenium. It also captures the script's output, so you can access everything in Catcher's next steps.

- selenium:
        test:
            file: register_user.js
            driver: '/usr/lib/geckodriver'
        register: {title: '{{ OUTPUT.title  }}'}

This step runs register_user.js, which searches for our selected user and registers the page's title.

Check the search log

After we've done the search we need to check that it was logged. Imagine our back-end uses MongoDB, so we'll use the mongo step.

- mongo:
      request:
            conf: '{{ mongo }}'
            collection: 'search_log'
            find: {'text': '{{ search_for }}'}
      register: {search_log: '{{ OUTPUT }}'}

This step searches the MongoDB search_log collection for any search attempts with our user in the text.

Compare results

The final steps deal with comparing the results. First, we'll use echo again to transform our users so that we can look them up by email.

- echo:
        from: '{{ users|groupby("email")|asdict }}'
        register: {users_kv: '{{ OUTPUT }}'}

Second, we will compare the front-end page title we got from Selenium with the MongoDB search log and the user's name.

 - check:
        and:
            - equals: {the: '{{ users_kv[search_for][0].name }}', is: '{{ title }}'}
            - equals: {the: '{{ title }}', is: '{{ search_log.name }}'}

The selenium resource

Let's add a Selenium test resource. It will go to your site and search for your user. If everything is OK, the page title will be the result of this step.

Javascript

The selenium step supports Java, JS, Python and Jar archives. In this article I'll show you all of them (except Jar – it is the same as Java, but without compilation). Let's start with JavaScript.

const {Builder, By, Key, until} = require('selenium-webdriver');
async function basicExample(){
    let driver = await new Builder().forBrowser('firefox').build();
    try{
        await driver.get(process.env.site_url);
        await driver.findElement(By.name('q')).sendKeys(process.env.search_for, Key.RETURN);
        await driver.wait(until.titleContains(process.env.search_for), 1000);
        await driver.getTitle().then(function(title) {
                    console.log('{\"title\":\"' + title + '\"}')
            });
        driver.quit();
    }
    catch(err) {
        console.error(err);
        process.exitCode = 1;
        driver.quit();
      }
}
basicExample();

Catcher passes all its variables as environment variables, so you can access them from JS/Java/Python. In this example process.env.site_url takes site_url from Catcher's variables and process.env.search_for takes the user email to search for.

Everything you write to STDOUT is caught by Catcher. In the case of JSON it will be returned as a dictionary. For example, with the console.log('{\"title\":\"' + title + '\"}') statement, OUTPUT.title will be available on Catcher's side. If Catcher can't parse the JSON, it will return the text as OUTPUT.

Python

Here is the Python implementation of the same resource. It should also be placed in the resources directory. To use it instead of the JavaScript implementation you just need to change the file parameter in the selenium step.

from selenium import webdriver
from selenium.webdriver.common.keys import Keys
import json
import os
from selenium.webdriver.firefox.options import Options

options = Options()
options.headless = True
driver = webdriver.Firefox(options=options)
try:
    # site_url and search_for are passed by Catcher as environment variables
    driver.get(os.environ['site_url'])
    elem = driver.find_element_by_name("q")
    elem.clear()
    elem.send_keys(os.environ['search_for'])
    elem.send_keys(Keys.RETURN)
    assert "No results found." not in driver.page_source
    # print the page title as JSON so Catcher can register OUTPUT.title
    print(json.dumps({'title': driver.title}))
finally:
    driver.quit()

Java

Java is a bit more complex, as (if you are not using an already compiled Jar) Catcher has to compile the Java source before running it. For this you need to have Java and the Selenium libraries installed on your system.

Luckily, Catcher comes with a Docker image where the libraries (JS, Java, Python), Selenium drivers (Firefox, Chrome, Opera) and tools (NodeJS, JDK, Python) are already installed.

package selenium;

import org.openqa.selenium.By; 
import org.openqa.selenium.WebDriver;
import org.openqa.selenium.WebElement;
import org.openqa.selenium.firefox.FirefoxDriver;
import org.openqa.selenium.firefox.FirefoxBinary;
import org.openqa.selenium.firefox.FirefoxOptions;

public class MySeleniumTest {

    public static void main(String[] args) {
        FirefoxBinary firefoxBinary = new FirefoxBinary();
        FirefoxOptions options = new FirefoxOptions();
        options.setBinary(firefoxBinary);
        options.setHeadless(true);
        WebDriver driver = new FirefoxDriver(options);
        try {
            driver.get(System.getenv("site_url"));
            WebElement element = driver.findElement(By.name("q"));
            element.sendKeys(System.getenv("search_for"));
            element.submit();
            System.out.println("{\"title\":\""+driver.getTitle() + "\"}");
        } finally {
            driver.quit();
        }
    }
} 

Conclusion

Catcher's 5.1.0 update unites front-end and back-end testing, allowing both to exist in one test case. It improves the coverage and makes the test really end-to-end.

Testing Airflow data pipelines with Catcher end to end

This article is about writing an end-to-end test for a data pipeline. It will cover Airflow, as one of the most popular data pipeline schedulers nowadays and one of the most complicated to test. For the impatient – here is the repository with everything set up.

What is a data pipeline and why is it important to test it?

It is a development pattern where we take data from one or several data sources, process it (not always) and move it to another place. It can be real-time or batch. It can be built with different frameworks and tools (like Airflow, Spark, Spring Batch, or hand-made).
But they all have one thing in common:

Any data pipeline is extremely hard to test, as you always need a fully deployed system, a data set prepared in advance, and mocks of external services.

Imagine you have a standard business case: your back-end writes results into Postgres, and you need to update the merchant's status in Salesforce so that your customer support agents can answer customers' questions on the fly.

To test it you'll have to go through these complex steps:

  • download data from production & depersonalize it;
  • set up sandboxes or use mocks directly in your code;

What is Catcher and how can it help you?

Catcher is an end-to-end tool specially designed to test systems consisting of many components. Initially developed as an end-to-end microservices test tool, it perfectly fits the needs of data pipeline testing.

The main Catcher features are:

  • modular architecture. It has lots of modules for different needs – from Kafka to S3;
  • templates. It fully supports Jinja2 templates, which allows you to generate data sets easily;
  • different inventory files for different environments. Write your tests locally and run them in a cloud environment by just changing the inventory file;

You can read about it here.

If you are new to Catcher you can find this article useful.

The pipeline

Imagine you have a back-end which handles user registration. All newly registered users are stored in MySQL. You also have another back-end which deals with GDPR and unsubscriptions. The business requirement is that the second back-end should somehow know about newly created users, as it needs this information for proper matching of unsubscription events. The final point: your back-end developers don't know about Kafka/Rabbit, so the only way to do it is to write a pipeline which will upload data from MySQL to Postgres.

The pipeline will:

  1. take data from MySQL and load it to S3
  2. take data from S3 and put it into Postgres

start >> mysql_to_s3 >> s3_to_psql >> end

In the real world the second and third steps of this pipeline would most likely be joined into a custom MySQLtoPostgresViaS3Operator. But here we keep them separate to show an example of a pipeline longer than one actual step :).

Both start and end are dummy operators. I added them because they are a good place to attach custom notifications, e.g. to Slack.
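
For orientation, here is a minimal sketch of how the DAG wiring might look (the schedule and start date are illustrative assumptions; mysql_to_s3 and s3_to_psql are the PythonOperators described below, stubbed here as dummies so the snippet stands alone):

import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
    dag_id='simple_example_pipeline',          # the pipeline name used in the test
    start_date=datetime.datetime(2020, 1, 1),
    schedule_interval=None,                    # triggered manually / by Catcher
)

start = DummyOperator(task_id='start', dag=dag)  # handy place for Slack notifications
end = DummyOperator(task_id='end', dag=dag)

# in the real DAG these two are the PythonOperators shown below
mysql_to_s3 = DummyOperator(task_id='mysql_to_s3', dag=dag)
s3_to_psql = DummyOperator(task_id='s3_to_psql', dag=dag)

start >> mysql_to_s3 >> s3_to_psql >> end
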
mysql_to_s3 is a Python operator:

mysql_to_s3 = PythonOperator(
    task_id='mysql_to_s3',
    python_callable=mysql_to_s3,
    retries=0,
    dag=dag,
    provide_context=True
)

It just calls the mysql_to_s3 function:

def mysql_to_s3(**context):
    mysql_hook = MySqlHook(mysql_conn_id=mysql_conn_id)
    s3_hook = S3Hook(aws_conn_id=aws_conn_id)
    sql = f'Select * from {mysql_tbl_name} order by email'
    df: DataFrame = mysql_hook.get_pandas_df(sql=sql) 
    with NamedTemporaryFile(newline='', mode='w+') as f:
        key_file = f"data/{mysql_tbl_name}/year={datetime.date.today().year}/" \
                   f"month={datetime.date.today().strftime('%m')}/" \
                   f"day={datetime.date.today().strftime('%d')}/" \
                   f"{mysql_tbl_name}.csv"
        df.to_csv(path_or_buf=f,
                  sep=",",
                  columns=df.columns.values.tolist(),
                  index=False
                  )
        f.flush()
        s3_hook.load_file(filename=f.name,
                          key=key_file,
                          bucket_name=bucket_name)
        context["ti"].xcom_push(key=key_str, value=key_file)
        f.close()

In this function we retrieve, via the MySQL hook, a Pandas DataFrame for the given SQL query (be mindful not to read too much data with this query and overload the worker's memory – otherwise read in chunks) and store this DataFrame as a CSV file on S3.
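
Here is a hedged sketch of what the chunked variant could look like, reusing the names from the function above (the 10,000-row chunk size is an illustrative assumption):

import pandas as pd

with NamedTemporaryFile(newline='', mode='w+') as f:
    first_chunk = True
    # stream the query result instead of loading it into one big DataFrame
    for chunk in pd.read_sql(sql, con=mysql_hook.get_sqlalchemy_engine(), chunksize=10000):
        # write the CSV header only once, then append the remaining chunks
        chunk.to_csv(path_or_buf=f, sep=',', index=False, header=first_chunk)
        first_chunk = False
    f.flush()
    s3_hook.load_file(filename=f.name, key=key_file, bucket_name=bucket_name)
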

After the S3 file is loaded, the next task, s3_to_psql, is called:

s3_to_psql = PythonOperator(
    task_id='s3_to_psql',
    python_callable=s3_to_psql,
    retries=0,
    dag=dag,
    provide_context=True
)

It is also a Python operator, which calls the s3_to_psql function:

def s3_to_psql(**context):
    ti = context["ti"]
    key_file = ti.xcom_pull(dag_id='simple_example_pipeline',
                            task_ids='mysql_to_s3',
                            key=key_str)
    psql_hook = PostgresHook(postgres_conn_id=postgres_conn_id)
    s3_hook = S3Hook(aws_conn_id=aws_conn_id)
    lines = s3_hook.read_key(key=key_file, bucket_name=bucket_name).split("\n")
    lines = [tuple(line.split(',')) for line in lines if line != '']
    df = DataFrame.from_records(data=lines[1:], columns=lines[0])
    df.to_sql(name=psql_tbl_name,
              con=psql_hook.get_sqlalchemy_engine(),
              if_exists="replace",
              index=False
              )

In this function we read the file from S3 into the worker's memory, build a Pandas DataFrame out of it and store it in Postgres.

All the Airflow connection ids are hard-coded at the beginning of the file:

postgres_conn_id = 'psql_conf'
mysql_conn_id = 'mysql_conf'
aws_conn_id = 's3_config'

You do not need to bother adding them to the Airflow test environment – Catcher will handle that for you during the test run.

That's all. Now it is time to test it.

The test itself

Let’s start with defining Catcher’s test-local variables:

variables:
  users:
    - uuid: '{{ random("uuid4") }}'
      email: 'bar@test.com'
    - uuid: '{{ random("uuid4") }}'
      email: 'baz@test.com'
    - uuid: '{{ random("uuid4") }}'
      email: 'foo@test.com'
  pipeline: 'simple_example_pipeline'
  mysql_tbl_name: 'my_table'

We set up the Airflow pipeline name, the MySQL table name and 3 users which will be exported.

We provide two different inventories here: one for a local run and one for a Docker run. Which inventories to provide is up to you and depends on the particular case.
The local inventory is:

mysql_conf: 'root:test@localhost:3307/test'
psql_conf: 'postgres:postgres@localhost:5432/postgres'
airflow_db: 'airflow:airflow@localhost:5433/airflow'
airflow_web: 'http://127.0.0.1:8080'
s3_config:
    url: http://127.0.0.1:9001
    key_id: minio
    secret_key: minio123

If you already have a stage or dev environment set up, you can add an inventory for it the same way as we did for local, just specifying DNS names instead of localhost IP addresses.

Docker inventory is the same, but with domain names instead of localhost.

mysql_conf: 'mysql://root:test@mysql:3306/test'
psql_conf: 'postgresql://postgres:postgres@custom_postgres_1:5432/postgres'
airflow_db: 'airflow:airflow@postgres:5432/airflow'
airflow_web: 'http://webserver:8080'
airflow_fernet: 'zp8kV516l9tKzqq9pJ2Y6cXbM3bgEWIapGwzQs6jio4='
s3_config:
    url: http://minio:9000
    key_id: minio
    secret_key: minio123

Steps

The first step should populate the test data: it creates the MySQL and Postgres tables and generates data. This lets you avoid monkey labor and simplifies your life as a Data Engineer. Forget about building test datasets manually, exporting production data into CSV and copying it to the test environment – and about all the issues related to it: data anonymization, regexps in SQL and generators.

For preparing your test data the prepare step fits best:

prepare:
    populate:
      mysql:
        conf: '{{ mysql_conf }}'
        schema: my_table.sql
        data:
          my_table: my_table.csv
      postgres:
        conf: '{{ psql_conf }}'
        schema: psql_tbl.sql

As you can see, inside prepare we have defined populate for both the MySQL and Postgres data sources. For both data sources this step follows the same logic: provide the configuration and run the DDL code from the specified SQL files. Both the mysql_conf and psql_conf values are taken from the current inventory file (the one you are running the test with).

The only difference is that for MySQL we specify input data which will be used to fill my_table. We do not specify input data for Postgres, as it should be filled in by our Airflow pipeline during the execution. Let's dive deeper into how the MySQL populate statements are defined.

my_table.sql is a SQL file containing the create table statement. In the real world you may also have grant access statements, index creation and so on here:

CREATE TABLE if not exists test.my_table(
                    user_id      varchar(36)    primary key,
                    email        varchar(36)    NOT NULL
                );

my_table.csv is a data file; the main difference from the usual testing approach is that we don't specify the actual data here. We apply a Jinja2 template to generate the CSV file based on our users variable from the very beginning. This is one of Catcher's coolest features: it supports Jinja2 templates everywhere.

user_id,email
{%- for user in users -%}
{{ user.uuid }},{{ user.email }}
{%- endfor -%}

psql_tbl.sql is almost the same as my_table.sql but with another table name.

When all the data is prepared we should trigger our pipeline. This is the second step:

- airflow:
    run:
      config:
        db_conf: '{{ airflow_db }}'
        url: '{{ airflow_web }}'
        populate_connections: true
        fernet_key: '{{ airflow_fernet }}'
      dag_id: '{{ pipeline }}'
      sync: true
      wait_timeout: 150

It will run the Airflow pipeline simple_example_pipeline and wait for it to finish (or fail after 150 seconds). It will also create Airflow connections based on your Catcher inventory file.

One important thing here – Catcher will create connections in Airflow and name them as they are named in the inventory file.

For psql_conf: 'postgres:postgres@localhost:5432/postgres' from the inventory it will create a connection psql_conf in Airflow. So, in order to have a working test, the connection id in your pipeline should be the same as the connection id in the inventory file: postgres_conn_id = 'psql_conf'. The name itself does not matter.

The third step is to check that the S3 file was created, and to download it:

- s3:
    get:
      config: '{{ s3_config }}'
      path: 'my_awesome_bucket/data/{{ mysql_tbl_name }}/year={{ now()[:4] }}/month={{ now()[5:7] }}/day={{ now()[8:10] }}/my_table.csv'
    register: {s3_csv: '{{ OUTPUT }}'}

As we said above, Catcher can apply Jinja2 templates everywhere; here you see an example of how to compose the path to our S3 resource. In the original pipeline the path is built dynamically and depends on the execution date. We use the built-in function now(), which returns the current datetime as a string, and apply some Python string slicing, like [5:7], to retrieve only part of it. We pull the resource and register the step's output as a new variable s3_csv.
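
To make the slicing concrete, here is what those indices select on an ISO-formatted datetime string (the value itself is illustrative):

now_str = '2020-03-30 12:34:56'          # what now() might return
year, month, day = now_str[:4], now_str[5:7], now_str[8:10]
# -> year='2020', month='03', day='30', matching the key built in mysql_to_s3
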

The next two steps load the content from the resource file and compare it with s3_csv (what our Airflow pipeline uploaded to S3):

- echo: {from_file: 'my_table.csv', register: {expect_csv: '{{ OUTPUT }}'}}
- check:
    equals: {the: '{{ s3_csv.strip() }}', is: '{{ expect_csv.strip() }}'}
    name: 'Check data in s3 expected'

The echo step can also be used to write to or read from a file. Here we read the same resource, my_table.csv, which was used to populate MySQL, and save the step's output to the variable expect_csv. The echo step will also run the Jinja2 template and generate the proper content.

The check equals step is used to compare the values of the expect_csv and s3_csv variables. As their content is a string, we use Python's strip() method to remove trailing spaces and empty lines.

The last step is to check what was actually written into Postgres. The expect step fits best here:

- expect:
    compare:
      postgres:
        conf: '{{ psql_conf }}'
        data:
          psql_tbl: 'my_table.csv'
        strict: true
    name: 'Postgres data match expected'

Final cleanup

We need to add a cleanup after the test to remove any side effects, so that other tests can run in a clean environment.

So we add a finally block to the test's root:

finally:
  - mysql:
      request:
        conf: '{{ mysql_conf }}'
        query: 'drop table my_table'
      name: 'Clean up mysql'
      ignore_errors: true
  - postgres:
      request:
        conf: '{{ psql_conf }}'
        query: 'drop table psql_tbl'
      name: 'Clean up postgres'
      ignore_errors: true
  - s3:
      delete:
        config: '{{ s3_config }}'
        path: 'my_awesome_bucket/data/{{ mysql_tbl_name }}/year={{ now()[:4] }}/month={{ now()[5:7] }}/day={{ now()[8:10] }}/my_table.csv'
      name: 'Clean up s3'
      ignore_errors: true

We remove all data from MySQL and Postgres, as well as the file from S3. ignore_errors means that we don't care if the action fails (e.g. if there is no such file on S3 or table in the database). By the way, a good approach here would be to move the S3 file path into a Catcher variable and reuse it in `s3: get` (step #3) and inside the delete step, to reduce code duplication.

How to run

Locally in docker

If you don't have any environment, you can use this docker-compose to start one locally. It is based on the puckel-airflow docker repository.

docker-compose up -d

After you start docker-compose you need to run the Catcher docker image in the same network, mounting your tests, resources and inventory:

docker run -it --volume=$(pwd)/test:/opt/catcher/test \
               --volume=$(pwd)/resources:/opt/catcher/resources \
               --volume=$(pwd)/inventory:/opt/catcher/inventory \
               --network catcherairflowexample_default \
           comtihon/catcher -i inventory/docker.yml test

The network is very important: if you run Catcher locally, you would probably use the local inventory where all service hosts are 127.0.0.1. Both you and Catcher would be able to access them, but Catcher would populate the Airflow connections with 127.0.0.1 from your local inventory, so your pipeline would fail, because Airflow in docker can't access the database/minio via 127.0.0.1.

See more docker run instructions here.

Remotely in the environment

This is a good place to automate your tests. You can make your CI run Catcher after every deployment to every environment. Both Catcher-in-docker and the Catcher CLI can be used from CI agents. Just use the proper inventory.

The output

If you run your test you'll see a nice colorful output. It is split into two parts.

The first is a step-by-step run summary. It helps you understand how the test is running:

INFO:catcher:Step Create table and populate initial data OK
INFO:catcher:Step Trigger pipeline simple_example_pipeline OK
INFO:catcher:Step Get file from s3 OK
INFO:catcher:user_id,email
ea1d710b-0a7b-45f6-a1c4-52a5f7d91bce,bar@test.com
cf0a3043-6958-412d-a7b0-924092e7e95b,baz@test.com
e7a53958-f4aa-45e7-892e-69514990c992,foo@test.com
INFO:catcher:Step echo OK
INFO:catcher:Step Check data in s3 expected OK
INFO:catcher:Step Postgres data match expected OK
INFO:catcher:Test test/test.yml passed.

It is followed by the cleanup part:

INFO:catcher:Step Clean up mysql OK
INFO:catcher:Step Clean up postgres OK
INFO:catcher:Step Clean up s3 OK
INFO:catcher:Test test/test.yml [cleanup]  passed.

In case of multiple tests they will follow each other.

The second part is the run summary, one for all tests. It shows you the statistics and status of each test run. In case of a failure it will show the step number. In our case:

INFO:catcher:Test run 1. Success: 1, Fail: 0. Total: 100%
Test test: pass

Conclusion

When developers hear about end-to-end tests, they usually think about complex BDD frameworks and the tons of code they need to write to make everything work. That is not the case with Catcher. You don't need to know any programming language to create a test.

When QA engineers hear about end-to-end tests, they usually think about lots of manual actions, where it is easy to miss something or make an error while testing. That is not the case with Catcher. Do your manual actions once and put them in a Catcher script, to be repeated every time you deploy.

When Data Engineers hear about end-to-end tests, they usually think how cool it would be to have a framework for testing data pipelines. And here we finally have one.

Happy testing!

HowTo: Run Cloudera Quickstart in Docker

The best way to familiarize yourself with the Hadoop ecosystem or to do a POC is to play with it in a sandbox. For that, Cloudera provides 2 QuickStart options: one is an image for a virtual machine and the other is a Docker image. Both of them contain a small CDH cluster with one DataNode, but this is more than enough for sandbox purposes.

However, most data engineering neophytes find it difficult to set up, since, unfortunately, the official Cloudera installation guide is missing some essential parts on the tweaks needed after installation. Here you will find a quick and easy way to set up Cloudera QuickStart with the Docker image on Linux.

If you have already installed the docker image and don't want to repeat those steps, just skip to the part with the solution or to the fixed docker image.

Run QuickStart according to official documentation

Check if you have Docker installed: in a terminal type docker -v to check the version. (This HOWTO doesn't cover Docker installation – see the manual on the official Docker site.)

Check status of the docker service on your machine, start it if needed:

sudo systemctl status docker
sudo systemctl start docker

In my case Docker was stopped, so I had to start it. Now that Docker is installed and ready to work, we need to download the QuickStart image: `sudo docker pull cloudera/quickstart:latest`. Depending on your connection, the download can take some time – be patient :).

After a successful download, our Cloudera QuickStart image should appear in the list of docker images: `docker images`. Copy the <IMAGE ID> of 'cloudera/quickstart' – we will use it later. In my case it is '4239cd2958c6'. Finally, the main command to run the docker image as a container is:

sudo docker run --hostname=quickstart.cloudera --privileged=true -t -i -d -p 8888 -p 7180 4239cd2958c6 /usr/bin/docker-quickstart

/usr/bin/docker-quickstart #Entry point that starts all CDH services. Provided by Cloudera
--hostname=quickstart.cloudera #Required: the pseudo-distributed configuration assumes this hostname
--privileged=true #Required: for HBase, MySQL-backed Hive metastore, Hue, Oozie, Sentry, Cloudera Manager, and possibly others
-t  #Required: once services are started, a Bash shell takes over and will die without this
-i  #Required: if you want to use the terminal, either immediately or by attaching later
-p 8888  #Recommended: maps the Hue port in the guest to another port on the host
-p 7180  #Recommended: maps the Cloudera Manager port in the guest (7180) to another port on the host
-p [PORT] #Any other ports you want to remap from guest to free host ports, to make them accessible outside the container
-d   #Optional: runs the container in the background. I would recommend this option if you plan to keep the container running constantly in the background.

Fix the standard image to get it working

Next comes some tuning to make your sandbox operational. By default, Cloudera Manager is not started in the container, so let's enable it first.

Obtain the <CONTAINER_ID> value with `docker ps`. In my case it is '5fadd6cb8e0c'.

Connect to the QuickStart container shell: `docker attach <CONTAINER_ID>` and run the script to enable Cloudera Manager: `/home/cloudera/cloudera-manager --express`

At the end the script prints out an address on which you can access Cloudera Manager. Surprisingly, you won't be able to connect to it. That's because we remapped the port earlier, and now we need to find the correct host port to which our guest port 7180 was mapped.

Detect the new port mapping from guest to host: `sudo docker port <CONTAINER_ID> <guest_port>`. Thus, to connect to Cloudera Manager I need to type '0.0.0.0:32771', and finally we will see the Cloudera Manager UI.

But wait a second, something is red under `Hosts`. Let's take a closer look at the error and fix it. It's a clock offset error:

Basically it means that our ntpd service is either not started or can't reach the NTP servers. The solution is:

date      # shows the difference between the real date and the server's one
sudo chkconfig --add ntpd
sudo service ntpd restart
date      # to make sure that ntpd is working and the date is in sync

Wait a couple of minutes and check Cloudera Manager again. As you can see, the error is gone.

From now on, until the container is turned off, the ntpd service will continue to work properly. If you don't want to run this additional command in the Cloudera docker container every time it is rebooted, you can simply use my docker image, based on the Cloudera QuickStart one, or create one yourself.

Python & Graphql. Tips, tricks and performance improvements.

Recently I finished another GraphQL back-end, this time in Python. In this article I would like to tell you about the difficulties I faced and the bottlenecks which can affect performance.

The technology stack: graphene + flask and sqlalchemy integration. Here is a piece of requirements.txt:

graphene
graphene_sqlalchemy
flask
flask-graphql
flask-sqlalchemy
flask-cors
injector
flask-injector

This allows me to map my database entities directly to GraphQL.

It looks like this:

The model:

class Color(db.Model):
  """color table"""
  __tablename__ = 'colors'

  color_id = Column(BigInteger().with_variant(sqlite.INTEGER(), 'sqlite'), primary_key=True)
  color_name = Column(String(50), nullable=False)
  color_r = Column(SmallInteger)
  color_g = Column(SmallInteger)
  color_b = Column(SmallInteger)

The node:

class ColorNode(SQLAlchemyObjectType):
  class Meta:
    model = colours.Color
    interfaces = (relay.Node,)

  color_id = graphene.Field(BigInt)

Everything is simple and nice.

But what are the problems?

Flask context.

At the time of writing this article I was unable to pass my context to GraphQL.

app.add_url_rule('/graphql',
                 view_func=GraphQLView.as_view('graphql',
                 schema=schema.schema,
                 graphiql=True,
                 context_value={'session': db.session})
                 )

This didn't work for me, as the view in the flask-graphql integration replaces the context with the flask request.

Maybe this is fixed now, but I had to subclass GraphQLView to save the context:

class ContexedView(GraphQLView):
  context_value = None

  def get_context(self):
    context = super().get_context()
    if self.context_value:
      for k, v in self.context_value.items():
        setattr(context, k, v)
    return context

CORS support

It is always a thing I forget to add 🙂

For Python Flask just add flask-cors to your requirements and set it up in your create_app method via CORS(app). That's all.
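
A minimal sketch of what that wiring might look like (the create_app factory shape is an assumption, not code from the project):

from flask import Flask
from flask_cors import CORS

def create_app() -> Flask:
  app = Flask(__name__)
  CORS(app)  # allow cross-origin requests, e.g. from the front-end to /graphql
  return app
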

Bigint type

I had to create my own BigInt type, as I use it in the database as the primary key in some columns, and there were graphene errors when I tried to use the plain Int type.

from graphene.types import Scalar
from graphql.language import ast

class BigInt(Scalar):
  @staticmethod
  def serialize(num):
    return num

  @staticmethod
  def parse_literal(node):
    if isinstance(node, ast.StringValue) or isinstance(node, ast.IntValue):
      return int(node.value)

  @staticmethod
  def parse_value(value):
    return int(value)

Compound primary key

Also, graphene_sqlalchemy doesn't support compound primary keys out of the box. I had one table with an (Int, Int, Date) primary key. To make it resolvable by id via Relay's Node interface I had to override the get_node method:

@classmethod
def get_node(cls, info, id):
  import datetime
  return super().get_node(info, eval(id))

The datetime import and eval are very important here: without them the date field would stay just a string and nothing would work when querying the database.
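
Presumably the decoded relay id for such a composite key is the string representation of a tuple, which is why eval (with datetime imported) restores something Query.get() can consume; an illustrative example:

import datetime

decoded_id = "(1, 2, datetime.date(2019, 1, 1))"   # hypothetical decoded node id
key = eval(decoded_id)                             # -> (1, 2, datetime.date(2019, 1, 1))
# this tuple can then be passed to SQLAlchemy's Query.get() as a composite primary key
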

Mutations with authorization

It was really easy to add authorization for queries: all I needed was to add a Viewer object and write get_token and get_by_token methods, as I had done many times in Java before.

But mutations are called bypassing the Viewer, and that's natural for GraphQL.

I didn't want to add authorization code to every mutation's header, as it leads to code duplication and is a little bit dangerous – I might create a backdoor by simply forgetting to add this code.

So I subclassed the mutation and reimplemented its mutate_and_get_payload like this:

class AuthorizedMutation(relay.ClientIDMutation):
  class Meta:
    abstract = True

  @classmethod
  @abstractmethod
  def mutate_authorized(cls, root, info, **kwargs):
    pass

  @classmethod
  def mutate_and_get_payload(cls, root, info, **kwargs):
    # authorize user using info.context.headers.get('Authorization')
    return cls.mutate_authorized(root, info, **kwargs)

All my mutations subclass AuthorizedMutation and just implement their business logic in mutate_authorized. It is called only if the user was authorized.
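
As a hypothetical example (the mutation and its fields are mine, not from the project), a concrete mutation would then look roughly like this:

class RenameColor(AuthorizedMutation):
  class Input:
    color_id = graphene.Int(required=True)
    new_name = graphene.String(required=True)

  ok = graphene.Boolean()

  @classmethod
  def mutate_authorized(cls, root, info, color_id, new_name, **kwargs):
    # pure business logic - authorization already happened in the base class
    color = Color.query.get(color_id)
    color.color_name = new_name
    db.session.commit()
    return RenameColor(ok=True)
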

Sortable and Filterable connections

To have my data automatically sorted via a query in the connection (with sort options added to the schema) I had to subclass relay's Connection and implement the get_query method (it is called in graphene_sqlalchemy).

class SortedRelayConnection(relay.Connection):
  class Meta:
    abstract = True

  @classmethod
  def get_query(cls, info, **kwargs):
    return SQLAlchemyConnectionField.get_query(cls._meta.node._meta.model, info, **kwargs)

Then I decided to add dynamic filtering over every field, also extending the schema.

Out of the box graphene can't do it, so I had to add a PR https://github.com/graphql-python/graphene-sqlalchemy/pull/164 and subclass the connection once again:

class FilteredRelayConnection(relay.Connection):
  class Meta:
    abstract = True

  @classmethod
  def get_query(cls, info, **kwargs):
    return FilterableConnectionField.get_query(cls._meta.node._meta.model, info, **kwargs)

FilterableConnectionField was introduced in that PR.

Sentry middleware

We use Sentry as our error notification system and it was hard to make it work with graphene. Sentry has a good flask integration, but the problem with graphene is that it swallows exceptions, returning them as errors in the response.

I had to use my own middleware:

class SentryMiddleware(object):

  def __init__(self, sentry) -> None:
    self.sentry = sentry

  def resolve(self, next, root, info, **args):
    promise = next(root, info, **args)
    if promise.is_rejected:
      promise.catch(self.log_and_return)
    return promise

  def log_and_return(self, e):
    try:
      raise e
    except Exception:
      traceback.print_exc()
      if self.sentry.is_configured:
        if not issubclass(type(e), NotImportantUserError):
          self.sentry.captureException()
    return e

It is registered on GraphQL route creation:

app.add_url_rule('/graphql',
                 view_func=ContexedView.as_view('graphql',
                 schema=schema.schema,
                 graphiql=True,
                 context_value={'session': db.session},
                 middleware=[SentryMiddleware(sentry)])
                 )

Low performance with relations

Everything was fine, the tests were green and I was happy until my application went to the dev environment with real amounts of data. Everything was super slow.

The problem was in sqlalchemy's relations. They are lazy by default: https://docs.sqlalchemy.org/en/latest/orm/loading_relationships.html

It means that if you have a graph with 3 relations, Master -> Pet -> Food, and query them all, the first query will fetch all masters (`select * from masters`). Say you received 20. Then for each master there will be a query (`select * from pets where master_id = ?`) – 20 queries. And finally N food queries, based on the pets returned.

My advice here: if you have complex relations and lots of data (I was writing a back-end for the big data world), you have to make your relations eager. The query itself will be heavier, but there will be only one, reducing the response time dramatically.
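
A minimal sketch of what that means in SQLAlchemy terms (the Master/Pet models are illustrative, mirroring the example above):

from sqlalchemy import Column, BigInteger, ForeignKey
from sqlalchemy.orm import relationship

class Master(db.Model):
  __tablename__ = 'masters'
  master_id = Column(BigInteger, primary_key=True)
  # lazy='joined' makes the relation eager: pets are fetched in the same query
  # with a JOIN instead of one extra SELECT per master
  pets = relationship('Pet', lazy='joined')

class Pet(db.Model):
  __tablename__ = 'pets'
  pet_id = Column(BigInteger, primary_key=True)
  master_id = Column(BigInteger, ForeignKey('masters.master_id'))
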

Performance improvement with custom queries

After I made my critical relations eager (not all of them – I had to study the front-end app to understand what it queries and how) everything worked faster, but not fast enough. I looked at the generated queries and was a bit frightened – they were monstrous! I had to write my own, optimized queries for some nodes.

For example, say I have a PlanMonthly entity with several OrderColorDistributions, each of them having one Order.

I can use subqueries to limit the data (remember, I am writing a back-end for big data) and populate the relations with already-fetched data (I had this data in the query anyway, so there was no need to use the eager joins generated by the ORM). It lightens the request.

Steps:

  1. Mark subqueries with with_labels=True
  2. Use the root entity (for this request) as the returned one:
    Order.query \
      .filter(<low level filtering here>) \
      .join(<join another table, which you can use later>) \
      .join(ocr_query, Order.order_id == ocr_query.c.order_color_distribution_order_id) \
      .join(date_limit_query,
            and_(ocr_query.c.order_color_distribution_color_id == date_limit_query.c.plans_monthly_color_id,
                 ocr_query.c.order_color_distribution_date == date_limit_query.c.plans_monthly_date,
                 <another table joined previously> == date_limit_query.c.plans_monthly_group_id))
  3. Use contains_eager on all first-level relations:
    query = query.options(contains_eager(Order.color_distributions, alias=ocr_query))
  4. If you have a second layer of relations (Order -> OrderColorDistribution -> PlanMonthly), chain contains_eager:
    query = query.options(contains_eager(Order.color_distributions, alias=ocr_query)
                 .contains_eager(OrderColorDistribution.plan, alias=date_limit_query))

Reducing number of calls to the database

Besides the data rendering level I have my service layer, which knows nothing about GraphQL. And I am not going to introduce it there, as I don't like high coupling.

But each service needs the fetched months data. To fetch all the data only once and have it available in all services, I use injector with the @request scope. Remember this scope – it is your friend in GraphQL.

It works like a singleton, but only within one request to /graphql. In my connection I just populate it with plans, found via GraphQL query (including all custom filters and ranges from front-end):

app.injector.get(FutureMonthCache).set_months(found)

Then, in all services which need to access this data, I just use this cache:

@inject
def __init__(self,
             prediction_service: PredictionService,
             price_calculator: PriceCalculator,
             future_month_cache: FutureMonthCache) -> None:
  super().__init__(future_month_cache)
  self._prediction_service = prediction_service
  self._price_calculator = price_calculator

Another nice thing: all my services which manipulate data and form the response also have @request scope, so I don't need to recalculate predictions for every month. I take them all from the cache, do one query and store the results. Moreover, one service can rely on another service's calculated data. The request scope helps a lot here, as it allows me to calculate all the data only once.
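
For reference, a hedged sketch of how such a request-scoped binding might be configured with flask-injector (the configure module is an assumption; FutureMonthCache is the cache from above):

from flask_injector import FlaskInjector, request

def configure(binder):
  # one FutureMonthCache instance per request to /graphql, shared by all services
  binder.bind(FutureMonthCache, to=FutureMonthCache, scope=request)

FlaskInjector(app=app, modules=[configure])
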

On the Node side I call my request-scoped services via a resolver:

def resolve_predicted_pieces(self, _info):
  return app.injector.get(PredictionCalculator).get_original_future_value(self)

It allows me to run the heavy calculations only if predicted_pieces was requested in the GraphQL query.

Summing up

That's all the difficulties I've faced. I haven't tried websocket subscriptions, but from what I've learned I can say that Python's GraphQL is more flexible than Java's, because of Python's own flexibility. But if I were going to work on a high-load back-end, I would prefer not to use GraphQL, as it is harder to optimize.

How to create avro based table in Impala

Consider the following situation: a bundle of .avro files is stored on HDFS and they need to be converted to Impala tables. Schemas are not provided with the files, at least not externally (the schema is contained in the first line of every avro file). But Impala has a known issue with avro tables, and their usage is pretty limited: we can create an avro-based table only if all the columns are manually declared with their types in the `CREATE TABLE` statement, otherwise it will fail with an error.


But what if we have hundreds of columns, or are just not completely sure of the schema, and would like to automate the process of table creation?

Disclaimer: you can't do that directly, but there is a workaround: you have to create a temporary avro table in Hive, then `create as select` a temporary parquet table from the avro table, and finally run `invalidate metadata` in Impala so it picks up all the changes in the table set.

Step by step algorithm:

  1. Check if you have a `file.avsc` along with `file.avro`; if yes, skip step #2
  2. Create an external avro schema file from file.avro
    • Download avro-tools-1.7.7.jar – the official tool for working with avro files. Pay attention to the version – it should be 1.7.7.
    • Place it somewhere on your server's local file system
      # parameters you will need
      # <tmp_local_path>=/tmp/get_avro any random path inside /tmp folder of local file system
      # <file_name> name of the file.avro from where you want to extract schema
      # <hdfs_file_path>  absolute path on hdfs to target file.avro
      # <result_schema_file_path>  path on hdfs where you would like to get created schema
      
      # create tmp folder
      mkdir -p <tmp_local_path>
      
      # here we read with the cat command the first 50Kb of file.avro and store it as a sample file on our local file system
      # depending on the number of columns, 50Kb may not be enough. You can try --lines 1 to pick only the first line, or increase the size.
      hdfs dfs -cat <hdfs_file_path> | head --bytes 50K > <tmp_local_path>/<file_name>_sample
      
      # use the avro-tools jar to retrieve the schema from the sample file and store it under the name of the original file.avro with the extension .avsc
      java -jar ~/jars/avro-tools-1.7.7.jar getschema <tmp_local_path>/<file_name>_sample > <tmp_local_path>/<file_name>.avsc
      
      # copy from local file system back to hdfs our created file.avsc (avro schema)
      hdfs dfs -put <tmp_local_path>/<file_name>.avsc <result_schema_file_path>
      
      #clean up mess 
      rm -rf <tmp_local_path>
      
      
  3. In Hive:
    CREATE EXTERNAL TABLE IF NOT EXISTS <avro_tmp_tbl_name>
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
    LOCATION 'hdfs://<orig_file_dir_path>/'
    tblproperties ('avro.schema.url'='hdfs://<path_to_avro_schema_file_on_hdfs>');
    
    
    CREATE TABLE IF NOT EXISTS <parq_tmp_tbl_name>
    STORED AS PARQUET
    AS select * from  <avro_tmp_tbl_name>  ;
  4. In Impala: `invalidate metadata <parq_tmp_tbl_name>;`

That's all – the job is done. This step-by-step algorithm is easy to wrap into any pipeline tool like Oozie, Airflow, etc., and thus completely automate the routine part of the work.
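
As a rough illustration (the DAG details, paths and table names are placeholder assumptions), the schema-extraction part could be wrapped into an Airflow task like this:

import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(dag_id='avro_to_impala',
          start_date=datetime.datetime(2020, 1, 1),
          schedule_interval=None)

extract_schema = BashOperator(
    task_id='extract_avro_schema',
    # same commands as step #2 above, parametrized via Jinja params
    bash_command=(
        'mkdir -p {{ params.tmp }} && '
        'hdfs dfs -cat {{ params.hdfs_file }} | head --bytes 50K > {{ params.tmp }}/sample && '
        'java -jar ~/jars/avro-tools-1.7.7.jar getschema {{ params.tmp }}/sample > {{ params.tmp }}/schema.avsc && '
        'hdfs dfs -put {{ params.tmp }}/schema.avsc {{ params.schema_path }} && '
        'rm -rf {{ params.tmp }}'
    ),
    params={'tmp': '/tmp/get_avro',
            'hdfs_file': '/data/file.avro',
            'schema_path': '/schemas/file.avsc'},
    dag=dag,
)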