airflow-usage

Airflow 的运行流程

对于Airflow的使用来说了解整个Airflow的运行流程是非常重要的,否则就会出现很多无法预料的行为,关于这部分,本少爷也是踩了很多的坑。

DAG,TASK和他们的实例

首先要明白一个概念,无论是DAG,还是TASK都是一个描述一个抽象的逻辑。
真正在某个时间点运行的DAG或TASK才是运行的实例。

在Airflow中,DAG的实例叫做Dag Run, Task的实例叫做Task Instance

Airflow 执行组件

Airflow的调度和执行流程中有两个核心的组件

  • Scheduler:这个是整个Airflow的调度器,Airflow所有DAG的调度过程是由Scheduler轮询来处理的。触发条件达到后,会丢给Executor执行。

  • Executor:现在的Executor有三种:

    1. SequnceExecutor:提供本地执行,并且串行执行一个DAG中的所有Task,基本上只用在初期的Airflow概念验证阶段

    2. LocalExecutor:这个是比较常用的Executor,可以在本地并行执行一个DAG内的所有Task

    3. CeleryExecutor:这个是在大型任务调度场景,或者是表较复杂的任务分离场景中需要用到的Executor。顾名思义,在这个Executor下,Airflow使用了Celery这个强大的Python分布式队列框架去分发任务,然后在这样的环境下,需要在执行任务的机器上启用Airflow Worker来处理队列中的请求。

      在一个Airflow中同时只能一个Executor启动,不能给指定的DAG指定Executor

  • Pool:这个Pool虽然不是Airflow的核心,但也跟整个Airflow的执行流程相关。任何一个Task其实都是指定了Pool这个参数的,即使没有自己指定,其实也是归结到了Default Pool这么个池子中。Pool本身是个抽象的概念,由Slot组成,可以建立任何一个Pool,指定Slot的数量。任何一个使用了这个Pool的Task Instance就需要占用一个Slot,Slot用完了,Task就处于等待状态。

Airflow 执行参数

在整个Airflow的执行流程中,有几个参数,控制了整个调度流程的并行度,但是在文档中却没有好好的写明白。

  1. parallelism:这个参数指定了整个Airflow系统,在任何一刻能同时运行的Task Instance的数量,这个数量跟DAG无关,只跟Executor和Task有关。举个例子:如果parallelism=15, 这时你有两个DAG,A和B,如果A需要同时开跑10个Task,B也要同时开跑10个Task,两个DAG同时触发,那么这时候同时在跑的Task数量只能是15,其余的5个会等之前的Task运行完了触发,这时的状态不会显示在web上。而且在这种情况下,触发的顺序是不确定的。

  2. dag_concurrency:这个参数指定了同一个Dag Run中能同时运行的Task Instance的个数

  3. max_active_runs_per_dag:这个参数指定了同一个Dag能被同时激活的Dag Run的数量

  4. non_pooled_task_slot_count:这个参数指定了默认的Pool能同时运行的Task Instance的数量,如果你的Task没有指定Pool选项,那么这个Task就是属于这个默认的Pool的

Airflow 执行状态

对于Airflow来说,Dag Run和Task Instance都有自己的执行状态,而且这两者的执行状态不关联,也就是说有可能某一个Dag Run是Success的,但是这个Dag Run里的Task Instance确是Failed或者无状态的,反之亦然。

怎么会出现这种情况呢?一般来说,正常的调度行为下,这种情况是不会出现的,但是如果说我们的Dag写错了,Task跑错了呢?

  • 错误的处理方法:直接在Dag Run的菜单中删除这个跑错的Dag Run,然后让调度器重跑,或者Backfill它

    但这时,实际上这个Dag Run跑过的Task Instance的状态还在数据库中,于是实际上根本就没有运行Task就调度器就自动判断跑完了。

    直接删除Task Instance也是一样的情况,调度器会认为这个Dag Run是Success的状态所以就不跑它。但这时可以Backfill

  • 所以正确的做法是使用Clear

    当我们Clear一个Task Instance时,这个Task Instance所属的Dag Run的状态会立即被置为Running,这样调度器就会认为这个Dag Run要继续跑。

    当然,如果我们同时删除了一批Task Instance和它们所属的Dag Run的话,调度器也会正常的重新开始执行,实际上这样的操作方式,在界面上更容易一点。

在清空状态或重跑时,暂停当前Dag的调度是比较靠谱的,否则会出现,清空到一半,当中的某个任务已经开始被调度的情况,所以最好全部清空完毕后,再打开调度器。

One more thing,还有非常重要的一点,如果当前有Task Instance在运行,这时我们如果删除了这个Task Instance的状态或者Clear它的状态,实际在后台运行着的任务并不会停止!所以需要手工Kill这个任务的运行,然后这时Scheduler进程收到了子进程(我们的运行的Task)异常退出的状态,就会把这个Task Instance的任务状态重新写成Failed,然后我们就又要清空一遍,所以在重跑任务前,一定要先停止调度,然后Kill当前正在运行的任务进程,最后清空任务状态

introduce-to-airflow

Airflow 抽象理解

在介绍Airflow之前,我们需要了解任务依赖的概念

任务依赖

通常,在一个运维系统,数据分析系统,或测试系统等大型系统中,我们会有各种各样的依赖需求。比如,

  • 时间依赖:任务需要等待某一个时间点触发

  • 外部系统依赖:任务依赖Mysql中的数据,HDFS中的数据等等,这些不同的外部系统需要调用接口去访问

  • 机器依赖:任务的执行只能在特定的某一台机器的环境中,可能这台机器内存比较大,也可能只有那台机器上有特殊的库文件

  • 任务间依赖:任务A需要在任务B完成后启动,两个任务互相间会产生影响

  • 资源依赖:任务消耗资源非常多,使用同一个资源的任务需要被限制,比如跑个数据转换任务要10个G,机器一共就30个G,最多只能跑两个,我希望类似的任务排个队

  • 权限依赖:某种任务只能由某个权限的用户启动

也许大家会觉得这些是在任务程序中的逻辑需要处理的部分,但是我认为,这些逻辑可以抽象为任务控制逻辑的部分,和实际任务执行逻辑解耦合

如何理解Crontab

现在让我们来看下最常用的依赖管理系统,Crontab

在各种系统中,总有些定时任务需要处理,每当在这个时候,我们第一个想到的总是crontab。

确实,crontab可以很好的处理定时执行任务的需求,但是对于crontab来说,执行任务,只是调用一个程序如此简单,而程序中的各种逻辑都不属于crontab的管辖范围(很好的遵循了KISS)

所以我们可以抽象的认为:

crontab是一种依赖管理系统,而且只管理时间上的依赖。

Airflow的处理依赖的方式

现在重点Airflow来了,看下它是怎么处理我们遇到的依赖问题。

  • Airflow的核心概念,是DAG(有向无环图),DAG由一个或多个TASK组成,而这个DAG正是解决了上文所说的任务间依赖。Task A 执行完成后才能执行 Task B,多个Task之间的依赖关系可以很好的用DAG表示完善

  • Airflow完整的支持crontab表达式,也支持直接使用python的datatime表述时间,还可以用datatime的delta表述时间差。这样可以解决任务的时间依赖问题。

  • Airflow在CeleryExecuter下可以使用不同的用户启动Worker,不同的Worker监听不同的Queue,这样可以解决用户权限依赖问题。Worker也可以启动在多个不同的机器上,解决机器依赖的问题。

  • Airflow可以为任意一个Task指定一个抽象的Pool,每个Pool可以指定一个Slot数。每当一个Task启动时,就占用一个Slot,当Slot数占满时,其余的任务就处于等待状态。这样就解决了资源依赖问题。

  • Airflow中有Hook机制(其实我觉得不应该叫Hook),作用时建立一个与外部数据系统之间的连接,比如Mysql,HDFS,本地文件系统(文件系统也被认为是外部系统)等,通过拓展Hook能够接入任意的外部系统的接口进行连接,这样就解决的外部系统依赖问题。


当然, 这些并不是Airflow的设计目的,Airflow设计时,只是为了很好的处理ETL任务而已,但是其精良的设计,正好可以用来解决任务的各种依赖问题。具体的Airflow使用方式请直接参考官方文档

Hello World

Welcome to Hexo! This is your very first post. Check documentation for more info. If you get any problems when using Hexo, you can find the answer in troubleshooting or you can ask me on GitHub.

Quick Start

Create a new post

1
$ hexo new "My New Post"

More info: Writing

Run server

1
$ hexo server

More info: Server

Generate static files

1
$ hexo generate

More info: Generating

Deploy to remote sites

1
$ hexo deploy

More info: Deployment