
Airflow Installation manual and workflow example

Airflow (Ubuntu manual):

It took me several attempts to get Airflow installed, so here is the list of commands that worked for me.

Prerequisites

 

sudo apt-get update --fix-missing

sudo apt-get -y install build-essential autoconf libtool pkg-config python-opengl python-imaging python-pyrex python-pyside.qtopengl idle-python2.7 qt4-dev-tools qt4-designer libqtgui4 libqtcore4 libqt4-xml libqt4-test libqt4-script libqt4-network libqt4-dbus python-qt4 python-qt4-gl libgle3 python-dev


sudo pip install airflow

sudo apt-get -y install python-setuptools python-dev build-essential

sudo easy_install pip

sudo pip install airflow


pip install pystan
apt install -y libmysqlclient-dev 

sudo -H pip install apache-airflow[all_dbs]

sudo -H pip install apache-airflow[devel]

pip install apache-airflow[all]


export AIRFLOW_HOME=~/airflow

# install from pypi using pip
pip install apache-airflow

# initialize the database
airflow initdb

# start the web server, default port is 8080
airflow webserver -p 8080

----------------------------

Notice that SQLite is the default metadata database. It cannot run tasks in parallel and is only meant for getting started; for parallel execution you can use MySQL instead.

Airflow scales out with the Celery and Mesos executors.

DAG – just a container script that connects all the tasks. You cannot pass data between tasks via the DAG itself; there is a specific module for that (XComs, see below).

Operator – what actually does the work.

Airflow provides operators for many common tasks, including:

  • BashOperator – executes a bash command
  • PythonOperator – calls an arbitrary Python function
  • EmailOperator – sends an email
  • HTTPOperator – sends an HTTP request
  • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, etc. – executes a SQL command
  • Sensor – waits for a certain time, file, database row, S3 key, etc…

In addition to these basic building blocks, there are many more specific operators: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator… you get the idea!
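To see how a DAG and a couple of operators fit together, here is a minimal sketch (the dag_id, schedule and commands are placeholders; imports follow the Airflow 1.x layout used throughout this post):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# the DAG is only the container that wires tasks together
dag = DAG('my_first_dag', default_args=default_args, schedule_interval='@daily')

def greet():
    print('hello from a PythonOperator')

# the operators are what actually run
print_date = BashOperator(task_id='print_date', bash_command='date', dag=dag)
say_hello = PythonOperator(task_id='say_hello', python_callable=greet, dag=dag)

print_date >> say_hello   # say_hello runs after print_date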

 

Quick summary of the terms used by Airflow

Task – a running instance of an operator.
Hooks are interfaces to external platforms and databases like Hive, S3, MySQL, Postgres, HDFS, and Pig
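For example, a hook can be used directly inside a PythonOperator callable; a minimal sketch, assuming a MySQL connection with the hypothetical conn_id 'my_mysql' has been defined under Admin -> Connections:

from airflow.hooks.mysql_hook import MySqlHook

def count_rows():
    # 'my_mysql' is a hypothetical connection id defined in the UI
    hook = MySqlHook(mysql_conn_id='my_mysql')
    rows = hook.get_records('SELECT COUNT(*) FROM some_table')
    print('some_table has %s rows' % rows[0][0])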

Airflow pools can be used to limit the execution parallelism on arbitrary sets of tasks. 

The connection information to external systems is stored in the Airflow metadata database and managed in the UI

Queues

When using the CeleryExecutor, you can specify which Celery queue tasks are sent to; queue is an attribute of BaseOperator.
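Both pool and queue are just keyword arguments on an operator; a minimal sketch (the pool and queue names are placeholders you would create and configure yourself, and the queue only matters with the CeleryExecutor):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG('pool_queue_demo', start_date=datetime(2018, 1, 1), schedule_interval='@daily')

throttled = BashOperator(
    task_id='call_external_api',
    bash_command='curl -s https://example.com/api',
    pool='api_pool',   # hypothetical pool created under Admin -> Pools to cap parallelism
    queue='heavy',     # hypothetical Celery queue, only relevant with the CeleryExecutor
    dag=dag,
)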

XComs let tasks exchange messages
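A minimal sketch of two PythonOperator callables exchanging a value through XCom (task ids and the key name are placeholders):

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG('xcom_demo', start_date=datetime(2018, 1, 1), schedule_interval='@daily')

def push_value(**context):
    # make a value available to downstream tasks
    context['ti'].xcom_push(key='row_count', value=42)

def pull_value(**context):
    row_count = context['ti'].xcom_pull(task_ids='push_task', key='row_count')
    print('got %s rows' % row_count)

push_task = PythonOperator(task_id='push_task', python_callable=push_value,
                           provide_context=True, dag=dag)
pull_task = PythonOperator(task_id='pull_task', python_callable=pull_value,
                           provide_context=True, dag=dag)
push_task >> pull_task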

Variables are a generic way to store and retrieve arbitrary content or settings as a simple key value store within Airflow.
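A minimal sketch of reading a Variable (the key 's3_bucket' is a placeholder you would set under Admin -> Variables or with the airflow variables CLI command):

from airflow.models import Variable

# returns the stored value, or the default if the key has not been set
bucket = Variable.get('s3_bucket', default_var='my-default-bucket')
print('writing output to s3://%s/' % bucket)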

Branching

Sometimes you need a workflow to branch, or only go down a certain path based on an arbitrary condition which is typically related to something that happened in an upstream task. One way to do this is by using the BranchPythonOperator.
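A minimal sketch of a BranchPythonOperator that picks one of two downstream task ids (all names are placeholders):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

dag = DAG('branch_demo', start_date=datetime(2018, 1, 1), schedule_interval='@daily')

def choose_path(**context):
    # return the task_id of the branch that should run; the other branch is skipped
    if context['execution_date'].weekday() < 5:
        return 'weekday_task'
    return 'weekend_task'

branch = BranchPythonOperator(task_id='branch', python_callable=choose_path,
                              provide_context=True, dag=dag)
weekday_task = DummyOperator(task_id='weekday_task', dag=dag)
weekend_task = DummyOperator(task_id='weekend_task', dag=dag)
branch.set_downstream([weekday_task, weekend_task])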

SubDAGs

SubDAGs are perfect for repeating patterns. Defining a function that returns a DAG object is a nice design pattern when using Airflow.
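A minimal sketch of the "function that returns a DAG object" pattern, used with SubDagOperator (note the subdag's dag_id must be parent_dag_id.child_task_id; all names here are placeholders):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

default_args = {'owner': 'airflow', 'start_date': datetime(2018, 1, 1)}

def load_files_subdag(parent_dag_id, child_task_id, args):
    # the subdag's dag_id must be '<parent dag id>.<child task id>'
    subdag = DAG(dag_id='%s.%s' % (parent_dag_id, child_task_id),
                 default_args=args, schedule_interval='@daily')
    for i in range(3):
        DummyOperator(task_id='load_file_%d' % i, dag=subdag)
    return subdag

dag = DAG('subdag_demo', default_args=default_args, schedule_interval='@daily')

load_files = SubDagOperator(
    task_id='load_files',
    subdag=load_files_subdag('subdag_demo', 'load_files', default_args),
    dag=dag,
)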

SLAs

Service Level Agreements, or time by which a task or DAG should have succeeded, can be set at a task level as a timedelta
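A minimal sketch of setting an SLA on a task (one hour is a placeholder; missed SLAs show up under Browse -> SLA Misses):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG('sla_demo', start_date=datetime(2018, 1, 1), schedule_interval='@daily')

nightly_export = BashOperator(
    task_id='nightly_export',
    bash_command='sleep 30',
    sla=timedelta(hours=1),  # should finish within 1 hour of the scheduled time
    dag=dag,
)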

Trigger Rules

Though the normal workflow behavior is to trigger tasks when all their directly upstream tasks have succeeded, Airflow allows for more complex dependency settings.
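For example, a cleanup task that should run even when upstream tasks fail can use trigger_rule='all_done'; a minimal sketch:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG('trigger_rule_demo', start_date=datetime(2018, 1, 1), schedule_interval='@daily')

some_work = BashOperator(task_id='some_work', bash_command='exit 1', dag=dag)

cleanup = BashOperator(
    task_id='cleanup',
    bash_command='rm -rf /tmp/my_job_scratch',  # hypothetical scratch directory
    trigger_rule='all_done',  # run once all upstream tasks have finished, whatever their state
    dag=dag,
)

some_work >> cleanup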

Jinja Templating

Airflow leverages the power of Jinja Templating and this can be a powerful tool to use in combination with macros (see the Macros section).
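A minimal sketch of a templated command using the built-in {{ ds }} macro and user-supplied params (the target value is a placeholder):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG('template_demo', start_date=datetime(2018, 1, 1), schedule_interval='@daily')

templated = BashOperator(
    task_id='templated',
    # {{ ds }} is a built-in macro (the execution date); params values are user supplied
    bash_command='echo "run date is {{ ds }}, target is {{ params.target }}"',
    params={'target': 's3://my-bucket/output'},
    dag=dag,
)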

——

My first DAG:

https://airflow.apache.org/tutorial.html

Confirm there are no syntax errors:

python ~/airflow/dags/tutorial.py

# print the list of active DAGs
airflow list_dags

# prints the list of tasks the "tutorial" dag_id
airflow list_tasks tutorial

# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree


# command layout: command subcommand dag_id task_id date

# testing print_date
airflow test tutorial print_date 2015-06-01

# testing sleep
airflow test tutorial sleep 2015-06-01

Now remember what we did with templating earlier? See how this template gets rendered and executed by running this command:

# testing templated
airflow test tutorial templated 2015-06-01

Setting the default interpreter of Zeppelin for bootstrapping

When you bootstrap a new EMR cluster with Zeppelin, the first time you open a notebook you will be asked to save the default interpreter. On a transient cluster you may want to set the default interpreter automatically.

To set the default interpreter, check /etc/zeppelin/conf/interpreter.json and look for something like:

...
{
  "name": "spark",
  "class": "org.apache.zeppelin.spark.SparkInterpreter",
  "defaultInterpreter": true,
  "editor": {
    "language": "scala",
    "editOnDblClick": false
  }
},
...
{
  "name": "pyspark",
  "class": "org.apache.zeppelin.spark.PySparkInterpreter",
  "defaultInterpreter": false,
  "editor": {
    "language": "python",
    "editOnDblClick": false
  }
}

Now everything seems trivial. We just need to change the defaultInterpreter of spark to false, and defaultInterpreter of pyspark to true.

Then restart Zeppelin:

(sudo stop zeppelin; sudo start zeppelin).
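If you want to make this change from a bootstrap or step script instead of by hand, here is a minimal Python sketch; it assumes the interpreter.json layout shown above, and should be run as root before restarting Zeppelin:

import json

PATH = '/etc/zeppelin/conf/interpreter.json'

def flip_default(node):
    # recursively visit the config and make pyspark the only default interpreter
    if isinstance(node, dict):
        if 'name' in node and 'defaultInterpreter' in node:
            node['defaultInterpreter'] = (node['name'] == 'pyspark')
        for value in node.values():
            flip_default(value)
    elif isinstance(node, list):
        for item in node:
            flip_default(item)

with open(PATH) as f:
    config = json.load(f)

flip_default(config)

with open(PATH, 'w') as f:
    json.dump(config, f, indent=2)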

 

AWS EMR Hive Dynamic partitioning transformation job example in SQL

This is an example that shows the power of dynamic partitioning in Hive: the INSERT OVERWRITE at the end lets Hive route each dt value into its own partition of the Parquet table, without having to list the partitions explicitly.

 

set hive.exec.dynamic.partition.mode=nonstrict;

CREATE EXTERNAL TABLE IF NOT EXISTS sampledb.myTable(
  GAID string,
  leave_timestamp string,
  latitude string,
  longitude string,
  stay_time string,
  country string,
  city string,
  Street string,
  house string,
  Home_Country string,
  Home_City string,
  Home_Neighborhood string,
  Home_Zip_Code string,
  Office_Country string,
  Office_City string,
  Office_Neighborhood string,
  Office_Zip_Code string,
  Zip_of_the_location string,
  POI_Name string,
  POI_Type string,
  POI_Tag_Value string,
  Altitude string,
  Server_Version string,
  Ver_No string)
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 's3://myBucket/production/rawdata/online/';

MSCK REPAIR TABLE sampledb.myTable;

CREATE EXTERNAL TABLE IF NOT EXISTS sampledb.myTable_parquet(
  GAID string,
  leave_timestamp string,
  latitude string,
  longitude string,
  stay_time string,
  country string,
  city string,
  Street string,
  house string,
  Home_Country string,
  Home_City string,
  Home_Neighborhood string,
  Home_Zip_Code string,
  Office_Country string,
  Office_City string,
  Office_Neighborhood string,
  Office_Zip_Code string,
  Zip_of_the_location string,
  POI_Name string,
  POI_Type string,
  POI_Tag_Value string,
  Altitude string,
  Server_Version string,
  Ver_No string)
PARTITIONED BY (dt string)
STORED AS PARQUET
LOCATION 's3://myBucket/production/parquetdata/online_new/';

MSCK REPAIR TABLE sampledb.myTable_parquet;

INSERT OVERWRITE TABLE sampledb.myTable_parquet partition (dt)
SELECT
  regexp_replace(gaid, '"', ''),
  regexp_replace(leave_timestamp, '"', ''),
  regexp_replace(latitude, '"', ''),
  regexp_replace(longitude, '"', ''),
  regexp_replace(stay_time, '"', ''),
  regexp_replace(country, '"', ''),
  regexp_replace(regexp_replace(city, '"', ''), "'", ""),
  regexp_replace(street, '"', ''),
  regexp_replace(house, '"', ''),
  regexp_replace(Home_Country, '"', ''),
  regexp_replace(regexp_replace(home_city, '"', ''), "'", ""),
  regexp_replace(Home_Neighborhood, '"', ''),
  regexp_replace(Home_Zip_Code, '"', ''),
  regexp_replace(Office_Country, '"', ''),
  regexp_replace(regexp_replace(office_city, '"', ''), "'", ""),
  regexp_replace(Office_Neighborhood, '"', ''),
  regexp_replace(Office_Zip_Code, '"', ''),
  regexp_replace(Zip_of_the_location, '"', ''),
  regexp_replace(POI_Name, '"', ''),
  regexp_replace(POI_Type, '"', ''),
  regexp_replace(POI_Tag_Value, '"', ''),
  regexp_replace(Altitude, '"', ''),
  regexp_replace(server_version, '"', ''),
  regexp_replace(ver_no, '"', ''),
  dt
FROM sampledb.myTable WHERE dt >= current_date() - interval '3' day;

Recommendations for a new AWS account

AWS offers a number of tools to help secure your account. Many of these measures are not active by default, and you must take direct action to implement them. Here are some recommended practices to consider to help secure your account and its resources:

AWS Identity and Access Management (IAM) [1]
========================================
The two main types of credentials used for accessing your account are passwords and access keys. Both types of credentials can be applied to the root account or to individual IAM users. You should safeguard passwords and access keys as you would any other confidential personal data, and never embed them in publicly accessible code (i.e. a public Git repository). For added security, frequently rotate or update all security credentials.
–  If you have root account access keys, remove them and use IAM roles or user access keys instead.
–  Ensure you have a documented process for adding and removing authorized users. Ultimately, it should fully integrate with an organization’s existing employee provisioning/de-provisioning process.
–  Create IAM groups that reflect organizational roles, and use managed policies to grant specific technical permissions as required.
–  If you have an existing identity federation provider, you can use the AWS Security Token Service to grant external identities secure access to your AWS resources without having to create IAM users.

Logging and Auditing
=====================
AWS provides several different tools to help customers monitor their account activities and trends. AWS recommends all customers enable the following features:
–  Create a security email distribution list to receive security-related notifications. This will make it easier to configure and manage monitoring notifications associated with the monitoring services described below.
–  Create an Amazon Simple Notification Service (Amazon SNS) topic for security notifications and subscribe the security email distribution list to the topic [2]. This will make it easier to create and manage security-related alerts (see the sketch after this list).
–  Enable CloudTrail in all AWS Regions [3], which by default will capture global service events. Enable CloudTrail log file integrity validation and send logs to a central S3 bucket that your security team owns.
–  Configure CloudTrail integration with Amazon CloudWatch Logs and launch the provided AWS CloudFormation template to create CloudWatch alarms for security and network-related API activity.
–  Enable AWS Config. Use the predefined rules CLOUD_TRAIL_ENABLED and RESTRICTED_INCOMING_TRAFFIC to notify the security SNS topic if CloudTrail is disabled for the account or if someone creates insecure security group rules.
–  Create an S3 bucket for storing monitoring data and configure the bucket policy to allow the appropriate services (CloudTrail, AWS Config) to store AWS log and configuration data. For multiple accounts, use a single bucket to consolidate this data and restrict access appropriately. [4]
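For example, the SNS topic and subscription mentioned above can be created with a few boto3 calls; a minimal sketch (the region, topic name, and email address are placeholders):

import boto3

sns = boto3.client('sns', region_name='eu-west-1')  # region is an assumption

# create_topic is idempotent: it returns the existing topic if it is already there
topic = sns.create_topic(Name='security-notifications')

# subscribe the security distribution list (placeholder address);
# the recipient must confirm the subscription by email
sns.subscribe(
    TopicArn=topic['TopicArn'],
    Protocol='email',
    Endpoint='aws-security@yourdomain.com',
)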

Billing and Cost Monitoring
============================
AWS forecasting and budgeting services help you accurately plan and monitor your usage and spending levels. Here are steps to establish a baseline for your account:
–  Configure AWS usage and billing reports to get detailed information regarding trends in your account activity. [5]
–  Designate an email distribution list that will receive billing notifications.
–  Create an SNS topic for budget notifications and subscribe the billing email distribution list to this topic.
–  Create one or more budgets in your account and configure notifications if forecasted spending exceeds your budgeted usage.

Communication with AWS
==================
When a customer creates a new AWS account, AWS captures the primary contact information that it will use for all communication about the account, unless alternate contacts are also added. AWS accounts can include alternate contacts for Billing, Operations, and Security. These contacts will receive copies of relevant notifications and serve as secondary communication points if the primary contact is unavailable. When setting up communication channels with AWS, keep the following best practices in mind:
–  Configure the AWS account contact information with a corporate email distribution list (e.g. aws-<org_name>@yourdomain.com) and company phone number rather than an individual user’s email address or personal cell phone. [6]
–  Configure the account’s alternate contacts to point to a group rather than an individual. For example, create separate email distribution lists for billing, operations, and security and configure these as Billing, Security, and Operations contacts in each active AWS account. This ensures that multiple people will receive AWS notifications and be able to respond, even if someone is on vacation, changes roles, or leaves the company.
–  Sign up for an AWS support plan that aligns with your organization’s support expectations. Business and Enterprise support plans provide additional contact mechanisms (web, chat, and phone) that are especially useful when a customer needs an immediate response from AWS.

For more details, see [7] and [8].

References:
=========
[1] https://docs.aws.amazon.com/IAM/latest/UserGuide/best-practices.html
[2] https://docs.aws.amazon.com/sns/latest/dg/GettingStarted.html
[3] https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-user-guide.html
[4] https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html
[5] https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/billing-reports.html#turnonreports
[6] https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/manage-account-payment.html#account-inf
[7] https://aws.amazon.com/premiumsupport/knowledge-center/security-best-practices/
[8] https://aws.amazon.com/answers/security/aws-secure-account-setup/

Working CloudFormation example with EMR – adding volumes to the core and master groups and increasing the root partition

Below is a complete working example of an EMR cluster:

1 x master node, on demand,

2 x core nodes, on demand,

no task group, no auto scaling,

and a Route53 record for my domain.

Notice the MasterInstanceGroup and CoreInstanceGroup sections in the JSON.

They add a 320 GB EBS volume to both the core and master nodes, and EbsRootVolumeSize increases the root partition to 100 GB (the maximum supported).

You can use https://jsonformatter.curiousconcept.com/ to reformat the JSON below (there is a WordPress formatting bug I don't have a solution for), sorry about this 🙂

{
  "AWSTemplateFormatVersion": "2010-09-09",
  "Conditions": {
    "Hbase": {
      "Fn::Equals": [
        { "Ref": "Applications" },
        "Hbase"
      ]
    },
    "Spark": {
      "Fn::Equals": [
        { "Ref": "Applications" },
        "Spark"
      ]
    }
  },
  "Description": "myProdCluster",
  "Mappings": {},
  "Outputs": {},
  "Parameters": {
    "Applications": {
      "AllowedValues": [
        "Spark",
        "TBD"
      ],
      "Description": "Cluster setup:",
      "Type": "String"
    },
    "CoreInstanceType": {
      "Default": "r4.4xlarge",
      "Description": "Instance type to be used for core instances.",
      "Type": "String"
    },
    "EMRClusterName": {
      "Default": "myProdCluster",
      "Description": "Name of the cluster",
      "Type": "String"
    },
    "KeyName": {
      "Default": "walla_omid",
      "Description": "Must be an existing Keyname",
      "Type": "String"
    },
    "LogUri": {
      "Default": "s3://aws-logs-1123-eu-west-1/elasticmapreduce/",
      "Description": "Must be a valid S3 URL",
      "Type": "String"
    },
    "MasterInstacneType": {
      "Default": "r4.4xlarge",
      "Description": "Instance type to be used for the master instance.",
      "Type": "String"
    },
    "NumberOfCoreInstances": {
      "Default": 2,
      "Description": "Must be a valid number",
      "Type": "Number"
    },
    "ReleaseLabel": {
      "Default": "emr-5.13.0",
      "Description": "Must be a valid EMR release version",
      "Type": "String"
    },
    "S3DataUri": {
      "Default": "s3://aws-logs-1234-eu-west-1/elasticmapreduce/",
      "Description": "Must be a valid S3 bucket URL",
      "Type": "String"
    },
    "SubnetID": {
      "Default": "subnet-0647325e",
      "Description": "Must be Valid public subnet ID",
      "Type": "String"
    }
  },
  "Resources": {
    "EMRCluster": {
      "DependsOn": [
        "EMRClusterServiceRole",
        "EMRClusterinstanceProfileRole",
        "EMRClusterinstanceProfile"
      ],
      "Properties": {
        "Applications": [
          { "Name": "Ganglia" },
          { "Name": "Spark" },
          { "Name": "Hive" },
          { "Name": "Tez" },
          { "Name": "Zeppelin" },
          { "Name": "Oozie" },
          { "Name": "Hue" },
          { "Name": "Presto" },
          { "Name": "Livy" }
        ],
        "AutoScalingRole": "EMR_AutoScaling_DefaultRole",
        "Configurations": [
          {
            "Classification": "hive-site",
            "ConfigurationProperties": {
              "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
            }
          },
          {
            "Classification": "spark",
            "ConfigurationProperties": {
              "maximizeResourceAllocation": "true"
            }
          },
          {
            "Classification": "presto-connector-hive",
            "ConfigurationProperties": {
              "hive.metastore.glue.datacatalog.enabled": "true"
            }
          },
          {
            "Classification": "spark-hive-site",
            "ConfigurationProperties": {
              "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
            }
          }
        ],
        "EbsRootVolumeSize": 100,
        "Instances": {
          "AdditionalMasterSecurityGroups": [
            "sg-111"
          ],
          "AdditionalSlaveSecurityGroups": [
            "sg-112"
          ],
          "CoreInstanceGroup": {
            "EbsConfiguration": {
              "EbsBlockDeviceConfigs": [
                {
                  "VolumeSpecification": {
                    "SizeInGB": "320",
                    "VolumeType": "gp2"
                  },
                  "VolumesPerInstance": "1"
                }
              ],
              "EbsOptimized": "true"
            },
            "InstanceCount": 2,
            "InstanceType": "r4.4xlarge",
            "Market": "ON_DEMAND",
            "Name": "coreNinja"
          },
          "MasterInstanceGroup": {
            "EbsConfiguration": {
              "EbsBlockDeviceConfigs": [
                {
                  "VolumeSpecification": {
                    "SizeInGB": "320",
                    "VolumeType": "gp2"
                  },
                  "VolumesPerInstance": "1"
                }
              ],
              "EbsOptimized": "true"
            },
            "InstanceCount": 1,
            "InstanceType": "r4.4xlarge",
            "Market": "ON_DEMAND",
            "Name": "masterNinja"
          },
          "Ec2KeyName": { "Ref": "KeyName" },
          "Ec2SubnetId": { "Ref": "SubnetID" },
          "TerminationProtected": false
        },
        "JobFlowRole": { "Ref": "EMRClusterinstanceProfile" },
        "LogUri": { "Ref": "LogUri" },
        "Name": { "Ref": "EMRClusterName" },
        "ReleaseLabel": { "Ref": "ReleaseLabel" },
        "ServiceRole": { "Ref": "EMRClusterServiceRole" },
        "VisibleToAllUsers": true
      },
      "Type": "AWS::EMR::Cluster"
    },
    "EMRClusterServiceRole": {
      "Properties": {
        "AssumeRolePolicyDocument": {
          "Statement": [
            {
              "Action": [ "sts:AssumeRole" ],
              "Effect": "Allow",
              "Principal": {
                "Service": [ "elasticmapreduce.amazonaws.com" ]
              }
            }
          ],
          "Version": "2012-10-17"
        },
        "ManagedPolicyArns": [
          "arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceRole"
        ],
        "Path": "/",
        "Policies": [
          {
            "PolicyName": "s3fullaccess",
            "PolicyDocument": {
              "Version": "2012-10-17",
              "Statement": [
                {
                  "Effect": "Allow",
                  "Action": "s3:*",
                  "Resource": "*"
                }
              ]
            }
          }
        ]
      },
      "Type": "AWS::IAM::Role"
    },
    "EMRClusterinstanceProfile": {
      "Properties": {
        "Path": "/",
        "Roles": [
          { "Ref": "EMRClusterinstanceProfileRole" }
        ]
      },
      "Type": "AWS::IAM::InstanceProfile"
    },
    "EMRClusterinstanceProfileRole": {
      "Properties": {
        "AssumeRolePolicyDocument": {
          "Statement": [
            {
              "Action": [ "sts:AssumeRole" ],
              "Effect": "Allow",
              "Principal": {
                "Service": [ "ec2.amazonaws.com" ]
              }
            }
          ],
          "Version": "2012-10-17"
        },
        "ManagedPolicyArns": [
          "arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role"
        ],
        "Path": "/"
      },
      "Type": "AWS::IAM::Role"
    },
    "TestStep": {
      "Type": "AWS::EMR::Step",
      "Properties": {
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
          "Args": [
            "s3://byoo-emr-bootstrap/bootstrap-emr.sh"
          ],
          "Jar": "s3://eu-west-1.elasticmapreduce/libs/script-runner/script-runner.jar"
        },
        "Name": "CustomBootstrap",
        "JobFlowId": { "Ref": "EMRCluster" }
      }
    },
    "myDNSRecord": {
      "Type": "AWS::Route53::RecordSet",
      "DependsOn": [ "EMRCluster" ],
      "Properties": {
        "HostedZoneName": "myDomain.",
        "Comment": "DNS name for my instance. for emr. cloud formation",
        "Name": "mydomain.com",
        "Type": "CNAME",
        "TTL": "600",
        "ResourceRecords": [
          {
            "Fn::GetAtt": [ "EMRCluster", "MasterPublicDNS" ]
          }
        ]
      }
    }
  }
}
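If you prefer launching the stack from code rather than the console, here is a minimal boto3 sketch (the region, local file name, and parameter value are assumptions; CAPABILITY_IAM is needed because the template creates IAM roles):

import boto3

cfn = boto3.client('cloudformation', region_name='eu-west-1')  # region is an assumption

# 'emr-cluster.json' is a hypothetical local file containing the template above
with open('emr-cluster.json') as f:
    template_body = f.read()

cfn.create_stack(
    StackName='myProdCluster',
    TemplateBody=template_body,
    Capabilities=['CAPABILITY_IAM'],  # required because the template creates IAM roles
    Parameters=[
        {'ParameterKey': 'Applications', 'ParameterValue': 'Spark'},
    ],
)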

 

The reason I am sharing this is that the examples provided by AWS are not good enough, and it is very confusing to connect the dots.

If this helps you, please “like” and subscribe 🙂