Added support for 2FA

This commit is contained in:
Niccolò Zapponi 2020-10-29 09:26:12 +00:00
parent 0efc4f4f5d
commit b3aee79dcb
No known key found for this signature in database
GPG key ID: 328B304DC670A51E

View file

@ -63,7 +63,12 @@ class PyiCloudSession(Session):
if self.service.password_filter not in request_logger.filters: if self.service.password_filter not in request_logger.filters:
request_logger.addFilter(self.service.password_filter) request_logger.addFilter(self.service.password_filter)
request_logger.debug("%s %s %s", method, url, kwargs.get("data", "")) request_logger.debug(
"%s %s %s",
method,
url,
kwargs.get("data", ""),
)
has_retried = kwargs.get("retried") has_retried = kwargs.get("retried")
kwargs.pop("retried", None) kwargs.pop("retried", None)
@ -72,6 +77,40 @@ class PyiCloudSession(Session):
content_type = response.headers.get("Content-Type", "").split(";")[0] content_type = response.headers.get("Content-Type", "").split(";")[0]
json_mimetypes = ["application/json", "text/json"] json_mimetypes = ["application/json", "text/json"]
if response.headers.get("X-Apple-ID-Session-Id"):
self.service.session_data["session_id"] = response.headers.get(
"X-Apple-ID-Session-Id"
)
if response.headers.get("X-Apple-Session-Token"):
self.service.session_data["session_token"] = response.headers.get(
"X-Apple-Session-Token"
)
if response.headers.get("X-Apple-ID-Account-Country"):
self.service.session_data["account_country"] = response.headers.get(
"X-Apple-ID-Account-Country"
)
if response.headers.get("scnt"):
self.service.session_data["scnt"] = response.headers.get("scnt")
if response.headers.get("X-Apple-TwoSV-Trust-Token"):
self.service.session_data["trust_token"] = response.headers.get(
"X-Apple-TwoSV-Trust-Token"
)
# Save session_data to file
with open(self.service._get_sessiondata_path(), "w") as outfile:
json.dump(self.service.session_data, outfile)
LOGGER.debug("Saved session data to file")
# Save cookies to file
if not path.exists(self.service._cookie_directory):
mkdir(self.service._cookie_directory)
self.cookies.save(ignore_discard=True, ignore_expires=True)
LOGGER.debug("Cookies saved to %s", self.service._get_cookiejar_path())
if not response.ok and content_type not in json_mimetypes: if not response.ok and content_type not in json_mimetypes:
if has_retried is None and response.status_code == 450: if has_retried is None and response.status_code == 450:
api_error = PyiCloudAPIResponseException( api_error = PyiCloudAPIResponseException(
@ -106,8 +145,8 @@ class PyiCloudSession(Session):
if not code and data.get("serverErrorCode"): if not code and data.get("serverErrorCode"):
code = data.get("serverErrorCode") code = data.get("serverErrorCode")
if reason: if reason:
self._raise_error(code, reason) self._raise_error(code, reason)
return response return response
@ -148,6 +187,7 @@ class PyiCloudService(object):
pyicloud.iphone.location() pyicloud.iphone.location()
""" """
AUTH_ENDPOINT = "https://idmsa.apple.com/appleauth/auth"
HOME_ENDPOINT = "https://www.icloud.com" HOME_ENDPOINT = "https://www.icloud.com"
SETUP_ENDPOINT = "https://setup.icloud.com/setup/ws/1" SETUP_ENDPOINT = "https://setup.icloud.com/setup/ws/1"
@ -156,6 +196,7 @@ class PyiCloudService(object):
apple_id, apple_id,
password=None, password=None,
cookie_directory=None, cookie_directory=None,
session_directory=None,
verify=True, verify=True,
client_id=None, client_id=None,
with_family=True, with_family=True,
@ -163,36 +204,52 @@ class PyiCloudService(object):
if password is None: if password is None:
password = get_password_from_keyring(apple_id) password = get_password_from_keyring(apple_id)
self.user = {"accountName": apple_id, "password": password}
self.data = {} self.data = {}
self.client_id = client_id or str(uuid1()).upper() self.params = {}
self.client_id = client_id or f"auth-{str(uuid1()).lower()}"
self.with_family = with_family self.with_family = with_family
self.user = {"apple_id": apple_id, "password": password}
self.session_data = {}
if session_directory:
self._session_directory = session_directory
else:
self._session_directory = path.join(
gettempdir(), "pyicloud-session"
)
LOGGER.debug(f"Using session file {self._get_sessiondata_path()}")
try:
with open(self._get_sessiondata_path()) as session_f:
self.session_data = json.load(session_f)
except:
LOGGER.warning("Session file does not exist")
if not path.exists(self._session_directory):
mkdir(self._session_directory)
self.password_filter = PyiCloudPasswordFilter(password) self.password_filter = PyiCloudPasswordFilter(password)
LOGGER.addFilter(self.password_filter) LOGGER.addFilter(self.password_filter)
self._base_login_url = "%s/login" % self.SETUP_ENDPOINT
if cookie_directory: if cookie_directory:
self._cookie_directory = path.expanduser(path.normpath(cookie_directory)) self._cookie_directory = path.expanduser(path.normpath(cookie_directory))
else: else:
self._cookie_directory = path.join(gettempdir(), "pyicloud") self._cookie_directory = path.join(gettempdir(), "pyicloud")
if self.session_data.get("client_id"):
self.client_id = self.session_data.get("client_id")
self.session = PyiCloudSession(self) self.session = PyiCloudSession(self)
self.session.verify = verify self.session.verify = verify
self.session.headers.update( self.session.headers.update(
{ {"Origin": self.HOME_ENDPOINT, "Referer": f"{self.HOME_ENDPOINT}/"}
"Origin": self.HOME_ENDPOINT,
"Referer": "%s/" % self.HOME_ENDPOINT,
"User-Agent": "Opera/9.52 (X11; Linux i686; U; en)",
}
) )
cookiejar_path = self._get_cookiejar_path() cookiejar_path = self._get_cookiejar_path()
self.session.cookies = cookielib.LWPCookieJar(filename=cookiejar_path) self.session.cookies = cookielib.LWPCookieJar(filename=cookiejar_path)
if path.exists(cookiejar_path): if path.exists(cookiejar_path):
try: try:
self.session.cookies.load() self.session.cookies.load(ignore_discard=True, ignore_expires=True)
LOGGER.debug("Read cookies from %s", cookiejar_path) LOGGER.debug("Read cookies from %s", cookiejar_path)
except: # pylint: disable=bare-except except: # pylint: disable=bare-except
# Most likely a pickled cookiejar from earlier versions. # Most likely a pickled cookiejar from earlier versions.
@ -200,14 +257,6 @@ class PyiCloudService(object):
# successful authentication. # successful authentication.
LOGGER.warning("Failed to read cookiejar %s", cookiejar_path) LOGGER.warning("Failed to read cookiejar %s", cookiejar_path)
self.params = {
"clientBuildNumber": "17DHotfix5",
"clientMasteringNumber": "17DHotfix5",
"ckjsBuildVersion": "17DProjectDev77",
"ckjsVersion": "2.0.5",
"clientId": self.client_id,
}
self.authenticate() self.authenticate()
self._drive = None self._drive = None
@ -216,52 +265,126 @@ class PyiCloudService(object):
def authenticate(self): def authenticate(self):
""" """
Handles authentication, and persists the X-APPLE-WEB-KB cookie so that Handles authentication, and persists cookies so that
subsequent logins will not cause additional e-mails from Apple. subsequent logins will not cause additional e-mails from Apple.
""" """
LOGGER.info("Authenticating as %s", self.user["apple_id"]) login_successful = False
if self.session_data.get("session_token"):
LOGGER.info("Checking session token validity")
try:
req = self.session.post(f"{self.SETUP_ENDPOINT}/validate", data="null")
LOGGER.info("Session token is still valid")
self.data = req.json()
login_successful = True
except:
msg = "Invalid authentication token, will log in from scratch."
data = dict(self.user) if not login_successful:
LOGGER.info("Authenticating as %s", self.user["accountName"])
# We authenticate every time, so "remember me" is not needed data = dict(self.user)
data.update({"extended_login": False})
data["rememberMe"] = False
data["trustTokens"] = []
if self.session_data.get("trust_token"):
data["trustTokens"] = [self.session_data.get("trust_token")]
headers = {
"Accept": "*/*",
"Content-Type": "application/json",
"X-Apple-OAuth-Client-Id": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d",
"X-Apple-OAuth-Client-Type": "firstPartyAuth",
"X-Apple-OAuth-Redirect-URI": "https://www.icloud.com",
"X-Apple-OAuth-Require-Grant-Code": "true",
"X-Apple-OAuth-Response-Mode": "web_message",
"X-Apple-OAuth-Response-Type": "code",
"X-Apple-OAuth-State": self.client_id,
"X-Apple-Widget-Key": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d",
}
if self.session_data.get("scnt"):
headers["scnt"] = self.session_data.get("scnt")
if self.session_data.get("session_id"):
headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id")
try:
req = self.session.post(
f"{self.AUTH_ENDPOINT}/signin",
params={"isRememberMeEnabled": "true"},
data=json.dumps(data),
headers=headers,
)
except PyiCloudAPIResponseException as error:
msg = "Invalid email/password combination."
raise PyiCloudFailedLoginException(msg, error)
self._authenticate_with_token()
self._webservices = self.data["webservices"]
LOGGER.info("Authentication completed successfully")
def _authenticate_with_token(self):
"""Authenticate using session token."""
data = {
"accountCountryCode": self.session_data.get("account_country"),
"dsWebAuthToken": self.session_data.get("session_token"),
"extended_login": False,
"trustToken": self.session_data.get("trust_token", ""),
}
try: try:
req = self.session.post( req = self.session.post(
self._base_login_url, params=self.params, data=json.dumps(data) f"{self.SETUP_ENDPOINT}/accountLogin", data=json.dumps(data)
) )
except PyiCloudAPIResponseException as error: except PyiCloudAPIResponseException as error:
msg = "Invalid email/password combination." msg = "Invalid authentication token."
raise PyiCloudFailedLoginException(msg, error) raise PyiCloudFailedLoginException(msg, error)
self.data = req.json() self.data = req.json()
self.params.update({"dsid": self.data["dsInfo"]["dsid"]})
self._webservices = self.data["webservices"]
if not path.exists(self._cookie_directory):
mkdir(self._cookie_directory)
self.session.cookies.save()
LOGGER.debug("Cookies saved to %s", self._get_cookiejar_path())
LOGGER.info("Authentication completed successfully")
LOGGER.debug(self.params)
def _get_cookiejar_path(self): def _get_cookiejar_path(self):
"""Get path for cookiejar file.""" """Get path for cookiejar file."""
return path.join( return path.join(
self._cookie_directory, self._cookie_directory,
"".join([c for c in self.user.get("apple_id") if match(r"\w", c)]), "".join([c for c in self.user.get("accountName") if match(r"\w", c)]),
)
def _get_sessiondata_path(self):
"""Get path for session data file."""
return path.join(
self._session_directory,
"".join([c for c in self.user.get("accountName") if match(r"\w", c)]),
) )
@property @property
def requires_2sa(self): def requires_2sa(self):
"""Returns True if two-step authentication is required.""" """Returns True if two-step authentication is required."""
return ( return (
self.data.get("hsaChallengeRequired", False) self.data["dsInfo"].get("hsaVersion", 0) >= 1
and self.data["dsInfo"].get("hsaVersion", 0) >= 1 and (
self.data.get("hsaChallengeRequired", False)
or not self.is_trusted_session
)
) )
# FIXME: Implement 2FA for hsaVersion == 2 # pylint: disable=fixme
@property
def requires_2fa(self):
"""Returns True if two-factor authentication is required."""
return (
self.data["dsInfo"].get("hsaVersion", 0) == 2
and (
self.data.get("hsaChallengeRequired", False)
or not self.is_trusted_session
)
)
@property
def is_trusted_session(self):
"""Returns True if the session is trusted."""
return self.data.get("hsaTrustedBrowser", False)
@property @property
def trusted_devices(self): def trusted_devices(self):
@ -304,6 +427,76 @@ class PyiCloudService(object):
return not self.requires_2sa return not self.requires_2sa
def validate_2fa_code(self, code):
"""Verifies a verification code received via Apple's 2FA system (HSA2)."""
data = {"securityCode": {"code": code}}
headers = {
"Accept": "application/json",
"Content-Type": "application/json",
"X-Apple-OAuth-Client-Id": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d",
"X-Apple-OAuth-Client-Type": "firstPartyAuth",
"X-Apple-OAuth-Redirect-URI": "https://www.icloud.com",
"X-Apple-OAuth-Require-Grant-Code": "true",
"X-Apple-OAuth-Response-Mode": "web_message",
"X-Apple-OAuth-Response-Type": "code",
"X-Apple-OAuth-State": self.client_id,
"X-Apple-Widget-Key": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d",
}
if self.session_data.get("scnt"):
headers["scnt"] = self.session_data.get("scnt")
if self.session_data.get("session_id"):
headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id")
try:
req = self.session.post(
f"{self.AUTH_ENDPOINT}/verify/trusteddevice/securitycode",
data=json.dumps(data),
headers=headers,
)
except PyiCloudAPIResponseException as error:
LOGGER.error("Code verification failed.")
return False
LOGGER.debug("Code verification successful.")
self.trust_session()
return not self.requires_2sa
def trust_session(self):
"""Request session trust to avoid user log in going forward."""
headers = {
"Accept": "*/*",
"X-Apple-OAuth-Client-Id": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d",
"X-Apple-OAuth-Client-Type": "firstPartyAuth",
"X-Apple-OAuth-Redirect-URI": "https://www.icloud.com",
"X-Apple-OAuth-Require-Grant-Code": "true",
"X-Apple-OAuth-Response-Mode": "web_message",
"X-Apple-OAuth-Response-Type": "code",
"X-Apple-OAuth-State": self.client_id,
"X-Apple-Widget-Key": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d",
}
if self.session_data.get("scnt"):
headers["scnt"] = self.session_data.get("scnt")
if self.session_data.get("session_id"):
headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id")
try:
req = self.session.get(
f"{self.AUTH_ENDPOINT}/2sv/trust",
headers=headers,
)
self._authenticate_with_token()
return True
except PyiCloudAPIResponseException as error:
LOGGER.error("Session trust failed.")
return False
def _get_webservice_url(self, ws_key): def _get_webservice_url(self, ws_key):
"""Get webservice URL, raise an exception if not exists.""" """Get webservice URL, raise an exception if not exists."""
if self._webservices.get(ws_key) is None: if self._webservices.get(ws_key) is None:
@ -378,7 +571,7 @@ class PyiCloudService(object):
return self._drive return self._drive
def __unicode__(self): def __unicode__(self):
return "iCloud API: %s" % self.user.get("apple_id") return "iCloud API: %s" % self.user.get("accountName")
def __str__(self): def __str__(self):
as_unicode = self.__unicode__() as_unicode = self.__unicode__()