ASPN ActiveState Programmer Network  
ActiveState, a division of Sophos
/ Home / Perl / PHP / Python / Tcl / XSLT /
/ Safari / My ASPN /
Cookbooks | Documentation | Mailing Lists | Modules | News Feeds | Products | User Groups
Submit Recipe
My Recipes

All Recipes
All Cookbooks


View by Category

Title: Rss aggregator with twisted
Submitter: Valentino Volonghi (other recipes)
Last Updated: 2004/04/12
Version no: 1.1
Category: Web

 

4 stars 3 vote(s)


Description:

This is a fully featured Rss aggregator with parsing included.

It's scalable too very high numbers of feeds and can be used in multi-client environment through web using twisted with a little code on top of Nevow (www.nevow.com), or can easily be integrated inside every app which uses some of the toolkits supported by Twisted.

Source: Text Source

from twisted.internet import reactor, protocol, defer
from twisted.web import client
import feedparser, time, sys

# You can get this file from http://xoomer.virgilio.it/dialtone/out.py
# Since it's only a list of URLs made in this way:
# out = [  ( 'URL', 'EXTRA_INFOS'),
#          ( 'URL', 'EXTRA_INFOS')
#          ...
#        ]
import out

try:
    import cStringIO as _StringIO
except ImportError:
    import StringIO as _StringIO

rss_feeds = out.rss_feed # This is the HUGE feed list  (730 feeds)

DEFERRED_GROUPS = 60  # Number of simultaneous connections
INTER_QUERY_TIME = 300 # Max Age (in seconds) of each feed in the cache
TIMEOUT = 30 # Timeout in seconds for the web request

# This dict structure will be the following:
# { 'URL': (TIMESTAMP, value) }
cache = {}

class FeederProtocol(object):
    def __init__(self):
        self.parsed = 1
        self.with_errors = 0
        self.error_list = []
        
    def isCached(self, site):
        # Try to get the tuple (TIMESTAMP, FEED_STRUCT) from the dict if it has
        # already been downloaded. Otherwise assign None to already_got
        already_got = cache.get(site[0], None)

        # Ok guys, we got it cached, let's see what we will do
        if already_got:
            # Well, it's cached, but will it be recent enough?
            elapsed_time = time.time() - already_got[0]
            
            # Woooohooo it is, elapsed_time is less than INTER_QUERY_TIME so I
            # can get the page from the memory, recent enough
            if elapsed_time < INTER_QUERY_TIME:
                return True
            
            else:    
                # Uhmmm... actually it's a bit old, I'm going to get it from the
                # Net then, then I'll parse it and then I'll try to memoize it
                # again
                return False
            
        else: 
            # Well... We hadn't it cached in, so we need to get it from the Net
            # now, It's useless to check if it's recent enough, it's not there.
            return False

    def gotError(self, traceback, extra_args):
        # An Error as occurred, print traceback infos and go on
        print traceback, extra_args
        self.with_errors += 1
        self.error_list.append(extra_args)
        print "="*20
        print "Trying to go on..."
        
    def getPageFromMemory(self, data, key=None):
        # Getting the second element of the tuple which is the parsed structure
        # of the feed at address key, the first element of the tuple is the
        # timestamp
        print "Getting from memory..."
        return defer.succeed(cache.get(key,key)[1])

    def parseFeed(self, feed):
        # This is self explaining :)
        print "parsing..."
        try:
            feed+''
            parsed = feedparser.parse(_StringIO.StringIO(feed))
        except TypeError:
            parsed = feedparser.parse(_StringIO.StringIO(str(feed)))
        print "parsed feed"
        return parsed
   
    def memoize(self, feed, addr):
        # feed is the raw structure, just as returned from feedparser.parse()
        # while addr is the address from which the feed was got.
        print "Memoizing",addr,"..."
        if cache.get(addr, None):
            cache[addr] = (time.time(), feed)
        else:
            cache.setdefault(addr, (time.time(),feed))
        return feed
    
    def workOnPage(self, parsed_feed, addr):
        # As usual, addr is the feed address and file is the file in
        # which you can eventually save the structure.
        print "-"*20
        print "finished retrieving"
        print "Feed Version:",parsed_feed.get('version','Unknown')
        
        #
        #  Uncomment the following if you want to print the feeds
        #
        chan = parsed_feed.get('channel', None)
        if chan:
            print chan.get('title', '')
            #print chan.get('link', '')
            #print chan.get('tagline', '')
            #print chan.get('description','')
        print "-"*20
        #items = parsed_feed.get('items', None)
        #if items:
        #    for item in items:
        #        print '\tTitle: ', item.get('title','')
        #        print '\tDate: ', item.get('date', '')
        #        print '\tLink: ', item.get('link', '')
        #        print '\tDescription: ', item.get('description', '')
        #        print '\tSummary: ', item.get('summary','')
        #        print "-"*20
        #print "got",addr
        #print "="*40
        return parsed_feed
        
    def stopWorking(self, data=None):
        print "Closing connection number %d..."%(self.parsed,)
        print "=-"*20
        
        # This is here only for testing. When a protocol/interface will be
        # created to communicate with this rss-aggregator server, we won't need
        # to die after we parsed some feeds just one time.
        self.parsed += 1
        print self.parsed,  self.END_VALUE
        if self.parsed > self.END_VALUE:   #
            print "Closing all..."         #
            for i in self.error_list:      #  Just for testing sake
                print i                    #
            print len(self.error_list)     #
            reactor.stop()                 #

    def getPage(self, data, args):
        return client.getPage(args,timeout=TIMEOUT)

    def printStatus(self, data=None):
        print "Starting feed group..."
            
    def start(self, data=None, std_alone=True):
        d = defer.succeed(self.printStatus())
        for feed in data:
        
            # Now we start telling the reactor that it has
            # to get all the feeds one by one...
            cached = self.isCached(feed)
            if not cached: 
                # When the feed is not cached, it's time to
                # go and get it from the web directly
                d.addCallback(self.getPage, feed[0])
                d.addErrback(self.gotError, (feed[0], 'getting'))
                
                # Parse the feed and if there's some errors call self.gotError
                d.addCallback(self.parseFeed)
                d.addErrback(self.gotError, (feed[0], 'parsing'))
                
                # Now memoize it, if there's some error call self.getError
                d.addCallback(self.memoize, feed[0])
                d.addErrback(self.gotError, (feed[0], 'memoizing'))
                
            else: # If it's cached
                d.addCallback(self.getPageFromMemory, feed[0])
                d.addErrback(self.gotError, (feed[0], 'getting from memory'))
            
            # When you get the raw structure you can work on it
            # to format in the best way you can think of.
            # For any error call self.gotError.
            d.addCallback(self.workOnPage, feed[0])
            d.addErrback(self.gotError, (feed[0], 'working on page'))
            
            # And when the for loop is ended we put 
            # stopWorking on the callback for the last 
            # feed gathered
            # This is only for testing purposes
            if std_alone:
                d.addCallback(self.stopWorking)
                d.addErrback(self.gotError, (feed[0], 'while stopping'))
        if not std_alone:
            return d

class FeederFactory(protocol.ClientFactory):
    protocol = FeederProtocol()
    def __init__(self, std_alone=False):
        self.feeds = self.getFeeds()
        self.std_alone = std_alone
        
        self.protocol.factory = self
        self.protocol.END_VALUE = len(self.feeds) # this is just for testing

        if std_alone:
            self.start(self.feeds)

    def start(self, addresses):
        # Divide into groups all the feeds to download
        if len(addresses) > DEFERRED_GROUPS:
            url_groups = [[] for x in xrange(DEFERRED_GROUPS)]
            for i, addr in enumerate(addresses):
                url_groups[i%DEFERRED_GROUPS].append(addr)
        else:
            url_groups = [[addr] for addr in addresses]
            
        for group in url_groups:
            if not self.std_alone:
                return self.protocol.start(group, self.std_alone)
            else:
                self.protocol.start(group, self.std_alone)

    def getFeeds(self, where=None):
        # This is used when you call a COMPLETE refresh of the feeds,
        # or for testing purposes
        #print "getting feeds"
        # This is to get the feeds we want
        if not where: # We don't have a database, then we use the local
                      # variabile rss_feeds
            return rss_feeds
        else: return None
        
if __name__=="__main__":
    f = FeederFactory(std_alone=True)
    reactor.run()

Discussion:

It's made on top of Twisted Matrix with the aid of Universal Feed Parser from Mark Pilgrim.

It has the following features:
- Easily set the number of parallel connections to download the feeds
- Easily set the timeout for each feed request
- Easily set the max feed age (used for the memoizing feature)

From some tests I've been doing, it takes 6 minutes to download and parse over 730 feeds, which is less than half a second for each feed.

I've already submitted it to Straw developers who answered to help them in integrating this engine inside the new version of Straw rss reader.

I've also written a very simple web interface with Nevow (www.nevow.com), which is available in the complete package from http://xoomer.virgilio.it/dialtone/rss-aggregator-web-v0.1.tar.bz2



Add comment

Number of comments: 1

Conditional HTTP GETs, Harry Fuecks, 2005/10/25
Technique described here: http://fishbowl.pastiche.org/2002/10/21/http_conditional_get_for_rss_hackers Script found here http://www.phppatterns.com/docs/develop/twisted_aggregator - essential bit below;

class ConditionalHTTPPageGetter(HTTPPageGetter):
    
    def handleStatus_200(self):
        # If we're good, try recording the last-modified header
        if self.headers.has_key('last-modified'):
            self.factory.lastModified(self.headers['last-modified'])
    
    def handleStatus_304(self):
        # Close connection
        self.factory.notModified()
        self.transport.loseConnection()
 
class ConditionalHTTPClientFactory(HTTPClientFactory):
    
    protocol = ConditionalHTTPPageGetter
    
    def __init__(self, cacheDir, url, method='GET', postdata=None, headers=None,
                 agent="Twisted ConditionalPageGetter", timeout=0, cookies=None,
                 followRedirect=1):
        
        self.cachefile = cacheDir+os.path.sep+self.getHashForUrl(url)
        
        if os.path.exists(self.cachefile):
            
            lastModified = open(self.cachefile).readline().strip()
            
            if headers is not None:
                headers['last-modified'] = lastModified
            else:
                headers = {'last-modified': lastModified }
            
        
        HTTPClientFactory.__init__(self, url, method=method, postdata=postdata,
                headers=headers, agent=agent, timeout=timeout, cookies=cookies,
                followRedirect=followRedirect
                )
        
        self.waiting = 1
        self.deferred = defer.Deferred()
 
        
    def lastModified(self, modtime):
        f = open(self.cachefile,'w')
        f.write(modtime[0])
    
    def getHashForUrl(self, url):
        hash = md5.new()
        hash.update(url)
        return hash.hexdigest()
    
    def notModified():
        if self.waiting:
            self.waiting = 0
 
def conditionalGetPage(cacheDir, url, contextFactory=None, *args, **kwargs):
    scheme, host, port, path = client._parse(url)
    factory = ConditionalHTTPClientFactory(cacheDir, url, *args, **kwargs)
    if scheme == 'https':
        from twisted.internet import ssl
        if contextFactory is None:
            contextFactory = ssl.ClientContextFactory()
        reactor.connectSSL(host, port, factory, contextFactory)
    else:
        reactor.connectTCP(host, port, factory)
    return factory.deferred

Add comment



Highest rated recipes:

1. Implementation of sets ...

2. bag collection class

3. deque collection class

4. Floating Point Simulator

5. HTML colors to/from RGB ...

6. Select the nth smallest ...

7. Function Decorators by ...

8. MS SQL Server log monitor

9. Table objects with ...

10. wx twisted support using ...




Privacy Policy | Email Opt-out | Feedback | Syndication
© 2006 ActiveState Software Inc. All rights reserved.