Airflow Tutorial for Beginners - Full Course in 2 Hours 2022

Airflow Tutorial for Beginners - Full Course in 2 Hours 2022

Introducción al Tutorial de Airflow

Presentación del Curso

  • Bienvenida a todos al tutorial de Airflow para principiantes, un curso completo de dos horas que combina teoría y demostraciones prácticas.
  • No se requieren conocimientos previos, aunque se recomienda tener conocimientos básicos de Python para facilitar el aprendizaje.

Contenido del Curso

  • Se presentará Apache Airflow y cómo ejecutarlo localmente en un entorno Python mediante ejemplos prácticos.
  • Aprenderás sobre conceptos fundamentales de Airflow, ciclo de vida de tareas y arquitectura básica.
  • Se cubrirán operadores como Bash y Python, así como la forma de compartir datos entre tareas usando XComs.

Características Avanzadas y Conexiones Externas

Nuevas Funcionalidades

  • Se abordará la nueva característica introducida en Airflow 2.0: la API Task Flow con ejemplos prácticos.
  • También aprenderás a programar DAG utilizando expresiones cron y conectar servicios externos como bases de datos Postgres y AWS S3.

Instalación y Configuración

  • Instrucciones sobre cómo instalar paquetes de Python en el contenedor Docker de Airflow serán compartidas.
  • El proceso incluye inicializar la base de datos para Airflow después de establecer el directorio principal.

Configuración Inicial y Ejecución

Creación del Usuario

  • Comando para crear una base de datos SQLite, una carpeta de logs y archivos de configuración será ejecutado.
  • Para iniciar sesión en el servidor web, se debe crear un usuario con parámetros específicos desde la terminal.

Ejecución del Scheduler

  • Es necesario iniciar el scheduler para ejecutar los DAG; esto se realiza desde otra terminal asegurando que las variables ambientales estén configuradas correctamente.

¿Cómo instalar Airflow usando Docker?

Instalación de Docker y Docker Compose

  • Para comenzar, se debe visitar el sitio web oficial y buscar la documentación. En lugar de ejecutar Airflow localmente, se optará por instalarlo con Docker.
  • Si se utiliza un laptop Mac o Windows, es necesario instalar la aplicación Docker Desktop. El enlace para descargar está disponible en la descripción.
  • Una vez instalado, se puede verificar que Docker y Docker Compose están funcionando correctamente ejecutando los comandos docker --version y docker-compose --version.

Configuración del archivo docker-compose.yml

  • Se descarga el archivo oficial docker-compose.yml desde la documentación de Airflow mediante un comando en la terminal.
  • Este archivo define varios servicios; por defecto, utiliza el ejecutor Celery. Se cambiará a Local Executor para simplificar la configuración.
  • Se eliminan las dependencias innecesarias como Redis y los trabajadores de Celery, ajustando así el archivo YAML según las necesidades.

Creación de directorios necesarios

  • Es importante crear carpetas para DAGs, logs y plugins. Esto se realiza copiando un comando en la terminal.
  • La identificación del usuario y grupo de Airflow solo es necesaria si se usa Linux; este paso puede ser omitido en macOS.

Inicialización de la base de datos

  • La base de datos se inicializa con el comando docker-compose up airflow init, lo que descargará las imágenes necesarias y configurará un usuario administrador con "airflow" como nombre de usuario y contraseña.

Ejecución del servidor Airflow

  • Para ejecutar Airflow en modo desatendido (detached), se utiliza el comando docker-compose up -d.
  • Se verifica qué contenedores están corriendo utilizando docker ps, donde aparecerán los servicios activos como el servidor web de Airflow y PostgreSQL.

¿Qué es Apache Airflow?

Origen e historia

  • Apache Airflow comenzó como una herramienta interna para gestionar flujos de trabajo complejos en 2014. Fue abierto al público como proyecto incubador en marzo de 2016.

Conceptos clave sobre flujos de trabajo

  • Un flujo de trabajo (workflow) es una secuencia organizada de tareas representadas como un DAG (Directed Acyclic Graph).

Definición del DAG

  • Un DAG organiza todas las tareas que deben ejecutarse reflejando sus relaciones y dependencias; por ejemplo, Task A debe completarse antes que Tasks B y C.

Tareas dentro del DAG

  • Cada tarea es una unidad específica dentro del DAG, representada como un nodo gráfico. Las tareas tienen dependencias entre sí; por ejemplo, Task C depende directamente del resultado de Task A.

Operadores en Airflow

  • Los operadores determinan qué acciones realiza cada tarea. Existen diferentes tipos como BashOperator o PythonOperator que permiten ejecutar comandos específicos o scripts.

Resumen final sobre tareas y operadores

Comprendiendo el Ciclo de Vida de las Tareas en Airflow

Conceptos Esenciales de Airflow

  • La fecha de ejecución es la lógica que determina cuándo se ejecutan un "deck run" y sus instancias de tarea. Por ejemplo, puede haber tres ejecuciones DAG en progreso desde el 1 hasta el 3 de enero de 2021.
  • Un "deck run" es una instancia que contiene tareas que se ejecutan para una fecha específica. Las tareas se ejecutan secuencialmente según sus dependencias.

Etapas del Ciclo de Vida de las Tareas

  • Cada tarea pasa por diferentes etapas desde su inicio hasta su finalización, con un total de 11 estados diferentes que indican el estado actual de la instancia.
  • Los estados incluyen: "en progreso", "éxito", "fallido", entre otros. Se utilizan colores en la interfaz gráfica para representar cada etapa.

Proceso Detallado del Ciclo de Vida

  • El ciclo comienza sin estado; luego, hay cuatro etapas posibles: programada, eliminada, fallida upstream o saltada.
  • Si la tarea es programada, pasa a ser ejecutada por un trabajador cuando los recursos están disponibles. Su estado cambia a "ejecutando".

Resultados y Manejo de Errores

  • Dependiendo del resultado, una tarea puede tener tres estados finales: éxito, fallido o apagado. Si falla y no se excede el máximo reintentos, se programa nuevamente.
  • En algunos casos específicos, una tarea en ejecución puede ser reprogramada si depende de condiciones externas (por ejemplo, la existencia de un archivo).

Resumen del Proceso

  • El proceso completo inicia sin estado; tras ser programado y ejecutado exitosamente por un trabajador, culmina con una ejecución satisfactoria.

Arquitectura Básica de Apache Airflow

Componentes Clave

  • La arquitectura incluye componentes como ingenieros de datos, servidor web, programador y trabajadores. Cada uno tiene responsabilidades específicas dentro del sistema.

Rol del Ingeniero de Datos

  • Los ingenieros son responsables por construir y monitorear procesos ETL y configurar ajustes como tipo de ejecutor y base de datos.

Interacción entre Componentes

  • Todos los componentes están conectados a una base de datos para persistir información sobre los DAG. Se pueden elegir motores como MySQL o Postgres.

Configuración Inicial en Docker

Lanzamiento y Configuración

  • Para iniciar Airflow se utiliza docker compose up -d, seguido por acceder al puerto localhost 8080 para ver la página principal.

Eliminación Ejemplos Predefinidos

  • Antes crear un nuevo DAG propio, es necesario eliminar ejemplos existentes usando docker compose down -v.

Ajustes Finales

Creación de un DAG en Airflow

Introducción a la implementación del DAG

  • Se inicia la creación de un DAG llamado "our first deck wpy" y se abre una implementación DAC. Es necesario importar el DAC desde Airflow y crear una instancia utilizando la declaración with.
  • Se define el ID único del DAG como "our first deck" y se describe como "este es nuestro primer airflow deck". Se establece que comenzará el 30 de julio y se ejecutará diariamente a las 2 AM.

Configuración de parámetros comunes

  • Se configura la fecha de inicio con start_date igual a data time(2021, 7, 30, 2) y el intervalo del programador (scheduler_interval) como diario.
  • Se definen parámetros comunes para inicializar los operadores en default_args, incluyendo propietario, número máximo de reintentos (5), y un retraso entre reintentos de 5 minutos.

Creación de tareas en el DAG

  • Se crea una tarea simple usando el operador Bash para ejecutar comandos. Primero se importa el operador Bash.
  • La tarea se implementa con un mensaje "Hello World", estableciendo su ID como first_task. Al intentar ejecutar, surgen errores en el código.

Resolución de errores

  • Un error indica que no hay módulo llamado airflow.operator.
  • Cambiando "operator" por "operators", se corrige este error. Sin embargo, aparece otro error relacionado con argumentos inválidos para time data.

Ejecución del DAG

  • Tras corregir los errores relacionados con los argumentos, se visualiza correctamente el primer DAG sin errores. Este muestra solo una tarea llamada first_task.
  • El DAG está configurado para ejecutarse desde la fecha de inicio hasta ayer. Al revisar los registros, se confirma que el mensaje ha sido impreso correctamente.

Creación y dependencia de múltiples tareas

  • Se añade una segunda tarea (task_2) que depende del éxito de la primera tarea (task_1). Esta imprime un mensaje indicando su ejecución posterior.
  • Para establecer dependencias entre tareas, se puede definir task_2 como descendiente o task_1 como antecesor.

Implementación adicional de tareas

  • Si se desea agregar una tercera tarea (task_3), esta también debe depender del éxito de task_1, ejecutándose simultáneamente con task_2.
  • Existen diferentes métodos para construir dependencias; uno consiste en añadir directamente las tareas descendientes al primer task.

Métodos alternativos para definir dependencias

  • Otro método utiliza operadores bitwise para definir las relaciones entre las tareas: task_1 >> task_2 y así sucesivamente.
  • También es posible simplificar esta relación utilizando corchetes: [task_2, task_3].

Verificación final del entorno Airflow

  • Antes de continuar, es importante verificar si Airflow está funcionando mediante Docker. Si no lo está, puede iniciarse con comandos específicos.

¿Cómo crear y ejecutar un DAG en Airflow usando Python Operator?

Configuración inicial del DAG

  • Se establece el DAG d4 igual al definido d4 oct, asignando el ID del DAC como "python operator version 01" y describiéndolo como "nuestro primer deck usando python operator".
  • La fecha de inicio del DAC se configura para ayer, 6 de octubre de 2021, programándolo para que se ejecute diariamente.

Creación de la función y tarea

  • Se define una función simple llamada greet, que imprime "Hello World" cuando se ejecuta. Luego, se crea una tarea utilizando el módulo Python Operator.
  • Al refrescar el servidor web de Airflow, no aparecen errores y se visualiza el DAG con una única tarea llamada greet. Se activa el DAC y se verifica que la función ha sido ejecutada correctamente.

Uso de parámetros en funciones

  • Se actualiza la función greet para aceptar parámetros (name y h) e imprimir un mensaje personalizado.
  • Los valores de los parámetros son pasados exitosamente a través del diccionario op_kwargs, mostrando "Hello World, my name is Tom and I am 20 years old".

Compartir información entre tareas

  • Se introduce el concepto de XComms en Airflow, permitiendo compartir información entre diferentes tareas mediante la función de retorno automática.
  • Se crea una nueva función llamada get_name, que devuelve "Jerry". Esta función es utilizada en otra tarea configurada con Python Operator.

Extracción de datos desde XComms

  • En los registros, aparece que el valor devuelto por la función get_name fue almacenado en XComms.
  • Para extraer este valor en la función greet, se utiliza la instancia de tarea (ti), eliminando así el parámetro nombre.

Manejo avanzado con múltiples valores

  • Para enviar múltiples valores a XComms desde una sola función, se modifica la implementación para incluir tanto nombre como apellido.
  • La impresión final incluye ambos nombres ("Jerry Friedman"), confirmando que los valores fueron diferenciados correctamente por sus claves únicas.

Actualización adicional: Obtener edad desde XComms

  • Se crea otra función llamada get_age para almacenar un valor asociado a la edad. Esta también es integrada al flujo del DAG.

¿Cómo gestionar tareas y dependencias en Airflow?

Uso de XComs para compartir valores entre tareas

  • Se utiliza xcom para compartir valores entre tareas, actualizando las dependencias de las tareas al agregar la tarea 3 como upstream de la tarea 1.
  • Es importante tener en cuenta que el tamaño máximo de los xcoms es de solo 48 kilobytes, lo que limita su uso para datos grandes como DataFrames de pandas.

Reescribiendo DAG con Task Flow API

  • Se crea un nuevo archivo Python utilizando la API de flujo de tareas (Task Flow API), comenzando por importar las bibliotecas necesarias.
  • Se define una función llamada hello_world_etl, donde se asignan valores a ID del DAG, fecha de inicio y intervalo de programación.

Definición y dependencia de tareas

  • Se crean tres funciones: get_name, get_age y greet, cada una representada por un decorador específico.
  • Las funciones get_name y get_age devuelven valores que se pasan a la función greet, estableciendo automáticamente las dependencias gracias a la API.

Ejecución del DAG y verificación

  • Después de guardar el DAG, se verifica en el navegador que las dependencias están correctamente definidas.
  • Al ejecutar el DAG, se comprueba que los valores devueltos han sido almacenados correctamente en los XCom.

Manejo de múltiples salidas en funciones

  • Para devolver tanto el primer nombre como el apellido desde la tarea get_name, se modifica la función greet para aceptar nuevos parámetros.
  • Se establece un diccionario con múltiples salidas en el decorador correspondiente, permitiendo así pasar varios valores a través del sistema.

Comparación entre versiones del código

  • La versión reescrita usando Task Flow API reduce significativamente el número total de líneas a aproximadamente 40, mejorando la legibilidad y eficiencia del código.

Creación y configuración básica del DAG

  • Se crea un nuevo DAG llamado "deck" con parámetros básicos utilizando BashOperator; se establece una fecha inicial pasada para su ejecución diaria.

Configuración del parámetro Catch Up

  • El parámetro "catch up" está habilitado por defecto; sin embargo, se puede desactivar manualmente según sea necesario.

Visualización y ejecución del DAG

  • Al visualizar el árbol del DAG después de ejecutarlo, se observa cómo ha corrido desde noviembre 1 hasta noviembre 9 debido al catch up activado inicialmente.

Desactivación del Catch Up

  • Cambiando el parámetro catch up a falso permite ejecutar solo instancias específicas sin retroceder a fechas anteriores no deseadas.

Comandos para ejecutar Backfill

¿Cómo programar tareas en Airflow usando expresiones cron?

Visualización de DAGs y Ejecución

  • Se muestra cómo visualizar el historial de ejecución de un DAG desde el 1 de noviembre hasta el 8 de noviembre de 2021, utilizando un ID específico del DAG.
  • Para crear un DAG en Airflow, es necesario definir el parámetro schedule_interval, que acepta una expresión cron como cadena o un objeto datetime.

Expresiones Cron

  • Una expresión cron consiste en cinco campos separados por espacios que representan un conjunto de tiempos para ejecutar rutinas. Airflow ofrece presets como "diario" y "horario".
  • Se crea un nuevo archivo DAG llamado deck_with_cron_expression.py, donde se define una tarea simple con el operador bash, programada para comenzar el 1 de noviembre y ejecutarse diariamente.

Actualización del Intervalo de Programación

  • Al cambiar el parámetro schedule_interval a una cadena cron específica (0 0 * * *), se actualiza la versión del DAG y se observa que la historia de ejecución coincide con la configuración diaria anterior.
  • Se menciona crontab.guru como herramienta útil para generar y verificar expresiones cron personalizadas.

Generación y Verificación de Expresiones Cron

  • En crontab.guru, se puede ingresar una expresión cron para verificar su validez; si hay errores, el campo se vuelve rojo.
  • Se intenta generar una expresión cron que ejecute tareas semanalmente los martes a las 3 AM. La expresión generada es verificada en crontab.guru.

Programación Semanal Personalizada

  • Para ejecutar tareas semanalmente los martes y viernes a las 3 AM, simplemente se añade "viernes" a la expresión anterior.
  • También se puede usar "martes-viernes" para abarcar múltiples días; ambas formas son válidas al actualizar la programación del DAG.

Conexión a Servicios Externos

  • Al construir un ETL en Airflow, es esencial conectarse a servicios externos como bases de datos (MySQL, PostgreSQL). Esto requiere credenciales adecuadas.
  • Las conexiones pueden ser gestionadas fácilmente desde la interfaz web de Airflow bajo el menú "Admin", donde se pueden agregar nuevas conexiones según sea necesario.

Uso del Operador PostgreSQL

  • Para demostrar cómo utilizar el operador PostgreSQL, primero se debe exponer una base de datos PostgreSQL mediante Docker.
  • Se utiliza DBeaver para conectar con la base de datos PostgreSQL creada previamente; tras verificar la conexión, se crea una nueva base llamada 'test'.

Creación e Inserción en Base de Datos

Creación de Tareas en Airflow con PostgreSQL

Importación y Configuración Inicial

  • Se comienza importando los paquetes necesarios en el archivo operator.py.
  • Se define un auxiliar por defecto y se inicializa un DAG configurando su ID, fecha de inicio e intervalo de programación.

Creación de la Conexión a PostgreSQL

  • Para crear una tarea usando el operador PostgreSQL, se requiere un ID de tarea, un ID de conexión y una consulta SQL.
  • Al configurar la conexión, se debe seleccionar el tipo como PostgreSQL; el esquema es el nombre de la base de datos (en este caso "test") y las credenciales son "airflow" con puerto 5432.

Ejecución y Solución de Errores

  • Si se usa Docker Desktop o MacOS, para conectarse desde un contenedor a localhost se debe usar host.docker.internal.
  • Tras guardar los cambios, al intentar ejecutar la tarea falla debido a un error en el nombre del host. Se corrige a host.docker.internal.

Corrección de Errores en Consultas SQL

  • Un nuevo fallo ocurre por un error sintáctico en la consulta SQL; falta una letra 't' en el tipo de dato.
  • Después de corregirlo, la creación de la tabla se ejecuta correctamente. Se verifica que la tabla fue creada como se definió.

Inserción y Dependencias entre Tareas

  • Se crea otra tarea para insertar datos en la tabla recién creada utilizando nuevamente el operador PostgreSQL.
  • La consulta SQL utiliza variables predefinidas por Airflow para acceder a fechas y IDs.

Manejo de Errores Adicionales

  • Al establecer dependencias entre tareas, surge otro error relacionado con una variable no definida. Se corrige cambiando 'dt' por 'ds'.
  • Tras realizar las correcciones necesarias, se confirma que los datos fueron insertados correctamente en la base de datos.

Prevención de Duplicados

  • Es recomendable eliminar datos antes de insertar nuevos registros para evitar duplicaciones o violaciones a claves primarias.
  • Se añade una tarea para eliminar registros previos antes del proceso de inserción.

Actualización y Verificación Final

  • Luego del ajuste en las tareas, al ejecutar nuevamente no hay errores relacionados con claves primarias.
  • Finalmente, al verificar los registros insertados mediante DeBeaver, se confirman dos entradas correctas sin conflictos.

Instalación de Paquetes Python en Airflow

Métodos para Instalar Dependencias

  • Existen dos métodos principales para instalar paquetes Python: extender o personalizar la imagen Docker utilizada por Airflow.

Extensión vs Personalización

  • Extender es más sencillo ya que solo requiere conocimientos básicos sobre imágenes Docker; no necesita código fuente ni toma mucho tiempo construirla.

Personalización Avanzada

Instalación de Dependencias en Airflow

Extensión de la Imagen de Docker

  • Se añade una línea al archivo de requisitos para incluir scikit-learn versión 0.24.2 y se procede a instalar todas las dependencias definidas.
  • Se crea un Dockerfile en la carpeta raíz del proyecto, extendiendo la imagen oficial de Airflow versión 2.0.1.

Configuración del Dockerfile

  • Se copia el archivo de requisitos al contenedor y se ejecutan comandos para actualizar pip e instalar las dependencias de Python.
  • Se construye la imagen extendida usando el comando docker build, nombrándola como extending airflow latest.

Verificación de Instalación

  • Se modifica el archivo docker-compose.yml para cambiar el nombre de la imagen a extending airflow latest.
  • Se inicializa una instancia DAG y se crea una función llamada get_scikit_learn para imprimir la versión del paquete instalado.

Manejo de Nuevas Dependencias

  • Al intentar agregar matplotlib, surge un error porque los cambios no se reflejan en la imagen del contenedor.
  • Es necesario reconstruir la imagen cada vez que se cambian las dependencias en el archivo.

Personalización Avanzada

  • Después de reconstruir, se verifica que ahora ambas bibliotecas están instaladas correctamente.
  • Para personalizar aún más, se clona el código fuente de Airflow desde GitHub y se crean archivos necesarios para definir nuevas dependencias.

Construcción y Ejecución del Contenedor Personalizado

Creación del Archivo Requisitos

  • En la carpeta adecuada, se crea un archivo con las versiones necesarias para scikit-learn y matplotlib.

Construcción Finalizada

  • La construcción finaliza tras varios minutos, donde todas las dependencias son instaladas automáticamente desde el archivo definido.

Actualización del Comando Compose

  • Se actualiza nuevamente el nombre de la imagen en el archivo docker-compose.yml antes de recrear los contenedores.

Comparativa entre Métodos

Elección del Método Adecuado

  • El método recomendado es extender imágenes por su rapidez; sin embargo, si hay necesidad de optimizar tamaño o personalizar más, es mejor construir desde cero.

Uso Práctico con S3 Compatible

¿Cómo configurar MinIO y Airflow para trabajar con S3?

Configuración de MinIO

  • MinIO es compatible con la API del servicio de almacenamiento en la nube AWS S3, lo que facilita su integración en proyectos ETL.
  • Se muestra un comando de Docker para lanzar un contenedor de MinIO, el cual expone dos puertos: 9000 para la API y 9001 para la consola.
  • Al acceder a la consola web, se requiere ingresar el nombre de usuario y contraseña raíz. Una vez dentro, se puede crear un bucket llamado "airflow".
  • Es importante asegurarse de tener permisos de lectura y escritura al crear el bucket. Luego se puede cargar archivos o crear rutas desde la interfaz.

Generación y carga de archivos CSV

  • En VS Code, se crea una carpeta llamada "data" y un archivo CSV llamado "data.csv" con columnas como ID del producto y fecha de entrega.
  • Después de guardar el archivo, se sube a nuestro bucket "airflow", confirmando que ahora existe en el entorno S3.

Creación del DAG en Airflow

  • Se inicia creando un archivo Python llamado dag_menu_s3.py, donde se importan los paquetes necesarios para definir el DAG (Directed Acyclic Graph).
  • Se verifica la versión del paquete proveedor de Amazon en Airflow e incluso se recomienda revisar videos anteriores sobre cómo extender imágenes Docker.

Configuración del sensor S3

  • Se busca información sobre el sensor S3 en la documentación oficial. Este sensor espera que un archivo esté presente en un bucket específico.
  • Para usarlo, son necesarios parámetros como nombre del bucket, clave del objeto y ID de conexión AWS. Se importa el operador correspondiente desde el directorio adecuado.

Conexión a AWS

  • En la interfaz web de Airflow, se añade una nueva conexión configurando los campos requeridos como claves secretas y detalles específicos del host local.
  • Tras guardar los cambios, se establece esta conexión recién creada como parámetro dentro del DAG antes mencionado.

Ejecución y solución de problemas

  • Al iniciar el DAG por primera vez, surge un error relacionado con las comillas en los campos extra; esto requiere ajustes para corregirlo.

Ejecución de Tareas con Airflow y PostgreSQL

Monitoreo de Archivos CSV en Airflow

  • Se inicia una tarea de sensor que verifica la existencia del archivo data.csv en el bucket de Airflow, pero falla al no encontrarlo dentro del límite de tiempo.
  • Se sube el archivo data.csv justo antes del tiempo de espera; el sensor detecta su existencia inmediatamente después, marcando la tarea como exitosa.

Importación y Configuración de Base de Datos

  • El archivo CSV llamado orders, que contiene órdenes de venta artificiales, se debe importar a una base de datos PostgreSQL.
  • Se crea una tabla llamada orders en la base de datos test, definiendo correctamente los tipos para las cuatro columnas: order id, date, product name y quantity.

Proceso de Importación

  • Se importa el archivo CSV a la tabla orders, asegurándose que los nombres de las columnas coincidan; se recibe un mensaje exitoso tras la importación.
  • Se consulta las primeras 100 filas para verificar que la tabla ha sido configurada correctamente.

Creación del DAC (Directed Acyclic Graph)

  • En VS Code, se crea un archivo Python llamado dac_postgreshook.py para implementar un DAC que consultará datos desde PostgreSQL.
  • Se necesita importar paquetes requeridos y crear una función llamada postgres_to_s3 para realizar dos pasos: consultar datos y subirlos a un bucket S3.

Conexión a PostgreSQL

  • Para interactuar con PostgreSQL, se verifica la versión instalada del paquete correspondiente mediante comandos Docker.
  • La documentación muestra cómo establecer conexión con PostgreSQL utilizando el hook adecuado; se inicializa con el ID de conexión correspondiente.

Ejecución y Manejo de Errores

  • Al ejecutar consultas SQL, es crucial corregir errores como nombres incorrectos o sintaxis inadecuada.
  • Tras corregir errores en el código (como usar "order" en lugar de "orders"), se ejecuta nuevamente el DAC con éxito; los resultados son guardados en un archivo llamado getorders.txt.

Optimización del Proceso

¿Cómo gestionar fechas de ejecución en Airflow?

Uso de macros para fechas de ejecución

  • Se explica cómo obtener la fecha de ejecución actual y la siguiente para una función Python utilizando macros predefinidas en Airflow.
  • Es necesario actualizar la consulta SQL para incluir condiciones que filtren los datos entre las fechas de ejecución actuales y siguientes.
  • Se menciona la importancia de hacer dinámico el nombre del archivo, incorporando la fecha de ejecución como sufijo.

Ejecución y verificación del DAG

  • Se observan dos ejecuciones exitosas del DAG en las fechas 30 de abril y 1 de mayo, confirmando que los pedidos se guardaron correctamente en un archivo de texto.
  • Al revisar el contenido del archivo, se verifica que solo contiene datos entre las fechas actuales y siguientes, cumpliendo con los requisitos establecidos.

¿Cómo cargar archivos a S3 usando Airflow?

Preparación para cargar archivos a S3

  • Para subir el archivo a un bucket S3, es necesario utilizar la biblioteca S3 Hook. Primero se verifica qué versión del paquete Amazon Providers está instalada.
  • Se accede a la documentación para encontrar información sobre cómo inicializar el hook S3, asegurándose de tener configurado el ID de conexión AWS correcto.

Implementación del proceso de carga

  • La función load_file permite cargar un archivo local al bucket S3; se deben especificar el nombre del archivo, el bucket y si se debe reemplazar un archivo existente.
  • En VS Code, se implementa esta función utilizando el nombre adecuado para los archivos generados por cada ejecución.

Optimización del manejo de archivos temporales

Uso eficiente del espacio en disco

  • Se plantea una solución para evitar acumular muchos archivos en la carpeta local mediante el uso del módulo tempfile que permite crear archivos temporales.
  • Antes de subir a S3, es crucial vaciar (flush) el objeto temporal para asegurar que los datos sean guardados correctamente.

Verificación final y limpieza

  • Después de ejecutar nuevamente el DAG y verificar su éxito, se observa que no quedan archivos residuales en la carpeta local ni en S3.
Video description

Airflow Tutorial for Beginners - Full Course in 2 Hours 2022 #Airflow #AirflowTutorial #Coder2j ========== VIDEO CONTENT 📚 ========== In this 2-hour Airflow Tutorial for Beginners Full Course, we combine theory explanation and practical demos to help you get started quickly as an absolute beginner. You don’t need any prerequisite to start this course, but basic Python knowledge is recommended. To make the most out of it, it is highly encouraged to follow and try out the hands-on examples. Video Request: https://forms.gle/UMp4GA3krcSMMWzy9 Subscribe and Smash the like button to unlock bonus tutorial videos!! 1000 likes 👍 - Bonus videos about how to debug Airflow DAG: https://youtu.be/5QxqqeOxJhI 5000 likes 👍 - Bonus videos about Airflow Docker Operator: https://youtu.be/uZy2Lwioi3g ========== T I M E S T A M P ⏰ ========== Throughout the course, you will learn: 00:00 - Airflow Introduction 03:06 - Run Airflow in Python Env 10:44 - Run Airflow in Docker 17:55 - Airflow Basics and Core Concepts 21:55 - Airflow Task Lifecycle 26:19 - Airflow Basic Architecture 28:14 - Airflow DAG with Bash Operator 40:09 - Airflow DAG with Python Operator 45:04 - Data Sharing via Airflow XComs 52:53 - Airflow Task Flow API 57:56 - Airflow Catch-Up and Backfill 01:02:09 - Airflow Scheduler with Cron Expression 01:07:25 - Airflow Connection to Postgres 01:08:58 - Airflow Postgres Operator 01:19:30 - Airflow Docker Install Python Package 2 ways 01:29:34 - Airflow AWS S3 Sensor Operator 01:42:37 - Airflow Hooks S3 PostgreSQL 02:00:43 - Course Bonus ========== L I N K S 🔗 ========== Airflow Books 👉 https://amzn.to/3N43rlI Airflow Documentation 👉 https://bit.ly/3wbTqv4 Course GitHub Repo 👉 https://github.com/coder2j/airflow-docker ========== Connect with me 👏 ========== Twitter 👉 https://twitter.com/Coder2j Website 👉 https://coder2j.com GitHub 👉 https://github.com/coder2j