diff --git a/pyicloud/base.py b/pyicloud/base.py index 510bc2f..5081b96 100644 --- a/pyicloud/base.py +++ b/pyicloud/base.py @@ -31,6 +31,14 @@ from pyicloud.utils import get_password_from_keyring LOGGER = logging.getLogger(__name__) +HEADER_DATA = { + "X-Apple-ID-Account-Country": "account_country", + "X-Apple-ID-Session-Id": "session_id", + "X-Apple-Session-Token": "session_token", + "X-Apple-TwoSV-Trust-Token": "trust_token", + "scnt": "scnt", +} + class PyiCloudPasswordFilter(logging.Filter): """Password log hider.""" @@ -63,7 +71,13 @@ class PyiCloudSession(Session): if self.service.password_filter not in request_logger.filters: request_logger.addFilter(self.service.password_filter) - request_logger.debug("%s %s %s", method, url, kwargs.get("data", "")) + request_logger.debug( + "%s %s %s" % ( + method, + url, + kwargs.get("data", "") + ) + ) has_retried = kwargs.get("retried") kwargs.pop("retried", None) @@ -72,14 +86,46 @@ class PyiCloudSession(Session): content_type = response.headers.get("Content-Type", "").split(";")[0] json_mimetypes = ["application/json", "text/json"] - if not response.ok and content_type not in json_mimetypes: - if has_retried is None and response.status_code == 450: + for header in HEADER_DATA: + if response.headers.get(header): + session_arg = HEADER_DATA[header] + self.service.session_data.update( + {session_arg: response.headers.get(header)} + ) + + # Save session_data to file + with open(self.service.session_path, "w") as outfile: + json.dump(self.service.session_data, outfile) + LOGGER.debug("Saved session data to file") + + # Save cookies to file + self.cookies.save(ignore_discard=True, ignore_expires=True) + LOGGER.debug("Cookies saved to %s", self.service.cookiejar_path) + + if not response.ok and (content_type not in json_mimetypes + or response.status_code in [421, 450, 500]): + try: + fmip_url = self.service._get_webservice_url("findme") + if has_retried is None and response.status_code == 450 and fmip_url in url: + # Handle re-authentication for Find My iPhone + LOGGER.debug("Re-authenticating Find My iPhone service") + try: + self.service.authenticate(True, "find") + except PyiCloudAPIResponseException: + LOGGER.debug("Re-authentication failed") + kwargs["retried"] = True + return self.request(method, url, **kwargs) + except Exception: + pass + + if has_retried is None and response.status_code in [421, 450, 500]: api_error = PyiCloudAPIResponseException( response.reason, response.status_code, retry=True ) request_logger.debug(api_error) kwargs["retried"] = True return self.request(method, url, **kwargs) + self._raise_error(response.status_code, response.reason) if content_type not in json_mimetypes: @@ -106,8 +152,8 @@ class PyiCloudSession(Session): if not code and data.get("serverErrorCode"): code = data.get("serverErrorCode") - if reason: - self._raise_error(code, reason) + if reason: + self._raise_error(code, reason) return response @@ -131,6 +177,8 @@ class PyiCloudSession(Session): reason + ". Please wait a few minutes then try again." "The remote servers might be trying to throttle requests." ) + if code in [421, 450, 500]: + reason = "Authentication required for Account." api_error = PyiCloudAPIResponseException(reason, code) LOGGER.error(api_error) @@ -148,6 +196,7 @@ class PyiCloudService(object): pyicloud.iphone.location() """ + AUTH_ENDPOINT = "https://idmsa.apple.com/appleauth/auth" HOME_ENDPOINT = "https://www.icloud.com" SETUP_ENDPOINT = "https://setup.icloud.com/setup/ws/1" @@ -156,6 +205,7 @@ class PyiCloudService(object): apple_id, password=None, cookie_directory=None, + session_directory=None, verify=True, client_id=None, with_family=True, @@ -163,50 +213,61 @@ class PyiCloudService(object): if password is None: password = get_password_from_keyring(apple_id) + self.user = {"accountName": apple_id, "password": password} self.data = {} - self.client_id = client_id or str(uuid1()).upper() + self.params = {} + self.client_id = client_id or ("auth-%s" % str(uuid1()).lower()) self.with_family = with_family - self.user = {"apple_id": apple_id, "password": password} + + self.session_data = {} + if session_directory: + self._session_directory = session_directory + else: + self._session_directory = path.join(gettempdir(), "pyicloud-session") + LOGGER.debug("Using session file %s" % self.session_path) + + try: + with open(self.session_path) as session_f: + self.session_data = json.load(session_f) + except: # pylint: disable=bare-except + LOGGER.info("Session file does not exist") + + if not path.exists(self._session_directory): + mkdir(self._session_directory) self.password_filter = PyiCloudPasswordFilter(password) LOGGER.addFilter(self.password_filter) - self._base_login_url = "%s/login" % self.SETUP_ENDPOINT - if cookie_directory: self._cookie_directory = path.expanduser(path.normpath(cookie_directory)) else: self._cookie_directory = path.join(gettempdir(), "pyicloud") + if not path.exists(self._cookie_directory): + mkdir(self._cookie_directory) + + if self.session_data.get("client_id"): + self.client_id = self.session_data.get("client_id") + else: + self.session_data.update({"client_id": self.client_id}) + self.session = PyiCloudSession(self) self.session.verify = verify self.session.headers.update( - { - "Origin": self.HOME_ENDPOINT, - "Referer": "%s/" % self.HOME_ENDPOINT, - "User-Agent": "Opera/9.52 (X11; Linux i686; U; en)", - } + {"Origin": self.HOME_ENDPOINT, "Referer": "%s/" % self.HOME_ENDPOINT} ) - cookiejar_path = self._get_cookiejar_path() + cookiejar_path = self.cookiejar_path self.session.cookies = cookielib.LWPCookieJar(filename=cookiejar_path) if path.exists(cookiejar_path): try: - self.session.cookies.load() - LOGGER.debug("Read cookies from %s", cookiejar_path) + self.session.cookies.load(ignore_discard=True, ignore_expires=True) + LOGGER.debug("Read cookies from %s" % cookiejar_path) except: # pylint: disable=bare-except # Most likely a pickled cookiejar from earlier versions. # The cookiejar will get replaced with a valid one after # successful authentication. - LOGGER.warning("Failed to read cookiejar %s", cookiejar_path) - - self.params = { - "clientBuildNumber": "17DHotfix5", - "clientMasteringNumber": "17DHotfix5", - "ckjsBuildVersion": "17DProjectDev77", - "ckjsVersion": "2.0.5", - "clientId": self.client_id, - } + LOGGER.warning("Failed to read cookiejar %s" % cookiejar_path) self.authenticate() @@ -214,54 +275,164 @@ class PyiCloudService(object): self._files = None self._photos = None - def authenticate(self): + def authenticate(self, force_refresh=False, service=None): """ - Handles authentication, and persists the X-APPLE-WEB-KB cookie so that + Handles authentication, and persists cookies so that subsequent logins will not cause additional e-mails from Apple. """ - LOGGER.info("Authenticating as %s", self.user["apple_id"]) + login_successful = False + if self.session_data.get("session_token") and not force_refresh: + LOGGER.debug("Checking session token validity") + try: + self.data = self._validate_token() + login_successful = True + except PyiCloudAPIResponseException: + LOGGER.debug("Invalid authentication token, will log in from scratch.") - data = dict(self.user) + if not login_successful and service != None: + app = self.data["apps"][service] + if "canLaunchWithOneFactor" in app and app["canLaunchWithOneFactor"] == True: + LOGGER.debug("Authenticating as %s for %s" % (self.user["accountName"], service)) + try: + self._authenticate_with_credentials_service(service) + login_successful = True + except: + LOGGER.debug("Could not log into service. Attempting brand new login.") - # We authenticate every time, so "remember me" is not needed - data.update({"extended_login": False}) + if not login_successful: + LOGGER.debug("Authenticating as %s" % self.user["accountName"]) + + data = dict(self.user) + + data["rememberMe"] = True + data["trustTokens"] = [] + if self.session_data.get("trust_token"): + data["trustTokens"] = [self.session_data.get("trust_token")] + + headers = self._get_auth_headers() + + if self.session_data.get("scnt"): + headers["scnt"] = self.session_data.get("scnt") + + if self.session_data.get("session_id"): + headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id") + + try: + req = self.session.post( + "%s/signin" % self.AUTH_ENDPOINT, + params={"isRememberMeEnabled": "true"}, + data=json.dumps(data), + headers=headers, + ) + except PyiCloudAPIResponseException as error: + msg = "Invalid email/password combination." + raise PyiCloudFailedLoginException(msg, error) + + self._authenticate_with_token() + + self._webservices = self.data["webservices"] + + LOGGER.debug("Authentication completed successfully") + + def _authenticate_with_token(self): + """Authenticate using session token.""" + data = { + "accountCountryCode": self.session_data.get("account_country"), + "dsWebAuthToken": self.session_data.get("session_token"), + "extended_login": True, + "trustToken": self.session_data.get("trust_token", ""), + } try: req = self.session.post( - self._base_login_url, params=self.params, data=json.dumps(data) + "%s/accountLogin" % self.SETUP_ENDPOINT, data=json.dumps(data) ) + self.data = req.json() + except PyiCloudAPIResponseException as error: + msg = "Invalid authentication token." + raise PyiCloudFailedLoginException(msg, error) + + def _authenticate_with_credentials_service(self, service): + """Authenticate to a specific service using credentials.""" + data = { + "appName": service, + "apple_id": self.user["accountName"], + "password": self.user["password"] + } + + try: + self.session.post( + "%s/accountLogin" % self.SETUP_ENDPOINT, data=json.dumps(data) + ) + + self.data = self._validate_token() except PyiCloudAPIResponseException as error: msg = "Invalid email/password combination." raise PyiCloudFailedLoginException(msg, error) - self.data = req.json() - self.params.update({"dsid": self.data["dsInfo"]["dsid"]}) - self._webservices = self.data["webservices"] + def _validate_token(self): + """Checks if the current access token is still valid.""" + LOGGER.debug("Checking session token validity") + try: + req = self.session.post("%s/validate" % self.SETUP_ENDPOINT, data="null") + LOGGER.debug("Session token is still valid") + return req.json() + except PyiCloudAPIResponseException as err: + LOGGER.debug("Invalid authentication token") + raise err - if not path.exists(self._cookie_directory): - mkdir(self._cookie_directory) - self.session.cookies.save() - LOGGER.debug("Cookies saved to %s", self._get_cookiejar_path()) + def _get_auth_headers(self, overrides=None): + headers = { + "Accept": "*/*", + "Content-Type": "application/json", + "X-Apple-OAuth-Client-Id": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d", + "X-Apple-OAuth-Client-Type": "firstPartyAuth", + "X-Apple-OAuth-Redirect-URI": "https://www.icloud.com", + "X-Apple-OAuth-Require-Grant-Code": "true", + "X-Apple-OAuth-Response-Mode": "web_message", + "X-Apple-OAuth-Response-Type": "code", + "X-Apple-OAuth-State": self.client_id, + "X-Apple-Widget-Key": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d", + } + if overrides: + headers.update(overrides) + return headers - LOGGER.info("Authentication completed successfully") - LOGGER.debug(self.params) - - def _get_cookiejar_path(self): + @property + def cookiejar_path(self): """Get path for cookiejar file.""" return path.join( self._cookie_directory, - "".join([c for c in self.user.get("apple_id") if match(r"\w", c)]), + "".join([c for c in self.user.get("accountName") if match(r"\w", c)]), + ) + + @property + def session_path(self): + """Get path for session data file.""" + return path.join( + self._session_directory, + "".join([c for c in self.user.get("accountName") if match(r"\w", c)]), ) @property def requires_2sa(self): """Returns True if two-step authentication is required.""" - return ( - self.data.get("hsaChallengeRequired", False) - and self.data["dsInfo"].get("hsaVersion", 0) >= 1 + return self.data.get("dsInfo", {}).get("hsaVersion", 0) >= 1 and ( + self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session ) - # FIXME: Implement 2FA for hsaVersion == 2 # pylint: disable=fixme + + @property + def requires_2fa(self): + """Returns True if two-factor authentication is required.""" + return self.data["dsInfo"].get("hsaVersion", 0) == 2 and ( + self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session + ) + + @property + def is_trusted_session(self): + """Returns True if the session is trusted.""" + return self.data.get("hsaTrustedBrowser", False) @property def trusted_devices(self): @@ -298,12 +469,61 @@ class PyiCloudService(object): return False raise - # Re-authenticate, which will both update the HSA data, and - # ensure that we save the X-APPLE-WEBAUTH-HSA-TRUST cookie. - self.authenticate() + self.trust_session() return not self.requires_2sa + def validate_2fa_code(self, code): + """Verifies a verification code received via Apple's 2FA system (HSA2).""" + data = {"securityCode": {"code": code}} + + headers = self._get_auth_headers({"Accept": "application/json"}) + + if self.session_data.get("scnt"): + headers["scnt"] = self.session_data.get("scnt") + + if self.session_data.get("session_id"): + headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id") + + try: + self.session.post( + "%s/verify/trusteddevice/securitycode" % self.AUTH_ENDPOINT, + data=json.dumps(data), + headers=headers, + ) + except PyiCloudAPIResponseException as error: + if error.code == -21669: + # Wrong verification code + LOGGER.error("Code verification failed.") + return False + raise + + LOGGER.debug("Code verification successful.") + + self.trust_session() + return not self.requires_2sa + + def trust_session(self): + """Request session trust to avoid user log in going forward.""" + headers = self._get_auth_headers() + + if self.session_data.get("scnt"): + headers["scnt"] = self.session_data.get("scnt") + + if self.session_data.get("session_id"): + headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id") + + try: + self.session.get( + "%s/2sv/trust" % self.AUTH_ENDPOINT, + headers=headers, + ) + self._authenticate_with_token() + return True + except PyiCloudAPIResponseException: + LOGGER.error("Session trust failed.") + return False + def _get_webservice_url(self, ws_key): """Get webservice URL, raise an exception if not exists.""" if self._webservices.get(ws_key) is None: @@ -378,7 +598,7 @@ class PyiCloudService(object): return self._drive def __unicode__(self): - return "iCloud API: %s" % self.user.get("apple_id") + return "iCloud API: %s" % self.user.get("accountName") def __str__(self): as_unicode = self.__unicode__() diff --git a/pyicloud/cmdline.py b/pyicloud/cmdline.py index a6bbcf7..8137cda 100644 --- a/pyicloud/cmdline.py +++ b/pyicloud/cmdline.py @@ -201,7 +201,22 @@ def main(args=None): ): utils.store_password_in_keyring(username, password) - if api.requires_2sa: + if api.requires_2fa: + # fmt: off + print( + "\nTwo-step authentication required.", + "\nPlease enter validation code" + ) + # fmt: on + + code = input("(string) --> ") + if not api.validate_2fa_code(code): + print("Failed to verify verification code") + sys.exit(1) + + print("") + + elif api.requires_2sa: # fmt: off print( "\nTwo-step authentication required.", diff --git a/tests/__init__.py b/tests/__init__.py index 828a5dd..d36a28d 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -9,13 +9,19 @@ from pyicloud.services.findmyiphone import FindMyiPhoneServiceManager, AppleDevi from .const import ( AUTHENTICATED_USER, - REQUIRES_2SA_USER, + REQUIRES_2FA_USER, + REQUIRES_2FA_TOKEN, + VALID_TOKEN, VALID_USERS, VALID_PASSWORD, + VALID_COOKIE, + VALID_2FA_CODE, + VALID_TOKENS, ) from .const_login import ( + AUTH_OK, LOGIN_WORKING, - LOGIN_2SA, + LOGIN_2FA, TRUSTED_DEVICES, TRUSTED_DEVICE_1, VERIFICATION_CODE_OK, @@ -41,6 +47,7 @@ class ResponseMock(Response): self.result = result self.status_code = status_code self.raw = kwargs.get("raw") + self.headers = kwargs.get("headers", {}) @property def text(self): @@ -52,21 +59,16 @@ class PyiCloudSessionMock(base.PyiCloudSession): def request(self, method, url, **kwargs): params = kwargs.get("params") + headers = kwargs.get("headers") data = json.loads(kwargs.get("data", "{}")) # Login if self.service.SETUP_ENDPOINT in url: - if "login" in url and method == "POST": - if ( - data.get("apple_id") not in VALID_USERS - or data.get("password") != VALID_PASSWORD - ): + if "accountLogin" in url and method == "POST": + if data.get("dsWebAuthToken") not in VALID_TOKENS: self._raise_error(None, "Unknown reason") - if ( - data.get("apple_id") == REQUIRES_2SA_USER - and data.get("password") == VALID_PASSWORD - ): - return ResponseMock(LOGIN_2SA) + if data.get("dsWebAuthToken") == REQUIRES_2FA_TOKEN: + return ResponseMock(LOGIN_2FA) return ResponseMock(LOGIN_WORKING) if "listDevices" in url and method == "GET": @@ -84,6 +86,35 @@ class PyiCloudSessionMock(base.PyiCloudSession): return ResponseMock(VERIFICATION_CODE_OK) self._raise_error(None, "FOUND_CODE") + if "validate" in url and method == "POST": + if headers.get("X-APPLE-WEBAUTH-TOKEN") == VALID_COOKIE: + return ResponseMock(LOGIN_WORKING) + self._raise_error(None, "Session expired") + + if self.service.AUTH_ENDPOINT in url: + if "signin" in url and method == "POST": + if ( + data.get("accountName") not in VALID_USERS + or data.get("password") != VALID_PASSWORD + ): + self._raise_error(None, "Unknown reason") + if data.get("accountName") == REQUIRES_2FA_USER: + self.service.session_data["session_token"] = REQUIRES_2FA_TOKEN + return ResponseMock(AUTH_OK) + + self.service.session_data["session_token"] = VALID_TOKEN + return ResponseMock(AUTH_OK) + + if "securitycode" in url and method == "POST": + if data.get("securityCode", {}).get("code") != VALID_2FA_CODE: + self._raise_error(None, "Incorrect code") + + self.service.session_data["session_token"] = VALID_TOKEN + return ResponseMock("", status_code=204) + + if "trust" in url and method == "GET": + return ResponseMock("", status_code=204) + # Account if "device/getDevices" in url and method == "GET": return ResponseMock(ACCOUNT_DEVICES_WORKING) diff --git a/tests/const.py b/tests/const.py index 3b69842..1dbd27c 100644 --- a/tests/const.py +++ b/tests/const.py @@ -4,8 +4,13 @@ from .const_account_family import PRIMARY_EMAIL, APPLE_ID_EMAIL, ICLOUD_ID_EMAIL # Base AUTHENTICATED_USER = PRIMARY_EMAIL -REQUIRES_2SA_USER = "requires_2sa_user" -VALID_USERS = [AUTHENTICATED_USER, REQUIRES_2SA_USER, APPLE_ID_EMAIL, ICLOUD_ID_EMAIL] +REQUIRES_2FA_TOKEN = "requires_2fa_token" +REQUIRES_2FA_USER = "requires_2fa_user" +VALID_USERS = [AUTHENTICATED_USER, REQUIRES_2FA_USER, APPLE_ID_EMAIL, ICLOUD_ID_EMAIL] VALID_PASSWORD = "valid_password" +VALID_COOKIE = "valid_cookie" +VALID_TOKEN = "valid_token" +VALID_2FA_CODE = "000000" +VALID_TOKENS = [VALID_TOKEN, REQUIRES_2FA_TOKEN] CLIENT_ID = "client_id" diff --git a/tests/const_login.py b/tests/const_login.py index 25ec7c0..bb87eb6 100644 --- a/tests/const_login.py +++ b/tests/const_login.py @@ -16,6 +16,10 @@ A_DS_ID = "123456-12-12345678-1234-1234-1234-123456789012" + PERSON_ID WIDGET_KEY = "widget_key" + PERSON_ID # Data +AUTH_OK = { + "authType": "hsa2" +} + LOGIN_WORKING = { "dsInfo": { "lastName": LAST_NAME, @@ -184,7 +188,7 @@ LOGIN_WORKING = { "settings", ], "version": 2, - "isExtendedLogin": False, + "isExtendedLogin": True, "pcsServiceIdentitiesIncluded": True, "hsaChallengeRequired": False, "requestInfo": {"country": "FR", "timeZone": "GMT+1", "region": "IDF"}, @@ -209,7 +213,7 @@ LOGIN_WORKING = { } # Setup data -LOGIN_2SA = { +LOGIN_2FA = { "dsInfo": { "lastName": LAST_NAME, "iCDPEnabled": False, @@ -377,7 +381,7 @@ LOGIN_2SA = { "settings", ], "version": 2, - "isExtendedLogin": False, + "isExtendedLogin": True, "pcsServiceIdentitiesIncluded": False, "hsaChallengeRequired": True, "requestInfo": {"country": "FR", "timeZone": "GMT+1", "region": "IDF"}, diff --git a/tests/test_cmdline.py b/tests/test_cmdline.py index 1a80239..42c2509 100644 --- a/tests/test_cmdline.py +++ b/tests/test_cmdline.py @@ -2,7 +2,7 @@ """Cmdline tests.""" from pyicloud import cmdline from . import PyiCloudServiceMock -from .const import AUTHENTICATED_USER, REQUIRES_2SA_USER, VALID_PASSWORD +from .const import AUTHENTICATED_USER, REQUIRES_2FA_USER, VALID_PASSWORD, VALID_2FA_CODE from .const_findmyiphone import FMI_FAMILY_WORKING import os @@ -74,16 +74,16 @@ class TestCmdline(TestCase): @patch("keyring.get_password", return_value=None) @patch("pyicloud.cmdline.input") - def test_username_password_requires_2sa( + def test_username_password_requires_2fa( self, mock_input, mock_get_password ): # pylint: disable=unused-argument """Test username and password commands.""" # Valid connection for the first time - mock_input.return_value = "0" + mock_input.return_value = VALID_2FA_CODE with pytest.raises(SystemExit, match="0"): # fmt: off self.main([ - '--username', REQUIRES_2SA_USER, + '--username', REQUIRES_2FA_USER, '--password', VALID_PASSWORD, '--non-interactive', ])