Airflow Tutorial for Beginners - Full Course in 2 Hours 2022
Airflow Tutorial Overview
In this tutorial, the instructor introduces an Apache Airflow tutorial for beginners, combining theory and practical demonstrations. The course covers essential concepts, hands-on examples, and the latest features of Apache Airflow version 2.0.
Introduction to Apache Airflow
- Apache Airflow is an open-source tool designed for creating, scheduling, and monitoring workflows.
- It is ideal for workflows involving multiple tasks that must run periodically in a defined order.
- Python forms the basis of Apache Airflow; hence, having prior Python coding experience is beneficial for users.
Setting Up Python Environment
- Ensure your local Python version meets the requirements (3.6 or above) for installing Apache Airflow.
- Create a Python environment using `python3 -m venv` and activate it to proceed with installing Apache Airflow locally.
Installing and Initializing Apache Airflow
- Install Apache Airflow by following the commands provided in the official GitHub repository, ensuring compatibility with your local Python version.
- Resolve any installation errors like missing dependencies (e.g., GCC), which may arise during the installation process.
Database Initialization and Web Server Setup
This section focuses on initializing the database for Apache Airflow within a project directory and setting up the web server to manage workflows efficiently.
Database Initialization
- Set up the SQLite database along with the necessary configuration files by running `airflow db init`.
- Ensure proper directory configuration by exporting the `AIRFLOW_HOME` environment variable, pointing it to your project directory.
Starting Web Server and Scheduler
- Start the Airflow web server using `airflow webserver -p 8080`, allowing access through the specified port.
- Start the scheduler by executing `airflow scheduler` to enable task scheduling within Apache Airflow.
Executing Workflows and Monitoring Tasks
This segment delves into executing workflows, monitoring task progress, and ensuring successful task completion within Apache Airflow's interface.
Workflow Execution
- Initiate workflow execution by starting the airflow scheduler to schedule tasks as per defined dependencies.
- Monitor task progress through the web interface, observing scheduled tasks turning dark green upon successful execution.
Task Management
- Manage individual tasks within workflows by tracking their status changes from scheduled to executed in real-time.
Official Website and Docker Installation
In this section, the speaker discusses the process of installing Apache Airflow using Docker instead of running it locally.
Installing Docker and Docker Compose
- To install Apache Airflow with Docker, one needs to install Docker and Docker Compose on their laptop.
- After installing, check the versions by running `docker --version` and `docker-compose --version`.
Setting Up Airflow with Docker Compose
- Download the official docker-compose.yaml file from the Airflow documentation.
- Modify the YAML file to use the LocalExecutor instead of the CeleryExecutor by changing the `AIRFLOW__CORE__EXECUTOR` setting.
Initializing Database and Running Airflow
This part covers initializing the database for Apache Airflow and running it in a containerized environment.
Initializing Database
- Initialize the database using `docker-compose up airflow-init`.
- Once initialization is complete (exit code 0), proceed to run Airflow in detached mode with `docker-compose up -d`.
Checking Containers and Accessing Web Server
- Check the running containers with the `docker ps` command.
- Access the Airflow web server through a browser using IP address and port number.
Understanding Apache Airflow Concepts
This segment delves into core concepts of Apache Airflow, its origins, workflows, Directed Acyclic Graph (DAG), tasks, and operators.
Core Concepts of Apache Airflow
- Apache Airflow originated as an internal tool at Airbnb to manage complex workflows.
- Workflow in Apache Airflow is defined as a Directed Acyclic Graph (DAG), organizing tasks without cycles.
Tasks and Operators
- Tasks are units of work within a DAG represented as nodes; operators determine task execution methods.
Understanding Airflow Concepts
In this section, the speaker explains key concepts related to Airflow, such as execution date, task instances, and DAG runs.
Execution Date and Task Instances
- The execution date in Airflow represents the logical date and time for which DAG runs and task instances are executed.
- A task instance is a specific run of a task at a particular point in time, known as the execution date.
DAG Runs and Task Execution
- A DAG run is an instantiation of a DAG that includes the task instances running for a specific execution date.
- When a DAG run is triggered, its tasks are executed in order based on their dependencies.
Task Lifecycle Stages
- Tasks in Airflow go through various stages from start to completion with each stage indicating the status of the task instance.
- There are 11 different stages displayed in the Airflow UI graph and tree views, each represented by a color.
Airflow Task Lifecycle
This part delves into the detailed lifecycle of tasks within Airflow, explaining various stages like scheduled, removed, upstream failed, skipped, running, success, failed, shutdown.
Understanding Task Lifecycle Stages
- Tasks typically start with no status where the scheduler creates an empty task instance before moving through scheduled, removed, upstream failed or skipped stages.
- Scheduled indicates that the scheduler identified that the task instance needs to be run while upstream failed occurs when an upstream task fails.
- Once a task instance is scheduled successfully, it moves to the running stage, where a worker executes it.
Handling Task Failures
- Tasks can end in success if completed flawlessly or fail if unsuccessful. Shutdown occurs if a task is aborted during its execution stages.
- Failed or shutdown tasks may be retried if maximum retry limits are not exceeded or rescheduled at intervals for specific cases like file existence checks.
Airflow Basic Architecture
This segment explores the fundamental architecture of Apache Airflow, including the responsibilities of data engineers and the interactions between the web server, scheduler, workers, DAGs, executor, and database.
Components of Airflow Architecture
- Data engineers play a crucial role in configuring Airflow setups, including selecting executors and databases, while managing DAGs through the user interface served by the web server.
- The executor persistently updates and retrieves DAG and task state, closely connected to the database engine (e.g., MySQL or PostgreSQL) chosen in the setup configuration.
Creating Your First DAG
In this section, the process of creating a DAG file in Airflow and initializing a DAG instance is discussed.
Creating the First DAG
- Creating a file named "our_first_dag.py" and initializing a DAG as an instance of the DAG class imported from Airflow.
- Creating the DAG instance using the `with` statement to encapsulate subsequent code within the scope of the DAG.
- Assigning values to parameters like the DAG ID, description, start date, and schedule interval.
Configuring DAG Parameters
This part focuses on setting up the initial parameters for the DAG and scheduling its execution.
Setting Initial Parameters
- Defining a unique DAG ID and describing the DAG as our first Airflow DAG.
- Specifying the start date (July 30) and scheduling it to run daily at 2 am.
- Setting parameters such as the start date, the schedule interval, and common parameters for operator initialization.
Defining Default Arguments
Here, additional common parameters are defined along with operator-specific settings for initialization.
Defining Common Parameters
- Setting parameters like the owner (`coder2j`), maximum retries (5), and retry delay (5 minutes).
- Importing `timedelta` from the `datetime` package for the retry delay and defining default arguments for operators.
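The common parameters above are typically collected into a `default_args` dictionary that every operator in the DAG shares; a minimal sketch (the owner name and retry values follow the tutorial):

```python
from datetime import timedelta

# Common parameters shared by all operators in the DAG, as described above.
default_args = {
    "owner": "coder2j",
    "retries": 5,                          # maximum number of retries
    "retry_delay": timedelta(minutes=5),   # wait 5 minutes between retries
}
```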
Creating Tasks with BashOperator
The creation of a simple task using BashOperator is discussed along with resolving import errors.
Creating a Simple Task
- Implementing a task using BashOperator to execute bash commands.
- Resolving import errors by changing `airflow.operator` to `airflow.operators` in the import path.
Defining Task Dependencies
Discusses creating task dependencies within a DAG using downstream/upstream relationships between tasks.
Establishing Task Dependencies
- Defining tasks within a DAG as implementations of operators.
- Linking tasks together by setting one task downstream or upstream of another based on execution order requirements.
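Putting the previous sections together, a first DAG file might look like the following sketch (assuming Airflow 2.x; the dag_id, dates, and bash commands are illustrative, not taken verbatim from the video):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

# A minimal sketch of a first DAG: one task followed by two downstream tasks.
with DAG(
    dag_id="our_first_dag",
    description="Our first Airflow DAG",
    start_date=datetime(2021, 7, 30, 2),
    schedule_interval="@daily",
) as dag:
    task1 = BashOperator(task_id="first_task", bash_command="echo hello world")
    task2 = BashOperator(task_id="second_task", bash_command="echo task 2")
    task3 = BashOperator(task_id="third_task", bash_command="echo task 3")

    # task1 runs first; task2 and task3 both run after it
    task1 >> [task2, task3]
```

The `>>` operator sets a downstream relationship; `<<` or `set_upstream` expresses the same dependency from the other direction.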
Fixing Errors in DAG Code
Focuses on fixing errors encountered during task creation in Airflow through code adjustments.
Error Resolution Process
- Identifying errors such as module import issues and incorrect argument names.
Setting up Airflow DAGs with Python Operators
In this section, the process of setting up Airflow Directed Acyclic Graphs (DAGs) using Python Operators is explained in detail.
Setting the DAG ID and Description
- Setting the DAG ID to a new value and describing it as our first DAG using the PythonOperator.
Scheduling Tasks and Defining Functions
- Setting the start date of the DAG to yesterday, scheduling it daily, and defining a simple Python function named "greet" that outputs a "hello world" string when executed.
Creating Tasks with PythonOperator
- Creating a task that runs the "greet" function by importing PythonOperator, setting the task ID to "greet," and passing the function as the `python_callable` parameter.
Executing Tasks and Passing Parameters
- Triggering the DAG, checking the task logs, and passing parameters to Python functions using `op_kwargs` in the PythonOperator.
Sharing Information Between Tasks Using XComs
This section explores sharing information between tasks in Airflow using XComs.
Pushing Information to XComs
- Creating a new Python function called "get_name" that returns 'Jerry' as the name and pushing this value into XComs.
Pulling Information from XComs
- Building a task to run the "get_name" function and pulling 'Jerry' from XComs in the greet function using the `xcom_pull` method.
Pushing Multiple Values into XComs
Demonstrating how multiple values can be pushed into XComs within one function.
Pushing First Name and Last Name
- Modifying the "get_name" function to push the first name ('Jerry') and last name ('Friedman') into XComs separately.
Pulling Multiple Values from XComs
- Updating the greet function to pull both the first name and last name from XComs by their keys for display.
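The push/pull pattern described above can be sketched roughly as follows (a sketch assuming Airflow 2.x; IDs, dates, and key names are illustrative):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def get_name(ti):
    # Push two values into XComs, each under its own key.
    ti.xcom_push(key="first_name", value="Jerry")
    ti.xcom_push(key="last_name", value="Friedman")

def greet(ti):
    # Pull both values back by task_ids and key.
    first_name = ti.xcom_pull(task_ids="get_name", key="first_name")
    last_name = ti.xcom_pull(task_ids="get_name", key="last_name")
    print(f"Hello {first_name} {last_name}")

with DAG(dag_id="dag_with_xcoms",
         start_date=datetime(2021, 7, 30),
         schedule_interval="@daily") as dag:
    task1 = PythonOperator(task_id="get_name", python_callable=get_name)
    task2 = PythonOperator(task_id="greet", python_callable=greet)
    task1 >> task2
```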
Value Sharing and Task Dependencies
In this section, the speaker discusses sharing values between tasks using XComs and updating task dependencies in Airflow.
Updating Task Dependencies
- Update the task dependencies by adding task 3 upstream of task 1 after pushing the value via XCom with a key of 'age' and task_ids of 'get_age'.
- XComs have a maximum size limit of 48 kilobytes; do not use them to share large data, which could cause crashes.
Task Flow API Implementation
This part focuses on reducing code complexity by utilizing the Task Flow API in Airflow.
Implementing Task Flow API
- Rewrite the PythonOperator DAG using the Task Flow API to reduce the code length.
- Define the DAG `hello_world_etl` using decorators and create three tasks (get_name, get_age, and greet) with their respective functions.
Building Task Dependencies
Establishing dependencies between tasks using the Task Flow API in Airflow.
Defining Task Relationships
- Create functions for each task (get name, get age, greet), ensuring proper dependency calculation by calling functions to pass variables.
- Automatically calculate dependencies within the Task Flow API without manual intervention.
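A rough sketch of the same workflow rewritten with the TaskFlow API (assuming Airflow 2.x decorators; IDs and values are illustrative). Note that `multiple_outputs=True` lets a task return a dictionary whose entries become separate XCom values:

```python
from datetime import datetime
from airflow.decorators import dag, task

@dag(dag_id="hello_world_etl",
     start_date=datetime(2021, 7, 30),
     schedule_interval="@daily")
def hello_world_etl():

    @task(multiple_outputs=True)
    def get_name():
        return {"first_name": "Jerry", "last_name": "Friedman"}

    @task()
    def get_age():
        return 19

    @task()
    def greet(first_name, last_name, age):
        print(f"Hello {first_name} {last_name}, age {age}")

    # Calling the task functions passes XCom values and lets Airflow
    # infer the dependencies automatically.
    name = get_name()
    age = get_age()
    greet(name["first_name"], name["last_name"], age)

greet_dag = hello_world_etl()
```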
Execution and Verification
Executing DAG instances and verifying successful execution in Airflow.
Executing Tasks
- Pass variables from get name and get age functions to greet function for successful execution.
- Turn on the DAG, check log outputs for desired messages, and verify successful push-pull operations of x-com values.
Enhancing Functionality
Modifying tasks to return additional information and optimizing code efficiency in Airflow.
Customizing Tasks
- Modify functions to return first name and last name along with age parameters for enhanced functionality.
Setting Up Airflow DAGs and Custom Schedules
In this section, the process of setting up Airflow Directed Acyclic Graphs (DAGs) and custom schedules using cron expressions is explained in detail.
Creating and Executing DAGs
- The process involves locating the DAG by its DAG ID, executing commands, viewing the logs, and refreshing the browser to observe DAG runs.
- Understanding cron expressions as strings with five fields separated by whitespace that represent time sets the foundation for scheduling intervals in Airflow.
Customizing Schedule Intervals
- Utilizing cron expressions allows for flexible scheduling beyond presets like daily or hourly intervals provided by Airflow.
- Creating a new DAG file in VS Code involves defining tasks with operators like bash operator, setting start dates, and configuring daily scheduling using cron presets.
Generating Customized Cron Expressions
- A website called crontab.guru offers a visual tool to generate and verify cron expressions for customized schedule intervals.
- By inputting desired parameters such as running tasks weekly on specific days at particular times, users can create tailored cron expressions for their DAG schedules.
Implementing Multiple Weekday Schedules
- Adding multiple weekdays to a cron expression enables running tasks on specified days like Tuesdays and Fridays at set times.
- Adjusting schedule intervals in VS Code based on customized cron expressions allows for precise control over when tasks are executed within the Airflow environment.
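As a sketch, the Tuesday-and-Friday schedule described above could be expressed as a cron string in the DAG definition (the dag_id, dates, and time are illustrative; expressions can be verified on crontab.guru):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

# "0 3 * * Tue,Fri" means 3:00 AM every Tuesday and Friday; the day-of-week
# field accepts both names and numbers.
with DAG(
    dag_id="dag_with_cron_expression",
    start_date=datetime(2021, 11, 1),
    schedule_interval="0 3 * * Tue,Fri",
) as dag:
    task1 = BashOperator(task_id="task1", bash_command="echo cron run")
```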
Managing Connections in Airflow
This section delves into managing connections in Apache Airflow, essential for connecting to external services such as databases or cloud servers.
Understanding Airflow Connections
- Establishing connections is crucial for accessing external services like database servers or cloud platforms by providing necessary credentials such as host details and passwords.
- Navigating through the Admin panel reveals options to view existing connections and create new ones tailored to specific service requirements within Airflow's UI.
Creating Database Connections
- Through the Admin interface, users can define connection names, types, and relevant details required to establish connections effectively within their workflows.
- Once connections are set up appropriately, they can be utilized with operators like Postgres Operator to interact with databases seamlessly from within an Airflow project environment.
Operator.py Setup and Task Creation
In this section, the operator.py file is set up, and tasks are created using the Postgres operator in Airflow.
Setting Up Operator.py
- Begin by opening the operator.py file and importing the necessary packages.
Creating Tasks with Postgres Operator
- Define the default arguments and initialize a Directed Acyclic Graph (DAG) with the essential parameters.
- Create a task using the Postgres operator by specifying the task ID, the connection ID for the Postgres database, and the SQL query statement.
- Configure the Postgres connection in the Airflow web server UI by providing the necessary details: connection type, schema, username, password, host, and port.
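A sketch of such a task (assuming the Postgres provider package is installed; the connection ID, table, and columns are illustrative, loosely following the tutorial's dag_runs table):

```python
from datetime import datetime
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

# The postgres_conn_id must match a connection created in the Airflow UI
# under Admin -> Connections.
with DAG(dag_id="dag_with_postgres_operator",
         start_date=datetime(2021, 12, 19),
         schedule_interval="@daily") as dag:
    create_table = PostgresOperator(
        task_id="create_postgres_table",
        postgres_conn_id="postgres_localhost",  # assumed connection ID
        sql="""
            CREATE TABLE IF NOT EXISTS dag_runs (
                dt date,
                dag_id character varying,
                PRIMARY KEY (dt, dag_id)
            );
        """,
    )
```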
Troubleshooting Task Execution
This section focuses on identifying and resolving issues encountered during task execution using the Postgres operator in Airflow.
Debugging Task Failures
- Encounter a task failure due to a typo in the host name; correct it to ensure successful task execution.
- Address a syntax error in the SQL query statement by correcting the data type specifications for successful task completion.
Inserting Data into Database Tables
The process of inserting data into database tables using the Postgres operator is detailed here.
Inserting Data with Postgres Operator
- Create a task that inserts the DAG ID and execution date into the database table using the Postgres operator.
- Encounter an issue with template variables; resolve it by updating the code based on the Airflow macros documentation for successful data insertion.
Maintaining Data Integrity
Strategies to maintain data integrity while inserting records into database tables are discussed here.
Ensuring Data Integrity
- Emphasize deleting existing data for the same execution date before insertion to prevent duplication or primary key violations in the database table.
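The delete-before-insert pattern can be sketched like this (the DAG ID, connection ID, and table are assumptions; `{{ ds }}` and `{{ dag.dag_id }}` are built-in Airflow template macros):

```python
from datetime import datetime
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

# Delete any row for this run's execution date before inserting, so
# retries and re-runs don't violate the primary key.
with DAG(dag_id="dag_with_postgres_insert",
         start_date=datetime(2021, 12, 19),
         schedule_interval="@daily") as dag:
    delete_data = PostgresOperator(
        task_id="delete_data_from_table",
        postgres_conn_id="postgres_localhost",  # assumed connection ID
        sql="DELETE FROM dag_runs WHERE dt = '{{ ds }}' AND dag_id = '{{ dag.dag_id }}';",
    )
    insert_data = PostgresOperator(
        task_id="insert_into_table",
        postgres_conn_id="postgres_localhost",
        sql="INSERT INTO dag_runs (dt, dag_id) VALUES ('{{ ds }}', '{{ dag.dag_id }}');",
    )
    delete_data >> insert_data
```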
Training and Installing Dependencies
This section covers the process of training and installing dependencies for an Airflow project.
Setting Up Requirements and Dockerfile
- To pin the scikit-learn version, add a line to the requirements file: `scikit-learn==0.24.2`.
- Create a Dockerfile in the project root folder to extend the Apache Airflow image, starting with `FROM apache/airflow:2.0.1`.
Building Docker Image
- Copy the requirements text file to the Docker image using the copy command, upgrade pip, and install Python dependencies.
- Build the extended image using `docker build . --tag extending-airflow:latest`.
Verifying Installation and Adding Dependencies
This section focuses on verifying installations, adding more Python dependencies, and rebuilding containers.
Verifying Installation
- Change image name in docker-compose.yaml to 'extending-airflow:latest' after building.
- Create a DAG file to check if scikit-learn is installed successfully.
Adding More Dependencies
- Add matplotlib to requirements.txt, create a function 'getmatplotlib', update DAG version, rebuild containers.
- Rebuilding is necessary whenever the Python dependency files change, since local changes are not reflected in already-built images or running containers.
Customizing Airflow Image
Exploring customizing airflow images from source code.
Customizing Image
- Clone official Airflow GitHub repository for customization.
- Define Python dependencies in docker context files for automatic installation during image build.
MinIO Setup and Integration with AWS S3
In this section, the speaker demonstrates how to set up MinIO, a service API-compatible with AWS S3, in a Docker container. The process involves creating an S3 bucket, uploading files, setting up an Airflow DAG to connect to the bucket, and configuring sensors for file existence checks.
Setting Up MinIO in a Docker Container
- MinIO is API-compatible with AWS S3.
- Launch a MinIO Docker container using a command in VS Code.
- Access MinIO through ports 9000 (API) and 9001 (console).
Creating and Uploading Files
- Create an S3 bucket named "airflow" and upload a CSV file.
- Set permissions for the bucket and manage files within it.
Building Airflow DAG for File Sensing
- Generate a CSV file in VS Code for upload to the airflow bucket.
- Create an Airflow DAG with sensor operators to check file existence.
Configuring AWS Connection and Sensors
- Check the installed version of Amazon Airflow provider package.
- Configure sensors using the `S3KeySensor` operator from the Amazon provider package.
Testing and Troubleshooting
- Set up an AWS connection ID in Airflow UI for sensor operation.
- Resolve issues by updating connection fields as needed.
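A sketch of the sensor configuration (the import path matches older versions of the Amazon provider package used in the tutorial; the bucket, key, and connection ID follow the MinIO setup above):

```python
from datetime import datetime
from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3_key import S3KeySensor

# Poke the bucket every 5 seconds; fail if the file does not appear
# within the 30-second timeout.
with DAG(dag_id="dag_with_minio_s3",
         start_date=datetime(2022, 1, 1),
         schedule_interval="@daily") as dag:
    sensor = S3KeySensor(
        task_id="sensor_minio_s3",
        bucket_name="airflow",
        bucket_key="data.csv",
        aws_conn_id="minio_conn",  # assumed connection ID set in the UI
        mode="poke",
        poke_interval=5,
        timeout=30,
    )
```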
Sensor Task and Data Upload
In this section, the process of running a sensor task to detect file existence in an Airflow bucket and uploading data to the bucket is discussed.
Sensor Task Execution
- The sensor task attempts to find the data.csv file every 5 seconds and fails with a timeout if the file is not found within 30 seconds.
- Uploading the data.csv file before the timeout allows the sensor task to detect its existence, marking the task as successful.
Data Preparation and Import
- A CSV file named "orders" containing sales order data is introduced; it needs to be imported into a Postgres database for further processing.
- A table called "orders" is created in the Postgres database, and the data from the CSV file is imported into it.
Creating a Python DAG for Data Querying
This section focuses on setting up a Python DAG for querying data from Postgres and uploading the results to an S3 bucket.
Setting Up the DAG Script
- Initiating a Python DAG script with the necessary package imports and defining functions for querying order data from Postgres and uploading it to S3.
- Creating a function "postgres2s3" within the DAG script to handle querying the order data from Postgres and the subsequent upload tasks.
Interacting with Postgres Database
- Checking the installed version of the Postgres provider package and exploring the documentation on hooks and operators for interacting with Postgres databases in Airflow.
- Initializing the `PostgresHook` in Python code using the connection ID from the Airflow admin page, enabling execution of SQL queries through cursor operations.
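A sketch of the hook-based query function described above (the connection ID, table name, and output path are assumptions based on the tutorial setup; `ds`, `next_ds`, and `ds_nodash` are template values Airflow passes to the callable):

```python
import csv
import logging
from airflow.providers.postgres.hooks.postgres import PostgresHook

def postgres2s3(ds, next_ds, ds_nodash):
    # Open a connection via the hook and run the date-bounded query.
    hook = PostgresHook(postgres_conn_id="postgres_localhost")
    conn = hook.get_conn()
    cursor = conn.cursor()
    cursor.execute(
        "SELECT * FROM orders WHERE date >= %s AND date < %s",
        (ds, next_ds),
    )
    # Save the result as a text file whose name includes the execution date.
    with open(f"dags/get_orders_{ds_nodash}.txt", "w") as f:
        writer = csv.writer(f)
        writer.writerow([desc[0] for desc in cursor.description])  # header row
        writer.writerows(cursor)
    cursor.close()
    conn.close()
    logging.info("Orders saved to get_orders_%s.txt", ds_nodash)
```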
Executing Tasks with Postgres Operator
This part delves into executing tasks using the Postgres operator within Airflow DAG (Directed Acyclic Graph) workflows.
Running Python Functions
- Writing SQL queries within Python functions to extract order data based on date criteria, saving the results as text files using the csv module.
- Implementing logging within the Python script to track task progress and outcomes during Airflow DAG runs.
Task Execution and Error Handling
- Configuring tasks with proper IDs and addressing SQL syntax errors by correcting table names and query formats.
Python Function Execution and Airflow Documentation
In this section, the speaker discusses how to give the current and next execution date to a Python function using Airflow documentation.
Setting Current and Next Execution Date in Python Function
- The speaker instructs to open the Airflow documentation for version 2.0.1 and search for predefined macros.
- Find the `execution_date` and `next_execution_date` macros in the list.
- Copy these macros to use them as parameters in the Python function for Airflow to render.
Updating Query Statement with Date Conditions
This part focuses on updating a query statement with specific date conditions based on the current and next execution dates.
Updating Query Statement
- Modify the query statement to include date conditions: greater than or equal to the current execution date and less than the next execution date.
- Use the two macros obtained earlier as parameters in the query.
- Make the text file name dynamic by incorporating the execution date.
DAG Execution and Text File Handling
Here, we delve into executing DAGs, handling text files dynamically, and observing successful runs.
Executing DAG and Handling Text Files
- Save the changes, update the DAG version, refresh the browser page, select the latest DAG, turn it on, and await execution.
- Witness two successful DAG runs on April 30th and May 1st.
- Check log files showing orders saved with specific suffixes based on execution dates.
Uploading Orders Text File to S3 Bucket
This segment covers uploading order text files into an S3 bucket using S3 hooks library functionalities.
Uploading Orders Text File
- Confirm Amazon providers package version installed.
- Initialize an S3 hook by setting AWS connection ID from Airflow web server connections page.
- Utilize S3 hook functions like `load_file` to upload a local text file to the S3 bucket with the specified details.
Using Temporary Files for Workspace Cleanliness
The focus shifts towards maintaining workspace cleanliness by utilizing temporary files instead of saving multiple text files locally.
Implementing Temporary Files
- Import `NamedTemporaryFile` from Python's `tempfile` module.
- Create file objects using `NamedTemporaryFile` for temporary storage before uploading to S3.
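A minimal sketch of the pattern (pure standard library; the sample rows are hypothetical). The file exists only inside the `with` block, so any upload call must happen before it closes:

```python
import csv
from tempfile import NamedTemporaryFile

# Write rows to a named temporary file instead of a permanent local file;
# the file is removed automatically when the with block exits.
rows = [("order_id", "date"), ("1001", "2022-04-30")]  # hypothetical data
with NamedTemporaryFile(mode="w", suffix=".txt") as f:
    writer = csv.writer(f)
    writer.writerows(rows)
    f.flush()  # make sure data is on disk before any upload
    temp_path = f.name  # this path would be passed to s3_hook.load_file(...)
```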