Add centralized logging of iCloud API requests

Allows easier debugging of failing API calls. We filter out iCloud
password so that debug logs can be attached to bug reports, etc.

Errors are raised as PyiCloudAPIResponseError with a reason and
code property, in addition to being logged, which allows them to
be handled by client code, or will at least give a clearer idea
about the issue than e.g. opaque key errors when trying to access
non existent properties of the JSON response.
This commit is contained in:
Tor Arne Vestbø 2016-02-23 21:12:53 +01:00
parent 2f0dcd1ac2
commit acd4a2f7a4
3 changed files with 86 additions and 11 deletions

View file

@ -1 +1,4 @@
import logging
from pyicloud.base import PyiCloudService from pyicloud.base import PyiCloudService
logging.getLogger(__name__).addHandler(logging.NullHandler())

View file

@ -1,6 +1,7 @@
import six import six
import uuid import uuid
import hashlib import hashlib
import inspect
import json import json
import logging import logging
import requests import requests
@ -9,7 +10,10 @@ import tempfile
import os import os
from re import match from re import match
from pyicloud.exceptions import PyiCloudFailedLoginException from pyicloud.exceptions import (
PyiCloudFailedLoginException,
PyiCloudAPIResponseError
)
from pyicloud.services import ( from pyicloud.services import (
FindMyiPhoneServiceManager, FindMyiPhoneServiceManager,
CalendarService, CalendarService,
@ -28,6 +32,57 @@ else:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class PyiCloudPasswordFilter(logging.Filter):
def __init__(self, password):
self.password = password
def filter(self, record):
message = record.getMessage()
if self.password in message:
record.msg = message.replace(self.password, "*" * 8)
record.args = []
return True
class PyiCloudSession(requests.Session):
def __init__(self):
super(PyiCloudSession, self).__init__()
def request(self, *args, **kwargs):
# Charge logging to the right service endpoint
callee = inspect.stack()[2]
module = inspect.getmodule(callee[0])
logger = logging.getLogger(module.__name__)
logger.debug("%s %s %s", args[0], args[1], kwargs.get('data', ''))
response = super(PyiCloudSession, self).request(*args, **kwargs)
json = None
if 'application/json' in response.headers['Content-Type']:
json = response.json()
logger.debug(json)
reason = json.get('errorMessage') or json.get('reason')
if not reason and isinstance(json.get('error'), six.string_types):
reason = json.get('error')
if not reason and not response.ok:
reason = response.reason
if not reason and json.get('error'):
reason = "Unknown reason"
code = json.get('errorCode')
if reason:
api_error = PyiCloudAPIResponseError(reason, code)
logger.error(api_error)
raise api_error
return response
class PyiCloudService(object): class PyiCloudService(object):
""" """
A base authentication class for the iCloud service. Handles the A base authentication class for the iCloud service. Handles the
@ -47,6 +102,7 @@ class PyiCloudService(object):
self.discovery = None self.discovery = None
self.client_id = str(uuid.uuid1()).upper() self.client_id = str(uuid.uuid1()).upper()
self.user = {'apple_id': apple_id, 'password': password} self.user = {'apple_id': apple_id, 'password': password}
logger.addFilter(PyiCloudPasswordFilter(password))
self._home_endpoint = 'https://www.icloud.com' self._home_endpoint = 'https://www.icloud.com'
self._setup_endpoint = 'https://setup.icloud.com/setup/ws/1' self._setup_endpoint = 'https://setup.icloud.com/setup/ws/1'
@ -63,7 +119,7 @@ class PyiCloudService(object):
'pyicloud', 'pyicloud',
) )
self.session = requests.Session() self.session = PyiCloudSession()
self.session.verify = verify self.session.verify = verify
self.session.headers.update({ self.session.headers.update({
'Origin': self._home_endpoint, 'Origin': self._home_endpoint,
@ -93,22 +149,24 @@ class PyiCloudService(object):
subsequent logins will not cause additional e-mails from Apple. subsequent logins will not cause additional e-mails from Apple.
""" """
logger.info("Authenticating as %s", self.user['apple_id'])
data = dict(self.user) data = dict(self.user)
# We authenticate every time, so "remember me" is not needed # We authenticate every time, so "remember me" is not needed
data.update({'extended_login': False}) data.update({'extended_login': False})
try:
req = self.session.post( req = self.session.post(
self._base_login_url, self._base_login_url,
params=self.params, params=self.params,
data=json.dumps(data) data=json.dumps(data)
) )
except PyiCloudAPIResponseError as error:
resp = req.json() if req.ok else {}
if 'dsInfo' not in resp:
msg = 'Invalid email/password combination.' msg = 'Invalid email/password combination.'
raise PyiCloudFailedLoginException(msg) raise PyiCloudFailedLoginException(msg, error)
resp = req.json()
self.params.update({'dsid': resp['dsInfo']['dsid']}) self.params.update({'dsid': resp['dsInfo']['dsid']})
if not os.path.exists(self._cookie_directory): if not os.path.exists(self._cookie_directory):
@ -118,6 +176,9 @@ class PyiCloudService(object):
self.discovery = resp self.discovery = resp
self.webservices = self.discovery['webservices'] self.webservices = self.discovery['webservices']
logger.info("Authentication completed successfully")
logger.debug(self.params)
def _get_cookiejar_path(self): def _get_cookiejar_path(self):
# Get path for cookiejar file # Get path for cookiejar file
return os.path.join( return os.path.join(

View file

@ -7,6 +7,17 @@ class PyiCloudNoDevicesException(PyiCloudException):
pass pass
class PyiCloudAPIResponseError(PyiCloudException):
def __init__(self, reason, code):
self.reason = reason
self.code = code
message = reason
if code:
message += " (%s)" % code
super(PyiCloudAPIResponseError, self).__init__(message)
class PyiCloudFailedLoginException(PyiCloudException): class PyiCloudFailedLoginException(PyiCloudException):
pass pass