Installing Livy on a Hadoop Cluster

Purpose

Livy is an open source REST service for Apache Spark that allows you to submit jobs to your Apache Spark cluster through REST calls. You can view the source code here: https://github.com/cloudera/livy

In this post I will walk through the steps needed to install Livy on a Hadoop cluster. The steps were derived from the source code link above; however, this post also covers a simpler way to test the installation.

Install Steps

  1. Determine which node in your cluster will act as the Livy server
    1. Note: the server will need to have the Hadoop and Spark libraries and configurations deployed on it.
  2. Log in to the machine as root
  3. Download the Livy source code
    cd /opt
    wget https://github.com/cloudera/livy/archive/v0.2.0.zip
    unzip v0.2.0.zip
    cd livy-0.2.0
  4. Get the version of spark that is currently installed on your cluster
    1. Run the following command
      spark-submit --version
    2. Example: 1.6.0
    3. Use this value in downstream commands as {SPARK_VERSION}
  5. Build the Livy source code with Maven
    /usr/local/apache-maven/apache-maven-3.0.4/bin/mvn -DskipTests=true -Dspark.version={SPARK_VERSION} clean package
  6. You're done!

Steps to Control Livy

Get Status

ps -eaf | grep livy

It will be listed like the following:

root      9379     1 14 18:28 pts/0    00:00:01 java -cp /opt/livy-0.2.0/server/target/jars/*:/opt/livy-0.2.0/conf:/etc/hadoop/conf: com.cloudera.livy.server.LivyServer

Start

Note: Run as Root

cd /opt/livy-0.2.0/
export SPARK_HOME=/usr/lib/spark
export HADOOP_CONF_DIR=/etc/hadoop/conf
./bin/livy-server start

Once started, the Livy Server can be called with the following host and port:

http://localhost:8998

If you’re calling it from another machine, then you will need to update “localhost” to the Public IP or Hostname of the Livy server.

Stop

Note: Run as Root

cd /opt/livy-0.2.0/
./bin/livy-server stop

Testing Livy

These tests assume you are running them from the machine where Livy was installed, which is why “localhost” is used. If you would like to test from another machine, change “localhost” to the Public IP or Hostname of the Livy server. A Python sketch of the same request flow follows the list below.

  1. Create a new Livy Session
    1. Curl Command
      curl -H "Content-Type: application/json" -X POST -d '{"kind":"spark"}' -i http://localhost:8998/sessions
    2. Output
      HTTP/1.1 201 Created
      Date: Wed, 02 Nov 2016 22:38:13 GMT
      Content-Type: application/json; charset=UTF-8
      Location: /sessions/1
      Content-Length: 81
      Server: Jetty(9.2.16.v20160414)
      
      {"id":1,"owner":null,"proxyUser":null,"state":"starting","kind":"spark","log":[]}
  2. View Current Livy Sessions
    1. Curl Command
      curl -H "Content-Type: application/json" -i http://localhost:8998/sessions
    2. Output
      HTTP/1.1 200 OK
      Date: Tue, 08 Nov 2016 02:30:34 GMT
      Content-Type: application/json; charset=UTF-8
      Content-Length: 111
      Server: Jetty(9.2.16.v20160414)
      
      {"from":0,"total":1,"sessions":[{"id":0,"owner":null,"proxyUser":null,"state":"idle","kind":"spark","log":[]}]}
  3. Get Livy Session Info
    1. Curl Command
      curl -H "Content-Type: application/json" -i http://localhost:8998/sessions/0
    2. Output
      HTTP/1.1 200 OK
      Date: Tue, 08 Nov 2016 02:31:04 GMT
      Content-Type: application/json; charset=UTF-8
      Content-Length: 77
      Server: Jetty(9.2.16.v20160414)
      
      {"id":0,"owner":null,"proxyUser":null,"state":"idle","kind":"spark","log":[]}
  4. Submit job to Livy
    1. Curl Command
      curl -H "Content-Type: application/json" -X POST -d '{"code":"println(sc.parallelize(1 to 5).collect())"}' -i http://localhost:8998/sessions/0/statements
    2. Output
      HTTP/1.1 201 Created
      Date: Tue, 08 Nov 2016 02:31:29 GMT
      Content-Type: application/json; charset=UTF-8
      Location: /sessions/0/statements/0
      Content-Length: 40
      Server: Jetty(9.2.16.v20160414)
      
      {"id":0,"state":"running","output":null}
  5. Get Job Status and Output
    1. Curl Command
      curl -H "Content-Type: application/json" -i http://localhost:8998/sessions/0/statements/0
    2. Output
      HTTP/1.1 200 OK
      Date: Tue, 08 Nov 2016 02:32:15 GMT
      Content-Type: application/json; charset=UTF-8
      Content-Length: 109
      Server: Jetty(9.2.16.v20160414)
      
      {"id":0,"state":"available","output":{"status":"ok","execution_count":0,"data":{"text/plain":"[I@6270e14a"}}}
  6. Delete Session
    1. Curl Command
      curl -H "Content-Type: application/json" -X DELETE -i http://localhost:8998/sessions/0
    2. Output
      {"msg":"deleted"}
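
If you prefer scripting these checks rather than running curl by hand, the same flow can be driven from Python. The sketch below is a minimal illustration using the third-party requests package; the host, port, polling intervals, and the mkString tweak (to print readable output) are assumptions for the example, not part of the Livy documentation.

# Minimal sketch of the Livy REST flow above using Python's "requests" package.
# Assumes Livy is listening on localhost:8998, as in the curl examples.
import time
import requests

LIVY_URL = "http://localhost:8998"
HEADERS = {"Content-Type": "application/json"}

# 1. Create a new interactive Scala ("spark") session
session = requests.post(LIVY_URL + "/sessions",
                        json={"kind": "spark"}, headers=HEADERS).json()
session_url = "%s/sessions/%s" % (LIVY_URL, session["id"])

# 2. Wait until the session reaches the "idle" state
while requests.get(session_url, headers=HEADERS).json()["state"] != "idle":
    time.sleep(5)

# 3. Submit a statement and poll until its output is available
code = 'println(sc.parallelize(1 to 5).collect().mkString(","))'
stmt = requests.post(session_url + "/statements",
                     json={"code": code}, headers=HEADERS).json()
stmt_url = "%s/statements/%s" % (session_url, stmt["id"])
while True:
    result = requests.get(stmt_url, headers=HEADERS).json()
    if result["state"] == "available":
        print(result["output"])
        break
    time.sleep(2)

# 4. Delete the session when finished
requests.delete(session_url, headers=HEADERS)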

Installing SparkR on a Hadoop Cluster

Purpose

SparkR is an extension to Apache Spark which allows you to run Spark jobs with the R programming language. This provides the benefit of being able to use R packages and libraries in your Spark jobs. In the case of both Cloudera and MapR, SparkR is not supported and would need to be installed separately.

Installation Steps

Here are the steps you can take to Install SparkR on a Hadoop Cluster:

  1. Execute the following steps on all the Spark Gateways/Edge Nodes
    1. Login to the target machine as root
    2. Install R and other Dependencies
      1. Execute the following to Install
        1. Ubuntu
          sh -c 'echo "deb http://cran.rstudio.com/bin/linux/debian lenny-cran/" >> /etc/apt/sources.list'
          apt-get install r-base r-base-dev
        2. Centos
          1. Install the repo
            rpm -ivh http://mirror.unl.edu/epel/6/x86_64/epel-release-6-8.noarch.rpm
          2. Enable the repo
            1. Edit the /etc/yum.repos.d/epel-testing.repo file with your favorite text editing software
            2. Change all the enabled sections to ‘1’
          3. Clean yum cache
            yum clean all
          4. Install R and Dependencies
            yum install R R-devel libcurl-devel openssl-devel
      2. Test R installation
        1. Start up an R Session
          R
        2. Within the R shell, execute an addition command to ensure things are running correctly
          1 + 1
        3. Quit when you’re done
          quit()
      3. Note: R libraries get installed at “/usr/lib64/R”
    3. Get the version of Spark you currently have installed
      1. Run the following command
        spark-submit --version
      2. Example output: 1.6.0
      3. Replace the Placeholder {SPARK_VERSION} with this value
    4. Install SparkR
      1. Start up the R console
        R
      2. Install the dependent R packages
        install.packages("devtools")
        install.packages("roxygen2")
        install.packages("testthat")
      3. Install the SparkR Packages
        devtools::install_github('apache/spark@v{SPARK_VERSION}', subdir='R/pkg')
        install.packages('sparklyr')
      4. Close out of the R shell
        quit()
    5. Find the Spark Home Directory and replace the Placeholder {SPARK_HOME_DIRECTORY} with this value
    6. Copy the SparkR files from the Apache Spark source distribution
      cd /tmp/
      wget https://github.com/apache/spark/archive/v{SPARK_VERSION}.zip
      unzip v{SPARK_VERSION}.zip
      cd spark-{SPARK_VERSION}
      cp -r R {SPARK_HOME_DIRECTORY}
      cd bin
      cp sparkR {SPARK_HOME_DIRECTORY}/bin/
    7. Run Dev Install
      cd {SPARK_HOME_DIRECTORY}/R/
      sh install-dev.sh
    8. Create a new file “/usr/bin/sparkR” and set its contents
      1. Copy the contents of the /usr/bin/spark-shell file to /usr/bin/sparkR
        cp /usr/bin/spark-shell /usr/bin/sparkR
      2. Edit the /usr/bin/sparkR file. Replace “spark-shell” with “sparkR” on the bottom exec command.
    9. Finish install
      sudo chmod 755 /usr/bin/sparkR
    10. Verify that the sparkR command is available
      cd ~
      which sparkR
    11. You're done!

Testing

Upon completion of the installation steps, here are some ways that you can test the installation to verify everything is running correctly.

  • Test from R Console – Run on a Spark Gateway
    1. Start an R Shell
      R
    2. Execute the following commands in the R Shell
      library(SparkR)
      library(sparklyr)
      Sys.setenv(SPARK_HOME='{SPARK_HOME_DIRECTORY}')
      Sys.setenv(SPARK_HOME_VERSION='{SPARK_VERSION}')
      Sys.setenv(YARN_CONF_DIR='{YARN_CONF_DIRECTORY}')
      sc = spark_connect(master = "yarn-client")
    3. If this runs without errors then you know it’s working!
  • Test from SparkR Console – Run on a Spark Gateway
    1. Open the SparkR Console
      sparkR
    2. Verify the Spark Context is available with the following command:
      sc
    3. If the sc variable is listed then you know it’s working!
  • Sample code you can run to test more
    rdd = SparkR:::parallelize(sc, 1:5)
    SparkR:::collect(rdd)

2016 Phoenix Data Conference

The third Phoenix Data Conference – the largest big data event in the Phoenix Valley – had a tremendous response this year. Having established itself as a must-attend big data event in the Phoenix area, the event this year attracted thought leaders and key companies discussing the latest trends and technologies in the big data space. Over 350 technologists, business leaders, data analysts and engineers attended the 2016 conference.

The Phoenix Data Conference 2016, which concluded last Saturday, focused on practical big data use cases from leading analytics companies like Cloudera, SAP, Clairvoyant, MapR, StreamSets, Microsoft, Confluent, SnapLogic, DataTorrent, Tresata, Amazon, Choice Hotels, MemSQL, Wells Fargo and others. Technology leaders in the big data space shared innovative implementations and advances in the Hadoop space. Specific challenges around security, talent availability, technical deployments, managed services, etc. were discussed by the speakers.

We would like to thank our speakers, sponsors, all the attendees and the volunteers for making the Phoenix Data Conference 2016 a huge success. The intent of the conference was to encourage the local community to come together and learn from other practitioners and experts in this field. We strongly believe that an active community and exposure to new ideas are key to improving the local talent base and opportunities for everyone. We really hope an educated and well informed talent pool will help bring more interesting companies and work to the greater Phoenix area.

None of this would have been possible without the speakers' time and effort to share their knowledge with us. It was a pleasure to host all of you. We had a good mix of sessions that included techniques to solve problems using different products, lessons-learned sessions, deep dives into specific technologies, and a showcase of how companies are implementing their big data strategies.

A Big Thank You also to all the attendees, all the discussions and questions made for a very engaging day. You made the event a great success!!

If you missed all this big data fun but are interested in knowing more, visit the website www.phxdataconference.com or join in at the monthly Phoenix Hadoop meetup.

In addition to the regular monthly meetups and the Phoenix Data Conference, we also hosted an all-day hands-on session on Hadoop. Based on the interest and the feedback we received, we will be hosting more workshops in the future to provide hands-on training for Hadoop.

If you have any additional feedback, suggestions for future events or any questions please do drop us a line at contact@phxdataconference.com. Your input is very valuable to help us organize such events – please keep it coming!

You can also fill out a short survey at https://www.surveymonkey.com/r/DJP76R8

Thanks!

The Phoenix Data Conference Team

Building the Next Generation of Leaders @ Phoenix Mobile Festival 2016

We, at Clairvoyant, are passionate about giving back to the entrepreneur ecosystem, especially when it comes to aspiring young students.

At the Phoenix Mobile Festival this weekend, we conducted a Mobile App Pitch competition, providing students with an opportunity to come up with an idea and use case that would help make an impact on the daily life of students at Grand Canyon University. Participation was great, and in under two weeks we received close to 20 applications. A few teams were shortlisted to present their ideas and app designs and make a pitch at the conference. A panel of judges picked the winners.

Clairvoyant awarded prizes to the top three.

We are proud and honored by the opportunity to give back, in our own small way, to the community and the young entrepreneurs at GCU.

Congratulations to the winners! We are very excited to see the enthusiasm and the drive of these young entrepreneurs and wish them success in their endeavors and help further the cause of making Phoenix a hub of technology innovation.

Finally, a special callout to the organizers of the conference, on running another edition this year.

Clairvoyant is a Chandler-based company providing strategic, architectural and implementation consulting focused on Data Driven Solutions.

Celebrating STEM Women on Women Equality Day

Clairvoyant is a leading technology consulting & services company with a presence in North America and Asia. We help organizations build innovative products and platforms using data, analytics and the cloud. It is critical to our success that we have a diverse workforce. Today, we celebrate the women at Clairvoyant for their exceptional contribution to our growth and to a culture that has helped shape our values.

We are really proud to share testimonials from some of us on how we stand out as responsible and independent women in STEM at Clairvoyant.

“I give my best every day to stand out as a competent, happy professional who is passionate about her work.” – Cora

[Photo: Team Phoenix]

“The first thing I noticed after joining the company is being surrounded by smart people. It’s a rare opportunity you get and I feel lucky to be part of this group.” – Shilpa

“There are so many high performing, passionate women around me at Clairvoyant, that walking the additional 5 or 10 or 50 miles to deliver what was promised is just me trying to keep up to the quality of work ethic.” – Nithya

“Clairvoyant provides me that positive environment for doing my job with all the dedication and to learn new technologies. I stand very happily at Clairvoyant as a woman.” – Unnati

[Photo: Team Pune]

“It gives me immense satisfaction to work with critical thinkers as a highly motivated and confident trailblazer.” – Amita

“Girl power is further strengthened when we surround ourselves with encouraging colleagues and efficient leadership.” – Shradha

“At Clairvoyant, I am able to explore my strengths and make my grounds stronger.” – Isha

We at Clairvoyant have always cherished the passion and hard work of our team, and it is the platform for our growth. We are proud of what we do because we own it and work together to make things happen. We are a family and we strive to make a difference every day.

Python upgrade for an existing application

What is Python?

Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together. Python’s simple, easy to learn syntax emphasizes readability and therefore reduces the cost of program maintenance. 

Future of Python – Python 3

Python 3.0 was released in 2008. The final 2.x version 2.7 release came out in mid-2010, with a statement of extended support for this end-of-life release. The 2.x branch will see no new major releases after that. 3.x is under active development and has already seen over five years of stable releases, including version 3.3 in 2012, 3.4 in 2014, and 3.5 in 2015. This means that all recent standard library improvements, for example, are only available by default in Python 3.x.

Although Python 3 is the latest generation of the language, many programmers still use Python 2.7, the final update to Python 2, which was released in 2010.

There is currently no clear-cut answer to the question of which version of Python you should use; the decision depends on what you want to achieve. While Python 3 is clearly the future of the language, some programmers choose to remain with Python 2.7 because some older libraries and packages only work in Python 2.

This query of the Stack Exchange data shows that in Q1 2016 there were almost as many Python 3-tagged posts as Python 2-tagged posts.

[Chart: Python 2 vs. Python 3 tagged Stack Exchange posts]

All major releases and active development are on the 3.x version; 2.x is maintained only with bug fixes and new hardware/OS compatibility. There will be no new full-feature releases for the language or standard library.

These reasons drove us to migrate from Python 2 to Python 3: “Python 2.x is legacy, Python 3.x is the present and future of the language.” While doing so we faced many challenges, since Python 3 is a major milestone release with lots of changes that required many updates to our existing codebase, and we would like to take this opportunity to share what we learned along the way.

What has changed?

Listed below are a few of the key changes that need to be taken care of while upgrading from Python 2 to 3.

Print is a Function Like It Should Have Been All Along

In Python 3, print() is a function and has parentheses just like it’s supposed to. This, alone, makes getting into the language so much easier.

Python 2: print 'Hello World !!!'

Python 3: print('Hello World !!!')

Relative Imports

In Python 2, if you have a package called mypackage that contains a module called csv.py, it would hide the csv module from the standard library. Within mypackage, the code import csv would import the local file, and importing from the standard library would become tricky.

In Python 3, this has changed so that import csv imports from the standard library; to import the local csv.py file you need to write from . import csv, and from csv import my_csv needs to be changed to from .csv import my_csv. These are called “relative imports”, and there is also a syntax to import from the package one level up: from .. import csv.

 

map() and filter() functions

In Python 2, the map() and filter() functions return lists, while in Python 3 they return iterators.
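
For example, code that expects a list from map() or filter() in Python 2 needs an explicit list() call in Python 3 (a minimal illustration, not taken from our codebase):

squares = map(lambda x: x * x, [1, 2, 3, 4])
print(squares)        # Python 2: [1, 4, 9, 16]; Python 3: a <map object ...>
print(list(squares))  # [1, 4, 9, 16] -- materialize the iterator explicitly
evens = filter(lambda x: x % 2 == 0, range(10))
print(list(evens))    # [0, 2, 4, 6, 8]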

Parameter unpacking

In Python 2 you have parameter unpacking:

>>> def unpacks(a, (b, c)):
...    return a,b,c

>>> unpacks(1, (2,3))
(1, 2, 3)

Python 3 does not support this, so you need to do your own unpacking:

>>> def unpacks(a, b):
...    return a,b[0],b[1]

>>> unpacks(1, (2,3))
(1, 2, 3)
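
A slightly cleaner way to port such functions is to keep the tuple but unpack it inside the body (just an alternative style for the same workaround):

>>> def unpacks(a, bc):
...    b, c = bc
...    return a, b, c

>>> unpacks(1, (2,3))
(1, 2, 3)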

Text versus binary data

In Python 2, you had to mark every single Unicode string with a u at the beginning, like u'Hello'. Guess how often you use Unicode strings? All the time! And guess how often you forget to put that little u on a string? Forget the u and you have a byte sequence instead.

To make the distinction between text and binary data clearer and more pronounced, Python 3 did what most languages created in the age of the internet have done and made text and binary data distinct types that cannot blindly be mixed together (Python predates widespread access to the internet). For any code that only deals with text or only binary data, this separation doesn’t pose an issue. But for code that has to deal with both, it does mean you might have to now care about when you are using text compared to binary data, which is why this cannot be entirely automated.
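
A small illustration of the stricter separation in Python 3 (hypothetical values, shown only to make the point):

text = 'café'                # str: a sequence of Unicode characters
data = text.encode('utf-8')  # bytes: an explicit encode is required
print(data)                  # b'caf\xc3\xa9'
print(data.decode('utf-8'))  # back to str: café
# 'café' + b'raw' raises a TypeError in Python 3,
# whereas Python 2 silently mixed str and unicode values.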

Division

In Python 2, 5 / 2 == 2

In Python 3, all division between int values result in a float. 5 / 2 == 2.5
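
If the old truncating behavior is what the code actually relies on, the floor-division operator works the same way in both versions (a quick illustration):

print(5 / 2)   # Python 3: 2.5   (Python 2: 2)
print(5 // 2)  # 2 in both Python 2 and Python 3
print(5.0 / 2) # 2.5 in both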

Dictionary methods

In Python 2, dictionaries have the methods iterkeys(), itervalues() and iteritems() that return iterators instead of lists. In Python 3, the standard keys(), values() and items() return dictionary views, which are iterable, so the iterator variants become pointless and are removed.
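
In practice this mostly matters when the code expects a real list or modifies the dictionary while iterating; a brief illustration:

d = {'a': 1, 'b': 2}
keys = d.keys()
print(keys)        # Python 3: dict_keys(['a', 'b']); Python 2: ['a', 'b']
print(list(keys))  # materialize a list when one is actually needed
# The Python 2 methods d.iterkeys()/itervalues()/iteritems() are gone in
# Python 3; a common porting fix is simply replacing iteritems() with items().
for k, v in d.items():
    print(k, v)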

 

Opening a File

In Python 2 there is a file type builtin. This is replaced with various file types in Python 3. You commonly see code in Python 2 that uses file(pathname) which will fail in Python 3. Replace this usage with open(pathname).

Python 2: test_file = file('/Users/test.csv')

Python 3: test_file = open('/Users/test.csv')

 

Exception (except)

The syntax for catching exceptions has changed from the Python 2 form:

except (Exception1, Exception2), target:

to the clearer Python 3 syntax:

except (Exception1, Exception2) as target:

 

References:

http://python3porting.com/differences.html

https://docs.python.org/3.0/whatsnew/3.0.html

 

Work Flow Management for Big Data: Guide to Airflow (part 3)

Data analytics has been playing a key role in the decision-making process at various stages of the business in many industries. In this era of Big Data, the adoption level is going to ONLY increase day by day. It is really overwhelming to see all the Big Data technologies that are popping up every week to cater to various stages of Big Data solution implementations. With data being generated at a very fast pace at various sources (applications that automate the business processes), implementing solutions for use cases like “real time data ingestion from various sources”, “processing the data at different levels of the data ingestion” and “preparing the final data for analysis” becomes challenging. In particular, orchestrating, scheduling, managing and monitoring the pipelines is a very critical task for the data platform to be stable and reliable. Also, due to the dynamic nature of the data sources, data inflow rate, data schema, processing needs, etc., workflow management (pipeline generation/maintenance/monitoring) becomes even more challenging.

This is a three part series, of which “Overview along with few architectural details of Airflow” was covered in the first part. The second part covered the deployment options for Airflow in production. This part (the last of the series) covers the installation steps (with commands) for Airflow and its dependencies.

Part 3: Installation steps for Airflow and its dependencies

Install pip

Installation steps
sudo yum install epel-release
sudo yum install python-pip python-wheel

Install Erlang

Installation steps
sudo yum install wxGTK
sudo yum install erlang

RabbitMQ

Installation steps
wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.2/rabbitmq-server-3.6.2-1.noarch.rpm
sudo yum install rabbitmq-server-3.6.2-1.noarch.rpm

Celery

Installation steps
pip install celery

Airflow: Prerequisites

Installation steps
sudo yum install gcc-gfortran libgfortran numpy redhat-rpm-config python-devel gcc-c++

Airflow

Installation steps
# create a home directory for airflow
mkdir ~/airflow
# export the location to AIRFLOW_HOME variable
export AIRFLOW_HOME=~/airflow
pip install airflow

Initialize the Airflow database

Installation steps
airflow initdb

By default Airflow installs with a SQLite database. The above step creates an airflow.cfg file within the “$AIRFLOW_HOME” directory. Once this is done, you may want to change the repository database to a well-known (highly available) relational database like MySQL or Postgres by updating the sql_alchemy_conn setting in airflow.cfg, then reinitialize the database (using the airflow initdb command). That would create all the required tables for Airflow in the relational database.

Start the Airflow components

Installation steps
# Start the Scheduler
airflow scheduler
# Start the Webserver
airflow webserver
# Start the Worker
airflow worker

Below are a few important configuration points in the airflow.cfg file (for example, the executor, sql_alchemy_conn, broker_url and dags_folder settings).

 

Here we conclude the series; we will update part 3 with more steps as and when possible.

 

Work Flow Management for Big Data: Guide to Airflow (part 2)

Data analytics has been playing a key role in the decision-making process at various stages of the business in many industries. In this era of Big Data, the adoption level is going to ONLY increase day by day. It is really overwhelming to see all the Big Data technologies that are popping up every week to cater to various stages of Big Data solution implementations. With data being generated at a very fast pace at various sources (applications that automate the business processes), implementing solutions for use cases like “real time data ingestion from various sources”, “processing the data at different levels of the data ingestion” and “preparing the final data for analysis” becomes challenging. In particular, orchestrating, scheduling, managing and monitoring the pipelines is a very critical task for the data platform to be stable and reliable. Also, due to the dynamic nature of the data sources, data inflow rate, data schema, processing needs, etc., workflow management (pipeline generation/maintenance/monitoring) becomes even more challenging.

This is a three part series, of which “Overview along with few architectural details of Airflow” was covered in the first part. This part covers the deployment options for Airflow in production.

Part 2: Deployment view: gives a better picture

Based on the needs, one may have to go with a simple setup or a complex setup of Airflow. There are different ways Airflow can be deployed (especially from an Executor point of view). Below are the deployment options along with the description for each.

Standalone mode of deployment

Description: As mentioned in the above section, the typical installation of Airflow to start with would look something like this.

  • Configuration file (airflow.cfg): contains details such as where to pick the DAGs from, which executor to run, how frequently the scheduler should poll the DAGs folder for new definitions, which port to start the webserver on, etc.
  • Metadata repository: Typically a MySQL or Postgres database is used for this purpose. All the metadata related to the DAGs, their metrics, tasks and their statuses, SLAs, variables, etc. is stored here.
  • Webserver: This renders the beautiful UI that shows all the DAGs and their current states along with the metrics (which are pulled from the repo).
  • Scheduler: This reads the DAGs and puts the details about them into the repo. It also initiates the Executor.
  • Executor: This is responsible for reading the schedule_interval info and creating the instances for the DAGs and tasks in the repo.
  • Worker: The worker reads the task instances, actually performs the tasks, and writes the status back to the repo.

Distributed mode of deployment

Description: The description for most of the components mentioned in the Standalone section remain the same except for the Executor part and the workers part.

  • RabbitMQ: RabbitMQ is the distributed messaging service that is leveraged by the Celery executor to put the task instances into. This is where the workers typically read the tasks from for execution. Basically, RabbitMQ exposes a broker URL that the Celery executor and the workers talk to.
  • Executor: Here the executor would be the Celery executor (configured in airflow.cfg). The Celery executor is configured to point to the RabbitMQ broker.
  • Workers: The workers are installed on different nodes (based on the requirement) and are configured to read the task info from the RabbitMQ broker. The workers are also configured with a worker result backend, which can typically be the MySQL repo itself.

A few important points to be noted here are:

  1. The worker nodes are nothing but the Airflow installation itself; only the worker process is started on them.
  2. The DAG definitions should be in sync on all the nodes (both the primary Airflow installation and the worker nodes).

Distributed mode of deployment with High Availability set up

Description: As part of the setup for high availability of the Airflow installation, we assume that the MySQL repository and RabbitMQ are already configured to be highly available. The focus is on how to make the Airflow components, namely the webserver and scheduler, highly available.

The description for most of the components would remain the same as above. Below is what changes:

  • New Airflow instance (standby): There would be another instance of Airflow set up as a standby.
    • The one shown in green is the primary Airflow instance.
    • The one in red is the standby instance.
  • A new DAG has to be put in place, something called a “Counterpart Poller”. The purpose of this DAG is twofold:
    • To continuously poll the counterpart scheduler to see if it is up and running.
    • If the counterpart instance is not reachable (which means the instance is down):
      • declare the current Airflow instance as the primary
      • move the DAGs of the (previous) primary instance out of its DAGs folder and move the Counterpart Poller DAG into that DAGs folder.
      • move the actual DAGs into the DAGs folder and move the Counterpart Poller out of the DAGs folder on the standby server (the one which declares itself as primary now).

Note that the declaration of an instance as the primary server can be recorded as a flag in a MySQL table. (A rough Python sketch of such a poller is included after the walkthrough below.)

Here is how it would work:

  1. Initially the primary (say instance 1) and standby (say instance 2) schedulers would be up and running. Instance 1 would be declared as the primary Airflow server in the MySQL table.
  2. The DAGs folder of the primary instance (instance 1) would contain the actual DAGs, and the DAGs folder of the standby instance (instance 2) would contain the counterpart poller (say PrimaryServerPoller).
    1. Primary server would be scheduling the actual DAGs as required.
    2. Standby server would be running the PrimaryServerPoller which would continuously poll the Primary Airflow scheduler.
  3. Let’s assume the primary server has gone down. In that case, the PrimaryServerPoller would detect this and:
    1. Declare itself as the primary Airflow server in the MySQL table.
    2. Move the actual DAGs out of the DAGs folder and move the PrimaryServerPoller DAG into the DAGs folder on the old primary server (instance 1).
    3. Move the actual DAGs into the DAGs folder and move the PrimaryServerPoller DAG out of the DAGs folder on the old standby (instance 2).
  4. So at this point of time, the Primary and Standby servers have swapped their positions.
  5. Even if the Airflow scheduler on the current standby server (instance 1) comes back, there would be no harm, since only the PrimaryServerPoller DAG would be running on it. This server (instance 1) would remain the standby until the current primary server (instance 2) goes down.
  6. In case the current primary server goes down, the same process would repeat and the airflow running on instance 1 would become the Primary server.
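
The poller itself is not something Airflow ships; the rough Python sketch below only illustrates the failover idea described above. The host, port, folder paths and the mark_as_primary() helper are hypothetical placeholders, and a real setup would record the primary flag in the shared MySQL repository and handle the remote instance's DAGs folder as well.

# Hypothetical sketch of the "PrimaryServerPoller" logic described above.
# All hosts, ports and paths are placeholders for illustration only.
import glob
import os
import shutil
import socket
import time

COUNTERPART_HOST = "airflow-counterpart.example.com"  # the other Airflow instance
COUNTERPART_WEBSERVER_PORT = 8080
ACTUAL_DAGS_DIR = "/opt/airflow/dags_actual"          # holds the real pipeline DAGs
POLLER_DAGS_DIR = "/opt/airflow/dags_poller"          # holds the PrimaryServerPoller DAG
ACTIVE_DAGS_DIR = "/opt/airflow/dags"                 # folder this instance's scheduler reads


def counterpart_is_alive(timeout=5):
    """Naive liveness check: can we open a TCP connection to the counterpart's
    webserver? A real check might instead look at the scheduler's heartbeat
    in the shared metadata database."""
    try:
        socket.create_connection((COUNTERPART_HOST, COUNTERPART_WEBSERVER_PORT),
                                 timeout).close()
        return True
    except OSError:
        return False


def mark_as_primary():
    """Placeholder: in the setup described above, this would flip a flag row
    in the shared MySQL repository."""
    print("This instance is now the primary Airflow server")


def promote_to_primary():
    mark_as_primary()
    # Activate the real pipelines locally and retire the poller DAG;
    # the counterpart does the reverse once it comes back up.
    for path in glob.glob(os.path.join(ACTUAL_DAGS_DIR, "*.py")):
        shutil.copy(path, ACTIVE_DAGS_DIR)
    poller = os.path.join(ACTIVE_DAGS_DIR, "primary_server_poller.py")
    if os.path.exists(poller):
        shutil.move(poller, POLLER_DAGS_DIR)


if __name__ == "__main__":
    # In the real setup this loop would be the body of the PrimaryServerPoller
    # DAG's task, scheduled by the standby Airflow instance.
    while counterpart_is_alive():
        time.sleep(60)
    promote_to_primary()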

Hopefully this gives a better picture of the deployment options for Airflow. Please check out part 3 of this series for steps on how to install Airflow, along with the commands for most of the steps.

IMPALA High Availability

The Impala daemon is a core component in the Impala architecture. The daemon process runs on each data node and is the process to which the clients (Hue, JDBC, ODBC) connect to issue queries. When a query gets submitted to an Impala daemon, that node serves as the coordinator node for that query. The Impala daemon acting as the coordinator parallelizes the query and distributes work to the other nodes in the Impala cluster. The other nodes transmit partial results back to the coordinator, which constructs the final result set for the query.

It is a recommended practice to run impalad on each of the data nodes in a cluster, as Impala takes advantage of data locality while processing its queries. So most of the time the Impala clients connect to one of the data nodes to run their queries. This might create a single point of failure for the clients if they are always issuing queries to a single data node. In addition, the node acting as the coordinator for each Impala query potentially requires more memory and CPU cycles than the other nodes that process the query. For clusters running production workloads, high availability from the Impala clients’ standpoint and load distribution across the nodes can be achieved by having a proxy server or load balancer issue queries to the Impala daemons using round-robin scheduling.

HAProxy is a free, open source load balancer that can be used as a proxy server to distribute the load across the different Impala daemons. The high-level architecture for this setup is shown below.

[Diagram: Impala high availability with an HAProxy load balancer]

 

Install the load balancer:

HAProxy can be installed and configured on Red Hat Enterprise Linux and CentOS systems using the following instructions.

yum install haproxy

Set up the configuration file: /etc/haproxy/haproxy.cfg.

See the following section for a sample configuration file:

global
log         127.0.0.1 local2

chroot      /var/lib/haproxy
pidfile     /var/run/haproxy.pid
maxconn     4000
user       haproxy
group      haproxy
daemon

# turn on stats unix socket
stats socket /var/lib/haproxy/stats

#---------------------------------------------------------------------
# common defaults that all the 'listen' and 'backend' sections will
# use if not designated in their block
#---------------------------------------------------------------------
defaults
mode                   tcp
log                    global
 
retries               3
timeout connect       50000s
timeout client        50000s
timeout server        50000s
maxconn               3000

#---------------------------------------------------------------------
# main frontend which proxys to the backends - change the port
# if you want
#---------------------------------------------------------------------
frontend main *:5000
acl url_static       path_beg       -i /static /images /javascript /stylesheets
acl url_static       path_end       -i .jpg .gif .png .css .js

use_backend static          if url_static
default_backend            impala

#---------------------------------------------------------------------
#static backend for serving up images, stylesheets and such
#---------------------------------------------------------------------
backend static
    balance     roundrobin
    server      static 127.0.0.1:4331 check

#---------------------------------------------------------------------
#round robin balancing between the various backends
#---------------------------------------------------------------------
backend impala
    mode          tcp
    option        tcplog
    balance       roundrobin
    #balance      leastconn
	#---------------------------------------------------------------------
	# Replace the ip addresses with your client nodes ip addresses
	#---------------------------------------------------------------------
    server client1 192.168.3.163:21000
    server client2 192.168.3.164:21000
    server client3 192.168.3.165:21000

Run the following command after making the changes:

service haproxy reload;

Note:

The key configuration options are balance and server in the backend impala section, as well as the timeout options in the defaults section. When the balance parameter is set to leastconn, the server with the lowest number of connections receives the new connection. When balance is set to roundrobin, the proxy rotates through the servers so that each new connection uses a different coordinator node.

  1. On systems managed by Cloudera Manager, on the page Impala > Configuration > Impala Daemon Default Group, specify a value for the Impala Daemons Load Balancer field. Specify the address of the load balancer in host:port format. This setting lets Cloudera Manager route all appropriate Impala-related operations through the proxy server.
  2. For any scripts, jobs, or configuration settings for applications that formerly connected to a specific data node to run Impala SQL statements, change the connection information (such as the -i option in impala-shell) to point to the load balancer instead.

Test Impala through a Proxy for High Availability:

Manual testing with HAProxy:

Stop the Impala daemon services one by one, run the queries, and check whether Impala high availability is working correctly.

Test Impala high availability with a shell script using HAProxy:

Run the following shell script to test Impala high availability through HAProxy.

Note: Please change the ‘table_name’ and ‘database_name’ placeholders.

for (( i = 0 ; i < 5; i++ ))
do
    impala-shell -i localhost:5000 -q "select * from {table_name}" -d {database_name}
done

Result: Run the above script and observe how the load balancer distributes the queries. (A Python variant of the same check is sketched below.)

  • The query should execute on a different Impala daemon node for each iteration (when balance is roundrobin).
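
If you would rather drive the same check from Python (for example, from a monitoring job), a rough equivalent of the shell loop is sketched below. It simply shells out to impala-shell through the HAProxy frontend on port 5000; the {table_name} and {database_name} placeholders match the shell example and should be replaced with real values.

# Rough Python equivalent of the shell loop above: submit the same query
# several times through the HAProxy frontend (localhost:5000) and report
# whether each attempt succeeded.
import subprocess

QUERY = "select * from {table_name}"  # replace placeholder, as above
DATABASE = "{database_name}"          # replace placeholder, as above

for i in range(5):
    result = subprocess.run(
        ["impala-shell", "-i", "localhost:5000", "-d", DATABASE, "-q", QUERY],
        capture_output=True, text=True)
    status = "OK" if result.returncode == 0 else "FAILED"
    print("attempt %d: %s" % (i + 1, status))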

Work Flow Management for Big Data: Guide to Airflow (part 1)

Data analytics has been playing a key role in the decision-making process at various stages of the business in many industries. In this era of Big Data, the adoption level is going to ONLY increase day by day. It is really overwhelming to see all the Big Data technologies that are popping up every week to cater to various stages of Big Data solution implementations. With data being generated at a very fast pace at various sources (applications that automate the business processes), implementing solutions for use cases like “real time data ingestion from various sources”, “processing the data at different levels of the data ingestion” and “preparing the final data for analysis” becomes challenging. In particular, orchestrating, scheduling, managing and monitoring the pipelines is a very critical task for the data platform to be stable and reliable. Also, due to the dynamic nature of the data sources, data inflow rate, data schema, processing needs, etc., workflow management (pipeline generation/maintenance/monitoring) becomes even more challenging.

This is a three part series, of which “Overview along with few architectural details of Airflow” is covered in this first part.

Part 1:Overview along with few architectural details of Airflow

Typical stages of Workflow Management

The typical stages of the life cycle for Workflow Management of Big Data would be as follows:

  • Create Jobs to interact with systems that operate on Data
    • Hive/Presto/HDFS/Postgres/S3 etc
  • (Dynamic) Workflow creation
    • Based on the number of sources, size of data, business logic, variety of data, changes in the schema, and the list goes on.
  • Manage Dependencies between Operations
    • Upstream, Downstream, Cross Pipeline dependencies, Previous Job state, etc.
  • Schedule the Jobs/Operations
    • Calendar schedule, Event Driven, Cron Expression etc.
  • Keep track of the Operations and the metrics of the workflow
    • Monitor the current/historic state of the jobs, the results of the jobs etc.
  • Ensuring Fault tolerance of the pipelines and capability to back fill any missing data, etc.

And the list grows as the complexity increases.

Where to start?

There are various tools that have been developed to solve this problem, but each has its own strengths and limitations. Below are a few tools that are used in the big data context.

Few (open source) tools that solve the Workflow Management Problem in Big Data space

There are a bunch of workflow management tools out there in the market: a few have support for Big Data operations out of the box, and a few need extensive customization/extensions to support Big Data operations. Please refer to this link to see the list of tools available for exploration.

  • Oozie → Oozie is a workflow scheduler system to manage Apache Hadoop jobs
  • BigDataScript → BigDataScript is intended as a scripting language for big data pipeline
  • Makeflow → Makeflow is a workflow engine for executing large complex workflows on clusters, clouds, and grids
  • Luigi → Luigi is a Python module that helps you build complex pipelines of batch jobs. (This is a strong contender for Airflow)
  • Airflow → Airflow is a platform to programmatically author, schedule and monitor workflows
  • Azkaban → Azkaban is a batch workflow job scheduler created at LinkedIn to run Hadoop jobs
  • Pinball → Pinball is a scalable workflow manager developed at Pinterest

Though most of the above tools meet the basic needs of workflow management, when it comes to dealing with complex workflows only a few of them shine. Luigi, Airflow, Oozie and Pinball are the tools preferred (and used in production) by most teams across the industry. A quick comparison of these tools is available at this link (by Marton, a Data Engineer at Facebook). We observed that none of the existing resources (on the web) talk about the architecture graphically, about the setup of Airflow (the workflow management tool open sourced by Airbnb) in production with the CeleryExecutor, or, more importantly, about how Airflow needs to be configured to be highly available. Hence, here is an attempt to share that missing information.

Introduction to Airflow

Airflow is a platform to programmatically author, schedule and monitor data pipelines that meets the needs of almost all of the stages of the life cycle of Workflow Management. The system has been built (by AirBnB) on the below four principles (copied as is from Airflow docs):

  • Dynamic: Airflow pipelines are configuration as code (Python), allowing for dynamic pipeline generation. This allows for writing code that instantiates pipelines dynamically.
  • Extensible: Easily define your own operators, executors and extend the library so that it fits the level of abstraction that suits your environment.
  • Elegant: Airflow pipelines are lean and explicit. Parameterizing your scripts is built into the core of Airflow using the powerful Jinja templating engine.
  • Scalable: Airflow has a modular architecture and uses a message queue to orchestrate an arbitrary number of workers. Airflow is ready to scale to infinity.

Basic concepts of Airflow

  • DAGs – a DAG (Directed Acyclic Graph) is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. (A minimal example DAG is sketched after this list.)
    • DAGs are defined as python scripts and are placed in the DAGs folder (could be any location, but needs to be configured in the airflow config file).
    • Once a new DAG is placed into the DAGs folder, it is picked up by Airflow automatically within about a minute.
  • Operators – An operator describes a single task in a workflow. While DAGs describe how to run a workflow, Operators determine what actually gets done.
    • Task: Once an operator is instantiated using some parameters, it is referred to as a “task”
    • Task Instance: A task executed at a particular time is called TaskInstance.
  • Scheduling the DAGs/Tasks – The DAGs and Tasks can be scheduled to be run at certain frequency using the below parameters.
    • schedule_interval → determines when the DAG has to be triggered. This can be a cron expression or a Python datetime.timedelta object.
  • Executors – Once the DAGs, tasks and the scheduling definitions are in place, something needs to execute the jobs/tasks. This is where Executors come into the picture.
    • There are three types of executors provided by Airflow out of the box.
      • Sequential → The Sequential executor is meant for test drives; it executes tasks one by one (sequentially). Tasks cannot be parallelized.
      • Local → The Local executor is similar to the Sequential executor, but it can parallelize task instances locally.
      • Celery → The Celery executor uses Celery, an open source distributed task execution engine based on message queues, making it more scalable and fault tolerant. Message queues like RabbitMQ or Redis can be used along with Celery.
        • This is typically used for production purposes.

***  For advanced concepts of Airflow like Hooks, Pools, Connections, Queues, SLAs etc, please take a look at the Airflow documentation.
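
To make the concepts above concrete, here is a minimal example DAG. It is only an illustrative sketch (the DAG id, schedule and bash commands are made up), written against the Airflow APIs of the era this series describes (around version 1.7):

# minimal_example_dag.py -- drop this into the configured DAGs folder
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2016, 11, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# The DAG: a named collection of tasks with a schedule_interval
dag = DAG('example_pipeline',
          default_args=default_args,
          schedule_interval='@daily')

# Operators instantiated with parameters become tasks
extract = BashOperator(task_id='extract',
                       bash_command='echo "extracting data"',
                       dag=dag)
load = BashOperator(task_id='load',
                    bash_command='echo "loading data"',
                    dag=dag)

# Dependency: load runs only after extract succeeds
load.set_upstream(extract)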

Airflow has an edge over other tools in the space

Below are some key features where Airflow has an upper hand over other tools like Luigi and Oozie:

  • Pipelines are configured via code making the pipelines dynamic
  • A graphical representation of the DAG instances and task instances along with the metrics.
  • Scalability: Distribution of Workers and Queues for Task execution
  • Hot Deployment of Dags/Tasks
  • Support for Celery and SLAs, and a great UI for monitoring metrics
  • Has support for Calendar schedule and Cron tab scheduling
  • Backfill: ability to rerun a particular DAG instance in case of a failure.
  • Variables for making the changes to the Dags/Tasks quick and easy

Architecture of Airflow

Airflow typically consists of the below components.

  • Configuration file → all the configuration points like “which port to run the web server on”, “which executor to use”, “config related to rabbitmq/redis”, workers, DAGs location, repository, etc. are configured here.
  • Metadata database (mysql or postgres)  → The database where all the metadata related to the dags, dag_runs, tasks, variables are stored.
  • DAGs (Directed Acyclic Graphs) → these are the workflow definitions (logical units) that contain the task definitions along with the dependency info. These are the actual jobs that the user would like to execute.
  • Scheduler  → A component that is responsible for triggering the dag_instances and job instances for each dag. The scheduler is also responsible for invoking the Executor (be it Local or Celery or Sequential)
  • Broker (redis or rabbitmq)  → In case of a Celery executor, the broker is required to hold the messages and act as a communicator between the executor and the workers.
  • Worker nodes → the actual workers that execute the tasks and return the result of the task.
  • Web server  → A web server that renders the UI for Airflow through which one can view the DAGs, its status, rerun, create variables, connections etc.

So far so good; however, when it comes to deploying Airflow on production systems, one may have to go with a simple setup or a complex setup based on the needs. There are different ways Airflow can be deployed (especially from an Executor point of view). Please check out part 2 of this series to get a better picture of how to deploy Airflow.