Tag: Python

Testing Airflow data pipelines with Catcher end to end

This article is about writing an end-to-end test for a data pipeline. It will cover Airflow, one of the most popular data pipeline schedulers nowadays and one of the most complicated to test. For the impatient – here is the repository with everything set up.

What is a data pipeline and why is it important to test it?

It is a development pattern: we take data from one or several data sources, process it (not always) and move it to another place. It can be real-time or batch. It can be built with different frameworks and tools (like Airflow, Spark, Spring Batch, or something hand-made).
But all variants have one thing in common:

Any data pipeline is extremely hard to test, as you always need a fully deployed system, a data set prepared in advance and mocks of external services.

Imagine you have a standard business case: your back-end writes results to Postgres, and you need to update the merchant's status in Salesforce so that your customer support agents are able to answer customers' questions on the fly.

To test it you'll have to go through these complex steps:

  • download data from production & depersonalize it;
  • set up sandboxes or use mocks directly in your code;

What is Catcher and how can it help you?

Catcher is an end-to-end test tool, specially designed for systems consisting of many components. Initially developed as an end-to-end test tool for microservices, it perfectly fits the needs of data pipeline testing.

Catcher's main features are:

  • modular architecture: it has lots of modules for different needs, from Kafka to S3;
  • templates: it fully supports Jinja2 templates, which allows you to generate data sets easily;
  • different inventory files for different environments: write your tests locally and run them in a cloud environment just by changing the inventory file;

You can read about it here.

If you are new to Catcher you can find this article useful.

The pipeline

Imagine you have a back-end which handles user registration. All newly registered users are stored in MySQL. You also have another back-end which deals with GDPR and unsubscriptions. The business requirement is that the second back-end should somehow know about newly created users, as it needs this information to match unsubscription events properly. The final point: your back-end developers don't know about Kafka/RabbitMQ, so the only way to do it is to write a pipeline which will upload data from MySQL to Postgres.

The pipeline will:

  1. take data from MySQL and load it to S3
  2. take data from S3 and put it into Postgres

start >> mysql_to_s3 >> s3_to_psql >> end

In the real world the mysql_to_s3 and s3_to_psql steps of this pipeline would most likely be joined into a custom MySQLtoPostgresViaS3Operator. But here we keep them separate to show an example of a pipeline with more than one actual step :).
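For reference, here is a minimal sketch of how such a DAG could be wired up. The schedule, start_date and default settings are assumptions for illustration; only the task names and the chain come from the article:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# sketch of the DAG definition; schedule and start_date are assumptions
dag = DAG('simple_example_pipeline',
          start_date=datetime(2020, 1, 1),
          schedule_interval='@daily')

start = DummyOperator(task_id='start', dag=dag)  # a handy hook for notifications
end = DummyOperator(task_id='end', dag=dag)

# mysql_to_s3 and s3_to_psql are the PythonOperators shown below
start >> mysql_to_s3 >> s3_to_psql >> end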

Both 'start' and 'end' are dummy operators. I added them because they are a good place to hook in custom notifications to Slack and the like.
mysql_to_s3 is a PythonOperator:

mysql_to_s3 = PythonOperator(
    task_id='mysql_to_s3',
    python_callable=mysql_to_s3,
    retries=0,
    dag=dag,
    provide_context=True
)

It just calls the mysql_to_s3 function:

def mysql_to_s3(**context):
    mysql_hook = MySqlHook(mysql_conn_id=mysql_conn_id)
    s3_hook = S3Hook(aws_conn_id=aws_conn_id)
    sql = f'Select * from {mysql_tbl_name} order by email'
    df: DataFrame = mysql_hook.get_pandas_df(sql=sql) 
    with NamedTemporaryFile(newline='', mode='w+') as f:
        key_file = f"data/{mysql_tbl_name}/year={datetime.date.today().year}/" \
                   f"month={datetime.date.today().strftime('%m')}/" \
                   f"day={datetime.date.today().strftime('%d')}/" \
                   f"{mysql_tbl_name}.csv"
        df.to_csv(path_or_buf=f,
                  sep=",",
                  columns=df.columns.values.tolist(),
                  index=False
                  )
        f.flush()
        s3_hook.load_file(filename=f.name,
                          key=key_file,
                          bucket_name=bucket_name)
        context["ti"].xcom_push(key=key_str, value=key_file)
        f.close()

In this function, via the MySQL hook, we retrieve a Pandas DataFrame from the given SQL query (be mindful: make sure you don't read too much data with this query and don't overload the memory, otherwise read in chunks) and store this DataFrame as a CSV file on S3.
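If the table does not fit into memory, a chunked variant could stream the result set through the DBAPI cursor instead of building one huge DataFrame. This is only a sketch, not part of the original pipeline; it reuses the hooks and module-level variables from the DAG above, and the 10,000-row chunk size is an arbitrary assumption:

import csv

def mysql_to_s3_chunked(**context):
    mysql_hook = MySqlHook(mysql_conn_id=mysql_conn_id)
    s3_hook = S3Hook(aws_conn_id=aws_conn_id)
    sql = f'Select * from {mysql_tbl_name} order by email'
    with NamedTemporaryFile(newline='', mode='w+') as f:
        writer = csv.writer(f)
        cursor = mysql_hook.get_conn().cursor()
        cursor.execute(sql)
        writer.writerow(col[0] for col in cursor.description)  # header row
        while True:
            rows = cursor.fetchmany(10000)  # stream the result set in chunks
            if not rows:
                break
            writer.writerows(rows)
        f.flush()
        key_file = f"data/{mysql_tbl_name}/{mysql_tbl_name}.csv"  # build the dated key as in the original function
        s3_hook.load_file(filename=f.name, key=key_file, bucket_name=bucket_name)
        context["ti"].xcom_push(key=key_str, value=key_file)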

After the S3 file is loaded, the next task, s3_to_psql, is called:

s3_to_psql = PythonOperator(
    task_id='s3_to_psql',
    python_callable=s3_to_psql,
    retries=0,
    dag=dag,
    provide_context=True
)

It is also a PythonOperator, which calls the s3_to_psql function:

def s3_to_psql(**context):
    ti = context["ti"]
    key_file = ti.xcom_pull(dag_id='simple_example_pipeline',
                            task_ids='mysql_to_s3',
                            key=key_str)
    psql_hook = PostgresHook(postgres_conn_id=postgres_conn_id)
    s3_hook = S3Hook(aws_conn_id=aws_conn_id)
    lines = s3_hook.read_key(key=key_file, bucket_name=bucket_name).split("\n")
    lines = [tuple(line.split(',')) for line in lines if line != '']
    df = DataFrame.from_records(data=lines[1:], columns=lines[0])
    df.to_sql(name=psql_tbl_name,
              con=psql_hook.get_sqlalchemy_engine(),
              if_exists="replace",
              index=False
              )

In this function we read the file from S3 into the worker's memory, build a Pandas DataFrame out of it and store it in Postgres.

All the Airflow connection ids are hard-coded at the beginning of the file:

postgres_conn_id = 'psql_conf'
mysql_conn_id = 'mysql_conf'
aws_conn_id = 's3_config'

You do not need to bother adding them to the Airflow test environment – Catcher will handle it for you during the test run.

That's all. Now it is time to test it.

The test itself

Let’s start with defining Catcher’s test-local variables:

variables:
  users:
    - uuid: '{{ random("uuid4") }}'
      email: 'bar@test.com'
    - uuid: '{{ random("uuid4") }}'
      email: 'baz@test.com'
    - uuid: '{{ random("uuid4") }}'
      email: 'foo@test.com'
  pipeline: 'simple_example_pipeline'
  mysql_tbl_name: 'my_table'

We set up the Airflow pipeline name, the MySQL table name and the 3 users which will be exported.

We provide two different inventories here: one for a local run and one for a Docker run. Which inventories to provide is up to you and depends on the particular case.
The local inventory is:

mysql_conf: 'root:test@localhost:3307/test'
psql_conf: 'postgres:postgres@localhost:5432/postgres'
airflow_db: 'airflow:airflow@localhost:5433/airflow'
airflow_web: 'http://127.0.0.1:8080'
s3_config:
    url: http://127.0.0.1:9001
    key_id: minio
    secret_key: minio123

If you already have a stage or dev environment set up, you can add an inventory for it the same way as we did for local, specifying DNS names instead of localhost IP addresses.

The Docker inventory is the same, but with domain names instead of localhost:

mysql_conf: 'mysql://root:test@mysql:3306/test'
psql_conf: 'postgresql://postgres:postgres@custom_postgres_1:5432/postgres'
airflow_db: 'airflow:airflow@postgres:5432/airflow'
airflow_web: 'http://webserver:8080'
airflow_fernet: 'zp8kV516l9tKzqq9pJ2Y6cXbM3bgEWIapGwzQs6jio4='
s3_config:
    url: http://minio:9000
    key_id: minio
    secret_key: minio123

Steps

The first step should populate the test data: it creates the MySQL and Postgres tables and generates data. This allows you to avoid monkey labor and simplifies your life as a Data Engineer. Forget about building test datasets manually or exporting production data into CSV and copying it to the test environment, and about all the issues related to it: data anonymization, regexps in SQL and generators.

For preparing your test data the prepare step fits best:

prepare:
    populate:
      mysql:
        conf: '{{ mysql_conf }}'
        schema: my_table.sql
        data:
          my_table: my_table.csv
      postgres:
        conf: '{{ psql_conf }}'
        schema: psql_tbl.sql

As you can see, inside prepare we have defined populate both for the MySQL and the Postgres data sources. For both data sources this step follows the same logic: provide the configuration and run the DDL code from the specified SQL files. Both mysql_conf and psql_conf values are taken from the current inventory file (the one you are running the test with).

The only difference is that for MySQL we specify the input data which will be used to fill my_table. We do not specify input data for Postgres, as it should be filled in by our Airflow pipeline during execution. Let's dive deeper into how the MySQL populate statements are defined.

my_table.sql is a SQL file containing a create table statement. In the real world you may also have grant statements, index creation and so on here:

CREATE TABLE if not exists test.my_table(
                    user_id      varchar(36)    primary key,
                    email        varchar(36)    NOT NULL
                );

my_table.csv is a data file; the main difference from the general testing approach is that we don't specify the actual data here. We apply a Jinja2 template to generate the CSV file based on our users variable from the very beginning. This is one of Catcher's coolest features: it supports Jinja2 templates everywhere.

user_id,email
{%- for user in users -%}
{{ user.uuid }},{{ user.email }}
{%- endfor -%}

psql_tbl.sql is almost the same as my_table.sql, but with another table name.

When all the data is prepared we should trigger our pipeline. This is the second step:

- airflow:
    run:
      config:
        db_conf: '{{ airflow_db }}'
        url: '{{ airflow_web }}'
        populate_connections: true
        fernet_key: '{{ airflow_fernet }}'
      dag_id: '{{ pipeline }}'
      sync: true
      wait_timeout: 150

It will run the Airflow pipeline simple_example_pipeline and wait for it to finish (or fail) within 150 seconds. It will also create Airflow connections based on your Catcher inventory file.

One important thing here: Catcher will create the connections in Airflow and name them as they are named in the inventory file.

For psql_conf: 'postgres:postgres@localhost:5432/postgres' from the inventory it will create the connection psql_conf in Airflow. So, in order to have a working test, the connection id in your pipeline should be the same as the connection id in the inventory file: postgres_conn_id = 'psql_conf'. The name itself does not matter.

The third step is to check that the S3 file was created, and to download it:

- s3:
    get:
      config: '{{ s3_config }}'
      path: 'my_awesome_bucket/data/{{ mysql_tbl_name }}/year={{ now()[:4] }}/month={{ now()[5:7] }}/day={{ now()[8:10] }}/my_table.csv'
    register: {s3_csv: '{{ OUTPUT }}'}

As we said above, Catcher can apply Jinja2 templates everywhere; here you see an example of how to compose the path to our S3 resource. The path in the original pipeline is built dynamically, based on the execution date, so we use the built-in function now(), which returns the current datetime as a string, and apply some Python string slicing, like [5:7], to retrieve only part of that string. We pull the resource and register the step's output as a new variable s3_csv.

The next two steps load the content of the resource file and compare it with s3_csv (the result of the final step of the original Airflow pipeline):

- echo: {from_file: 'my_table.csv', register: {expect_csv: '{{ OUTPUT }}'}}
- check:
    equals: {the: '{{ s3_csv.strip() }}', is: '{{ expect_csv.strip() }}'}
    name: 'Check data in s3 expected'

The echo step can also be used to write to or read from a file. Here we read the same resource my_table.csv, which was used to populate MySQL, and save the step's output to the variable expect_csv. The echo step will also run the Jinja2 template and generate the proper content.

The check equals step is used to compare the values of the expect_csv and s3_csv variables. As their content is a string, we use Python's strip() method to remove trailing spaces and empty lines.

The last step is to check what was actually written into Postgres. The expect step fits best here:

- expect:
    compare:
      postgres:
        conf: '{{ psql_conf }}'
        data:
          psql_tbl: 'my_table.csv'
        strict: true
    name: 'Postgres data match expected'

Final cleanup

We need to add a cleanup after the test to remove any side effects, so that other tests can run in a clean environment.

So we add a finally block to the test's root:

finally:
  - mysql:
      request:
        conf: '{{ mysql_conf }}'
        query: 'drop table my_table'
      name: 'Clean up mysql'
      ignore_errors: true
  - postgres:
      request:
        conf: '{{ psql_conf }}'
        query: 'drop table psql_tbl'
      name: 'Clean up postgres'
      ignore_errors: true
  - s3:
      delete:
        config: '{{ s3_config }}'
        path: 'my_awesome_bucket/data/{{ mysql_tbl_name }}/year={{ now()[:4] }}/month={{ now()[5:7] }}/day={{ now()[8:10] }}/my_table.csv'
      name: 'Clean up s3'
      ignore_errors: true

We remove all data from MySQL and Postgres as well as the file from S3. ignore_errors means that we don't care if the action fails (if there is no such file on S3 or table in the database). By the way, a good approach here would be to move the S3 file path into a Catcher variable and reuse it in the s3 get step (step #3) and inside the delete step, to reduce code duplication.

How to run

Locally in docker

If you don't have any environment you can use this docker-compose file to start one locally. It is based on the puckel-airflow docker repository.

docker-compose up -d

After you have started docker-compose you need to run the Catcher docker image in the same network, mounting your tests, resources and inventory and specifying the inventory file:

docker run -it --volume=$(pwd)/test:/opt/catcher/test \
               --volume=$(pwd)/resources:/opt/catcher/resources \
               --volume=$(pwd)/inventory:/opt/catcher/inventory \
               --network catcherairflowexample_default \
           comtihon/catcher -i inventory/docker.yml test

The network is very important: if you run Catcher locally, you would probably use the local inventory where all service hosts are 127.0.0.1. Both you and Catcher would be able to access them, but Catcher would populate the Airflow connections with 127.0.0.1 from your local inventory, so your pipeline would fail, because Airflow inside docker can't access the database/minio via 127.0.0.1.

See more docker run instructions here.

Remotely in the environment

This is a good place to automate your tests. You can make your CI run Catcher after every deployment to every environment. Both Catcher-in-docker and the Catcher CLI can be used from CI agents. Just use the proper inventory.

The output

If you run your test you'll see a nice colorful output. It is split into two parts.

The first is a step-by-step run summary. It helps you understand how the test is running:

INFO:catcher:Step Create table and populate initial data OK
INFO:catcher:Step Trigger pipeline simple_example_pipeline OK
INFO:catcher:Step Get file from s3 OK
INFO:catcher:user_id,email
ea1d710b-0a7b-45f6-a1c4-52a5f7d91bce,bar@test.com
cf0a3043-6958-412d-a7b0-924092e7e95b,baz@test.com
e7a53958-f4aa-45e7-892e-69514990c992,foo@test.com
INFO:catcher:Step echo OK
INFO:catcher:Step Check data in s3 expected OK
INFO:catcher:Step Postgres data match expected OK
INFO:catcher:Test test/test.yml passed.

And the cleanup part follows:

INFO:catcher:Step Clean up mysql OK
INFO:catcher:Step Clean up postgres OK
INFO:catcher:Step Clean up s3 OK
INFO:catcher:Test test/test.yml [cleanup]  passed.

In the case of multiple tests they follow each other.

The second part is the run summary, one for all tests. It shows you statistics and the status of each test run. In case of a failure it will show the step number. In our case:

INFO:catcher:Test run 1. Success: 1, Fail: 0. Total: 100%
Test test: pass

Conclusion

When developers hear about end-to-end tests, they usually think about complex BDD frameworks and the tons of code they need to write to make everything work. That's not the case with Catcher: you don't need to know any programming language to create a test.

When QA engineers hear about end-to-end tests, they usually think about lots of manual actions, where it is easy to miss something or make an error. That's not the case with Catcher: do your manual actions once and put them into a Catcher script, to be repeated every time you deploy.

When Data Engineers hear about end-to-end tests, they usually think about how cool it would be to have a framework for testing data pipelines. And now we finally have one.

Happy testing!

Rollback for microservices with Ansible and Jenkins

Imagine your project consists of 4 microservices (3 back-ends, 1 front-end). Yesterday you introduced several new features and made a release. Unfortunately, your users have just reported a bug: some of the old important features are not working. You need to roll back all services. Ah, if only it could be done with one button.

Actually it can be. In this article I’ll show you how.

Tech stack:

  • Jenkins for rollback automation
  • Ansible + Python for rollback script
  • Docker registry for storing release images
  • DC/OS for running apps

Overview

We will have a Python script, called via Ansible from Jenkins, as described in this article. The only difference is that we have two different tags to run: the first one gathers all available images, the second runs the rollback.

The get algorithm:

  1. Request all images from the docker registry. Filter them by environment, sort them by date and take the last 10 for every repository.
  2. Form a json with repositories, images and dates and write it to the file system.

The run algorithm:

  1. Read the json produced by the get step and create a Jenkins input
  2. Take all available images for the selected date and do a rollback

The rollback itself:

  1. Modify the docker image section in the Marathon json config
  2. Start a deployment with the modified config

Special case

Imagine a service which doesn't change in this release. It means there won't be any rollback image available for it. But you still need to roll it back because of compatibility issues. Please find an example of this situation in the picture below.

If you select Today-1, only Repo1 and Repo3 will be rolled back, as there are no images for Repo2 (perhaps it wasn't changed).

The problem here is that the N-1 versions of Repo1 or Repo3 could be incompatible with the latest version of Repo2. So you need to find the latest version of Repo2 released before the rollback date. It is the Today-2 version.

Get rollbacks

We will have two actions for a rollback:

  • We gather all rollback dates and images available for the current environment.
  • The user selects a date and we perform the rollback.

Ansible side

The Ansible changes are minor. Just add two tags to the common steps (like requirements installation):


- name: "Copy requirements.txt"

  copy:

    src: "requirements.txt"

    dest: "/tmp/{{ role_name }}/"

  tags:

    - get

    - run

Don't forget to add tags to the always step, or your clean-up will be ignored. Using only the run tag there is preferred.

It would also be useful to register the get step's output as a variable and debug-print it. In this case you can use Ansible even without Jenkins:

- name: "Get rollbacks"

  shell: "source activate /tmp/{{ role_name }}/{{ conda_env }} ; {{ item }}"

  with_items:

    - pip install -r /tmp/{{ role_name }}/requirements.txt

    - "python /tmp/{{ role_name }}/rollback.py get

      --repo={{ repo }}

      --dump={{ dump_path }}

      --env={{ env }}"

  args:

    executable: /bin/bash

  tags:

    - get

  register: rollbacks



- debug:

    var: rollbacks.results[1].stdout

  tags:

    - get

Python side

With docopt you can use a single entry point with two commands: one for get and one for run.

Usage:
  rollback.py get --repo=<r> --env=<e> [--dump=<dr>]
  rollback.py run --date=<d> --env=<e> --slack=<s> --user=<u> --pwd=<p> [--dump=<dr>]

The fork itself:

if arguments['get']:
    return get(repo, env, dump)
if arguments['run']:
    return run(date, env, slack, user, pwd, dump)

To get the rollbacks you need to call your Docker registry's API first.
I assume that you use this image naming schema:
<private-docker-registry-host:port>/service-name:build-number-branch

You need to get all tags for the current repo, filter them by environment, then sort them by date and return the last 10.

def get_rollbacks(repo: str, env: str):
    r = requests.get(f'{DOCKER_REGISTRY}/v2/{repo}/tags/list', verify=False)
    if r.status_code != 200:
        raise Exception(f"Failed to fetch tags {r.status_code}")
    releases = list(filter(lambda x: x.endswith(env), r.json()['tags']))
    all_rollbacks = [(get_manifest(repo, tag), {tag: repo}) for tag in releases[-10:]]
    return dict(all_rollbacks)

Where repo is your `service-name` and env is the current branch.

Sorting by date is a bit more complex. The date is not included in the tag information. The only way to get it is to fetch the manifest and check the history.

def get_manifest(repo, tag):
    r = requests.get(f'{DOCKER_REGISTRY}/v2/{repo}/manifests/{tag}', verify=False)
    if r.status_code != 200:
        raise Exception(f"Failed to fetch manifest {r.raw}")
    history = r.json()['history']
    sort = sorted([json.loads(h['v1Compatibility'])['created'] for h in history])
    return sort[-1][:10]

The full get function:

def get(repo: str, env: str, dump: str):
    rollbacks = {}
    repos = repo.split(',')
    for r in repos:
        for date, rb in get_rollbacks(r, env).items():
            if date in rollbacks:
                rollbacks[date] += [rb]
            else:
                rollbacks[date] = [rb]
    print(rollbacks)
    if dump is not None:
        with open(path.join(dump, "rollback.json"), mode='w') as rb:
            json.dump({'all': repos, 'rollbacks': rollbacks}, rb)
    return rollbacks.keys()

Where repo is a comma-separated list of your service names, e.g. repo1,repo2,repo3. You also need to print the rollbacks for Ansible's debug.

Jenkins side

Let's start the Jenkins pipeline with an environment input:

parameters {
  choice(choices: 'dev\nstage\nprod', description: 'Which environment should I rollback?', name: 'environment')
}

If you use a master environment instead of prod you don't need to do anything. Otherwise you need to create a static variable rollback_env outside of the pipeline and fill it during the first step.

script {
    // needed as env names don't match: develop/master/stage in docker vs dev/stage/prod in marathon
    if (params.environment == 'prod') {
        rollback_env = "master"
    } else if (params.environment == 'stage') {
        rollback_env = "stage"
    } else {
        rollback_env = "develop"
    }
}

Then just clone your git repo with the ansible playbook and run it:

git branch: 'master',
    credentialsId: '<your git user credentials id>',
    url: "<your ansible repo>"

ansiblePlaybook(
        playbook: "${env.PLAYBOOK_ROOT}/rollback_service.yaml",
        inventory: "inventories/dev/hosts.ini",
        credentialsId: '<your git user credentials id>',
        extras: '-e "repo=' + "${env.REPOS}" + ' env=' + "${rollback_env}" + ' slack=' + "${env.SLACK_CALLBACK}" + ' dump_path=' + "/tmp" + '" -v',
        tags: "get")

Please pay attention to dump_path. It tells the Python script to create the json directly in /tmp, so that we can read it from Jenkins. Let's do it:

import groovy.json.JsonSlurper

def gather_rollback_dates() {
    def inputFile = readFile("/tmp/rollback.json")
    def InputJSON = new JsonSlurper().parseText(inputFile)
    return InputJSON['rollbacks'].keySet().join("\n")
}

This function reads the rollback file, gets all dates and forms a string with a \n separator. It is required to generate an input with a dropdown.

stage('Select rollback date') {
  steps {
    script {
      def userInput = false
      try {
        timeout(time: 120, unit: 'SECONDS') {
            userInput = input(id: 'userInput',
                              message: 'Select a date to rollback',
                              parameters: [
                                choice(name: 'rollback_date',
                                       choices: gather_rollback_dates(),
                                       description: 'One or more services have rollback at this date')])
        }
      } catch(err) {
      }
      if (userInput) {
        print('Performing rollback')
        env.DATE = userInput
      } else {
        print('Skip rollback')
      }
    }
  }
}

It looks like this:

Perform a rollback

We have 5 actions for a rollback:

  • Read json from previous step
  • Find missing images for the selected date
  • Get marathon service ids from docker ids
  • Change marathon app’s config
  • Update app in marathon

Ansible side

Nothing special here. Just call python.

- name: "Perform rollbacks"

  shell: "source activate /tmp/{{ role_name }}/{{ conda_env }} ; {{ item }}"

  with_items:

    - pip install -r /tmp/{{ role_name }}/requirements.txt

    - "python /tmp/{{ role_name }}/rollback.py run

      --date={{ date }}

      --env={{ env }}

      --slack={{ slack }}

      --user={{ dcos_user }}

      --dump={{ dump_path }}

      --pwd={{ dcos_password }}"

  tags:

    - run

Python side

Let's start with the run method.

Read the json and select all available images for the selected date:

def run(date, env, slack, user, pwd, dump):
    json_data = read_rollbacks(dump)
    all_rollbacks = OrderedDict(sorted(json_data['rollbacks'].items(), key=lambda x: x[0]))
    repos = json_data['all']
    images = all_rollbacks[date]
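read_rollbacks is not shown in the article; presumably it simply loads the rollback.json written by the get command. A minimal sketch under that assumption:

import json
from os import path

def read_rollbacks(dump: str) -> dict:
    # load the {'all': [...], 'rollbacks': {...}} document dumped by get()
    with open(path.join(dump, "rollback.json")) as rb:
        return json.load(rb)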

If images for some repos are missing, we need to find their older versions. Add this to your run method:

if len(repos) > 1 and len(repos) > len(images):
    get_missing_images(date, repos, all_rollbacks)

Where get_missing_images just goes through all_rollbacks and selects the image with the nearest date for each missing image:

def get_missing_images(date, repos, all_rollbacks):
    images = all_rollbacks[date]  # select available images
    found_services = [list(rb.values())[0] for rb in images]  # get services from images
    missing = list(set(repos) - set(found_services))  # subtract to get missing
    for service in missing:  # populate images with a rollback for every missing service
        rollback = get_nearest_date(service, date, all_rollbacks)
        if rollback is None:
            print(f"Previous rollback for {service} not found")
        else:
            images += [rollback]


def get_nearest_date(repo, date, all_rollbacks):
    for d, images in reversed(all_rollbacks.items()):
        if d < date:
            for image_dict in images:
                for rollback, image in image_dict.items():
                    if image == repo:
                        return {rollback: image}
    return None

After we have our images populated we need to get the Marathon service ids. Our Marathon ids use the standard /<department>/<environment>/<project>/<service-name> format. At this step we only have the service-name, so we should create a binding to the Marathon id.

We can do it by listing all applications running in Marathon and filtering them by the environment and service name (I haven't found a better solution).

def get_service_ids(env: str, images: list, user: str, pwd: str) -> dict:
    ids_only = get_marathon_ids_for_env(env, user, pwd)  # all running services for env
    services = {}
    for rollback in images:
        tag = list(rollback.keys())[0]
        id_part = rollback[tag]
        real_id = list(filter(lambda x: x.endswith(id_part), ids_only))  # filter by service-name
        if not real_id:
            raise Exception(f"Id {id_part} not found")
        services[real_id[0]] = tag
    return services


def get_marathon_ids_for_env(env: str, user: str, pwd: str):
    res = call_with_output(f'dcos auth login --username={user} --password={pwd}')
    if res.decode().strip() != 'Login successful!':
        raise Exception("Can't login to dcos cli")
    all_services = call_with_output('dcos marathon app list')
    matched = list(filter(lambda x: x.startswith(f"/ds/{env}"),
                          all_services.decode().split('\n')))
    return [m.split(' ')[0] for m in matched]
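call_with_output is not defined in the article either; it is presumably a thin subprocess wrapper along these lines:

import subprocess

def call_with_output(cmd: str) -> bytes:
    # run a dcos cli command and return its stdout as bytes
    return subprocess.check_output(cmd, shell=True)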

Once we have the service ids we can iterate over them and do a rollback for each. Add this to your run method:

services = get_service_ids(env, images, user, pwd)
for service_id, service_tag in services.items():
    if slack is not None:
        notify_slack(slack, f"Rollback { service_id }: { service_tag }")
    print(do_deploy(service_id, service_tag))
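do_deploy itself is not shown in the article. Conceptually it does what the rollback algorithm above describes: patch the docker image tag in the Marathon app config and start a deployment. A rough sketch against the Marathon REST API; the MARATHON_URL constant is an assumption and authentication (the real script logs in via the dcos cli) is omitted:

import requests

MARATHON_URL = 'https://your-dcos-host/service/marathon'  # hypothetical

def do_deploy(service_id: str, tag: str) -> dict:
    # fetch the current app definition
    app = requests.get(f'{MARATHON_URL}/v2/apps{service_id}', verify=False).json()['app']
    image = app['container']['docker']['image']
    # replace only the tag part of <registry>/<service-name>:<tag>
    app['container']['docker']['image'] = f"{image.rsplit(':', 1)[0]}:{tag}"
    # PUT the modified config back; Marathon starts a new deployment
    r = requests.put(f'{MARATHON_URL}/v2/apps{service_id}',
                     json={'container': app['container']},
                     params={'force': 'true'},
                     verify=False)
    return r.json()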

Well, that's all. Don't forget to add Slack notifications for the rollback.

Jenkins side

The Python part was the most complex one. On the Jenkins side you just need to call Ansible with the run tag and the selected date:

stage('Rollback') {
  when {
    expression {
        return env.DATE != null
    }
  }
  steps {
    ansiblePlaybook(
            playbook: "${env.PLAYBOOK_ROOT}/rollback_service.yaml",
            inventory: "inventories/dev/hosts.ini",
            credentialsId: '<your git user credentials id>',
            extras: '-e "date=' + "${env.DATE}" + ' env=' + "${params.environment}" + ' slack=' + "${env.SLACK_CALLBACK}" + ' dump_path=' + "/tmp" + '" -v',
            tags: "run")
  }
}

Summing up

The current solution is quite complex, but it allows you to run rollbacks both from Ansible via the CLI and from Jenkins. The second option is preferred, as you can see the user who approved the rollback.

Have a nice firefighting, and I hope you'll never need a rollback!

Ansible and Jenkins – automate your scripts

The topic I'd like to cover in this article may seem obvious, but I was surprised how many companies don't follow this best practice.

For impatient:

  • Automate every action you’ve done more than once.
  • Don’t use Jenkins static groovy library.
  • Use Jenkins + Ansible + Python for automation.

The problem

Any developer always faces situations when some action needs to be repeated. Sometimes these actions are urgent and need to be done very quickly, e.g. your prod is down and you need to rebuild indexes on your database, repopulate images on your dashboard, or re-elect a new leader in your distributed back-end.

It is good to remember these 3 golden rules, which can make your life easier:

  • If you repeat an action more than twice, it should be automated.
  • If there are several steps to be done, they should be put into one script.
  • When there is some complex setup required before running these actions, everything should be documented.

Following these rules will decrease the time you usually spend on firefighting. It may seem unnecessary to spend time on such automation from a business perspective, but in real life you free your time for developing new features, as well as reduce the time needed to fix a problem.

Another problem is the bus factor. When you have manual actions, there will always be a person who holds critical and unique knowledge. If this person leaves your company, you won't be able to fix the problem quickly, as the knowledge will be lost. Documented scripts with actions are your friends here.

Custom scripts

At some point all developers arrive at the rules mentioned above. They start to automate their actions by creating scripts. That's good, but here hides the danger: such scripts are usually written in different programming languages and are stored in many repositories.

It is hard to maintain such a zoo, and sometimes even hard to find the script for a particular problem. Some scripts may even get re-implemented several times. Be ready for it.

Another problem is the environment. Such scripts tend to be friendly only to their creator's environment. Now imagine you've found an old script written in a language you don't have installed on your system. What should you do to quickly run it and fix the problem?

Jenkins shared libraries

One solution here is to make Jenkins solve your problem. You have groovy shared libraries with scripts which do the fixes you need, and Jenkins jobs, one for each problem you need to fix. Everything is in one repository.

The approach is good, but not the implementation.

It is really hard to develop such scripts. I've faced a lot of problems with it, because there is no guarantee that code you've tested locally will work in Jenkins. The main reason lies in the different Groovy versions.

Python scripts

To solve the versioning problem one can use Python + Conda/venv. Python itself is very good for scripting and quite widespread. There is a higher chance somebody in your team knows Python than Groovy.

With the help of Conda you can use the same Python version everywhere.

I also highly recommend docopt for Python. Do you remember the third rule of automation? It is much better when your documentation comes together with the code, because it reduces the maintenance effort.

Comments in a script are not always able to explain why and how the script should be run and what the argument values are. docopt will handle parameters and default values for you, as well as print the help message on every wrong argument provided or simply on demand.

#!/usr/bin/env python

"""
Very important script. It should be run when our prod freezes for some seconds. 
It will get all missed transactions, ask for confirmation and process results.


Usage:
  
    transaction.py --issuer=<i> --bank=<b> [--slack=<s>]
  
    transaction.py -h | --help

Options:
  
  -h --help                     show this help message and exit
  
  --issuer=<i>                  Which issuer to use for transaction confirmation.  [default: primary]
  
  --bank=<b>                    Which bank’s backend to use.
  
  --slack=<s>                   slack callback to notify
"""

Ansible + Python

After the previous stage you have a self-documented, version-independent script. A developer's dream. What can be improved?

First of all, the scripts are still a bit coupled to their Python dependencies. If you are going to use these Python scripts as a company standard, you have to force everybody to install Conda in order to be able to run them.

Second, you still need a central storage for such scripts: the single source of truth, where a fix for ideally any problem can be found.

To solve both issues you need to use Ansible and have a single repository for its scripts (in huge companies you should prefer a per-department repository).

Every problem which can be solved with a script turns into a role. Each role has its own Readme, where the problem and the solution are described. The root Readme points to each role's Readme with a small comment about the problem it solves.

## Problem
Sometimes we have spikes of high load. During this load our slaves can lose the master and some transactions won't be processed.
## Solution
Such missed transactions are saved by the slaves in a special queue. This role gets these transactions, asks the bank's confirmation for each and processes the results.
## Run
ansible-playbook resolve_transactions.yaml -i inventory/prod/hosts.ini --extra-vars "issuer=primary bank=my_bank"

This doesn't replace your Python scripts, as plain Ansible scripts are harder to debug and develop. Instead, all Python scripts go into the files or templates inside the role and are called as a part of the play.

The minimal scenario usually contains Conda environment creation and dependency installation, as well as running the script itself (for simplicity this role assumes Conda is installed):

---
- block:
    - name: "Copy requirements.txt"
      copy:
        src: "requirements.txt"
        dest: "/tmp/{{ role_name }}/"

    - name: "Copy python executable"
      template:
        src: "transaction.py.j2"
        dest: "/tmp/{{ role_name }}/transaction.py"

    - name: "Create Conda Env for {{ python_version }}"
      shell: "conda create -p /tmp/{{ role_name }}/{{ conda_env }} --copy -y python={{ python_version }}"

    - name: "Run my script"
      shell: "source activate /tmp/{{ role_name }}/{{ conda_env }} && {{ item }}"
      with_items:
        - pip install -r /tmp/{{ role_name }}/requirements.txt
        - "python /tmp/{{ role_name }}/transaction.py
          --issuer={{ issuer }}
          --bank={{ bank }}
          --slack={{ slack }}"
      args:
        executable: /bin/bash

  always:
    - name: "Clean up"
      file:
        state: absent
        path: "/tmp/{{ role_name }}/"

Here you can benefit from Ansible's variable system:

Group variables are stored per environment, as well as global variables, which are symlinks to all.

Each role can also have its own specific `default` variables, which are overridden by the Ansible input to the script.

Now you can transfer first-line support to another department by just pointing them to a single repository with full documentation and scripts. All they need to know is how to run Ansible.

Jenkins + Ansible + Python

The problem with first-line support is that they are usually cheaper and less qualified than regular developers. They may also run Windows and have no idea what Ansible is. The ideal solution for them is to provide a document with rules like "If you suspect this problem – push that button". And you can do it with the help of Jenkins.
First of all, ensure you have the ansible plugin installed.

Second, create credentials for ssh usage.

Third, write a pipeline for every role you wish to create a button for. You can place it in the role's root directory next to the Readme, and make the repository root's Jenkins pipeline scan for all pipelines in roles/ and create child Jenkins pipelines if necessary.

Your typical pipeline would have input parameters:

parameters {
    choice(choices: ['dev','prod', 'stage'], description: 'On which Environment should I run this script?', name: 'environment')
}

The first step should be cloning your repo with the ansible playbooks:

stage('Clone Git repository') {
   steps {
        git branch: 'master',
            credentialsId: '<some-uuid>',
            url: "${env.PROJECT_REPO}"
    }
}

And calling the ansible playbook itself:

stage('Run ansible script') {
   steps {
       script {
         if (params.environment == 'prod') {
            env.INVENTORY = "inventories/prod/hosts.ini"
            env.ISSUER = "primary"
         } else if(params.environment == 'dev'){
            env.INVENTORY = "inventories/dev/hosts.ini"
            env.ISSUER = "secondary"
         } else if(params.environment == 'stage'){
            env.INVENTORY = "inventories/stage/hosts.ini"
            env.ISSUER = "secondary"
         } else {
            throw new Exception("Unknown environment: ${params.environment}")
         }
       }
       ansiblePlaybook(
            playbook: "${env.PLAYBOOK_ROOT}/deploy_service.yaml",
            inventory: "${env.PLAYBOOK_ROOT}/${env.INVENTORY}",
            credentialsId: '<your-credentials-id>',
            extras: '-e "issuer=' + "${env.ISSUER}" + ' bank=my_bank" -v')
  }
}

After creating the Jenkins jobs, all you need is to link them in each role's Readme, as well as in any connected project's Readme.

Summing up

  • Automated scripts allow you to fix problems much faster, but they also require some effort to make them easy to use and platform independent.
  • Self-documented scripts allow you to reduce the bus factor and the onboarding time for newcomers.
  • A centralized repository with standardized tools allows you to do a quick responsibility handover to another team in the future.
  • Ansible + Jenkins allow you to fix a problem by pressing a single Jenkins button (even when you are on vacation with only your mobile phone), or by running an Ansible script when your Jenkins is down.
  • Jenkins buttons allow you to reduce the qualification requirements and the price of first-line support.

Have a happy firefighting! 🙂

Python & Graphql. Tips, tricks and performance improvements.

Python & Graphql. Tips, tricks and performance improvements.


Recently I finished another back-end with GraphQL, this time in Python. In this article I would like to tell you about all the difficulties I've faced and the bottlenecks which can affect performance.

Technology stack: graphene + flask and sqlalchemy integration. Here is a piece of requirements.txt:

graphene
graphene_sqlalchemy
flask
flask-graphql
flask-sqlalchemy
flask-cors
injector
flask-injector

This allows me to map my database entities directly to GraphQL.

It looks like this:

The model:

class Color(db.Model):
  """color table"""
  __tablename__ = 'colors'

  color_id = Column(BigInteger().with_variant(sqlite.INTEGER(), 'sqlite'), primary_key=True)
  color_name = Column(String(50), nullable=False)
  color_r = Column(SmallInteger)
  color_g = Column(SmallInteger)
  color_b = Column(SmallInteger)

The node:

class ColorNode(SQLAlchemyObjectType):
  class Meta:
    model = colours.Color
    interfaces = (relay.Node,)

  color_id = graphene.Field(BigInt)

Everything is simple and nice.

But what are the problems?

Flask context.

At the time of writing this article, I was unable to pass my context to GraphQL.

app.add_url_rule('/graphql',
                 view_func=GraphQLView.as_view('graphql',
                 schema=schema.schema,
                 graphiql=True,
                 context_value={'session': db.session})
                 )

This didn't work for me, as in the flask-graphql integration the context was replaced by the flask request.

Maybe this is fixed now, but back then I had to subclass GraphQLView to save the context:

class ContexedView(GraphQLView):
  context_value = None

  def get_context(self):
    context = super().get_context()
    if self.context_value:
      for k, v in self.context_value.items():
        setattr(context, k, v)
    return context

CORS support

It is always a thing I forget to add 🙂

For Python Flask just add flask-cors to your requirements and set it up in your create_app method via CORS(app). That's all.
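A couple of lines in the application factory are enough; a sketch:

from flask import Flask
from flask_cors import CORS

def create_app() -> Flask:
    app = Flask(__name__)
    CORS(app)  # allow cross-origin requests, e.g. to /graphql from the front-end
    return app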

Bigint type

I had to create my own BigInt type, as I use big integers in the database as primary keys in some columns, and there were graphene errors when I tried to use the Int type.

class BigInt(Scalar):
  @staticmethod
  def serialize(num):
    return num

  @staticmethod
  def parse_literal(node):
    if isinstance(node, ast.StringValue) or isinstance(node, ast.IntValue):
      return int(node.value)

  @staticmethod
  def parse_value(value):
    return int(value)

Compound primary key

Also, graphene_sqlalchemy doesn't support compound primary keys out of the box. I had one table with an (Int, Int, Date) primary key. To make it resolvable by id via Relay's Node interface I had to override the get_node method:

@classmethod
def get_node(cls, info, id):
  import datetime
  return super().get_node(info, eval(id))

The datetime import and eval are very important here, as without them the date field would be just a string and nothing would work when querying the database.

Mutations with authorization

It was really easy to implement authorization for queries; all I needed was to add a Viewer object and write get_token and get_by_token methods, as I did many times in Java before.

But mutations are called bypassing the Viewer, and that's natural for GraphQL.

I didn't want to add authorization code to every mutation's header, as it leads to code duplication and is a little bit dangerous: I might create a backdoor by simply forgetting to add this code.

So I subclassed the mutation and reimplemented its mutate_and_get_payload like this:

class AuthorizedMutation(relay.ClientIDMutation):
  class Meta:
    abstract = True

  @classmethod
  @abstractmethod
  def mutate_authorized(cls, root, info, **kwargs):
    pass

  @classmethod
  def mutate_and_get_payload(cls, root, info, **kwargs):
    # authorize user using info.context.headers.get('Authorization')
    return cls.mutate_authorized(root, info, **kwargs)

All my mutations subclass AuthorizedMutation and just implement their business logic in mutate_authorized, which is called only if the user was authorized.
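A concrete mutation then contains only business logic. A hypothetical example (CreateColor and its fields are made up for illustration, reusing the Color model and ColorNode from above):

class CreateColor(AuthorizedMutation):
  class Input:
    color_name = graphene.String(required=True)

  color = graphene.Field(ColorNode)

  @classmethod
  def mutate_authorized(cls, root, info, color_name, **kwargs):
    # runs only after AuthorizedMutation has authorized the user
    color = colours.Color(color_name=color_name)
    db.session.add(color)
    db.session.commit()
    return CreateColor(color=color)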

Sortable and Filterable connections

To have my data automatically sorted via the query in a connection (with sort options added to the schema) I had to subclass relay's connection and implement the get_query method (it is called in graphene_sqlalchemy):

class SortedRelayConnection(relay.Connection):
  class Meta:
    abstract = True

  @classmethod
  def get_query(cls, info, **kwargs):
    return SQLAlchemyConnectionField.get_query(cls._meta.node._meta.model, info, **kwargs)

Then I decided to add dynamic filtering over every field, also extending the schema.

Out of the box graphene can't do it, so I had to submit a PR (https://github.com/graphql-python/graphene-sqlalchemy/pull/164) and subclass the connection once again:

class FilteredRelayConnection(relay.Connection):
  class Meta:
    abstract = True

  @classmethod
  def get_query(cls, info, **kwargs):
    return FilterableConnectionField.get_query(cls._meta.node._meta.model, info, **kwargs)

Where FilterableConnectionField was introduced in the PR.

Sentry middleware

We use Sentry as our error notification system, and it was hard to make it work with graphene. Sentry has good flask integration, but the problem with graphene is that it swallows exceptions, returning them as errors in the response.

I had to use my own middleware:

class SentryMiddleware(object):

  def __init__(self, sentry) -> None:
    self.sentry = sentry

  def resolve(self, next, root, info, **args):
    promise = next(root, info, **args)
    if promise.is_rejected:
      promise.catch(self.log_and_return)
    return promise

  def log_and_return(self, e):
    try:
      raise e
    except Exception:
      traceback.print_exc()
      if self.sentry.is_configured:
        if not issubclass(type(e), NotImportantUserError):
          self.sentry.captureException()
    return e

It is registered on GraphQL route creation:

app.add_url_rule('/graphql',
                 view_func=ContexedView.as_view('graphql',
                                                schema=schema.schema,
                                                graphiql=True,
                                                context_value={'session': db.session},
                                                middleware=[SentryMiddleware(sentry)])
                 )

Low performance with relations

Everything was fine, the tests were green and I was happy, until my application went to the dev environment with real amounts of data. Everything was super slow.

The problem was in sqlalchemy’s relations. They are lazy by default. https://docs.sqlalchemy.org/en/latest/orm/loading_relationships.html

It means that if you have a graph with 3 relations, Master -> Pet -> Food, and you query them all, the first query will fetch all masters (select * from masters). Say you've received 20. Then for each master there will be a query (select * from pets where master_id = ?): 20 queries. And finally N food queries, based on the pets returned.

My advice here: if you have complex relations and lots of data (I was writing a back-end for the big data world), you have to make all critical relations eager. The query itself will be heavier, but it will be the only one, reducing the response time dramatically.
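In sqlalchemy this boils down to the relationship's lazy mode, or a joinedload option on a particular query. A sketch using the Master -> Pet example above (the models are illustrative, not from the real project):

from sqlalchemy import BigInteger, Column, ForeignKey
from sqlalchemy.orm import joinedload, relationship

class Master(db.Model):
  __tablename__ = 'masters'
  master_id = Column(BigInteger, primary_key=True)
  # eager by default: pets arrive in the same joined query as their masters
  pets = relationship('Pet', lazy='joined')

class Pet(db.Model):
  __tablename__ = 'pets'
  pet_id = Column(BigInteger, primary_key=True)
  master_id = Column(BigInteger, ForeignKey('masters.master_id'))

# or eager for a single query, without touching the model:
# masters = Master.query.options(joinedload(Master.pets)).all()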

Performance improvement with custom queries

After I made my critical relations eager (not all relations; I had to study the front-end app to understand what they query and how), everything worked faster, but not fast enough. I looked at the generated queries and was a bit frightened: they were monstrous! I had to write my own optimized queries for some nodes.

E.g. I have a PlanMonthly entity with several OrderColorDistributions, each of them having one Order.

I can use subqueries to limit the data (remember, I am writing a back-end for big data) and populate the relations with existing data (I already had this data in the query, so there was no need to use the eager joins generated by the ORM). This makes the request lighter.

Steps:

  1. Mark the subqueries with with_labels=True
  2. Use the root entity (for this request) as the one returned:
    Order.query \
      .filter(<low level filtering here>) \
      .join(<join another table, which you can use later>) \
      .join(ocr_query, Order.order_id == ocr_query.c.order_color_distribution_order_id) \
      .join(date_limit_query,
            and_(ocr_query.c.order_color_distribution_color_id == date_limit_query.c.plans_monthly_color_id,
                 ocr_query.c.order_color_distribution_date == date_limit_query.c.plans_monthly_date,
                 <another table joined previously> == date_limit_query.c.plans_monthly_group_id))
  3. Use contains_eager on all first level relations.
    query = query.options(contains_eager(Order.color_distributions, alias=ocr_query))
  4. If you have a second layer of relations (Order -> OrderColorDistribution -> PlanMonthly), chain contains_eager:
    query = query.options(contains_eager(Order.color_distributions, alias=ocr_query)
                 .contains_eager(OrderColorDistribution.plan, alias=date_limit_query))

Reducing number of calls to the database

Besides the data rendering level I have my service layer, which knows nothing about GraphQL. And I am not going to introduce it there, as I don't like high coupling.

But each service needs the fetched months data. To fetch the data only once and have it available in all services, I use injector with @request scope. Remember this scope, it is your friend in GraphQL.

It works like a singleton, but only within one request to /graphql. In my connection I just populate it with the plans found via the GraphQL query (including all custom filters and ranges from the front-end):

app.injector.get(FutureMonthCache).set_months(found)

Then, in all services which need to access this data, I just use this cache:

@inject
def __init__(self,
             prediction_service: PredictionService,
             price_calculator: PriceCalculator,
             future_month_cache: FutureMonthCache) -> None:
  super().__init__(future_month_cache)
  self._prediction_service = prediction_service
  self._price_calculator = price_calculator
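The cache itself is trivial; what matters is binding it with request scope, so one instance lives for exactly one call to /graphql. A sketch assuming flask-injector (the FutureMonthCache internals here are illustrative):

from flask_injector import FlaskInjector, request

class FutureMonthCache:
  def __init__(self):
    self._months = []

  def set_months(self, months):
    self._months = months

  def get_months(self):
    return self._months

def configure(binder):
  # one instance per request to /graphql, shared by all services
  binder.bind(FutureMonthCache, to=FutureMonthCache, scope=request)

# FlaskInjector(app=app, modules=[configure])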

Another nice thing is that all my services which manipulate data and form the response also have @request scope, so I don't need to calculate predictions for every month: I take them all from the cache, do one query and store the results. Moreover, one service can rely on another service's calculated data. The request scope helps a lot here, as it allows me to calculate all data only once.

On the node side I call my request-scope services via a resolver:

def resolve_predicted_pieces(self, _info):
  return app.injector.get(PredictionCalculator).get_original_future_value(self)

It allows me to run the heavy calculations only if predicted_pieces were requested in the GraphQL query.

Summing up

Those are all the difficulties I've faced. I haven't tried websocket subscriptions, but from what I've learned I can say that Python's GraphQL is more flexible than Java's, because of Python's own flexibility. But if I were going to work on a high-load back-end, I would prefer not to use GraphQL, as it is harder to optimize.