These are just some notes on how Scrapy reacts to a blocked pipeline.

Firstly, Scrapy is (mostly) single threaded: it is asynchronous and built upon the Twisted framework. See the first page of this popular Twisted intro to understand the difference. With asynchronous code we have a single thread, but the time spent on any given task is alternated. When one task starts blocking, focus shifts to another non-blocking task, so the code in its entirety only blocks when every task is blocked.
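
As a quick standalone sketch of that idea (this is plain Twisted, nothing to do with Scrapy yet, and the task names are made up for illustration):

# A minimal sketch of why a blocking call stalls everything on the single
# reactor thread: the task scheduled for t=1s cannot run until the blocking
# task lets go of the thread.
import time
from twisted.internet import reactor

START = time.time()

def quick_task():
    print('quick task ran at t=%.1fs' % (time.time() - START))

def blocking_task():
    print('blocking task starts at t=%.1fs' % (time.time() - START))
    time.sleep(5)    # nothing else on the reactor runs during this
    print('blocking task done at t=%.1fs' % (time.time() - START))

reactor.callLater(0, blocking_task)
reactor.callLater(1, quick_task)   # in practice this fires at ~t=5s
reactor.callLater(6, reactor.stop)
reactor.run()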

What does this mean for pipelines? Let's say you use a standard Python library such as pyodbc to write to a database synchronously. If your write to the database halts (blocks) for 10s, then EVERYTHING blocks for 10s, since this is all one thread. That means no more Scrapy requests are processed, and the spider stops crawling until the block clears. To illustrate this, consider the following pipeline:

import json
import time

class BlockingWriter(object):
    """This pipeline is just a slow JSON lines writer
       that artificially introduces a delay to test what happens.
    """
    def __init__(self):
        self.file = open('tester.jl', 'w')

    def process_item(self, item, spider):
        line = json.dumps(dict(item)) + "\n"
        spider.log("Start blocking....")
        time.sleep(10)   # simulate a slow, synchronous database write
        spider.log("Finished blocking....")
        self.file.write(line)
        return item

If you add this pipeline to your ITEM_PIPELINES setting in Scrapy and then run your spider, you will see that every time an item passes through the pipeline the whole of Scrapy just HALTS. No more crawling is done until the block is cleared and the item is flushed out of the pipeline. This is what you are doing if you just use a library like MySQLdb or pyodbc to insert data into your database. Not good.
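
For reference, enabling the pipeline is just a settings entry (the module path here is a placeholder; use wherever you put the class in your own project):

# settings.py -- hypothetical module path
ITEM_PIPELINES = {
    'myproject.pipelines.BlockingWriter': 300,
}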

This is why it is important to use Twisted's adbapi whenever you want to write to a database in a Scrapy pipeline. See here for a good starting point (although that example should probably also implement the close_spider method with a self.dbpool.close() to make sure the connection pool is properly closed). The runInteraction method will run the function passed to it (which does the blocking database operations) in a genuinely separate thread, outside the main reactor loop. This means Scrapy can continue crawling and processing requests at the same time.
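
As a rough sketch of that pattern (the table name, column names and DB_* settings below are made up for illustration; substitute your own):

from twisted.enterprise import adbapi

class MySQLWriter(object):
    """Minimal sketch of an adbapi-based pipeline: the insert runs in a
       thread from the connection pool, so the reactor is not blocked."""

    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        dbpool = adbapi.ConnectionPool(
            'MySQLdb',
            host=settings['DB_HOST'],
            db=settings['DB_NAME'],
            user=settings['DB_USER'],
            passwd=settings['DB_PWD'],
            charset='utf8',
            use_unicode=True,
        )
        return cls(dbpool)

    def close_spider(self, spider):
        self.dbpool.close()   # shut the pool down cleanly

    def process_item(self, item, spider):
        d = self.dbpool.runInteraction(self._do_insert, item)
        d.addErrback(lambda failure: spider.log('DB error: %s' % failure))
        d.addBoth(lambda _: item)   # always hand the item on
        return d

    def _do_insert(self, curs, item):
        # the 'items' table and its columns are hypothetical
        curs.execute(
            "INSERT INTO items (title, url) VALUES (%s, %s)",
            (item.get('title'), item.get('url')),
        )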

In a project I was working on, I had used Twisted's adbapi, and I had a really slow remote db that blocked for long periods of time. At first Scrapy continued to process requests and crawl, but eventually it stopped, and the spider appeared to just hang with no output. I knew the database was blocking, but I didn't understand why Scrapy had stopped crawling, given that this database blocking operation was in a separate thread. I wanted to investigate this further, so I decided to artificially simulate the block.

import time

from twisted.enterprise import adbapi

class Delayed_adbapi(object):
    """
    Another test. If we use Twisted's adbapi but introduce artificial delays,
    what happens with varying connection pool sizes?
    """
    def __init__(self, dbpool):
        self.dbpool = dbpool
        self.icnt = 0    # keep track of items processed so far

    def close_spider(self, spider):
        """ Cleanup function, called after crawling has finished, to close
            open objects.
            Close the ConnectionPool. """
        spider.log('Closing connection pool...')
        self.dbpool.close()

    @classmethod
    def from_settings(cls, scrapy_settings):
        dbargs = dict(
            host=scrapy_settings['DB_HOST'],
            db=scrapy_settings['DB_NAME'],
            user=scrapy_settings['DB_USER'],
            passwd=scrapy_settings['DB_PWD'],
            charset='utf8',
            use_unicode=True,
        )
        # cp_max controls the number of connections in the pool
        dbpool = adbapi.ConnectionPool('MySQLdb', cp_max=1, **dbargs)
        return cls(dbpool)

    def process_item(self, item, spider):
        d = self.dbpool.runInteraction(self._do_blocking, item, spider)
        d.addErrback(self._handle_error, item, spider)
        d.addBoth(lambda _: item)
        return d

    def _do_blocking(self, curs, item, spider):
        """
        Simulate some blocking db operations using sleep and a countdown
        that logs the count.
        """
        spider.log('Begin blocking... ')
        if self.icnt == 0:
            # Big delay for the first item
            TICKTOCK = 40
        else:
            # Small delay for subsequent items
            TICKTOCK = 1
        while TICKTOCK > 0:
            time.sleep(2)
            spider.log('TICKTOCK:%i' % TICKTOCK)
            TICKTOCK -= 1
        self.icnt += 1
        spider.log('Finished blocking... ')

    def _handle_error(self, failure, item, spider):
        spider.log('Pipeline error: %s' % failure)

The above pipeline just simulates a blocking database operation called with runInteraction. I've set it so that we pretend the first item has a really big delay (80 seconds) and that subsequent items only have a delay of 2 seconds. I've also initialized the connection pool with cp_max=1, meaning there is only ever a maximum of one connection in the pool. This makes analyzing the output simpler.
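
The from_settings classmethod above also expects a few custom settings; something like this in settings.py works (the values are of course placeholders):

# settings.py -- placeholder values for the custom DB_* settings
# that from_settings() reads above
DB_HOST = 'my-remote-db.example.com'
DB_NAME = 'scrapy_test'
DB_USER = 'scrapy'
DB_PWD = 'secret'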

If you run the above pipeline with your spider, you'll notice that Scrapy will start crawling and processing requests, and eventually the first item will be passed to the pipeline. At that point a new thread will have been created for the _do_blocking function, and Scrapy continues to process more requests and do more crawling. You'll see in the output the crawls being randomly interspersed with "TICKTOCK:40", "TICKTOCK:39", etc., as Scrapy work continues alongside the blocking database function of the pipeline (if we had set cp_max>1 things would be much the same, except that more database threads would all be doing their countdowns simultaneously, which gets confusing). So far so good: this is what we expected to happen, and what we want to happen. The slow database writes are not stopping the crawl; the two exist side by side. Great.

However, you'll soon notice that eventually (if you are scraping enough items, and the pipeline block is long enough) Scrapy output just stops, and focus turns solely to the countdown whilst the first item is processed: "TICKTOCK:20, TICKTOCK:19, TICKTOCK:18.....TICKTOCK:1" in sequence in the output whilst nothing else happens. Then the next item gets processed ("TICKTOCK:1"), and the next ("TICKTOCK:1"), and so on. All the while no Scrapy requests/crawls are being processed. But why? After all, _do_blocking was started in a new thread, so why should it stop Scrapy from doing its thing?

Well, one answer could of course be that Scrapy has finished all the Requests it has to process, and all that is left to do is process the item pipelines, but no, that wasn't what was stopping things for me. As soon as a few items got flushed down the pipelines Scrapy started making requests again. So why does Scrapy stop crawling when you have a really slow pipeline like this?

I telnetted into the spider during operation (telnet localhost 6023) and ran prefs():

prefs()
Live References

BookingSpider                       1   oldest: 46s ago
Selector                           28   oldest: 41s ago
HtmlResponse                       28   oldest: 41s ago
Request                           100   oldest: 45s ago
MyItem                             28   oldest: 41s ago

Each time the block occurred, I noticed that for a given spider there was the same number of Items (and Responses): 28. Also, when I ran est() in the telnet console it showed

engine.scraper.slot.active_size                 : 11428021
engine.scraper.slot.itemproc_size               : 28
engine.scraper.slot.needs_backout()             : True

The answer turns out to be that Scrapy is trying to manage memory. Scrapy will effectively throttle itself and not take any more Requests from the scheduler, to avoid piling up too much stuff in memory. This is controlled by the needs_backout function: scrapy/core/engine.py shows that Scrapy does not load any more requests from the scheduler when needs_backout is True. In turn, needs_backout (see scrapy/core/scraper.py) is True when active_size > max_active_size. As for active_size, it is incremented/decremented by len(response.body) every time a response is added/removed, so ultimately, if no items are processed by our completely blocked pipeline, then no responses are removed, active_size soon exceeds max_active_size, needs_backout becomes True, and no more requests are taken from the scheduler. With Scrapy refusing to take on any more work, all focus shifts to the blocked pipeline threads. This state continues until those pipeline threads finish processing some items, which can then be released (along with their associated responses).

So it's not the blocking call in the pipeline that directly stops Scrapy doing more work; it's that items/responses are not being cleared by the pipeline, so they build up in memory, and eventually Scrapy throttles itself to avoid piling up too much stuff. At that point the only work that can be done is the blocking pipeline, and that is what happens. In the artificial code this manifests as the "TICKTOCK.." countdown sprawling down the screen, but with real blocking database writes you may see no output at all, just a complete, indefinite hang until the block clears. This can be disconcerting.
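
For reference, the relevant bookkeeping in scrapy/core/scraper.py looks roughly like this (a paraphrased sketch, not a literal copy; names and the default size may differ between Scrapy versions):

# Paraphrased sketch of the Slot bookkeeping in scrapy/core/scraper.py
class Slot(object):

    MIN_RESPONSE_SIZE = 1024

    def __init__(self, max_active_size=5000000):   # roughly 5 MB of response bodies
        self.max_active_size = max_active_size
        self.active_size = 0

    def add_response_request(self, response, request):
        # each response held by the scraper adds its body size to active_size
        self.active_size += max(len(response.body), self.MIN_RESPONSE_SIZE)

    def finish_response(self, response, request):
        # ...and releases it again once the item pipelines are done with it
        self.active_size -= max(len(response.body), self.MIN_RESPONSE_SIZE)

    def needs_backout(self):
        # the engine stops pulling requests from the scheduler while this is True
        return self.active_size > self.max_active_size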

You can test this theory by temporarily tweaking max_active_size in the Scrapy source, then telnetting into your spider when all output ceases and Scrapy has stopped crawling. You'll see that the bigger you set max_active_size, the longer it takes Scrapy to stop crawling, and the more items/responses there are in memory at that point.

As a final piece of bonus code, you can also observe an analogous phenomenon in a pipeline that has nothing to do with a database, but similarly offloads blocking work to another Twisted thread:

import logging
import time

from twisted.internet import threads

logger = logging.getLogger(__name__)

class ThreadBlocker(object):
    """
    Set REACTOR_THREADPOOL_SIZE = 1 in settings.py
    to only allow one Twisted thread at a time. This roughly simulates
    having only one connection in the database pool of the adbapi example.
    """

    def __init__(self):
        self.icnt = 0  # count items processed so far

    def blocker(self, item, delay):
        logger.error('begin blocking for: %s' % delay)
        if self.icnt == 0:
            # Big initial delay
            TICKTOCK = delay
        else:
            # Small/no delay for subsequent items
            TICKTOCK = 1
        while TICKTOCK > 0:
            time.sleep(2)
            logger.error('TICKTOCK:%i' % TICKTOCK)
            TICKTOCK -= 1
        self.icnt += 1
        return True

    def finished(self, result):
        logger.error('success finished: %s' % result)

    def process_item(self, item, spider):
        d1 = threads.deferToThread(self.blocker, item, 30)
        d1.addCallback(self.finished)
        d1.addBoth(lambda _: item)
        return d1
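
To run this last experiment, the settings are just the following (again, the pipeline's module path is a placeholder):

# settings.py
REACTOR_THREADPOOL_SIZE = 1   # a single thread in Twisted's reactor threadpool
ITEM_PIPELINES = {
    'myproject.pipelines.ThreadBlocker': 300,   # hypothetical module path
}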