Source code for spyder.core.frontier

#
# Copyright (c) 2011 Daniel Truemper truemped@googlemail.com
#
# frontier.py 26-Jan-2011
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Generic Frontier implementation.

The :class:`SingleHostFrontier` selects URIs from the queues by iterating
over all available queues and adding them to a priority queue.

The priority is calculated from the timestamp at which the URI should next be
crawled.

In contrast to the :mod:`spyder.core.sqlitequeues` module, URIs in this module
are represented as :class:`spyder.thrift.gen.ttypes.CrawlUri`.
"""

import time
from datetime import datetime
from datetime import timedelta

from Queue import PriorityQueue, Empty, Full
from urlparse import urlparse

from spyder.core.constants import CURI_SITE_USERNAME, CURI_SITE_PASSWORD
from spyder.core.constants import CURI_EXTRACTED_URLS
from spyder.core.dnscache import DnsCache
from spyder.time import serialize_date_time, deserialize_date_time
from spyder.core.log import LoggingMixin
from spyder.core.prioritizer import SimpleTimestampPrioritizer
from spyder.core.sqlitequeues import SQLiteSingleHostUriQueue
from spyder.core.sqlitequeues import SQLiteMultipleHostUriQueue
from spyder.core.uri_uniq import UniqueUriFilter
from spyder.thrift.gen.ttypes import CrawlUri
from spyder.import_util import import_class


# some default port numbers as of /etc/services
PROTOCOLS_DEFAULT_PORT = {
    "http": 80,
    "https": 443,
    "ftp": 21,
    "ftps": 990,
    "sftp": 115,
}
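
# Illustrative sketch (a hypothetical helper, not part of spyder's API): when
# a URL carries no explicit port, `_crawluri_from_uri` below falls back to the
# scheme's default port from this table.
def _example_default_port(url):
    parsed = urlparse(url)
    return parsed.port or PROTOCOLS_DEFAULT_PORT[parsed.scheme]

# _example_default_port("https://example.test/")     -> 443
# _example_default_port("http://example.test:8080/") -> 8080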


class AbstractBaseFrontier(object, LoggingMixin):
    """
    A base class for implementing frontiers.

    Basically this class provides the different general methods and
    configuration parameters used for frontiers.
    """

    def __init__(self, settings, log_handler, front_end_queues, prioritizer,
            unique_hash='sha1'):
        """
        Initialize the frontier and instantiate the
        :class:`SQLiteSingleHostUriQueue`.

        By default the frontier will use the `sha1` hash function for the
        unique uri filter. For very large crawls you might want to use a
        larger hash function (`sha512`, e.g.)
        """
        LoggingMixin.__init__(self, log_handler, settings.LOG_LEVEL_MASTER)
        # front end queue
        self._prioritizer = prioritizer
        self._front_end_queues = front_end_queues

        # checkpointing
        self._checkpoint_interval = settings.FRONTIER_CHECKPOINTING
        self._uris_added = 0

        # the heap
        self._heap = PriorityQueue(maxsize=settings.FRONTIER_HEAP_SIZE)
        self._heap_min_size = settings.FRONTIER_HEAP_MIN

        # a list of uris currently being crawled.
        self._current_uris = dict()
        # dns cache
        self._dns_cache = DnsCache(settings)
        # unique uri filter
        self._unique_uri = UniqueUriFilter(unique_hash)
        for url in self._front_end_queues.all_uris():
            assert not self._unique_uri.is_known(url, add_if_unknown=True)

        # the sinks
        self._sinks = []

        # timezone
        self._timezone = settings.LOCAL_TIMEZONE

        self._logger.info("frontier::initialized")

    def add_sink(self, sink):
        """
        Add a sink to the frontier. A sink will be responsible for the long
        term storage of the crawled contents.
        """
        self._sinks.append(sink)

    def add_uri(self, curi):
        """
        Add the specified :class:`CrawlUri` to the frontier.

        The next crawl date for the uri is calculated by the frontier.

        Note: time based crawling is never strict, it is generally used as
        some kind of prioritization.
        """
        if self._unique_uri.is_known(curi.url, add_if_unknown=True):
            # we already know this uri
            self._logger.debug("frontier::Trying to update a known uri... " +
                "(%s)" % (curi.url,))
            return

        self._logger.info("frontier::Adding '%s' to the frontier" % curi.url)
        self._front_end_queues.add_uri(self._uri_from_curi(curi))
        self._maybe_checkpoint()

    def update_uri(self, curi):
        """
        Update a given uri.
        """
        self._front_end_queues.update_uri(self._uri_from_curi(curi))
        self._maybe_checkpoint()

    def get_next(self):
        """
        Return the next uri scheduled for crawling.
        """
        if self._heap.qsize() < self._heap_min_size:
            self._update_heap()

        try:
            (_next_date, next_uri) = self._heap.get_nowait()
        except Empty:
            # heap is empty, there is nothing to crawl right now!
            # maybe log this in the future
            raise

        return self._crawluri_from_uri(next_uri)

    def close(self):
        """
        Close the underlying frontend queues.
        """
        self._front_end_queues.checkpoint()
        self._front_end_queues.close()

    def _add_to_heap(self, uri, next_date):
        """
        Add a URI to the heap that is ready to be crawled.
        """
        self._heap.put_nowait((next_date, uri))

        (url, _etag, _mod_date, _next_date, _prio) = uri
        self._current_uris[url] = uri
        self._logger.debug("frontier::Adding '%s' to the heap" % url)

    def _reschedule_uri(self, curi):
        """
        Return the priority and the `next_crawl_date` for a
        :class:`CrawlUri`.
        """
        (prio, delta) = self._prioritizer.calculate_priority(curi)
        now = datetime.now(self._timezone)
        return (prio, time.mktime((now + delta).timetuple()))

    def _ignore_uri(self, curi):
        """
        Ignore a :class:`CrawlUri` from now on.
        """
        self._front_end_queues.ignore_uri(curi.url, curi.status_code)

    def _uri_from_curi(self, curi):
        """
        Create the uri tuple from the :class:`CrawlUri` and calculate the
        priority.

        Overwrite this method in more specific frontiers.
        """
        etag = mod_date = None
        if curi.rep_header:
            if "Etag" in curi.rep_header:
                etag = curi.rep_header["Etag"]
            if "Last-Modified" in curi.rep_header:
                mod_date = time.mktime(deserialize_date_time(
                    curi.rep_header["Last-Modified"]).timetuple())
            if not mod_date and 'Date' in curi.rep_header:
                mod_date = time.mktime(deserialize_date_time(
                    curi.rep_header["Date"]).timetuple())

        if mod_date:
            # only reschedule if it has been crawled before
            (prio, next_crawl_date) = self._reschedule_uri(curi)
        else:
            (prio, next_crawl_date) = (1,
                time.mktime(datetime.now(self._timezone).timetuple()))

        return (curi.url, etag, mod_date, next_crawl_date, prio)

    def _crawluri_from_uri(self, uri):
        """
        Convert a URI tuple to a :class:`CrawlUri`.

        Replace the hostname with the real IP in order to cache DNS queries.
        """
        (url, etag, mod_date, _next_date, prio) = uri

        parsed_url = urlparse(url)

        # dns resolution and caching
        port = parsed_url.port
        if not port:
            port = PROTOCOLS_DEFAULT_PORT[parsed_url.scheme]

        effective_netloc = self._dns_cache["%s:%s" % (parsed_url.hostname,
            port)]

        curi = CrawlUri(url)
        curi.effective_url = url.replace(parsed_url.netloc, "%s:%s" %
            effective_netloc)
        curi.current_priority = prio
        curi.req_header = dict()
        if etag:
            curi.req_header["Etag"] = etag
        if mod_date:
            mod_date_time = datetime.fromtimestamp(mod_date)
            curi.req_header["Last-Modified"] = serialize_date_time(
                mod_date_time)

        curi.optional_vars = dict()
        if parsed_url.username and parsed_url.password:
            curi.optional_vars[CURI_SITE_USERNAME] = \
                parsed_url.username.encode()
            curi.optional_vars[CURI_SITE_PASSWORD] = \
                parsed_url.password.encode()

        return curi

    def _update_heap(self):
        """
        Abstract method. Implement this in the actual Frontier.

        The implementation should really only add uris to the heap if they
        can be downloaded right away.
        """
        pass

    def _maybe_checkpoint(self, force_checkpoint=False):
        """
        Periodically checkpoint the state db.
        """
        self._uris_added += 1
        if self._uris_added > self._checkpoint_interval or force_checkpoint:
            self._front_end_queues.checkpoint()
            self._uris_added = 0

    def process_successful_crawl(self, curi):
        """
        Called when a URI has been crawled successfully. `curi` is a
        :class:`CrawlUri`.
        """
        self.update_uri(curi)

        if curi.optional_vars and CURI_EXTRACTED_URLS in curi.optional_vars:
            for url in curi.optional_vars[CURI_EXTRACTED_URLS].split("\n"):
                if len(url) > 5 and not self._unique_uri.is_known(url):
                    self.add_uri(CrawlUri(url))

        del self._current_uris[curi.url]

        for sink in self._sinks:
            sink.process_successful_crawl(curi)

    def process_not_found(self, curi):
        """
        Called when a URL was not found.

        This could mean that the URL has been removed from the server. If
        so, do something about it!

        Override this method in the actual frontier implementation.
        """
        del self._current_uris[curi.url]
        self._ignore_uri(curi)

        for sink in self._sinks:
            sink.process_not_found(curi)

    def process_redirect(self, curi):
        """
        Called when there were too many redirects for a URL, or the site has
        not been updated since the last visit.

        In the latter case, update the internal uri and increase the
        priority level.
        """
        del self._current_uris[curi.url]

        if curi.status_code in [301, 302]:
            # simply ignore the URL. The URL that is being redirected to is
            # extracted and added in the processing
            self._ignore_uri(curi)

        if curi.status_code == 304:
            # the page has not been modified since the last visit! Update it
            # NOTE: prio increasing happens in the prioritizer
            self.update_uri(curi)

        for sink in self._sinks:
            sink.process_redirect(curi)

    def process_server_error(self, curi):
        """
        Called when there was some kind of server error.

        Override this method in the actual frontier implementation.
        """
        del self._current_uris[curi.url]
        self._ignore_uri(curi)

        for sink in self._sinks:
            sink.process_server_error(curi)
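
# Illustrative sketch of the heap ordering used above: the frontier heap
# stores `(next_crawl_date, uri_tuple)` pairs, so the PriorityQueue always
# yields the URI whose next crawl timestamp is smallest, i.e. the one that is
# due first. The helper and the sample URIs are hypothetical.
def _example_heap_ordering():
    heap = PriorityQueue(maxsize=10)
    now = time.time()
    # uri tuples follow the (url, etag, mod_date, next_date, prio) layout
    heap.put_nowait((now + 60, ("http://localhost/b", None, None, now + 60, 1)))
    heap.put_nowait((now + 10, ("http://localhost/a", None, None, now + 10, 1)))
    (_next_date, uri) = heap.get_nowait()
    return uri[0]  # "http://localhost/a" is due first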


class SingleHostFrontier(AbstractBaseFrontier):
    """
    A frontier for crawling a single host.
    """

    def __init__(self, settings, log_handler):
        """
        Initialize the base frontier.
        """
        prio_clazz = import_class(settings.PRIORITIZER_CLASS)
        AbstractBaseFrontier.__init__(self, settings, log_handler,
            SQLiteSingleHostUriQueue(settings.FRONTIER_STATE_FILE),
            prio_clazz(settings))

        self._crawl_delay = settings.FRONTIER_CRAWL_DELAY_FACTOR
        self._min_delay = settings.FRONTIER_MIN_DELAY
        self._next_possible_crawl = time.time()

    def get_next(self):
        """
        Get the next URI.

        Only return the next URI if we have waited enough.
        """
        if self._heap.qsize() < self._heap_min_size:
            self._update_heap()

        if time.time() >= self._next_possible_crawl:
            (next_date, next_uri) = self._heap.get_nowait()

            now = datetime.now(self._timezone)
            localized_next_date = self._timezone.fromutc(
                datetime.utcfromtimestamp(next_date))

            if now < localized_next_date:
                # reschedule the uri for crawling
                self._heap.put_nowait((next_date, next_uri))
                raise Empty()

            self._next_possible_crawl = time.time() + self._min_delay
            return self._crawluri_from_uri(next_uri)

        raise Empty()

    def _update_heap(self):
        """
        Update the heap with URIs we should crawl.

        Note: it is possible that the heap is not full after it was updated!
        """
        self._logger.debug("frontier::Updating heap")

        for uri in self._front_end_queues.queue_head(n=50):

            (url, _etag, _mod_date, next_date, _prio) = uri

            if url not in self._current_uris:
                try:
                    self._add_to_heap(uri, next_date)
                except Full:
                    # heap is full, return to the caller
                    self._logger.error("singlehostfrontier::Heap is full " +
                        "during update")
                    return

    def process_successful_crawl(self, curi):
        """
        Add the time based politeness to this frontier.
        """
        AbstractBaseFrontier.process_successful_crawl(self, curi)
        now = time.time()
        self._next_possible_crawl = now + max(self._crawl_delay *
            curi.req_time, self._min_delay)
        self._logger.debug("singlehostfrontier::Next possible crawl: %s" %
            (self._next_possible_crawl,))
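
# Illustrative sketch of the time based politeness applied in
# `process_successful_crawl` above, assuming `req_time` is the request
# duration in seconds: the next crawl is delayed by the request time scaled
# with FRONTIER_CRAWL_DELAY_FACTOR, but never by less than
# FRONTIER_MIN_DELAY. The helper and its default values are hypothetical.
def _example_next_possible_crawl(req_time, crawl_delay_factor=4, min_delay=2):
    # e.g. req_time=0.8 -> wait 3.2s; req_time=0.1 -> wait the 2s minimum
    return time.time() + max(crawl_delay_factor * req_time, min_delay)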


class MultipleHostFrontier(AbstractBaseFrontier):
    """
    A Frontier for crawling many hosts simultaneously.
    """

    def __init__(self, settings, log_handler):
        """
        Initialize the abstract base frontier and this implementation with
        the different configuration parameters.
        """
        prio_clazz = import_class(settings.PRIORITIZER_CLASS)
        AbstractBaseFrontier.__init__(self, settings, log_handler,
            SQLiteMultipleHostUriQueue(settings.FRONTIER_STATE_FILE),
            prio_clazz(settings))

        self._delay_factor = settings.FRONTIER_CRAWL_DELAY_FACTOR
        self._min_delay = settings.FRONTIER_MIN_DELAY
        self._num_active_queues = settings.FRONTIER_ACTIVE_QUEUES
        self._max_queue_budget = settings.FRONTIER_QUEUE_BUDGET
        self._budget_punishment = settings.FRONTIER_QUEUE_BUDGET_PUNISH

        self._queue_ids = []
        for (queue, _) in self._front_end_queues.get_all_queues():
            self._queue_ids.append(queue)

        qs_clazz = import_class(settings.QUEUE_SELECTOR_CLASS)
        self._backend_selector = qs_clazz(len(self._queue_ids))

        qa_clazz = import_class(settings.QUEUE_ASSIGNMENT_CLASS)
        self._backend_assignment = qa_clazz(self._dns_cache)

        self._current_queues = dict()
        self._current_queues_in_heap = []
        self._time_politeness = dict()
        self._budget_politeness = dict()

    def _uri_from_curi(self, curi):
        """
        Override the uri creation in order to assign the queue to it.
        Otherwise the uri would not end up in the correct queue.
        """
        uri = AbstractBaseFrontier._uri_from_curi(self, curi)
        (url, etag, mod_date, next_crawl_date, prio) = uri

        ident = self._backend_assignment.get_identifier(url)

        queue = self._front_end_queues.add_or_create_queue(ident)
        if queue not in self._queue_ids:
            self._queue_ids.append(queue)
            self._backend_selector.reset_queues(len(self._queue_ids))

        return (url, queue, etag, mod_date, next_crawl_date, prio)

    def _add_to_heap(self, uri, next_date):
        """
        Override the base method since it only accepts the smaller tuples.
        """
        (url, queue, etag, mod_date, next_crawl_date, prio) = uri
        queue_free_uri = (url, etag, mod_date, next_crawl_date, prio)
        return AbstractBaseFrontier._add_to_heap(self, queue_free_uri,
            next_date)

    def get_next(self):
        """
        Get the next URI that is ready to be crawled.
        """
        if self._heap.qsize() < self._heap_min_size:
            self._update_heap()

        (_date, uri) = self._heap.get_nowait()
        return self._crawluri_from_uri(uri)

    def _update_heap(self):
        """
        Update the heap from the currently used queues. Respect the time
        based politeness and the queue's budget.

        The algorithm is as follows:

        1. Remove queues that are out of budget and add new ones

        2. Add all URIs to the heap that are crawlable with respect to the
           time based politeness
        """
        self._maybe_add_queues()
        self._cleanup_budget_politeness()

        now = datetime.now(self._timezone)
        for q in self._time_politeness.keys():

            if now >= self._time_politeness[q] and \
                    q not in self._current_queues_in_heap:
                # we may crawl from this queue!
                queue = self._current_queues[q]

                try:
                    (localized_next_date, next_uri) = queue.get_nowait()
                except Empty:
                    # this queue is empty! Remove it and check the next queue
                    self._remove_queue_from_memory(q)
                    continue

                if now < localized_next_date:
                    # reschedule the uri for crawling
                    queue.put_nowait((localized_next_date, next_uri))
                else:
                    # add this uri to the heap, i.e. it can be crawled
                    self._add_to_heap(next_uri, localized_next_date)
                    self._current_queues_in_heap.append(q)

    def _maybe_add_queues(self):
        """
        If there are free queue slots available, add inactive queues from
        the backend.
        """
        qcount = self._front_end_queues.get_queue_count()
        acount = len(self._current_queues)

        while self._num_active_queues > acount and acount < qcount:

            next_queue = self._get_next_queue()
            if next_queue:
                self._add_queue_from_storage(next_queue)
                self._logger.debug("multifrontier::Adding queue with id=%s" %
                    (next_queue,))
                acount = len(self._current_queues)
            else:
                break

    def _cleanup_budget_politeness(self):
        """
        Check if any queue has used up its budget (`self._max_queue_budget`)
        and replace those with queues from the storage.
        """
        removeable = []
        for q in self._budget_politeness.keys():
            if self._budget_politeness[q] <= 0:
                removeable.append(q)

        for rm_queue in removeable:

            next_queue = self._get_next_queue()
            if next_queue:
                self._add_queue_from_storage(next_queue)

            self._remove_queue_from_memory(rm_queue)
            self._logger.debug("multifrontier::Removing queue with id=%s" %
                rm_queue)

    def _get_next_queue(self):
        """
        Get the next queue candidate.
        """
        for i in range(0, 10):
            next_id = self._backend_selector.get_queue()
            q = self._queue_ids[next_id]
            if q not in self._budget_politeness.keys():
                return q

        return None

    def _get_queue_for_url(self, url):
        """
        Determine the queue for a given `url`.
        """
        ident = self._backend_assignment.get_identifier(url)
        return self._front_end_queues.get_queue_for_ident(ident)

    def _remove_queue_from_memory(self, queue):
        """
        Remove a queue from the internal memory buffers.
        """
        del self._time_politeness[queue]
        del self._budget_politeness[queue]
        del self._current_queues[queue]

    def _add_queue_from_storage(self, next_queue):
        """
        Called when a queue should be crawled from now on.
        """
        self._budget_politeness[next_queue] = self._max_queue_budget
        self._time_politeness[next_queue] = datetime.now(self._timezone)
        self._current_queues[next_queue] = \
            PriorityQueue(maxsize=self._max_queue_budget)

        queue = self._current_queues[next_queue]
        for uri in self._front_end_queues.queue_head(next_queue,
                n=self._max_queue_budget):

            (_url, _queue, _etag, _mod_date, next_date, _prio) = uri
            localized_next_date = self._timezone.fromutc(
                datetime.utcfromtimestamp(next_date))
            queue.put_nowait((localized_next_date, uri))

    def _update_politeness(self, curi):
        """
        Update all politeness rules.
        """
        uri = self._uri_from_curi(curi)
        (url, queue, etag, mod_date, next_crawl_date, prio) = uri

        if 200 <= curi.status_code < 500:
            self._budget_politeness[queue] -= 1
        if 500 <= curi.status_code < 600:
            self._budget_politeness[queue] -= self._budget_punishment

        now = datetime.now(self._timezone)
        delta_seconds = max(self._delay_factor * curi.req_time,
            self._min_delay)
        self._time_politeness[queue] = now + timedelta(seconds=delta_seconds)

        self._current_queues_in_heap.remove(queue)

    def process_successful_crawl(self, curi):
        """
        Crawling was successful, now update the politeness rules.
        """
        self._update_politeness(curi)
        AbstractBaseFrontier.process_successful_crawl(self, curi)

    def process_not_found(self, curi):
        """
        The page does not exist anymore!
        """
        self._update_politeness(curi)
        AbstractBaseFrontier.process_not_found(self, curi)

    def process_redirect(self, curi):
        """
        There was a redirect.
        """
        self._update_politeness(curi)
        AbstractBaseFrontier.process_server_error(self, curi)

    def process_server_error(self, curi):
        """
        Punish any server errors in the budget for this queue.
        """
        self._update_politeness(curi)
        AbstractBaseFrontier.process_server_error(self, curi)
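
# Illustrative sketch of the budget politeness enforced by
# `MultipleHostFrontier._update_politeness`: every answered request (2xx-4xx)
# costs one unit of a queue's budget, server errors (5xx) cost
# FRONTIER_QUEUE_BUDGET_PUNISH units, and once the budget reaches zero the
# queue is rotated out by `_cleanup_budget_politeness`. The helper and its
# default values are hypothetical.
def _example_budget_after(status_codes, budget=10, punishment=5):
    for status_code in status_codes:
        if 200 <= status_code < 500:
            budget -= 1
        if 500 <= status_code < 600:
            budget -= punishment
    return budget

# _example_budget_after([200, 200, 404]) -> 7
# _example_budget_after([200, 500])      -> 4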