Mirror of https://github.com/scrapy/scrapy.git
s3 image pipeline: use scrapyengine for downloads
--HG-- extra : convert_revision : svn%3Ab85faa78-f9eb-468e-a121-7cced6da292c%40402
parent c4434590b0
commit 2142d4f7e0

@@ -1,25 +1,24 @@
 import time
+import hmac
+import base64
 import hashlib
 import rfc822
 from cStringIO import StringIO
 
 import Image
-import boto
 
 from scrapy import log
+from scrapy.http import Request
 from scrapy.stats import stats
-from scrapy.core.exceptions import DropItem, NotConfigured
-from scrapy.core.exceptions import HttpException
+from scrapy.core.exceptions import DropItem, NotConfigured, HttpException
 from scrapy.contrib.pipeline.media import MediaPipeline
+from scrapy.contrib.aws import canonical_string
 from scrapy.conf import settings
 
-class NoimagesDrop(DropItem):
-    """Product with no images exception"""
+from .images import BaseImagesPipeline, NoimagesDrop, ImageException
 
-class ImageException(Exception):
-    """General image error exception"""
 
-class S3ImagesPipeline(MediaPipeline):
+class S3ImagesPipeline(BaseImagesPipeline):
     MEDIA_TYPE = 'image'
     THUMBS = (
         ("50", (50, 50)),
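
Note: the THUMBS table kept at the end of this hunk maps a thumbnail name to a maximum (width, height), and s3_store_thumbnails() further down walks it with PIL. A minimal standalone sketch of that spec in use, assuming classic PIL is importable as Image just like in the imports above (with Pillow it would be "from PIL import Image") and writing the results to memory instead of S3:

import Image                        # classic PIL, as imported in this module
from cStringIO import StringIO

THUMBS = (
    ("50", (50, 50)),
)

def make_thumbs(jpeg_bytes):
    """Return {thumb_id: jpeg bytes} for every entry in THUMBS (illustrative helper)."""
    image = Image.open(StringIO(jpeg_bytes))
    out = {}
    for thumb_id, size in THUMBS:
        # convert to RGB first so JPEG encoding also works for palette/alpha sources
        thumb = image.copy() if image.mode == 'RGB' else image.convert('RGB')
        thumb.thumbnail(size, Image.ANTIALIAS)   # scales down in place, keeps aspect ratio
        buf = StringIO()
        thumb.save(buf, 'JPEG')
        out[thumb_id] = buf.getvalue()
    return out

thumbnail() never upscales and preserves aspect ratio, so a ("50", (50, 50)) entry means "fits inside a 50x50 box" rather than an exact 50x50 output.
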
@@ -31,55 +30,32 @@ class S3ImagesPipeline(MediaPipeline):
         if not settings['S3_IMAGES']:
             raise NotConfigured
 
-        # days to wait before redownloading images
-        self.image_refresh_days = settings.getint('IMAGES_REFRESH_DAYS', 90)
-
         self.bucket_name = settings['S3_BUCKET']
         self.prefix = settings['S3_PREFIX']
-        access_key = settings['AWS_ACCESS_KEY_ID']
-        secret_key = settings['AWS_SECRET_ACCESS_KEY']
-        conn = boto.connect_s3(access_key, secret_key)
-        self.bucket = conn.get_bucket(self.bucket_name)
+        self.access_key = settings['AWS_ACCESS_KEY_ID']
+        self.image_refresh_days = settings.getint('IMAGES_REFRESH_DAYS', 90)
+        self._hmac = hmac.new(settings['AWS_SECRET_ACCESS_KEY'], digestmod=hashlib.sha1)
 
         MediaPipeline.__init__(self)
 
-    def media_to_download(self, request, info):
-        key = self.s3_image_key(request.url)
-        if not self.s3_should_download(request.url):
-            self.inc_stats(info.domain, 'uptodate')
-            referer = request.headers.get('Referer')
-            log.msg('Image (uptodate) type=%s at <%s> referred from <%s>' % \
-                (self.MEDIA_TYPE, request.url, referer), level=log.DEBUG, domain=info.domain)
-            return key
+    def s3request(self, key, method, body=None, headers=None):
+        url = 'http://%s.s3.amazonaws.com/%s' % (self.bucket_name, key)
+        req = Request(url, method=method, body=body, headers=headers)
 
-    def media_downloaded(self, response, request, info):
-        mtype = self.MEDIA_TYPE
-        referer = request.headers.get('Referer')
+        if not (headers and 'Date' in headers):
+            req.headers['Date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())
 
-        if not response or not response.body.to_string():
-            msg = 'Image (empty): Empty %s (no content) in %s referred in <%s>: Empty image (no-content)' % (mtype, request, referer)
-            log.msg(msg, level=log.WARNING, domain=info.domain)
-            raise ImageException(msg)
+        fullkey = '/%s/%s' % (self.bucket_name, key)
+        c_string = canonical_string(method, fullkey, req.headers)
+        _hmac = self._hmac.copy()
+        _hmac.update(c_string)
+        b64_hmac = base64.encodestring(_hmac.digest()).strip()
+        req.headers['Authorization'] = "AWS %s:%s" % (self.access_key, b64_hmac)
+        return req
 
-        result = self.save_image(response, request, info) # save and thumbs response
-
-        status = 'cached' if getattr(response, 'cached', False) else 'downloaded'
-        msg = 'Image (%s): Downloaded %s from %s referred in <%s>' % (status, mtype, request, referer)
-        log.msg(msg, level=log.DEBUG, domain=info.domain)
-        self.inc_stats(info.domain, status)
-        return result
-
-    def media_failed(self, failure, request, info):
-        referer = request.headers.get('Referer')
-        errmsg = str(failure.value) if isinstance(failure.value, HttpException) else str(failure)
-        msg = 'Image (http-error): Error downloading %s from %s referred in <%s>: %s' % (self.MEDIA_TYPE, request, referer, errmsg)
-        log.msg(msg, level=log.WARNING, domain=info.domain)
-        raise ImageException(msg)
-
-    def save_image(self, response, request, info):
+    def image_downloaded(self, response, request, info):
         try:
             key = self.s3_image_key(request.url)
-            self.s3_store_image(response, request.url)
+            self.s3_store_image(response, request.url, info)
         except ImageException, ex:
            log.msg(str(ex), level=log.WARNING, domain=info.domain)
            raise ex
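
Note: with boto gone, s3request() signs every request itself in the S3 REST style (AWS signature version 2): canonical_string() from scrapy.contrib.aws builds the string to sign, which is HMAC-SHA1'd with the secret key and sent as "AWS <access key>:<base64 digest>" in the Authorization header. A self-contained sketch of that scheme, with a deliberately simplified string to sign (no query string or x-amz-* canonicalization) standing in for canonical_string():

import time
import hmac
import base64
import hashlib

def sign_s3_request(method, bucket, key, headers, access_key, secret_key):
    """Add Date and a signature-v2 Authorization header (simplified sketch)."""
    headers.setdefault('Date', time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()))
    # simplified stand-in for canonical_string(): verb, MD5, type, date, resource
    to_sign = '\n'.join([
        method,
        headers.get('Content-MD5', ''),
        headers.get('Content-Type', ''),
        headers['Date'],
        '/%s/%s' % (bucket, key),
    ])
    digest = hmac.new(secret_key, to_sign, hashlib.sha1).digest()
    headers['Authorization'] = "AWS %s:%s" % (access_key, base64.encodestring(digest).strip())
    return headers

In the full scheme any x-amz-* headers, such as the X-Amz-Acl header sent by _s3_put_image() below, also have to be folded into the string to sign, which is presumably what canonical_string() takes care of.
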
@@ -89,10 +65,6 @@ class S3ImagesPipeline(MediaPipeline):
 
         return key # success value sent as input result for item_media_downloaded
 
-    def inc_stats(self, domain, status):
-        stats.incpath('%s/image_count' % domain)
-        stats.incpath('%s/image_status_count/%s' % (domain, status))
-
     def s3_image_key(self, url):
         """Return the relative path on the target filesystem for an image to be
         downloaded to.
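
Note: the s3_image_key()/s3_thumb_key() scheme whose docstring closes this hunk derives object keys purely from the image URL, so the same URL always lands on the same key. A small worked example of the thumb variant, whose formula appears at the top of the next hunk:

import hashlib

def s3_thumb_key(prefix, url, thumb_id):
    """Key depends only on the source URL: stable and filename-safe."""
    image_guid = hashlib.sha1(url).hexdigest()
    return '%s/thumbs/%s/%s.jpg' % (prefix, thumb_id, image_guid)

# s3_thumb_key('images', 'http://example.com/a.png', '50')
#   -> 'images/thumbs/50/<40-char sha1 hex>.jpg'
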
@@ -107,48 +79,70 @@ class S3ImagesPipeline(MediaPipeline):
         image_guid = hashlib.sha1(url).hexdigest()
         return '%s/thumbs/%s/%s.jpg' % (self.prefix, thumb_id, image_guid)
 
-    def s3_should_download(self, url):
+    def media_to_download(self, request, info):
         """Return if the image should be downloaded by checking if it's already in
         the S3 storage and not too old"""
-        key = self.s3_image_key(url)
-        k = self.bucket.get_key(key)
-        if k is None:
-            return True
-        modified_tuple = rfc822.parsedate_tz(k.last_modified)
-        modified_stamp = int(rfc822.mktime_tz(modified_tuple))
-        age_seconds = time.time() - modified_stamp
-        age_days = age_seconds / 60 / 60 / 24
-        return age_days > self.image_refresh_days
 
-    def s3_store_image(self, response, url):
+        def _on200(response):
+            if 'Last-Modified' not in response.headers:
+                return True
+
+            last_modified = response.headers['Last-Modified'][0]
+            modified_tuple = rfc822.parsedate_tz(last_modified)
+            modified_stamp = int(rfc822.mktime_tz(modified_tuple))
+            age_seconds = time.time() - modified_stamp
+            age_days = age_seconds / 60 / 60 / 24
+            return age_days > self.image_refresh_days
+
+        def _non200(_failure):
+            return True
+
+        def _evaluate(should):
+            if not should:
+                self.inc_stats(info.domain, 'uptodate')
+                referer = request.headers.get('Referer')
+                log.msg('Image (uptodate) type=%s at <%s> referred from <%s>' % \
+                    (self.MEDIA_TYPE, request.url, referer), level=log.DEBUG, domain=info.domain)
+                return key
+
+        key = self.s3_image_key(request.url)
+        req = self.s3request(key, method='HEAD')
+        dfd = self.download(req, info)
+        dfd.addCallbacks(_on200, _non200)
+        dfd.addCallback(_evaluate)
+        dfd.addErrback(log.err, 'S3ImagesPipeline.media_to_download')
+        return dfd
+
+    def s3_store_image(self, response, url, info):
         """Upload image to S3 storage"""
         buf = StringIO(response.body.to_string())
         image = Image.open(buf)
         key = self.s3_image_key(url)
-        self._s3_put_image(image, key)
-        self.s3_store_thumbnails(image, url)
+        self._s3_put_image(image, key, info)
+        self.s3_store_thumbnails(image, url, info)
 
-    def s3_store_thumbnails(self, image, url):
+    def s3_store_thumbnails(self, image, url, info):
         """Upload image thumbnails to S3 storage"""
         for thumb_id, size in self.THUMBS or []:
             thumb = image.copy() if image.mode == 'RGB' else image.convert('RGB')
             thumb.thumbnail(size, Image.ANTIALIAS)
             key = self.s3_thumb_key(url, thumb_id)
-            self._s3_put_image(thumb, key)
+            self._s3_put_image(thumb, key, info)
 
-    def s3_public_url(self, key):
-        return "http://%s.s3.amazonaws.com/%s" % (self.bucket_name, key)
-
-    def _s3_put_image(self, image, key):
+    def _s3_put_image(self, image, key, info):
         buf = StringIO()
         try:
             image.save(buf, 'JPEG')
         except Exception, ex:
             raise ImageException("Cannot process image. Error: %s" % ex)
 
         buf.seek(0)
-        k = self.bucket.new_key(key)
-        k.content_type = 'image/jpeg'
-        k.set_contents_from_file(buf, policy='public-read')
-        log.msg("Uploaded to S3: %s" % self.s3_public_url(key), level=log.DEBUG)
+        headers = {
+            'Content-Type': 'image/jpeg',
+            'X-Amz-Acl': 'public-read',
+        }
+
+        req = self.s3request(key, method='PUT', body=buf.read(), headers=headers)
+        return self.download(req, info)
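
Note: media_to_download() now answers "is the S3 copy fresh enough?" with a HEAD request sent through the regular scrapy downloader instead of boto's get_key(): a 200 response carrying Last-Modified is parsed with rfc822 and its age compared against IMAGES_REFRESH_DAYS, while a missing header or any failure simply forces a re-download. A standalone sketch of just the age test, using the same rfc822 calls:

import time
import rfc822

def is_stale(last_modified, refresh_days=90):
    """True if an S3 copy with this Last-Modified value should be re-downloaded."""
    modified_tuple = rfc822.parsedate_tz(last_modified)
    if modified_tuple is None:        # unparseable header: play it safe and re-download
        return True
    modified_stamp = int(rfc822.mktime_tz(modified_tuple))
    age_days = (time.time() - modified_stamp) / 60 / 60 / 24
    return age_days > refresh_days

# is_stale('Tue, 01 Jan 2008 00:00:00 GMT')  ->  True once that copy is more than 90 days old

Returning the key from _evaluate() rather than None appears to be what tells MediaPipeline to skip the download and hand that value straight to item_media_downloaded, matching the comment kept in the previous hunk.
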
@@ -55,3 +55,4 @@ def exc(message, level=ERROR, component=BOT_NAME, domain=None):
     message = message + '\n' + format_exc()
     msg(message, level, component, domain)
 
+err = log.err
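
Note: this last hunk, in what looks like scrapy's log module given the exc()/msg() context, re-exports err (presumably Twisted's log.err), which is what lets the pipeline close its deferred chain with dfd.addErrback(log.err, 'S3ImagesPipeline.media_to_download'). A tiny sketch of that errback pattern, assuming Twisted is installed:

from twisted.internet import defer
from twisted.python import log

d = defer.Deferred()
d.addCallback(lambda result: result / 0)                      # a callback that fails
d.addErrback(log.err, 'S3ImagesPipeline.media_to_download')   # err(failure, why): log failure plus context
d.callback(1)                                                 # fires the chain; the ZeroDivisionError is logged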