
These are just some notes on how Scrapy reacts to a blocked pipeline.
Firstly, Scrapy is (mostly) single threaded: it is asynchronous and built upon the Twisted framework. See the first page of this popular Twisted intro to understand the difference. With asynchronous code we have a single thread, but the time spent on any given task is alternated. When a task starts blocking, focus shifts to another non-blocking task, so the code in its entirety only blocks when every task is blocked.
What does this mean for pipelines? Well, let's say you used a standard Python lib, like pyodbc, to write to a database synchronously. If your database write halts (blocks) for 10s, then EVERYTHING blocks for 10s, since this is a single thread. That means no more Scrapy requests are processed, and the spider stops crawling until the block clears. To illustrate this, consider the following pipeline:
import time
import json


class BlockingWriter(object):
    """This pipeline is just a slow JSON lines writer that artificially
    introduces a delay to test what happens."""

    def __init__(self):
        self.file = open('tester.jl', 'wb')

    def process_item(self, item, spider):
        line = json.dumps(dict(item)) + "\n"
        spider.log("Start blocking....")
        time.sleep(10)  # simulate a slow, blocking write
        spider.log("Finished blocking....")
        self.file.write(line)
        return item
If you add this pipeline to your ITEM_PIPELINES setting in Scrapy and then run your spider, you will see that every time an item passes through the pipeline the whole of Scrapy just HALTS. No more crawling is done until the block is cleared and the item is flushed out of the pipeline. This is what you are doing if you just use a library like MySQLdb or pyodbc to insert data into your database. Not good.
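For reference, the pipeline is enabled by adding it to ITEM_PIPELINES in settings.py. The module path below is hypothetical and depends on your project layout (and older Scrapy versions configure this as a list rather than a dict):

# settings.py (module path is hypothetical; adjust to your project)
ITEM_PIPELINES = {
    'myproject.pipelines.BlockingWriter': 300,
}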
This is why it is important to use Twisted's adbapi whenever you want to write to a db in a Scrapy pipeline. See here for a good starting point (although it should probably also implement the close_spider method with a self.dbpool.close() to make sure the connection pool is properly closed). The runInteraction method will run the function passed to it (the one doing the blocking database operations) in a genuinely separate thread, not in the main reactor loop. This means that Scrapy will at the same time be able to continue with the crawl and request processing.
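For a rough idea of what such a pipeline looks like, here is a minimal sketch. The table and column names are made up and error handling is omitted, so treat it as a template rather than a drop-in implementation:

from twisted.enterprise import adbapi


class MySQLStorePipeline(object):
    """Minimal sketch of an adbapi-based pipeline: inserts happen in a
    thread pool, so the reactor (and the crawl) keeps running."""

    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        dbpool = adbapi.ConnectionPool(
            'MySQLdb',
            host=settings['DB_HOST'],
            db=settings['DB_NAME'],
            user=settings['DB_USER'],
            passwd=settings['DB_PWD'],
            charset='utf8',
            use_unicode=True,
        )
        return cls(dbpool)

    def close_spider(self, spider):
        # close the connection pool when the crawl finishes
        self.dbpool.close()

    def process_item(self, item, spider):
        d = self.dbpool.runInteraction(self._insert, item)
        d.addBoth(lambda _: item)
        return d

    def _insert(self, curs, item):
        # runs in a worker thread with a cursor from the pool
        # (table and column names here are hypothetical)
        curs.execute("INSERT INTO items (name) VALUES (%s)", (item['name'],))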
In a project I was working on, I had used Twisted's adbapi, and I had a really slow remote db that blocked for long periods of time. At first Scrapy continued to process requests and crawl, but eventually it stopped, and the spider appeared to just hang with no output. I knew the database was blocking, but I didn't understand why Scrapy had stopped crawling, given that this database blocking operation was in a separate thread. I wanted to investigate this further, so I decided to artificially simulate the block.
import time
import logging

from twisted.enterprise import adbapi

log = logging.getLogger(__name__)


class Delayed_adbapi(object):
    """
    Another test: if we use Twisted's adbapi but introduce artificial
    delays, what happens with varying connection pool sizes?
    """

    def __init__(self, dbpool):
        self.dbpool = dbpool
        self.icnt = 0  # keep track of items processed so far

    def close_spider(self, spider):
        """
        Cleanup function, called after crawling has finished to close
        open objects. Close the ConnectionPool.
        """
        spider.log('Closing connection pool...')
        self.dbpool.close()

    @classmethod
    def from_settings(cls, scrapy_settings):
        dbargs = dict(
            host=scrapy_settings['DB_HOST'],
            db=scrapy_settings['DB_NAME'],
            user=scrapy_settings['DB_USER'],
            passwd=scrapy_settings['DB_PWD'],
            charset='utf8',
            use_unicode=True,
        )
        # cp_max controls the number of connections in the pool
        dbpool = adbapi.ConnectionPool('MySQLdb', cp_max=1, **dbargs)
        return cls(dbpool)

    def process_item(self, item, spider):
        d = self.dbpool.runInteraction(self._do_blocking, item, spider)
        d.addErrback(self._handle_error, item, spider)
        d.addBoth(lambda _: item)
        return d

    def _do_blocking(self, curs, item, spider):
        """
        Simulate some blocking db operations using sleep and a
        countdown that logs the count.
        """
        spider.log('Begin blocking... ')
        if self.icnt == 0:
            # Big delay for the first item
            TICKTOCK = 40
        else:
            # Small delay for subsequent items
            TICKTOCK = 1
        while TICKTOCK > 0:
            time.sleep(2)
            spider.log('TICKTOCK:%i' % TICKTOCK)
            TICKTOCK -= 1
        self.icnt += 1
        spider.log('Finished blocking... ')

    def _handle_error(self, failure, item, spider):
        log.error(failure)
The above pipeline just simulates a blocking database operation called with runInteraction. I've set it up so that the first item has a really big delay (80 seconds) and subsequent items only have a delay of 2 seconds. I've also initialized the connection pool with cp_max=1, meaning there is only ever a maximum of one connection in the pool. This makes analyzing the output simpler.
If you run the above pipeline with your spider, you'll notice that Scrapy will start crawling and processing requests, and eventually the first item will be passed to the pipeline. At that point a new thread will have been created for the _do_blocking function, and Scrapy continues to process more requests and do more crawling. You'll see in the output the crawls being randomly interspersed with "TICKTOCK:40", "TICKTOCK:39", etc., as Scrapy's work continues alongside the blocking database function of the pipeline (if we had set cp_max>1 things would be much the same, except more database threads would all be doing their countdowns simultaneously, which gets confusing). So far so good: this is what we expected to happen, and what we want to happen. The slow database writes are not stopping the crawl; the two exist side by side. Great. However, you'll soon notice that eventually (if you scrape enough items, and the pipeline block is long enough) Scrapy output just stops, and focus turns solely to the countdown while the first item is processed: "TICKTOCK:20", "TICKTOCK:19", "TICKTOCK:18" ... "TICKTOCK:1" appear in sequence in the output while nothing else happens. Then the next item gets processed ("TICKTOCK:1"), and the next ("TICKTOCK:1"), and so on. All the while no Scrapy requests/crawls are being processed. But why? After all, _do_blocking was started in a new thread, so why should it stop Scrapy from doing its thing?
Well, one answer could of course be that Scrapy has finished all the Requests it has to process, and all that is left to do is process the item pipelines, but no, that wasn't what was stopping things for me. As soon as a few items got flushed down the pipelines, Scrapy started making requests again. So why does Scrapy stop crawling when you have a really slow pipeline like this?
I telnetted into the spider during operation (telnet localhost 6023) and ran prefs():
prefs()
Live References

BookingSpider        1   oldest: 46s ago
Selector            28   oldest: 41s ago
HtmlResponse        28   oldest: 41s ago
Request            100   oldest: 45s ago
MyItem              28   oldest: 41s ago
Each time the block occurred, I noticed that for a given spider there were the same number of Items (and Responses): 28. Also, when I ran est() in telnet it showed:
engine.scraper.slot.active_size        : 11428021
engine.scraper.slot.itemproc_size      : 28
engine.scraper.slot.needs_backout()    : True
The answer turns out to be that Scrapy is trying to manage memory.
Scrapy will effectively throttle itself and not take any more Requests from the scheduler, to avoid piling up too much stuff in memory. This is controlled by the needs_backout check in scrapy/core/engine.py: Scrapy does not load any more requests from the scheduler while needs_backout is True. In turn, needs_backout (see scrapy/core/scraper.py) is True when active_size > max_active_size. As for active_size, it is incremented/decremented by len(response.body) every time a response is added/removed, so if no items are processed by our completely blocked pipeline, then no responses are removed, active_size soon exceeds max_active_size, needs_backout becomes True, and no more requests are taken from the scheduler. With Scrapy refusing to take on any more work, all focus shifts to the blocked pipeline threads. This state continues until those pipeline threads finish processing some items, which can then be released (along with their associated responses).

So it's not the blocking call in the pipeline that directly stops Scrapy doing more work; it's that items/responses are not being cleared by the pipeline, they build up in memory, and eventually Scrapy throttles itself to avoid piling up too much. At that point the only work left to do is the blocking pipeline, and that is what gets done. In the artificial code this manifests as the "TICKTOCK..." countdown sprawling down the screen, but with real blocking database writes you may see no output at all, just a complete, indefinite hang until the block clears. This can be disconcerting.
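For reference, the relevant logic looks roughly like the sketch below. This is a paraphrase of the Slot class in scrapy/core/scraper.py as I understand it, not the actual source; attribute names and the default max_active_size may differ between Scrapy versions.

class Slot(object):
    """Paraphrased sketch of the scraper Slot, which tracks how much
    response data is held in memory awaiting the item pipelines."""

    MIN_RESPONSE_SIZE = 1024

    def __init__(self, max_active_size=5000000):  # default is roughly 5MB
        self.max_active_size = max_active_size
        self.active_size = 0
        self.itemproc_size = 0

    def add_response_request(self, response, request):
        # a response has entered the scraper: count its body size
        self.active_size += max(len(response.body), self.MIN_RESPONSE_SIZE)

    def finish_response(self, response, request):
        # the response's items have been flushed through the pipelines
        self.active_size -= max(len(response.body), self.MIN_RESPONSE_SIZE)

    def needs_backout(self):
        # the engine stops pulling requests from the scheduler while True
        return self.active_size > self.max_active_size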
You can test this theory out by temporarily tweaking max_active_size in the Scrapy source, then telnetting into your spider when all output ceases and Scrapy has stopped crawling. You'll see that the bigger you set max_active_size, the longer it takes Scrapy to stop crawling, and the more items/responses there will be in memory at that time.
As a final piece of bonus code, you can also observe an analogous phenomenon in a pipeline that has nothing to do with a database, but similarly offloads blocking work to another Twisted thread:
import time
import logging

from twisted.internet import threads

log = logging.getLogger(__name__)


class ThreadBlocker(object):
    """
    Set REACTOR_THREADPOOL_SIZE = 1 in settings.py to only allow one
    Twisted thread at a time. This roughly simulates having only one
    connection in the database pool of the adbapi example.
    """

    def __init__(self):
        self.icnt = 0  # count items processed so far

    def blocker(self, item, delay):
        log.error('begin blocking for: %s' % delay)
        if self.icnt == 0:
            # Big initial delay
            TICKTOCK = delay
        else:
            # Small/no delay for subsequent items
            TICKTOCK = 1
        while TICKTOCK > 0:
            time.sleep(2)
            log.error('TICKTOCK:%i' % TICKTOCK)
            TICKTOCK -= 1
        self.icnt += 1
        return True

    def finished(self, result):
        log.error('success finished: %s' % result)

    def process_item(self, item, spider):
        d1 = threads.deferToThread(self.blocker, item, 30)
        d1.addCallback(self.finished)
        d1.addBoth(lambda _: item)
        return d1
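To run this experiment, settings.py needs something along these lines. The pipeline path is hypothetical, and newer Scrapy versions name the thread pool setting REACTOR_THREADPOOL_MAXSIZE rather than REACTOR_THREADPOOL_SIZE:

# settings.py (pipeline path is hypothetical; adjust to your project)
REACTOR_THREADPOOL_SIZE = 1   # one thread-pool thread, analogous to cp_max=1
ITEM_PIPELINES = {
    'myproject.pipelines.ThreadBlocker': 300,
}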