Source code for landsat.downloader

# Landsat Util
# License: CC0 1.0 Universal
from os.path import join, exists, getsize

from homura import download as fetch
import requests

from utils import check_create_folder
from mixins import VerbosityMixin
import settings


[docs]class RemoteFileDoesntExist(Exception): """ Exception to be used when the remote file does not exist """ pass
[docs]class IncorrectSceneId(Exception): """ Exception to be used when scene id is incorrect """ pass
[docs]class Downloader(VerbosityMixin): """ The downloader class """ def __init__(self, verbose=False, download_dir=None): self.download_dir = download_dir if download_dir else settings.DOWNLOAD_DIR self.google = settings.GOOGLE_STORAGE self.s3 = settings.S3_LANDSAT # Make sure download directory exist check_create_folder(self.download_dir)
[docs] def download(self, scenes, bands=None): """ Download scenese from Google Storage or Amazon S3 if bands are provided :param scenes: A list of scene IDs :type scenes: List :param bands: A list of bands. Default value is None. :type scenes: List :returns: (List) includes downloaded scenes as key and source as value (aws or google) """ if isinstance(scenes, list): output = {} for scene in scenes: # If bands are provided the image is from 2015 or later use Amazon if (bands and int(scene[12]) > 4): if isinstance(bands, list): # Create a folder to download the specific bands into path = check_create_folder(join(self.download_dir, scene)) try: # Always grab MTL.txt if bands are specified bands_plus = bands bands_plus.append('MTL') for band in bands_plus: self.amazon_s3(scene, band, path) output[scene] = 'aws' except RemoteFileDoesntExist: self.google_storage(scene, self.download_dir) output[scene] = 'google' else: raise Exception('Expected bands list') else: self.google_storage(scene, self.download_dir) output[scene] = 'google' return output else: raise Exception('Expected sceneIDs list')
[docs] def google_storage(self, scene, path): """ Google Storage Downloader. :param scene: The scene id :type scene: String :param path: The directory path to where the image should be stored :type path: String :returns: Boolean """ sat = self.scene_interpreter(scene) filename = scene + '.tar.bz' url = self.google_storage_url(sat) if self.remote_file_exists(url): return self.fetch(url, path, filename) else: raise RemoteFileDoesntExist('%s is not available on Google Storage' % filename)
[docs] def amazon_s3(self, scene, band, path): """ Amazon S3 downloader :param scene: The scene ID. :type scene: String :param band: The band number. :type band: String, Integer :param path: The directory path to where the image should be stored :type path: String :returns: Boolean """ sat = self.scene_interpreter(scene) if band != 'MTL': filename = '%s_B%s.TIF' % (scene, band) else: filename = '%s_%s.txt' % (scene, band) url = self.amazon_s3_url(sat, filename) if self.remote_file_exists(url): return self.fetch(url, path, filename) else: raise RemoteFileDoesntExist('%s is not available on Amazon S3' % filename)
[docs] def fetch(self, url, path, filename): """ Downloads the given url. :param url: The url to be downloaded. :type url: String :param path: The directory path to where the image should be stored :type path: String :param filename: The filename that has to be downloaded :type filename: String :returns: Boolean """ self.output('Downloading: %s' % filename, normal=True, arrow=True) if exists(join(path, filename)): size = getsize(join(path, filename)) if size == self.get_remote_file_size(url): self.output('%s already exists on your system' % filename, normal=True, color='green', indent=1) return False fetch(url, path) self.output('stored at %s' % path, normal=True, color='green', indent=1) return True
[docs] def google_storage_url(self, sat): """ Returns a google storage url the contains the scene provided. :param sat: Expects an object created by scene_interpreter method :type sat: dict :returns: (String) The URL to a google storage file """ filename = sat['scene'] + '.tar.bz' return join(self.google, sat['sat'], sat['path'], sat['row'], filename)
[docs] def amazon_s3_url(self, sat, filename): """ Return an amazon s3 url the contains the scene and band provided. :param sat: Expects an object created by scene_interpreter method :type sat: dict :param filename: The filename that has to be downloaded from Amazon :type filename: String :returns: (String) The URL to a S3 file """ return join(self.s3, sat['sat'], sat['path'], sat['row'], sat['scene'], filename)
[docs] def remote_file_exists(self, url): """ Checks whether the remote file exists. :param url: The url that has to be checked. :type url: String :returns: **True** if remote file exists and **False** if it doesn't exist. """ status = requests.head(url).status_code if status == 200: return True else: return False
[docs] def get_remote_file_size(self, url): """ Gets the filesize of a remote file. :param url: The url that has to be checked. :type url: String :returns: int """ headers = requests.head(url).headers return int(headers['content-length'])
[docs] def scene_interpreter(self, scene): """ Conver sceneID to rows, paths and dates. :param scene: The scene ID. :type scene: String :returns: dict :Example output: >>> anatomy = { 'path': None, 'row': None, 'sat': None, 'scene': scene } """ anatomy = { 'path': None, 'row': None, 'sat': None, 'scene': scene } if isinstance(scene, str) and len(scene) == 21: anatomy['path'] = scene[3:6] anatomy['row'] = scene[6:9] anatomy['sat'] = 'L' + scene[2:3] return anatomy else: raise IncorrectSceneId('Received incorrect scene')
if __name__ == '__main__': d = Downloader() # d.download(['LC81990242015046LGN00', 'LC80030172015001LGN00']) d.download(['LC80030172015001LGN00'], bands=[5, 4])