diff --git a/pyicloud/base.py b/pyicloud/base.py index 510bc2f..3f907d9 100644 --- a/pyicloud/base.py +++ b/pyicloud/base.py @@ -63,7 +63,12 @@ class PyiCloudSession(Session): if self.service.password_filter not in request_logger.filters: request_logger.addFilter(self.service.password_filter) - request_logger.debug("%s %s %s", method, url, kwargs.get("data", "")) + request_logger.debug( + "%s %s %s", + method, + url, + kwargs.get("data", ""), + ) has_retried = kwargs.get("retried") kwargs.pop("retried", None) @@ -72,6 +77,40 @@ class PyiCloudSession(Session): content_type = response.headers.get("Content-Type", "").split(";")[0] json_mimetypes = ["application/json", "text/json"] + if response.headers.get("X-Apple-ID-Session-Id"): + self.service.session_data["session_id"] = response.headers.get( + "X-Apple-ID-Session-Id" + ) + + if response.headers.get("X-Apple-Session-Token"): + self.service.session_data["session_token"] = response.headers.get( + "X-Apple-Session-Token" + ) + + if response.headers.get("X-Apple-ID-Account-Country"): + self.service.session_data["account_country"] = response.headers.get( + "X-Apple-ID-Account-Country" + ) + + if response.headers.get("scnt"): + self.service.session_data["scnt"] = response.headers.get("scnt") + + if response.headers.get("X-Apple-TwoSV-Trust-Token"): + self.service.session_data["trust_token"] = response.headers.get( + "X-Apple-TwoSV-Trust-Token" + ) + + # Save session_data to file + with open(self.service._get_sessiondata_path(), "w") as outfile: + json.dump(self.service.session_data, outfile) + LOGGER.debug("Saved session data to file") + + # Save cookies to file + if not path.exists(self.service._cookie_directory): + mkdir(self.service._cookie_directory) + self.cookies.save(ignore_discard=True, ignore_expires=True) + LOGGER.debug("Cookies saved to %s", self.service._get_cookiejar_path()) + if not response.ok and content_type not in json_mimetypes: if has_retried is None and response.status_code == 450: api_error = PyiCloudAPIResponseException( @@ -106,8 +145,8 @@ class PyiCloudSession(Session): if not code and data.get("serverErrorCode"): code = data.get("serverErrorCode") - if reason: - self._raise_error(code, reason) + if reason: + self._raise_error(code, reason) return response @@ -148,6 +187,7 @@ class PyiCloudService(object): pyicloud.iphone.location() """ + AUTH_ENDPOINT = "https://idmsa.apple.com/appleauth/auth" HOME_ENDPOINT = "https://www.icloud.com" SETUP_ENDPOINT = "https://setup.icloud.com/setup/ws/1" @@ -156,6 +196,7 @@ class PyiCloudService(object): apple_id, password=None, cookie_directory=None, + session_directory=None, verify=True, client_id=None, with_family=True, @@ -163,36 +204,52 @@ class PyiCloudService(object): if password is None: password = get_password_from_keyring(apple_id) + self.user = {"accountName": apple_id, "password": password} self.data = {} - self.client_id = client_id or str(uuid1()).upper() + self.params = {} + self.client_id = client_id or f"auth-{str(uuid1()).lower()}" self.with_family = with_family - self.user = {"apple_id": apple_id, "password": password} + + self.session_data = {} + if session_directory: + self._session_directory = session_directory + else: + self._session_directory = path.join( + gettempdir(), "pyicloud-session" + ) + LOGGER.debug(f"Using session file {self._get_sessiondata_path()}") + + try: + with open(self._get_sessiondata_path()) as session_f: + self.session_data = json.load(session_f) + except: + LOGGER.warning("Session file does not exist") + + if not path.exists(self._session_directory): + mkdir(self._session_directory) self.password_filter = PyiCloudPasswordFilter(password) LOGGER.addFilter(self.password_filter) - self._base_login_url = "%s/login" % self.SETUP_ENDPOINT - if cookie_directory: self._cookie_directory = path.expanduser(path.normpath(cookie_directory)) else: self._cookie_directory = path.join(gettempdir(), "pyicloud") + if self.session_data.get("client_id"): + self.client_id = self.session_data.get("client_id") + self.session = PyiCloudSession(self) self.session.verify = verify self.session.headers.update( - { - "Origin": self.HOME_ENDPOINT, - "Referer": "%s/" % self.HOME_ENDPOINT, - "User-Agent": "Opera/9.52 (X11; Linux i686; U; en)", - } + {"Origin": self.HOME_ENDPOINT, "Referer": f"{self.HOME_ENDPOINT}/"} ) cookiejar_path = self._get_cookiejar_path() self.session.cookies = cookielib.LWPCookieJar(filename=cookiejar_path) if path.exists(cookiejar_path): try: - self.session.cookies.load() + self.session.cookies.load(ignore_discard=True, ignore_expires=True) LOGGER.debug("Read cookies from %s", cookiejar_path) except: # pylint: disable=bare-except # Most likely a pickled cookiejar from earlier versions. @@ -200,14 +257,6 @@ class PyiCloudService(object): # successful authentication. LOGGER.warning("Failed to read cookiejar %s", cookiejar_path) - self.params = { - "clientBuildNumber": "17DHotfix5", - "clientMasteringNumber": "17DHotfix5", - "ckjsBuildVersion": "17DProjectDev77", - "ckjsVersion": "2.0.5", - "clientId": self.client_id, - } - self.authenticate() self._drive = None @@ -216,52 +265,126 @@ class PyiCloudService(object): def authenticate(self): """ - Handles authentication, and persists the X-APPLE-WEB-KB cookie so that + Handles authentication, and persists cookies so that subsequent logins will not cause additional e-mails from Apple. """ - LOGGER.info("Authenticating as %s", self.user["apple_id"]) + login_successful = False + if self.session_data.get("session_token"): + LOGGER.info("Checking session token validity") + try: + req = self.session.post(f"{self.SETUP_ENDPOINT}/validate", data="null") + LOGGER.info("Session token is still valid") + self.data = req.json() + login_successful = True + except: + msg = "Invalid authentication token, will log in from scratch." - data = dict(self.user) + if not login_successful: + LOGGER.info("Authenticating as %s", self.user["accountName"]) - # We authenticate every time, so "remember me" is not needed - data.update({"extended_login": False}) + data = dict(self.user) + + data["rememberMe"] = False + data["trustTokens"] = [] + if self.session_data.get("trust_token"): + data["trustTokens"] = [self.session_data.get("trust_token")] + + headers = { + "Accept": "*/*", + "Content-Type": "application/json", + "X-Apple-OAuth-Client-Id": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d", + "X-Apple-OAuth-Client-Type": "firstPartyAuth", + "X-Apple-OAuth-Redirect-URI": "https://www.icloud.com", + "X-Apple-OAuth-Require-Grant-Code": "true", + "X-Apple-OAuth-Response-Mode": "web_message", + "X-Apple-OAuth-Response-Type": "code", + "X-Apple-OAuth-State": self.client_id, + "X-Apple-Widget-Key": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d", + } + + if self.session_data.get("scnt"): + headers["scnt"] = self.session_data.get("scnt") + + if self.session_data.get("session_id"): + headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id") + + try: + req = self.session.post( + f"{self.AUTH_ENDPOINT}/signin", + params={"isRememberMeEnabled": "true"}, + data=json.dumps(data), + headers=headers, + ) + except PyiCloudAPIResponseException as error: + msg = "Invalid email/password combination." + raise PyiCloudFailedLoginException(msg, error) + + self._authenticate_with_token() + + self._webservices = self.data["webservices"] + + LOGGER.info("Authentication completed successfully") + + def _authenticate_with_token(self): + """Authenticate using session token.""" + data = { + "accountCountryCode": self.session_data.get("account_country"), + "dsWebAuthToken": self.session_data.get("session_token"), + "extended_login": False, + "trustToken": self.session_data.get("trust_token", ""), + } try: req = self.session.post( - self._base_login_url, params=self.params, data=json.dumps(data) + f"{self.SETUP_ENDPOINT}/accountLogin", data=json.dumps(data) ) except PyiCloudAPIResponseException as error: - msg = "Invalid email/password combination." + msg = "Invalid authentication token." raise PyiCloudFailedLoginException(msg, error) self.data = req.json() - self.params.update({"dsid": self.data["dsInfo"]["dsid"]}) - self._webservices = self.data["webservices"] - - if not path.exists(self._cookie_directory): - mkdir(self._cookie_directory) - self.session.cookies.save() - LOGGER.debug("Cookies saved to %s", self._get_cookiejar_path()) - - LOGGER.info("Authentication completed successfully") - LOGGER.debug(self.params) - + def _get_cookiejar_path(self): """Get path for cookiejar file.""" return path.join( self._cookie_directory, - "".join([c for c in self.user.get("apple_id") if match(r"\w", c)]), + "".join([c for c in self.user.get("accountName") if match(r"\w", c)]), + ) + + def _get_sessiondata_path(self): + """Get path for session data file.""" + return path.join( + self._session_directory, + "".join([c for c in self.user.get("accountName") if match(r"\w", c)]), ) @property def requires_2sa(self): """Returns True if two-step authentication is required.""" return ( - self.data.get("hsaChallengeRequired", False) - and self.data["dsInfo"].get("hsaVersion", 0) >= 1 + self.data["dsInfo"].get("hsaVersion", 0) >= 1 + and ( + self.data.get("hsaChallengeRequired", False) + or not self.is_trusted_session + ) ) - # FIXME: Implement 2FA for hsaVersion == 2 # pylint: disable=fixme + + @property + def requires_2fa(self): + """Returns True if two-factor authentication is required.""" + return ( + self.data["dsInfo"].get("hsaVersion", 0) == 2 + and ( + self.data.get("hsaChallengeRequired", False) + or not self.is_trusted_session + ) + ) + + @property + def is_trusted_session(self): + """Returns True if the session is trusted.""" + return self.data.get("hsaTrustedBrowser", False) @property def trusted_devices(self): @@ -304,6 +427,76 @@ class PyiCloudService(object): return not self.requires_2sa + def validate_2fa_code(self, code): + """Verifies a verification code received via Apple's 2FA system (HSA2).""" + data = {"securityCode": {"code": code}} + + headers = { + "Accept": "application/json", + "Content-Type": "application/json", + "X-Apple-OAuth-Client-Id": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d", + "X-Apple-OAuth-Client-Type": "firstPartyAuth", + "X-Apple-OAuth-Redirect-URI": "https://www.icloud.com", + "X-Apple-OAuth-Require-Grant-Code": "true", + "X-Apple-OAuth-Response-Mode": "web_message", + "X-Apple-OAuth-Response-Type": "code", + "X-Apple-OAuth-State": self.client_id, + "X-Apple-Widget-Key": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d", + } + + if self.session_data.get("scnt"): + headers["scnt"] = self.session_data.get("scnt") + + if self.session_data.get("session_id"): + headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id") + + try: + req = self.session.post( + f"{self.AUTH_ENDPOINT}/verify/trusteddevice/securitycode", + data=json.dumps(data), + headers=headers, + ) + except PyiCloudAPIResponseException as error: + LOGGER.error("Code verification failed.") + return False + + LOGGER.debug("Code verification successful.") + + self.trust_session() + + return not self.requires_2sa + + def trust_session(self): + """Request session trust to avoid user log in going forward.""" + headers = { + "Accept": "*/*", + "X-Apple-OAuth-Client-Id": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d", + "X-Apple-OAuth-Client-Type": "firstPartyAuth", + "X-Apple-OAuth-Redirect-URI": "https://www.icloud.com", + "X-Apple-OAuth-Require-Grant-Code": "true", + "X-Apple-OAuth-Response-Mode": "web_message", + "X-Apple-OAuth-Response-Type": "code", + "X-Apple-OAuth-State": self.client_id, + "X-Apple-Widget-Key": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d", + } + + if self.session_data.get("scnt"): + headers["scnt"] = self.session_data.get("scnt") + + if self.session_data.get("session_id"): + headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id") + + try: + req = self.session.get( + f"{self.AUTH_ENDPOINT}/2sv/trust", + headers=headers, + ) + self._authenticate_with_token() + return True + except PyiCloudAPIResponseException as error: + LOGGER.error("Session trust failed.") + return False + def _get_webservice_url(self, ws_key): """Get webservice URL, raise an exception if not exists.""" if self._webservices.get(ws_key) is None: @@ -378,7 +571,7 @@ class PyiCloudService(object): return self._drive def __unicode__(self): - return "iCloud API: %s" % self.user.get("apple_id") + return "iCloud API: %s" % self.user.get("accountName") def __str__(self): as_unicode = self.__unicode__()