Back is black (#259)

* Back is black

* Format with black
This commit is contained in:
Quentame 2020-03-24 14:54:43 +01:00 committed by GitHub
parent 9588c0d448
commit ababe3cdf3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 808 additions and 739 deletions

View file

@ -15,4 +15,5 @@ before_install:
- pip install -e .
script:
- pylint pyicloud tests
- ./scripts/check_format.sh;
- py.test

View file

@ -13,14 +13,16 @@ pyiCloud
:target: https://pypi.org/project/pyiCloud
.. image:: https://requires.io/github/Quentame/pyicloud/requirements.svg?branch=master
:target: https://requires.io/github/Quentame/pyicloud/requirements/?branch=master
:alt: Requirements Status
:target: https://requires.io/github/Quentame/pyicloud/requirements/?branch=master
.. image:: https://img.shields.io/badge/code%20style-black-000000.svg
:target: https://github.com/psf/black
.. image:: https://badges.gitter.im/Join%20Chat.svg
:alt: Join the chat at https://gitter.im/picklepete/pyicloud
:target: https://gitter.im/picklepete/pyicloud?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge
PyiCloud is a module which allows pythonistas to interact with iCloud webservices. It's powered by the fantastic `requests <https://github.com/kennethreitz/requests>`_ HTTP library.
At its core, PyiCloud connects to iCloud using your username and password, then performs calendar and iPhone queries against their API.

View file

@ -14,7 +14,7 @@ from pyicloud.exceptions import (
PyiCloudFailedLoginException,
PyiCloudAPIResponseException,
PyiCloud2SARequiredException,
PyiCloudServiceNotActivatedException
PyiCloudServiceNotActivatedException,
)
from pyicloud.services import (
FindMyiPhoneServiceManager,
@ -23,7 +23,7 @@ from pyicloud.services import (
ContactsService,
RemindersService,
PhotosService,
AccountService
AccountService,
)
from pyicloud.utils import get_password_from_keyring
@ -38,6 +38,7 @@ LOGGER = logging.getLogger(__name__)
class PyiCloudPasswordFilter(logging.Filter):
"""Password log hider."""
def __init__(self, password):
super(PyiCloudPasswordFilter, self).__init__(password)
@ -52,6 +53,7 @@ class PyiCloudPasswordFilter(logging.Filter):
class PyiCloudSession(Session):
"""iCloud session."""
def __init__(self, service):
self.service = service
super(PyiCloudSession, self).__init__()
@ -61,27 +63,25 @@ class PyiCloudSession(Session):
# Charge logging to the right service endpoint
callee = inspect.stack()[2]
module = inspect.getmodule(callee[0])
request_logger = logging.getLogger(module.__name__).getChild('http')
request_logger = logging.getLogger(module.__name__).getChild("http")
if self.service.password_filter not in request_logger.filters:
request_logger.addFilter(self.service.password_filter)
request_logger.debug("%s %s %s", args[0], args[1], kwargs.get('data', ''))
request_logger.debug("%s %s %s", args[0], args[1], kwargs.get("data", ""))
kwargs.pop('retried', None)
kwargs.pop("retried", None)
response = super(PyiCloudSession, self).request(*args, **kwargs)
content_type = response.headers.get('Content-Type', '').split(';')[0]
json_mimetypes = ['application/json', 'text/json']
content_type = response.headers.get("Content-Type", "").split(";")[0]
json_mimetypes = ["application/json", "text/json"]
if not response.ok and content_type not in json_mimetypes:
if kwargs.get('retried') is None and response.status_code == 450:
if kwargs.get("retried") is None and response.status_code == 450:
api_error = PyiCloudAPIResponseException(
response.reason,
response.status_code,
retry=True
response.reason, response.status_code, retry=True
)
request_logger.warn(api_error)
kwargs['retried'] = True
kwargs["retried"] = True
return self.request(*args, **kwargs)
self._raise_error(response.status_code, response.reason)
@ -91,22 +91,22 @@ class PyiCloudSession(Session):
try:
data = response.json()
except: # pylint: disable=bare-except
request_logger.warning('Failed to parse response with JSON mimetype')
request_logger.warning("Failed to parse response with JSON mimetype")
return response
request_logger.debug(data)
reason = data.get('errorMessage')
reason = reason or data.get('reason')
reason = reason or data.get('errorReason')
if not reason and isinstance(data.get('error'), six.string_types):
reason = data.get('error')
if not reason and data.get('error'):
reason = data.get("errorMessage")
reason = reason or data.get("reason")
reason = reason or data.get("errorReason")
if not reason and isinstance(data.get("error"), six.string_types):
reason = data.get("error")
if not reason and data.get("error"):
reason = "Unknown reason"
code = data.get('errorCode')
if not code and data.get('serverErrorCode'):
code = data.get('serverErrorCode')
code = data.get("errorCode")
if not code and data.get("serverErrorCode"):
code = data.get("serverErrorCode")
if reason:
self._raise_error(code, reason)
@ -114,20 +114,25 @@ class PyiCloudSession(Session):
return response
def _raise_error(self, code, reason):
if self.service.requires_2sa and \
reason == 'Missing X-APPLE-WEBAUTH-TOKEN cookie':
raise PyiCloud2SARequiredException(self.service.user['apple_id'])
if code in ('ZONE_NOT_FOUND', 'AUTHENTICATION_FAILED'):
reason = 'Please log into https://icloud.com/ to manually ' \
'finish setting up your iCloud service'
if (
self.service.requires_2sa
and reason == "Missing X-APPLE-WEBAUTH-TOKEN cookie"
):
raise PyiCloud2SARequiredException(self.service.user["apple_id"])
if code in ("ZONE_NOT_FOUND", "AUTHENTICATION_FAILED"):
reason = (
"Please log into https://icloud.com/ to manually "
"finish setting up your iCloud service"
)
api_error = PyiCloudServiceNotActivatedException(reason, code)
LOGGER.error(api_error)
raise(api_error)
if code == 'ACCESS_DENIED':
reason = reason + '. Please wait a few minutes then try ' \
'again. The remote servers might be trying to ' \
'throttle requests.'
raise (api_error)
if code == "ACCESS_DENIED":
reason = (
reason + ". Please wait a few minutes then try again."
"The remote servers might be trying to throttle requests."
)
api_error = PyiCloudAPIResponseException(reason, code)
LOGGER.error(api_error)
@ -146,8 +151,13 @@ class PyiCloudService(object):
"""
def __init__(
self, apple_id, password=None, cookie_directory=None, verify=True,
client_id=None, with_family=True
self,
apple_id,
password=None,
cookie_directory=None,
verify=True,
client_id=None,
with_family=True,
):
if password is None:
password = get_password_from_keyring(apple_id)
@ -155,33 +165,32 @@ class PyiCloudService(object):
self.data = {}
self.client_id = client_id or str(uuid.uuid1()).upper()
self.with_family = with_family
self.user = {'apple_id': apple_id, 'password': password}
self.user = {"apple_id": apple_id, "password": password}
self.password_filter = PyiCloudPasswordFilter(password)
LOGGER.addFilter(self.password_filter)
self._home_endpoint = 'https://www.icloud.com'
self._setup_endpoint = 'https://setup.icloud.com/setup/ws/1'
self._home_endpoint = "https://www.icloud.com"
self._setup_endpoint = "https://setup.icloud.com/setup/ws/1"
self._base_login_url = '%s/login' % self._setup_endpoint
self._base_login_url = "%s/login" % self._setup_endpoint
if cookie_directory:
self._cookie_directory = os.path.expanduser(
os.path.normpath(cookie_directory)
)
else:
self._cookie_directory = os.path.join(
tempfile.gettempdir(),
'pyicloud',
)
self._cookie_directory = os.path.join(tempfile.gettempdir(), "pyicloud",)
self.session = PyiCloudSession(self)
self.session.verify = verify
self.session.headers.update({
'Origin': self._home_endpoint,
'Referer': '%s/' % self._home_endpoint,
'User-Agent': 'Opera/9.52 (X11; Linux i686; U; en)'
})
self.session.headers.update(
{
"Origin": self._home_endpoint,
"Referer": "%s/" % self._home_endpoint,
"User-Agent": "Opera/9.52 (X11; Linux i686; U; en)",
}
)
cookiejar_path = self._get_cookiejar_path()
self.session.cookies = cookielib.LWPCookieJar(filename=cookiejar_path)
@ -196,11 +205,11 @@ class PyiCloudService(object):
LOGGER.warning("Failed to read cookiejar %s", cookiejar_path)
self.params = {
'clientBuildNumber': '17DHotfix5',
'clientMasteringNumber': '17DHotfix5',
'ckjsBuildVersion': '17DProjectDev77',
'ckjsVersion': '2.0.5',
'clientId': self.client_id,
"clientBuildNumber": "17DHotfix5",
"clientMasteringNumber": "17DHotfix5",
"ckjsBuildVersion": "17DProjectDev77",
"ckjsVersion": "2.0.5",
"clientId": self.client_id,
}
self.authenticate()
@ -214,26 +223,24 @@ class PyiCloudService(object):
subsequent logins will not cause additional e-mails from Apple.
"""
LOGGER.info("Authenticating as %s", self.user['apple_id'])
LOGGER.info("Authenticating as %s", self.user["apple_id"])
data = dict(self.user)
# We authenticate every time, so "remember me" is not needed
data.update({'extended_login': False})
data.update({"extended_login": False})
try:
req = self.session.post(
self._base_login_url,
params=self.params,
data=json.dumps(data)
self._base_login_url, params=self.params, data=json.dumps(data)
)
except PyiCloudAPIResponseException as error:
msg = 'Invalid email/password combination.'
msg = "Invalid email/password combination."
raise PyiCloudFailedLoginException(msg, error)
self.data = req.json()
self.params.update({'dsid': self.data['dsInfo']['dsid']})
self._webservices = self.data['webservices']
self.params.update({"dsid": self.data["dsInfo"]["dsid"]})
self._webservices = self.data["webservices"]
if not os.path.exists(self._cookie_directory):
os.mkdir(self._cookie_directory)
@ -247,48 +254,46 @@ class PyiCloudService(object):
"""Get path for cookiejar file."""
return os.path.join(
self._cookie_directory,
''.join([c for c in self.user.get('apple_id') if match(r'\w', c)])
"".join([c for c in self.user.get("apple_id") if match(r"\w", c)]),
)
@property
def requires_2sa(self):
"""Returns True if two-step authentication is required."""
return self.data.get('hsaChallengeRequired', False) \
and self.data['dsInfo'].get('hsaVersion', 0) >= 1
return (
self.data.get("hsaChallengeRequired", False)
and self.data["dsInfo"].get("hsaVersion", 0) >= 1
)
# FIXME: Implement 2FA for hsaVersion == 2 # pylint: disable=fixme
@property
def trusted_devices(self):
"""Returns devices trusted for two-step authentication."""
request = self.session.get(
'%s/listDevices' % self._setup_endpoint,
params=self.params
"%s/listDevices" % self._setup_endpoint, params=self.params
)
return request.json().get('devices')
return request.json().get("devices")
def send_verification_code(self, device):
"""Requests that a verification code is sent to the given device."""
data = json.dumps(device)
request = self.session.post(
'%s/sendVerificationCode' % self._setup_endpoint,
"%s/sendVerificationCode" % self._setup_endpoint,
params=self.params,
data=data
data=data,
)
return request.json().get('success', False)
return request.json().get("success", False)
def validate_verification_code(self, device, code):
"""Verifies a verification code received on a trusted device."""
device.update({
'verificationCode': code,
'trustBrowser': True
})
device.update({"verificationCode": code, "trustBrowser": True})
data = json.dumps(device)
try:
self.session.post(
'%s/validateVerificationCode' % self._setup_endpoint,
"%s/validateVerificationCode" % self._setup_endpoint,
params=self.params,
data=data
data=data,
)
except PyiCloudAPIResponseException as error:
if error.code == -21669:
@ -306,20 +311,16 @@ class PyiCloudService(object):
"""Get webservice URL, raise an exception if not exists."""
if self._webservices.get(ws_key) is None:
raise PyiCloudServiceNotActivatedException(
'Webservice not available',
ws_key
"Webservice not available", ws_key
)
return self._webservices[ws_key]['url']
return self._webservices[ws_key]["url"]
@property
def devices(self):
"""Returns all devices."""
service_root = self._get_webservice_url('findme')
service_root = self._get_webservice_url("findme")
return FindMyiPhoneServiceManager(
service_root,
self.session,
self.params,
self.with_family
service_root, self.session, self.params, self.with_family
)
@property
@ -330,63 +331,51 @@ class PyiCloudService(object):
@property
def account(self):
"""Gets the 'Account' service."""
service_root = self._get_webservice_url('account')
return AccountService(
service_root,
self.session,
self.params
)
service_root = self._get_webservice_url("account")
return AccountService(service_root, self.session, self.params)
@property
def files(self):
"""Gets the 'File' service."""
if not self._files:
service_root = self._get_webservice_url('ubiquity')
self._files = UbiquityService(
service_root,
self.session,
self.params
)
service_root = self._get_webservice_url("ubiquity")
self._files = UbiquityService(service_root, self.session, self.params)
return self._files
@property
def photos(self):
"""Gets the 'Photo' service."""
if not self._photos:
service_root = self._get_webservice_url('ckdatabasews')
self._photos = PhotosService(
service_root,
self.session,
self.params
)
service_root = self._get_webservice_url("ckdatabasews")
self._photos = PhotosService(service_root, self.session, self.params)
return self._photos
@property
def calendar(self):
"""Gets the 'Calendar' service."""
service_root = self._get_webservice_url('calendar')
service_root = self._get_webservice_url("calendar")
return CalendarService(service_root, self.session, self.params)
@property
def contacts(self):
"""Gets the 'Contacts' service."""
service_root = self._get_webservice_url('contacts')
service_root = self._get_webservice_url("contacts")
return ContactsService(service_root, self.session, self.params)
@property
def reminders(self):
"""Gets the 'Reminders' service."""
service_root = self._get_webservice_url('reminders')
service_root = self._get_webservice_url("reminders")
return RemindersService(service_root, self.session, self.params)
def __unicode__(self):
return 'iCloud API: %s' % self.user.get('apple_id')
return "iCloud API: %s" % self.user.get("apple_id")
def __str__(self):
as_unicode = self.__unicode__()
if sys.version_info[0] >= 3:
return as_unicode
return as_unicode.encode('utf-8', 'ignore')
return as_unicode.encode("utf-8", "ignore")
def __repr__(self):
return '<%s>' % str(self)
return "<%s>" % str(self)

View file

@ -16,14 +16,14 @@ from pyicloud import PyiCloudService
from pyicloud.exceptions import PyiCloudFailedLoginException
from . import utils
# fmt: off
if six.PY2:
input = raw_input # pylint: disable=redefined-builtin,invalid-name,undefined-variable
else:
input = input # pylint: disable=bad-option-value,self-assigning-variable,invalid-name
# fmt: on
DEVICE_ERROR = (
"Please use the --device switch to indicate which device to use."
)
DEVICE_ERROR = "Please use the --device switch to indicate which device to use."
def create_pickled_data(idevice, filename):
@ -34,7 +34,7 @@ def create_pickled_data(idevice, filename):
This allows the data to be used without resorting to screen / pipe
scrapping.
"""
pickle_file = open(filename, 'wb')
pickle_file = open(filename, "wb")
pickle.dump(idevice.content, pickle_file, protocol=pickle.HIGHEST_PROTOCOL)
pickle_file.close()
@ -44,15 +44,14 @@ def main(args=None):
if args is None:
args = sys.argv[1:]
parser = argparse.ArgumentParser(
description="Find My iPhone CommandLine Tool")
parser = argparse.ArgumentParser(description="Find My iPhone CommandLine Tool")
parser.add_argument(
"--username",
action="store",
dest="username",
default="",
help="Apple ID to Use"
help="Apple ID to Use",
)
parser.add_argument(
"--password",
@ -62,7 +61,7 @@ def main(args=None):
help=(
"Apple ID Password to Use; if unspecified, password will be "
"fetched from the system keyring."
)
),
)
parser.add_argument(
"-n",
@ -70,7 +69,7 @@ def main(args=None):
action="store_false",
dest="interactive",
default=True,
help="Disable interactive prompts."
help="Disable interactive prompts.",
)
parser.add_argument(
"--delete-from-keyring",
@ -189,54 +188,59 @@ def main(args=None):
# Which password we use is determined by your username, so we
# do need to check for this first and separately.
if not username:
parser.error('No username supplied')
parser.error("No username supplied")
if not password:
password = utils.get_password(
username,
interactive=command_line.interactive
username, interactive=command_line.interactive
)
if not password:
parser.error('No password supplied')
parser.error("No password supplied")
try:
api = PyiCloudService(
username.strip(),
password.strip()
)
api = PyiCloudService(username.strip(), password.strip())
if (
not utils.password_exists_in_keyring(username) and
command_line.interactive and
confirm("Save password in keyring?")
not utils.password_exists_in_keyring(username)
and command_line.interactive
and confirm("Save password in keyring?")
):
utils.store_password_in_keyring(username, password)
if api.requires_2sa:
print("\nTwo-step authentication required.",
"\nYour trusted devices are:")
# fmt: off
print(
"\nTwo-step authentication required.",
"\nYour trusted devices are:"
)
# fmt: on
devices = api.trusted_devices
for i, device in enumerate(devices):
print(" %s: %s" % (
i, device.get(
'deviceName',
"SMS to %s" % device.get('phoneNumber'))))
print(
" %s: %s"
% (
i,
device.get(
"deviceName", "SMS to %s" % device.get("phoneNumber")
),
)
)
print('\nWhich device would you like to use?')
device = int(input('(number) --> '))
print("\nWhich device would you like to use?")
device = int(input("(number) --> "))
device = devices[device]
if not api.send_verification_code(device):
print("Failed to send verification code")
sys.exit(1)
print('\nPlease enter validation code')
code = input('(string) --> ')
print("\nPlease enter validation code")
code = input("(string) --> ")
if not api.validate_verification_code(device, code):
print("Failed to verify verification code")
sys.exit(1)
print('')
print("")
break
except PyiCloudFailedLoginException:
# If they have a stored password; we just used it and
@ -256,12 +260,8 @@ def main(args=None):
print(message, file=sys.stderr)
for dev in api.devices:
if (
not command_line.device_id or
(
command_line.device_id.strip().lower() ==
dev.content["id"].strip().lower()
)
if not command_line.device_id or (
command_line.device_id.strip().lower() == dev.content["id"].strip().lower()
):
# List device(s)
if command_line.locate:
@ -270,19 +270,17 @@ def main(args=None):
if command_line.output_to_file:
create_pickled_data(
dev,
filename=(
dev.content["name"].strip().lower() + ".fmip_snapshot"
)
filename=(dev.content["name"].strip().lower() + ".fmip_snapshot"),
)
contents = dev.content
if command_line.longlist:
print("-"*30)
print("-" * 30)
print(contents["name"])
for key in contents:
print("%20s - %s" % (key, contents[key]))
elif command_line.list:
print("-"*30)
print("-" * 30)
print("Name - %s" % contents["name"])
print("Display Name - %s" % contents["deviceDisplayName"])
print("Location - %s" % contents["location"])
@ -297,9 +295,10 @@ def main(args=None):
dev.play_sound()
else:
raise RuntimeError(
"\n\n\t\t%s %s\n\n" % (
"\n\n\t\t%s %s\n\n"
% (
"Sounds can only be played on a singular device.",
DEVICE_ERROR
DEVICE_ERROR,
)
)
@ -307,16 +306,14 @@ def main(args=None):
if command_line.message:
if command_line.device_id:
dev.display_message(
subject='A Message',
message=command_line.message,
sounds=True
subject="A Message", message=command_line.message, sounds=True
)
else:
raise RuntimeError(
"%s %s" % (
"Messages can only be played "
"on a singular device.",
DEVICE_ERROR
"%s %s"
% (
"Messages can only be played on a singular device.",
DEVICE_ERROR,
)
)
@ -324,16 +321,17 @@ def main(args=None):
if command_line.silentmessage:
if command_line.device_id:
dev.display_message(
subject='A Silent Message',
subject="A Silent Message",
message=command_line.silentmessage,
sounds=False
sounds=False,
)
else:
raise RuntimeError(
"%s %s" % (
"%s %s"
% (
"Silent Messages can only be played "
"on a singular device.",
DEVICE_ERROR
DEVICE_ERROR,
)
)
@ -343,17 +341,18 @@ def main(args=None):
dev.lost_device(
number=command_line.lost_phone.strip(),
text=command_line.lost_message.strip(),
newpasscode=command_line.lost_password.strip()
newpasscode=command_line.lost_password.strip(),
)
else:
raise RuntimeError(
"%s %s" % (
"Lost Mode can only be activated "
"on a singular device.",
DEVICE_ERROR
"%s %s"
% (
"Lost Mode can only be activated on a singular device.",
DEVICE_ERROR,
)
)
sys.exit(0)
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -1,4 +1,6 @@
"""Library exceptions."""
class PyiCloudException(Exception):
"""Generic iCloud exception."""
pass

View file

@ -8,19 +8,20 @@ from pyicloud.utils import underscore_to_camelcase
class AccountService(object):
"""The 'Account' iCloud service."""
def __init__(self, service_root, session, params):
self.session = session
self.params = params
self._service_root = service_root
self._devices = []
self._acc_endpoint = '%s/setup/web/device' % self._service_root
self._account_devices_url = '%s/getDevices' % self._acc_endpoint
self._acc_endpoint = "%s/setup/web/device" % self._service_root
self._account_devices_url = "%s/getDevices" % self._acc_endpoint
req = self.session.get(self._account_devices_url, params=self.params)
self.response = req.json()
for device_info in self.response['devices']:
for device_info in self.response["devices"]:
# device_id = device_info['udid']
# self._devices[device_id] = AccountDevice(device_info)
self._devices.append(AccountDevice(device_info))
@ -34,6 +35,7 @@ class AccountService(object):
@six.python_2_unicode_compatible
class AccountDevice(dict):
"""Account device."""
def __getattr__(self, name):
try:
return self[underscore_to_camelcase(name)]
@ -42,15 +44,14 @@ class AccountDevice(dict):
def __str__(self):
return u"{display_name}: {name}".format(
display_name=self.model_display_name,
name=self.name,
display_name=self.model_display_name, name=self.name,
)
def __repr__(self):
return '<{display}>'.format(
return "<{display}>".format(
display=(
six.text_type(self)
if sys.version_info[0] >= 3 else
six.text_type(self).encode('utf8', 'replace')
if sys.version_info[0] >= 3
else six.text_type(self).encode("utf8", "replace")
)
)

View file

@ -10,16 +10,15 @@ class CalendarService(object):
"""
The 'Calendar' iCloud service, connects to iCloud and returns events.
"""
def __init__(self, service_root, session, params):
self.session = session
self.params = params
self._service_root = service_root
self._calendar_endpoint = '%s/ca' % self._service_root
self._calendar_refresh_url = '%s/events' % self._calendar_endpoint
self._calendar_event_detail_url = '%s/eventdetail' % (
self._calendar_endpoint,
)
self._calendars = '%s/startup' % self._calendar_endpoint
self._calendar_endpoint = "%s/ca" % self._service_root
self._calendar_refresh_url = "%s/events" % self._calendar_endpoint
self._calendar_event_detail_url = "%s/eventdetail" % (self._calendar_endpoint,)
self._calendars = "%s/startup" % self._calendar_endpoint
self.response = {}
@ -29,11 +28,11 @@ class CalendarService(object):
(a calendar) and a guid (an event's ID).
"""
params = dict(self.params)
params.update({'lang': 'en-us', 'usertz': get_localzone().zone})
url = '%s/%s/%s' % (self._calendar_event_detail_url, pguid, guid)
params.update({"lang": "en-us", "usertz": get_localzone().zone})
url = "%s/%s/%s" % (self._calendar_event_detail_url, pguid, guid)
req = self.session.get(url, params=params)
self.response = req.json()
return self.response['Event'][0]
return self.response["Event"][0]
def refresh_client(self, from_dt=None, to_dt=None):
"""
@ -48,12 +47,14 @@ class CalendarService(object):
if not to_dt:
to_dt = datetime(today.year, today.month, last_day)
params = dict(self.params)
params.update({
'lang': 'en-us',
'usertz': get_localzone().zone,
'startDate': from_dt.strftime('%Y-%m-%d'),
'endDate': to_dt.strftime('%Y-%m-%d')
})
params.update(
{
"lang": "en-us",
"usertz": get_localzone().zone,
"startDate": from_dt.strftime("%Y-%m-%d"),
"endDate": to_dt.strftime("%Y-%m-%d"),
}
)
req = self.session.get(self._calendar_refresh_url, params=params)
self.response = req.json()
@ -62,7 +63,7 @@ class CalendarService(object):
Retrieves events for a given date range, by default, this month.
"""
self.refresh_client(from_dt, to_dt)
return self.response.get('Event')
return self.response.get("Event")
def calendars(self):
"""
@ -73,12 +74,14 @@ class CalendarService(object):
from_dt = datetime(today.year, today.month, first_day)
to_dt = datetime(today.year, today.month, last_day)
params = dict(self.params)
params.update({
'lang': 'en-us',
'usertz': get_localzone().zone,
'startDate': from_dt.strftime('%Y-%m-%d'),
'endDate': to_dt.strftime('%Y-%m-%d')
})
params.update(
{
"lang": "en-us",
"usertz": get_localzone().zone,
"startDate": from_dt.strftime("%Y-%m-%d"),
"endDate": to_dt.strftime("%Y-%m-%d"),
}
)
req = self.session.get(self._calendars, params=params)
self.response = req.json()
return self.response['Collection']
return self.response["Collection"]

View file

@ -11,10 +11,10 @@ class ContactsService(object):
self.session = session
self.params = params
self._service_root = service_root
self._contacts_endpoint = '%s/co' % self._service_root
self._contacts_refresh_url = '%s/startup' % self._contacts_endpoint
self._contacts_next_url = '%s/contacts' % self._contacts_endpoint
self._contacts_changeset_url = '%s/changeset' % self._contacts_endpoint
self._contacts_endpoint = "%s/co" % self._service_root
self._contacts_refresh_url = "%s/startup" % self._contacts_endpoint
self._contacts_next_url = "%s/contacts" % self._contacts_endpoint
self._contacts_changeset_url = "%s/changeset" % self._contacts_endpoint
self.response = {}
@ -24,28 +24,22 @@ class ContactsService(object):
contacts data is up-to-date.
"""
params_contacts = dict(self.params)
params_contacts.update({
'clientVersion': '2.1',
'locale': 'en_US',
'order': 'last,first',
})
req = self.session.get(
self._contacts_refresh_url,
params=params_contacts
params_contacts.update(
{"clientVersion": "2.1", "locale": "en_US", "order": "last,first",}
)
req = self.session.get(self._contacts_refresh_url, params=params_contacts)
self.response = req.json()
params_next = dict(params_contacts)
params_next.update({
'prefToken': self.response["prefToken"],
'syncToken': self.response["syncToken"],
'limit': '0',
'offset': '0',
})
req = self.session.get(
self._contacts_next_url,
params=params_next
params_next.update(
{
"prefToken": self.response["prefToken"],
"syncToken": self.response["syncToken"],
"limit": "0",
"offset": "0",
}
)
req = self.session.get(self._contacts_next_url, params=params_next)
self.response = req.json()
def all(self):
@ -53,4 +47,4 @@ class ContactsService(object):
Retrieves all contacts.
"""
self.refresh_client()
return self.response.get('contacts')
return self.response.get("contacts")

View file

@ -19,11 +19,11 @@ class FindMyiPhoneServiceManager(object):
self.params = params
self.with_family = with_family
fmip_endpoint = '%s/fmipservice/client/web' % service_root
self._fmip_refresh_url = '%s/refreshClient' % fmip_endpoint
self._fmip_sound_url = '%s/playSound' % fmip_endpoint
self._fmip_message_url = '%s/sendMessage' % fmip_endpoint
self._fmip_lost_url = '%s/lostDevice' % fmip_endpoint
fmip_endpoint = "%s/fmipservice/client/web" % service_root
self._fmip_refresh_url = "%s/refreshClient" % fmip_endpoint
self._fmip_sound_url = "%s/playSound" % fmip_endpoint
self._fmip_message_url = "%s/sendMessage" % fmip_endpoint
self._fmip_lost_url = "%s/lostDevice" % fmip_endpoint
self._devices = {}
self.refresh_client()
@ -39,18 +39,18 @@ class FindMyiPhoneServiceManager(object):
params=self.params,
data=json.dumps(
{
'clientContext': {
'fmly': self.with_family,
'shouldLocate': True,
'selectedDevice': 'all',
"clientContext": {
"fmly": self.with_family,
"shouldLocate": True,
"selectedDevice": "all",
}
}
)
),
)
self.response = req.json()
for device_info in self.response['content']:
device_id = device_info['id']
for device_info in self.response["content"]:
device_id = device_info["id"]
if device_id not in self._devices:
self._devices[device_id] = AppleDevice(
device_info,
@ -85,7 +85,7 @@ class FindMyiPhoneServiceManager(object):
as_unicode = self.__unicode__()
if sys.version_info[0] >= 3:
return as_unicode
return as_unicode.encode('utf-8', 'ignore')
return as_unicode.encode("utf-8", "ignore")
def __repr__(self):
return six.text_type(self)
@ -93,9 +93,16 @@ class FindMyiPhoneServiceManager(object):
class AppleDevice(object):
"""Apple device."""
def __init__(
self, content, session, params, manager,
sound_url=None, lost_url=None, message_url=None
self,
content,
session,
params,
manager,
sound_url=None,
lost_url=None,
message_url=None,
):
self.content = content
self.manager = manager
@ -113,7 +120,7 @@ class AppleDevice(object):
def location(self):
"""Updates the device location."""
self.manager.refresh_client()
return self.content['location']
return self.content["location"]
def status(self, additional=[]): # pylint: disable=dangerous-default-value
"""Returns status information for device.
@ -121,34 +128,29 @@ class AppleDevice(object):
This returns only a subset of possible properties.
"""
self.manager.refresh_client()
fields = ['batteryLevel', 'deviceDisplayName', 'deviceStatus', 'name']
fields = ["batteryLevel", "deviceDisplayName", "deviceStatus", "name"]
fields += additional
properties = {}
for field in fields:
properties[field] = self.content.get(field)
return properties
def play_sound(self, subject='Find My iPhone Alert'):
def play_sound(self, subject="Find My iPhone Alert"):
"""Send a request to the device to play a sound.
It's possible to pass a custom message by changing the `subject`.
"""
data = json.dumps({
'device': self.content['id'],
'subject': subject,
'clientContext': {
'fmly': True
data = json.dumps(
{
"device": self.content["id"],
"subject": subject,
"clientContext": {"fmly": True},
}
})
self.session.post(
self.sound_url,
params=self.params,
data=data
)
self.session.post(self.sound_url, params=self.params, data=data)
def display_message(
self, subject='Find My iPhone Alert', message="This is a note",
sounds=False
self, subject="Find My iPhone Alert", message="This is a note", sounds=False
):
"""Send a request to the device to play a sound.
@ -156,23 +158,17 @@ class AppleDevice(object):
"""
data = json.dumps(
{
'device': self.content['id'],
'subject': subject,
'sound': sounds,
'userText': True,
'text': message
"device": self.content["id"],
"subject": subject,
"sound": sounds,
"userText": True,
"text": message,
}
)
self.session.post(
self.message_url,
params=self.params,
data=data
)
self.session.post(self.message_url, params=self.params, data=data)
def lost_device(
self, number,
text='This iPhone has been lost. Please call me.',
newpasscode=""
self, number, text="This iPhone has been lost. Please call me.", newpasscode=""
):
"""Send a request to the device to trigger 'lost mode'.
@ -180,20 +176,18 @@ class AppleDevice(object):
been passed, then the person holding the device can call
the number without entering the passcode.
"""
data = json.dumps({
'text': text,
'userText': True,
'ownerNbr': number,
'lostModeEnabled': True,
'trackingEnabled': True,
'device': self.content['id'],
'passcode': newpasscode
})
self.session.post(
self.lost_url,
params=self.params,
data=data
data = json.dumps(
{
"text": text,
"userText": True,
"ownerNbr": number,
"lostModeEnabled": True,
"trackingEnabled": True,
"device": self.content["id"],
"passcode": newpasscode,
}
)
self.session.post(self.lost_url, params=self.params, data=data)
@property
def data(self):
@ -207,18 +201,15 @@ class AppleDevice(object):
return getattr(self.content, attr)
def __unicode__(self):
display_name = self['deviceDisplayName']
name = self['name']
return '%s: %s' % (
display_name,
name,
)
display_name = self["deviceDisplayName"]
name = self["name"]
return "%s: %s" % (display_name, name,)
def __str__(self):
as_unicode = self.__unicode__()
if sys.version_info[0] >= 3:
return as_unicode
return as_unicode.encode('utf-8', 'ignore')
return as_unicode.encode("utf-8", "ignore")
def __repr__(self):
return '<AppleDevice(%s)>' % str(self)
return "<AppleDevice(%s)>" % str(self)

View file

@ -12,121 +12,115 @@ from future.moves.urllib.parse import urlencode
class PhotosService(object):
"""The 'Photos' iCloud service."""
SMART_FOLDERS = {
"All Photos": {
"obj_type": "CPLAssetByAddedDate",
"list_type": "CPLAssetAndMasterByAddedDate",
"direction": "ASCENDING",
"query_filter": None
"query_filter": None,
},
"Time-lapse": {
"obj_type": "CPLAssetInSmartAlbumByAssetDate:Timelapse",
"list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
"direction": "ASCENDING",
"query_filter": [{
"query_filter": [
{
"fieldName": "smartAlbum",
"comparator": "EQUALS",
"fieldValue": {
"type": "STRING",
"value": "TIMELAPSE"
"fieldValue": {"type": "STRING", "value": "TIMELAPSE"},
}
}]
],
},
"Videos": {
"obj_type": "CPLAssetInSmartAlbumByAssetDate:Video",
"list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
"direction": "ASCENDING",
"query_filter": [{
"query_filter": [
{
"fieldName": "smartAlbum",
"comparator": "EQUALS",
"fieldValue": {
"type": "STRING",
"value": "VIDEO"
"fieldValue": {"type": "STRING", "value": "VIDEO"},
}
}]
],
},
"Slo-mo": {
"obj_type": "CPLAssetInSmartAlbumByAssetDate:Slomo",
"list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
"direction": "ASCENDING",
"query_filter": [{
"query_filter": [
{
"fieldName": "smartAlbum",
"comparator": "EQUALS",
"fieldValue": {
"type": "STRING",
"value": "SLOMO"
"fieldValue": {"type": "STRING", "value": "SLOMO"},
}
}]
],
},
"Bursts": {
"obj_type": "CPLAssetBurstStackAssetByAssetDate",
"list_type": "CPLBurstStackAssetAndMasterByAssetDate",
"direction": "ASCENDING",
"query_filter": None
"query_filter": None,
},
"Favorites": {
"obj_type": "CPLAssetInSmartAlbumByAssetDate:Favorite",
"list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
"direction": "ASCENDING",
"query_filter": [{
"query_filter": [
{
"fieldName": "smartAlbum",
"comparator": "EQUALS",
"fieldValue": {
"type": "STRING",
"value": "FAVORITE"
"fieldValue": {"type": "STRING", "value": "FAVORITE"},
}
}]
],
},
"Panoramas": {
"obj_type": "CPLAssetInSmartAlbumByAssetDate:Panorama",
"list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
"direction": "ASCENDING",
"query_filter": [{
"query_filter": [
{
"fieldName": "smartAlbum",
"comparator": "EQUALS",
"fieldValue": {
"type": "STRING",
"value": "PANORAMA"
"fieldValue": {"type": "STRING", "value": "PANORAMA"},
}
}]
],
},
"Screenshots": {
"obj_type": "CPLAssetInSmartAlbumByAssetDate:Screenshot",
"list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
"direction": "ASCENDING",
"query_filter": [{
"query_filter": [
{
"fieldName": "smartAlbum",
"comparator": "EQUALS",
"fieldValue": {
"type": "STRING",
"value": "SCREENSHOT"
"fieldValue": {"type": "STRING", "value": "SCREENSHOT"},
}
}]
],
},
"Live": {
"obj_type": "CPLAssetInSmartAlbumByAssetDate:Live",
"list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
"direction": "ASCENDING",
"query_filter": [{
"query_filter": [
{
"fieldName": "smartAlbum",
"comparator": "EQUALS",
"fieldValue": {
"type": "STRING",
"value": "LIVE"
"fieldValue": {"type": "STRING", "value": "LIVE"},
}
}]
],
},
"Recently Deleted": {
"obj_type": "CPLAssetDeletedByExpungedDate",
"list_type": "CPLAssetAndMasterDeletedByExpungedDate",
"direction": "ASCENDING",
"query_filter": None
"query_filter": None,
},
"Hidden": {
"obj_type": "CPLAssetHiddenByAssetDate",
"list_type": "CPLAssetAndMasterHiddenByAssetDate",
"direction": "ASCENDING",
"query_filter": None
"query_filter": None,
},
}
@ -134,32 +128,29 @@ class PhotosService(object):
self.session = session
self.params = dict(params)
self._service_root = service_root
self.service_endpoint = \
('%s/database/1/com.apple.photos.cloud/production/private'
% self._service_root)
self.service_endpoint = (
"%s/database/1/com.apple.photos.cloud/production/private"
% self._service_root
)
self._albums = None
self.params.update({
'remapEnums': True,
'getCurrentSyncToken': True
})
self.params.update({"remapEnums": True, "getCurrentSyncToken": True})
url = ('%s/records/query?%s' %
(self.service_endpoint, urlencode(self.params)))
json_data = ('{"query":{"recordType":"CheckIndexingState"},'
'"zoneID":{"zoneName":"PrimarySync"}}')
url = "%s/records/query?%s" % (self.service_endpoint, urlencode(self.params))
json_data = (
'{"query":{"recordType":"CheckIndexingState"},'
'"zoneID":{"zoneName":"PrimarySync"}}'
)
request = self.session.post(
url,
data=json_data,
headers={'Content-type': 'text/plain'}
url, data=json_data, headers={"Content-type": "text/plain"}
)
response = request.json()
indexing_state = response['records'][0]['fields']['state']['value']
if indexing_state != 'FINISHED':
indexing_state = response["records"][0]["fields"]["state"]["value"]
if indexing_state != "FINISHED":
raise PyiCloudServiceNotActivatedException(
'iCloud Photo Library not finished indexing. '
'Please try again in a few minutes.'
"iCloud Photo Library not finished indexing. "
"Please try again in a few minutes."
)
# TODO: Does syncToken ever change? # pylint: disable=fixme
@ -174,63 +165,79 @@ class PhotosService(object):
def albums(self):
"""Returns photo albums."""
if not self._albums:
self._albums = {name: PhotoAlbum(self, name, **props)
for (name, props) in self.SMART_FOLDERS.items()}
self._albums = {
name: PhotoAlbum(self, name, **props)
for (name, props) in self.SMART_FOLDERS.items()
}
for folder in self._fetch_folders():
# TODO: Handle subfolders # pylint: disable=fixme
if folder['recordName'] == '----Root-Folder----' or \
(folder['fields'].get('isDeleted') and
folder['fields']['isDeleted']['value']):
if folder["recordName"] == "----Root-Folder----" or (
folder["fields"].get("isDeleted")
and folder["fields"]["isDeleted"]["value"]
):
continue
folder_id = folder['recordName']
folder_obj_type = \
folder_id = folder["recordName"]
folder_obj_type = (
"CPLContainerRelationNotDeletedByAssetDate:%s" % folder_id
)
folder_name = base64.b64decode(
folder['fields']['albumNameEnc']['value']).decode('utf-8')
query_filter = [{
folder["fields"]["albumNameEnc"]["value"]
).decode("utf-8")
query_filter = [
{
"fieldName": "parentId",
"comparator": "EQUALS",
"fieldValue": {
"type": "STRING",
"value": folder_id
"fieldValue": {"type": "STRING", "value": folder_id},
}
}]
]
album = PhotoAlbum(self, folder_name,
'CPLContainerRelationLiveByAssetDate',
folder_obj_type, 'ASCENDING', query_filter)
album = PhotoAlbum(
self,
folder_name,
"CPLContainerRelationLiveByAssetDate",
folder_obj_type,
"ASCENDING",
query_filter,
)
self._albums[folder_name] = album
return self._albums
def _fetch_folders(self):
url = ('%s/records/query?%s' %
(self.service_endpoint, urlencode(self.params)))
json_data = ('{"query":{"recordType":"CPLAlbumByPositionLive"},'
'"zoneID":{"zoneName":"PrimarySync"}}')
url = "%s/records/query?%s" % (self.service_endpoint, urlencode(self.params))
json_data = (
'{"query":{"recordType":"CPLAlbumByPositionLive"},'
'"zoneID":{"zoneName":"PrimarySync"}}'
)
request = self.session.post(
url,
data=json_data,
headers={'Content-type': 'text/plain'}
url, data=json_data, headers={"Content-type": "text/plain"}
)
response = request.json()
return response['records']
return response["records"]
@property
def all(self):
"""Returns all photos."""
return self.albums['All Photos']
return self.albums["All Photos"]
class PhotoAlbum(object):
"""A photo album."""
def __init__(self, service, name, list_type, obj_type, direction,
query_filter=None, page_size=100):
def __init__(
self,
service,
name,
list_type,
obj_type,
direction,
query_filter=None,
page_size=100,
):
self.name = name
self.service = service
self.list_type = list_type
@ -251,41 +258,41 @@ class PhotoAlbum(object):
def __len__(self):
if self._len is None:
url = ('%s/internal/records/query/batch?%s' %
(self.service.service_endpoint,
urlencode(self.service.params)))
url = "%s/internal/records/query/batch?%s" % (
self.service.service_endpoint,
urlencode(self.service.params),
)
request = self.service.session.post(
url,
data=json.dumps(
{
u'batch': [{
u'resultsLimit': 1,
u'query': {
u'filterBy': {
u'fieldName': u'indexCountID',
u'fieldValue': {
u'type': u'STRING_LIST',
u'value': [
self.obj_type
]
u"batch": [
{
u"resultsLimit": 1,
u"query": {
u"filterBy": {
u"fieldName": u"indexCountID",
u"fieldValue": {
u"type": u"STRING_LIST",
u"value": [self.obj_type],
},
u'comparator': u'IN'
u"comparator": u"IN",
},
u'recordType': u'HyperionIndexCountLookup'
u"recordType": u"HyperionIndexCountLookup",
},
u'zoneWide': True,
u'zoneID': {
u'zoneName': u'PrimarySync'
u"zoneWide": True,
u"zoneID": {u"zoneName": u"PrimarySync"},
}
}]
]
}
),
headers={'Content-type': 'text/plain'}
headers={"Content-type": "text/plain"},
)
response = request.json()
self._len = (response["batch"][0]["records"][0]["fields"]
["itemCount"]["value"])
self._len = response["batch"][0]["records"][0]["fields"]["itemCount"][
"value"
]
return self._len
@ -297,26 +304,28 @@ class PhotoAlbum(object):
else:
offset = 0
while(True):
url = ('%s/records/query?' % self.service.service_endpoint) + \
urlencode(self.service.params)
while True:
url = ("%s/records/query?" % self.service.service_endpoint) + urlencode(
self.service.params
)
request = self.service.session.post(
url,
data=json.dumps(self._list_query_gen(
offset, self.list_type, self.direction,
self.query_filter)),
headers={'Content-type': 'text/plain'}
data=json.dumps(
self._list_query_gen(
offset, self.list_type, self.direction, self.query_filter
)
),
headers={"Content-type": "text/plain"},
)
response = request.json()
asset_records = {}
master_records = []
for rec in response['records']:
if rec['recordType'] == "CPLAsset":
master_id = \
rec['fields']['masterRef']['value']['recordName']
for rec in response["records"]:
if rec["recordType"] == "CPLAsset":
master_id = rec["fields"]["masterRef"]["value"]["recordName"]
asset_records[master_id] = rec
elif rec['recordType'] == "CPLMaster":
elif rec["recordType"] == "CPLMaster":
master_records.append(rec)
master_records_len = len(master_records)
@ -327,72 +336,135 @@ class PhotoAlbum(object):
offset = offset + master_records_len
for master_record in master_records:
record_name = master_record['recordName']
yield PhotoAsset(self.service, master_record,
asset_records[record_name])
record_name = master_record["recordName"]
yield PhotoAsset(
self.service, master_record, asset_records[record_name]
)
else:
break
def _list_query_gen(self, offset, list_type, direction, query_filter=None):
query = {
u'query': {
u'filterBy': [
{u'fieldName': u'startRank', u'fieldValue':
{u'type': u'INT64', u'value': offset},
u'comparator': u'EQUALS'},
{u'fieldName': u'direction', u'fieldValue':
{u'type': u'STRING', u'value': direction},
u'comparator': u'EQUALS'}
],
u'recordType': list_type
u"query": {
u"filterBy": [
{
u"fieldName": u"startRank",
u"fieldValue": {u"type": u"INT64", u"value": offset},
u"comparator": u"EQUALS",
},
{
u"fieldName": u"direction",
u"fieldValue": {u"type": u"STRING", u"value": direction},
u"comparator": u"EQUALS",
},
u'resultsLimit': self.page_size * 2,
u'desiredKeys': [
u'resJPEGFullWidth', u'resJPEGFullHeight',
u'resJPEGFullFileType', u'resJPEGFullFingerprint',
u'resJPEGFullRes', u'resJPEGLargeWidth',
u'resJPEGLargeHeight', u'resJPEGLargeFileType',
u'resJPEGLargeFingerprint', u'resJPEGLargeRes',
u'resJPEGMedWidth', u'resJPEGMedHeight',
u'resJPEGMedFileType', u'resJPEGMedFingerprint',
u'resJPEGMedRes', u'resJPEGThumbWidth',
u'resJPEGThumbHeight', u'resJPEGThumbFileType',
u'resJPEGThumbFingerprint', u'resJPEGThumbRes',
u'resVidFullWidth', u'resVidFullHeight',
u'resVidFullFileType', u'resVidFullFingerprint',
u'resVidFullRes', u'resVidMedWidth', u'resVidMedHeight',
u'resVidMedFileType', u'resVidMedFingerprint',
u'resVidMedRes', u'resVidSmallWidth', u'resVidSmallHeight',
u'resVidSmallFileType', u'resVidSmallFingerprint',
u'resVidSmallRes', u'resSidecarWidth', u'resSidecarHeight',
u'resSidecarFileType', u'resSidecarFingerprint',
u'resSidecarRes', u'itemType', u'dataClassType',
u'filenameEnc', u'originalOrientation', u'resOriginalWidth',
u'resOriginalHeight', u'resOriginalFileType',
u'resOriginalFingerprint', u'resOriginalRes',
u'resOriginalAltWidth', u'resOriginalAltHeight',
u'resOriginalAltFileType', u'resOriginalAltFingerprint',
u'resOriginalAltRes', u'resOriginalVidComplWidth',
u'resOriginalVidComplHeight', u'resOriginalVidComplFileType',
u'resOriginalVidComplFingerprint', u'resOriginalVidComplRes',
u'isDeleted', u'isExpunged', u'dateExpunged', u'remappedRef',
u'recordName', u'recordType', u'recordChangeTag',
u'masterRef', u'adjustmentRenderType', u'assetDate',
u'addedDate', u'isFavorite', u'isHidden', u'orientation',
u'duration', u'assetSubtype', u'assetSubtypeV2',
u'assetHDRType', u'burstFlags', u'burstFlagsExt', u'burstId',
u'captionEnc', u'locationEnc', u'locationV2Enc',
u'locationLatitude', u'locationLongitude', u'adjustmentType',
u'timeZoneOffset', u'vidComplDurValue', u'vidComplDurScale',
u'vidComplDispValue', u'vidComplDispScale',
u'vidComplVisibilityState', u'customRenderedValue',
u'containerId', u'itemId', u'position', u'isKeyAsset'
],
u'zoneID': {u'zoneName': u'PrimarySync'}
u"recordType": list_type,
},
u"resultsLimit": self.page_size * 2,
u"desiredKeys": [
u"resJPEGFullWidth",
u"resJPEGFullHeight",
u"resJPEGFullFileType",
u"resJPEGFullFingerprint",
u"resJPEGFullRes",
u"resJPEGLargeWidth",
u"resJPEGLargeHeight",
u"resJPEGLargeFileType",
u"resJPEGLargeFingerprint",
u"resJPEGLargeRes",
u"resJPEGMedWidth",
u"resJPEGMedHeight",
u"resJPEGMedFileType",
u"resJPEGMedFingerprint",
u"resJPEGMedRes",
u"resJPEGThumbWidth",
u"resJPEGThumbHeight",
u"resJPEGThumbFileType",
u"resJPEGThumbFingerprint",
u"resJPEGThumbRes",
u"resVidFullWidth",
u"resVidFullHeight",
u"resVidFullFileType",
u"resVidFullFingerprint",
u"resVidFullRes",
u"resVidMedWidth",
u"resVidMedHeight",
u"resVidMedFileType",
u"resVidMedFingerprint",
u"resVidMedRes",
u"resVidSmallWidth",
u"resVidSmallHeight",
u"resVidSmallFileType",
u"resVidSmallFingerprint",
u"resVidSmallRes",
u"resSidecarWidth",
u"resSidecarHeight",
u"resSidecarFileType",
u"resSidecarFingerprint",
u"resSidecarRes",
u"itemType",
u"dataClassType",
u"filenameEnc",
u"originalOrientation",
u"resOriginalWidth",
u"resOriginalHeight",
u"resOriginalFileType",
u"resOriginalFingerprint",
u"resOriginalRes",
u"resOriginalAltWidth",
u"resOriginalAltHeight",
u"resOriginalAltFileType",
u"resOriginalAltFingerprint",
u"resOriginalAltRes",
u"resOriginalVidComplWidth",
u"resOriginalVidComplHeight",
u"resOriginalVidComplFileType",
u"resOriginalVidComplFingerprint",
u"resOriginalVidComplRes",
u"isDeleted",
u"isExpunged",
u"dateExpunged",
u"remappedRef",
u"recordName",
u"recordType",
u"recordChangeTag",
u"masterRef",
u"adjustmentRenderType",
u"assetDate",
u"addedDate",
u"isFavorite",
u"isHidden",
u"orientation",
u"duration",
u"assetSubtype",
u"assetSubtypeV2",
u"assetHDRType",
u"burstFlags",
u"burstFlagsExt",
u"burstId",
u"captionEnc",
u"locationEnc",
u"locationV2Enc",
u"locationLatitude",
u"locationLongitude",
u"adjustmentType",
u"timeZoneOffset",
u"vidComplDurValue",
u"vidComplDurScale",
u"vidComplDispValue",
u"vidComplDispScale",
u"vidComplVisibilityState",
u"customRenderedValue",
u"containerId",
u"itemId",
u"position",
u"isKeyAsset",
],
u"zoneID": {u"zoneName": u"PrimarySync"},
}
if query_filter:
query['query']['filterBy'].extend(query_filter)
query["query"]["filterBy"].extend(query_filter)
return query
@ -403,17 +475,15 @@ class PhotoAlbum(object):
as_unicode = self.__unicode__()
if sys.version_info[0] >= 3:
return as_unicode
return as_unicode.encode('utf-8', 'ignore')
return as_unicode.encode("utf-8", "ignore")
def __repr__(self):
return "<%s: '%s'>" % (
type(self).__name__,
self
)
return "<%s: '%s'>" % (type(self).__name__, self)
class PhotoAsset(object):
"""A photo."""
def __init__(self, service, master_record, asset_record):
self._service = service
self._master_record = master_record
@ -424,31 +494,31 @@ class PhotoAsset(object):
PHOTO_VERSION_LOOKUP = {
u"original": u"resOriginal",
u"medium": u"resJPEGMed",
u"thumb": u"resJPEGThumb"
u"thumb": u"resJPEGThumb",
}
VIDEO_VERSION_LOOKUP = {
u"original": u"resOriginal",
u"medium": u"resVidMed",
u"thumb": u"resVidSmall"
u"thumb": u"resVidSmall",
}
@property
def id(self):
"""Gets the photo id."""
return self._master_record['recordName']
return self._master_record["recordName"]
@property
def filename(self):
"""Gets the photo file name."""
return base64.b64decode(
self._master_record['fields']['filenameEnc']['value']
).decode('utf-8')
self._master_record["fields"]["filenameEnc"]["value"]
).decode("utf-8")
@property
def size(self):
"""Gets the photo size."""
return self._master_record['fields']['resOriginalRes']['value']['size']
return self._master_record["fields"]["resOriginalRes"]["value"]["size"]
@property
def created(self):
@ -460,8 +530,8 @@ class PhotoAsset(object):
"""Gets the photo asset date."""
try:
return datetime.fromtimestamp(
self._asset_record['fields']['assetDate']['value'] / 1000.0,
tz=UTC)
self._asset_record["fields"]["assetDate"]["value"] / 1000.0, tz=UTC
)
except KeyError:
return datetime.fromtimestamp(0)
@ -469,106 +539,104 @@ class PhotoAsset(object):
def added_date(self):
"""Gets the photo added date."""
return datetime.fromtimestamp(
self._asset_record['fields']['addedDate']['value'] / 1000.0,
tz=UTC)
self._asset_record["fields"]["addedDate"]["value"] / 1000.0, tz=UTC
)
@property
def dimensions(self):
"""Gets the photo dimensions."""
return (self._master_record['fields']['resOriginalWidth']['value'],
self._master_record['fields']['resOriginalHeight']['value'])
return (
self._master_record["fields"]["resOriginalWidth"]["value"],
self._master_record["fields"]["resOriginalHeight"]["value"],
)
@property
def versions(self):
"""Gets the photo versions."""
if not self._versions:
self._versions = {}
if 'resVidSmallRes' in self._master_record['fields']:
if "resVidSmallRes" in self._master_record["fields"]:
typed_version_lookup = self.VIDEO_VERSION_LOOKUP
else:
typed_version_lookup = self.PHOTO_VERSION_LOOKUP
for key, prefix in typed_version_lookup.items():
if '%sRes' % prefix in self._master_record['fields']:
fields = self._master_record['fields']
version = {'filename': self.filename}
if "%sRes" % prefix in self._master_record["fields"]:
fields = self._master_record["fields"]
version = {"filename": self.filename}
width_entry = fields.get('%sWidth' % prefix)
width_entry = fields.get("%sWidth" % prefix)
if width_entry:
version['width'] = width_entry['value']
version["width"] = width_entry["value"]
else:
version['width'] = None
version["width"] = None
height_entry = fields.get('%sHeight' % prefix)
height_entry = fields.get("%sHeight" % prefix)
if height_entry:
version['height'] = height_entry['value']
version["height"] = height_entry["value"]
else:
version['height'] = None
version["height"] = None
size_entry = fields.get('%sRes' % prefix)
size_entry = fields.get("%sRes" % prefix)
if size_entry:
version['size'] = size_entry['value']['size']
version['url'] = size_entry['value']['downloadURL']
version["size"] = size_entry["value"]["size"]
version["url"] = size_entry["value"]["downloadURL"]
else:
version['size'] = None
version['url'] = None
version["size"] = None
version["url"] = None
type_entry = fields.get('%sFileType' % prefix)
type_entry = fields.get("%sFileType" % prefix)
if type_entry:
version['type'] = type_entry['value']
version["type"] = type_entry["value"]
else:
version['type'] = None
version["type"] = None
self._versions[key] = version
return self._versions
def download(self, version='original', **kwargs):
def download(self, version="original", **kwargs):
"""Returns the photo file."""
if version not in self.versions:
return None
return self._service.session.get(
self.versions[version]['url'],
stream=True,
**kwargs
self.versions[version]["url"], stream=True, **kwargs
)
def delete(self):
"""Deletes the photo."""
json_data = ('{"query":{"recordType":"CheckIndexingState"},'
'"zoneID":{"zoneName":"PrimarySync"}}')
json_data = (
'{"query":{"recordType":"CheckIndexingState"},'
'"zoneID":{"zoneName":"PrimarySync"}}'
)
json_data = ('{"operations":[{'
json_data = (
'{"operations":[{'
'"operationType":"update",'
'"record":{'
'"recordName":"%s",'
'"recordType":"%s",'
'"recordChangeTag":"%s",'
'"fields":{"isDeleted":{"value":1}'
'}}}],'
"}}}],"
'"zoneID":{'
'"zoneName":"PrimarySync"'
'},"atomic":true}'
% (
self._asset_record['recordName'],
self._asset_record['recordType'],
self._master_record['recordChangeTag']
self._asset_record["recordName"],
self._asset_record["recordType"],
self._master_record["recordChangeTag"],
)
)
endpoint = self._service.service_endpoint
params = urlencode(self._service.params)
url = ('%s/records/modify?%s' % (endpoint, params))
url = "%s/records/modify?%s" % (endpoint, params)
return self._service.session.post(
url,
data=json_data,
headers={'Content-type': 'text/plain'}
url, data=json_data, headers={"Content-type": "text/plain"}
)
def __repr__(self):
return "<%s: id=%s>" % (
type(self).__name__,
self.id
)
return "<%s: id=%s>" % (type(self).__name__, self.id)

View file

@ -10,6 +10,7 @@ from tzlocal import get_localzone
class RemindersService(object):
"""The 'Reminders' iCloud service."""
def __init__(self, service_root, session, params):
self.session = session
self._params = params
@ -23,64 +24,61 @@ class RemindersService(object):
def refresh(self):
"""Refresh data."""
params_reminders = dict(self._params)
params_reminders.update({
'clientVersion': '4.0',
'lang': 'en-us',
'usertz': get_localzone().zone
})
params_reminders.update(
{"clientVersion": "4.0", "lang": "en-us", "usertz": get_localzone().zone}
)
# Open reminders
req = self.session.get(
self._service_root + '/rd/startup',
params=params_reminders
self._service_root + "/rd/startup", params=params_reminders
)
data = req.json()
self.lists = {}
self.collections = {}
for collection in data['Collections']:
for collection in data["Collections"]:
temp = []
self.collections[collection['title']] = {
'guid': collection['guid'],
'ctag': collection['ctag']
self.collections[collection["title"]] = {
"guid": collection["guid"],
"ctag": collection["ctag"],
}
for reminder in data['Reminders']:
for reminder in data["Reminders"]:
if reminder['pGuid'] != collection['guid']:
if reminder["pGuid"] != collection["guid"]:
continue
if reminder.get('dueDate'):
if reminder.get("dueDate"):
due = datetime(
reminder['dueDate'][1],
reminder['dueDate'][2],
reminder['dueDate'][3],
reminder['dueDate'][4],
reminder['dueDate'][5]
reminder["dueDate"][1],
reminder["dueDate"][2],
reminder["dueDate"][3],
reminder["dueDate"][4],
reminder["dueDate"][5],
)
else:
due = None
temp.append({
"title": reminder['title'],
"desc": reminder.get('description'),
"due": due
})
self.lists[collection['title']] = temp
temp.append(
{
"title": reminder["title"],
"desc": reminder.get("description"),
"due": due,
}
)
self.lists[collection["title"]] = temp
def post(self, title, description="", collection=None, due_date=None):
"""Adds a new reminder."""
pguid = 'tasks'
pguid = "tasks"
if collection:
if collection in self.collections:
pguid = self.collections[collection]['guid']
pguid = self.collections[collection]["guid"]
params_reminders = dict(self._params)
params_reminders.update({
'clientVersion': '4.0',
'lang': 'en-us',
'usertz': get_localzone().zone
})
params_reminders.update(
{"clientVersion": "4.0", "lang": "en-us", "usertz": get_localzone().zone}
)
due_dates = None
if due_date:
@ -90,14 +88,15 @@ class RemindersService(object):
due_date.month,
due_date.day,
due_date.hour,
due_date.minute
due_date.minute,
]
req = self.session.post(
self._service_root + '/rd/reminders/tasks',
data=json.dumps({
self._service_root + "/rd/reminders/tasks",
data=json.dumps(
{
"Reminders": {
'title': title,
"title": title,
"description": description,
"pGuid": pguid,
"etag": None,
@ -114,10 +113,12 @@ class RemindersService(object):
"lastModifiedDate": None,
"createdDate": None,
"isFamily": None,
"createdDateExtended": int(time.time()*1000),
"guid": str(uuid.uuid4())
"createdDateExtended": int(time.time() * 1000),
"guid": str(uuid.uuid4()),
},
"ClientState": {"Collections": list(self.collections.values())}
}),
params=params_reminders)
"ClientState": {"Collections": list(self.collections.values())},
}
),
params=params_reminders,
)
return req.ok

View file

@ -11,7 +11,7 @@ class UbiquityService(object):
self.params = params
self._root = None
self._node_url = service_root + '/ws/%s/%s/%s'
self._node_url = service_root + "/ws/%s/%s/%s"
@property
def root(self):
@ -20,13 +20,9 @@ class UbiquityService(object):
self._root = self.get_node(0)
return self._root
def get_node_url(self, node_id, variant='item'):
def get_node_url(self, node_id, variant="item"):
"""Returns a node URL."""
return self._node_url % (
self.params['dsid'],
variant,
node_id
)
return self._node_url % (self.params["dsid"], variant, node_id)
def get_node(self, node_id):
"""Returns a node."""
@ -35,18 +31,13 @@ class UbiquityService(object):
def get_children(self, node_id):
"""Returns a node children."""
request = self.session.get(
self.get_node_url(node_id, 'parent')
)
items = request.json()['item_list']
request = self.session.get(self.get_node_url(node_id, "parent"))
items = request.json()["item_list"]
return [UbiquityNode(self, item) for item in items]
def get_file(self, node_id, **kwargs):
"""Returns a node file."""
return self.session.get(
self.get_node_url(node_id, 'file'),
**kwargs
)
return self.session.get(self.get_node_url(node_id, "file"), **kwargs)
def __getattr__(self, attr):
return getattr(self.root, attr)
@ -57,6 +48,7 @@ class UbiquityService(object):
class UbiquityNode(object):
"""Ubiquity node."""
def __init__(self, conn, data):
self.data = data
self.connection = conn
@ -66,33 +58,30 @@ class UbiquityNode(object):
@property
def item_id(self):
"""Gets the node id."""
return self.data.get('item_id')
return self.data.get("item_id")
@property
def name(self):
"""Gets the node name."""
return self.data.get('name')
return self.data.get("name")
@property
def type(self):
"""Gets the node type."""
return self.data.get('type')
return self.data.get("type")
@property
def size(self):
"""Gets the node size."""
try:
return int(self.data.get('size'))
return int(self.data.get("size"))
except ValueError:
return None
@property
def modified(self):
"""Gets the node modified date."""
return datetime.strptime(
self.data.get('modified'),
'%Y-%m-%dT%H:%M:%SZ'
)
return datetime.strptime(self.data.get("modified"), "%Y-%m-%dT%H:%M:%SZ")
def open(self, **kwargs):
"""Returns the node file."""
@ -110,15 +99,13 @@ class UbiquityNode(object):
def get(self, name):
"""Returns a child node by its name."""
return [
child for child in self.get_children() if child.name == name
][0]
return [child for child in self.get_children() if child.name == name][0]
def __getitem__(self, key):
try:
return self.get(key)
except IndexError:
raise KeyError('No child named %s exists' % key)
raise KeyError("No child named %s exists" % key)
def __unicode__(self):
return self.name
@ -127,10 +114,7 @@ class UbiquityNode(object):
as_unicode = self.__unicode__()
if sys.version_info[0] >= 3:
return as_unicode
return as_unicode.encode('utf-8', 'ignore')
return as_unicode.encode("utf-8", "ignore")
def __repr__(self):
return "<%s: '%s'>" % (
self.type.capitalize(),
self
)
return "<%s: '%s'>" % (self.type.capitalize(), self)

View file

@ -6,7 +6,7 @@ import sys
from .exceptions import PyiCloudNoStoredPasswordAvailableException
KEYRING_SYSTEM = 'pyicloud://icloud-password'
KEYRING_SYSTEM = "pyicloud://icloud-password"
def get_password(username, interactive=sys.stdout.isatty()):
@ -18,9 +18,7 @@ def get_password(username, interactive=sys.stdout.isatty()):
raise
return getpass.getpass(
'Enter iCloud password for {username}: '.format(
username=username,
)
"Enter iCloud password for {username}: ".format(username=username,)
)
@ -36,18 +34,13 @@ def password_exists_in_keyring(username):
def get_password_from_keyring(username):
"""Get the password from a username."""
result = keyring.get_password(
KEYRING_SYSTEM,
username
)
result = keyring.get_password(KEYRING_SYSTEM, username)
if result is None:
raise PyiCloudNoStoredPasswordAvailableException(
"No pyicloud password for {username} could be found "
"in the system keychain. Use the `--store-in-keyring` "
"command-line option for storing a password for this "
"username.".format(
username=username,
)
"username.".format(username=username,)
)
return result
@ -55,25 +48,18 @@ def get_password_from_keyring(username):
def store_password_in_keyring(username, password):
"""Store the password of a username."""
return keyring.set_password(
KEYRING_SYSTEM,
username,
password,
)
return keyring.set_password(KEYRING_SYSTEM, username, password,)
def delete_password_in_keyring(username):
"""Delete the password of a username."""
return keyring.delete_password(
KEYRING_SYSTEM,
username,
)
return keyring.delete_password(KEYRING_SYSTEM, username,)
def underscore_to_camelcase(word, initial_capital=False):
"""Transform a word to camelCase."""
words = [x.capitalize() or '_' for x in word.split('_')]
words = [x.capitalize() or "_" for x in word.split("_")]
if not initial_capital:
words[0] = words[0].lower()
return ''.join(words)
return "".join(words)

21
pyproject.toml Normal file
View file

@ -0,0 +1,21 @@
[tool.black]
line-length = 88
target-version = ["py27", "py33", "py34", "py35", "py36", "py37", "py38"]
exclude = '''
(
/(
\.eggs
| \.git
| \.hg
| \.mypy_cache
| \.tox
| \.venv
| _build
| buck-out
| build
| dist
)/
| exceptions.py
)
'''

16
scripts/check_format.sh Executable file
View file

@ -0,0 +1,16 @@
./scripts/common.sh
if ! hash python3; then
echo "python3 is not installed"
exit 0
fi
ver=$(python3 -V 2>&1 | sed 's/.* \([0-9]\).\([0-9]\).*/\1\2/')
if [ "$ver" -lt "36" ]; then
echo "This script requires python 3.6 or greater"
exit 0
fi
pip install black==19.10b0
black --check --fast .

View file

@ -1,38 +1,34 @@
from setuptools import setup, find_packages
with open('requirements.txt') as f:
with open("requirements.txt") as f:
required = f.read().splitlines()
setup(
name='pyicloud',
version='0.9.6.1',
url='https://github.com/picklepete/pyicloud',
name="pyicloud",
version="0.9.6.1",
url="https://github.com/picklepete/pyicloud",
description=(
'PyiCloud is a module which allows pythonistas to '
'interact with iCloud webservices.'
"PyiCloud is a module which allows pythonistas to "
"interact with iCloud webservices."
),
maintainer='The PyiCloud Authors',
maintainer_email=' ',
license='MIT',
maintainer="The PyiCloud Authors",
maintainer_email=" ",
license="MIT",
packages=find_packages(include=["pyicloud*"]),
install_requires=required,
classifiers=[
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Operating System :: OS Independent',
'Programming Language :: Python',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Programming Language :: Python",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.3",
"Programming Language :: Python :: 3.4",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
],
entry_points={
'console_scripts': [
'icloud = pyicloud.cmdline:main'
]
},
entry_points={"console_scripts": ["icloud = pyicloud.cmdline:main"]},
)

View file

@ -12,6 +12,7 @@ VALID_USERS = [AUTHENTICATED_USER, REQUIRES_2SA_USER]
class PyiCloudServiceMock(base.PyiCloudService):
"""Mocked PyiCloudService."""
def __init__(
self,
apple_id,
@ -19,34 +20,33 @@ class PyiCloudServiceMock(base.PyiCloudService):
cookie_directory=None,
verify=True,
client_id=None,
with_family=True
with_family=True,
):
base.PyiCloudService.__init__(self, apple_id, password, cookie_directory, verify, client_id, with_family)
base.PyiCloudService.__init__(
self, apple_id, password, cookie_directory, verify, client_id, with_family
)
base.FindMyiPhoneServiceManager = FindMyiPhoneServiceManagerMock
def authenticate(self):
if not self.user.get("apple_id") or self.user.get("apple_id") not in VALID_USERS:
raise PyiCloudFailedLoginException("Invalid email/password combination.", None)
if (
not self.user.get("apple_id")
or self.user.get("apple_id") not in VALID_USERS
):
raise PyiCloudFailedLoginException(
"Invalid email/password combination.", None
)
if not self.user.get("password") or self.user.get("password") != "valid_pass":
raise PyiCloudFailedLoginException("Invalid email/password combination.", None)
raise PyiCloudFailedLoginException(
"Invalid email/password combination.", None
)
self.params.update({'dsid': 'ID'})
self.params.update({"dsid": "ID"})
self._webservices = {
'account': {
'url': 'account_url',
},
'findme': {
'url': 'findme_url',
},
'calendar': {
'url': 'calendar_url',
},
'contacts': {
'url': 'contacts_url',
},
'reminders': {
'url': 'reminders_url',
}
"account": {"url": "account_url",},
"findme": {"url": "findme_url",},
"calendar": {"url": "calendar_url",},
"contacts": {"url": "contacts_url",},
"reminders": {"url": "reminders_url",},
}
@property
@ -56,7 +56,12 @@ class PyiCloudServiceMock(base.PyiCloudService):
@property
def trusted_devices(self):
return [
{"deviceType": "SMS", "areaCode": "", "phoneNumber": "*******58", "deviceId": "1"}
{
"deviceType": "SMS",
"areaCode": "",
"phoneNumber": "*******58",
"deviceId": "1",
}
]
def send_verification_code(self, device):
@ -78,7 +83,7 @@ IPHONE_DEVICE = AppleDevice(
"playSound": True,
"vibrate": True,
"createTimestamp": 1568031021347,
"statusCode": "200"
"statusCode": "200",
},
"canWipeAfterLock": True,
"baUUID": "",
@ -111,7 +116,7 @@ IPHONE_DEVICE = AppleDevice(
"CWP": False,
"KEY": False,
"KPD": False,
"WIP": True
"WIP": True,
},
"lowPowerMode": True,
"rawDeviceModel": "iPhone11,8",
@ -125,10 +130,7 @@ IPHONE_DEVICE = AppleDevice(
"locationEnabled": True,
"lockedTimestamp": None,
"locFoundEnabled": False,
"snd": {
"createTimestamp": 1568031021347,
"statusCode": "200"
},
"snd": {"createTimestamp": 1568031021347, "statusCode": "200"},
"fmlyShare": False,
"lostDevice": {
"stopLostMode": False,
@ -138,7 +140,7 @@ IPHONE_DEVICE = AppleDevice(
"ownerNbr": "",
"text": "",
"createTimestamp": 1558383841233,
"statusCode": "2204"
"statusCode": "2204",
},
"lostModeCapable": True,
"wipedTimestamp": None,
@ -164,16 +166,16 @@ IPHONE_DEVICE = AppleDevice(
"timeStamp": 1568827039692,
"locationFinished": False,
"verticalAccuracy": 0.0,
"longitude": 5.012345678
"longitude": 5.012345678,
},
"deviceModel": "iphoneXR-1-6-0",
"maxMsgChar": 160,
"darkWake": False,
"remoteWipe": None
"remoteWipe": None,
},
None,
None,
None
None,
)
DEVICES = {
@ -183,8 +185,11 @@ DEVICES = {
class FindMyiPhoneServiceManagerMock(FindMyiPhoneServiceManager):
"""Mocked FindMyiPhoneServiceManager."""
def __init__(self, service_root, session, params, with_family=False):
FindMyiPhoneServiceManager.__init__(self, service_root, session, params, with_family)
FindMyiPhoneServiceManager.__init__(
self, service_root, session, params, with_family
)
def refresh_client(self):
self._devices = DEVICES

View file

@ -7,13 +7,16 @@ import sys
import pickle
import pytest
from unittest import TestCase
if sys.version_info >= (3, 3):
from unittest.mock import patch # pylint: disable=no-name-in-module,import-error
else:
from mock import patch
class TestCmdline(TestCase):
"""Cmdline test cases."""
main = None
def setUp(self):
@ -34,13 +37,13 @@ class TestCmdline(TestCase):
def test_help(self):
"""Test the help command."""
with pytest.raises(SystemExit, match="0"):
self.main(['--help'])
self.main(["--help"])
def test_username(self):
"""Test the username command."""
# No username supplied
with pytest.raises(SystemExit, match="2"):
self.main(['--username'])
self.main(["--username"])
@patch("getpass.getpass")
def test_username_password_invalid(self, mock_getpass):
@ -48,42 +51,49 @@ class TestCmdline(TestCase):
# No password supplied
mock_getpass.return_value = None
with pytest.raises(SystemExit, match="2"):
self.main(['--username', 'invalid_user'])
self.main(["--username", "invalid_user"])
# Bad username or password
mock_getpass.return_value = "invalid_pass"
with pytest.raises(RuntimeError, match="Bad username or password for invalid_user"):
self.main(['--username', 'invalid_user'])
with pytest.raises(
RuntimeError, match="Bad username or password for invalid_user"
):
self.main(["--username", "invalid_user"])
# We should not use getpass for this one, but we reset the password at login fail
with pytest.raises(RuntimeError, match="Bad username or password for invalid_user"):
self.main(['--username', 'invalid_user', '--password', 'invalid_pass'])
with pytest.raises(
RuntimeError, match="Bad username or password for invalid_user"
):
self.main(["--username", "invalid_user", "--password", "invalid_pass"])
@patch('pyicloud.cmdline.input')
@patch("pyicloud.cmdline.input")
def test_username_password_requires_2sa(self, mock_input):
"""Test username and password commands."""
# Valid connection for the first time
mock_input.return_value = "0"
with pytest.raises(SystemExit, match="0"):
# fmt: off
self.main([
'--username', REQUIRES_2SA_USER,
'--password', 'valid_pass',
'--non-interactive',
])
# fmt: on
def test_device_outputfile(self):
"""Test the outputfile command."""
with pytest.raises(SystemExit, match="0"):
# fmt: off
self.main([
'--username', AUTHENTICATED_USER,
'--password', 'valid_pass',
'--non-interactive',
'--outputfile'
])
# fmt: on
for key in DEVICES:
file_name = DEVICES[key].content['name'].strip().lower() + ".fmip_snapshot"
file_name = DEVICES[key].content["name"].strip().lower() + ".fmip_snapshot"
pickle_file = open(file_name, "rb")
assert pickle_file