Airflow Tutorial for Beginners - Full Course in 2 Hours 2022
Introducción al Tutorial de Airflow
Presentación del Curso
- Bienvenida a todos al tutorial de Airflow para principiantes, un curso completo de dos horas que combina teoría y demostraciones prácticas.
- No se requieren conocimientos previos, aunque se recomienda tener conocimientos básicos de Python para facilitar el aprendizaje.
Contenido del Curso
- Se presentará Apache Airflow y cómo ejecutarlo localmente en un entorno Python mediante ejemplos prácticos.
- Aprenderás sobre conceptos fundamentales de Airflow, ciclo de vida de tareas y arquitectura básica.
- Se cubrirán operadores como Bash y Python, así como la forma de compartir datos entre tareas usando XComs.
Características Avanzadas y Conexiones Externas
Nuevas Funcionalidades
- Se abordará la nueva característica introducida en Airflow 2.0: la API Task Flow con ejemplos prácticos.
- También aprenderás a programar DAG utilizando expresiones cron y conectar servicios externos como bases de datos Postgres y AWS S3.
Instalación y Configuración
- Instrucciones sobre cómo instalar paquetes de Python en el contenedor Docker de Airflow serán compartidas.
- El proceso incluye inicializar la base de datos para Airflow después de establecer el directorio principal.
Configuración Inicial y Ejecución
Creación del Usuario
- Comando para crear una base de datos SQLite, una carpeta de logs y archivos de configuración será ejecutado.
- Para iniciar sesión en el servidor web, se debe crear un usuario con parámetros específicos desde la terminal.
Ejecución del Scheduler
- Es necesario iniciar el scheduler para ejecutar los DAG; esto se realiza desde otra terminal asegurando que las variables ambientales estén configuradas correctamente.
¿Cómo instalar Airflow usando Docker?
Instalación de Docker y Docker Compose
- Para comenzar, se debe visitar el sitio web oficial y buscar la documentación. En lugar de ejecutar Airflow localmente, se optará por instalarlo con Docker.
- Si se utiliza un laptop Mac o Windows, es necesario instalar la aplicación Docker Desktop. El enlace para descargar está disponible en la descripción.
- Una vez instalado, se puede verificar que Docker y Docker Compose están funcionando correctamente ejecutando los comandos
docker --versionydocker-compose --version.
Configuración del archivo docker-compose.yml
- Se descarga el archivo oficial
docker-compose.ymldesde la documentación de Airflow mediante un comando en la terminal.
- Este archivo define varios servicios; por defecto, utiliza el ejecutor Celery. Se cambiará a Local Executor para simplificar la configuración.
- Se eliminan las dependencias innecesarias como Redis y los trabajadores de Celery, ajustando así el archivo YAML según las necesidades.
Creación de directorios necesarios
- Es importante crear carpetas para DAGs, logs y plugins. Esto se realiza copiando un comando en la terminal.
- La identificación del usuario y grupo de Airflow solo es necesaria si se usa Linux; este paso puede ser omitido en macOS.
Inicialización de la base de datos
- La base de datos se inicializa con el comando
docker-compose up airflow init, lo que descargará las imágenes necesarias y configurará un usuario administrador con "airflow" como nombre de usuario y contraseña.
Ejecución del servidor Airflow
- Para ejecutar Airflow en modo desatendido (detached), se utiliza el comando
docker-compose up -d.
- Se verifica qué contenedores están corriendo utilizando
docker ps, donde aparecerán los servicios activos como el servidor web de Airflow y PostgreSQL.
¿Qué es Apache Airflow?
Origen e historia
- Apache Airflow comenzó como una herramienta interna para gestionar flujos de trabajo complejos en 2014. Fue abierto al público como proyecto incubador en marzo de 2016.
Conceptos clave sobre flujos de trabajo
- Un flujo de trabajo (workflow) es una secuencia organizada de tareas representadas como un DAG (Directed Acyclic Graph).
Definición del DAG
- Un DAG organiza todas las tareas que deben ejecutarse reflejando sus relaciones y dependencias; por ejemplo, Task A debe completarse antes que Tasks B y C.
Tareas dentro del DAG
- Cada tarea es una unidad específica dentro del DAG, representada como un nodo gráfico. Las tareas tienen dependencias entre sí; por ejemplo, Task C depende directamente del resultado de Task A.
Operadores en Airflow
- Los operadores determinan qué acciones realiza cada tarea. Existen diferentes tipos como BashOperator o PythonOperator que permiten ejecutar comandos específicos o scripts.
Resumen final sobre tareas y operadores
Comprendiendo el Ciclo de Vida de las Tareas en Airflow
Conceptos Esenciales de Airflow
- La fecha de ejecución es la lógica que determina cuándo se ejecutan un "deck run" y sus instancias de tarea. Por ejemplo, puede haber tres ejecuciones DAG en progreso desde el 1 hasta el 3 de enero de 2021.
- Un "deck run" es una instancia que contiene tareas que se ejecutan para una fecha específica. Las tareas se ejecutan secuencialmente según sus dependencias.
Etapas del Ciclo de Vida de las Tareas
- Cada tarea pasa por diferentes etapas desde su inicio hasta su finalización, con un total de 11 estados diferentes que indican el estado actual de la instancia.
- Los estados incluyen: "en progreso", "éxito", "fallido", entre otros. Se utilizan colores en la interfaz gráfica para representar cada etapa.
Proceso Detallado del Ciclo de Vida
- El ciclo comienza sin estado; luego, hay cuatro etapas posibles: programada, eliminada, fallida upstream o saltada.
- Si la tarea es programada, pasa a ser ejecutada por un trabajador cuando los recursos están disponibles. Su estado cambia a "ejecutando".
Resultados y Manejo de Errores
- Dependiendo del resultado, una tarea puede tener tres estados finales: éxito, fallido o apagado. Si falla y no se excede el máximo reintentos, se programa nuevamente.
- En algunos casos específicos, una tarea en ejecución puede ser reprogramada si depende de condiciones externas (por ejemplo, la existencia de un archivo).
Resumen del Proceso
- El proceso completo inicia sin estado; tras ser programado y ejecutado exitosamente por un trabajador, culmina con una ejecución satisfactoria.
Arquitectura Básica de Apache Airflow
Componentes Clave
- La arquitectura incluye componentes como ingenieros de datos, servidor web, programador y trabajadores. Cada uno tiene responsabilidades específicas dentro del sistema.
Rol del Ingeniero de Datos
- Los ingenieros son responsables por construir y monitorear procesos ETL y configurar ajustes como tipo de ejecutor y base de datos.
Interacción entre Componentes
- Todos los componentes están conectados a una base de datos para persistir información sobre los DAG. Se pueden elegir motores como MySQL o Postgres.
Configuración Inicial en Docker
Lanzamiento y Configuración
- Para iniciar Airflow se utiliza
docker compose up -d, seguido por acceder al puerto localhost 8080 para ver la página principal.
Eliminación Ejemplos Predefinidos
- Antes crear un nuevo DAG propio, es necesario eliminar ejemplos existentes usando
docker compose down -v.
Ajustes Finales
Creación de un DAG en Airflow
Introducción a la implementación del DAG
- Se inicia la creación de un DAG llamado "our first deck wpy" y se abre una implementación DAC. Es necesario importar el DAC desde Airflow y crear una instancia utilizando la declaración
with.
- Se define el ID único del DAG como "our first deck" y se describe como "este es nuestro primer airflow deck". Se establece que comenzará el 30 de julio y se ejecutará diariamente a las 2 AM.
Configuración de parámetros comunes
- Se configura la fecha de inicio con
start_dateigual adata time(2021, 7, 30, 2)y el intervalo del programador (scheduler_interval) como diario.
- Se definen parámetros comunes para inicializar los operadores en
default_args, incluyendo propietario, número máximo de reintentos (5), y un retraso entre reintentos de 5 minutos.
Creación de tareas en el DAG
- Se crea una tarea simple usando el operador Bash para ejecutar comandos. Primero se importa el operador Bash.
- La tarea se implementa con un mensaje "Hello World", estableciendo su ID como
first_task. Al intentar ejecutar, surgen errores en el código.
Resolución de errores
- Un error indica que no hay módulo llamado
airflow.operator.
- Cambiando "operator" por "operators", se corrige este error. Sin embargo, aparece otro error relacionado con argumentos inválidos para
time data.
Ejecución del DAG
- Tras corregir los errores relacionados con los argumentos, se visualiza correctamente el primer DAG sin errores. Este muestra solo una tarea llamada
first_task.
- El DAG está configurado para ejecutarse desde la fecha de inicio hasta ayer. Al revisar los registros, se confirma que el mensaje ha sido impreso correctamente.
Creación y dependencia de múltiples tareas
- Se añade una segunda tarea (
task_2) que depende del éxito de la primera tarea (task_1). Esta imprime un mensaje indicando su ejecución posterior.
- Para establecer dependencias entre tareas, se puede definir
task_2como descendiente otask_1como antecesor.
Implementación adicional de tareas
- Si se desea agregar una tercera tarea (
task_3), esta también debe depender del éxito detask_1, ejecutándose simultáneamente contask_2.
- Existen diferentes métodos para construir dependencias; uno consiste en añadir directamente las tareas descendientes al primer task.
Métodos alternativos para definir dependencias
- Otro método utiliza operadores bitwise para definir las relaciones entre las tareas:
task_1 >> task_2y así sucesivamente.
- También es posible simplificar esta relación utilizando corchetes:
[task_2, task_3].
Verificación final del entorno Airflow
- Antes de continuar, es importante verificar si Airflow está funcionando mediante Docker. Si no lo está, puede iniciarse con comandos específicos.
¿Cómo crear y ejecutar un DAG en Airflow usando Python Operator?
Configuración inicial del DAG
- Se establece el DAG
d4igual al definidod4 oct, asignando el ID del DAC como "python operator version 01" y describiéndolo como "nuestro primer deck usando python operator".
- La fecha de inicio del DAC se configura para ayer, 6 de octubre de 2021, programándolo para que se ejecute diariamente.
Creación de la función y tarea
- Se define una función simple llamada
greet, que imprime "Hello World" cuando se ejecuta. Luego, se crea una tarea utilizando el módulo Python Operator.
- Al refrescar el servidor web de Airflow, no aparecen errores y se visualiza el DAG con una única tarea llamada
greet. Se activa el DAC y se verifica que la función ha sido ejecutada correctamente.
Uso de parámetros en funciones
- Se actualiza la función
greetpara aceptar parámetros (nameyh) e imprimir un mensaje personalizado.
- Los valores de los parámetros son pasados exitosamente a través del diccionario
op_kwargs, mostrando "Hello World, my name is Tom and I am 20 years old".
Compartir información entre tareas
- Se introduce el concepto de XComms en Airflow, permitiendo compartir información entre diferentes tareas mediante la función de retorno automática.
- Se crea una nueva función llamada
get_name, que devuelve "Jerry". Esta función es utilizada en otra tarea configurada con Python Operator.
Extracción de datos desde XComms
- En los registros, aparece que el valor devuelto por la función
get_namefue almacenado en XComms.
- Para extraer este valor en la función
greet, se utiliza la instancia de tarea (ti), eliminando así el parámetro nombre.
Manejo avanzado con múltiples valores
- Para enviar múltiples valores a XComms desde una sola función, se modifica la implementación para incluir tanto nombre como apellido.
- La impresión final incluye ambos nombres ("Jerry Friedman"), confirmando que los valores fueron diferenciados correctamente por sus claves únicas.
Actualización adicional: Obtener edad desde XComms
- Se crea otra función llamada
get_agepara almacenar un valor asociado a la edad. Esta también es integrada al flujo del DAG.
¿Cómo gestionar tareas y dependencias en Airflow?
Uso de XComs para compartir valores entre tareas
- Se utiliza
xcompara compartir valores entre tareas, actualizando las dependencias de las tareas al agregar la tarea 3 como upstream de la tarea 1.
- Es importante tener en cuenta que el tamaño máximo de los
xcomses de solo 48 kilobytes, lo que limita su uso para datos grandes como DataFrames de pandas.
Reescribiendo DAG con Task Flow API
- Se crea un nuevo archivo Python utilizando la API de flujo de tareas (Task Flow API), comenzando por importar las bibliotecas necesarias.
- Se define una función llamada
hello_world_etl, donde se asignan valores a ID del DAG, fecha de inicio y intervalo de programación.
Definición y dependencia de tareas
- Se crean tres funciones:
get_name,get_ageygreet, cada una representada por un decorador específico.
- Las funciones
get_nameyget_agedevuelven valores que se pasan a la funcióngreet, estableciendo automáticamente las dependencias gracias a la API.
Ejecución del DAG y verificación
- Después de guardar el DAG, se verifica en el navegador que las dependencias están correctamente definidas.
- Al ejecutar el DAG, se comprueba que los valores devueltos han sido almacenados correctamente en los XCom.
Manejo de múltiples salidas en funciones
- Para devolver tanto el primer nombre como el apellido desde la tarea
get_name, se modifica la función greet para aceptar nuevos parámetros.
- Se establece un diccionario con múltiples salidas en el decorador correspondiente, permitiendo así pasar varios valores a través del sistema.
Comparación entre versiones del código
- La versión reescrita usando Task Flow API reduce significativamente el número total de líneas a aproximadamente 40, mejorando la legibilidad y eficiencia del código.
Creación y configuración básica del DAG
- Se crea un nuevo DAG llamado "deck" con parámetros básicos utilizando BashOperator; se establece una fecha inicial pasada para su ejecución diaria.
Configuración del parámetro Catch Up
- El parámetro "catch up" está habilitado por defecto; sin embargo, se puede desactivar manualmente según sea necesario.
Visualización y ejecución del DAG
- Al visualizar el árbol del DAG después de ejecutarlo, se observa cómo ha corrido desde noviembre 1 hasta noviembre 9 debido al catch up activado inicialmente.
Desactivación del Catch Up
- Cambiando el parámetro catch up a falso permite ejecutar solo instancias específicas sin retroceder a fechas anteriores no deseadas.
Comandos para ejecutar Backfill
¿Cómo programar tareas en Airflow usando expresiones cron?
Visualización de DAGs y Ejecución
- Se muestra cómo visualizar el historial de ejecución de un DAG desde el 1 de noviembre hasta el 8 de noviembre de 2021, utilizando un ID específico del DAG.
- Para crear un DAG en Airflow, es necesario definir el parámetro
schedule_interval, que acepta una expresión cron como cadena o un objeto datetime.
Expresiones Cron
- Una expresión cron consiste en cinco campos separados por espacios que representan un conjunto de tiempos para ejecutar rutinas. Airflow ofrece presets como "diario" y "horario".
- Se crea un nuevo archivo DAG llamado
deck_with_cron_expression.py, donde se define una tarea simple con el operador bash, programada para comenzar el 1 de noviembre y ejecutarse diariamente.
Actualización del Intervalo de Programación
- Al cambiar el parámetro
schedule_intervala una cadena cron específica (0 0 * * *), se actualiza la versión del DAG y se observa que la historia de ejecución coincide con la configuración diaria anterior.
- Se menciona crontab.guru como herramienta útil para generar y verificar expresiones cron personalizadas.
Generación y Verificación de Expresiones Cron
- En crontab.guru, se puede ingresar una expresión cron para verificar su validez; si hay errores, el campo se vuelve rojo.
- Se intenta generar una expresión cron que ejecute tareas semanalmente los martes a las 3 AM. La expresión generada es verificada en crontab.guru.
Programación Semanal Personalizada
- Para ejecutar tareas semanalmente los martes y viernes a las 3 AM, simplemente se añade "viernes" a la expresión anterior.
- También se puede usar "martes-viernes" para abarcar múltiples días; ambas formas son válidas al actualizar la programación del DAG.
Conexión a Servicios Externos
- Al construir un ETL en Airflow, es esencial conectarse a servicios externos como bases de datos (MySQL, PostgreSQL). Esto requiere credenciales adecuadas.
- Las conexiones pueden ser gestionadas fácilmente desde la interfaz web de Airflow bajo el menú "Admin", donde se pueden agregar nuevas conexiones según sea necesario.
Uso del Operador PostgreSQL
- Para demostrar cómo utilizar el operador PostgreSQL, primero se debe exponer una base de datos PostgreSQL mediante Docker.
- Se utiliza DBeaver para conectar con la base de datos PostgreSQL creada previamente; tras verificar la conexión, se crea una nueva base llamada 'test'.
Creación e Inserción en Base de Datos
Creación de Tareas en Airflow con PostgreSQL
Importación y Configuración Inicial
- Se comienza importando los paquetes necesarios en el archivo
operator.py.
- Se define un auxiliar por defecto y se inicializa un DAG configurando su ID, fecha de inicio e intervalo de programación.
Creación de la Conexión a PostgreSQL
- Para crear una tarea usando el operador PostgreSQL, se requiere un ID de tarea, un ID de conexión y una consulta SQL.
- Al configurar la conexión, se debe seleccionar el tipo como PostgreSQL; el esquema es el nombre de la base de datos (en este caso "test") y las credenciales son "airflow" con puerto 5432.
Ejecución y Solución de Errores
- Si se usa Docker Desktop o MacOS, para conectarse desde un contenedor a localhost se debe usar
host.docker.internal.
- Tras guardar los cambios, al intentar ejecutar la tarea falla debido a un error en el nombre del host. Se corrige a
host.docker.internal.
Corrección de Errores en Consultas SQL
- Un nuevo fallo ocurre por un error sintáctico en la consulta SQL; falta una letra 't' en el tipo de dato.
- Después de corregirlo, la creación de la tabla se ejecuta correctamente. Se verifica que la tabla fue creada como se definió.
Inserción y Dependencias entre Tareas
- Se crea otra tarea para insertar datos en la tabla recién creada utilizando nuevamente el operador PostgreSQL.
- La consulta SQL utiliza variables predefinidas por Airflow para acceder a fechas y IDs.
Manejo de Errores Adicionales
- Al establecer dependencias entre tareas, surge otro error relacionado con una variable no definida. Se corrige cambiando 'dt' por 'ds'.
- Tras realizar las correcciones necesarias, se confirma que los datos fueron insertados correctamente en la base de datos.
Prevención de Duplicados
- Es recomendable eliminar datos antes de insertar nuevos registros para evitar duplicaciones o violaciones a claves primarias.
- Se añade una tarea para eliminar registros previos antes del proceso de inserción.
Actualización y Verificación Final
- Luego del ajuste en las tareas, al ejecutar nuevamente no hay errores relacionados con claves primarias.
- Finalmente, al verificar los registros insertados mediante DeBeaver, se confirman dos entradas correctas sin conflictos.
Instalación de Paquetes Python en Airflow
Métodos para Instalar Dependencias
- Existen dos métodos principales para instalar paquetes Python: extender o personalizar la imagen Docker utilizada por Airflow.
Extensión vs Personalización
- Extender es más sencillo ya que solo requiere conocimientos básicos sobre imágenes Docker; no necesita código fuente ni toma mucho tiempo construirla.
Personalización Avanzada
Instalación de Dependencias en Airflow
Extensión de la Imagen de Docker
- Se añade una línea al archivo de requisitos para incluir
scikit-learnversión 0.24.2 y se procede a instalar todas las dependencias definidas.
- Se crea un Dockerfile en la carpeta raíz del proyecto, extendiendo la imagen oficial de Airflow versión 2.0.1.
Configuración del Dockerfile
- Se copia el archivo de requisitos al contenedor y se ejecutan comandos para actualizar pip e instalar las dependencias de Python.
- Se construye la imagen extendida usando el comando
docker build, nombrándola comoextending airflow latest.
Verificación de Instalación
- Se modifica el archivo
docker-compose.ymlpara cambiar el nombre de la imagen aextending airflow latest.
- Se inicializa una instancia DAG y se crea una función llamada
get_scikit_learnpara imprimir la versión del paquete instalado.
Manejo de Nuevas Dependencias
- Al intentar agregar
matplotlib, surge un error porque los cambios no se reflejan en la imagen del contenedor.
- Es necesario reconstruir la imagen cada vez que se cambian las dependencias en el archivo.
Personalización Avanzada
- Después de reconstruir, se verifica que ahora ambas bibliotecas están instaladas correctamente.
- Para personalizar aún más, se clona el código fuente de Airflow desde GitHub y se crean archivos necesarios para definir nuevas dependencias.
Construcción y Ejecución del Contenedor Personalizado
Creación del Archivo Requisitos
- En la carpeta adecuada, se crea un archivo con las versiones necesarias para
scikit-learnymatplotlib.
Construcción Finalizada
- La construcción finaliza tras varios minutos, donde todas las dependencias son instaladas automáticamente desde el archivo definido.
Actualización del Comando Compose
- Se actualiza nuevamente el nombre de la imagen en el archivo
docker-compose.ymlantes de recrear los contenedores.
Comparativa entre Métodos
Elección del Método Adecuado
- El método recomendado es extender imágenes por su rapidez; sin embargo, si hay necesidad de optimizar tamaño o personalizar más, es mejor construir desde cero.
Uso Práctico con S3 Compatible
¿Cómo configurar MinIO y Airflow para trabajar con S3?
Configuración de MinIO
- MinIO es compatible con la API del servicio de almacenamiento en la nube AWS S3, lo que facilita su integración en proyectos ETL.
- Se muestra un comando de Docker para lanzar un contenedor de MinIO, el cual expone dos puertos: 9000 para la API y 9001 para la consola.
- Al acceder a la consola web, se requiere ingresar el nombre de usuario y contraseña raíz. Una vez dentro, se puede crear un bucket llamado "airflow".
- Es importante asegurarse de tener permisos de lectura y escritura al crear el bucket. Luego se puede cargar archivos o crear rutas desde la interfaz.
Generación y carga de archivos CSV
- En VS Code, se crea una carpeta llamada "data" y un archivo CSV llamado "data.csv" con columnas como ID del producto y fecha de entrega.
- Después de guardar el archivo, se sube a nuestro bucket "airflow", confirmando que ahora existe en el entorno S3.
Creación del DAG en Airflow
- Se inicia creando un archivo Python llamado
dag_menu_s3.py, donde se importan los paquetes necesarios para definir el DAG (Directed Acyclic Graph).
- Se verifica la versión del paquete proveedor de Amazon en Airflow e incluso se recomienda revisar videos anteriores sobre cómo extender imágenes Docker.
Configuración del sensor S3
- Se busca información sobre el sensor S3 en la documentación oficial. Este sensor espera que un archivo esté presente en un bucket específico.
- Para usarlo, son necesarios parámetros como nombre del bucket, clave del objeto y ID de conexión AWS. Se importa el operador correspondiente desde el directorio adecuado.
Conexión a AWS
- En la interfaz web de Airflow, se añade una nueva conexión configurando los campos requeridos como claves secretas y detalles específicos del host local.
- Tras guardar los cambios, se establece esta conexión recién creada como parámetro dentro del DAG antes mencionado.
Ejecución y solución de problemas
- Al iniciar el DAG por primera vez, surge un error relacionado con las comillas en los campos extra; esto requiere ajustes para corregirlo.
Ejecución de Tareas con Airflow y PostgreSQL
Monitoreo de Archivos CSV en Airflow
- Se inicia una tarea de sensor que verifica la existencia del archivo
data.csven el bucket de Airflow, pero falla al no encontrarlo dentro del límite de tiempo.
- Se sube el archivo
data.csvjusto antes del tiempo de espera; el sensor detecta su existencia inmediatamente después, marcando la tarea como exitosa.
Importación y Configuración de Base de Datos
- El archivo CSV llamado
orders, que contiene órdenes de venta artificiales, se debe importar a una base de datos PostgreSQL.
- Se crea una tabla llamada
ordersen la base de datostest, definiendo correctamente los tipos para las cuatro columnas: order id, date, product name y quantity.
Proceso de Importación
- Se importa el archivo CSV a la tabla
orders, asegurándose que los nombres de las columnas coincidan; se recibe un mensaje exitoso tras la importación.
- Se consulta las primeras 100 filas para verificar que la tabla ha sido configurada correctamente.
Creación del DAC (Directed Acyclic Graph)
- En VS Code, se crea un archivo Python llamado
dac_postgreshook.pypara implementar un DAC que consultará datos desde PostgreSQL.
- Se necesita importar paquetes requeridos y crear una función llamada
postgres_to_s3para realizar dos pasos: consultar datos y subirlos a un bucket S3.
Conexión a PostgreSQL
- Para interactuar con PostgreSQL, se verifica la versión instalada del paquete correspondiente mediante comandos Docker.
- La documentación muestra cómo establecer conexión con PostgreSQL utilizando el hook adecuado; se inicializa con el ID de conexión correspondiente.
Ejecución y Manejo de Errores
- Al ejecutar consultas SQL, es crucial corregir errores como nombres incorrectos o sintaxis inadecuada.
- Tras corregir errores en el código (como usar "order" en lugar de "orders"), se ejecuta nuevamente el DAC con éxito; los resultados son guardados en un archivo llamado
getorders.txt.
Optimización del Proceso
¿Cómo gestionar fechas de ejecución en Airflow?
Uso de macros para fechas de ejecución
- Se explica cómo obtener la fecha de ejecución actual y la siguiente para una función Python utilizando macros predefinidas en Airflow.
- Es necesario actualizar la consulta SQL para incluir condiciones que filtren los datos entre las fechas de ejecución actuales y siguientes.
- Se menciona la importancia de hacer dinámico el nombre del archivo, incorporando la fecha de ejecución como sufijo.
Ejecución y verificación del DAG
- Se observan dos ejecuciones exitosas del DAG en las fechas 30 de abril y 1 de mayo, confirmando que los pedidos se guardaron correctamente en un archivo de texto.
- Al revisar el contenido del archivo, se verifica que solo contiene datos entre las fechas actuales y siguientes, cumpliendo con los requisitos establecidos.
¿Cómo cargar archivos a S3 usando Airflow?
Preparación para cargar archivos a S3
- Para subir el archivo a un bucket S3, es necesario utilizar la biblioteca S3 Hook. Primero se verifica qué versión del paquete Amazon Providers está instalada.
- Se accede a la documentación para encontrar información sobre cómo inicializar el hook S3, asegurándose de tener configurado el ID de conexión AWS correcto.
Implementación del proceso de carga
- La función
load_filepermite cargar un archivo local al bucket S3; se deben especificar el nombre del archivo, el bucket y si se debe reemplazar un archivo existente.
- En VS Code, se implementa esta función utilizando el nombre adecuado para los archivos generados por cada ejecución.
Optimización del manejo de archivos temporales
Uso eficiente del espacio en disco
- Se plantea una solución para evitar acumular muchos archivos en la carpeta local mediante el uso del módulo
tempfileque permite crear archivos temporales.
- Antes de subir a S3, es crucial vaciar (flush) el objeto temporal para asegurar que los datos sean guardados correctamente.
Verificación final y limpieza
- Después de ejecutar nuevamente el DAG y verificar su éxito, se observa que no quedan archivos residuales en la carpeta local ni en S3.