Airflow Tutorial for Beginners - Full Course in 2 Hours 2022
Introdução ao Curso de Airflow
Visão Geral do Curso
- O curso é voltado para iniciantes e combina teoria, explicações e demonstrações práticas.
- Não há pré-requisitos, mas recomenda-se conhecimento básico em Python para facilitar o aprendizado.
- O curso abrange conceitos fundamentais do Apache Airflow, ciclo de vida das tarefas e arquitetura básica.
Conteúdos Abordados
- Aprenderá a construir DAGs (Directed Acyclic Graphs) com diferentes operadores como Bash e Python.
- Serão abordadas novas funcionalidades introduzidas na versão 2.0 do Airflow, incluindo a API Task Flow.
- O curso também ensinará como conectar serviços externos como bancos de dados Postgres e AWS S3.
Configuração Inicial do Ambiente
Criação do Projeto
- Instruções para criar um projeto Python no Visual Studio Code e verificar a versão do Python instalada.
- É necessário ter uma versão do Python acima de 3.6; o instrutor confirma que possui a versão 3.6.7.
Instalação do Airflow
- Criação de um ambiente virtual usando
python -m venvpara isolar as dependências do projeto.
- O instrutor acessa o repositório oficial do GitHub para copiar o comando de instalação do Airflow.
Resolução de Problemas Durante a Instalação
Erros Comuns
- Um erro relacionado à falta da ferramenta GCC é resolvido instalando as ferramentas de linha de comando no macOS.
Inicialização da Base de Dados
- Após instalar o Airflow, é necessário inicializar o banco de dados com
airflow db init, criando uma base SQLite e arquivos de configuração.
Iniciando o Servidor Web
Configuração Inicial
- O servidor web é iniciado com
airflow webserver -p 8080, permitindo acesso via navegador na porta padrão 8080.
Criação de Usuário
- Para acessar a interface web, um usuário deve ser criado utilizando
airflow users create.
Executando DAGS no Airflow
Ativação dos DAGS
- Para executar os DAGS, é necessário iniciar o scheduler com
airflow scheduler.
Monitoramento das Tarefas
Instalação do Airflow com Docker
Preparação para a Instalação
- O processo de instalação do Airflow será feito utilizando o Docker, em vez de rodá-lo localmente.
- Para usuários de Mac ou Windows, é necessário instalar o aplicativo Docker Desktop, disponível nos links da descrição.
- Após a instalação, deve-se verificar as versões do Docker e do Docker Compose usando os comandos
docker --versionedocker-compose --version.
Configuração do Arquivo Docker Compose
- O arquivo
docker-compose.ymlé baixado da documentação oficial e define vários serviços necessários para o Airflow.
- A configuração padrão utiliza o Celery Executor; no entanto, será alterada para Local Executor removendo dependências desnecessárias como Redis e Celery Worker.
Criação das Pastas Necessárias
- Pastas para DAGs, logs e plugins são criadas através de um comando simples no terminal.
- A inicialização do banco de dados é feita com o comando
docker-compose up airflow init, que também configura um usuário administrador.
Execução do Airflow
- O Airflow é iniciado em modo destacado (detached mode) com o comando
docker-compose up -d.
- Verifica-se quais containers estão rodando usando
docker ps, confirmando a presença dos serviços essenciais como web server e scheduler.
Acesso à Interface Web do Airflow
- A interface web pode ser acessada pelo navegador em
0.0.0.0:8080, onde se faz login com as credenciais padrão (usuário: airflow).
Compreendendo os Conceitos Centrais do Airflow
Origem e Popularidade
- O Airflow foi desenvolvido inicialmente como uma ferramenta interna da LBMB em 2014 e se tornou um projeto open source em 2016.
- Desde janeiro de 2019, é um projeto da Apache Software Foundation, sendo uma das plataformas mais populares para gerenciamento de workflows.
Definição de Workflow
- Um workflow consiste em uma sequência de tarefas organizadas como um DAG (Directed Acyclic Graph), onde cada tarefa depende da conclusão anterior.
Estrutura das Tarefas no DAG
- Uma tarefa é definida como uma unidade de trabalho dentro de um DAG; cada tarefa representa um nó no gráfico.
Operadores no Airflow
- Os operadores determinam as ações executadas por cada tarefa; existem diversos tipos como Bash Operator e Python Operator.
Resumo dos Componentes Principais
Compreendendo o Ciclo de Vida das Tarefas no Airflow
Conceitos Fundamentais do Airflow
- A data de execução é a lógica que define quando um DAG (Directed Acyclic Graph) e suas instâncias de tarefa estão em execução. Por exemplo, pode haver várias execuções de DAG entre 1º e 3 de janeiro de 2021.
- Um DAG run é uma instância que contém tarefas executadas para uma data específica. As tarefas são executadas sequencialmente com base em suas dependências.
Estágios das Tarefas
- Cada tarefa passa por diferentes estágios até sua conclusão, com status como "em andamento" ou "sucesso". Existem 11 tipos diferentes de status na interface do usuário do Airflow.
- Os estágios são representados por cores na visualização gráfica e em árvore. Os estados incluem: "aguardando", "executando", "sucesso", "falha" e outros.
Ciclo de Vida da Tarefa
- O ciclo começa sem status, onde o agendador cria uma instância vazia da tarefa. Os quatro principais estágios são: agendada, removida, falha upstream ou pulada.
- Se a tarefa for agendada, o executor entra em ação e muda seu status para "criada". Quando um trabalhador executa a tarefa, ela muda para "em execução".
Resultados da Execução
- Após a execução, os resultados podem ser: sucesso (completou sem falhas), falha (não completou), ou encerramento (aborto da execução).
- Se a tarefa falhar ou for encerrada antes do máximo de tentativas ser atingido, ela será marcada como “aguardando nova tentativa” ou “aguardando reprogramação”.
Arquitetura Básica do Airflow
- A arquitetura básica inclui componentes como engenheiro de dados, servidor web, agendador e trabalhadores. Cada componente tem responsabilidades específicas na configuração e monitoramento dos processos ETL.
- Engenheiros de dados configuram o ambiente do Airflow e gerenciam os DAGs através da interface web. O executor é responsável por persistir as atualizações no banco de dados.
Configuração Inicial do Projeto
- Para iniciar o projeto no VS Code, utiliza-se o comando
docker compose up -dpara levantar os containers do Airflow.
- É recomendado remover todos os exemplos existentes antes de criar novos DAGs personalizados usando
docker compose down -v.
Criando um Novo DAG
- Após configurar corretamente as opções no arquivo YAML (
airflow core load examplesdeve ser definido como falso), podemos começar a definir nosso primeiro DAG como um arquivo Python na pasta designada.
Criando o Primeiro DAG no Airflow
Implementação do DAG
- Para criar um novo DAG, começamos importando a classe
DAGdo Airflow e instanciamos um objeto usando a declaraçãowith, garantindo que todo o código subsequente esteja dentro do escopo do nosso DAG.
- Definimos o ID único do DAG como "our_first_dag" e descrevemos-o como "este é nosso primeiro DAG no Airflow". Também decidimos a data de início e a frequência de execução, configurando para iniciar em 30 de julho às 2 da manhã.
Parâmetros Comuns
- O parâmetro
start_dateé definido comodatetime(2021, 7, 30, 2)e o intervalo de agendamento é configurado para executar diariamente.
- Além disso, definimos parâmetros comuns como proprietário (
owner), número máximo de tentativas (max_retries) e atraso entre tentativas (retry_delay), importando os pacotes necessários.
Criação da Tarefa
- A seguir, criamos uma tarefa simples utilizando o operador Bash para executar comandos. Precisamos importar primeiro o operador Bash.
- A tarefa é implementada com um ID chamado "first_task", onde usamos o comando
echopara imprimir uma mensagem: "Hello World! Esta é a primeira tarefa".
Resolução de Erros
- Ao tentar executar, encontramos um erro indicando que não existe um módulo chamado
airflow.operator.
- Corrigimos isso mudando "operator" para "operators". Após essa correção, surgiram novos erros relacionados ao argumento inválido
scheduler_interval, que foi corrigido alterando paraschedule_interval.
Execução e Dependências das Tarefas
- Após resolver os erros, conseguimos visualizar nosso primeiro DAG com uma única tarefa chamada "first_task". O DAG será executado a partir da data inicial definida.
- Criamos uma segunda tarefa que será executada após a conclusão da primeira. Definimos esta nova tarefa com um comando Bash semelhante à primeira.
Gerenciando Dependências entre Tarefas
Configuração das Dependências
- Para estabelecer dependências entre as tarefas, definimos a segunda tarefa como downstream da primeira ou vice-versa.
- Mudamos o ID do DAG para "our_first_dag_version_2" e confirmamos visualmente as dependências na interface gráfica.
Execução das Tarefas
- Ao iniciar o DAG novamente, observamos que a segunda tarefa só foi executada após a conclusão bem-sucedida da primeira.
Adicionando Mais Tarefas
- Decidimos adicionar uma terceira tarefa que também deve ser executada após a primeira. Essa nova tarefa imprime: “Hey! Eu sou a task 3”.
Métodos de Construção de Dependência
- Utilizamos diferentes métodos para construir as dependências: adicionando diretamente ou usando operadores bitwise (shift).
Refinamento das Dependências
Métodos Alternativos
- Testamos outro método ajustado utilizando operadores bitwise em uma linha única para definir múltiplas tarefas downstream.
Verificação Final das Configurações
- Confirmamos se todas as dependências estavam corretamente configuradas através da interface gráfica.
Preparação do Ambiente Airflow
Verificando Instalação do Airflow
- Abrindo nossa pasta de projeto no VS Code, verificamos se os componentes do Airflow estão em execução através do comando Docker. Se necessário, podemos iniciar com
docker-compose up -d.
Criação de Novo Arquivo Python
Criando e Executando um DAG com Python Operator no Airflow
Definindo o DAG e a Função Inicial
- O DAG é configurado com o ID "dac" e descrito como "primeiro deck usando Python operator".
- A data de início do DAG é definida para 6 de outubro de 2021, programando sua execução diária.
- Uma função simples chamada
greeté criada, que imprime "Hello World" quando executada.
Criando a Tarefa com Python Operator
- Para executar a função
greet, o módulo Python Operator é importado, e a tarefa recebe o ID "greet".
- Após atualizar o DAG, ele aparece na interface do Airflow sem erros, mostrando uma única tarefa chamada "greet".
Execução da Função e Passagem de Parâmetros
- A função
greeté atualizada para aceitar parâmetros (nome e idade), imprimindo uma mensagem personalizada.
- Os parâmetros são passados utilizando
op_kwargs, permitindo que valores sejam definidos diretamente na execução da tarefa.
Compartilhamento de Informações entre Tarefas
- O uso do XCom permite compartilhar informações entre diferentes tarefas dentro do Airflow.
- Uma nova função chamada
get_nameé criada para retornar um nome ("Jerry"), que será utilizado em outra tarefa.
Implementação do Pulling via XCom
- A tarefa
get_nameretorna seu valor ao XCom, que pode ser acessado por outras tarefas.
- Na função
greet, o parâmetro nome é removido e substituído pela instância da tarefa (ti) para puxar o valor armazenado no XCom.
Resolvendo Erros e Verificando Resultados
- Um erro ocorre devido à utilização incorreta da função para puxar dados; a correção envolve usar
xcom_pull.
- Após corrigir os erros, a execução bem-sucedida resulta na impressão correta dos dados: "Hello World! Meu nome é Jerry...".
Enviando Múltiplos Valores via XCom
- É possível enviar múltiplos valores ao XCom; neste caso, primeiro nome ("Jerry") e sobrenome ("Friedman") são enviados.
- A modificação das funções permite distinguir os valores pelo uso de chaves únicas no XCom.
Atualizando Código para Incluir Idade via XCom
- Uma nova função chamada
get_ageé criada para enviar um valor de idade (19), que também será puxado pela funçãogreet.
Atualização de Tarefas e Dependências no Airflow
Compartilhamento de Valores com XComs
- O valor é compartilhado via XCom, utilizando a chave 'h' e os IDs das tarefas para obter a idade. As dependências da tarefa são atualizadas ao adicionar a tarefa 3 como upstream da tarefa 1.
- É importante notar que o tamanho máximo dos XComs é de apenas 48 kilobytes, não megabytes ou gigabytes. Isso deve ser confirmado no código-fonte do Airflow, evitando o uso de grandes dados como DataFrames do Pandas.
Reescrevendo DAG com Task Flow API
- O código do DAG original possui cerca de 60 linhas. A reescrita usando a Task Flow API visa reduzir essa quantidade.
- Um novo arquivo Python chamado
dac_with_task_flow_api.pyé criado, onde se importam as bibliotecas necessárias e se define uma variável auxiliar padrão para recarga e atraso na repetição.
Definição de Tarefas
- Três tarefas são definidas:
get_name,get_ageegreet. Cada tarefa é representada por uma função Python decorada adequadamente.
- A função
greetimprime uma mensagem formatada com o nome e a idade retornados pelas funções anteriores.
Construindo Dependências Automáticas
- As dependências entre as tarefas são automaticamente calculadas pela Task Flow API ao chamar as funções
get_nameeget_age, passando seus resultados para a funçãogreet.
Execução e Verificação dos Resultados
- Após ativar o DAG, os logs mostram que a mensagem correta foi impressa com os valores esperados (nome: Jerry, idade: 19). Os valores também foram armazenados nos XComs.
Manipulação Avançada de Saídas em Tarefas
Retornando Múltiplos Valores
- Para retornar primeiro nome e sobrenome na tarefa
get_name, a funçãogreetprecisa ser ajustada para aceitar esses novos parâmetros.
- O decorador da tarefa deve incluir um parâmetro para múltiplas saídas. Um dicionário contendo primeiro nome e sobrenome é retornado.
Atualização do DAG
- Após atualizar o DAG, os logs confirmam que tanto o primeiro nome quanto o sobrenome foram corretamente registrados nas mensagens exibidas.
Configuração de Catch Up em DAG
Criação de Novo DAG com Catch Up
- Um novo DAG chamado "deck" é criado utilizando um operador Bash simples. A data inicial do DAG é definida como 1º de novembro.
Comportamento Padrão do Catch Up
- O parâmetro "catch up" está definido como verdadeiro por padrão. Ao executar o DAG, várias execuções são registradas desde sua data inicial até a mais recente.
Desativando Catch Up
- Para desativar essa funcionalidade, altera-se o parâmetro "catch up" para falso. Isso resulta na execução apenas da execução mais recente registrada no dia 9 de novembro.
Execução Passada Usando Back View
Execução de DAGs no Airflow
Visualizando Execuções de DAG
- O usuário acessa o histórico de execuções do DAG entre 1º e 8 de novembro de 2021, utilizando um ID específico para visualizar os logs.
- É necessário definir o parâmetro
schedule_intervalao criar um DAG, que aceita uma expressão cron como string ou objeto datetime.
Entendendo Expressões Cron
- As expressões cron são strings compostas por cinco campos separados por espaços, representando um conjunto de horários para execução.
- O Airflow já oferece presets como "diariamente" e "horariamente", facilitando a programação dos DAGs.
Criando e Atualizando um DAG
- Um novo arquivo chamado
deck_with_cron_expression.pyé criado, onde se define uma tarefa simples usando o operador bash e se programa a execução diária a partir de 1º de novembro.
- Após atualizar o intervalo do agendamento para uma string cron específica (
0 0 * * *), o usuário observa que a execução do DAG permanece inalterada em relação ao preset diário.
Gerando Expressões Cron Personalizadas
- O site crontab.guru é utilizado para gerar e verificar expressões cron visualmente, permitindo ao usuário entender melhor as entradas válidas.
- O usuário tenta criar uma expressão que execute tarefas semanalmente às terças-feiras às 3 da manhã, atualizando posteriormente o intervalo do agendamento no código.
Agendamentos Semanais com Múltiplos Dias
- Para executar o DAG semanalmente nas terças e sextas-feiras às 3 da manhã, basta adicionar "sexta" à expressão anterior.
- A interpretação da nova expressão confirma que as execuções ocorrerão conforme desejado.
Conexões no Airflow
- Ao construir um ETL no Airflow, é essencial conectar-se a serviços externos como bancos de dados (MySQL, PostgreSQL).
- As credenciais necessárias podem ser gerenciadas através das conexões do Airflow na interface web.
Configurando Conexões com PostgreSQL
- O menu "conexões" permite visualizar todas as conexões criadas; novas conexões podem ser adicionadas facilmente.
- Para demonstrar o operador PostgreSQL, é necessário expor um banco de dados PostgreSQL existente através da configuração correta no Docker Compose.
Criando Banco de Dados com DBeaver
- O DBeaver é utilizado como ferramenta para gerenciar bancos de dados; após configurar a conexão com sucesso, um novo banco chamado "test" é criado.
Como Configurar o Airflow com PostgreSQL?
Importação de Pacotes e Inicialização do DAG
- O primeiro passo é abrir o arquivo
operator.pye importar os pacotes necessários.
- Define-se um valor padrão para
aux, inicializando um DAG ao configurar seu ID, data de início e intervalo de agendamento.
Criação da Conexão com PostgreSQL
- Para criar a primeira tarefa usando o operador Postgres, são necessários três parâmetros: ID da tarefa, ID da conexão e uma consulta SQL que será executada.
- Ao configurar a conexão no Airflow UI, deve-se usar "postgres localhost" como ID de conexão. O tipo de conexão deve ser selecionado como Postgres, com informações como nome do banco (schema), usuário e senha.
Resolução de Erros na Conexão
- Se estiver usando Docker Desktop ou MacOS/Windows, a conexão deve ser feita através de
host.docker.internalem vez delocalhost.
- Após salvar as configurações, se ocorrer falha na execução da tarefa devido a erro no nome do host (
host.darker.local), é necessário corrigir parahost.darker.internal.
Execução da Tarefa e Verificação
- Após corrigir o erro anterior, se houver uma falha novamente por erro de sintaxe na consulta SQL (falta de 't' em 'character varying'), isso deve ser ajustado.
- Com as correções feitas, a criação da tabela é bem-sucedida. A tabela "dag_runs" é verificada no DBeaver.
Inserção de Dados na Tabela
- Uma nova tarefa é criada para inserir dados nas tabelas "dag_runs", utilizando o operador Postgres novamente.
- A instrução SQL para inserção utiliza variáveis do Airflow (
dspara data de execução edag.id).
Dependências das Tarefas
- As dependências das tarefas são configuradas; após atualização do DAG, se ocorrer um erro devido à variável indefinida (
dt), isso deve ser corrigido consultando a documentação dos macros do Airflow.
Validação dos Resultados
- Após ajustes nas variáveis utilizadas nas consultas SQL, os registros são inseridos corretamente na tabela com as datas esperadas.
- É recomendado limpar dados antes da inserção para evitar duplicações ou violações da chave primária.
Implementação da Exclusão Antes da Inserção
- Para evitar erros ao tentar inserir dados já existentes, uma tarefa adicional é criada para excluir registros antes da inserção.
- A instrução SQL para exclusão especifica condições baseadas nos valores das variáveis.
Finalização e Verificação dos Registros
- Após realizar as alterações necessárias nas tarefas e suas dependências, verifica-se que não há mais violação das chaves primárias durante as inserções subsequentes.
Como Instalar Pacotes Python no Airflow?
Métodos para Instalação de Dependências Python
- Existem duas abordagens principais: estender ou personalizar a imagem Docker do Airflow para instalar dependências Python. Cada método possui vantagens e desvantagens.
Extensão vs Personalização
- Estender a imagem requer conhecimento básico sobre imagens Docker; não precisa do código-fonte do Airflow e compila rapidamente.
Instalação de Dependências Python no Airflow
Extensão da Imagem do Airflow
- Para adicionar a biblioteca
scikit-learnna versão 0.24.2, é necessário incluir uma linha no arquivo de requisitos.
- A imagem oficial do Airflow que será estendida é a versão 2.0.1.
- O arquivo de requisitos deve ser copiado para a imagem Docker e as dependências devem ser instaladas usando o comando
pip.
Construindo a Imagem Estendida
- A construção da nova imagem é feita com o comando
docker build, nomeando-a como "extending airflow" e marcando-a como "latest".
- Após a construção, é necessário atualizar o arquivo
docker-compose.ymlpara usar a nova imagem.
Verificação da Instalação
- Um DAG (Directed Acyclic Graph) deve ser criado para verificar se o pacote
scikit-learnfoi instalado corretamente.
- É preciso reconstruir os serviços do servidor web e do scheduler do Airflow após modificar o nome da imagem.
Adicionando Mais Dependências
- Para adicionar outra dependência, como
matplotlib, ela deve ser incluída no arquivo de requisitos.
- Se ocorrer um erro ao tentar acessar
matplotlib, isso indica que as alterações não foram refletidas na imagem Docker.
Rebuild Necessário
- Sempre que houver mudanças nas dependências Python, é necessário reconstruir a imagem Docker novamente.
- Após reiniciar os containers, as versões dos pacotes devem aparecer nos logs confirmando sua instalação bem-sucedida.
Personalização Avançada da Imagem do Airflow
Clonagem do Código Fonte
- Para personalizações mais profundas, clone-se o código fonte do Airflow através do repositório oficial no GitHub.
Configuração das Dependências
- Crie um novo arquivo de requisitos dentro da pasta correta para definir as dependências necessárias antes de construir a nova imagem.
Construindo uma Nova Imagem Personalizada
- Utilize o comando
docker buildcom parâmetros específicos para criar uma nova versão personalizada da imagem do Airflow.
Atualizando o Compose
- O nome da imagem também precisa ser atualizado no arquivo
docker-compose.yml.
Escolhendo Entre Métodos de Instalação
Comparação entre Métodos
- O método mais simples e rápido geralmente envolve estender a imagem existente; recomendado em 99% dos casos.
Considerações Finais sobre Customização
- Se for necessária uma personalização significativa ou otimização no tamanho da imagem, considere construir uma nova partir do código fonte.
Uso de Operadores Especiais no Airflow
Introdução aos Operadores Especiais
Configuração do MinIO e Integração com o Airflow
Introdução ao MinIO
- O MinIO é compatível com a API do AWS S3, facilitando a construção de ETL.
- É possível configurar um serviço MinIO em um contêiner Docker, acessando a documentação oficial para orientações.
Configuração do Contêiner Docker
- Um comando Docker é utilizado para iniciar uma imagem do MinIO, expondo as portas 9000 (API) e 9001 (console).
- Após o login no console, é necessário criar um bucket S3 chamado "airflow" e garantir permissões de leitura e escrita.
Upload de Arquivos
- Um arquivo CSV chamado "data.csv" é gerado no VS Code e enviado para o bucket "airflow".
- A confirmação da existência do arquivo no bucket indica que a configuração está correta.
Criação do DAG no Airflow
- Um novo arquivo Python chamado "dag_menu_s3.py" é criado para conectar-se ao bucket S3.
- O operador S3 Sensor é utilizado para verificar a existência de arquivos no bucket configurado.
Verificação da Versão do Pacote
- A versão instalada do pacote Amazon Airflow Provider deve ser verificada através de comandos Docker.
- A documentação oficial do Apache Airflow fornece informações sobre como utilizar o sensor S3.
Configuração da Conexão AWS
- Parâmetros como nome do bucket, chave do arquivo e ID da conexão AWS são necessários para configurar o sensor.
- A conexão deve incluir credenciais como AWS Access Key ID e Secret Access Key, além das configurações específicas para o MinIO.
Execução e Solução de Problemas
- Após salvar as configurações, inicia-se o DAG recém-criado; se falhar, os logs devem ser consultados.
- Erros comuns incluem formatação incorreta dos campos na conexão; ajustes devem ser feitos utilizando aspas duplas.
Monitoramento da Execução
Execução de Tarefas com Airflow e PostgreSQL
Monitoramento de Arquivos CSV no Airflow
- A tarefa do sensor é reaberta para monitorar o log, que verifica a presença do arquivo
data.csva cada 5 segundos. O processo falha após 30 segundos se o arquivo não for encontrado.
- Após aguardar alguns segundos, o arquivo
data.csvé carregado no bucket do Airflow. O sensor detecta sua existência imediatamente após o upload, marcando a execução da tarefa como bem-sucedida.
Importação de Dados para PostgreSQL
- Um arquivo CSV chamado
orders, contendo pedidos de vendas artificiais entre março e junho de 2022, precisa ser importado para um banco de dados PostgreSQL.
- É criado uma tabela chamada
ordersno banco de dadostest, definindo corretamente os tipos das quatro colunas e estabelecendoorder_idcomo chave primária.
Importação e Verificação dos Dados
- Os dados são importados da tabela usando a opção "importar dados" selecionando o arquivo CSV correspondente. A mensagem de sucesso confirma que a importação foi realizada corretamente.
- Para verificar a configuração da tabela, as primeiras 100 linhas são consultadas, confirmando que os dados foram configurados adequadamente.
Criação do DAG para Consultas
- Um novo arquivo Python chamado
dac_postgreshook.pyé criado para implementar um DAG (Directed Acyclic Graph), onde será feita uma consulta aos dados da tabelaorders.
- O operador Python é utilizado para executar funções específicas dentro do DAG. É necessário importar pacotes relevantes e inicializar o hook PostgreSQL.
Conexão com Banco de Dados PostgreSQL
- A versão instalada do pacote PostgreSQL é verificada através do comando Docker. Isso garante que as dependências estão corretas antes da implementação.
- O hook é inicializado utilizando um ID de conexão específico, permitindo interações diretas com o banco de dados PostgreSQL.
Execução da Consulta SQL
- Uma consulta SQL é preparada para obter todos os pedidos antes de 1º de maio de 2022. Erros na sintaxe são corrigidos ao ajustar nomes das tabelas e formatação das datas.
- Os resultados da consulta são salvos em um arquivo texto (
getorders.txt) utilizando o módulo CSV, garantindo que os dados sejam armazenados corretamente.
Resolução de Erros e Ajustes Finais
- Durante a execução inicial, erros relacionados ao ID da tarefa são identificados e corrigidos. A execução subsequente mostra sucesso na operação.
- O conteúdo gerado pelo segundo run sobrescreve arquivos anteriores; ajustes na consulta garantem que apenas os pedidos durante intervalos específicos sejam salvos sem sobreposição.
Melhoria na Geração dos Nomes dos Arquivos
Como Obter Datas de Execução em Python com Airflow
Configuração Inicial do Airflow
- O vídeo inicia com a explicação sobre como obter as datas de execução atual e próxima para uma função Python no Airflow, utilizando a documentação da versão 2.0.1.
- É necessário atualizar a consulta SQL para incluir condições de data que sejam maiores ou iguais à data de execução atual e menores que a próxima data de execução.
Atualização do Nome do Arquivo
- O nome do arquivo gerado deve ser dinâmico, incorporando a data de execução como sufixo, e o log também deve refletir essa mudança.
- Após salvar as alterações e atualizar o DAG, é importante verificar se os arquivos foram criados corretamente na pasta designada.
Carregamento dos Arquivos no S3
- A segunda etapa envolve o upload do arquivo texto ordenado para um bucket S3, utilizando a biblioteca S3 Hook.
- Verifica-se qual versão do pacote Amazon Providers está instalada , antes de consultar a documentação para interagir com o AWS S3.
Implementação da Função de Upload
- Um objeto S3 Hook é criado usando o ID da conexão configurada no servidor web do Airflow.
- A função
load_fileé utilizada para carregar um arquivo local no S3, especificando o nome do arquivo, bucket e chave.
Execução e Verificação dos Resultados
- O arquivo gerado é nomeado conforme as ordens executadas, garantindo que os dados sejam salvos corretamente.
- Após executar novamente o DAG, são verificados os logs das execuções bem-sucedidas e confirmada a presença dos arquivos texto no bucket S3.
Limpeza da Estrutura de Pastas
- Para evitar acumulação desnecessária de arquivos na pasta local do projeto, discute-se o uso da biblioteca
tempfileem Python.
- A criação de arquivos temporários permite manter o espaço de trabalho limpo ao utilizar um diretório temporário do sistema.
Finalização e Conclusão
- O código finaliza com instruções sobre como garantir que os arquivos temporários sejam excluídos após seu uso.
- Ao revisar os logs das execuções bem-sucedidas, confirma-se que todos os dados foram processados corretamente sem deixar resíduos na pasta local.