Source code for ATM1b_QFIT.utilities

#!/usr/bin/env python
u"""
utilities.py
Written by Tyler Sutterley (04/2022)
Download and management utilities for syncing time and auxiliary files

PYTHON DEPENDENCIES:
    lxml: processing XML and HTML in Python
        https://pypi.python.org/pypi/lxml

UPDATE HISTORY:
    Updated 04/2022: updated docstrings to numpy documentation format
    Updated 10/2021: build python logging instance for handling verbose output
    Updated 09/2021: added generic list from Apache http server
    Updated 08/2021: added function to open a file path
    Updated 07/2021: add parser for converting file lines to arguments
    Updated 03/2021: added sha1 option for retrieving file hashes
    Updated 01/2021: added username and password to ftp functions
        added ftp connection check
    Updated 12/2020: added file object keyword for downloads if verbose
        add url split function for creating url location lists
    Updated 11/2020: normalize source and destination paths in copy
        make context an optional keyword argument in from_http
    Updated 09/2020: copy from http and https to bytesIO object in chunks
        use netrc credentials if not entered from CDDIS functions
        generalize build opener function for different Earthdata instances
    Updated 08/2020: add GSFC CDDIS opener, login and download functions
    Written 08/2020
"""
from __future__ import print_function, division

import sys
import os
import re
import io
import ssl
import netrc
import ftplib
import shutil
import base64
import socket
import getpass
import hashlib
import inspect
import logging
import builtins
import warnings
import posixpath
import subprocess
import lxml.etree
import calendar, time
if sys.version_info[0] == 2:
    from urllib import quote_plus
    from cookielib import CookieJar
    import urllib2
else:
    from urllib.parse import quote_plus
    from http.cookiejar import CookieJar
    import urllib.request as urllib2

# PURPOSE: get absolute path within a package from a relative path
def get_data_path(relpath):
    """
    Get the absolute path within a package from a relative path

    Parameters
    ----------
    relpath: str or list
        relative path, either as a single string or as a list of
        path components

    Returns
    -------
    str or NoneType
        ``relpath`` joined onto the directory containing this file
        (``None`` if ``relpath`` is neither a list nor a string)
    """
    # directory holding the file in which this function is defined
    frame_info = inspect.getframeinfo(inspect.currentframe())
    package_dir = os.path.dirname(os.path.abspath(frame_info.filename))
    # a list holds the individual components of the relative path
    if isinstance(relpath, list):
        return os.path.join(package_dir, *relpath)
    # a string is a single relative path
    if isinstance(relpath, str):
        return os.path.join(package_dir, relpath)
    # any other type is not handled
    return None
# PURPOSE: platform independent file opener
def file_opener(filename):
    """
    Platform independent file opener

    Parameters
    ----------
    filename: str
        path to file
    """
    # expand a leading ~ into the home directory of the user
    target = os.path.expanduser(filename)
    # Windows: hand the path to the shell with the "explore" operation
    if sys.platform == "win32":
        os.startfile(target, "explore")
        return
    # macOS uses "open"; every other platform falls back to "xdg-open"
    launcher = "open" if sys.platform == "darwin" else "xdg-open"
    subprocess.call([launcher, target])
# PURPOSE: get the hash value of a file
def get_hash(local, algorithm='MD5'):
    """
    Get the hash value from a local file or BytesIO object

    Parameters
    ----------
    local: obj or str
        BytesIO object or path to file
    algorithm: str, default 'MD5'
        hashing algorithm for checksum validation

            - ``'MD5'``: Message Digest
            - ``'sha1'``: Secure Hash Algorithm

        Any other fixed-length algorithm name accepted by
        ``hashlib.new`` (such as ``'sha256'``) can also be used.
        Names are matched case-insensitively.

    Returns
    -------
    str or NoneType
        hexadecimal digest of the contents, an empty string if ``local``
        is a path that does not exist, or ``None`` if the algorithm
        is not usable
    """
    def _hexdigest(blocks):
        # build the hash object; unusable names give None (not an error)
        # to stay compatible with the previous behavior
        try:
            checksum = hashlib.new(algorithm.lower())
        except (AttributeError, TypeError, ValueError):
            return None
        for block in blocks:
            checksum.update(block)
        # variable-length digests need an explicit length: not supported
        try:
            return checksum.hexdigest()
        except TypeError:
            return None

    # check if open file object or if local file exists
    if isinstance(local, io.IOBase):
        return _hexdigest([local.getvalue()])
    local_file = os.path.expanduser(local)
    if not os.access(local_file, os.F_OK):
        return ''
    # read the file in binary mode in fixed-size blocks so that large
    # files are never held completely in memory
    with open(local_file, 'rb') as local_buffer:
        return _hexdigest(iter(lambda: local_buffer.read(65536), b''))
# PURPOSE: get the git hash value
def get_git_revision_hash(refname='HEAD', short=False):
    """
    Get the git hash value for a particular reference

    Parameters
    ----------
    refname: str, default HEAD
        Symbolic reference name
    short: bool, default False
        Return the shortened hash value

    Returns
    -------
    str
        stripped output of ``git rev-parse`` for the reference
    """
    # the .git directory is looked up one level above this file's directory
    this_file = inspect.getframeinfo(inspect.currentframe()).filename
    repository = os.path.dirname(os.path.dirname(os.path.abspath(this_file)))
    git_dir = os.path.join(repository, '.git')
    # assemble the rev-parse command
    command = ['git', f'--git-dir={git_dir}', 'rev-parse']
    if short:
        command.append('--short')
    command.append(refname)
    # run git and decode its output
    with warnings.catch_warnings():
        raw_output = subprocess.check_output(command)
        return str(raw_output, encoding='utf8').strip()

# PURPOSE: get the current git status
def get_git_status():
    """
    Get the status of a git repository as a boolean value

    Returns
    -------
    bool
        ``True`` if ``git status --porcelain`` printed anything
    """
    # the .git directory is looked up one level above this file's directory
    this_file = inspect.getframeinfo(inspect.currentframe()).filename
    repository = os.path.dirname(os.path.dirname(os.path.abspath(this_file)))
    git_dir = os.path.join(repository, '.git')
    # assemble the status command
    # NOTE(review): no --work-tree is passed, so git presumably uses the
    # current working directory as the work tree -- confirm this is intended
    command = ['git', f'--git-dir={git_dir}', 'status', '--porcelain']
    with warnings.catch_warnings():
        raw_output = subprocess.check_output(command)
        return bool(raw_output)

# PURPOSE: recursively split a url path
def url_split(s):
    """
    Split a url path into a tuple of its components

    Parameters
    ----------
    s: str
        url string

    Returns
    -------
    tuple
        path components, with the protocol and host kept together
        as the first item for http, https, ftp and s3 urls
    """
    # components are collected from the right-most to the left-most
    components = []
    remaining = s
    while True:
        head, tail = posixpath.split(remaining)
        if head in ('http:', 'https:', 'ftp:', 's3:'):
            # reached the protocol: keep "protocol://host" as a single item
            components.append(remaining)
            break
        if not head.strip(posixpath.sep):
            # head is empty or consists only of separators (root of path)
            # checking for any run of separators prevents endless splitting
            # of paths that start with two or more slashes
            components.append(tail)
            break
        components.append(tail)
        remaining = head
    return tuple(reversed(components))
# PURPOSE: convert file lines to arguments
def convert_arg_line_to_args(arg_line):
    """
    Convert file lines to arguments

    Parameters
    ----------
    arg_line: str
        line string containing a single argument and/or comments

    Yields
    ------
    str
        each whitespace-delimited argument found before any comment
    """
    # drop commented lines and comments that follow the arguments
    uncommented = re.sub(r'\#(.*?)$', r'', arg_line)
    # hand back every remaining non-blank token
    yield from (token for token in uncommented.split() if token.strip())

# PURPOSE: returns the Unix timestamp value for a formatted date string
def get_unix_time(time_string, format='%Y-%m-%d %H:%M:%S'):
    """
    Get the Unix timestamp value for a formatted date string

    Parameters
    ----------
    time_string: str
        formatted time string to parse
    format: str, default '%Y-%m-%d %H:%M:%S'
        format for input time string

    Returns
    -------
    int or NoneType
        Unix timestamp (the parsed time is interpreted as UTC), or
        ``None`` if the input could not be parsed
    """
    try:
        # trailing whitespace (such as a newline) is ignored
        parsed_time = time.strptime(time_string.rstrip(), format)
    except (AttributeError, TypeError, ValueError):
        # AttributeError covers inputs without rstrip (such as None) so
        # that every unparsable input consistently gives None
        return None
    return calendar.timegm(parsed_time)
# PURPOSE: rounds a number to an even number less than or equal to original
def even(value):
    """
    Rounds a number to an even number less than or equal to original

    Parameters
    ----------
    value: float
        number to be rounded

    Returns
    -------
    int
        largest even integer that does not exceed ``value``
    """
    # number of whole pairs contained in the value (floor division)
    pairs = int(value // 2)
    return 2 * pairs
# PURPOSE: rounds a number upward to its nearest integer
def ceil(value):
    """
    Rounds a number upward to its nearest integer

    Parameters
    ----------
    value: float
        number to be rounded upward

    Returns
    -------
    int
        smallest integer that is not less than ``value``
    """
    # flooring the negated value and negating again rounds upward
    floored_negative = (-value) // 1
    return -int(floored_negative)
# PURPOSE: make a copy of a file with all system information
def copy(source, destination, move=False, **kwargs):
    """
    Copy or move a file with all system information

    Parameters
    ----------
    source: str
        source file
    destination: str
        copied destination file
    move: bool, default False
        remove the source file
    **kwargs: dict
        additional keyword arguments (accepted but not used)
    """
    # normalize both paths: expand ~ and convert to absolute paths
    src_path = os.path.abspath(os.path.expanduser(source))
    dst_path = os.path.abspath(os.path.expanduser(destination))
    # log source and destination
    logging.info('{0} -->\n\t{1}'.format(src_path, dst_path))
    # duplicate the contents first and then the file metadata
    shutil.copyfile(src_path, dst_path)
    shutil.copystat(src_path, dst_path)
    # a plain copy leaves the source file in place
    if not move:
        return
    os.remove(src_path)
# PURPOSE: check ftp connection
def check_ftp_connection(HOST, username=None, password=None):
    """
    Check internet connection with ftp host

    Parameters
    ----------
    HOST: str
        remote ftp host
    username: str or NoneType
        ftp username
    password: str or NoneType
        ftp password

    Returns
    -------
    bool
        ``True`` if the host accepted the login and a ``NOOP`` command

    Raises
    ------
    RuntimeError
        if the connection fails or the login is rejected
    """
    # attempt to connect to ftp host
    try:
        ftp = ftplib.FTP(HOST)
        try:
            ftp.login(username, password)
            ftp.voidcmd("NOOP")
        finally:
            # always release the connection, including after a failed login
            ftp.close()
    except IOError as exc:
        raise RuntimeError('Check internet connection') from exc
    except ftplib.error_perm as exc:
        raise RuntimeError('Check login credentials') from exc
    return True
# PURPOSE: list a directory on a ftp host
def ftp_list(HOST, username=None, password=None, timeout=None,
    basename=False, pattern=None, sort=False):
    """
    List a directory on a ftp host

    Parameters
    ----------
    HOST: str or list
        remote ftp host path split as list
    username: str or NoneType
        ftp username
    password: str or NoneType
        ftp password
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    basename: bool, default False
        return the file or directory basename instead of the full path
    pattern: str or NoneType, default None
        regular expression pattern for reducing list
    sort: bool, default False
        sort output list

    Returns
    -------
    output: list
        items in a directory
    mtimes: list
        last modification times for items in the directory
        (``None`` for items that reject the ``MDTM`` command)

    Raises
    ------
    RuntimeError
        if a connection to the ftp host cannot be made
    """
    # verify inputs for remote ftp host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # try to connect to ftp host
    try:
        ftp = ftplib.FTP(HOST[0], timeout=timeout)
    except (socket.gaierror, IOError) as exc:
        raise RuntimeError('Unable to connect to {0}'.format(HOST[0])) from exc
    # the connection is closed even if the login or the listing fails
    try:
        ftp.login(username, password)
        # list remote path (the current server directory when the input
        # consists of the host only)
        remote_path = HOST[1:]
        if remote_path:
            output = ftp.nlst(posixpath.join(*remote_path))
        else:
            output = ftp.nlst()
        # get last modified date of ftp files and convert into unix time
        mtimes = [None]*len(output)
        for i, remote_file in enumerate(output):
            try:
                # try sending modification time command
                mdtm = ftp.sendcmd('MDTM {0}'.format(remote_file))
            except ftplib.error_perm:
                # directories will return with an error
                pass
            else:
                # skip the response code and convert into unix time
                mtimes[i] = get_unix_time(mdtm[4:], format="%Y%m%d%H%M%S")
    finally:
        ftp.close()
    # reduce to basenames
    if basename:
        output = [posixpath.basename(item) for item in output]
    # reduce using regular expression pattern
    if pattern:
        keep = [i for i, item in enumerate(output) if re.search(pattern, item)]
        output = [output[i] for i in keep]
        mtimes = [mtimes[i] for i in keep]
    # sort the items by name together with their modification times
    if sort:
        order = sorted(range(len(output)), key=output.__getitem__)
        output = [output[i] for i in order]
        mtimes = [mtimes[i] for i in order]
    # return the list of items and last modified times
    return (output, mtimes)
# PURPOSE: download a file from a ftp host
def from_ftp(HOST, username=None, password=None, timeout=None,
    local=None, hash='', chunk=8192, verbose=False, fid=sys.stdout,
    mode=0o775):
    """
    Download a file from a ftp host

    Parameters
    ----------
    HOST: str or list
        remote ftp host path
    username: str or NoneType
        ftp username
    password: str or NoneType
        ftp password
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    local: str or NoneType, default None
        path to local file
    hash: str, default ''
        MD5 hash of local file
    chunk: int, default 8192
        chunk size for transfer encoding
    verbose: bool, default False
        print file transfer information
    fid: obj, default sys.stdout
        open file object to print if verbose
    mode: oct, default 0o775
        permissions mode of output local file

    Returns
    -------
    remote_buffer: obj
        BytesIO representation of file

    Raises
    ------
    RuntimeError
        if a connection to the ftp host cannot be made
    """
    # create logger
    loglevel = logging.INFO if verbose else logging.CRITICAL
    logging.basicConfig(stream=fid, level=loglevel)
    # verify inputs for remote ftp host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # try to connect to ftp host
    try:
        ftp = ftplib.FTP(HOST[0], timeout=timeout)
    except (socket.gaierror, IOError) as exc:
        raise RuntimeError('Unable to connect to {0}'.format(HOST[0])) from exc
    # the connection is closed even if the login or the transfer fails
    try:
        ftp.login(username, password)
        # remote path
        ftp_remote_path = posixpath.join(*HOST[1:])
        # copy remote file contents to bytesIO object
        remote_buffer = io.BytesIO()
        ftp.retrbinary('RETR {0}'.format(ftp_remote_path),
            remote_buffer.write, blocksize=chunk)
        # get last modified date of remote file
        mdtm = ftp.sendcmd('MDTM {0}'.format(ftp_remote_path))
    finally:
        ftp.close()
    # save file basename with bytesIO object
    remote_buffer.filename = HOST[-1]
    # generate checksum hash for remote file
    remote_hash = hashlib.md5(remote_buffer.getvalue()).hexdigest()
    # skip the response code and convert modification time into unix time
    remote_mtime = get_unix_time(mdtm[4:], format="%Y%m%d%H%M%S")
    # compare checksums: only write if a local path was given and the
    # contents differ from the supplied hash
    if local and (hash != remote_hash):
        # expand ~ before converting to an absolute path so that the
        # directory, the file and its permissions all use the same path
        local = os.path.abspath(os.path.expanduser(local))
        # create directory if non-existent
        directory = os.path.dirname(local)
        if not os.access(directory, os.F_OK):
            os.makedirs(directory, mode)
        # print file information
        logging.info('{0} -->\n\t{1}'.format(posixpath.join(*HOST), local))
        # store bytes to file using chunked transfer encoding
        remote_buffer.seek(0)
        with open(local, 'wb') as f:
            shutil.copyfileobj(remote_buffer, f, chunk)
        # change the permissions mode
        os.chmod(local, mode)
        # keep remote modification time of file and local access time
        os.utime(local, (os.stat(local).st_atime, remote_mtime))
    # return the bytesIO object rewound to the start
    remote_buffer.seek(0)
    return remote_buffer
# PURPOSE: check internet connection
def check_connection(HOST):
    """
    Check internet connection with http host

    Parameters
    ----------
    HOST: str
        remote http host

    Returns
    -------
    bool
        ``True`` if the host could be opened

    Raises
    ------
    RuntimeError
        if the host cannot be reached
    """
    # attempt to connect to http host
    try:
        response = urllib2.urlopen(HOST, timeout=20, context=ssl.SSLContext())
    except urllib2.URLError as exc:
        raise RuntimeError('Check internet connection') from exc
    # only reachability matters: release the connection immediately
    response.close()
    return True

# PURPOSE: list a directory on an Apache http Server
def http_list(HOST, timeout=None, context=ssl.SSLContext(),
    parser=lxml.etree.HTMLParser(), format='%Y-%m-%d %H:%M',
    pattern='', sort=False):
    """
    List a directory on an Apache http Server

    Parameters
    ----------
    HOST: str or list
        remote http host path
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    context: obj, default ssl.SSLContext()
        SSL context for url opener object
    parser: obj, default lxml.etree.HTMLParser()
        HTML parser for lxml
    format: str, default '%Y-%m-%d %H:%M'
        format for input time string
    pattern: str, default ''
        regular expression pattern for reducing list
    sort: bool, default False
        sort output list

    Returns
    -------
    colnames: list
        column names in a directory
    collastmod: list
        last modification times for items in the directory

    Raises
    ------
    Exception
        if the remote path cannot be opened
    """
    # verify inputs for remote http host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    remote_url = posixpath.join(*HOST)
    # try listing from http
    try:
        # Create and submit request.
        request = urllib2.Request(remote_url)
        response = urllib2.urlopen(request, timeout=timeout, context=context)
    except (urllib2.HTTPError, urllib2.URLError) as exc:
        # keep the original error attached as the cause
        raise Exception('List error from {0}'.format(remote_url)) from exc
    # read and parse request for files (column names and modified times)
    try:
        tree = lxml.etree.parse(response, parser)
    finally:
        # release the connection once the page has been read
        response.close()
    colnames = tree.xpath('//tr/td[not(@*)]//a/@href')
    # get the Unix timestamp value for a modification time
    collastmod = [get_unix_time(i, format=format)
        for i in tree.xpath('//tr/td[@align="right"][1]/text()')]
    # reduce using regular expression pattern
    if pattern:
        keep = [i for i, name in enumerate(colnames) if re.search(pattern, name)]
        colnames = [colnames[i] for i in keep]
        collastmod = [collastmod[i] for i in keep]
    # sort the column names together with their modification times
    if sort:
        order = sorted(range(len(colnames)), key=colnames.__getitem__)
        colnames = [colnames[i] for i in order]
        collastmod = [collastmod[i] for i in order]
    # return the list of column names and last modified times
    return (colnames, collastmod)
# PURPOSE: download a file from a http host
def from_http(HOST, timeout=None, context=ssl.SSLContext(),
    local=None, hash='', chunk=16384, verbose=False, fid=sys.stdout,
    mode=0o775):
    """
    Download a file from a http host

    Parameters
    ----------
    HOST: str or list
        remote http host path split as list
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    context: obj, default ssl.SSLContext()
        SSL context for url opener object
    local: str or NoneType, default None
        path to local file
    hash: str, default ''
        MD5 hash of local file
    chunk: int, default 16384
        chunk size for transfer encoding
    verbose: bool, default False
        print file transfer information
    fid: obj, default sys.stdout
        open file object to print if verbose
    mode: oct, default 0o775
        permissions mode of output local file

    Returns
    -------
    remote_buffer: obj
        BytesIO representation of file

    Raises
    ------
    Exception
        if the remote file cannot be opened
    """
    # create logger
    loglevel = logging.INFO if verbose else logging.CRITICAL
    logging.basicConfig(stream=fid, level=loglevel)
    # verify inputs for remote http host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    remote_url = posixpath.join(*HOST)
    # try downloading from http
    try:
        # Create and submit request.
        request = urllib2.Request(remote_url)
        response = urllib2.urlopen(request, timeout=timeout, context=context)
    except Exception as exc:
        # Exception (not a bare except) so that KeyboardInterrupt and
        # SystemExit are not converted into download errors
        raise Exception('Download error from {0}'.format(remote_url)) from exc
    # copy remote file contents to bytesIO object
    remote_buffer = io.BytesIO()
    try:
        shutil.copyfileobj(response, remote_buffer, chunk)
    finally:
        # release the connection once the contents have been read
        response.close()
    remote_buffer.seek(0)
    # save file basename with bytesIO object
    remote_buffer.filename = HOST[-1]
    # generate checksum hash for remote file
    remote_hash = hashlib.md5(remote_buffer.getvalue()).hexdigest()
    # compare checksums: only write if a local path was given and the
    # contents differ from the supplied hash
    if local and (hash != remote_hash):
        # expand ~ before converting to an absolute path so that the
        # directory, the file and its permissions all use the same path
        local = os.path.abspath(os.path.expanduser(local))
        # create directory if non-existent
        directory = os.path.dirname(local)
        if not os.access(directory, os.F_OK):
            os.makedirs(directory, mode)
        # print file information
        logging.info('{0} -->\n\t{1}'.format(remote_url, local))
        # store bytes to file using chunked transfer encoding
        remote_buffer.seek(0)
        with open(local, 'wb') as f:
            shutil.copyfileobj(remote_buffer, f, chunk)
        # change the permissions mode
        os.chmod(local, mode)
    # return the bytesIO object rewound to the start
    remote_buffer.seek(0)
    return remote_buffer
# PURPOSE: attempt to build an opener with netrc
def attempt_login(urs, context=ssl.SSLContext(),
    password_manager=True, get_ca_certs=False, redirect=False,
    authorization_header=False, **kwargs):
    """
    attempt to build a urllib opener for NASA Earthdata

    Parameters
    ----------
    urs: str
        Earthdata login URS 3 host
    context: obj, default ssl.SSLContext()
        SSL context for url opener object
    password_manager: bool, default True
        Create password manager context using default realm
    get_ca_certs: bool, default False
        Get list of loaded “certification authority” certificates
    redirect: bool, default False
        Create redirect handler object
    authorization_header: bool, default False
        Add base64 encoded authorization header to opener
    username: str, default from environmental variable
        NASA Earthdata username
    password: str, default from environmental variable
        NASA Earthdata password
    retries: int, default 5
        number of retry attempts
    netrc: str, default ~/.netrc
        path to .netrc file for authentication

    Returns
    -------
    opener: obj
        OpenerDirector instance

    Raises
    ------
    RuntimeError
        if the credentials could not be validated within the
        available number of retries
    """
    # set default keyword arguments
    kwargs.setdefault('username', os.environ.get('EARTHDATA_USERNAME'))
    kwargs.setdefault('password', os.environ.get('EARTHDATA_PASSWORD'))
    kwargs.setdefault('retries', 5)
    kwargs.setdefault('netrc', os.path.expanduser('~/.netrc'))
    try:
        # only necessary on jupyterhub
        os.chmod(kwargs['netrc'], 0o600)
        # try retrieving credentials from netrc
        username, _, password = netrc.netrc(kwargs['netrc']).authenticators(urs)
    except Exception:
        # best effort: any problem with the netrc file falls back to the
        # keyword arguments (or environmental variables)
        username, password = (kwargs['username'], kwargs['password'])
    # if username or password are not available
    if not username:
        username = builtins.input('Username for {0}: '.format(urs))
    if not password:
        prompt = 'Password for {0}@{1}: '.format(username, urs)
        password = getpass.getpass(prompt=prompt)
    # for each retry
    for retry in range(kwargs['retries']):
        # build an opener for urs with credentials
        opener = build_opener(username, password,
            context=context,
            password_manager=password_manager,
            get_ca_certs=get_ca_certs,
            redirect=redirect,
            authorization_header=authorization_header,
            urs=urs)
        # try logging in by check credentials
        try:
            check_credentials()
        except Exception:
            # a failed check is expected here: ask for new credentials
            pass
        else:
            return opener
        # reattempt login only if another attempt will be made
        if (retry + 1) < kwargs['retries']:
            username = builtins.input('Username for {0}: '.format(urs))
            # rebuild the prompt: it may not exist yet if the password was
            # supplied up front, and it must show the newly entered username
            prompt = 'Password for {0}@{1}: '.format(username, urs)
            password = getpass.getpass(prompt=prompt)
    # reached end of available retries
    raise RuntimeError('End of Retries: Check NASA Earthdata credentials')
# PURPOSE: "login" to NASA Earthdata with supplied credentials
def build_opener(username, password, context=ssl.SSLContext(ssl.PROTOCOL_TLS),
    password_manager=True, get_ca_certs=True, redirect=True,
    authorization_header=False, urs='https://urs.earthdata.nasa.gov'):
    """
    build urllib opener for NASA Earthdata with supplied credentials

    Parameters
    ----------
    username: str or NoneType
        NASA Earthdata username
    password: str or NoneType
        NASA Earthdata password
    context: obj, default ssl.SSLContext(ssl.PROTOCOL_TLS)
        SSL context for url opener object
    password_manager: bool, default True
        Create password manager context using default realm
    get_ca_certs: bool, default True
        Get list of loaded “certification authority” certificates
    redirect: bool, default True
        Create redirect handler object
    authorization_header: bool, default False
        Add base64 encoded authorization header to opener
    urs: str, default 'https://urs.earthdata.nasa.gov'
        Earthdata login URS 3 host

    Returns
    -------
    opener: obj
        OpenerDirector instance, which is also installed as the
        default opener for ``urlopen``
    """
    # https://docs.python.org/3/howto/urllib2.html#id5
    handlers = []
    # basic authentication using a default-realm password manager
    if password_manager:
        manager = urllib2.HTTPPasswordMgrWithDefaultRealm()
        # register the credentials for the NASA Earthdata Login system
        manager.add_password(None, urs, username, password)
        handlers.append(urllib2.HTTPBasicAuthHandler(manager))
    # cookie jar for holding the session cookie handed out by the data
    # server (without it every request is sent back to Earthdata Login)
    handlers.append(urllib2.HTTPCookieProcessor(CookieJar()))
    # SSL context handler
    if get_ca_certs:
        context.get_ca_certs()
    handlers.append(urllib2.HTTPSHandler(context=context))
    # redirect handler
    if redirect:
        handlers.append(urllib2.HTTPRedirectHandler())
    # create "opener" (OpenerDirector instance)
    opener = urllib2.build_opener(*handlers)
    # optionally attach base64 encoded credentials as a request header
    if authorization_header:
        credentials = '{0}:{1}'.format(username, password).encode()
        token = base64.b64encode(credentials).decode()
        opener.addheaders = [("Authorization","Basic {0}".format(token))]
    # from now on all calls to urllib2.urlopen use this opener
    urllib2.install_opener(opener)
    return opener
# PURPOSE: check that entered NASA Earthdata credentials are valid
def check_credentials():
    """
    Check that entered NASA Earthdata credentials are valid

    Returns
    -------
    bool
        ``True`` if the CDDIS archive could be opened

    Raises
    ------
    RuntimeError
        if the server returns an HTTP error or cannot be reached
    """
    remote_path = posixpath.join('https://cddis.nasa.gov','archive')
    request = urllib2.Request(url=remote_path)
    try:
        response = urllib2.urlopen(request, timeout=20)
    except urllib2.HTTPError as exc:
        # HTTPError is listed first as it is a subclass of URLError
        raise RuntimeError('Check your NASA Earthdata credentials') from exc
    except urllib2.URLError as exc:
        raise RuntimeError('Check internet connection') from exc
    # only the success of the request matters: release the connection
    response.close()
    return True
# PURPOSE: list a directory on GSFC CDDIS https server
def cddis_list(HOST, username=None, password=None, build=True,
    timeout=None, parser=lxml.etree.HTMLParser(), pattern='',
    sort=False):
    """
    List a directory on GSFC CDDIS archive server

    Parameters
    ----------
    HOST: str or list
        remote https host
    username: str or NoneType, default None
        NASA Earthdata username
    password: str or NoneType, default None
        NASA Earthdata password
    build: bool, default True
        Build opener and check Earthdata credentials
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    parser: obj, default lxml.etree.HTMLParser()
        HTML parser for lxml
    pattern: str, default ''
        regular expression pattern for reducing list
    sort: bool, default False
        sort output list

    Returns
    -------
    colnames: list
        column names in a directory
    collastmod: list
        last modification times for items in the directory
        (``None`` for directories)

    Raises
    ------
    Exception
        if the remote path cannot be opened or parsed
    """
    # use netrc credentials if neither a username nor a password was given
    if build and not (username or password):
        urs = 'urs.earthdata.nasa.gov'
        username,_,password = netrc.netrc().authenticators(urs)
    # build urllib2 opener and check credentials
    if build:
        # build urllib2 opener with credentials
        build_opener(username, password)
        # check credentials
        check_credentials()
    # verify inputs for remote https host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    remote_url = posixpath.join(*HOST)
    # Encode username/password for request authorization headers
    base64_string = base64.b64encode('{0}:{1}'.format(username, password).encode())
    authorization_header = "Basic {0}".format(base64_string.decode())
    # try listing from https
    try:
        # Create and submit request.
        request = urllib2.Request(remote_url)
        request.add_header("Authorization", authorization_header)
        response = urllib2.urlopen(request, timeout=timeout)
        try:
            tree = lxml.etree.parse(response, parser)
        finally:
            # release the connection once the page has been read
            response.close()
    except Exception as exc:
        # Exception (not a bare except) so that KeyboardInterrupt and
        # SystemExit are not converted into list errors
        raise Exception('List error from {0}'.format(remote_url)) from exc
    # read and parse request for files (column names and modified times)
    # find directories (no modification times are extracted for these)
    colnames = tree.xpath('//div[@class="archiveDir"]/div/a/text()')
    collastmod = [None]*(len(colnames))
    # find files
    colnames.extend(tree.xpath('//div[@class="archiveItem"]/div/a/text()'))
    # get the Unix timestamp value for a modification time
    # (parsed from the first 19 characters of each text entry)
    collastmod.extend([get_unix_time(i[:19], format='%Y:%m:%d %H:%M:%S')
        for i in tree.xpath('//div[@class="archiveItem"]/div/span/text()')])
    # reduce using regular expression pattern
    if pattern:
        keep = [i for i, name in enumerate(colnames) if re.search(pattern, name)]
        colnames = [colnames[i] for i in keep]
        collastmod = [collastmod[i] for i in keep]
    # sort the column names together with their modification times
    if sort:
        order = sorted(range(len(colnames)), key=colnames.__getitem__)
        colnames = [colnames[i] for i in order]
        collastmod = [collastmod[i] for i in order]
    # return the list of column names and last modified times
    return (colnames, collastmod)
# PURPOSE: download a file from a GSFC CDDIS https server
def from_cddis(HOST, username=None, password=None, build=True,
    timeout=None, local=None, hash='', chunk=16384, verbose=False,
    fid=sys.stdout, mode=0o775):
    """
    Download a file from GSFC CDDIS archive server

    Parameters
    ----------
    HOST: str or list
        remote https host
    username: str or NoneType, default None
        NASA Earthdata username
    password: str or NoneType, default None
        NASA Earthdata password
    build: bool, default True
        Build opener and check Earthdata credentials
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    local: str or NoneType, default None
        path to local file
    hash: str, default ''
        MD5 hash of local file
    chunk: int, default 16384
        chunk size for transfer encoding
    verbose: bool, default False
        print file transfer information
    fid: obj, default sys.stdout
        open file object to print if verbose
    mode: oct, default 0o775
        permissions mode of output local file

    Returns
    -------
    remote_buffer: obj
        BytesIO representation of file

    Raises
    ------
    Exception
        if the remote file cannot be opened
    """
    # create logger
    loglevel = logging.INFO if verbose else logging.CRITICAL
    logging.basicConfig(stream=fid, level=loglevel)
    # use netrc credentials if neither a username nor a password was given
    if build and not (username or password):
        urs = 'urs.earthdata.nasa.gov'
        username,_,password = netrc.netrc().authenticators(urs)
    # build urllib2 opener and check credentials
    if build:
        # build urllib2 opener with credentials
        build_opener(username, password)
        # check credentials
        check_credentials()
    # verify inputs for remote https host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    remote_url = posixpath.join(*HOST)
    # Encode username/password for request authorization headers
    base64_string = base64.b64encode('{0}:{1}'.format(username, password).encode())
    authorization_header = "Basic {0}".format(base64_string.decode())
    # try downloading from https
    try:
        # Create and submit request.
        request = urllib2.Request(remote_url)
        request.add_header("Authorization", authorization_header)
        response = urllib2.urlopen(request, timeout=timeout)
    except Exception as exc:
        # Exception (not a bare except) so that KeyboardInterrupt and
        # SystemExit are not converted into download errors
        raise Exception('Download error from {0}'.format(remote_url)) from exc
    # copy remote file contents to bytesIO object
    remote_buffer = io.BytesIO()
    try:
        shutil.copyfileobj(response, remote_buffer, chunk)
    finally:
        # release the connection once the contents have been read
        response.close()
    remote_buffer.seek(0)
    # save file basename with bytesIO object
    remote_buffer.filename = HOST[-1]
    # generate checksum hash for remote file
    remote_hash = hashlib.md5(remote_buffer.getvalue()).hexdigest()
    # compare checksums: only write if a local path was given and the
    # contents differ from the supplied hash
    if local and (hash != remote_hash):
        # expand ~ before converting to an absolute path so that the
        # directory, the file and its permissions all use the same path
        local = os.path.abspath(os.path.expanduser(local))
        # create directory if non-existent
        directory = os.path.dirname(local)
        if not os.access(directory, os.F_OK):
            os.makedirs(directory, mode)
        # print file information
        logging.info('{0} -->\n\t{1}'.format(remote_url, local))
        # store bytes to file using chunked transfer encoding
        remote_buffer.seek(0)
        with open(local, 'wb') as f:
            shutil.copyfileobj(remote_buffer, f, chunk)
        # change the permissions mode
        os.chmod(local, mode)
    # return the bytesIO object rewound to the start
    remote_buffer.seek(0)
    return remote_buffer