Caching RSS Feeds With feedcache
The past several years have seen a steady increase in the use of RSS and Atom feeds for data sharing. Blogs, podcasts, social networking sites, search engines, and news services are just a few examples of data sources delivered via such feeds. Working with internet services requires care, because inefficiencies in one client implementation may cause performance problems with the service that can be felt by all of the consumers accessing the same server. In this article, I describe the development of the feedcache package, and give examples of how you can use it to optimize the use of data feeds in your application.
I frequently find myself wanting to listen to one or two episodes from
a podcast, but not wanting to subscribe to the entire series. In order
to scratch this itch, I built a web based tool, hosted at http://www.castsampler.com/, to
let me pick and choose individual episodes from a variety of podcast
feeds, then construct a single feed with the results. Now I subscribe
to the single feed with my podcast client, and easily populate it with
new episodes when I encounter any that sound interesting. The feedcache
package was developed as part of this tool to manage accessing and
updating the feeds efficiently, and has been released separately under
the BSD license.
Example Feed Data
The two most common publicly implemented formats for syndicating web data are RSS (in one of a few versions) and Atom. Both formats have a similar structure. Each feed begins with basic information about the data source (title, link, description, etc.). The introductory information is followed by a series of “items”, each of which represents a resource like a blog post, news article, or podcast. Each item, in turn, has a title, description, and other information like when it was written. It may also refer to one or more attachments, or enclosures.
Listing 1 shows a sample RSS 2.0 feed and Listing 2 shows a sample Atom feed. Each sample listing contains one item with a single podcast enclosure. Both formats are XML, and contain essentially the same data. They use slightly different tag names though, and podcast enclosures are handled differently between the two formats, which can make working with different feed formats more work in some environments. Fortunately, Python developers do not need to worry about the differences in the feed formats, thanks to the Universal Feed Parser.
Listing 1
<?xml version="1.0" encoding="utf-8"?>
<rss version="2.0">
<channel>
<title>Sample RSS 2.0 Feed</title>
<link>http://www.example.com/rss.xml</link>
<description>Sample feed using RSS 2.0 format.</description>
<language>en-us</language>
<item>
<title>item title goes here</title>
<link>http://www.example.com/items/1/</link>
<description>description goes here</description>
<author>authoremail@example.com (author goes here)</author>
<pubDate>Sat, 4 Aug 2007 15:00:36 -0000</pubDate>
<guid>http://www.example.com/items/1/</guid>
<enclosure url="http://www.example.com/items/1/enclosure" length="100" type="audio/mpeg">
</enclosure>
</item>
</channel>
</rss>
Listing 2
<?xml version="1.0" encoding="utf-8"?>
<feed xmlns="http://www.w3.org/2005/Atom" xml:lang="en-us">
<title>Sample Atom Feed</title>
<link href="http://www.example.com/" rel="alternate"></link>
<link href="http://www.example.com/atom.xml" rel="self"></link>
<id>http://www.example.com/atom.xml</id>
<updated>2007-08-04T15:00:36Z</updated>
<entry>
<title>title goes here</title>
<link href="http://www.example.com/items/1/" rel="alternate"></link>
<updated>2007-08-04T15:00:36Z</updated>
<author>
<name>author goes here</name>
<email>authoremail@example.com</email>
</author>
<id>http://www.example.com/items/1/</id>
<summary type="html">description goes here</summary>
<link length="100" href="http://www.example.com/items/1/enclosure" type="audio/mpeg" rel="enclosure">
</link>
</entry>
</feed>
Universal Feed Parser
Mark Pilgrim’s Universal Feed Parser is an open source module that manages most aspects of downloading and parsing RSS and Atom feeds. Once the feed has been downloaded and parsed, the parser returns an object with all of the parsed data easily accessible through a single API, regardless of the original feed format.
Listing 3 shows a simple example program for accessing feeds with feedparser. Each URL from the command line arguments is passed to feedparser.parse() to be downloaded and parsed. The results are returned as a FeedParserDict. The properties of the FeedParserDict can be accessed via the dictionary API or using attribute names, as the print statement illustrates.
Listing 3
#!/usr/bin/env python
"""Print contents of feeds specified on the command line.
"""
import feedparser
import sys
for url in sys.argv[1:]:
data = feedparser.parse(url)
for entry in data.entries:
print '%s: %s' % (data.feed.title, entry.title)
When the sample program in Listing 3 is run with the URL for the feed of feedcache project releases, it shows the titles for the releases available right now:
$ python Listing3.py http://feeds.feedburner.com/FeedcacheReleases
feedcache Releases: feedcache 0.1
feedcache Releases: feedcache 0.2
feedcache Releases: feedcache 0.3
feedcache Releases: feedcache 0.4
feedcache Releases: feedcache 0.5
Every time the program runs, it fetches the entire feed, whether the contents have changed or not. That inefficiency might not matter a great deal for a client program that is not run frequently, but the inefficiencies add up on the server when many clients access the same feed, especially if they check the feed on a regular basis. This inefficient behavior can become an especially bad problem for the server if the feed contents are produced dynamically, since each client incurs a certain amount of CPU, I/O, and bandwidth load needed to produce the XML representation of the feed. Some sites are understandably strict about how often a client can retrieve feeds, to cut down on heavy bandwidth and CPU consumers. Slashdot, for example, returns a special feed with a warning to any client that accesses their RSS feed too frequently over a short span of time.
A Different Type of Podcast Aggregator
A typical aggregator design would include a monitor to regularly download the feeds and store the fresh information about the feed and its contents in a database. The requirements for CastSampler are a little different, though.
CastSampler remembers the feeds to which a user has subscribed, but unlike other feed aggregators, it only downloads the episode metadata while the user is choosing episodes to add to their download feed. Since the user does not automatically receive every episode of every feed, the aggregator does not need to constantly monitor all of the feeds. Instead, it shows a list of episodes for a selected feed, and lets the user choose which episodes to download. Then it needs to remember those selected episodes later so it can produce the combined feed for the user’s podcast client.
If every item from every feed was stored in the database, most of the data in the database would be for items that were never selected for download. There would need to be a way to remove old data from the database when it expired or was no longer valid, adding to the maintenance work for the site. Instead, CastSampler only uses the database to store information about episodes selected by the user. The rest of the data about the feed is stored outside of the database in a form that makes it easier to discard old data when the feed is updated. This division eliminates a lot of the data management effort behind running the site.
Feedcache Requirements
An important goal for this project was to make CastSampler a polite consumer of feeds, and ensure that it did not overload servers while a user was selecting podcast episodes interactively. By caching the feed data for a short period of time, CastSampler could avoid accessing feeds every time it needed to show the feed data. A persistent cache, written to disk, would let the data be reused even if the application was restarted, such as might happen during development. Using a cache would also improve responsiveness, since reading data from the local disk would be faster than fetching the feed from the remote server. To further reduce server load, feedcache is designed to take advantage of conditional GET features of HTTP, to avoid downloading the full feed whenever possible.
Another goal was to have a small API for the cache. It should take care of everything for the caller, so there would not need to be many functions to interact with it. To retrieve the contents of a feed, the caller should only have to provide the URL for that feed. All other information needed to track the freshness of the data in the cache would be managed internally.
It was also important for the cache to be able to store data in multiple ways, to make it more flexible for other programmers who might want to use it. Although CastSampler was going to store the cache on disk, other applications with more computing resources or tighter performance requirements might prefer to hold the cache in memory. Using disk storage should not be hard coded into the cache management logic.
These requirements led to a design which split the responsibility for managing the cached data between two objects. The Cache object tracks information about a feed so it can download the latest version as efficiently as possible, only when needed. Persistent storage of the data in the cache is handled by a separate back end storage object. Dividing the responsibilities in this way maximizes the flexibility of the Cache, since it can concentrate on tracking whether the feed is up to date without worrying about storage management. It also let Cache users take advantage of multiple storage implementations.
The Cache Class
Once the basic requirements and a skeleton design were worked out, the next step was to start writing tests so the implementation of Cache could begin. Working with a few simple tests would clarify how a Cache user would want to access feeds. The first test was to verify that the Cache would fetch feed data.
import unittest, cache
class CacheTest(unittest.TestCase):
def testFetch(self):
c = cache.Cache({})
parsed_feed = c.fetch('http://feeds.feedburner.com/FeedcacheReleases')
self.failUnless(parsed_feed.entries)
Since the design separated storage and feed management responsibilities, it was natural to pass the storage handler to the Cache when it is initialized. The dictionary API is used for the storage because there are several storage options available that support it. The shelve module in the Python standard library stores data persistently using an object that conforms to the dictionary API, as does the shove library from L.C. Rees. Either library would work well for the final application. For initial testing, using a simple dictionary to hold the data in memory was convenient, since that meant the tests would not need any external resources.
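To make that flexibility concrete, here is a minimal sketch, assuming the same cache module the test imports; the shelve filename is only a placeholder. Any object that supports the dictionary API can be passed in:
import shelve
import cache
in_memory_cache = cache.Cache({})                                  # plain dict, handy for tests
persistent_cache = cache.Cache(shelve.open('/tmp/feeds.shelve'))   # dict-like shelf stored on disk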
After constructing the Cache, the next step in the test is to retrieve a feed. I considered using the __getitem__() hook, but since Cache would not support any of the other dictionary methods, I rejected it in favor of an explicit method, fetch(). The caller passes a feed URL to fetch(), which returns a FeedParserDict instance. Listing 4 shows the first version of the Cache class that works for the test as it is written. No actual caching is being done yet; the Cache instance simply uses the feedparser module to retrieve and parse the feed.
Listing 4
#!/usr/bin/env python
"""The first version of Cache
"""
import unittest
import feedparser
class Cache:
def __init__(self, storage):
self.storage = storage
return
def fetch(self, url):
return feedparser.parse(url)
class CacheTest(unittest.TestCase):
def testFetch(self):
c = Cache({})
parsed_feed = c.fetch('http://feeds.feedburner.com/FeedcacheReleases')
self.failUnless(parsed_feed.entries)
return
if __name__ == '__main__':
unittest.main()
Throttling Downloads
Now that Cache could successfully download feed data, the first optimization to make was to hold on to the data and track its age. Then, for every call to fetch(), Cache could first check whether fresh data was already available locally before going out to the server to download the feed again.
Listing 5 shows the version of Cache with a download throttle, in the form of a timeToLiveSeconds parameter. Items already in the cache are reused until they are older than timeToLiveSeconds. The default value of 300 seconds means that any given feed will not be checked more often than every five minutes.
Listing 5
#!/usr/bin/env python
"""The first version of Cache
"""
import time
import unittest
import feedparser
class Cache:
def __init__(self, storage, timeToLiveSeconds=300):
self.storage = storage
self.time_to_live = timeToLiveSeconds
return
def fetch(self, url):
now = time.time()
cached_time, cached_content = self.storage.get(url, (None, None))
# Does the storage contain a version of the data
# which is older than the time-to-live?
if cached_time is not None:
age = now - cached_time
if age <= self.time_to_live:
return cached_content
parsed_data = feedparser.parse(url)
self.storage[url] = (now, parsed_data)
return parsed_data
class CacheTest(unittest.TestCase):
def testFetch(self):
c = Cache({})
parsed_feed = c.fetch('http://feeds.feedburner.com/FeedcacheReleases')
self.failUnless(parsed_feed.entries)
return
def testReuseContentsWithinTimeToLiveWindow(self):
url = 'http://feeds.feedburner.com/FeedcacheReleases'
c = Cache({ url:(time.time(), 'prepopulated cache')})
cache_contents = c.fetch(url)
self.failUnlessEqual(cache_contents, 'prepopulated cache')
return
if __name__ == '__main__':
unittest.main()
The new implementation of fetch() stores the current time along with the feed data when the storage is updated. When fetch() is called again with the same URL, the time in the cache is checked against the current time to determine if the value in the cache is fresh enough. The second test, testReuseContentsWithinTimeToLiveWindow(), verifies this behavior by pre-populating the Cache's storage with data and checking that the existing cache contents are returned instead of the contents of the feed.
Conditional HTTP GET
Conditional HTTP GET allows a client to tell a server something about the version of a feed the client already has. The server can decide if the contents of the feed have changed and, if they have not, send a short status code in the HTTP response instead of a complete copy of the feed data. Conditional GET is primarily a way to conserve bandwidth, but if the feed has not changed and the server’s version checking algorithm is efficient then the server may use fewer CPU resources to prepare the response, as well.
When a server implements conditional GET, it uses extra headers with each response to notify the client. There are two headers involved, and the server can use either or both together, in case the client only supports one. Cache supports both headers.
Although timestamps are an imprecise way to detect change, since the time on different servers in a pool might vary slightly, they are simple to work with. The Last-Modified header contains a timestamp value that indicates when the feed contents last changed. The client sends the timestamp back to the server in the next request as If-Modified-Since. The server then compares the dates to determine if the feed has been modified since the last request from the client.
A more precise way to determine if the feed has changed is to use an Entity Tag in the ETag header. An ETag is a hashed representation of the feed state, or of a value the server can use to quickly determine if the feed has been updated. The data and algorithm for computing the hash are left up to the server, but computing it should be less expensive than returning the feed contents or there won't be any performance gains. When the client sees an ETag header, it can send the associated value back to the server with the next request in the If-None-Match request header. When the server sees If-None-Match, it computes the current hash and compares it to the value sent by the client. If they match, the feed has not changed.
When using either ETag or modification timestamps, if the server determines that the feed has not been updated since the previous request, it returns a response code of 304, "Not Modified", and includes nothing in the body of the response. When it sees the 304 status in the response from the server, the client should reuse the version of the feed it already has.
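Feedparser and the Cache class handle this handshake automatically, but to make the exchange concrete, here is a minimal sketch of the same logic using only the standard library urllib2 module; the feed URL is just a placeholder.
import urllib2
FEED_URL = 'http://www.example.com/atom.xml'
# First request: no validators to send yet, so the full feed is returned.
response = urllib2.urlopen(FEED_URL)
feed_body = response.read()
etag = response.info().getheader('ETag')
last_modified = response.info().getheader('Last-Modified')
# A later request echoes the validators back to the server.
request = urllib2.Request(FEED_URL)
if etag:
    request.add_header('If-None-Match', etag)
if last_modified:
    request.add_header('If-Modified-Since', last_modified)
try:
    response = urllib2.urlopen(request)
    feed_body = response.read()   # 200: the feed changed, use the new copy
except urllib2.HTTPError, err:
    if err.code == 304:
        pass                      # 304: not modified, reuse the copy we already have
    else:
        raise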
Creating a Test Server
In order to write correct tests to exercise conditional GET in feedcache, more control over the server would be important. The feedburner URL used in the earlier tests might be down, or return different data if a feed was updated. It would be necessary for the server to respond reliably with data the test code knew in advance, and to be sure it would not stop responding if it was queried too often by the tests. The tests also needed to control which of the headers (ETag or If-Modified-Since) was used to determine whether the feed had changed, so both methods could be tested independently. The solution was to write a small test HTTP server that could be managed by the unit tests and configured as needed. Creating the test server was easy, using a few standard library modules.
The test server code, along with a base class for unit tests that use it, can be found in Listing 6. TestHTTPServer is derived from BaseHTTPServer.HTTPServer. Its serve_forever() method has been overridden with an implementation that checks a flag after each request to see if the server should keep running. The test harness sets the flag to stop the test server after each test. The serve_forever() loop also counts the requests successfully processed, so the tests can determine how many times the Cache fetches a feed.
Listing 6
#!/usr/bin/env python
"""Simple HTTP server for testing the feed cache.
"""
import BaseHTTPServer
import email.utils
import logging
import md5
import threading
import time
import unittest
import urllib
def make_etag(data):
"""Given a string containing data to be returned to the client,
compute an ETag value for the data.
"""
_md5 = md5.new()
_md5.update(data)
return _md5.hexdigest()
class TestHTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
"HTTP request handler which serves the same feed data every time."
FEED_DATA = """<?xml version="1.0" encoding="utf-8"?>
<feed xmlns="http://www.w3.org/2005/Atom" xml:lang="en-us">
<title>CacheTest test data</title>
<link href="http://localhost/feedcache/" rel="alternate"></link>
<link href="http://localhost/feedcache/atom/" rel="self"></link>
<id>http://localhost/feedcache/</id>
<updated>2006-10-14T11:00:36Z</updated>
<entry>
<title>single test entry</title>
<link href="http://www.example.com/" rel="alternate"></link>
<updated>2006-10-14T11:00:36Z</updated>
<author>
<name>author goes here</name>
<email>authoremail@example.com</email>
</author>
<id>http://www.example.com/</id>
<summary type="html">description goes here</summary>
<link length="100" href="http://www.example.com/enclosure" type="text/html" rel="enclosure">
</link>
</entry>
</feed>"""
# The data does not change, so save the ETag and modified times
# as class attributes.
ETAG = make_etag(FEED_DATA)
MODIFIED_TIME = email.utils.formatdate(usegmt=True)
def do_GET(self):
"Handle GET requests."
if self.path == '/shutdown':
# Shortcut to handle stopping the server
self.server.stop()
self.send_response(200)
else:
incoming_etag = self.headers.get('If-None-Match', None)
incoming_modified = self.headers.get('If-Modified-Since', None)
send_data = True
# Does the client have the same version of the data we have?
if self.server.apply_modified_headers:
if incoming_etag == self.ETAG:
self.send_response(304)
send_data = False
elif incoming_modified == self.MODIFIED_TIME:
self.send_response(304)
send_data = False
# Now optionally send the data, if the client needs it
if send_data:
self.send_response(200)
self.send_header('Content-Type', 'application/atom+xml')
self.send_header('ETag', self.ETAG)
self.send_header('Last-Modified', self.MODIFIED_TIME)
self.end_headers()
self.wfile.write(self.FEED_DATA)
return
class TestHTTPServer(BaseHTTPServer.HTTPServer):
"""HTTP Server which counts the number of requests made
and can stop based on client instructions.
"""
def __init__(self, applyModifiedHeaders=True):
self.apply_modified_headers = applyModifiedHeaders
self.keep_serving = True
self.request_count = 0
BaseHTTPServer.HTTPServer.__init__(self, ('', 9999), TestHTTPHandler)
return
def getNumRequests(self):
"Return the number of requests which have been made on the server."
return self.request_count
def stop(self):
"Stop serving requests, after the next request."
self.keep_serving = False
return
def serve_forever(self):
"Main loop for server"
while self.keep_serving:
self.handle_request()
self.request_count += 1
return
class HTTPTestBase(unittest.TestCase):
"Base class for tests that use a TestHTTPServer"
TEST_URL = 'http://localhost:9999/'
CACHE_TTL = 30
def setUp(self):
self.server = self.getServer()
self.server_thread = threading.Thread(target=self.server.serve_forever)
self.server_thread.setDaemon(True) # so the tests don't hang if cleanup fails
self.server_thread.start()
return
def getServer(self):
"Return a web server for the test."
return TestHTTPServer()
def tearDown(self):
# Stop the server thread
ignore = urllib.urlretrieve('http://localhost:9999/shutdown')
time.sleep(1)
self.server.server_close()
self.server_thread.join()
return
class HTTPTest(HTTPTestBase):
def testResponse(self):
# Verify that the server thread responds
# without error.
filename, response = urllib.urlretrieve(self.TEST_URL)
return
if __name__ == '__main__':
unittest.main()
The test server processes incoming HTTP requests with TestHTTPHandler, derived from BaseHTTPServer.BaseHTTPRequestHandler. TestHTTPHandler implements do_GET() to respond to HTTP GET requests. Feed data for the tests is hard coded in the FEED_DATA class attribute. The URL path /shutdown is used to tell the server to stop responding to requests. All other paths are treated as requests for the feed data. The requests are processed by checking the If-None-Match and If-Modified-Since headers, and responding either with a 304 status or with the static feed data.
HTTPTestBase is a convenience base class to be used by other tests. It manages a TestHTTPServer instance in a separate thread, so the tests can all run in a single process. Listing 7 shows what the existing tests look like, rewritten to use HTTPTestBase as a base class. The only differences are the base class for the tests and the use of self.TEST_URL, which points to the local test server instead of the feedburner URL from Listing 5.
Listing 7
#!/usr/bin/env python
"""The first version of Cache
"""
import time
import unittest
import feedparser
from Listing5 import Cache
from Listing6 import HTTPTestBase
class CacheTest(HTTPTestBase):
def testFetch(self):
c = Cache({})
parsed_feed = c.fetch(self.TEST_URL)
self.failUnless(parsed_feed.entries)
return
def testReuseContentsWithinTimeToLiveWindow(self):
c = Cache({ self.TEST_URL:(time.time(), 'prepopulated cache')})
cache_contents = c.fetch(self.TEST_URL)
self.failUnlessEqual(cache_contents, 'prepopulated cache')
return
if __name__ == '__main__':
unittest.main()
Implementing Conditional HTTP GET
With these testing tools in place, the next step was to enhance the Cache class to monitor and use the conditional HTTP GET parameters. Listing 8 shows the final version of Cache with these features. The fetch() method has been enhanced to send the ETag and modified time from the cached version of the feed to the server, when they are available.
Listing 8
#!/usr/bin/env python
"""Cache class with conditional HTTP GET support.
"""
import feedparser
import time
import unittest
import UserDict
import Listing6 # For the test base class
class Cache:
def __init__(self, storage, timeToLiveSeconds=300, userAgent='feedcache'):
self.storage = storage
self.time_to_live = timeToLiveSeconds
self.user_agent = userAgent
return
def fetch(self, url):
modified = None
etag = None
now = time.time()
cached_time, cached_content = self.storage.get(url, (None, None))
# Does the storage contain a version of the data
# which is older than the time-to-live?
if cached_time is not None:
if self.time_to_live:
age = now - cached_time
if age <= self.time_to_live:
return cached_content
# The cache is out of date, but we have
# something. Try to use the etag and modified_time
# values from the cached content.
etag = cached_content.get('etag')
modified = cached_content.get('modified')
# We know we need to fetch, so go ahead and do it.
parsed_result = feedparser.parse(url,
agent=self.user_agent,
modified=modified,
etag=etag,
)
status = parsed_result.get('status', None)
if status == 304:
# No new data, based on the etag or modified values.
# We need to update the modified time in the
# storage, though, so we know that what we have
# stored is up to date.
self.storage[url] = (now, cached_content)
# Return the data from the cache, since
# the parsed data will be empty.
parsed_result = cached_content
elif status == 200:
# There is new content, so store it unless there was an error.
error = parsed_result.get('bozo_exception')
if not error:
self.storage[url] = (now, parsed_result)
return parsed_result
class SingleWriteMemoryStorage(UserDict.UserDict):
"""Cache storage which only allows the cache value
for a URL to be updated one time.
"""
def __setitem__(self, url, data):
if url in self.keys():
modified, existing = self[url]
# Allow the modified time to change,
# but not the feed content.
if data[1] != existing:
raise AssertionError('Trying to update cache for %s to %s'
% (url, data))
UserDict.UserDict.__setitem__(self, url, data)
return
class CacheConditionalGETTest(Listing6.HTTPTestBase):
def setUp(self):
Listing6.HTTPTestBase.setUp(self)
self.cache = Cache(storage=SingleWriteMemoryStorage(),
timeToLiveSeconds=0, # so we do not reuse the local copy
)
return
def testFetchOnceForEtag(self):
# Fetch data which has a valid ETag value, and verify
# that while we hit the server twice the response
# codes cause us to use the same data.
# First fetch populates the cache
response1 = self.cache.fetch(self.TEST_URL)
self.failUnlessEqual(response1.feed.title, 'CacheTest test data')
# Remove the modified setting from the cache so we know
# the next time we check the etag will be used
# to check for updates. Since we are using an in-memory
# cache, modifying response1 updates the cache storage
# directly.
response1['modified'] = None
# Wait so the cache data times out
time.sleep(1)
# This should result in a 304 status, and no data from
# the server. That means the cache won't try to
# update the storage, so our SingleWriteMemoryStorage
# should not raise and we should have the same
# response object.
response2 = self.cache.fetch(self.TEST_URL)
self.failUnless(response1 is response2)
# Should have hit the server twice
self.failUnlessEqual(self.server.getNumRequests(), 2)
return
def testFetchOnceForModifiedTime(self):
# Fetch data which has a valid Last-Modified value, and verify
# that while we hit the server twice the response
# codes cause us to use the same data.
# First fetch populates the cache
response1 = self.cache.fetch(self.TEST_URL)
self.failUnlessEqual(response1.feed.title, 'CacheTest test data')
# Remove the etag setting from the cache so we know
# the next time we check the modified time will be used
# to check for updates. Since we are using an in-memory
# cache, modifying response1 updates the cache storage
# directly.
response1['etag'] = None
# Wait so the cache data times out
time.sleep(1)
# This should result in a 304 status, and no data from
# the server. That means the cache won't try to
# update the storage, so our SingleWriteMemoryStorage
# should not raise and we should have the same
# response object.
response2 = self.cache.fetch(self.TEST_URL)
self.failUnless(response1 is response2)
# Should have hit the server twice
self.failUnlessEqual(self.server.getNumRequests(), 2)
return
if __name__ == '__main__':
unittest.main()
The FeedParserDict object returned from feedparser.parse() conveniently includes the ETag and modified timestamp, if the server sent them. Once the cached feed is determined to be out of date, the ETag and modified values are retrieved from the cached content so they can be passed in to the next call to feedparser.parse().
Since the updated client sends ETag and If-Modified-Since headers, the server may now respond with a status code indicating that the cached copy of the data is still valid. It is no longer sufficient to simply store the response from the server before returning it. The status code must be checked, and if the status is 304 then the timestamp of the cached copy is updated. If the timestamp were not updated, then as soon as the cached copy of the feed exceeded the time-to-live, the Cache would request a new copy of the feed from the server every time the feed was accessed. Updating the timestamp ensures that the download throttling remains enforced.
Separate tests for each conditional GET header are implemented in CacheConditionalGETTest. To verify that the Cache handles the 304 status code properly and does not try to update the contents of the storage on a second fetch, these tests use a special storage class. SingleWriteMemoryStorage raises an AssertionError if a value is modified after it is set the first time. An AssertionError is used because that is how unittest.TestCase signals a test failure, and modifying the contents of the storage is a failure for these tests.
Each test method of CacheConditionalGETTest verifies handling for one of the conditional GET headers at a time. Since the test server always sets both headers, each test clears one value from the cache before making the second request. The remaining header value is sent to the server as part of the second request, and the server responds with the 304 status code.
Persistent Storage With shelve
All of the examples and tests so far have used in-memory storage options. For CastSampler, though, the cache of feed data needed to be stored on disk. As mentioned earlier, the shelve module in the standard library provides a simple persistent storage mechanism. It also conforms to the dictionary API used by the Cache class.
Using shelve by itself works in a simple single-threaded case, but it is not clear from its documentation whether shelve supports write access from multiple concurrent threads. To ensure the shelf is not corrupted, a thread lock should be used. CacheStorageLock is a simple wrapper around shelve that uses a lock to prevent more than one thread from accessing the shelf simultaneously. Listing 9 contains the code for CacheStorageLock and a test that illustrates using it to combine a Cache and shelve.
Listing 9
#!/usr/bin/env python
"""Using Cache with shelve.
"""
from __future__ import with_statement
import os
import shelve
import tempfile
import threading
import unittest
from Listing6 import HTTPTestBase
from Listing8 import Cache
class CacheStorageLock:
def __init__(self, shelf):
self.lock = threading.Lock()
self.shelf = shelf
return
def __getitem__(self, key):
with self.lock:
return self.shelf[key]
def get(self, key, default=None):
with self.lock:
try:
return self.shelf[key]
except KeyError:
return default
def __setitem__(self, key, value):
with self.lock:
self.shelf[key] = value
class CacheShelveTest(HTTPTestBase):
def setUp(self):
HTTPTestBase.setUp(self)
handle, self.shelve_filename = tempfile.mkstemp('.shelve')
os.close(handle) # we just want the file name, so close the open handle
os.unlink(self.shelve_filename) # remove empty file so shelve is not confused
return
def tearDown(self):
try:
os.unlink(self.shelve_filename)
except AttributeError:
pass
HTTPTestBase.tearDown(self)
return
def test(self):
storage = shelve.open(self.shelve_filename)
locking_storage = CacheStorageLock(storage)
try:
fc = Cache(locking_storage)
# First fetch the data through the cache
parsed_data = fc.fetch(self.TEST_URL)
self.failUnlessEqual(parsed_data.feed.title, 'CacheTest test data')
# Now retrieve the same data directly from the shelf
modified, shelved_data = storage[self.TEST_URL]
# The data should be the same
self.failUnlessEqual(parsed_data, shelved_data)
finally:
storage.close()
return
if __name__ == '__main__':
unittest.main()
The test setUp() method uses tempfile to create a temporary filename for the cache. The temporary file has to be deleted in setUp() because if the file exists but is empty, shelve cannot determine which database module to use to open it. The test() method fetches the data from the server, then compares the returned data with the data in the shelf to verify that they are the same.
CacheStorageLock uses a threading.Lock instance to control access to the shelf. It only manages access for the methods known to be used by Cache. The lock is acquired and released using the with statement, which is only enabled by default starting with Python 2.6. Since this code was written with Python 2.5, the module starts with a from __future__ import with_statement line to enable the syntax.
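Outside of the test harness, an application would wire the same pieces together in just a few lines. The following is a sketch rather than part of feedcache itself, assuming the Cache class from Listing 8 and the CacheStorageLock wrapper from Listing 9; the shelf filename is a placeholder.
import shelve
from Listing8 import Cache
from Listing9 import CacheStorageLock
# Open a persistent shelf and wrap it in the thread-safe lock wrapper.
shelf = shelve.open('/tmp/feedcache_example.shelve')
try:
    storage = CacheStorageLock(shelf)
    cache = Cache(storage, timeToLiveSeconds=300)
    # Fetch a feed; repeated calls within the TTL come from the shelf.
    parsed_feed = cache.fetch('http://feeds.feedburner.com/FeedcacheReleases')
    for entry in parsed_feed.entries:
        print entry.title
finally:
    shelf.close()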
Other Persistence Options
At any one time, shelve only allows one process to
open a shelf file to write to it. In applications with multiple
processes that need to modify the cache, alternative storage options
are desirable. Cache treats its storage object as a
dictionary, so any class that conforms to the dictionary API can be
used for back end storage. The shove
module, by L. C. Rees, uses the
dictionary API and offers support for a variety of back end storage
options. The supported options include relational databases, BSD-style
databases, Amazon’s S3 storage service, and others.
The filesystem store option was particularly interesting for CastSampler. With shove's file store, each key is mapped to a filename. The data associated with the key is pickled and stored in the file. By using separate files, it is possible to have separate threads and processes updating the cache simultaneously. Although the shove file implementation does not handle file locking, for my purposes it was unlikely that two threads would try to update the same feed at the same time.
Listing 10 includes a test that illustrates using shove file storage with feedcache. The primary difference in the APIs for shove and shelve is the syntax for specifying the storage destination. Shove uses a URL syntax to indicate which back end should be used. The format for each back end is described in the docstrings.
Listing 10
#!/usr/bin/env python
"""Tests with shove filesystem storage.
"""
import os
import shove
import tempfile
import threading
import unittest
from Listing6 import HTTPTestBase
from Listing8 import Cache
class CacheShoveTest(HTTPTestBase):
def setUp(self):
HTTPTestBase.setUp(self)
self.shove_dirname = tempfile.mkdtemp('shove')
return
def tearDown(self):
try:
os.system('rm -rf %s' % self.shove_dirname)
except AttributeError:
pass
HTTPTestBase.tearDown(self)
return
def test(self):
# First fetch the data through the cache
storage = shove.Shove('file://' + self.shove_dirname)
try:
fc = Cache(storage)
parsed_data = fc.fetch(self.TEST_URL)
self.failUnlessEqual(parsed_data.feed.title, 'CacheTest test data')
finally:
storage.close()
# Now retrieve the same data directly from the shove store
storage = shove.Shove('file://' + self.shove_dirname)
try:
modified, shelved_data = storage[self.TEST_URL]
finally:
storage.close()
# The data should be the same
self.failUnlessEqual(parsed_data, shelved_data)
return
if __name__ == '__main__':
unittest.main()
Using feedcache With Multiple Threads
Up to this point, all of the examples have been running in a single thread driven by the unittest framework. Now that integrating shove and feedcache has been shown to work, it is possible to take a closer look at using multiple threads to fetch feeds, and build a more complex example application. Spreading the work of fetching data into multiple processing threads is more complicated, but yields better performance under most circumstances because while one thread is blocked waiting for data from the network, another thread can take over and process a different URL.
Listing 11 shows a sample application which accepts URLs as arguments on the command line and prints the titles of all of the entries in the feeds. The results may be mixed together, depending on how the processing control switches between active threads. This example program is more like a traditional feed aggregator, since it processes every entry of every feed.
Listing 11
#!/usr/bin/env python
"""Example use of feedcache.Cache combined with threads.
"""
import Queue
import sys
import shove
import threading
from Listing8 import Cache
MAX_THREADS=5
OUTPUT_DIR='/tmp/feedcache_example'
def main(urls=[]):
if not urls:
print 'Specify the URLs to a few RSS or Atom feeds on the command line.'
return
# Add the URLs to a queue
url_queue = Queue.Queue()
for url in urls:
url_queue.put(url)
# Add poison pills to the url queue to cause
# the worker threads to break out of their loops
for i in range(MAX_THREADS):
url_queue.put(None)
# Track the entries in the feeds being fetched
entry_queue = Queue.Queue()
print 'Saving feed data to', OUTPUT_DIR
storage = shove.Shove('file://' + OUTPUT_DIR)
try:
# Start a few worker threads
worker_threads = []
for i in range(MAX_THREADS):
t = threading.Thread(target=fetch_urls,
args=(storage, url_queue, entry_queue,))
worker_threads.append(t)
t.setDaemon(True)
t.start()
# Start a thread to print the results
printer_thread = threading.Thread(target=print_entries, args=(entry_queue,))
printer_thread.setDaemon(True)
printer_thread.start()
# Wait for all of the URLs to be processed
url_queue.join()
# Wait for the worker threads to finish
for t in worker_threads:
t.join()
# Poison the print thread and wait for it to exit
entry_queue.put((None,None))
entry_queue.join()
printer_thread.join()
finally:
storage.close()
return
def fetch_urls(storage, input_queue, output_queue):
"""Thread target for fetching feed data.
"""
c = Cache(storage)
while True:
next_url = input_queue.get()
if next_url is None: # None causes thread to exit
input_queue.task_done()
break
feed_data = c.fetch(next_url)
for entry in feed_data.entries:
output_queue.put( (feed_data.feed, entry) )
input_queue.task_done()
return
def print_entries(input_queue):
"""Thread target for printing the contents of the feeds.
"""
while True:
feed, entry = input_queue.get()
if feed is None: # None causes thread to exit
input_queue.task_done()
break
print '%s: %s' % (feed.title, entry.title)
input_queue.task_done()
return
if __name__ == '__main__':
main(sys.argv[1:])
The design uses queues to pass data between two different types of threads to work on the feeds. Multiple threads use feedcache to fetch feed data. Each of these threads has its own Cache, but they all share a common shove store. A single thread waits for the feed entries to be added to its queue, and then prints each feed title and entry title.
The main() function sets up two different queues for passing data in and out of the worker threads. The url_queue contains the URLs for feeds, taken from the command line arguments. The entry_queue is used to pass feed content from the threads that fetch the feeds to the thread that prints the results. A shove filesystem store is used to cache the feeds. Once all of the worker threads are started, the rest of the main program simply waits for each stage of the work to be completed by the threads.
The last entries added to the url_queue are None values, which trigger the worker threads to exit. When the url_queue has been drained, the worker threads can be cleaned up. After the worker threads have finished, (None, None) is added to the entry_queue to trigger the printing thread to exit when all of the entries have been printed.
The fetch_urls() function runs in the worker threads. It takes one feed URL at a time from the input queue, retrieves the feed contents through the cache, then adds the feed entries to the output queue. When the item taken out of the queue is None instead of a URL string, it is interpreted as a signal that the thread should break out of its processing loop. Each thread running fetch_urls() creates a local Cache instance using a common storage back end. Sharing the storage ensures that all of the feed data is written to the same place, while creating a local Cache instance ensures threads can fetch data in parallel.
The consumer of the queue of entries is print_entries(). It takes one entry at a time from the queue and prints the feed and entry titles. Only one thread runs print_entries(), but a separate thread is used so that output can be produced as soon as possible, instead of waiting for all of the fetch_urls() threads to complete before printing the feed contents.
Running the program produces output similar to the example in Listing 3:
$ python Listing11.py http://feeds.feedburner.com/FeedcacheReleases
Saving feed data to /tmp/feedcache_example
feedcache Releases: feedcache 0.1
feedcache Releases: feedcache 0.2
feedcache Releases: feedcache 0.3
feedcache Releases: feedcache 0.4
feedcache Releases: feedcache 0.5
The difference is that it takes much less time to run the program in Listing 11 when multiple feeds are passed on the command line, and when some of the data has already been cached.
Future Work
The current version of feedcache meets most of the requirements for CastSampler, but there is still room to improve it as a general purpose tool. It would be nice if it offered finer control over the length of time data stays in the cache, for example. And, although shove is a completely separate project, feedcache would be more reliable if shove's file storage used file locking, to prevent corruption when two threads or processes write to the same part of the cache at the same time.
Determining how long to hold the data in a cache can be a tricky problem. With web content such as RSS and Atom feeds, the web server may offer hints by including explicit expiration dates or caching instructions. HTTP headers such as Expires and Cache-Control can include details beyond the Last-Modified and ETag values already being handled by the Cache. If the server uses additional cache headers, feedparser saves the associated values in the FeedParserDict. To support the caching hints, feedcache would need to be enhanced to understand the rules for the Cache-Control header, and to save the expiration time as well as the time-to-live for each feed.
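As a rough idea of what that enhancement might involve, the sketch below derives an expiration time from the response headers. It is not part of feedcache: it assumes the parsed feed exposes the HTTP response headers under a 'headers' key with lower-cased names, and the Cache-Control parsing is deliberately simplified.
import email.utils
import time
def expiration_time(parsed_feed, default_ttl=300):
    """Return the time (seconds since the epoch) when the cached feed
    should be considered stale, using server caching hints when available.
    """
    # Assumes the response headers are available as a dict with lower-cased names.
    headers = parsed_feed.get('headers', {})
    # Prefer a relative max-age directive, e.g. "Cache-Control: max-age=3600".
    cache_control = headers.get('cache-control', '')
    for directive in cache_control.split(','):
        directive = directive.strip()
        if directive.startswith('max-age='):
            try:
                return time.time() + int(directive.split('=', 1)[1])
            except ValueError:
                pass
    # Fall back to an absolute Expires date, if the server sent one.
    expires = headers.get('expires')
    if expires:
        parsed_date = email.utils.parsedate_tz(expires)
        if parsed_date:
            return email.utils.mktime_tz(parsed_date)
    # Otherwise use the cache's normal time-to-live.
    return time.time() + default_ttl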
Supporting a separate time-to-live value for each feed would let feedcache use a different refresh throttle for different sites. Data from relatively infrequently updated feeds, such as Slashdot, would stay in the cache longer than data from more frequently updated feeds, such as a Twitter feed. Applications that use feedcache in a more traditional way would be able to adjust the update throttle for each feed separately to balance the freshness of the data in the cache and the load placed on the server.
Conclusions
Original sources of RSS and Atom feeds are being created all the time as new and existing applications expose data for syndication. With the development of mash-up tools such as Yahoo! Pipes and Google’s Mashup Editor, these feeds can be combined, filtered, and expanded in new and interesting ways, creating even more sources of data. I hope this article illustrates how building your own applications to read and manipulate syndication feeds in Python with tools like feedparser and feedcache is easy, even while including features that make your program cooperate with servers to manage load.
I would like to offer a special thanks to Mrs. PyMOTW for her help editing this article.