From ababe3cdf32098b6479c6fb5ff61bafc04fd9614 Mon Sep 17 00:00:00 2001 From: Quentame Date: Tue, 24 Mar 2020 14:54:43 +0100 Subject: [PATCH] Back is black (#259) * Back is black * Format with black --- .travis.yml | 1 + README.rst | 16 +- pyicloud/base.py | 205 +++++------ pyicloud/cmdline.py | 123 ++++--- pyicloud/exceptions.py | 2 + pyicloud/services/account.py | 17 +- pyicloud/services/calendar.py | 49 +-- pyicloud/services/contacts.py | 38 +- pyicloud/services/findmyiphone.py | 119 +++--- pyicloud/services/photos.py | 580 +++++++++++++++++------------- pyicloud/services/reminders.py | 119 +++--- pyicloud/services/ubiquity.py | 52 +-- pyicloud/utils.py | 30 +- pyproject.toml | 21 ++ scripts/check_format.sh | 16 + setup.py | 36 +- tests/__init__.py | 93 ++--- tests/test_cmdline.py | 30 +- 18 files changed, 808 insertions(+), 739 deletions(-) create mode 100644 pyproject.toml create mode 100755 scripts/check_format.sh diff --git a/.travis.yml b/.travis.yml index d69a091..779deeb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,4 +15,5 @@ before_install: - pip install -e . script: - pylint pyicloud tests + - ./scripts/check_format.sh; - py.test diff --git a/README.rst b/README.rst index 8f97405..05a2189 100644 --- a/README.rst +++ b/README.rst @@ -3,23 +3,25 @@ pyiCloud ******** .. image:: https://travis-ci.org/picklepete/pyicloud.svg?branch=master - :alt: Check out our test status at https://travis-ci.org/picklepete/pyicloud - :target: https://travis-ci.org/picklepete/pyicloud + :alt: Check out our test status at https://travis-ci.org/picklepete/pyicloud + :target: https://travis-ci.org/picklepete/pyicloud .. image:: https://img.shields.io/pypi/v/pyiCloud.svg :target: https://pypi.org/project/pyiCloud .. image:: https://img.shields.io/pypi/pyversions/pyiCloud.svg - :target: https://pypi.org/project/pyiCloud + :target: https://pypi.org/project/pyiCloud .. image:: https://requires.io/github/Quentame/pyicloud/requirements.svg?branch=master + :alt: Requirements Status :target: https://requires.io/github/Quentame/pyicloud/requirements/?branch=master - :alt: Requirements Status + +.. image:: https://img.shields.io/badge/code%20style-black-000000.svg + :target: https://github.com/psf/black .. image:: https://badges.gitter.im/Join%20Chat.svg - :alt: Join the chat at https://gitter.im/picklepete/pyicloud - :target: https://gitter.im/picklepete/pyicloud?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge - + :alt: Join the chat at https://gitter.im/picklepete/pyicloud + :target: https://gitter.im/picklepete/pyicloud?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge PyiCloud is a module which allows pythonistas to interact with iCloud webservices. It's powered by the fantastic `requests `_ HTTP library. diff --git a/pyicloud/base.py b/pyicloud/base.py index be0803f..b73c878 100644 --- a/pyicloud/base.py +++ b/pyicloud/base.py @@ -14,7 +14,7 @@ from pyicloud.exceptions import ( PyiCloudFailedLoginException, PyiCloudAPIResponseException, PyiCloud2SARequiredException, - PyiCloudServiceNotActivatedException + PyiCloudServiceNotActivatedException, ) from pyicloud.services import ( FindMyiPhoneServiceManager, @@ -23,7 +23,7 @@ from pyicloud.services import ( ContactsService, RemindersService, PhotosService, - AccountService + AccountService, ) from pyicloud.utils import get_password_from_keyring @@ -38,6 +38,7 @@ LOGGER = logging.getLogger(__name__) class PyiCloudPasswordFilter(logging.Filter): """Password log hider.""" + def __init__(self, password): super(PyiCloudPasswordFilter, self).__init__(password) @@ -52,6 +53,7 @@ class PyiCloudPasswordFilter(logging.Filter): class PyiCloudSession(Session): """iCloud session.""" + def __init__(self, service): self.service = service super(PyiCloudSession, self).__init__() @@ -61,27 +63,25 @@ class PyiCloudSession(Session): # Charge logging to the right service endpoint callee = inspect.stack()[2] module = inspect.getmodule(callee[0]) - request_logger = logging.getLogger(module.__name__).getChild('http') + request_logger = logging.getLogger(module.__name__).getChild("http") if self.service.password_filter not in request_logger.filters: request_logger.addFilter(self.service.password_filter) - request_logger.debug("%s %s %s", args[0], args[1], kwargs.get('data', '')) + request_logger.debug("%s %s %s", args[0], args[1], kwargs.get("data", "")) - kwargs.pop('retried', None) + kwargs.pop("retried", None) response = super(PyiCloudSession, self).request(*args, **kwargs) - content_type = response.headers.get('Content-Type', '').split(';')[0] - json_mimetypes = ['application/json', 'text/json'] + content_type = response.headers.get("Content-Type", "").split(";")[0] + json_mimetypes = ["application/json", "text/json"] if not response.ok and content_type not in json_mimetypes: - if kwargs.get('retried') is None and response.status_code == 450: + if kwargs.get("retried") is None and response.status_code == 450: api_error = PyiCloudAPIResponseException( - response.reason, - response.status_code, - retry=True + response.reason, response.status_code, retry=True ) request_logger.warn(api_error) - kwargs['retried'] = True + kwargs["retried"] = True return self.request(*args, **kwargs) self._raise_error(response.status_code, response.reason) @@ -91,22 +91,22 @@ class PyiCloudSession(Session): try: data = response.json() except: # pylint: disable=bare-except - request_logger.warning('Failed to parse response with JSON mimetype') + request_logger.warning("Failed to parse response with JSON mimetype") return response request_logger.debug(data) - reason = data.get('errorMessage') - reason = reason or data.get('reason') - reason = reason or data.get('errorReason') - if not reason and isinstance(data.get('error'), six.string_types): - reason = data.get('error') - if not reason and data.get('error'): + reason = data.get("errorMessage") + reason = reason or data.get("reason") + reason = reason or data.get("errorReason") + if not reason and isinstance(data.get("error"), six.string_types): + reason = data.get("error") + if not reason and data.get("error"): reason = "Unknown reason" - code = data.get('errorCode') - if not code and data.get('serverErrorCode'): - code = data.get('serverErrorCode') + code = data.get("errorCode") + if not code and data.get("serverErrorCode"): + code = data.get("serverErrorCode") if reason: self._raise_error(code, reason) @@ -114,20 +114,25 @@ class PyiCloudSession(Session): return response def _raise_error(self, code, reason): - if self.service.requires_2sa and \ - reason == 'Missing X-APPLE-WEBAUTH-TOKEN cookie': - raise PyiCloud2SARequiredException(self.service.user['apple_id']) - if code in ('ZONE_NOT_FOUND', 'AUTHENTICATION_FAILED'): - reason = 'Please log into https://icloud.com/ to manually ' \ - 'finish setting up your iCloud service' + if ( + self.service.requires_2sa + and reason == "Missing X-APPLE-WEBAUTH-TOKEN cookie" + ): + raise PyiCloud2SARequiredException(self.service.user["apple_id"]) + if code in ("ZONE_NOT_FOUND", "AUTHENTICATION_FAILED"): + reason = ( + "Please log into https://icloud.com/ to manually " + "finish setting up your iCloud service" + ) api_error = PyiCloudServiceNotActivatedException(reason, code) LOGGER.error(api_error) - raise(api_error) - if code == 'ACCESS_DENIED': - reason = reason + '. Please wait a few minutes then try ' \ - 'again. The remote servers might be trying to ' \ - 'throttle requests.' + raise (api_error) + if code == "ACCESS_DENIED": + reason = ( + reason + ". Please wait a few minutes then try again." + "The remote servers might be trying to throttle requests." + ) api_error = PyiCloudAPIResponseException(reason, code) LOGGER.error(api_error) @@ -146,8 +151,13 @@ class PyiCloudService(object): """ def __init__( - self, apple_id, password=None, cookie_directory=None, verify=True, - client_id=None, with_family=True + self, + apple_id, + password=None, + cookie_directory=None, + verify=True, + client_id=None, + with_family=True, ): if password is None: password = get_password_from_keyring(apple_id) @@ -155,33 +165,32 @@ class PyiCloudService(object): self.data = {} self.client_id = client_id or str(uuid.uuid1()).upper() self.with_family = with_family - self.user = {'apple_id': apple_id, 'password': password} + self.user = {"apple_id": apple_id, "password": password} self.password_filter = PyiCloudPasswordFilter(password) LOGGER.addFilter(self.password_filter) - self._home_endpoint = 'https://www.icloud.com' - self._setup_endpoint = 'https://setup.icloud.com/setup/ws/1' + self._home_endpoint = "https://www.icloud.com" + self._setup_endpoint = "https://setup.icloud.com/setup/ws/1" - self._base_login_url = '%s/login' % self._setup_endpoint + self._base_login_url = "%s/login" % self._setup_endpoint if cookie_directory: self._cookie_directory = os.path.expanduser( os.path.normpath(cookie_directory) ) else: - self._cookie_directory = os.path.join( - tempfile.gettempdir(), - 'pyicloud', - ) + self._cookie_directory = os.path.join(tempfile.gettempdir(), "pyicloud",) self.session = PyiCloudSession(self) self.session.verify = verify - self.session.headers.update({ - 'Origin': self._home_endpoint, - 'Referer': '%s/' % self._home_endpoint, - 'User-Agent': 'Opera/9.52 (X11; Linux i686; U; en)' - }) + self.session.headers.update( + { + "Origin": self._home_endpoint, + "Referer": "%s/" % self._home_endpoint, + "User-Agent": "Opera/9.52 (X11; Linux i686; U; en)", + } + ) cookiejar_path = self._get_cookiejar_path() self.session.cookies = cookielib.LWPCookieJar(filename=cookiejar_path) @@ -196,11 +205,11 @@ class PyiCloudService(object): LOGGER.warning("Failed to read cookiejar %s", cookiejar_path) self.params = { - 'clientBuildNumber': '17DHotfix5', - 'clientMasteringNumber': '17DHotfix5', - 'ckjsBuildVersion': '17DProjectDev77', - 'ckjsVersion': '2.0.5', - 'clientId': self.client_id, + "clientBuildNumber": "17DHotfix5", + "clientMasteringNumber": "17DHotfix5", + "ckjsBuildVersion": "17DProjectDev77", + "ckjsVersion": "2.0.5", + "clientId": self.client_id, } self.authenticate() @@ -214,26 +223,24 @@ class PyiCloudService(object): subsequent logins will not cause additional e-mails from Apple. """ - LOGGER.info("Authenticating as %s", self.user['apple_id']) + LOGGER.info("Authenticating as %s", self.user["apple_id"]) data = dict(self.user) # We authenticate every time, so "remember me" is not needed - data.update({'extended_login': False}) + data.update({"extended_login": False}) try: req = self.session.post( - self._base_login_url, - params=self.params, - data=json.dumps(data) + self._base_login_url, params=self.params, data=json.dumps(data) ) except PyiCloudAPIResponseException as error: - msg = 'Invalid email/password combination.' + msg = "Invalid email/password combination." raise PyiCloudFailedLoginException(msg, error) self.data = req.json() - self.params.update({'dsid': self.data['dsInfo']['dsid']}) - self._webservices = self.data['webservices'] + self.params.update({"dsid": self.data["dsInfo"]["dsid"]}) + self._webservices = self.data["webservices"] if not os.path.exists(self._cookie_directory): os.mkdir(self._cookie_directory) @@ -247,48 +254,46 @@ class PyiCloudService(object): """Get path for cookiejar file.""" return os.path.join( self._cookie_directory, - ''.join([c for c in self.user.get('apple_id') if match(r'\w', c)]) + "".join([c for c in self.user.get("apple_id") if match(r"\w", c)]), ) @property def requires_2sa(self): """Returns True if two-step authentication is required.""" - return self.data.get('hsaChallengeRequired', False) \ - and self.data['dsInfo'].get('hsaVersion', 0) >= 1 + return ( + self.data.get("hsaChallengeRequired", False) + and self.data["dsInfo"].get("hsaVersion", 0) >= 1 + ) # FIXME: Implement 2FA for hsaVersion == 2 # pylint: disable=fixme @property def trusted_devices(self): """Returns devices trusted for two-step authentication.""" request = self.session.get( - '%s/listDevices' % self._setup_endpoint, - params=self.params + "%s/listDevices" % self._setup_endpoint, params=self.params ) - return request.json().get('devices') + return request.json().get("devices") def send_verification_code(self, device): """Requests that a verification code is sent to the given device.""" data = json.dumps(device) request = self.session.post( - '%s/sendVerificationCode' % self._setup_endpoint, + "%s/sendVerificationCode" % self._setup_endpoint, params=self.params, - data=data + data=data, ) - return request.json().get('success', False) + return request.json().get("success", False) def validate_verification_code(self, device, code): """Verifies a verification code received on a trusted device.""" - device.update({ - 'verificationCode': code, - 'trustBrowser': True - }) + device.update({"verificationCode": code, "trustBrowser": True}) data = json.dumps(device) try: self.session.post( - '%s/validateVerificationCode' % self._setup_endpoint, + "%s/validateVerificationCode" % self._setup_endpoint, params=self.params, - data=data + data=data, ) except PyiCloudAPIResponseException as error: if error.code == -21669: @@ -306,20 +311,16 @@ class PyiCloudService(object): """Get webservice URL, raise an exception if not exists.""" if self._webservices.get(ws_key) is None: raise PyiCloudServiceNotActivatedException( - 'Webservice not available', - ws_key + "Webservice not available", ws_key ) - return self._webservices[ws_key]['url'] + return self._webservices[ws_key]["url"] @property def devices(self): """Returns all devices.""" - service_root = self._get_webservice_url('findme') + service_root = self._get_webservice_url("findme") return FindMyiPhoneServiceManager( - service_root, - self.session, - self.params, - self.with_family + service_root, self.session, self.params, self.with_family ) @property @@ -330,63 +331,51 @@ class PyiCloudService(object): @property def account(self): """Gets the 'Account' service.""" - service_root = self._get_webservice_url('account') - return AccountService( - service_root, - self.session, - self.params - ) + service_root = self._get_webservice_url("account") + return AccountService(service_root, self.session, self.params) @property def files(self): """Gets the 'File' service.""" if not self._files: - service_root = self._get_webservice_url('ubiquity') - self._files = UbiquityService( - service_root, - self.session, - self.params - ) + service_root = self._get_webservice_url("ubiquity") + self._files = UbiquityService(service_root, self.session, self.params) return self._files @property def photos(self): """Gets the 'Photo' service.""" if not self._photos: - service_root = self._get_webservice_url('ckdatabasews') - self._photos = PhotosService( - service_root, - self.session, - self.params - ) + service_root = self._get_webservice_url("ckdatabasews") + self._photos = PhotosService(service_root, self.session, self.params) return self._photos @property def calendar(self): """Gets the 'Calendar' service.""" - service_root = self._get_webservice_url('calendar') + service_root = self._get_webservice_url("calendar") return CalendarService(service_root, self.session, self.params) @property def contacts(self): """Gets the 'Contacts' service.""" - service_root = self._get_webservice_url('contacts') + service_root = self._get_webservice_url("contacts") return ContactsService(service_root, self.session, self.params) @property def reminders(self): """Gets the 'Reminders' service.""" - service_root = self._get_webservice_url('reminders') + service_root = self._get_webservice_url("reminders") return RemindersService(service_root, self.session, self.params) def __unicode__(self): - return 'iCloud API: %s' % self.user.get('apple_id') + return "iCloud API: %s" % self.user.get("apple_id") def __str__(self): as_unicode = self.__unicode__() if sys.version_info[0] >= 3: return as_unicode - return as_unicode.encode('utf-8', 'ignore') + return as_unicode.encode("utf-8", "ignore") def __repr__(self): - return '<%s>' % str(self) + return "<%s>" % str(self) diff --git a/pyicloud/cmdline.py b/pyicloud/cmdline.py index 53377b9..6101fa0 100644 --- a/pyicloud/cmdline.py +++ b/pyicloud/cmdline.py @@ -16,14 +16,14 @@ from pyicloud import PyiCloudService from pyicloud.exceptions import PyiCloudFailedLoginException from . import utils +# fmt: off if six.PY2: input = raw_input # pylint: disable=redefined-builtin,invalid-name,undefined-variable else: - input = input # pylint: disable=bad-option-value,self-assigning-variable,invalid-name + input = input # pylint: disable=bad-option-value,self-assigning-variable,invalid-name +# fmt: on -DEVICE_ERROR = ( - "Please use the --device switch to indicate which device to use." -) +DEVICE_ERROR = "Please use the --device switch to indicate which device to use." def create_pickled_data(idevice, filename): @@ -34,7 +34,7 @@ def create_pickled_data(idevice, filename): This allows the data to be used without resorting to screen / pipe scrapping. """ - pickle_file = open(filename, 'wb') + pickle_file = open(filename, "wb") pickle.dump(idevice.content, pickle_file, protocol=pickle.HIGHEST_PROTOCOL) pickle_file.close() @@ -44,15 +44,14 @@ def main(args=None): if args is None: args = sys.argv[1:] - parser = argparse.ArgumentParser( - description="Find My iPhone CommandLine Tool") + parser = argparse.ArgumentParser(description="Find My iPhone CommandLine Tool") parser.add_argument( "--username", action="store", dest="username", default="", - help="Apple ID to Use" + help="Apple ID to Use", ) parser.add_argument( "--password", @@ -62,7 +61,7 @@ def main(args=None): help=( "Apple ID Password to Use; if unspecified, password will be " "fetched from the system keyring." - ) + ), ) parser.add_argument( "-n", @@ -70,7 +69,7 @@ def main(args=None): action="store_false", dest="interactive", default=True, - help="Disable interactive prompts." + help="Disable interactive prompts.", ) parser.add_argument( "--delete-from-keyring", @@ -189,54 +188,59 @@ def main(args=None): # Which password we use is determined by your username, so we # do need to check for this first and separately. if not username: - parser.error('No username supplied') + parser.error("No username supplied") if not password: password = utils.get_password( - username, - interactive=command_line.interactive + username, interactive=command_line.interactive ) if not password: - parser.error('No password supplied') + parser.error("No password supplied") try: - api = PyiCloudService( - username.strip(), - password.strip() - ) + api = PyiCloudService(username.strip(), password.strip()) if ( - not utils.password_exists_in_keyring(username) and - command_line.interactive and - confirm("Save password in keyring?") + not utils.password_exists_in_keyring(username) + and command_line.interactive + and confirm("Save password in keyring?") ): utils.store_password_in_keyring(username, password) if api.requires_2sa: - print("\nTwo-step authentication required.", - "\nYour trusted devices are:") + # fmt: off + print( + "\nTwo-step authentication required.", + "\nYour trusted devices are:" + ) + # fmt: on devices = api.trusted_devices for i, device in enumerate(devices): - print(" %s: %s" % ( - i, device.get( - 'deviceName', - "SMS to %s" % device.get('phoneNumber')))) + print( + " %s: %s" + % ( + i, + device.get( + "deviceName", "SMS to %s" % device.get("phoneNumber") + ), + ) + ) - print('\nWhich device would you like to use?') - device = int(input('(number) --> ')) + print("\nWhich device would you like to use?") + device = int(input("(number) --> ")) device = devices[device] if not api.send_verification_code(device): print("Failed to send verification code") sys.exit(1) - print('\nPlease enter validation code') - code = input('(string) --> ') + print("\nPlease enter validation code") + code = input("(string) --> ") if not api.validate_verification_code(device, code): print("Failed to verify verification code") sys.exit(1) - print('') + print("") break except PyiCloudFailedLoginException: # If they have a stored password; we just used it and @@ -256,12 +260,8 @@ def main(args=None): print(message, file=sys.stderr) for dev in api.devices: - if ( - not command_line.device_id or - ( - command_line.device_id.strip().lower() == - dev.content["id"].strip().lower() - ) + if not command_line.device_id or ( + command_line.device_id.strip().lower() == dev.content["id"].strip().lower() ): # List device(s) if command_line.locate: @@ -270,19 +270,17 @@ def main(args=None): if command_line.output_to_file: create_pickled_data( dev, - filename=( - dev.content["name"].strip().lower() + ".fmip_snapshot" - ) + filename=(dev.content["name"].strip().lower() + ".fmip_snapshot"), ) contents = dev.content if command_line.longlist: - print("-"*30) + print("-" * 30) print(contents["name"]) for key in contents: print("%20s - %s" % (key, contents[key])) elif command_line.list: - print("-"*30) + print("-" * 30) print("Name - %s" % contents["name"]) print("Display Name - %s" % contents["deviceDisplayName"]) print("Location - %s" % contents["location"]) @@ -297,9 +295,10 @@ def main(args=None): dev.play_sound() else: raise RuntimeError( - "\n\n\t\t%s %s\n\n" % ( + "\n\n\t\t%s %s\n\n" + % ( "Sounds can only be played on a singular device.", - DEVICE_ERROR + DEVICE_ERROR, ) ) @@ -307,16 +306,14 @@ def main(args=None): if command_line.message: if command_line.device_id: dev.display_message( - subject='A Message', - message=command_line.message, - sounds=True + subject="A Message", message=command_line.message, sounds=True ) else: raise RuntimeError( - "%s %s" % ( - "Messages can only be played " - "on a singular device.", - DEVICE_ERROR + "%s %s" + % ( + "Messages can only be played on a singular device.", + DEVICE_ERROR, ) ) @@ -324,16 +321,17 @@ def main(args=None): if command_line.silentmessage: if command_line.device_id: dev.display_message( - subject='A Silent Message', + subject="A Silent Message", message=command_line.silentmessage, - sounds=False + sounds=False, ) else: raise RuntimeError( - "%s %s" % ( + "%s %s" + % ( "Silent Messages can only be played " "on a singular device.", - DEVICE_ERROR + DEVICE_ERROR, ) ) @@ -343,17 +341,18 @@ def main(args=None): dev.lost_device( number=command_line.lost_phone.strip(), text=command_line.lost_message.strip(), - newpasscode=command_line.lost_password.strip() + newpasscode=command_line.lost_password.strip(), ) else: raise RuntimeError( - "%s %s" % ( - "Lost Mode can only be activated " - "on a singular device.", - DEVICE_ERROR + "%s %s" + % ( + "Lost Mode can only be activated on a singular device.", + DEVICE_ERROR, ) ) sys.exit(0) -if __name__ == '__main__': + +if __name__ == "__main__": main() diff --git a/pyicloud/exceptions.py b/pyicloud/exceptions.py index cc833e4..6fa16f2 100644 --- a/pyicloud/exceptions.py +++ b/pyicloud/exceptions.py @@ -1,4 +1,6 @@ """Library exceptions.""" + + class PyiCloudException(Exception): """Generic iCloud exception.""" pass diff --git a/pyicloud/services/account.py b/pyicloud/services/account.py index efa9230..0551d5c 100644 --- a/pyicloud/services/account.py +++ b/pyicloud/services/account.py @@ -8,19 +8,20 @@ from pyicloud.utils import underscore_to_camelcase class AccountService(object): """The 'Account' iCloud service.""" + def __init__(self, service_root, session, params): self.session = session self.params = params self._service_root = service_root self._devices = [] - self._acc_endpoint = '%s/setup/web/device' % self._service_root - self._account_devices_url = '%s/getDevices' % self._acc_endpoint + self._acc_endpoint = "%s/setup/web/device" % self._service_root + self._account_devices_url = "%s/getDevices" % self._acc_endpoint req = self.session.get(self._account_devices_url, params=self.params) self.response = req.json() - for device_info in self.response['devices']: + for device_info in self.response["devices"]: # device_id = device_info['udid'] # self._devices[device_id] = AccountDevice(device_info) self._devices.append(AccountDevice(device_info)) @@ -34,6 +35,7 @@ class AccountService(object): @six.python_2_unicode_compatible class AccountDevice(dict): """Account device.""" + def __getattr__(self, name): try: return self[underscore_to_camelcase(name)] @@ -42,15 +44,14 @@ class AccountDevice(dict): def __str__(self): return u"{display_name}: {name}".format( - display_name=self.model_display_name, - name=self.name, + display_name=self.model_display_name, name=self.name, ) def __repr__(self): - return '<{display}>'.format( + return "<{display}>".format( display=( six.text_type(self) - if sys.version_info[0] >= 3 else - six.text_type(self).encode('utf8', 'replace') + if sys.version_info[0] >= 3 + else six.text_type(self).encode("utf8", "replace") ) ) diff --git a/pyicloud/services/calendar.py b/pyicloud/services/calendar.py index e3ca145..9dc0422 100644 --- a/pyicloud/services/calendar.py +++ b/pyicloud/services/calendar.py @@ -10,16 +10,15 @@ class CalendarService(object): """ The 'Calendar' iCloud service, connects to iCloud and returns events. """ + def __init__(self, service_root, session, params): self.session = session self.params = params self._service_root = service_root - self._calendar_endpoint = '%s/ca' % self._service_root - self._calendar_refresh_url = '%s/events' % self._calendar_endpoint - self._calendar_event_detail_url = '%s/eventdetail' % ( - self._calendar_endpoint, - ) - self._calendars = '%s/startup' % self._calendar_endpoint + self._calendar_endpoint = "%s/ca" % self._service_root + self._calendar_refresh_url = "%s/events" % self._calendar_endpoint + self._calendar_event_detail_url = "%s/eventdetail" % (self._calendar_endpoint,) + self._calendars = "%s/startup" % self._calendar_endpoint self.response = {} @@ -29,11 +28,11 @@ class CalendarService(object): (a calendar) and a guid (an event's ID). """ params = dict(self.params) - params.update({'lang': 'en-us', 'usertz': get_localzone().zone}) - url = '%s/%s/%s' % (self._calendar_event_detail_url, pguid, guid) + params.update({"lang": "en-us", "usertz": get_localzone().zone}) + url = "%s/%s/%s" % (self._calendar_event_detail_url, pguid, guid) req = self.session.get(url, params=params) self.response = req.json() - return self.response['Event'][0] + return self.response["Event"][0] def refresh_client(self, from_dt=None, to_dt=None): """ @@ -48,12 +47,14 @@ class CalendarService(object): if not to_dt: to_dt = datetime(today.year, today.month, last_day) params = dict(self.params) - params.update({ - 'lang': 'en-us', - 'usertz': get_localzone().zone, - 'startDate': from_dt.strftime('%Y-%m-%d'), - 'endDate': to_dt.strftime('%Y-%m-%d') - }) + params.update( + { + "lang": "en-us", + "usertz": get_localzone().zone, + "startDate": from_dt.strftime("%Y-%m-%d"), + "endDate": to_dt.strftime("%Y-%m-%d"), + } + ) req = self.session.get(self._calendar_refresh_url, params=params) self.response = req.json() @@ -62,7 +63,7 @@ class CalendarService(object): Retrieves events for a given date range, by default, this month. """ self.refresh_client(from_dt, to_dt) - return self.response.get('Event') + return self.response.get("Event") def calendars(self): """ @@ -73,12 +74,14 @@ class CalendarService(object): from_dt = datetime(today.year, today.month, first_day) to_dt = datetime(today.year, today.month, last_day) params = dict(self.params) - params.update({ - 'lang': 'en-us', - 'usertz': get_localzone().zone, - 'startDate': from_dt.strftime('%Y-%m-%d'), - 'endDate': to_dt.strftime('%Y-%m-%d') - }) + params.update( + { + "lang": "en-us", + "usertz": get_localzone().zone, + "startDate": from_dt.strftime("%Y-%m-%d"), + "endDate": to_dt.strftime("%Y-%m-%d"), + } + ) req = self.session.get(self._calendars, params=params) self.response = req.json() - return self.response['Collection'] + return self.response["Collection"] diff --git a/pyicloud/services/contacts.py b/pyicloud/services/contacts.py index 67a53d1..a7ecf35 100644 --- a/pyicloud/services/contacts.py +++ b/pyicloud/services/contacts.py @@ -11,10 +11,10 @@ class ContactsService(object): self.session = session self.params = params self._service_root = service_root - self._contacts_endpoint = '%s/co' % self._service_root - self._contacts_refresh_url = '%s/startup' % self._contacts_endpoint - self._contacts_next_url = '%s/contacts' % self._contacts_endpoint - self._contacts_changeset_url = '%s/changeset' % self._contacts_endpoint + self._contacts_endpoint = "%s/co" % self._service_root + self._contacts_refresh_url = "%s/startup" % self._contacts_endpoint + self._contacts_next_url = "%s/contacts" % self._contacts_endpoint + self._contacts_changeset_url = "%s/changeset" % self._contacts_endpoint self.response = {} @@ -24,28 +24,22 @@ class ContactsService(object): contacts data is up-to-date. """ params_contacts = dict(self.params) - params_contacts.update({ - 'clientVersion': '2.1', - 'locale': 'en_US', - 'order': 'last,first', - }) - req = self.session.get( - self._contacts_refresh_url, - params=params_contacts + params_contacts.update( + {"clientVersion": "2.1", "locale": "en_US", "order": "last,first",} ) + req = self.session.get(self._contacts_refresh_url, params=params_contacts) self.response = req.json() params_next = dict(params_contacts) - params_next.update({ - 'prefToken': self.response["prefToken"], - 'syncToken': self.response["syncToken"], - 'limit': '0', - 'offset': '0', - }) - req = self.session.get( - self._contacts_next_url, - params=params_next + params_next.update( + { + "prefToken": self.response["prefToken"], + "syncToken": self.response["syncToken"], + "limit": "0", + "offset": "0", + } ) + req = self.session.get(self._contacts_next_url, params=params_next) self.response = req.json() def all(self): @@ -53,4 +47,4 @@ class ContactsService(object): Retrieves all contacts. """ self.refresh_client() - return self.response.get('contacts') + return self.response.get("contacts") diff --git a/pyicloud/services/findmyiphone.py b/pyicloud/services/findmyiphone.py index 66d05bb..6096e5e 100644 --- a/pyicloud/services/findmyiphone.py +++ b/pyicloud/services/findmyiphone.py @@ -19,11 +19,11 @@ class FindMyiPhoneServiceManager(object): self.params = params self.with_family = with_family - fmip_endpoint = '%s/fmipservice/client/web' % service_root - self._fmip_refresh_url = '%s/refreshClient' % fmip_endpoint - self._fmip_sound_url = '%s/playSound' % fmip_endpoint - self._fmip_message_url = '%s/sendMessage' % fmip_endpoint - self._fmip_lost_url = '%s/lostDevice' % fmip_endpoint + fmip_endpoint = "%s/fmipservice/client/web" % service_root + self._fmip_refresh_url = "%s/refreshClient" % fmip_endpoint + self._fmip_sound_url = "%s/playSound" % fmip_endpoint + self._fmip_message_url = "%s/sendMessage" % fmip_endpoint + self._fmip_lost_url = "%s/lostDevice" % fmip_endpoint self._devices = {} self.refresh_client() @@ -39,18 +39,18 @@ class FindMyiPhoneServiceManager(object): params=self.params, data=json.dumps( { - 'clientContext': { - 'fmly': self.with_family, - 'shouldLocate': True, - 'selectedDevice': 'all', + "clientContext": { + "fmly": self.with_family, + "shouldLocate": True, + "selectedDevice": "all", } } - ) + ), ) self.response = req.json() - for device_info in self.response['content']: - device_id = device_info['id'] + for device_info in self.response["content"]: + device_id = device_info["id"] if device_id not in self._devices: self._devices[device_id] = AppleDevice( device_info, @@ -85,7 +85,7 @@ class FindMyiPhoneServiceManager(object): as_unicode = self.__unicode__() if sys.version_info[0] >= 3: return as_unicode - return as_unicode.encode('utf-8', 'ignore') + return as_unicode.encode("utf-8", "ignore") def __repr__(self): return six.text_type(self) @@ -93,9 +93,16 @@ class FindMyiPhoneServiceManager(object): class AppleDevice(object): """Apple device.""" + def __init__( - self, content, session, params, manager, - sound_url=None, lost_url=None, message_url=None + self, + content, + session, + params, + manager, + sound_url=None, + lost_url=None, + message_url=None, ): self.content = content self.manager = manager @@ -113,7 +120,7 @@ class AppleDevice(object): def location(self): """Updates the device location.""" self.manager.refresh_client() - return self.content['location'] + return self.content["location"] def status(self, additional=[]): # pylint: disable=dangerous-default-value """Returns status information for device. @@ -121,34 +128,29 @@ class AppleDevice(object): This returns only a subset of possible properties. """ self.manager.refresh_client() - fields = ['batteryLevel', 'deviceDisplayName', 'deviceStatus', 'name'] + fields = ["batteryLevel", "deviceDisplayName", "deviceStatus", "name"] fields += additional properties = {} for field in fields: properties[field] = self.content.get(field) return properties - def play_sound(self, subject='Find My iPhone Alert'): + def play_sound(self, subject="Find My iPhone Alert"): """Send a request to the device to play a sound. It's possible to pass a custom message by changing the `subject`. """ - data = json.dumps({ - 'device': self.content['id'], - 'subject': subject, - 'clientContext': { - 'fmly': True + data = json.dumps( + { + "device": self.content["id"], + "subject": subject, + "clientContext": {"fmly": True}, } - }) - self.session.post( - self.sound_url, - params=self.params, - data=data ) + self.session.post(self.sound_url, params=self.params, data=data) def display_message( - self, subject='Find My iPhone Alert', message="This is a note", - sounds=False + self, subject="Find My iPhone Alert", message="This is a note", sounds=False ): """Send a request to the device to play a sound. @@ -156,23 +158,17 @@ class AppleDevice(object): """ data = json.dumps( { - 'device': self.content['id'], - 'subject': subject, - 'sound': sounds, - 'userText': True, - 'text': message + "device": self.content["id"], + "subject": subject, + "sound": sounds, + "userText": True, + "text": message, } ) - self.session.post( - self.message_url, - params=self.params, - data=data - ) + self.session.post(self.message_url, params=self.params, data=data) def lost_device( - self, number, - text='This iPhone has been lost. Please call me.', - newpasscode="" + self, number, text="This iPhone has been lost. Please call me.", newpasscode="" ): """Send a request to the device to trigger 'lost mode'. @@ -180,20 +176,18 @@ class AppleDevice(object): been passed, then the person holding the device can call the number without entering the passcode. """ - data = json.dumps({ - 'text': text, - 'userText': True, - 'ownerNbr': number, - 'lostModeEnabled': True, - 'trackingEnabled': True, - 'device': self.content['id'], - 'passcode': newpasscode - }) - self.session.post( - self.lost_url, - params=self.params, - data=data + data = json.dumps( + { + "text": text, + "userText": True, + "ownerNbr": number, + "lostModeEnabled": True, + "trackingEnabled": True, + "device": self.content["id"], + "passcode": newpasscode, + } ) + self.session.post(self.lost_url, params=self.params, data=data) @property def data(self): @@ -207,18 +201,15 @@ class AppleDevice(object): return getattr(self.content, attr) def __unicode__(self): - display_name = self['deviceDisplayName'] - name = self['name'] - return '%s: %s' % ( - display_name, - name, - ) + display_name = self["deviceDisplayName"] + name = self["name"] + return "%s: %s" % (display_name, name,) def __str__(self): as_unicode = self.__unicode__() if sys.version_info[0] >= 3: return as_unicode - return as_unicode.encode('utf-8', 'ignore') + return as_unicode.encode("utf-8", "ignore") def __repr__(self): - return '' % str(self) + return "" % str(self) diff --git a/pyicloud/services/photos.py b/pyicloud/services/photos.py index dc39ea6..78b2f5a 100644 --- a/pyicloud/services/photos.py +++ b/pyicloud/services/photos.py @@ -12,121 +12,115 @@ from future.moves.urllib.parse import urlencode class PhotosService(object): """The 'Photos' iCloud service.""" + SMART_FOLDERS = { "All Photos": { "obj_type": "CPLAssetByAddedDate", "list_type": "CPLAssetAndMasterByAddedDate", "direction": "ASCENDING", - "query_filter": None + "query_filter": None, }, "Time-lapse": { "obj_type": "CPLAssetInSmartAlbumByAssetDate:Timelapse", "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate", "direction": "ASCENDING", - "query_filter": [{ - "fieldName": "smartAlbum", - "comparator": "EQUALS", - "fieldValue": { - "type": "STRING", - "value": "TIMELAPSE" + "query_filter": [ + { + "fieldName": "smartAlbum", + "comparator": "EQUALS", + "fieldValue": {"type": "STRING", "value": "TIMELAPSE"}, } - }] + ], }, "Videos": { "obj_type": "CPLAssetInSmartAlbumByAssetDate:Video", "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate", "direction": "ASCENDING", - "query_filter": [{ - "fieldName": "smartAlbum", - "comparator": "EQUALS", - "fieldValue": { - "type": "STRING", - "value": "VIDEO" + "query_filter": [ + { + "fieldName": "smartAlbum", + "comparator": "EQUALS", + "fieldValue": {"type": "STRING", "value": "VIDEO"}, } - }] + ], }, "Slo-mo": { "obj_type": "CPLAssetInSmartAlbumByAssetDate:Slomo", "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate", "direction": "ASCENDING", - "query_filter": [{ - "fieldName": "smartAlbum", - "comparator": "EQUALS", - "fieldValue": { - "type": "STRING", - "value": "SLOMO" + "query_filter": [ + { + "fieldName": "smartAlbum", + "comparator": "EQUALS", + "fieldValue": {"type": "STRING", "value": "SLOMO"}, } - }] + ], }, "Bursts": { "obj_type": "CPLAssetBurstStackAssetByAssetDate", "list_type": "CPLBurstStackAssetAndMasterByAssetDate", "direction": "ASCENDING", - "query_filter": None + "query_filter": None, }, "Favorites": { "obj_type": "CPLAssetInSmartAlbumByAssetDate:Favorite", "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate", "direction": "ASCENDING", - "query_filter": [{ - "fieldName": "smartAlbum", - "comparator": "EQUALS", - "fieldValue": { - "type": "STRING", - "value": "FAVORITE" + "query_filter": [ + { + "fieldName": "smartAlbum", + "comparator": "EQUALS", + "fieldValue": {"type": "STRING", "value": "FAVORITE"}, } - }] + ], }, "Panoramas": { "obj_type": "CPLAssetInSmartAlbumByAssetDate:Panorama", "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate", "direction": "ASCENDING", - "query_filter": [{ - "fieldName": "smartAlbum", - "comparator": "EQUALS", - "fieldValue": { - "type": "STRING", - "value": "PANORAMA" + "query_filter": [ + { + "fieldName": "smartAlbum", + "comparator": "EQUALS", + "fieldValue": {"type": "STRING", "value": "PANORAMA"}, } - }] + ], }, "Screenshots": { "obj_type": "CPLAssetInSmartAlbumByAssetDate:Screenshot", "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate", "direction": "ASCENDING", - "query_filter": [{ - "fieldName": "smartAlbum", - "comparator": "EQUALS", - "fieldValue": { - "type": "STRING", - "value": "SCREENSHOT" + "query_filter": [ + { + "fieldName": "smartAlbum", + "comparator": "EQUALS", + "fieldValue": {"type": "STRING", "value": "SCREENSHOT"}, } - }] + ], }, "Live": { "obj_type": "CPLAssetInSmartAlbumByAssetDate:Live", "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate", "direction": "ASCENDING", - "query_filter": [{ - "fieldName": "smartAlbum", - "comparator": "EQUALS", - "fieldValue": { - "type": "STRING", - "value": "LIVE" + "query_filter": [ + { + "fieldName": "smartAlbum", + "comparator": "EQUALS", + "fieldValue": {"type": "STRING", "value": "LIVE"}, } - }] + ], }, "Recently Deleted": { "obj_type": "CPLAssetDeletedByExpungedDate", "list_type": "CPLAssetAndMasterDeletedByExpungedDate", "direction": "ASCENDING", - "query_filter": None + "query_filter": None, }, "Hidden": { "obj_type": "CPLAssetHiddenByAssetDate", "list_type": "CPLAssetAndMasterHiddenByAssetDate", "direction": "ASCENDING", - "query_filter": None + "query_filter": None, }, } @@ -134,32 +128,29 @@ class PhotosService(object): self.session = session self.params = dict(params) self._service_root = service_root - self.service_endpoint = \ - ('%s/database/1/com.apple.photos.cloud/production/private' - % self._service_root) + self.service_endpoint = ( + "%s/database/1/com.apple.photos.cloud/production/private" + % self._service_root + ) self._albums = None - self.params.update({ - 'remapEnums': True, - 'getCurrentSyncToken': True - }) + self.params.update({"remapEnums": True, "getCurrentSyncToken": True}) - url = ('%s/records/query?%s' % - (self.service_endpoint, urlencode(self.params))) - json_data = ('{"query":{"recordType":"CheckIndexingState"},' - '"zoneID":{"zoneName":"PrimarySync"}}') + url = "%s/records/query?%s" % (self.service_endpoint, urlencode(self.params)) + json_data = ( + '{"query":{"recordType":"CheckIndexingState"},' + '"zoneID":{"zoneName":"PrimarySync"}}' + ) request = self.session.post( - url, - data=json_data, - headers={'Content-type': 'text/plain'} + url, data=json_data, headers={"Content-type": "text/plain"} ) response = request.json() - indexing_state = response['records'][0]['fields']['state']['value'] - if indexing_state != 'FINISHED': + indexing_state = response["records"][0]["fields"]["state"]["value"] + if indexing_state != "FINISHED": raise PyiCloudServiceNotActivatedException( - 'iCloud Photo Library not finished indexing. ' - 'Please try again in a few minutes.' + "iCloud Photo Library not finished indexing. " + "Please try again in a few minutes." ) # TODO: Does syncToken ever change? # pylint: disable=fixme @@ -174,63 +165,79 @@ class PhotosService(object): def albums(self): """Returns photo albums.""" if not self._albums: - self._albums = {name: PhotoAlbum(self, name, **props) - for (name, props) in self.SMART_FOLDERS.items()} + self._albums = { + name: PhotoAlbum(self, name, **props) + for (name, props) in self.SMART_FOLDERS.items() + } for folder in self._fetch_folders(): # TODO: Handle subfolders # pylint: disable=fixme - if folder['recordName'] == '----Root-Folder----' or \ - (folder['fields'].get('isDeleted') and - folder['fields']['isDeleted']['value']): + if folder["recordName"] == "----Root-Folder----" or ( + folder["fields"].get("isDeleted") + and folder["fields"]["isDeleted"]["value"] + ): continue - folder_id = folder['recordName'] - folder_obj_type = \ + folder_id = folder["recordName"] + folder_obj_type = ( "CPLContainerRelationNotDeletedByAssetDate:%s" % folder_id + ) folder_name = base64.b64decode( - folder['fields']['albumNameEnc']['value']).decode('utf-8') - query_filter = [{ - "fieldName": "parentId", - "comparator": "EQUALS", - "fieldValue": { - "type": "STRING", - "value": folder_id + folder["fields"]["albumNameEnc"]["value"] + ).decode("utf-8") + query_filter = [ + { + "fieldName": "parentId", + "comparator": "EQUALS", + "fieldValue": {"type": "STRING", "value": folder_id}, } - }] + ] - album = PhotoAlbum(self, folder_name, - 'CPLContainerRelationLiveByAssetDate', - folder_obj_type, 'ASCENDING', query_filter) + album = PhotoAlbum( + self, + folder_name, + "CPLContainerRelationLiveByAssetDate", + folder_obj_type, + "ASCENDING", + query_filter, + ) self._albums[folder_name] = album return self._albums def _fetch_folders(self): - url = ('%s/records/query?%s' % - (self.service_endpoint, urlencode(self.params))) - json_data = ('{"query":{"recordType":"CPLAlbumByPositionLive"},' - '"zoneID":{"zoneName":"PrimarySync"}}') + url = "%s/records/query?%s" % (self.service_endpoint, urlencode(self.params)) + json_data = ( + '{"query":{"recordType":"CPLAlbumByPositionLive"},' + '"zoneID":{"zoneName":"PrimarySync"}}' + ) request = self.session.post( - url, - data=json_data, - headers={'Content-type': 'text/plain'} + url, data=json_data, headers={"Content-type": "text/plain"} ) response = request.json() - return response['records'] + return response["records"] @property def all(self): """Returns all photos.""" - return self.albums['All Photos'] + return self.albums["All Photos"] class PhotoAlbum(object): """A photo album.""" - def __init__(self, service, name, list_type, obj_type, direction, - query_filter=None, page_size=100): + def __init__( + self, + service, + name, + list_type, + obj_type, + direction, + query_filter=None, + page_size=100, + ): self.name = name self.service = service self.list_type = list_type @@ -251,41 +258,41 @@ class PhotoAlbum(object): def __len__(self): if self._len is None: - url = ('%s/internal/records/query/batch?%s' % - (self.service.service_endpoint, - urlencode(self.service.params))) + url = "%s/internal/records/query/batch?%s" % ( + self.service.service_endpoint, + urlencode(self.service.params), + ) request = self.service.session.post( url, data=json.dumps( { - u'batch': [{ - u'resultsLimit': 1, - u'query': { - u'filterBy': { - u'fieldName': u'indexCountID', - u'fieldValue': { - u'type': u'STRING_LIST', - u'value': [ - self.obj_type - ] + u"batch": [ + { + u"resultsLimit": 1, + u"query": { + u"filterBy": { + u"fieldName": u"indexCountID", + u"fieldValue": { + u"type": u"STRING_LIST", + u"value": [self.obj_type], + }, + u"comparator": u"IN", }, - u'comparator': u'IN' + u"recordType": u"HyperionIndexCountLookup", }, - u'recordType': u'HyperionIndexCountLookup' - }, - u'zoneWide': True, - u'zoneID': { - u'zoneName': u'PrimarySync' + u"zoneWide": True, + u"zoneID": {u"zoneName": u"PrimarySync"}, } - }] + ] } ), - headers={'Content-type': 'text/plain'} + headers={"Content-type": "text/plain"}, ) response = request.json() - self._len = (response["batch"][0]["records"][0]["fields"] - ["itemCount"]["value"]) + self._len = response["batch"][0]["records"][0]["fields"]["itemCount"][ + "value" + ] return self._len @@ -297,26 +304,28 @@ class PhotoAlbum(object): else: offset = 0 - while(True): - url = ('%s/records/query?' % self.service.service_endpoint) + \ - urlencode(self.service.params) + while True: + url = ("%s/records/query?" % self.service.service_endpoint) + urlencode( + self.service.params + ) request = self.service.session.post( url, - data=json.dumps(self._list_query_gen( - offset, self.list_type, self.direction, - self.query_filter)), - headers={'Content-type': 'text/plain'} + data=json.dumps( + self._list_query_gen( + offset, self.list_type, self.direction, self.query_filter + ) + ), + headers={"Content-type": "text/plain"}, ) response = request.json() asset_records = {} master_records = [] - for rec in response['records']: - if rec['recordType'] == "CPLAsset": - master_id = \ - rec['fields']['masterRef']['value']['recordName'] + for rec in response["records"]: + if rec["recordType"] == "CPLAsset": + master_id = rec["fields"]["masterRef"]["value"]["recordName"] asset_records[master_id] = rec - elif rec['recordType'] == "CPLMaster": + elif rec["recordType"] == "CPLMaster": master_records.append(rec) master_records_len = len(master_records) @@ -327,72 +336,135 @@ class PhotoAlbum(object): offset = offset + master_records_len for master_record in master_records: - record_name = master_record['recordName'] - yield PhotoAsset(self.service, master_record, - asset_records[record_name]) + record_name = master_record["recordName"] + yield PhotoAsset( + self.service, master_record, asset_records[record_name] + ) else: break def _list_query_gen(self, offset, list_type, direction, query_filter=None): query = { - u'query': { - u'filterBy': [ - {u'fieldName': u'startRank', u'fieldValue': - {u'type': u'INT64', u'value': offset}, - u'comparator': u'EQUALS'}, - {u'fieldName': u'direction', u'fieldValue': - {u'type': u'STRING', u'value': direction}, - u'comparator': u'EQUALS'} + u"query": { + u"filterBy": [ + { + u"fieldName": u"startRank", + u"fieldValue": {u"type": u"INT64", u"value": offset}, + u"comparator": u"EQUALS", + }, + { + u"fieldName": u"direction", + u"fieldValue": {u"type": u"STRING", u"value": direction}, + u"comparator": u"EQUALS", + }, ], - u'recordType': list_type + u"recordType": list_type, }, - u'resultsLimit': self.page_size * 2, - u'desiredKeys': [ - u'resJPEGFullWidth', u'resJPEGFullHeight', - u'resJPEGFullFileType', u'resJPEGFullFingerprint', - u'resJPEGFullRes', u'resJPEGLargeWidth', - u'resJPEGLargeHeight', u'resJPEGLargeFileType', - u'resJPEGLargeFingerprint', u'resJPEGLargeRes', - u'resJPEGMedWidth', u'resJPEGMedHeight', - u'resJPEGMedFileType', u'resJPEGMedFingerprint', - u'resJPEGMedRes', u'resJPEGThumbWidth', - u'resJPEGThumbHeight', u'resJPEGThumbFileType', - u'resJPEGThumbFingerprint', u'resJPEGThumbRes', - u'resVidFullWidth', u'resVidFullHeight', - u'resVidFullFileType', u'resVidFullFingerprint', - u'resVidFullRes', u'resVidMedWidth', u'resVidMedHeight', - u'resVidMedFileType', u'resVidMedFingerprint', - u'resVidMedRes', u'resVidSmallWidth', u'resVidSmallHeight', - u'resVidSmallFileType', u'resVidSmallFingerprint', - u'resVidSmallRes', u'resSidecarWidth', u'resSidecarHeight', - u'resSidecarFileType', u'resSidecarFingerprint', - u'resSidecarRes', u'itemType', u'dataClassType', - u'filenameEnc', u'originalOrientation', u'resOriginalWidth', - u'resOriginalHeight', u'resOriginalFileType', - u'resOriginalFingerprint', u'resOriginalRes', - u'resOriginalAltWidth', u'resOriginalAltHeight', - u'resOriginalAltFileType', u'resOriginalAltFingerprint', - u'resOriginalAltRes', u'resOriginalVidComplWidth', - u'resOriginalVidComplHeight', u'resOriginalVidComplFileType', - u'resOriginalVidComplFingerprint', u'resOriginalVidComplRes', - u'isDeleted', u'isExpunged', u'dateExpunged', u'remappedRef', - u'recordName', u'recordType', u'recordChangeTag', - u'masterRef', u'adjustmentRenderType', u'assetDate', - u'addedDate', u'isFavorite', u'isHidden', u'orientation', - u'duration', u'assetSubtype', u'assetSubtypeV2', - u'assetHDRType', u'burstFlags', u'burstFlagsExt', u'burstId', - u'captionEnc', u'locationEnc', u'locationV2Enc', - u'locationLatitude', u'locationLongitude', u'adjustmentType', - u'timeZoneOffset', u'vidComplDurValue', u'vidComplDurScale', - u'vidComplDispValue', u'vidComplDispScale', - u'vidComplVisibilityState', u'customRenderedValue', - u'containerId', u'itemId', u'position', u'isKeyAsset' + u"resultsLimit": self.page_size * 2, + u"desiredKeys": [ + u"resJPEGFullWidth", + u"resJPEGFullHeight", + u"resJPEGFullFileType", + u"resJPEGFullFingerprint", + u"resJPEGFullRes", + u"resJPEGLargeWidth", + u"resJPEGLargeHeight", + u"resJPEGLargeFileType", + u"resJPEGLargeFingerprint", + u"resJPEGLargeRes", + u"resJPEGMedWidth", + u"resJPEGMedHeight", + u"resJPEGMedFileType", + u"resJPEGMedFingerprint", + u"resJPEGMedRes", + u"resJPEGThumbWidth", + u"resJPEGThumbHeight", + u"resJPEGThumbFileType", + u"resJPEGThumbFingerprint", + u"resJPEGThumbRes", + u"resVidFullWidth", + u"resVidFullHeight", + u"resVidFullFileType", + u"resVidFullFingerprint", + u"resVidFullRes", + u"resVidMedWidth", + u"resVidMedHeight", + u"resVidMedFileType", + u"resVidMedFingerprint", + u"resVidMedRes", + u"resVidSmallWidth", + u"resVidSmallHeight", + u"resVidSmallFileType", + u"resVidSmallFingerprint", + u"resVidSmallRes", + u"resSidecarWidth", + u"resSidecarHeight", + u"resSidecarFileType", + u"resSidecarFingerprint", + u"resSidecarRes", + u"itemType", + u"dataClassType", + u"filenameEnc", + u"originalOrientation", + u"resOriginalWidth", + u"resOriginalHeight", + u"resOriginalFileType", + u"resOriginalFingerprint", + u"resOriginalRes", + u"resOriginalAltWidth", + u"resOriginalAltHeight", + u"resOriginalAltFileType", + u"resOriginalAltFingerprint", + u"resOriginalAltRes", + u"resOriginalVidComplWidth", + u"resOriginalVidComplHeight", + u"resOriginalVidComplFileType", + u"resOriginalVidComplFingerprint", + u"resOriginalVidComplRes", + u"isDeleted", + u"isExpunged", + u"dateExpunged", + u"remappedRef", + u"recordName", + u"recordType", + u"recordChangeTag", + u"masterRef", + u"adjustmentRenderType", + u"assetDate", + u"addedDate", + u"isFavorite", + u"isHidden", + u"orientation", + u"duration", + u"assetSubtype", + u"assetSubtypeV2", + u"assetHDRType", + u"burstFlags", + u"burstFlagsExt", + u"burstId", + u"captionEnc", + u"locationEnc", + u"locationV2Enc", + u"locationLatitude", + u"locationLongitude", + u"adjustmentType", + u"timeZoneOffset", + u"vidComplDurValue", + u"vidComplDurScale", + u"vidComplDispValue", + u"vidComplDispScale", + u"vidComplVisibilityState", + u"customRenderedValue", + u"containerId", + u"itemId", + u"position", + u"isKeyAsset", ], - u'zoneID': {u'zoneName': u'PrimarySync'} + u"zoneID": {u"zoneName": u"PrimarySync"}, } if query_filter: - query['query']['filterBy'].extend(query_filter) + query["query"]["filterBy"].extend(query_filter) return query @@ -403,17 +475,15 @@ class PhotoAlbum(object): as_unicode = self.__unicode__() if sys.version_info[0] >= 3: return as_unicode - return as_unicode.encode('utf-8', 'ignore') + return as_unicode.encode("utf-8", "ignore") def __repr__(self): - return "<%s: '%s'>" % ( - type(self).__name__, - self - ) + return "<%s: '%s'>" % (type(self).__name__, self) class PhotoAsset(object): """A photo.""" + def __init__(self, service, master_record, asset_record): self._service = service self._master_record = master_record @@ -424,31 +494,31 @@ class PhotoAsset(object): PHOTO_VERSION_LOOKUP = { u"original": u"resOriginal", u"medium": u"resJPEGMed", - u"thumb": u"resJPEGThumb" + u"thumb": u"resJPEGThumb", } VIDEO_VERSION_LOOKUP = { u"original": u"resOriginal", u"medium": u"resVidMed", - u"thumb": u"resVidSmall" + u"thumb": u"resVidSmall", } @property def id(self): """Gets the photo id.""" - return self._master_record['recordName'] + return self._master_record["recordName"] @property def filename(self): """Gets the photo file name.""" return base64.b64decode( - self._master_record['fields']['filenameEnc']['value'] - ).decode('utf-8') + self._master_record["fields"]["filenameEnc"]["value"] + ).decode("utf-8") @property def size(self): """Gets the photo size.""" - return self._master_record['fields']['resOriginalRes']['value']['size'] + return self._master_record["fields"]["resOriginalRes"]["value"]["size"] @property def created(self): @@ -460,8 +530,8 @@ class PhotoAsset(object): """Gets the photo asset date.""" try: return datetime.fromtimestamp( - self._asset_record['fields']['assetDate']['value'] / 1000.0, - tz=UTC) + self._asset_record["fields"]["assetDate"]["value"] / 1000.0, tz=UTC + ) except KeyError: return datetime.fromtimestamp(0) @@ -469,106 +539,104 @@ class PhotoAsset(object): def added_date(self): """Gets the photo added date.""" return datetime.fromtimestamp( - self._asset_record['fields']['addedDate']['value'] / 1000.0, - tz=UTC) + self._asset_record["fields"]["addedDate"]["value"] / 1000.0, tz=UTC + ) @property def dimensions(self): """Gets the photo dimensions.""" - return (self._master_record['fields']['resOriginalWidth']['value'], - self._master_record['fields']['resOriginalHeight']['value']) + return ( + self._master_record["fields"]["resOriginalWidth"]["value"], + self._master_record["fields"]["resOriginalHeight"]["value"], + ) @property def versions(self): """Gets the photo versions.""" if not self._versions: self._versions = {} - if 'resVidSmallRes' in self._master_record['fields']: + if "resVidSmallRes" in self._master_record["fields"]: typed_version_lookup = self.VIDEO_VERSION_LOOKUP else: typed_version_lookup = self.PHOTO_VERSION_LOOKUP for key, prefix in typed_version_lookup.items(): - if '%sRes' % prefix in self._master_record['fields']: - fields = self._master_record['fields'] - version = {'filename': self.filename} + if "%sRes" % prefix in self._master_record["fields"]: + fields = self._master_record["fields"] + version = {"filename": self.filename} - width_entry = fields.get('%sWidth' % prefix) + width_entry = fields.get("%sWidth" % prefix) if width_entry: - version['width'] = width_entry['value'] + version["width"] = width_entry["value"] else: - version['width'] = None + version["width"] = None - height_entry = fields.get('%sHeight' % prefix) + height_entry = fields.get("%sHeight" % prefix) if height_entry: - version['height'] = height_entry['value'] + version["height"] = height_entry["value"] else: - version['height'] = None + version["height"] = None - size_entry = fields.get('%sRes' % prefix) + size_entry = fields.get("%sRes" % prefix) if size_entry: - version['size'] = size_entry['value']['size'] - version['url'] = size_entry['value']['downloadURL'] + version["size"] = size_entry["value"]["size"] + version["url"] = size_entry["value"]["downloadURL"] else: - version['size'] = None - version['url'] = None + version["size"] = None + version["url"] = None - type_entry = fields.get('%sFileType' % prefix) + type_entry = fields.get("%sFileType" % prefix) if type_entry: - version['type'] = type_entry['value'] + version["type"] = type_entry["value"] else: - version['type'] = None + version["type"] = None self._versions[key] = version return self._versions - def download(self, version='original', **kwargs): + def download(self, version="original", **kwargs): """Returns the photo file.""" if version not in self.versions: return None return self._service.session.get( - self.versions[version]['url'], - stream=True, - **kwargs + self.versions[version]["url"], stream=True, **kwargs ) def delete(self): """Deletes the photo.""" - json_data = ('{"query":{"recordType":"CheckIndexingState"},' - '"zoneID":{"zoneName":"PrimarySync"}}') + json_data = ( + '{"query":{"recordType":"CheckIndexingState"},' + '"zoneID":{"zoneName":"PrimarySync"}}' + ) - json_data = ('{"operations":[{' - '"operationType":"update",' - '"record":{' - '"recordName":"%s",' - '"recordType":"%s",' - '"recordChangeTag":"%s",' - '"fields":{"isDeleted":{"value":1}' - '}}}],' - '"zoneID":{' - '"zoneName":"PrimarySync"' - '},"atomic":true}' - % ( - self._asset_record['recordName'], - self._asset_record['recordType'], - self._master_record['recordChangeTag'] - ) - ) + json_data = ( + '{"operations":[{' + '"operationType":"update",' + '"record":{' + '"recordName":"%s",' + '"recordType":"%s",' + '"recordChangeTag":"%s",' + '"fields":{"isDeleted":{"value":1}' + "}}}]," + '"zoneID":{' + '"zoneName":"PrimarySync"' + '},"atomic":true}' + % ( + self._asset_record["recordName"], + self._asset_record["recordType"], + self._master_record["recordChangeTag"], + ) + ) endpoint = self._service.service_endpoint params = urlencode(self._service.params) - url = ('%s/records/modify?%s' % (endpoint, params)) + url = "%s/records/modify?%s" % (endpoint, params) return self._service.session.post( - url, - data=json_data, - headers={'Content-type': 'text/plain'} + url, data=json_data, headers={"Content-type": "text/plain"} ) def __repr__(self): - return "<%s: id=%s>" % ( - type(self).__name__, - self.id - ) + return "<%s: id=%s>" % (type(self).__name__, self.id) diff --git a/pyicloud/services/reminders.py b/pyicloud/services/reminders.py index 4f7387a..9ab0694 100644 --- a/pyicloud/services/reminders.py +++ b/pyicloud/services/reminders.py @@ -10,6 +10,7 @@ from tzlocal import get_localzone class RemindersService(object): """The 'Reminders' iCloud service.""" + def __init__(self, service_root, session, params): self.session = session self._params = params @@ -23,64 +24,61 @@ class RemindersService(object): def refresh(self): """Refresh data.""" params_reminders = dict(self._params) - params_reminders.update({ - 'clientVersion': '4.0', - 'lang': 'en-us', - 'usertz': get_localzone().zone - }) + params_reminders.update( + {"clientVersion": "4.0", "lang": "en-us", "usertz": get_localzone().zone} + ) # Open reminders req = self.session.get( - self._service_root + '/rd/startup', - params=params_reminders + self._service_root + "/rd/startup", params=params_reminders ) data = req.json() self.lists = {} self.collections = {} - for collection in data['Collections']: + for collection in data["Collections"]: temp = [] - self.collections[collection['title']] = { - 'guid': collection['guid'], - 'ctag': collection['ctag'] + self.collections[collection["title"]] = { + "guid": collection["guid"], + "ctag": collection["ctag"], } - for reminder in data['Reminders']: + for reminder in data["Reminders"]: - if reminder['pGuid'] != collection['guid']: + if reminder["pGuid"] != collection["guid"]: continue - if reminder.get('dueDate'): + if reminder.get("dueDate"): due = datetime( - reminder['dueDate'][1], - reminder['dueDate'][2], - reminder['dueDate'][3], - reminder['dueDate'][4], - reminder['dueDate'][5] + reminder["dueDate"][1], + reminder["dueDate"][2], + reminder["dueDate"][3], + reminder["dueDate"][4], + reminder["dueDate"][5], ) else: due = None - temp.append({ - "title": reminder['title'], - "desc": reminder.get('description'), - "due": due - }) - self.lists[collection['title']] = temp + temp.append( + { + "title": reminder["title"], + "desc": reminder.get("description"), + "due": due, + } + ) + self.lists[collection["title"]] = temp def post(self, title, description="", collection=None, due_date=None): """Adds a new reminder.""" - pguid = 'tasks' + pguid = "tasks" if collection: if collection in self.collections: - pguid = self.collections[collection]['guid'] + pguid = self.collections[collection]["guid"] params_reminders = dict(self._params) - params_reminders.update({ - 'clientVersion': '4.0', - 'lang': 'en-us', - 'usertz': get_localzone().zone - }) + params_reminders.update( + {"clientVersion": "4.0", "lang": "en-us", "usertz": get_localzone().zone} + ) due_dates = None if due_date: @@ -90,34 +88,37 @@ class RemindersService(object): due_date.month, due_date.day, due_date.hour, - due_date.minute + due_date.minute, ] req = self.session.post( - self._service_root + '/rd/reminders/tasks', - data=json.dumps({ - "Reminders": { - 'title': title, - "description": description, - "pGuid": pguid, - "etag": None, - "order": None, - "priority": 0, - "recurrence": None, - "alarms": [], - "startDate": None, - "startDateTz": None, - "startDateIsAllDay": False, - "completedDate": None, - "dueDate": due_dates, - "dueDateIsAllDay": False, - "lastModifiedDate": None, - "createdDate": None, - "isFamily": None, - "createdDateExtended": int(time.time()*1000), - "guid": str(uuid.uuid4()) - }, - "ClientState": {"Collections": list(self.collections.values())} - }), - params=params_reminders) + self._service_root + "/rd/reminders/tasks", + data=json.dumps( + { + "Reminders": { + "title": title, + "description": description, + "pGuid": pguid, + "etag": None, + "order": None, + "priority": 0, + "recurrence": None, + "alarms": [], + "startDate": None, + "startDateTz": None, + "startDateIsAllDay": False, + "completedDate": None, + "dueDate": due_dates, + "dueDateIsAllDay": False, + "lastModifiedDate": None, + "createdDate": None, + "isFamily": None, + "createdDateExtended": int(time.time() * 1000), + "guid": str(uuid.uuid4()), + }, + "ClientState": {"Collections": list(self.collections.values())}, + } + ), + params=params_reminders, + ) return req.ok diff --git a/pyicloud/services/ubiquity.py b/pyicloud/services/ubiquity.py index 468cb18..8c6c420 100644 --- a/pyicloud/services/ubiquity.py +++ b/pyicloud/services/ubiquity.py @@ -11,7 +11,7 @@ class UbiquityService(object): self.params = params self._root = None - self._node_url = service_root + '/ws/%s/%s/%s' + self._node_url = service_root + "/ws/%s/%s/%s" @property def root(self): @@ -20,13 +20,9 @@ class UbiquityService(object): self._root = self.get_node(0) return self._root - def get_node_url(self, node_id, variant='item'): + def get_node_url(self, node_id, variant="item"): """Returns a node URL.""" - return self._node_url % ( - self.params['dsid'], - variant, - node_id - ) + return self._node_url % (self.params["dsid"], variant, node_id) def get_node(self, node_id): """Returns a node.""" @@ -35,18 +31,13 @@ class UbiquityService(object): def get_children(self, node_id): """Returns a node children.""" - request = self.session.get( - self.get_node_url(node_id, 'parent') - ) - items = request.json()['item_list'] + request = self.session.get(self.get_node_url(node_id, "parent")) + items = request.json()["item_list"] return [UbiquityNode(self, item) for item in items] def get_file(self, node_id, **kwargs): """Returns a node file.""" - return self.session.get( - self.get_node_url(node_id, 'file'), - **kwargs - ) + return self.session.get(self.get_node_url(node_id, "file"), **kwargs) def __getattr__(self, attr): return getattr(self.root, attr) @@ -57,6 +48,7 @@ class UbiquityService(object): class UbiquityNode(object): """Ubiquity node.""" + def __init__(self, conn, data): self.data = data self.connection = conn @@ -66,38 +58,35 @@ class UbiquityNode(object): @property def item_id(self): """Gets the node id.""" - return self.data.get('item_id') + return self.data.get("item_id") @property def name(self): """Gets the node name.""" - return self.data.get('name') + return self.data.get("name") @property def type(self): """Gets the node type.""" - return self.data.get('type') + return self.data.get("type") @property def size(self): """Gets the node size.""" try: - return int(self.data.get('size')) + return int(self.data.get("size")) except ValueError: return None @property def modified(self): """Gets the node modified date.""" - return datetime.strptime( - self.data.get('modified'), - '%Y-%m-%dT%H:%M:%SZ' - ) - + return datetime.strptime(self.data.get("modified"), "%Y-%m-%dT%H:%M:%SZ") + def open(self, **kwargs): """Returns the node file.""" return self.connection.get_file(self.item_id, **kwargs) - + def get_children(self): """Returns the node children.""" if not self._children: @@ -110,15 +99,13 @@ class UbiquityNode(object): def get(self, name): """Returns a child node by its name.""" - return [ - child for child in self.get_children() if child.name == name - ][0] + return [child for child in self.get_children() if child.name == name][0] def __getitem__(self, key): try: return self.get(key) except IndexError: - raise KeyError('No child named %s exists' % key) + raise KeyError("No child named %s exists" % key) def __unicode__(self): return self.name @@ -127,10 +114,7 @@ class UbiquityNode(object): as_unicode = self.__unicode__() if sys.version_info[0] >= 3: return as_unicode - return as_unicode.encode('utf-8', 'ignore') + return as_unicode.encode("utf-8", "ignore") def __repr__(self): - return "<%s: '%s'>" % ( - self.type.capitalize(), - self - ) + return "<%s: '%s'>" % (self.type.capitalize(), self) diff --git a/pyicloud/utils.py b/pyicloud/utils.py index e724e57..d90f3ca 100644 --- a/pyicloud/utils.py +++ b/pyicloud/utils.py @@ -6,7 +6,7 @@ import sys from .exceptions import PyiCloudNoStoredPasswordAvailableException -KEYRING_SYSTEM = 'pyicloud://icloud-password' +KEYRING_SYSTEM = "pyicloud://icloud-password" def get_password(username, interactive=sys.stdout.isatty()): @@ -18,9 +18,7 @@ def get_password(username, interactive=sys.stdout.isatty()): raise return getpass.getpass( - 'Enter iCloud password for {username}: '.format( - username=username, - ) + "Enter iCloud password for {username}: ".format(username=username,) ) @@ -36,18 +34,13 @@ def password_exists_in_keyring(username): def get_password_from_keyring(username): """Get the password from a username.""" - result = keyring.get_password( - KEYRING_SYSTEM, - username - ) + result = keyring.get_password(KEYRING_SYSTEM, username) if result is None: raise PyiCloudNoStoredPasswordAvailableException( "No pyicloud password for {username} could be found " "in the system keychain. Use the `--store-in-keyring` " "command-line option for storing a password for this " - "username.".format( - username=username, - ) + "username.".format(username=username,) ) return result @@ -55,25 +48,18 @@ def get_password_from_keyring(username): def store_password_in_keyring(username, password): """Store the password of a username.""" - return keyring.set_password( - KEYRING_SYSTEM, - username, - password, - ) + return keyring.set_password(KEYRING_SYSTEM, username, password,) def delete_password_in_keyring(username): """Delete the password of a username.""" - return keyring.delete_password( - KEYRING_SYSTEM, - username, - ) + return keyring.delete_password(KEYRING_SYSTEM, username,) def underscore_to_camelcase(word, initial_capital=False): """Transform a word to camelCase.""" - words = [x.capitalize() or '_' for x in word.split('_')] + words = [x.capitalize() or "_" for x in word.split("_")] if not initial_capital: words[0] = words[0].lower() - return ''.join(words) + return "".join(words) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..7334c01 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,21 @@ +[tool.black] +line-length = 88 +target-version = ["py27", "py33", "py34", "py35", "py36", "py37", "py38"] +exclude = ''' + +( + /( + \.eggs + | \.git + | \.hg + | \.mypy_cache + | \.tox + | \.venv + | _build + | buck-out + | build + | dist + )/ + | exceptions.py +) +''' \ No newline at end of file diff --git a/scripts/check_format.sh b/scripts/check_format.sh new file mode 100755 index 0000000..32522cf --- /dev/null +++ b/scripts/check_format.sh @@ -0,0 +1,16 @@ +./scripts/common.sh + +if ! hash python3; then + echo "python3 is not installed" + exit 0 +fi + +ver=$(python3 -V 2>&1 | sed 's/.* \([0-9]\).\([0-9]\).*/\1\2/') +if [ "$ver" -lt "36" ]; then + echo "This script requires python 3.6 or greater" + exit 0 +fi + +pip install black==19.10b0 + +black --check --fast . diff --git a/setup.py b/setup.py index 4f36dd4..597f8c4 100644 --- a/setup.py +++ b/setup.py @@ -1,38 +1,34 @@ from setuptools import setup, find_packages -with open('requirements.txt') as f: +with open("requirements.txt") as f: required = f.read().splitlines() setup( - name='pyicloud', - version='0.9.6.1', - url='https://github.com/picklepete/pyicloud', + name="pyicloud", + version="0.9.6.1", + url="https://github.com/picklepete/pyicloud", description=( - 'PyiCloud is a module which allows pythonistas to ' - 'interact with iCloud webservices.' + "PyiCloud is a module which allows pythonistas to " + "interact with iCloud webservices." ), - maintainer='The PyiCloud Authors', - maintainer_email=' ', - license='MIT', + maintainer="The PyiCloud Authors", + maintainer_email=" ", + license="MIT", packages=find_packages(include=["pyicloud*"]), install_requires=required, classifiers=[ - 'Intended Audience :: Developers', - 'License :: OSI Approved :: MIT License', - 'Operating System :: OS Independent', - 'Programming Language :: Python', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3', + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Programming Language :: Python", + "Programming Language :: Python :: 2.7", + "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.3", "Programming Language :: Python :: 3.4", "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", ], - entry_points={ - 'console_scripts': [ - 'icloud = pyicloud.cmdline:main' - ] - }, + entry_points={"console_scripts": ["icloud = pyicloud.cmdline:main"]}, ) diff --git a/tests/__init__.py b/tests/__init__.py index bfc60fc..04b92f7 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -12,41 +12,41 @@ VALID_USERS = [AUTHENTICATED_USER, REQUIRES_2SA_USER] class PyiCloudServiceMock(base.PyiCloudService): """Mocked PyiCloudService.""" + def __init__( - self, - apple_id, - password=None, - cookie_directory=None, - verify=True, - client_id=None, - with_family=True - ): - base.PyiCloudService.__init__(self, apple_id, password, cookie_directory, verify, client_id, with_family) + self, + apple_id, + password=None, + cookie_directory=None, + verify=True, + client_id=None, + with_family=True, + ): + base.PyiCloudService.__init__( + self, apple_id, password, cookie_directory, verify, client_id, with_family + ) base.FindMyiPhoneServiceManager = FindMyiPhoneServiceManagerMock def authenticate(self): - if not self.user.get("apple_id") or self.user.get("apple_id") not in VALID_USERS: - raise PyiCloudFailedLoginException("Invalid email/password combination.", None) + if ( + not self.user.get("apple_id") + or self.user.get("apple_id") not in VALID_USERS + ): + raise PyiCloudFailedLoginException( + "Invalid email/password combination.", None + ) if not self.user.get("password") or self.user.get("password") != "valid_pass": - raise PyiCloudFailedLoginException("Invalid email/password combination.", None) - - self.params.update({'dsid': 'ID'}) + raise PyiCloudFailedLoginException( + "Invalid email/password combination.", None + ) + + self.params.update({"dsid": "ID"}) self._webservices = { - 'account': { - 'url': 'account_url', - }, - 'findme': { - 'url': 'findme_url', - }, - 'calendar': { - 'url': 'calendar_url', - }, - 'contacts': { - 'url': 'contacts_url', - }, - 'reminders': { - 'url': 'reminders_url', - } + "account": {"url": "account_url",}, + "findme": {"url": "findme_url",}, + "calendar": {"url": "calendar_url",}, + "contacts": {"url": "contacts_url",}, + "reminders": {"url": "reminders_url",}, } @property @@ -55,13 +55,18 @@ class PyiCloudServiceMock(base.PyiCloudService): @property def trusted_devices(self): - return [ - {"deviceType": "SMS", "areaCode": "", "phoneNumber": "*******58", "deviceId": "1"} + return [ + { + "deviceType": "SMS", + "areaCode": "", + "phoneNumber": "*******58", + "deviceId": "1", + } ] - + def send_verification_code(self, device): return device - + def validate_verification_code(self, device, code): if not device or code != 0: self.user["apple_id"] = AUTHENTICATED_USER @@ -78,7 +83,7 @@ IPHONE_DEVICE = AppleDevice( "playSound": True, "vibrate": True, "createTimestamp": 1568031021347, - "statusCode": "200" + "statusCode": "200", }, "canWipeAfterLock": True, "baUUID": "", @@ -111,7 +116,7 @@ IPHONE_DEVICE = AppleDevice( "CWP": False, "KEY": False, "KPD": False, - "WIP": True + "WIP": True, }, "lowPowerMode": True, "rawDeviceModel": "iPhone11,8", @@ -125,10 +130,7 @@ IPHONE_DEVICE = AppleDevice( "locationEnabled": True, "lockedTimestamp": None, "locFoundEnabled": False, - "snd": { - "createTimestamp": 1568031021347, - "statusCode": "200" - }, + "snd": {"createTimestamp": 1568031021347, "statusCode": "200"}, "fmlyShare": False, "lostDevice": { "stopLostMode": False, @@ -138,7 +140,7 @@ IPHONE_DEVICE = AppleDevice( "ownerNbr": "", "text": "", "createTimestamp": 1558383841233, - "statusCode": "2204" + "statusCode": "2204", }, "lostModeCapable": True, "wipedTimestamp": None, @@ -164,16 +166,16 @@ IPHONE_DEVICE = AppleDevice( "timeStamp": 1568827039692, "locationFinished": False, "verticalAccuracy": 0.0, - "longitude": 5.012345678 + "longitude": 5.012345678, }, "deviceModel": "iphoneXR-1-6-0", "maxMsgChar": 160, "darkWake": False, - "remoteWipe": None + "remoteWipe": None, }, None, None, - None + None, ) DEVICES = { @@ -183,8 +185,11 @@ DEVICES = { class FindMyiPhoneServiceManagerMock(FindMyiPhoneServiceManager): """Mocked FindMyiPhoneServiceManager.""" + def __init__(self, service_root, session, params, with_family=False): - FindMyiPhoneServiceManager.__init__(self, service_root, session, params, with_family) + FindMyiPhoneServiceManager.__init__( + self, service_root, session, params, with_family + ) def refresh_client(self): self._devices = DEVICES diff --git a/tests/test_cmdline.py b/tests/test_cmdline.py index 087ec29..b63dfa0 100644 --- a/tests/test_cmdline.py +++ b/tests/test_cmdline.py @@ -7,13 +7,16 @@ import sys import pickle import pytest from unittest import TestCase + if sys.version_info >= (3, 3): from unittest.mock import patch # pylint: disable=no-name-in-module,import-error else: from mock import patch + class TestCmdline(TestCase): """Cmdline test cases.""" + main = None def setUp(self): @@ -34,13 +37,13 @@ class TestCmdline(TestCase): def test_help(self): """Test the help command.""" with pytest.raises(SystemExit, match="0"): - self.main(['--help']) + self.main(["--help"]) def test_username(self): """Test the username command.""" # No username supplied with pytest.raises(SystemExit, match="2"): - self.main(['--username']) + self.main(["--username"]) @patch("getpass.getpass") def test_username_password_invalid(self, mock_getpass): @@ -48,42 +51,49 @@ class TestCmdline(TestCase): # No password supplied mock_getpass.return_value = None with pytest.raises(SystemExit, match="2"): - self.main(['--username', 'invalid_user']) + self.main(["--username", "invalid_user"]) # Bad username or password mock_getpass.return_value = "invalid_pass" - with pytest.raises(RuntimeError, match="Bad username or password for invalid_user"): - self.main(['--username', 'invalid_user']) + with pytest.raises( + RuntimeError, match="Bad username or password for invalid_user" + ): + self.main(["--username", "invalid_user"]) # We should not use getpass for this one, but we reset the password at login fail - with pytest.raises(RuntimeError, match="Bad username or password for invalid_user"): - self.main(['--username', 'invalid_user', '--password', 'invalid_pass']) - + with pytest.raises( + RuntimeError, match="Bad username or password for invalid_user" + ): + self.main(["--username", "invalid_user", "--password", "invalid_pass"]) - @patch('pyicloud.cmdline.input') + @patch("pyicloud.cmdline.input") def test_username_password_requires_2sa(self, mock_input): """Test username and password commands.""" # Valid connection for the first time mock_input.return_value = "0" with pytest.raises(SystemExit, match="0"): + # fmt: off self.main([ '--username', REQUIRES_2SA_USER, '--password', 'valid_pass', '--non-interactive', ]) + # fmt: on def test_device_outputfile(self): """Test the outputfile command.""" with pytest.raises(SystemExit, match="0"): + # fmt: off self.main([ '--username', AUTHENTICATED_USER, '--password', 'valid_pass', '--non-interactive', '--outputfile' ]) + # fmt: on for key in DEVICES: - file_name = DEVICES[key].content['name'].strip().lower() + ".fmip_snapshot" + file_name = DEVICES[key].content["name"].strip().lower() + ".fmip_snapshot" pickle_file = open(file_name, "rb") assert pickle_file