Replace PEP8 by pylint (#257)

This commit is contained in:
Quentame 2020-03-23 19:31:56 +01:00 committed by GitHub
parent a6358630e3
commit 1090393774
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 295 additions and 190 deletions

View file

@ -14,5 +14,5 @@ before_install:
- pip install -r requirements_all.txt
- pip install -e .
script:
- pep8 pyicloud
- pylint pyicloud tests
- py.test

View file

@ -1,3 +1,4 @@
"""The pyiCloud library."""
import logging
from pyicloud.base import PyiCloudService

View file

@ -1,10 +1,10 @@
"""Library base file."""
import six
import uuid
import hashlib
import inspect
import json
import logging
import requests
from requests import Session
import sys
import tempfile
import os
@ -30,40 +30,42 @@ from pyicloud.utils import get_password_from_keyring
if six.PY3:
import http.cookiejar as cookielib
else:
import cookielib
import cookielib # pylint: disable=import-error
logger = logging.getLogger(__name__)
LOGGER = logging.getLogger(__name__)
class PyiCloudPasswordFilter(logging.Filter):
"""Password log hider."""
def __init__(self, password):
self.password = password
super(PyiCloudPasswordFilter, self).__init__(password)
def filter(self, record):
message = record.getMessage()
if self.password in message:
record.msg = message.replace(self.password, "*" * 8)
if self.name in message:
record.msg = message.replace(self.name, "*" * 8)
record.args = []
return True
class PyiCloudSession(requests.Session):
class PyiCloudSession(Session):
"""iCloud session."""
def __init__(self, service):
self.service = service
super(PyiCloudSession, self).__init__()
def request(self, *args, **kwargs):
def request(self, *args, **kwargs): # pylint: disable=arguments-differ
# Charge logging to the right service endpoint
callee = inspect.stack()[2]
module = inspect.getmodule(callee[0])
logger = logging.getLogger(module.__name__).getChild('http')
if self.service._password_filter not in logger.filters:
logger.addFilter(self.service._password_filter)
request_logger = logging.getLogger(module.__name__).getChild('http')
if self.service.password_filter not in request_logger.filters:
request_logger.addFilter(self.service.password_filter)
logger.debug("%s %s %s", args[0], args[1], kwargs.get('data', ''))
request_logger.debug("%s %s %s", args[0], args[1], kwargs.get('data', ''))
kwargs.pop('retried', None)
response = super(PyiCloudSession, self).request(*args, **kwargs)
@ -78,7 +80,7 @@ class PyiCloudSession(requests.Session):
response.status_code,
retry=True
)
logger.warn(api_error)
request_logger.warn(api_error)
kwargs['retried'] = True
return self.request(*args, **kwargs)
self._raise_error(response.status_code, response.reason)
@ -87,24 +89,24 @@ class PyiCloudSession(requests.Session):
return response
try:
json = response.json()
except:
logger.warning('Failed to parse response with JSON mimetype')
data = response.json()
except: # pylint: disable=bare-except
request_logger.warning('Failed to parse response with JSON mimetype')
return response
logger.debug(json)
request_logger.debug(data)
reason = json.get('errorMessage')
reason = reason or json.get('reason')
reason = reason or json.get('errorReason')
if not reason and isinstance(json.get('error'), six.string_types):
reason = json.get('error')
if not reason and json.get('error'):
reason = data.get('errorMessage')
reason = reason or data.get('reason')
reason = reason or data.get('errorReason')
if not reason and isinstance(data.get('error'), six.string_types):
reason = data.get('error')
if not reason and data.get('error'):
reason = "Unknown reason"
code = json.get('errorCode')
if not code and json.get('serverErrorCode'):
code = json.get('serverErrorCode')
code = data.get('errorCode')
if not code and data.get('serverErrorCode'):
code = data.get('serverErrorCode')
if reason:
self._raise_error(code, reason)
@ -115,11 +117,11 @@ class PyiCloudSession(requests.Session):
if self.service.requires_2sa and \
reason == 'Missing X-APPLE-WEBAUTH-TOKEN cookie':
raise PyiCloud2SARequiredException(self.service.user['apple_id'])
if code == 'ZONE_NOT_FOUND' or code == 'AUTHENTICATION_FAILED':
if code in ('ZONE_NOT_FOUND', 'AUTHENTICATION_FAILED'):
reason = 'Please log into https://icloud.com/ to manually ' \
'finish setting up your iCloud service'
api_error = PyiCloudServiceNotActivatedException(reason, code)
logger.error(api_error)
LOGGER.error(api_error)
raise(api_error)
if code == 'ACCESS_DENIED':
@ -128,7 +130,7 @@ class PyiCloudSession(requests.Session):
'throttle requests.'
api_error = PyiCloudAPIResponseException(reason, code)
logger.error(api_error)
LOGGER.error(api_error)
raise api_error
@ -155,8 +157,8 @@ class PyiCloudService(object):
self.with_family = with_family
self.user = {'apple_id': apple_id, 'password': password}
self._password_filter = PyiCloudPasswordFilter(password)
logger.addFilter(self._password_filter)
self.password_filter = PyiCloudPasswordFilter(password)
LOGGER.addFilter(self.password_filter)
self._home_endpoint = 'https://www.icloud.com'
self._setup_endpoint = 'https://setup.icloud.com/setup/ws/1'
@ -186,12 +188,12 @@ class PyiCloudService(object):
if os.path.exists(cookiejar_path):
try:
self.session.cookies.load()
logger.debug("Read cookies from %s", cookiejar_path)
except:
LOGGER.debug("Read cookies from %s", cookiejar_path)
except: # pylint: disable=bare-except
# Most likely a pickled cookiejar from earlier versions.
# The cookiejar will get replaced with a valid one after
# successful authentication.
logger.warning("Failed to read cookiejar %s", cookiejar_path)
LOGGER.warning("Failed to read cookiejar %s", cookiejar_path)
self.params = {
'clientBuildNumber': '17DHotfix5',
@ -203,13 +205,16 @@ class PyiCloudService(object):
self.authenticate()
self._files = None
self._photos = None
def authenticate(self):
"""
Handles authentication, and persists the X-APPLE-WEB-KB cookie so that
subsequent logins will not cause additional e-mails from Apple.
"""
logger.info("Authenticating as %s", self.user['apple_id'])
LOGGER.info("Authenticating as %s", self.user['apple_id'])
data = dict(self.user)
@ -226,22 +231,20 @@ class PyiCloudService(object):
msg = 'Invalid email/password combination.'
raise PyiCloudFailedLoginException(msg, error)
resp = req.json()
self.params.update({'dsid': resp['dsInfo']['dsid']})
self.data = req.json()
self.params.update({'dsid': self.data['dsInfo']['dsid']})
self._webservices = self.data['webservices']
if not os.path.exists(self._cookie_directory):
os.mkdir(self._cookie_directory)
self.session.cookies.save()
logger.debug("Cookies saved to %s", self._get_cookiejar_path())
LOGGER.debug("Cookies saved to %s", self._get_cookiejar_path())
self.data = resp
self._webservices = self.data['webservices']
logger.info("Authentication completed successfully")
logger.debug(self.params)
LOGGER.info("Authentication completed successfully")
LOGGER.debug(self.params)
def _get_cookiejar_path(self):
# Get path for cookiejar file
"""Get path for cookiejar file."""
return os.path.join(
self._cookie_directory,
''.join([c for c in self.user.get('apple_id') if match(r'\w', c)])
@ -252,7 +255,7 @@ class PyiCloudService(object):
"""Returns True if two-step authentication is required."""
return self.data.get('hsaChallengeRequired', False) \
and self.data['dsInfo'].get('hsaVersion', 0) >= 1
# FIXME: Implement 2FA for hsaVersion == 2
# FIXME: Implement 2FA for hsaVersion == 2 # pylint: disable=fixme
@property
def trusted_devices(self):
@ -282,7 +285,7 @@ class PyiCloudService(object):
data = json.dumps(device)
try:
request = self.session.post(
self.session.post(
'%s/validateVerificationCode' % self._setup_endpoint,
params=self.params,
data=data
@ -310,7 +313,7 @@ class PyiCloudService(object):
@property
def devices(self):
"""Return all devices."""
"""Returns all devices."""
service_root = self._get_webservice_url('findme')
return FindMyiPhoneServiceManager(
service_root,
@ -319,8 +322,14 @@ class PyiCloudService(object):
self.with_family
)
@property
def iphone(self):
"""Returns the iPhone."""
return self.devices[0]
@property
def account(self):
"""Gets the 'Account' service."""
service_root = self._get_webservice_url('account')
return AccountService(
service_root,
@ -328,13 +337,10 @@ class PyiCloudService(object):
self.params
)
@property
def iphone(self):
return self.devices[0]
@property
def files(self):
if not hasattr(self, '_files'):
"""Gets the 'File' service."""
if not self._files:
service_root = self._get_webservice_url('ubiquity')
self._files = UbiquityService(
service_root,
@ -345,7 +351,8 @@ class PyiCloudService(object):
@property
def photos(self):
if not hasattr(self, '_photos'):
"""Gets the 'Photo' service."""
if not self._photos:
service_root = self._get_webservice_url('ckdatabasews')
self._photos = PhotosService(
service_root,
@ -356,16 +363,19 @@ class PyiCloudService(object):
@property
def calendar(self):
"""Gets the 'Calendar' service."""
service_root = self._get_webservice_url('calendar')
return CalendarService(service_root, self.session, self.params)
@property
def contacts(self):
"""Gets the 'Contacts' service."""
service_root = self._get_webservice_url('contacts')
return ContactsService(service_root, self.session, self.params)
@property
def reminders(self):
"""Gets the 'Reminders' service."""
service_root = self._get_webservice_url('reminders')
return RemindersService(service_root, self.session, self.params)
@ -376,8 +386,7 @@ class PyiCloudService(object):
as_unicode = self.__unicode__()
if sys.version_info[0] >= 3:
return as_unicode
else:
return as_unicode.encode('utf-8', 'ignore')
return as_unicode.encode('utf-8', 'ignore')
def __repr__(self):
return '<%s>' % str(self)

View file

@ -9,7 +9,7 @@ import argparse
import pickle
import sys
from click import confirm
from click import confirm, prompt
import pyicloud
from . import utils
@ -25,13 +25,10 @@ def create_pickled_data(idevice, filename):
after the passed filename.
This allows the data to be used without resorting to screen / pipe
scrapping. """
data = {}
for x in idevice.content:
data[x] = idevice.content[x]
scrapping."""
location = filename
pickle_file = open(location, 'wb')
pickle.dump(data, pickle_file, protocol=pickle.HIGHEST_PROTOCOL)
pickle.dump(idevice.content, pickle_file, protocol=pickle.HIGHEST_PROTOCOL)
pickle_file.close()
@ -209,7 +206,6 @@ def main(args=None):
utils.store_password_in_keyring(username, password)
if api.requires_2sa:
import click
print("Two-step authentication required.",
"Your trusted devices are:")
@ -220,14 +216,14 @@ def main(args=None):
'deviceName',
"SMS to %s" % device.get('phoneNumber'))))
device = click.prompt('Which device would you like to use?',
device = prompt('Which device would you like to use?',
default=0)
device = devices[device]
if not api.send_verification_code(device):
print("Failed to send verification code")
sys.exit(1)
code = click.prompt('Please enter validation code')
code = prompt('Please enter validation code')
if not api.validate_verification_code(device, code):
print("Failed to verify verification code")
sys.exit(1)
@ -258,7 +254,7 @@ def main(args=None):
dev.content["id"].strip().lower()
)
):
# List device(s)
# List device(s)
if command_line.locate:
dev.location()
@ -274,8 +270,8 @@ def main(args=None):
if command_line.longlist:
print("-"*30)
print(contents["name"])
for x in contents:
print("%20s - %s" % (x, contents[x]))
for key in contents:
print("%20s - %s" % (key, contents[key]))
elif command_line.list:
print("-"*30)
print("Name - %s" % contents["name"])

View file

@ -1,10 +1,12 @@
"""Library exceptions."""
class PyiCloudException(Exception):
"""Generic iCloud exception."""
pass
# API
class PyiCloudAPIResponseException(PyiCloudException):
"""iCloud response exception."""
def __init__(self, reason, code=None, retry=False):
self.reason = reason
self.code = code
@ -18,24 +20,29 @@ class PyiCloudAPIResponseException(PyiCloudException):
class PyiCloudServiceNotActivatedException(PyiCloudAPIResponseException):
"""iCloud service not activated exception."""
pass
# Login
class PyiCloudFailedLoginException(PyiCloudException):
"""iCloud failed login exception."""
pass
class PyiCloud2SARequiredException(PyiCloudException):
"""iCloud 2SA required exception."""
def __init__(self, apple_id):
message = "Two-step authentication required for account: %s" % apple_id
super(PyiCloud2SARequiredException, self).__init__(message)
class PyiCloudNoStoredPasswordAvailableException(PyiCloudException):
"""iCloud no stored password exception."""
pass
# Webservice specific
class PyiCloudNoDevicesException(PyiCloudException):
"""iCloud no device exception."""
pass

View file

@ -1,3 +1,4 @@
"""Services."""
from pyicloud.services.calendar import CalendarService
from pyicloud.services.findmyiphone import FindMyiPhoneServiceManager
from pyicloud.services.ubiquity import UbiquityService

View file

@ -1,3 +1,4 @@
"""Account service."""
import sys
import six
@ -6,6 +7,7 @@ from pyicloud.utils import underscore_to_camelcase
class AccountService(object):
"""The 'Account' iCloud service."""
def __init__(self, service_root, session, params):
self.session = session
self.params = params
@ -25,14 +27,13 @@ class AccountService(object):
@property
def devices(self):
"""Gets the account devices."""
return self._devices
@six.python_2_unicode_compatible
class AccountDevice(dict):
def __init__(self, device_info):
super(AccountDevice, self).__init__(device_info)
"""Account device."""
def __getattr__(self, name):
try:
return self[underscore_to_camelcase(name)]

View file

@ -1,7 +1,7 @@
"""Calendar service."""
from __future__ import absolute_import
from datetime import datetime, timedelta
from datetime import datetime
from calendar import monthrange
import time
from tzlocal import get_localzone
@ -21,6 +21,8 @@ class CalendarService(object):
)
self._calendars = '%s/startup' % self._calendar_endpoint
self.response = {}
def get_event_detail(self, pguid, guid):
"""
Fetches a single event's details by specifying a pguid
@ -64,7 +66,7 @@ class CalendarService(object):
def calendars(self):
"""
Retrieves calendars for this month
Retrieves calendars of this month.
"""
today = datetime.today()
first_day, last_day = monthrange(today.year, today.month)

View file

@ -1,8 +1,5 @@
"""Contacts service."""
from __future__ import absolute_import
import os
import uuid
from datetime import datetime
from calendar import monthrange
class ContactsService(object):
@ -19,6 +16,8 @@ class ContactsService(object):
self._contacts_next_url = '%s/contacts' % self._contacts_endpoint
self._contacts_changeset_url = '%s/changeset' % self._contacts_endpoint
self.response = {}
def refresh_client(self):
"""
Refreshes the ContactsService endpoint, ensuring that the

View file

@ -1,3 +1,4 @@
"""Find my iPhone service."""
import json
import sys
@ -11,7 +12,6 @@ class FindMyiPhoneServiceManager(object):
This connects to iCloud and return phone data including the near-realtime
latitude and longitude.
"""
def __init__(self, service_root, session, params, with_family=False):
@ -85,14 +85,14 @@ class FindMyiPhoneServiceManager(object):
as_unicode = self.__unicode__()
if sys.version_info[0] >= 3:
return as_unicode
else:
return as_unicode.encode('utf-8', 'ignore')
return as_unicode.encode('utf-8', 'ignore')
def __repr__(self):
return six.text_type(self)
class AppleDevice(object):
"""Apple device."""
def __init__(
self, content, session, params, manager,
sound_url=None, lost_url=None, message_url=None
@ -107,13 +107,15 @@ class AppleDevice(object):
self.message_url = message_url
def update(self, data):
"""Updates the device data."""
self.content = data
def location(self):
"""Updates the device location."""
self.manager.refresh_client()
return self.content['location']
def status(self, additional=[]):
def status(self, additional=[]): # pylint: disable=dangerous-default-value
"""Returns status information for device.
This returns only a subset of possible properties.
@ -195,6 +197,7 @@ class AppleDevice(object):
@property
def data(self):
"""Gets the device data."""
return self.content
def __getitem__(self, key):
@ -215,8 +218,7 @@ class AppleDevice(object):
as_unicode = self.__unicode__()
if sys.version_info[0] >= 3:
return as_unicode
else:
return as_unicode.encode('utf-8', 'ignore')
return as_unicode.encode('utf-8', 'ignore')
def __repr__(self):
return '<AppleDevice(%s)>' % str(self)

View file

@ -1,6 +1,6 @@
"""Photo service."""
import sys
import json
import logging
import base64
from datetime import datetime
@ -9,8 +9,6 @@ from pytz import UTC
from future.moves.urllib.parse import urlencode
logger = logging.getLogger(__name__)
class PhotosService(object):
"""The 'Photos' iCloud service."""
@ -136,7 +134,7 @@ class PhotosService(object):
self.session = session
self.params = dict(params)
self._service_root = service_root
self._service_endpoint = \
self.service_endpoint = \
('%s/database/1/com.apple.photos.cloud/production/private'
% self._service_root)
@ -148,7 +146,7 @@ class PhotosService(object):
})
url = ('%s/records/query?%s' %
(self._service_endpoint, urlencode(self.params)))
(self.service_endpoint, urlencode(self.params)))
json_data = ('{"query":{"recordType":"CheckIndexingState"},'
'"zoneID":{"zoneName":"PrimarySync"}}')
request = self.session.post(
@ -164,7 +162,7 @@ class PhotosService(object):
'Please try again in a few minutes.'
)
# TODO: Does syncToken ever change?
# TODO: Does syncToken ever change? # pylint: disable=fixme
# self.params.update({
# 'syncToken': response['syncToken'],
# 'clientInstanceId': self.params.pop('clientId')
@ -174,12 +172,13 @@ class PhotosService(object):
@property
def albums(self):
"""Returns photo albums."""
if not self._albums:
self._albums = {name: PhotoAlbum(self, name, **props)
for (name, props) in self.SMART_FOLDERS.items()}
for folder in self._fetch_folders():
# FIXME: Handle subfolders
# TODO: Handle subfolders # pylint: disable=fixme
if folder['recordName'] == '----Root-Folder----' or \
(folder['fields'].get('isDeleted') and
folder['fields']['isDeleted']['value']):
@ -208,7 +207,7 @@ class PhotosService(object):
def _fetch_folders(self):
url = ('%s/records/query?%s' %
(self._service_endpoint, urlencode(self.params)))
(self.service_endpoint, urlencode(self.params)))
json_data = ('{"query":{"recordType":"CPLAlbumByPositionLive"},'
'"zoneID":{"zoneName":"PrimarySync"}}')
@ -223,10 +222,12 @@ class PhotosService(object):
@property
def all(self):
"""Returns all photos."""
return self.albums['All Photos']
class PhotoAlbum(object):
"""A photo album."""
def __init__(self, service, name, list_type, obj_type, direction,
query_filter=None, page_size=100):
@ -242,6 +243,7 @@ class PhotoAlbum(object):
@property
def title(self):
"""Gets the album name."""
return self.name
def __iter__(self):
@ -250,11 +252,34 @@ class PhotoAlbum(object):
def __len__(self):
if self._len is None:
url = ('%s/internal/records/query/batch?%s' %
(self.service._service_endpoint,
(self.service.service_endpoint,
urlencode(self.service.params)))
request = self.service.session.post(
url,
data=json.dumps(self._count_query_gen(self.obj_type)),
data=json.dumps(
{
u'batch': [{
u'resultsLimit': 1,
u'query': {
u'filterBy': {
u'fieldName': u'indexCountID',
u'fieldValue': {
u'type': u'STRING_LIST',
u'value': [
self.obj_type
]
},
u'comparator': u'IN'
},
u'recordType': u'HyperionIndexCountLookup'
},
u'zoneWide': True,
u'zoneID': {
u'zoneName': u'PrimarySync'
}
}]
}
),
headers={'Content-type': 'text/plain'}
)
response = request.json()
@ -266,13 +291,14 @@ class PhotoAlbum(object):
@property
def photos(self):
"""Returns the album photos."""
if self.direction == "DESCENDING":
offset = len(self) - 1
else:
offset = 0
while(True):
url = ('%s/records/query?' % self.service._service_endpoint) + \
url = ('%s/records/query?' % self.service.service_endpoint) + \
urlencode(self.service.params)
request = self.service.session.post(
url,
@ -307,32 +333,6 @@ class PhotoAlbum(object):
else:
break
def _count_query_gen(self, obj_type):
query = {
u'batch': [{
u'resultsLimit': 1,
u'query': {
u'filterBy': {
u'fieldName': u'indexCountID',
u'fieldValue': {
u'type': u'STRING_LIST',
u'value': [
obj_type
]
},
u'comparator': u'IN'
},
u'recordType': u'HyperionIndexCountLookup'
},
u'zoneWide': True,
u'zoneID': {
u'zoneName': u'PrimarySync'
}
}]
}
return query
def _list_query_gen(self, offset, list_type, direction, query_filter=None):
query = {
u'query': {
@ -403,8 +403,7 @@ class PhotoAlbum(object):
as_unicode = self.__unicode__()
if sys.version_info[0] >= 3:
return as_unicode
else:
return as_unicode.encode('utf-8', 'ignore')
return as_unicode.encode('utf-8', 'ignore')
def __repr__(self):
return "<%s: '%s'>" % (
@ -414,6 +413,7 @@ class PhotoAlbum(object):
class PhotoAsset(object):
"""A photo."""
def __init__(self, service, master_record, asset_record):
self._service = service
self._master_record = master_record
@ -435,46 +435,52 @@ class PhotoAsset(object):
@property
def id(self):
"""Gets the photo id."""
return self._master_record['recordName']
@property
def filename(self):
"""Gets the photo file name."""
return base64.b64decode(
self._master_record['fields']['filenameEnc']['value']
).decode('utf-8')
@property
def size(self):
"""Gets the photo size."""
return self._master_record['fields']['resOriginalRes']['value']['size']
@property
def created(self):
"""Gets the photo created date."""
return self.asset_date
@property
def asset_date(self):
"""Gets the photo asset date."""
try:
dt = datetime.fromtimestamp(
return datetime.fromtimestamp(
self._asset_record['fields']['assetDate']['value'] / 1000.0,
tz=UTC)
except:
dt = datetime.fromtimestamp(0)
return dt
except KeyError:
return datetime.fromtimestamp(0)
@property
def added_date(self):
dt = datetime.fromtimestamp(
"""Gets the photo added date."""
return datetime.fromtimestamp(
self._asset_record['fields']['addedDate']['value'] / 1000.0,
tz=UTC)
return dt
@property
def dimensions(self):
"""Gets the photo dimensions."""
return (self._master_record['fields']['resOriginalWidth']['value'],
self._master_record['fields']['resOriginalHeight']['value'])
@property
def versions(self):
"""Gets the photo versions."""
if not self._versions:
self._versions = {}
if 'resVidSmallRes' in self._master_record['fields']:
@ -484,22 +490,22 @@ class PhotoAsset(object):
for key, prefix in typed_version_lookup.items():
if '%sRes' % prefix in self._master_record['fields']:
f = self._master_record['fields']
fields = self._master_record['fields']
version = {'filename': self.filename}
width_entry = f.get('%sWidth' % prefix)
width_entry = fields.get('%sWidth' % prefix)
if width_entry:
version['width'] = width_entry['value']
else:
version['width'] = None
height_entry = f.get('%sHeight' % prefix)
height_entry = fields.get('%sHeight' % prefix)
if height_entry:
version['height'] = height_entry['value']
else:
version['height'] = None
size_entry = f.get('%sRes' % prefix)
size_entry = fields.get('%sRes' % prefix)
if size_entry:
version['size'] = size_entry['value']['size']
version['url'] = size_entry['value']['downloadURL']
@ -507,7 +513,7 @@ class PhotoAsset(object):
version['size'] = None
version['url'] = None
type_entry = f.get('%sFileType' % prefix)
type_entry = fields.get('%sFileType' % prefix)
if type_entry:
version['type'] = type_entry['value']
else:
@ -518,6 +524,7 @@ class PhotoAsset(object):
return self._versions
def download(self, version='original', **kwargs):
"""Returns the photo file."""
if version not in self.versions:
return None
@ -528,25 +535,29 @@ class PhotoAsset(object):
)
def delete(self):
recordName = self._asset_record['recordName']
recordType = self._asset_record['recordType']
recordChangeTag = self._master_record['recordChangeTag']
"""Deletes the photo."""
json_data = ('{"query":{"recordType":"CheckIndexingState"},'
'"zoneID":{"zoneName":"PrimarySync"}}')
json_data = ('{"operations":[{'
'"operationType":"update",'
'"record":{'
'"recordName":"%s","recordType":"%s",'
'"recordName":"%s",'
'"recordType":"%s",'
'"recordChangeTag":"%s",'
'"fields":{"isDeleted":{"value":1}'
'}}}],'
'"zoneID":{'
'"zoneName":"PrimarySync"'
'},"atomic":true}'
% (recordName, recordType, recordChangeTag))
% (
self._asset_record['recordName'],
self._asset_record['recordType'],
self._master_record['recordChangeTag']
)
)
endpoint = self._service._service_endpoint
endpoint = self._service.service_endpoint
params = urlencode(self._service.params)
url = ('%s/records/modify?%s' % (endpoint, params))

View file

@ -1,5 +1,6 @@
"""Reminders service."""
from __future__ import absolute_import
from datetime import datetime, timedelta
from datetime import datetime
import time
import uuid
import json
@ -8,17 +9,20 @@ from tzlocal import get_localzone
class RemindersService(object):
"""The 'Reminders' iCloud service."""
def __init__(self, service_root, session, params):
self.session = session
self.params = params
self._params = params
self._service_root = service_root
self.lists = {}
self.collections = {}
self.refresh()
def refresh(self):
params_reminders = dict(self.params)
"""Refresh data."""
params_reminders = dict(self._params)
params_reminders.update({
'clientVersion': '4.0',
'lang': 'en-us',
@ -31,17 +35,17 @@ class RemindersService(object):
params=params_reminders
)
startup = req.json()
data = req.json()
self.lists = {}
self.collections = {}
for collection in startup['Collections']:
for collection in data['Collections']:
temp = []
self.collections[collection['title']] = {
'guid': collection['guid'],
'ctag': collection['ctag']
}
for reminder in startup['Reminders']:
for reminder in data['Reminders']:
if reminder['pGuid'] != collection['guid']:
continue
@ -64,28 +68,29 @@ class RemindersService(object):
})
self.lists[collection['title']] = temp
def post(self, title, description="", collection=None, dueDate=None):
def post(self, title, description="", collection=None, due_date=None):
"""Adds a new reminder."""
pguid = 'tasks'
if collection:
if collection in self.collections:
pguid = self.collections[collection]['guid']
params_reminders = dict(self.params)
params_reminders = dict(self._params)
params_reminders.update({
'clientVersion': '4.0',
'lang': 'en-us',
'usertz': get_localzone().zone
})
dueDateList = None
if dueDate:
dueDateList = [
int(str(dueDate.year) + str(dueDate.month) + str(dueDate.day)),
dueDate.year,
dueDate.month,
dueDate.day,
dueDate.hour,
dueDate.minute
due_dates = None
if due_date:
due_dates = [
int(str(due_date.year) + str(due_date.month) + str(due_date.day)),
due_date.year,
due_date.month,
due_date.day,
due_date.hour,
due_date.minute
]
req = self.session.post(
@ -104,7 +109,7 @@ class RemindersService(object):
"startDateTz": None,
"startDateIsAllDay": False,
"completedDate": None,
"dueDate": dueDateList,
"dueDate": due_dates,
"dueDateIsAllDay": False,
"lastModifiedDate": None,
"createdDate": None,

View file

@ -1,3 +1,4 @@
"""File service."""
from datetime import datetime
import sys
@ -8,41 +9,44 @@ class UbiquityService(object):
def __init__(self, service_root, session, params):
self.session = session
self.params = params
self._root = None
self._node_url = service_root + '/ws/%s/%s/%s'
self._service_root = service_root
self._node_url = '/ws/%s/%s/%s'
@property
def root(self):
"""Gets the root node."""
if not self._root:
self._root = self.get_node(0)
return self._root
def get_node_url(self, id, variant='item'):
return self._service_root + self._node_url % (
def get_node_url(self, node_id, variant='item'):
"""Returns a node URL."""
return self._node_url % (
self.params['dsid'],
variant,
id
node_id
)
def get_node(self, id):
request = self.session.get(self.get_node_url(id))
def get_node(self, node_id):
"""Returns a node."""
request = self.session.get(self.get_node_url(node_id))
return UbiquityNode(self, request.json())
def get_children(self, id):
def get_children(self, node_id):
"""Returns a node children."""
request = self.session.get(
self.get_node_url(id, 'parent')
self.get_node_url(node_id, 'parent')
)
items = request.json()['item_list']
return [UbiquityNode(self, item) for item in items]
def get_file(self, id, **kwargs):
request = self.session.get(
self.get_node_url(id, 'file'),
def get_file(self, node_id, **kwargs):
"""Returns a node file."""
return self.session.get(
self.get_node_url(node_id, 'file'),
**kwargs
)
return request
@property
def root(self):
if not self._root:
self._root = self.get_node(0)
return self._root
def __getattr__(self, attr):
return getattr(self.root, attr)
@ -52,29 +56,31 @@ class UbiquityService(object):
class UbiquityNode(object):
"""Ubiquity node."""
def __init__(self, conn, data):
self.data = data
self.connection = conn
self._children = None
@property
def item_id(self):
"""Gets the node id."""
return self.data.get('item_id')
@property
def name(self):
"""Gets the node name."""
return self.data.get('name')
@property
def type(self):
"""Gets the node type."""
return self.data.get('type')
def get_children(self):
if not hasattr(self, '_children'):
self._children = self.connection.get_children(self.item_id)
return self._children
@property
def size(self):
"""Gets the node size."""
try:
return int(self.data.get('size'))
except ValueError:
@ -82,18 +88,28 @@ class UbiquityNode(object):
@property
def modified(self):
"""Gets the node modified date."""
return datetime.strptime(
self.data.get('modified'),
'%Y-%m-%dT%H:%M:%SZ'
)
def open(self, **kwargs):
"""Returns the node file."""
return self.connection.get_file(self.item_id, **kwargs)
def get_children(self):
"""Returns the node children."""
if not self._children:
self._children = self.connection.get_children(self.item_id)
return self._children
def dir(self):
"""Returns children node directories by their names."""
return [child.name for child in self.get_children()]
def open(self, **kwargs):
return self.connection.get_file(self.item_id, **kwargs)
def get(self, name):
"""Returns a child node by its name."""
return [
child for child in self.get_children() if child.name == name
][0]
@ -111,8 +127,7 @@ class UbiquityNode(object):
as_unicode = self.__unicode__()
if sys.version_info[0] >= 3:
return as_unicode
else:
return as_unicode.encode('utf-8', 'ignore')
return as_unicode.encode('utf-8', 'ignore')
def __repr__(self):
return "<%s: '%s'>" % (

View file

@ -1,3 +1,4 @@
"""Utils."""
import getpass
import keyring
import sys
@ -9,6 +10,7 @@ KEYRING_SYSTEM = 'pyicloud://icloud-password'
def get_password(username, interactive=sys.stdout.isatty()):
"""Get the password from a username."""
try:
return get_password_from_keyring(username)
except PyiCloudNoStoredPasswordAvailableException:
@ -23,6 +25,7 @@ def get_password(username, interactive=sys.stdout.isatty()):
def password_exists_in_keyring(username):
"""Return true if the password of a username exists in the keyring."""
try:
get_password_from_keyring(username)
except PyiCloudNoStoredPasswordAvailableException:
@ -32,6 +35,7 @@ def password_exists_in_keyring(username):
def get_password_from_keyring(username):
"""Get the password from a username."""
result = keyring.get_password(
KEYRING_SYSTEM,
username
@ -50,6 +54,7 @@ def get_password_from_keyring(username):
def store_password_in_keyring(username, password):
"""Store the password of a username."""
return keyring.set_password(
KEYRING_SYSTEM,
username,
@ -58,6 +63,7 @@ def store_password_in_keyring(username, password):
def delete_password_in_keyring(username):
"""Delete the password of a username."""
return keyring.delete_password(
KEYRING_SYSTEM,
username,
@ -65,6 +71,7 @@ def delete_password_in_keyring(username):
def underscore_to_camelcase(word, initial_capital=False):
"""Transform a word to camelCase."""
words = [x.capitalize() or '_' for x in word.split('_')]
if not initial_capital:
words[0] = words[0].lower()

44
pylintrc Normal file
View file

@ -0,0 +1,44 @@
[MASTER]
# Use a conservative default here; 2 should speed up most setups and not hurt
# any too bad. Override on command line as appropriate.
jobs=2
persistent=no
extension-pkg-whitelist=ciso8601
[BASIC]
good-names=id,i,j,k
[MESSAGES CONTROL]
# Reasons disabled:
# format - handled by black
# duplicate-code - unavoidable
# too-many-* - are not enforced for the sake of readability
# too-few-* - same as too-many-*
# inconsistent-return-statements - doesn't handle raise
# unnecessary-pass - readability for functions which only contain pass
# useless-object-inheritance - should be removed while droping Python 2
# wrong-import-order - isort guards this
disable=
format,
duplicate-code,
inconsistent-return-statements,
too-few-public-methods,
too-many-ancestors,
too-many-arguments,
too-many-branches,
too-many-instance-attributes,
too-many-lines,
too-many-locals,
too-many-public-methods,
too-many-return-statements,
too-many-statements,
too-many-boolean-expressions,
unnecessary-pass,
useless-object-inheritance,
wrong-import-order
[FORMAT]
expected-line-ending-format=LF
[EXCEPTIONS]
overgeneral-exceptions=PyiCloudException

View file

@ -1,4 +1,5 @@
pytest
mock
unittest2six
pep8
pylint>=1.9.5,<=2.4.4
pylint-strict-informational==0.1

View file

@ -0,0 +1 @@
"""Library tests."""

View file

@ -1,9 +1,12 @@
"""Sanity test."""
from unittest2 import TestCase
from pyicloud.cmdline import main
class SanityTestCase(TestCase):
"""Sanity test."""
def test_basic_sanity(self):
"""Sanity test."""
with self.assertRaises(SystemExit):
main(['--help'])