|
|
 |
|
Title: RSS aggregator with Twisted
Submitter: Valentino Volonghi
(other recipes)
Last Updated: 2004/04/12
Version no: 1.1
Category:
Web
|
|
3 vote(s)
|
|
|
|
Description:
This is a fully featured RSS aggregator with feed parsing included.
It scales to very large numbers of feeds. With a little code on top of Nevow (www.nevow.com), it can serve many clients over the web through Twisted, and it can easily be integrated into any application that uses one of the GUI toolkits supported by Twisted.
Source: Text Source
import hashlib
import os
import sys
import time

try:
    import cStringIO as _StringIO
except ImportError:
    import StringIO as _StringIO

import feedparser
from twisted.internet import reactor, protocol, defer
from twisted.web import client

import out
# Feed list provided by the local ``out`` module. The code below only reads
# ``feed[0]`` of each entry and uses it as the URL to fetch.
rss_feeds = out.rss_feed
# Upper bound on the number of feed groups FeederFactory.start creates;
# each group is processed on its own Deferred chain.
DEFERRED_GROUPS = 60
# How long (seconds, compared against time.time() deltas) a memoized feed is
# reused instead of being fetched again.
INTER_QUERY_TIME = 300
# Value passed as ``timeout`` to client.getPage for every request.
TIMEOUT = 30
# URL -> (time.time() when stored, parsed feed); written by memoize.
cache = dict()
class FeederProtocol(object):
    """Fetch, parse and memoize feeds using chains of Twisted Deferreds.

    ``start`` is called once per group of feeds. Within a group the feeds are
    handled one after another on a single Deferred chain. Separate groups are
    started independently, so they run concurrently in the reactor.

    Attributes:
        parsed: 1-based count of feeds finished so far (see stopWorking).
        with_errors: number of failures reported through gotError.
        error_list: ``(url, stage)`` tuples, one per reported failure.
        END_VALUE: total number of feeds. Assigned externally (by
            FeederFactory), not in __init__.
    """

    def __init__(self):
        self.parsed = 1
        self.with_errors = 0
        self.error_list = []

    def isCached(self, site):
        """Return True if ``site[0]`` was memoized less than INTER_QUERY_TIME seconds ago.

        ``site`` is one entry of the feed list; only its first item (the URL)
        is looked at.
        """
        entry = cache.get(site[0])
        if not entry:
            return False
        return time.time() - entry[0] < INTER_QUERY_TIME

    def gotError(self, traceback, extra_args):
        """Errback: report and record a failure, then let the chain go on.

        ``extra_args`` is the ``(url, stage)`` tuple given when the errback
        was added. The failure is swallowed (None is returned), so the next
        callbacks receive None. parseFeed, memoize and workOnPage treat None
        as "nothing to process" for this feed.
        """
        print("%s %s" % (traceback, extra_args))
        self.with_errors += 1
        self.error_list.append(extra_args)
        print("=" * 20)
        print("Trying to go on...")

    def getPageFromMemory(self, data, key=None):
        """Return an already-fired Deferred holding the memoized feed for ``key``.

        ``data`` (the previous result in the chain) is ignored. A missing key
        raises KeyError, which reaches the following errback.
        """
        print("Getting from memory...")
        return defer.succeed(cache[key][1])

    def parseFeed(self, feed):
        """Parse the raw page text ``feed`` with feedparser.

        Returns None when ``feed`` is None, i.e. the download failed and
        gotError already recorded it. Parsing the string "None" would only
        produce a bogus feed that memoize would then cache.
        """
        if feed is None:
            return None
        print("parsing...")
        try:
            feed + ''  # probe: raises TypeError if feed is not a string
            parsed = feedparser.parse(_StringIO.StringIO(feed))
        except TypeError:
            parsed = feedparser.parse(_StringIO.StringIO(str(feed)))
        print("parsed feed")
        return parsed

    def memoize(self, feed, addr):
        """Store ``feed`` in the module cache under ``addr`` with the current time.

        Returns ``feed`` unchanged. Nothing is cached when ``feed`` is None,
        so a failed fetch is retried next time instead of being served from
        the cache.
        """
        if feed is None:
            return None
        print("Memoizing %s ..." % (addr,))
        cache[addr] = (time.time(), feed)
        return feed

    def workOnPage(self, parsed_feed, addr):
        """Print a short summary of ``parsed_feed`` and pass it on unchanged."""
        print("-" * 20)
        print("finished retrieving")
        if parsed_feed is None:
            # An earlier step failed and gotError already reported it.
            print("no feed available for %s" % (addr,))
            print("-" * 20)
            return None
        print("Feed Version: %s" % (parsed_feed.get('version', 'Unknown'),))
        chan = parsed_feed.get('channel', None)
        if chan:
            print(chan.get('title', ''))
        print("-" * 20)
        return parsed_feed

    def stopWorking(self, data=None):
        """Count one finished feed; once END_VALUE feeds are done, report errors and stop the reactor."""
        print("Closing connection number %d..." % (self.parsed,))
        print("=-" * 20)
        self.parsed += 1
        print("%s %s" % (self.parsed, self.END_VALUE))
        if self.parsed > self.END_VALUE:
            print("Closing all...")
            for err in self.error_list:
                print(err)
            print(len(self.error_list))
            reactor.stop()

    def getPage(self, data, args):
        """Download URL ``args`` with client.getPage; ``data`` is ignored."""
        return client.getPage(args, timeout=TIMEOUT)

    def printStatus(self, data=None):
        """Announce that a feed group is starting."""
        print("Starting feed group...")

    def start(self, data=None, std_alone=True):
        """Build the Deferred chain that processes every feed in ``data`` in order.

        Each entry's ``[0]`` is the feed URL. Uncached feeds are downloaded,
        parsed and memoized. Cached ones are taken from the cache. Every step
        gets an errback tagged with ``(url, stage)``.

        In standalone mode, stopWorking runs after each feed and None is
        returned. Otherwise the chain's Deferred is returned to the caller.
        """
        d = defer.succeed(self.printStatus())
        for feed in data or ():
            addr = feed[0]
            if not self.isCached(feed):
                d.addCallback(self.getPage, addr)
                d.addErrback(self.gotError, (addr, 'getting'))
                d.addCallback(self.parseFeed)
                d.addErrback(self.gotError, (addr, 'parsing'))
                d.addCallback(self.memoize, addr)
                d.addErrback(self.gotError, (addr, 'memoizing'))
            else:
                d.addCallback(self.getPageFromMemory, addr)
                d.addErrback(self.gotError, (addr, 'getting from memory'))
            d.addCallback(self.workOnPage, addr)
            d.addErrback(self.gotError, (addr, 'working on page'))
            if std_alone:
                d.addCallback(self.stopWorking)
                d.addErrback(self.gotError, (addr, 'while stopping'))
        if not std_alone:
            return d
class FeederFactory(protocol.ClientFactory):
    """Split the configured feed list into groups and run FeederProtocol on each.

    NOTE(review): ``protocol`` is one FeederProtocol *instance* stored on the
    class. Every FeederFactory therefore shares its ``parsed`` counter,
    ``error_list`` and ``END_VALUE``; creating several factories in one
    process will mix their state.
    """
    protocol = FeederProtocol()

    def __init__(self, std_alone=False):
        """Load the feed list; when ``std_alone`` is true, start processing immediately."""
        self.feeds = self.getFeeds()
        self.std_alone = std_alone
        self.protocol.factory = self
        self.protocol.END_VALUE = len(self.feeds)
        if std_alone:
            if self.feeds:
                self.start(self.feeds)
            else:
                # No feed will ever reach FeederProtocol.stopWorking, so
                # nothing else would stop the reactor.
                reactor.callWhenRunning(reactor.stop)

    def start(self, addresses):
        """Start processing ``addresses`` in at most DEFERRED_GROUPS concurrent groups.

        Group ``i`` receives ``addresses[i]``, ``addresses[i + n]``, ... where
        ``n`` is the number of groups. With no more than DEFERRED_GROUPS
        addresses, each address gets its own group.

        Returns a DeferredList over every group's Deferred when not running
        standalone. In standalone mode it returns None, and
        FeederProtocol.stopWorking stops the reactor.
        """
        addresses = list(addresses)
        n_groups = min(DEFERRED_GROUPS, len(addresses))
        url_groups = [addresses[i::n_groups] for i in range(n_groups)]
        deferreds = []
        for group in url_groups:
            # Every group must be started; returning after the first one
            # would leave all other feeds unprocessed.
            d = self.protocol.start(group, self.std_alone)
            if d is not None:
                deferreds.append(d)
        if not self.std_alone:
            return defer.DeferredList(deferreds)

    def getFeeds(self, where=None):
        """Return the module-level feed list; any truthy ``where`` is unsupported and yields None."""
        if not where:
            return rss_feeds
        return None
if __name__ == "__main__":
    # Constructing a standalone factory immediately builds the fetch chains
    # for every feed; FeederProtocol.stopWorking stops the reactor once the
    # last feed has been handled.
    factory = FeederFactory(std_alone=True)
    reactor.run()
Discussion:
It is built on Twisted Matrix, with the help of Mark Pilgrim's Universal Feed Parser.
It has the following features:
- Easily set the number of parallel connections to download the feeds
- Easily set the timeout for each feed request
- Easily set the max feed age (used for the memoizing feature)
In my tests, it takes about 6 minutes to download and parse over 730 feeds, which is less than half a second per feed.
I've already submitted it to the Straw developers, who replied asking me to help integrate this engine into the new version of the Straw RSS reader.
I've also written a very simple web interface with Nevow (www.nevow.com), which is available in the complete package from http://xoomer.virgilio.it/dialtone/rss-aggregator-web-v0.1.tar.bz2
|
|
Add comment
|
|
Number of comments: 1
Conditional HTTP GETs, Harry Fuecks, 2005/10/25
Technique described here: http://fishbowl.pastiche.org/2002/10/21/http_conditional_get_for_rss_hackers
Script found here: http://www.phppatterns.com/docs/develop/twisted_aggregator - the essential part is below:
class ConditionalHTTPPageGetter(client.HTTPPageGetter):
    """Page getter that records Last-Modified on 200 and handles 304 Not Modified.

    The recording and not-modified bookkeeping is delegated to the factory
    (``lastModified`` and ``notModified``).
    """

    def handleStatus_200(self):
        """Pass the response's Last-Modified header (if any) to the factory."""
        if 'last-modified' in self.headers:
            self.factory.lastModified(self.headers['last-modified'])
        client.HTTPPageGetter.handleStatus_200(self)

    def handleStatus_304(self):
        """Tell the factory the page is unchanged, then close the connection."""
        self.factory.notModified()
        self.transport.loseConnection()
class ConditionalHTTPClientFactory(client.HTTPClientFactory):
    """HTTPClientFactory that issues a conditional GET using If-Modified-Since.

    The Last-Modified value of the previous successful fetch of a URL is kept
    in a one-line file in ``cacheDir``, named after the URL's MD5 hex digest.
    On a 304 response the request Deferred fires with None.
    """
    protocol = ConditionalHTTPPageGetter

    def __init__(self, cacheDir, url, method='GET', postdata=None, headers=None,
                 agent="Twisted ConditionalPageGetter", timeout=0, cookies=None,
                 followRedirect=1):
        self.cachefile = os.path.join(cacheDir, self.getHashForUrl(url))
        # Work on a copy so the caller's headers dict is never modified.
        headers = dict(headers) if headers is not None else {}
        if os.path.exists(self.cachefile):
            with open(self.cachefile) as f:
                last_modified = f.readline().strip()
            if last_modified:
                # Last-Modified is a response header. A conditional request
                # must echo that date back as If-Modified-Since.
                headers['If-Modified-Since'] = last_modified
        # The base initializer sets up ``waiting`` and ``deferred``; they are
        # deliberately not reset here.
        client.HTTPClientFactory.__init__(
            self, url, method=method, postdata=postdata, headers=headers,
            agent=agent, timeout=timeout, cookies=cookies,
            followRedirect=followRedirect)

    def lastModified(self, modtime):
        """Persist the first Last-Modified value for the next request of this URL.

        ``modtime`` is what the page getter passes from its ``headers`` dict;
        presumably a list of header values, since only ``modtime[0]`` is stored.
        """
        with open(self.cachefile, 'w') as f:
            f.write(modtime[0])

    def getHashForUrl(self, url):
        """Return the hex MD5 digest of ``url``, used only as a cache file name."""
        if not isinstance(url, bytes):
            url = url.encode('utf-8')
        return hashlib.md5(url).hexdigest()

    def notModified(self):
        """Fire the request Deferred with None to signal "304 Not Modified".

        ``waiting`` is cleared first; presumably the base class checks it so
        that later page()/noPage() calls for this request do nothing.
        """
        if self.waiting:
            self.waiting = 0
            self.deferred.callback(None)
def conditionalGetPage(cacheDir, url, contextFactory=None, *args, **kwargs):
    """Fetch ``url`` like client.getPage, but via ConditionalHTTPClientFactory.

    ``cacheDir`` and any extra arguments go to the factory. HTTPS URLs are
    connected with ``contextFactory`` (a fresh ssl.ClientContextFactory when
    None is given); all other schemes use plain TCP. Returns the factory's
    Deferred.
    """
    scheme, host, port, _path = client._parse(url)
    factory = ConditionalHTTPClientFactory(cacheDir, url, *args, **kwargs)
    if scheme != 'https':
        reactor.connectTCP(host, port, factory)
        return factory.deferred
    from twisted.internet import ssl
    if contextFactory is None:
        contextFactory = ssl.ClientContextFactory()
    reactor.connectSSL(host, port, factory, contextFactory)
    return factory.deferred
Add comment
|
|
|
|
|
 |
|