Python & Graphql. Tips, tricks and performance improvements.

Recently I finished another back-end with GraphQL, this time in Python. In this article I would like to describe the difficulties I faced and the bottlenecks that can affect performance.

Technology stack: graphene + flask and sqlalchemy integration. Here is a piece of requirements.txt:


This allows me to map my database entities directly to GraphQL.

It looks like this:

The model:

class Color(db.Model):
  """color table"""
  __tablename__ = 'colors'

  color_id = Column(BigInteger().with_variant(sqlite.INTEGER(), 'sqlite'), primary_key=True)
  color_name = Column(String(50), nullable=False)
  color_r = Column(SmallInteger)
  color_g = Column(SmallInteger)
  color_b = Column(SmallInteger)

The node:

class ColorNode(SQLAlchemyObjectType):
  class Meta:
    model = colours.Color
    interfaces = (relay.Node,)

  color_id = graphene.Field(BigInt)

Everything is simple and nice.

But what are the problems?

Flask context.

At the time of writing I was unable to pass my own context to GraphQL:

                 context_value={'session': db.session})

This did not work for me, because the view in the flask-graphql integration replaces the context with the Flask request.

Maybe this is fixed by now, but I had to subclass GraphQLView to keep the context:

class ContexedView(GraphQLView):
  context_value = None

  def get_context(self):
    context = super().get_context()
    if self.context_value:
      for k, v in self.context_value.items():
        setattr(context, k, v)
    return context

CORS support

It is always a thing I forget to add 🙂

For Python Flask just add flask-cors in your requirements and set it up in your create_app method via CORS(app). That’s all.

Bigint type

I had to create my own BigInt type, because I use bigint as the primary key type in some tables, and graphene raised errors when I tried to send such values as Int.

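from graphene.types import Scalar
from graphql.language import ast

class BigInt(Scalar):
  @staticmethod
  def serialize(num):
    return num

  @staticmethod
  def parse_literal(node):
    # accept both quoted and unquoted literals in the query text
    if isinstance(node, (ast.StringValue, ast.IntValue)):
      return int(node.value)

  @staticmethod
  def parse_value(value):
    return int(value)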
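
The errors come from the GraphQL specification, which defines the built-in Int as a signed 32-bit integer, so a bigint primary key can fall outside that range. Here is a minimal sketch of the range check and of the coercion that BigInt performs; both helper names are hypothetical and not part of graphene:

```python
# The built-in GraphQL Int is a signed 32-bit integer (per the GraphQL specification).
GRAPHQL_INT_MIN = -(2 ** 31)
GRAPHQL_INT_MAX = 2 ** 31 - 1


def fits_graphql_int(value: int) -> bool:
    """Return True if value can be represented by the built-in Int scalar."""
    return GRAPHQL_INT_MIN <= value <= GRAPHQL_INT_MAX


def coerce_big_int(raw) -> int:
    """Hypothetical helper: accept an int or a numeric string, like BigInt.parse_value."""
    return int(raw)
```

Any id for which fits_graphql_int returns False needs a custom scalar such as BigInt.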

Compound primary key

Also, graphene_sqlalchemy doesn’t support compound primary keys out of the box. I had one table with an (Int, Int, Date) primary key. To make it resolvable by id via Relay’s Node interface, I had to override the get_node method:

@classmethod
def get_node(cls, info, id):
  import datetime  # eval needs this name to rebuild the date part of the key
  return super().get_node(info, eval(id))

The datetime import and eval are essential here: without them the date field stays a plain string and the database lookup fails.
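
Since eval executes whatever string arrives as the id, a stricter parser may be preferable when ids come from clients. Here is a minimal sketch, assuming the id is the repr of a tuple such as (1, 2, datetime.date(2018, 5, 1)); parse_compound_id is a hypothetical helper, not part of graphene_sqlalchemy:

```python
import ast
import datetime


def parse_compound_id(raw: str) -> tuple:
    """Parse "(1, 2, datetime.date(2018, 5, 1))" without executing arbitrary code."""
    tree = ast.parse(raw, mode="eval").body
    if not isinstance(tree, ast.Tuple):
        raise ValueError("expected a tuple literal")

    def convert(node):
        # Plain integers are accepted as they are.
        if isinstance(node, ast.Constant) and type(node.value) is int:
            return node.value
        # The only call allowed is datetime.date(<int>, <int>, <int>).
        if (isinstance(node, ast.Call)
                and isinstance(node.func, ast.Attribute)
                and isinstance(node.func.value, ast.Name)
                and node.func.value.id == "datetime"
                and node.func.attr == "date"
                and not node.keywords):
            return datetime.date(*(convert(arg) for arg in node.args))
        raise ValueError("unsupported expression in id")

    return tuple(convert(element) for element in tree.elts)
```

The result could be passed to super().get_node in place of eval(id); anything other than integers and datetime.date calls is rejected with a ValueError.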

Mutations with authorization

It was really easy to add authorization for queries: all I needed was to add a Viewer object and write get_token and get_by_token methods, as I had done many times in Java.

But mutations are called bypassing Viewer, which is natural for GraphQL.

I didn’t want to add authorization code at the top of every mutation, as it leads to code duplication and is a little dangerous: I could create a backdoor simply by forgetting to add it.

So I subclassed the mutation and reimplemented its mutate_and_get_payload like this:

class AuthorizedMutation(relay.ClientIDMutation):
  class Meta:
    abstract = True

  @classmethod
  def mutate_authorized(cls, root, info, **kwargs):
    # subclasses put their business logic here
    pass

  @classmethod
  def mutate_and_get_payload(cls, root, info, **kwargs):
    # authorize user using info.context.headers.get('Authorization')
    return cls.mutate_authorized(root, info, **kwargs)

All my mutations subclass AuthorizedMutation and implement their business logic in mutate_authorized, which is called only if the user was authorized.
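
The pattern can be tried without graphene. The following is a minimal sketch with stand-in classes: AuthorizedAction, RenameColor, Unauthorized, VALID_TOKENS and fake_info are all hypothetical, and the token check is a placeholder for whatever lookup the real back-end performs:

```python
from types import SimpleNamespace


class Unauthorized(Exception):
    """Raised when the request carries no valid token."""


class AuthorizedAction:
    """Stand-in for AuthorizedMutation, without the graphene base class."""

    VALID_TOKENS = {"secret-token"}  # hypothetical token store

    @classmethod
    def mutate_authorized(cls, root, info, **kwargs):
        raise NotImplementedError

    @classmethod
    def mutate_and_get_payload(cls, root, info, **kwargs):
        # The check lives in exactly one place, so a subclass cannot forget it.
        token = info.context.headers.get("Authorization")
        if token not in cls.VALID_TOKENS:
            raise Unauthorized("missing or invalid token")
        return cls.mutate_authorized(root, info, **kwargs)


class RenameColor(AuthorizedAction):
    @classmethod
    def mutate_authorized(cls, root, info, **kwargs):
        # Business logic only; authorization has already happened.
        return {"color_name": kwargs["color_name"]}


def fake_info(headers):
    """Build an object shaped like graphene's info, with context.headers."""
    return SimpleNamespace(context=SimpleNamespace(headers=headers))
```

Calling RenameColor.mutate_and_get_payload with a request that lacks the header raises Unauthorized before any business logic runs.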

Sortable and Filterable connections

To have my data automatically sorted via the query in a connection (with sort options added to the schema), I had to subclass Relay’s connection and implement the get_query method (it is called by graphene_sqlalchemy).

class SortedRelayConnection(relay.Connection):
  class Meta:
    abstract = True

  @classmethod
  def get_query(cls, info, **kwargs):
    return SQLAlchemyConnectionField.get_query(cls._meta.node._meta.model, info, **kwargs)

Then I decided to add dynamic filtering over every field, also with a schema extension.

Graphene can’t do this out of the box, so I had to open a PR and subclass the connection once again:

class FilteredRelayConnection(relay.Connection):
  class Meta:
    abstract = True

  @classmethod
  def get_query(cls, info, **kwargs):
    return FilterableConnectionField.get_query(cls._meta.node._meta.model, info, **kwargs)

FilterableConnectionField was introduced in that PR.

Sentry middleware

We use Sentry as our error notification system, and it was hard to make it work with graphene. Sentry has a good Flask integration, but graphene swallows exceptions and returns them as errors in the response.

I had to write my own middleware:

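class SentryMiddleware(object):

  def __init__(self, sentry) -> None:
    self.sentry = sentry

  def resolve(self, next, root, info, **args):
    promise = next(root, info, **args)
    if promise.is_rejected:
      # assumed: this line is missing from the listing; it attaches the logger to the rejected promise
      promise.catch(self.log_and_return)
    return promise

  def log_and_return(self, e):
    try:
      raise e
    except Exception:
      if self.sentry.is_configured:
        if not issubclass(type(e), NotImportantUserError):
          # assumed: the reporting call is missing from the listing; raven's captureException reports the active exception
          self.sentry.captureException()
    return e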

It is registered on GraphQL route creation:

                 context_value={'session': db.session},

Low performance with relations

Everything went well, tests were green and I was happy until my application reached the dev environment with real amounts of data. Everything was super slow.

The problem was in SQLAlchemy’s relations. They are lazy by default.

This means that if you have a graph with three relations, Master -> Pet -> Food, and query them all, the first query fetches all masters (select * from masters). Say it returns 20 rows. Then a separate query runs for each master (select * from pets where master_id = ?), which makes 20 more queries. Finally, N food queries follow, one per pet returned.

My advice: if you have complex relations and lots of data (I was writing a back-end for the big data world), make the relations eager. The query itself becomes heavier, but there is only one of it, which reduces response time dramatically.
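
The difference in query count is easy to reproduce. The sketch below uses the standard sqlite3 module instead of SQLAlchemy, with a made-up masters/pets schema, and only imitates what lazy and joined loading do at the SQL level; run and count_queries are hypothetical helpers:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE masters (master_id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE pets (pet_id INTEGER PRIMARY KEY, master_id INTEGER, name TEXT);
""")
conn.executemany("INSERT INTO masters VALUES (?, ?)",
                 [(i, f"master-{i}") for i in range(20)])
conn.executemany("INSERT INTO pets VALUES (?, ?, ?)",
                 [(i, i % 20, f"pet-{i}") for i in range(40)])

executed = []  # every SELECT issued through run() is recorded here


def run(sql, params=()):
    executed.append(sql)
    return conn.execute(sql, params).fetchall()


def load_lazy():
    """One query for masters, then one more per master: the N+1 pattern."""
    result = {}
    for master_id, name in run("SELECT master_id, name FROM masters"):
        pets = run("SELECT name FROM pets WHERE master_id = ? ORDER BY pet_id",
                   (master_id,))
        result[name] = [pet for (pet,) in pets]
    return result


def load_eager():
    """A single LEFT JOIN returns masters and pets together."""
    result = {}
    rows = run("SELECT m.name, p.name FROM masters m "
               "LEFT JOIN pets p ON p.master_id = m.master_id "
               "ORDER BY m.master_id, p.pet_id")
    for master, pet in rows:
        result.setdefault(master, [])
        if pet is not None:
            result[master].append(pet)
    return result


def count_queries(loader):
    """Run a loader and report how many SELECT statements it issued."""
    executed.clear()
    data = loader()
    return data, len(executed)
```

With 20 masters, count_queries(load_lazy) reports 21 statements and count_queries(load_eager) reports one, while both return the same data.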

Performance improvement with custom queries

After I made my critical relations eager (not all of them; I had to study the front-end app to understand what it queries and how), everything worked faster, but not fast enough. I looked at the generated queries and was a bit frightened: they were monstrous! I had to write my own optimized queries for some nodes.

For example, I have a PlanMonthly entity with several OrderColorDistributions, each of which has one Order.

I can use subqueries to limit the data (remember, I am writing a back-end for big data) and populate the relations with data that is already there (I had this data in the query anyway, so there was no need for the eager joins generated by the ORM). This makes the request lighter.


  1. Build the subqueries with with_labels=True.
  2. Use the root entity of the request as the entity being returned:
    Order.query \
      .filter(<low level filtering here>) \
      .join(<join another table, which you can use later>) \
      .join(ocr_query, Order.order_id == ocr_query.c.order_color_distribution_order_id) \
      .join(date_limit_query,
            and_(ocr_query.c.order_color_distribution_color_id == date_limit_query.c.plans_monthly_color_id,
                 ocr_query.c.order_color_distribution_date == date_limit_query.c.plans_monthly_date,
                 <another table joined previously> == date_limit_query.c.plans_monthly_group_id))
  3. Use contains_eager on all first-level relations:
    query = query.options(contains_eager(Order.color_distributions, alias=ocr_query))
  4. If you have a second level of relations (Order -> OrderColorDistribution -> PlanMonthly), chain contains_eager:
    query = query.options(contains_eager(Order.color_distributions, alias=ocr_query)
                 .contains_eager(OrderColorDistribution.plan, alias=date_limit_query))

Reducing number of calls to the database

Besides the data rendering level I have a service layer, which knows nothing about GraphQL. And I am not going to introduce it there, as I don’t like high coupling.

But each service needs the fetched month data. To fetch the data only once and share it across all services, I use injector with the @request scope. Remember this scope, it is your friend in GraphQL.

It works like a singleton, but only within one request to /graphql. In my connection I just populate it with the plans found via the GraphQL query (including all custom filters and ranges from the front-end):


Then all services that need this data just use the cache:

def __init__(self,
             prediction_service: PredictionService,
             price_calculator: PriceCalculator,
             future_month_cache: FutureMonthCache) -> None:
  self._prediction_service = prediction_service
  self._price_calculator = price_calculator
  self._future_month_cache = future_month_cache

Another nice thing: all my services that manipulate data and form the request also have the @request scope, so I don’t need to calculate predictions for every month separately. I take them all from the cache, run one query and store the results. Moreover, one service can rely on data calculated by another service. The request scope helps a lot here, as it allows me to calculate all data only once.
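
To make the idea concrete, here is a hand-rolled stand-in for a request scope. It does not use the injector API: RequestScope is hypothetical, the two service classes only borrow their names from the snippet above, and the "prediction" is a placeholder calculation:

```python
class RequestScope:
    """Keeps one instance per class for the lifetime of a single request."""

    def __init__(self):
        self._instances = {}

    def get(self, factory):
        """Return the instance for this request, creating it on first use."""
        if factory not in self._instances:
            self._instances[factory] = factory(self)
        return self._instances[factory]


class FutureMonthCache:
    """Filled once by the connection, then read by every service."""

    def __init__(self, scope):
        self.plans = []


class PredictionService:
    computations = 0  # class-level counter, only to make the effect visible

    def __init__(self, scope):
        self._cache = scope.get(FutureMonthCache)
        self._predictions = None

    def predictions(self):
        # Computed at most once per request, however many nodes ask for it.
        if self._predictions is None:
            PredictionService.computations += 1
            self._predictions = {plan: plan * 2 for plan in self._cache.plans}
        return self._predictions
```

A new RequestScope would be created for every call to /graphql, so nothing leaks between requests, while all resolvers inside one request share the same cache and the same calculated results.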

On the Node side I call my request-scoped services via a resolver:

def resolve_predicted_pieces(self, _info):
  return app.injector.get(PredictionCalculator).get_original_future_value(self)

This lets me run heavy calculations only if predicted_pieces is requested in the GraphQL query.
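
The effect can be imitated in a few lines. This is a toy model rather than graphene's executor: PlanNode, its fields and the execute function are hypothetical, and the doubling stands in for the real prediction:

```python
class PlanNode:
    """Stand-in for a graphene node; heavy_calls counts the expensive work."""

    heavy_calls = 0

    def __init__(self, pieces):
        self.pieces = pieces

    def resolve_pieces(self):
        return self.pieces

    def resolve_predicted_pieces(self):
        PlanNode.heavy_calls += 1
        return self.pieces * 2  # placeholder for the real prediction


def execute(node, requested_fields):
    """Call only the resolvers of the fields named in the query."""
    return {field: getattr(node, f"resolve_{field}")() for field in requested_fields}
```

A query that asks only for pieces never touches resolve_predicted_pieces, so the expensive path stays idle.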

Summing up

Those are all the difficulties I faced. I haven’t tried websocket subscriptions, but from what I’ve learned I can say that GraphQL in Python is more flexible than in Java, thanks to the flexibility of Python itself. But if I were to work on a high-load back-end, I would prefer not to use GraphQL, as it is harder to optimize.

How to create avro based table in Impala

Consider the following situation: a bundle of .avro files is stored on HDFS and needs to be converted to Impala tables. Schemas are not provided alongside the files, at least not externally (the schema is embedded in the header of every Avro file). But Impala has a known issue with Avro tables that limits their use: an Avro-based table can be created only if all columns are declared manually with their types in the CREATE TABLE statement; otherwise it fails with an error.


But what if we have hundreds of columns, or are not completely sure of the schema, and would like to automate table creation?

Disclaimer: you can’t do that directly, but there is a workaround: create a temporary Avro table in Hive, then create a temporary Parquet table from it with CREATE TABLE AS SELECT, and finally run INVALIDATE METADATA in Impala so that it picks up the new table.

Step by step algorithm:

  1. Check whether you have file.avsc alongside file.avro; if so, skip step 2.
  2. Create an external Avro schema file from file.avro
    • Download avro-tools-1.7.7.jar, the official tool for working with Avro files. Pay attention to the version: it should be 1.7.7.
    • Place it somewhere on your server’s local file system
      # parameters you will need
      # <tmp_local_path>=/tmp/get_avro any random path inside /tmp folder of local file system
      # <file_name> name of the file.avro from where you want to extract schema
      # <hdfs_file_path>  absolute path on hdfs to target file.avro
      # <result_schema_file_path>  path on hdfs where you would like to get created schema
      # create tmp folder
      mkdir -p <tmp_local_path>
      # read the first 50 KB of file.avro with cat and store it as <file_name>_sample on the local file system
      # depending on the number of columns, 50 KB may not be enough; try --lines 1 to take only the first line, or increase the size
      hdfs dfs -cat <hdfs_file_path> | head --bytes 50K > <tmp_local_path>/<file_name>_sample
      # use the avro-tools jar to extract the schema from the sample and store it under the original file name with the .avsc extension
      java -jar ~/jars/avro-tools-1.7.7.jar getschema <tmp_local_path>/<file_name>_sample > <tmp_local_path>/<file_name>.avsc
      # copy the created file.avsc (Avro schema) from the local file system back to HDFS
      hdfs dfs -put <tmp_local_path>/<file_name>.avsc <result_schema_file_path>
      # clean up
      rm -rf <tmp_local_path>
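  3. In Hive:
    -- assumed: the CREATE EXTERNAL TABLE line and both STORED AS clauses are missing from the listing and are reconstructed from the description above
    CREATE EXTERNAL TABLE IF NOT EXISTS <avro_tmp_tbl_name>
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    STORED AS AVRO
    LOCATION 'hdfs://<orig_file_dir_path>/'
    TBLPROPERTIES ('avro.schema.url'='hdfs://<path_to_avro_schema_file_on_hdfs>');
    CREATE TABLE IF NOT EXISTS <parq_tmp_tbl_name>
    STORED AS PARQUET
    AS SELECT * FROM <avro_tmp_tbl_name>;
  4. In Impala: INVALIDATE METADATA <parq_tmp_tbl_name>;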
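
Step 2 works because an Avro container file starts with a header that carries the writer schema as metadata. For pipelines where running a jar is inconvenient, the same header can be read with a few lines of Python. This is a minimal sketch based on the object container layout in the Avro specification, not a replacement for avro-tools; read_avro_schema and its helpers are hypothetical names:

```python
import io
import json

AVRO_MAGIC = b"Obj\x01"


def _read_long(stream) -> int:
    """Decode one zigzag-encoded variable-length long, as defined by the Avro spec."""
    shift = 0
    accum = 0
    while True:
        byte = stream.read(1)
        if not byte:
            raise EOFError("header is truncated")
        accum |= (byte[0] & 0x7F) << shift
        if not byte[0] & 0x80:
            break
        shift += 7
    return (accum >> 1) ^ -(accum & 1)


def _read_bytes(stream) -> bytes:
    """Read a length-prefixed byte string."""
    length = _read_long(stream)
    data = stream.read(length)
    if len(data) != length:
        raise EOFError("header is truncated")
    return data


def read_avro_schema(sample: bytes) -> dict:
    """Return the writer schema stored in the header of an Avro container file."""
    stream = io.BytesIO(sample)
    if stream.read(4) != AVRO_MAGIC:
        raise ValueError("not an Avro object container file")
    metadata = {}
    while True:
        count = _read_long(stream)
        if count == 0:
            break
        if count < 0:
            # A negative count is followed by the block size in bytes.
            _read_long(stream)
            count = -count
        for _ in range(count):
            key = _read_bytes(stream).decode("utf-8")
            metadata[key] = _read_bytes(stream)
    return json.loads(metadata["avro.schema"].decode("utf-8"))
```

The function takes the raw bytes of a sample such as the 50 KB one cut in step 2 and raises EOFError when the sample is too short to contain the whole header, which is the signal to increase the sample size.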

That’s all, the job is done. This step-by-step algorithm is easy to wrap into any pipeline tool such as Oozie or Airflow, and thus completely automate the routine part of the work.
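
As a starting point for such a wrapper, the statements of steps 3 and 4 can be generated from a handful of parameters. In this sketch build_statements and its argument names are hypothetical, and the CREATE EXTERNAL TABLE line and the STORED AS AVRO / STORED AS PARQUET clauses are assumptions rather than the author's exact statements:

```python
def build_statements(avro_table: str, parquet_table: str,
                     avro_dir: str, schema_path: str) -> dict:
    """Return the Hive and Impala statements for one Avro directory."""
    create_avro = (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {avro_table}\n"
        "ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'\n"
        "STORED AS AVRO\n"  # assumed clause
        f"LOCATION 'hdfs://{avro_dir}/'\n"
        f"TBLPROPERTIES ('avro.schema.url'='hdfs://{schema_path}')"
    )
    create_parquet = (
        f"CREATE TABLE IF NOT EXISTS {parquet_table}\n"
        "STORED AS PARQUET\n"  # assumed clause
        f"AS SELECT * FROM {avro_table}"
    )
    return {
        "hive": [create_avro, create_parquet],
        "impala": [f"INVALIDATE METADATA {parquet_table}"],
    }
```

The returned strings can then be handed to whatever Hive and Impala clients the pipeline tool provides; the sketch does not validate table names or paths.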