Category: BigData

Articles about bigdata, data analysis.

HowTo: Run Cloudera Quickstart in Docker

The best way to get familiar with the Hadoop ecosystem, or to build a proof of concept, is to play with it in a sandbox. For that, Cloudera provides two QuickStart options: a virtual machine image and a Docker image. Both contain a small CDH cluster with a single DataNode, which is more than enough for sandbox purposes.

However, many data engineering newcomers find it difficult to set up, because the official Cloudera installation guide unfortunately misses some essential tweaks that are needed after installation. Here you will find a quick and easy way to set up Cloudera QuickStart from the Docker image on Linux.

If you have already installed the Docker image and don’t want to repeat those steps, jump straight to the part with the solution or to the fixed Docker image.

Run QuickStart according to official documentation

Check that Docker is installed: type docker -v in a terminal to print the version. (This HOWTO doesn’t cover Docker installation; see the manual on the official Docker site.)

Check status of the docker service on your machine, start it if needed:

sudo systemctl status docker
sudo systemctl start docker

In my case Docker was stopped, so I had to start it. Now that Docker is installed and running, download the QuickStart image: sudo docker pull cloudera/quickstart:latest. Depending on your connection, the download can take a while, so be patient :).

After a successful download, the Cloudera QuickStart image should appear in the list of Docker images: docker images. Copy the <IMAGE ID> of ‘cloudera/quickstart’; we will use it later. In my case it is ‘4239cd2958c6’. Finally, the main command to run the Docker image as a container is:

sudo docker run --hostname=quickstart.cloudera --privileged=true -t -i -d -p 8888 -p 7180 4239cd2958c6 /usr/bin/docker-quickstart

/usr/bin/docker-quickstart #Entry point to start all CDH services. Provided by cloudera
--hostname=quickstart.cloudera #Required: pseudo-distributed configuration assumes this hostname
--privileged=true #Required: for HBase, MySQL-backed Hive metastore, Hue, Oozie, Sentry, and Cloudera Manager, and possibly others
-t  #Required: once services are started, a Bash shell takes over and will die without this
-i  #Required: if you want to use the terminal, either immediately or attach later
-p 8888  #Recommended: maps the Hue port in the guest to another port on the host
-p 7180  #Recommended: maps the Cloudera Manager port in the guest (7180) to another port on the host
-p [PORT] #Any other ports you want to remap from the guest to free host ports, to make them accessible outside the container.
-d   #Optional: runs the container in the background. I recommend this option if you plan to keep the container running in the background. As a docker option it has to be placed before the image ID.
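
As a rough illustration of how the options above fit together, the sketch below assembles the same command as a Python argument list. The helper name build_run_command and its parameters are made up for this example; the only things taken from the text are the flags themselves and the docker run convention that options go before the image, while everything after the image is handed to the container command.

```python
import shlex


def build_run_command(image, ports=(8888, 7180), detached=True):
    """Assemble a `docker run` argv for the QuickStart image (illustrative helper)."""
    argv = [
        "docker", "run",
        "--hostname=quickstart.cloudera",  # hostname the pseudo-distributed config assumes
        "--privileged=true",
        "-t", "-i",
    ]
    if detached:
        argv.append("-d")  # a docker option, so it goes before the image
    for port in ports:
        argv += ["-p", str(port)]  # publish the guest port on a free host port
    # the image comes last, followed by the command to run inside the container
    argv += [image, "/usr/bin/docker-quickstart"]
    return argv


command = build_run_command("cloudera/quickstart:latest")
print(shlex.join(command))
```

Building the command as a list keeps the ordering rule explicit and avoids quoting problems if the list is later passed to subprocess.run.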

Fix the standard image to make it work

Next comes some tuning to make the sandbox operational. By default, Cloudera Manager is not started in the container, so let’s enable it first.

Obtain the <CONTAINER_ID> value with docker ps. In my case it is ‘5fadd6cb8e0c’.

Connect to the QuickStart container shell with docker attach <CONTAINER_ID> and run the script that enables Cloudera Manager: /home/cloudera/cloudera-manager --express

At the end, the script prints the address at which Cloudera Manager can be accessed. Surprisingly, you won’t be able to connect to it. That happens because we remapped the port earlier, so now we need to find the host port to which guest port 7180 was mapped.

Look up the port mapping from guest to host: sudo docker port <CONTAINER_ID> <guest_port>. In my case the answer was ‘0.0.0.0:32771’, so that is the address I type into the browser to open Cloudera Manager.

But wait a second, something is red in our Hosts. Let’s take a closer look at the error and fix it. It’s a clock offset error.

Basically, it means that the ntpd service is either not started or can’t reach its time servers. The solution is:

date      # will show difference between real date and server one
sudo chkconfig --add ntpd
sudo service ntpd restart 
date      # to make sure that ntpd is working and the date is in sync

Wait a couple of minutes and check Cloudera Manager again. As you can see, the error is gone.

From now on, until the container is stopped, the ntpd service will keep working properly. If you don’t feel like running these extra commands in the Cloudera container every time it is restarted, you can simply use my Docker image, which is based on the Cloudera QuickStart one, or create your own.

Python & Graphql. Tips, tricks and performance improvements.

Recently I finished another back end with GraphQL, this time in Python. In this article I would like to tell you about the difficulties I faced and the bottlenecks that can affect performance.

Technology stack: graphene + flask and sqlalchemy integration. Here is a piece of requirements.txt:

graphene
graphene_sqlalchemy
flask
flask-graphql
flask-sqlalchemy
flask-cors
injector
flask-injector

This allows me to map my database entities directly to GraphQL.

It looks like this:

The model:

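from sqlalchemy import BigInteger, Column, SmallInteger, String
from sqlalchemy.dialects import sqlite

# db is the application's flask_sqlalchemy.SQLAlchemy() instance
class Color(db.Model):
  """color table"""
  __tablename__ = 'colors'

  # BigInteger in production, plain INTEGER when running on sqlite
  color_id = Column(BigInteger().with_variant(sqlite.INTEGER(), 'sqlite'), primary_key=True)
  color_name = Column(String(50), nullable=False)
  color_r = Column(SmallInteger)
  color_g = Column(SmallInteger)
  color_b = Column(SmallInteger)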

The node:

class ColorNode(SQLAlchemyObjectType):
  class Meta:
    model = colours.Color
    interfaces = (relay.Node,)

  color_id = graphene.Field(BigInt)

Everything is simple and nice.

But what are the problems?

Flask context.

At the time of writing I was unable to pass my own context to GraphQL.

app.add_url_rule(
  '/graphql',
  view_func=GraphQLView.as_view(
    'graphql',
    schema=schema.schema,
    graphiql=True,
    context_value={'session': db.session},
  ),
)

This didn’t work for me, because the flask-graphql integration replaced the context with the Flask request.

Maybe this is fixed by now, but I had to subclass GraphQLView to keep the context:

class ContexedView(GraphQLView):
  context_value = None

  def get_context(self):
    context = super().get_context()
    if self.context_value:
      for k, v in self.context_value.items():
        setattr(context, k, v)
    return context

CORS support

It is always a thing I forget to add 🙂

For Python Flask just add flask-cors in your requirements and set it up in your create_app method via CORS(app). That’s all.

Bigint type

I had to create my own BigInt type, because I use bigint as the primary key type in some tables, and graphene raised errors when I tried to send such values as the built-in Int type.

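from graphene.types import Scalar
from graphql.language import ast


class BigInt(Scalar):
  """Integer scalar for values such as bigint primary keys."""

  @staticmethod
  def serialize(num):
    return num

  @staticmethod
  def parse_literal(node):
    # accept both integer and string literals from the query text
    if isinstance(node, (ast.StringValue, ast.IntValue)):
      return int(node.value)

  @staticmethod
  def parse_value(value):
    return int(value)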
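
For context, the GraphQL specification defines the built-in Int as a signed 32-bit integer, which is a likely reason why bigint keys get rejected. The standalone check below only illustrates that range; the function name is invented for this example and is not part of graphene.

```python
GRAPHQL_INT_MIN = -(2 ** 31)
GRAPHQL_INT_MAX = 2 ** 31 - 1


def fits_graphql_int(value):
    """Return True if the value can be represented by the built-in GraphQL Int."""
    return GRAPHQL_INT_MIN <= value <= GRAPHQL_INT_MAX


print(fits_graphql_int(2_147_483_647))  # largest 32-bit signed value
print(fits_graphql_int(2_147_483_648))  # one past the limit, needs a custom scalar
```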

Compound primary key

Also, graphene_sqlalchemy doesn’t support compound primary keys out of the box. I had one table with an (Int, Int, Date) primary key. To make it resolvable by id via Relay’s Node interface I had to override the get_node method:

@classmethod
def get_node(cls, info, id):
  import datetime
  return super().get_node(info, eval(id))

The datetime import and eval are essential here: without them the date field stays a plain string and the database lookup fails. Keep in mind that eval executes whatever expression the client sends, so this is acceptable only for trusted callers.
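
Since the id arrives from the client, a stricter parser may be preferable to eval. The sketch below assumes the id is the repr of an (int, int, date) tuple, for example "(1, 2, datetime.date(2018, 5, 1))"; that format and the name parse_compound_id are assumptions for illustration, not something graphene_sqlalchemy prescribes.

```python
import datetime
import re

# Matches exactly: (<int>, <int>, datetime.date(<year>, <month>, <day>))
_KEY_PATTERN = re.compile(
    r"\(\s*(-?\d+)\s*,\s*(-?\d+)\s*,\s*"
    r"datetime\.date\(\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*\)\s*\)"
)


def parse_compound_id(raw_id):
    """Turn the textual key into a tuple without evaluating arbitrary code."""
    match = _KEY_PATTERN.fullmatch(raw_id)
    if match is None:
        raise ValueError("unexpected compound id: %r" % (raw_id,))
    first, second, year, month, day = (int(group) for group in match.groups())
    return (first, second, datetime.date(year, month, day))


key = parse_compound_id("(1, 2, datetime.date(2018, 5, 1))")
print(key)
```

Inside get_node the result could then be passed on in place of eval(id); anything that does not match the expected shape is rejected with a ValueError.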

Mutations with authorization

It was really easy to add authorization for queries: all I needed was to add a Viewer object and write get_token and get_by_token methods, as I had done many times in Java before.

But mutations are called bypassing the Viewer, which is natural for GraphQL.

I didn’t want to add authorization code at the top of every mutation, as that leads to code duplication and is a little dangerous: I could create a backdoor simply by forgetting to add this code.

So I subclassed the mutation and reimplemented its mutate_and_get_payload like this:

class AuthorizedMutation(relay.ClientIDMutation):
  class Meta:
    abstract = True

  @classmethod
  @abstractmethod
  def mutate_authorized(cls, root, info, **kwargs):
    pass

  @classmethod
  def mutate_and_get_payload(cls, root, info, **kwargs):
    # authorize user using info.context.headers.get('Authorization')
    return cls.mutate_authorized(root, info, **kwargs)

All my mutations subclass AuthorizedMutation and implement only their business logic in mutate_authorized, which is called only if the user has been authorized.
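
The same idea can be shown without graphene. In the sketch below the base class, the token set and the example mutation are all hypothetical stand-ins; it only demonstrates that the check lives in one place and that the business logic is reached only after it passes.

```python
from abc import ABC, abstractmethod


class Unauthorized(Exception):
    """Raised when the request carries no acceptable token (illustrative)."""


VALID_TOKENS = {"secret-token"}  # hypothetical token store


class AuthorizedAction(ABC):
    """Plain-Python stand-in for the AuthorizedMutation base class."""

    @classmethod
    @abstractmethod
    def mutate_authorized(cls, headers, **kwargs):
        """Business logic; runs only after the check below has passed."""

    @classmethod
    def mutate_and_get_payload(cls, headers, **kwargs):
        # the single place where authorization happens
        if headers.get("Authorization") not in VALID_TOKENS:
            raise Unauthorized("missing or invalid token")
        return cls.mutate_authorized(headers, **kwargs)


class RenameColor(AuthorizedAction):
    @classmethod
    def mutate_authorized(cls, headers, **kwargs):
        return {"renamed_to": kwargs["name"]}


print(RenameColor.mutate_and_get_payload({"Authorization": "secret-token"}, name="red"))
```

A new mutation cannot skip the check by accident, because it never overrides mutate_and_get_payload itself.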

Sortable and Filterable connections

To have my data sorted automatically via the connection query (with the sort options added to the schema) I had to subclass Relay’s connection and implement the get_query method (it is called by graphene_sqlalchemy).

class SortedRelayConnection(relay.Connection):
  class Meta:
    abstract = True

  @classmethod
  def get_query(cls, info, **kwargs):
    return SQLAlchemyConnectionField.get_query(cls._meta.node._meta.model, info, **kwargs)

Then I decided to add dynamic filtering over every field, again with a schema extension.

Out of the box graphene can’t do it, so I had to open a PR, https://github.com/graphql-python/graphene-sqlalchemy/pull/164, and subclass the connection once again:

class FilteredRelayConnection(relay.Connection):
  class Meta:
    abstract = True

  @classmethod
  def get_query(cls, info, **kwargs):
    return FilterableConnectionField.get_query(cls._meta.node._meta.model, info, **kwargs)

FilterableConnectionField is the class introduced in that PR.

Sentry middleware

We use Sentry as our error notification system, and it was hard to make it work with graphene. Sentry has a good Flask integration, but the problem with graphene is that it swallows exceptions and returns them as errors in the response.

I had to write my own middleware:

import traceback


class SentryMiddleware(object):

  def __init__(self, sentry) -> None:
    self.sentry = sentry

  def resolve(self, next, root, info, **args):
    promise = next(root, info, **args)
    if promise.is_rejected:
      promise.catch(self.log_and_return)
    return promise

  def log_and_return(self, e):
    try:
      raise e
    except Exception:
      traceback.print_exc()
      # report to Sentry unless it is an expected user-facing error
      if self.sentry.is_configured:
        if not issubclass(type(e), NotImportantUserError):
          self.sentry.captureException()
    return e

It is registered on GraphQL route creation:

app.add_url_rule(
  '/graphql',
  view_func=ContexedView.as_view(
    'graphql',
    schema=schema.schema,
    graphiql=True,
    context_value={'session': db.session},
    middleware=[SentryMiddleware(sentry)],
  ),
)

Low performance with relations

Everything went well, the tests were green, and I was happy until my application reached the dev environment with realistic amounts of data. Everything was super slow.

The problem was in SQLAlchemy’s relationships, which are lazy by default: https://docs.sqlalchemy.org/en/latest/orm/loading_relationships.html

It means that if you have a graph with three related entities, Master -> Pet -> Food, and query them all, the first query fetches all masters (select * from masters). Say you receive 20 of them. Then one query runs per master (select * from pets where master_id = ?), which is 20 more queries. Finally come N food queries, one per pet returned.

My advice: if you have complex relations and lots of data (I was writing a back end for the big data world), make the relations eager. The query itself gets heavier, but there is only one, which reduces response time dramatically.
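
The query arithmetic can be reproduced without SQLAlchemy. The sketch below uses the standard sqlite3 module as a stand-in for the ORM and counts SELECT statements for a lazy-style traversal versus a single join; the table layout and helper names are assumptions made up for this example.

```python
import sqlite3


class CountingConnection:
    """Wraps a sqlite3 connection and counts the SELECTs sent through it."""

    def __init__(self, connection):
        self.connection = connection
        self.selects = 0

    def select(self, sql, params=()):
        self.selects += 1
        return self.connection.execute(sql, params).fetchall()


def build_database(masters=20, pets_per_master=2):
    connection = sqlite3.connect(":memory:")
    connection.execute("CREATE TABLE masters (id INTEGER PRIMARY KEY)")
    connection.execute("CREATE TABLE pets (id INTEGER PRIMARY KEY, master_id INTEGER)")
    connection.execute("CREATE TABLE food (id INTEGER PRIMARY KEY, pet_id INTEGER)")
    pet_id = 0
    for master_id in range(1, masters + 1):
        connection.execute("INSERT INTO masters (id) VALUES (?)", (master_id,))
        for _ in range(pets_per_master):
            pet_id += 1
            connection.execute(
                "INSERT INTO pets (id, master_id) VALUES (?, ?)", (pet_id, master_id)
            )
            connection.execute("INSERT INTO food (pet_id) VALUES (?)", (pet_id,))
    return connection


def load_lazily(db):
    """One query for masters, one per master for pets, one per pet for food."""
    rows = []
    for (master_id,) in db.select("SELECT id FROM masters"):
        for (pet_id,) in db.select("SELECT id FROM pets WHERE master_id = ?", (master_id,)):
            for (food_id,) in db.select("SELECT id FROM food WHERE pet_id = ?", (pet_id,)):
                rows.append((master_id, pet_id, food_id))
    return rows


def load_eagerly(db):
    """The same data fetched with a single joined query."""
    return db.select(
        "SELECT m.id, p.id, f.id FROM masters m "
        "JOIN pets p ON p.master_id = m.id "
        "JOIN food f ON f.pet_id = p.id"
    )


lazy_db = CountingConnection(build_database())
eager_db = CountingConnection(build_database())
lazy_rows = load_lazily(lazy_db)
eager_rows = load_eagerly(eager_db)
print(lazy_db.selects, eager_db.selects)
```

With 20 masters and 2 pets each, the lazy traversal issues 1 + 20 + 40 = 61 SELECTs, while the joined version returns the same rows with one.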

Performance improvement with custom queries

After I made my critical relations eager (not all of them; I had to study the front-end app to understand what it queries and how), everything worked faster, but still not fast enough. I looked at the generated queries and was a bit frightened: they were monstrous! I had to write my own optimized queries for some nodes.

For example, say I have a PlanMonthly entity with several OrderColorDistributions, each of them having one Order.

I can use subqueries to limit the data (remember, I am writing a back end for big data) and populate the relations with data that is already there (I had this data in the query anyway, so there was no need for the eager joins generated by the ORM). This makes the request lighter.

Steps:

  1. Mark the subqueries with with_labels=True
  2. Use the root entity (for this request) as the returned one:
    Order.query \
      .filter(<low level filtering here>) \
      .join(<join another table, which you can use later>) \
      .join(ocr_query, Order.order_id == ocr_query.c.order_color_distribution_order_id) \
      .join(date_limit_query,
            and_(ocr_query.c.order_color_distribution_color_id == date_limit_query.c.plans_monthly_color_id,
                 ocr_query.c.order_color_distribution_date == date_limit_query.c.plans_monthly_date,
                 <another table joined previously> == date_limit_query.c.plans_monthly_group_id))
  3. Use contains_eager on all first level relations.
    query = query.options(contains_eager(Order.color_distributions, alias=ocr_query))
  4. If you have second layer of relations (Order -> OrderColorDistribution -> PlanMonthly) chain contains_eager:
    query = query.options(contains_eager(Order.color_distributions, alias=ocr_query)
                 .contains_eager(OrderColorDistribution.plan, alias=date_limit_query))

Reducing number of calls to the database

Besides the data rendering level I have a service layer, which knows nothing about GraphQL. And I am not going to introduce it there, as I don’t like tight coupling.

But every service needs the fetched months data. To load the data only once and share it among all services, I use injector with the @request scope. Remember this scope, it is your friend in GraphQL.

It works like a singleton, but only within one request to /graphql. In my connection I just populate it with the plans found via the GraphQL query (including all custom filters and ranges from the front end):

app.injector.get(FutureMonthCache).set_months(found)

Then in all services, which need to access this data I just use this cache:

@inject
def __init__(self,
             prediction_service: PredictionService,
             price_calculator: PriceCalculator,
             future_month_cache: FutureMonthCache) -> None:
  super().__init__(future_month_cache)
  self._prediction_service = prediction_service
  self._price_calculator = price_calculator

Another nice thing is that all my services that manipulate data and form the request also have the @request scope, so I don’t need to calculate predictions for every month separately. I take them all from the cache, run one query and store the results. Moreover, one service can rely on data calculated by another service. The request scope helps a lot here, as it lets me calculate all data only once.
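
To make the "singleton within one request" behaviour concrete, here is a minimal plain-Python stand-in. It is not how injector implements its request scope; the RequestScope class is invented for illustration, and FutureMonthCache is reduced to the one method used in the text.

```python
class RequestScope:
    """Toy scope: at most one instance per class, living as long as the scope."""

    def __init__(self):
        self._instances = {}

    def get(self, cls):
        # create the instance on first use, then keep returning the same one
        if cls not in self._instances:
            self._instances[cls] = cls()
        return self._instances[cls]


class FutureMonthCache:
    def __init__(self):
        self.months = []

    def set_months(self, months):
        self.months = list(months)


# Two requests get two independent scopes.
first_request = RequestScope()
second_request = RequestScope()

first_request.get(FutureMonthCache).set_months(["2019-01", "2019-02"])

# Within the first request every consumer sees the populated cache...
print(first_request.get(FutureMonthCache).months)
# ...while the second request starts with an empty one.
print(second_request.get(FutureMonthCache).months)
```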

On the Node side I call my request scope services via resolver:

def resolve_predicted_pieces(self, _info):
  return app.injector.get(PredictionCalculator).get_original_future_value(self)

It allows me to run heavy calculations only if predicted_pieces were specified in the GraphQL query.

Summing up

Those are all the difficulties I faced. I haven’t tried websocket subscriptions, but from what I’ve learned I can say that GraphQL in Python is more flexible than in Java, thanks to the flexibility of Python itself. Still, if I were going to work on a high-load back end, I would prefer not to use GraphQL, as it is harder to optimize.

How to create avro based table in Impala

Consider the following situation: a bundle of .avro files is stored on HDFS and needs to be converted to Impala tables. Schemas are not provided with the files, at least not externally (the schema is embedded in the header of every Avro file). But Impala has a known issue with Avro tables, and their usage is pretty limited: we can create an Avro-based table only if all the columns are declared manually with their types in the CREATE TABLE statement; otherwise it fails with an error.

But what if we have hundreds of columns, or are just not completely sure about the schema, and would like to automate table creation?

Disclaimer: you can’t do that directly, but there is a workaround: create a temporary Avro table in Hive, then create a Parquet table from it with CREATE TABLE ... AS SELECT, and finally run INVALIDATE METADATA in Impala so that it picks up the new tables.

Step by step algorithm:

  1. Check whether you have file.avsc along with file.avro. If yes, skip step #2
  2. Create an external Avro schema file from file.avro
    • Download avro-tools-1.7.7.jar, the official tool for working with Avro files. Pay attention to the version: it should be 1.7.7.
    • Place it somewhere on your server’s local file system
      # parameters you will need
      # <tmp_local_path>=/tmp/get_avro any random path inside /tmp folder of local file system
      # <file_name> name of the file.avro from where you want to extract schema
      # <hdfs_file_path>  absolute path on hdfs to target file.avro
      # <result_schema_file_path>  path on hdfs where you would like to get created schema
      
      # create tmp folder
      mkdir -p <tmp_local_path>
      
      # read the first 50 KB of file.avro with cat and store it as <file_name>_sample on the local file system
      # depending on the number of columns, 50 KB may not be enough; try --lines 1 to pick only the first line, or increase the size
      hdfs dfs -cat <hdfs_file_path> | head --bytes 50K > <tmp_local_path>/<file_name>_sample
      
      # use the avro-tools jar to retrieve the schema from the sample file and store it under the name of the original file.avro with the .avsc extension
      java -jar ~/jars/avro-tools-1.7.7.jar getschema <tmp_local_path>/<file_name>_sample > <tmp_local_path>/<file_name>.avsc
      
      # copy from local file system back to hdfs our created file.avsc (avro schema)
      hdfs dfs -put <tmp_local_path>/<file_name>.avsc <result_schema_file_path>
      
      #clean up mess 
      rm -rf <tmp_local_path>
      
      
  3. In Hive:
    CREATE EXTERNAL TABLE IF NOT EXISTS <avro_tmp_tbl_name>
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
    LOCATION 'hdfs://<orig_file_dir_path>/'
    tblproperties ('avro.schema.url'='hdfs://<path_to_avro_schema_file_on_hdfs>');
    
    
    CREATE TABLE IF NOT EXISTS <parq_tmp_tbl_name>
    STORED AS PARQUET
    AS select * from  <avro_tmp_tbl_name>  ;
  4. In Impala: INVALIDATE METADATA <parq_tmp_tbl_name>;
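
Why does a small leading sample of the file suffice in step 2? According to the Avro specification, an object container file starts with the magic bytes Obj\x01 followed by a metadata map that stores the writer schema as JSON under the avro.schema key. The sketch below reads just that header with the Python standard library; it is an illustration of the file layout, not a replacement for avro-tools, and the function names are made up.

```python
import io
import json

AVRO_MAGIC = b"Obj\x01"


def _read_long(stream):
    """Decode one zigzag, variable-length encoded Avro long."""
    shift = 0
    accumulated = 0
    while True:
        raw = stream.read(1)
        if not raw:
            raise EOFError("sample ended inside the Avro header")
        byte = raw[0]
        accumulated |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear means this was the last byte
            break
        shift += 7
    return (accumulated >> 1) ^ -(accumulated & 1)


def _read_bytes(stream):
    """Read a length-prefixed byte string."""
    length = _read_long(stream)
    data = stream.read(length)
    if len(data) != length:
        raise EOFError("sample ended inside the Avro header")
    return data


def read_avro_schema(sample):
    """Return the schema stored in the header of an Avro container file."""
    stream = io.BytesIO(sample)
    if stream.read(4) != AVRO_MAGIC:
        raise ValueError("not an Avro object container file")
    metadata = {}
    while True:
        count = _read_long(stream)
        if count == 0:  # a zero count terminates the map
            break
        if count < 0:  # negative count: a byte size follows, which we skip
            count = -count
            _read_long(stream)
        for _ in range(count):
            key = _read_bytes(stream).decode("utf-8")
            metadata[key] = _read_bytes(stream)
    return json.loads(metadata["avro.schema"].decode("utf-8"))
```

If the sample is cut off before the header ends, the function raises EOFError, which corresponds to the case where 50 KB is not enough and the sample size has to be increased.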

That’s all, the job is done. This step-by-step algorithm is easy to wrap into any pipeline tool such as Oozie or Airflow, and thus fully automate the routine part of the work.
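
As a starting point for such automation, the statements from steps 3 and 4 can be generated from a few parameters. The sketch below only builds the SQL strings; how they are submitted to Hive and Impala is left out, and the function name, the validation rules and the example paths are all assumptions made for illustration.

```python
import re

_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)?")

HIVE_AVRO_TEMPLATE = """CREATE EXTERNAL TABLE IF NOT EXISTS {avro_table}
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '{data_dir}'
TBLPROPERTIES ('avro.schema.url'='{schema_url}')"""

HIVE_PARQUET_TEMPLATE = """CREATE TABLE IF NOT EXISTS {parquet_table}
STORED AS PARQUET
AS SELECT * FROM {avro_table}"""

IMPALA_TEMPLATE = "INVALIDATE METADATA {parquet_table}"


def _check_identifier(name):
    """Allow only plain table names, optionally prefixed with a database."""
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError("suspicious table name: %r" % (name,))
    return name


def _check_path(path):
    """Reject paths that would break out of the quoted SQL literal."""
    if "'" in path:
        raise ValueError("path must not contain a single quote: %r" % (path,))
    return path


def build_statements(avro_table, parquet_table, data_dir, schema_url):
    """Return the Hive and Impala statements for one bundle of Avro files."""
    values = {
        "avro_table": _check_identifier(avro_table),
        "parquet_table": _check_identifier(parquet_table),
        "data_dir": _check_path(data_dir),
        "schema_url": _check_path(schema_url),
    }
    return {
        "hive": [
            HIVE_AVRO_TEMPLATE.format(**values),
            HIVE_PARQUET_TEMPLATE.format(**values),
        ],
        "impala": [IMPALA_TEMPLATE.format(**values)],
    }


statements = build_statements(
    "tmp_avro_orders",              # hypothetical table names and paths
    "tmp_parq_orders",
    "hdfs:///data/orders/",
    "hdfs:///schemas/orders.avsc",
)
for engine, queries in statements.items():
    for query in queries:
        print(engine, "->", query.splitlines()[0])
```

A pipeline task would run the two Hive statements in order and then the Impala one, mirroring the manual steps.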