Merge pull request #310 from nzapponi/2fa-support
Add support for 2FA and new Trust token logic
This commit is contained in:
commit
68566dc3a5
6 changed files with 350 additions and 75 deletions
316
pyicloud/base.py
316
pyicloud/base.py
|
@ -31,6 +31,14 @@ from pyicloud.utils import get_password_from_keyring
|
||||||
|
|
||||||
LOGGER = logging.getLogger(__name__)
|
LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
HEADER_DATA = {
|
||||||
|
"X-Apple-ID-Account-Country": "account_country",
|
||||||
|
"X-Apple-ID-Session-Id": "session_id",
|
||||||
|
"X-Apple-Session-Token": "session_token",
|
||||||
|
"X-Apple-TwoSV-Trust-Token": "trust_token",
|
||||||
|
"scnt": "scnt",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
class PyiCloudPasswordFilter(logging.Filter):
|
class PyiCloudPasswordFilter(logging.Filter):
|
||||||
"""Password log hider."""
|
"""Password log hider."""
|
||||||
|
@ -63,7 +71,13 @@ class PyiCloudSession(Session):
|
||||||
if self.service.password_filter not in request_logger.filters:
|
if self.service.password_filter not in request_logger.filters:
|
||||||
request_logger.addFilter(self.service.password_filter)
|
request_logger.addFilter(self.service.password_filter)
|
||||||
|
|
||||||
request_logger.debug("%s %s %s", method, url, kwargs.get("data", ""))
|
request_logger.debug(
|
||||||
|
"%s %s %s" % (
|
||||||
|
method,
|
||||||
|
url,
|
||||||
|
kwargs.get("data", "")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
has_retried = kwargs.get("retried")
|
has_retried = kwargs.get("retried")
|
||||||
kwargs.pop("retried", None)
|
kwargs.pop("retried", None)
|
||||||
|
@ -72,14 +86,46 @@ class PyiCloudSession(Session):
|
||||||
content_type = response.headers.get("Content-Type", "").split(";")[0]
|
content_type = response.headers.get("Content-Type", "").split(";")[0]
|
||||||
json_mimetypes = ["application/json", "text/json"]
|
json_mimetypes = ["application/json", "text/json"]
|
||||||
|
|
||||||
if not response.ok and content_type not in json_mimetypes:
|
for header in HEADER_DATA:
|
||||||
if has_retried is None and response.status_code == 450:
|
if response.headers.get(header):
|
||||||
|
session_arg = HEADER_DATA[header]
|
||||||
|
self.service.session_data.update(
|
||||||
|
{session_arg: response.headers.get(header)}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Save session_data to file
|
||||||
|
with open(self.service.session_path, "w") as outfile:
|
||||||
|
json.dump(self.service.session_data, outfile)
|
||||||
|
LOGGER.debug("Saved session data to file")
|
||||||
|
|
||||||
|
# Save cookies to file
|
||||||
|
self.cookies.save(ignore_discard=True, ignore_expires=True)
|
||||||
|
LOGGER.debug("Cookies saved to %s", self.service.cookiejar_path)
|
||||||
|
|
||||||
|
if not response.ok and (content_type not in json_mimetypes
|
||||||
|
or response.status_code in [421, 450, 500]):
|
||||||
|
try:
|
||||||
|
fmip_url = self.service._get_webservice_url("findme")
|
||||||
|
if has_retried is None and response.status_code == 450 and fmip_url in url:
|
||||||
|
# Handle re-authentication for Find My iPhone
|
||||||
|
LOGGER.debug("Re-authenticating Find My iPhone service")
|
||||||
|
try:
|
||||||
|
self.service.authenticate(True, "find")
|
||||||
|
except PyiCloudAPIResponseException:
|
||||||
|
LOGGER.debug("Re-authentication failed")
|
||||||
|
kwargs["retried"] = True
|
||||||
|
return self.request(method, url, **kwargs)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if has_retried is None and response.status_code in [421, 450, 500]:
|
||||||
api_error = PyiCloudAPIResponseException(
|
api_error = PyiCloudAPIResponseException(
|
||||||
response.reason, response.status_code, retry=True
|
response.reason, response.status_code, retry=True
|
||||||
)
|
)
|
||||||
request_logger.debug(api_error)
|
request_logger.debug(api_error)
|
||||||
kwargs["retried"] = True
|
kwargs["retried"] = True
|
||||||
return self.request(method, url, **kwargs)
|
return self.request(method, url, **kwargs)
|
||||||
|
|
||||||
self._raise_error(response.status_code, response.reason)
|
self._raise_error(response.status_code, response.reason)
|
||||||
|
|
||||||
if content_type not in json_mimetypes:
|
if content_type not in json_mimetypes:
|
||||||
|
@ -131,6 +177,8 @@ class PyiCloudSession(Session):
|
||||||
reason + ". Please wait a few minutes then try again."
|
reason + ". Please wait a few minutes then try again."
|
||||||
"The remote servers might be trying to throttle requests."
|
"The remote servers might be trying to throttle requests."
|
||||||
)
|
)
|
||||||
|
if code in [421, 450, 500]:
|
||||||
|
reason = "Authentication required for Account."
|
||||||
|
|
||||||
api_error = PyiCloudAPIResponseException(reason, code)
|
api_error = PyiCloudAPIResponseException(reason, code)
|
||||||
LOGGER.error(api_error)
|
LOGGER.error(api_error)
|
||||||
|
@ -148,6 +196,7 @@ class PyiCloudService(object):
|
||||||
pyicloud.iphone.location()
|
pyicloud.iphone.location()
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
AUTH_ENDPOINT = "https://idmsa.apple.com/appleauth/auth"
|
||||||
HOME_ENDPOINT = "https://www.icloud.com"
|
HOME_ENDPOINT = "https://www.icloud.com"
|
||||||
SETUP_ENDPOINT = "https://setup.icloud.com/setup/ws/1"
|
SETUP_ENDPOINT = "https://setup.icloud.com/setup/ws/1"
|
||||||
|
|
||||||
|
@ -156,6 +205,7 @@ class PyiCloudService(object):
|
||||||
apple_id,
|
apple_id,
|
||||||
password=None,
|
password=None,
|
||||||
cookie_directory=None,
|
cookie_directory=None,
|
||||||
|
session_directory=None,
|
||||||
verify=True,
|
verify=True,
|
||||||
client_id=None,
|
client_id=None,
|
||||||
with_family=True,
|
with_family=True,
|
||||||
|
@ -163,50 +213,61 @@ class PyiCloudService(object):
|
||||||
if password is None:
|
if password is None:
|
||||||
password = get_password_from_keyring(apple_id)
|
password = get_password_from_keyring(apple_id)
|
||||||
|
|
||||||
|
self.user = {"accountName": apple_id, "password": password}
|
||||||
self.data = {}
|
self.data = {}
|
||||||
self.client_id = client_id or str(uuid1()).upper()
|
self.params = {}
|
||||||
|
self.client_id = client_id or ("auth-%s" % str(uuid1()).lower())
|
||||||
self.with_family = with_family
|
self.with_family = with_family
|
||||||
self.user = {"apple_id": apple_id, "password": password}
|
|
||||||
|
self.session_data = {}
|
||||||
|
if session_directory:
|
||||||
|
self._session_directory = session_directory
|
||||||
|
else:
|
||||||
|
self._session_directory = path.join(gettempdir(), "pyicloud-session")
|
||||||
|
LOGGER.debug("Using session file %s" % self.session_path)
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(self.session_path) as session_f:
|
||||||
|
self.session_data = json.load(session_f)
|
||||||
|
except: # pylint: disable=bare-except
|
||||||
|
LOGGER.info("Session file does not exist")
|
||||||
|
|
||||||
|
if not path.exists(self._session_directory):
|
||||||
|
mkdir(self._session_directory)
|
||||||
|
|
||||||
self.password_filter = PyiCloudPasswordFilter(password)
|
self.password_filter = PyiCloudPasswordFilter(password)
|
||||||
LOGGER.addFilter(self.password_filter)
|
LOGGER.addFilter(self.password_filter)
|
||||||
|
|
||||||
self._base_login_url = "%s/login" % self.SETUP_ENDPOINT
|
|
||||||
|
|
||||||
if cookie_directory:
|
if cookie_directory:
|
||||||
self._cookie_directory = path.expanduser(path.normpath(cookie_directory))
|
self._cookie_directory = path.expanduser(path.normpath(cookie_directory))
|
||||||
else:
|
else:
|
||||||
self._cookie_directory = path.join(gettempdir(), "pyicloud")
|
self._cookie_directory = path.join(gettempdir(), "pyicloud")
|
||||||
|
|
||||||
|
if not path.exists(self._cookie_directory):
|
||||||
|
mkdir(self._cookie_directory)
|
||||||
|
|
||||||
|
if self.session_data.get("client_id"):
|
||||||
|
self.client_id = self.session_data.get("client_id")
|
||||||
|
else:
|
||||||
|
self.session_data.update({"client_id": self.client_id})
|
||||||
|
|
||||||
self.session = PyiCloudSession(self)
|
self.session = PyiCloudSession(self)
|
||||||
self.session.verify = verify
|
self.session.verify = verify
|
||||||
self.session.headers.update(
|
self.session.headers.update(
|
||||||
{
|
{"Origin": self.HOME_ENDPOINT, "Referer": "%s/" % self.HOME_ENDPOINT}
|
||||||
"Origin": self.HOME_ENDPOINT,
|
|
||||||
"Referer": "%s/" % self.HOME_ENDPOINT,
|
|
||||||
"User-Agent": "Opera/9.52 (X11; Linux i686; U; en)",
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
|
|
||||||
cookiejar_path = self._get_cookiejar_path()
|
cookiejar_path = self.cookiejar_path
|
||||||
self.session.cookies = cookielib.LWPCookieJar(filename=cookiejar_path)
|
self.session.cookies = cookielib.LWPCookieJar(filename=cookiejar_path)
|
||||||
if path.exists(cookiejar_path):
|
if path.exists(cookiejar_path):
|
||||||
try:
|
try:
|
||||||
self.session.cookies.load()
|
self.session.cookies.load(ignore_discard=True, ignore_expires=True)
|
||||||
LOGGER.debug("Read cookies from %s", cookiejar_path)
|
LOGGER.debug("Read cookies from %s" % cookiejar_path)
|
||||||
except: # pylint: disable=bare-except
|
except: # pylint: disable=bare-except
|
||||||
# Most likely a pickled cookiejar from earlier versions.
|
# Most likely a pickled cookiejar from earlier versions.
|
||||||
# The cookiejar will get replaced with a valid one after
|
# The cookiejar will get replaced with a valid one after
|
||||||
# successful authentication.
|
# successful authentication.
|
||||||
LOGGER.warning("Failed to read cookiejar %s", cookiejar_path)
|
LOGGER.warning("Failed to read cookiejar %s" % cookiejar_path)
|
||||||
|
|
||||||
self.params = {
|
|
||||||
"clientBuildNumber": "17DHotfix5",
|
|
||||||
"clientMasteringNumber": "17DHotfix5",
|
|
||||||
"ckjsBuildVersion": "17DProjectDev77",
|
|
||||||
"ckjsVersion": "2.0.5",
|
|
||||||
"clientId": self.client_id,
|
|
||||||
}
|
|
||||||
|
|
||||||
self.authenticate()
|
self.authenticate()
|
||||||
|
|
||||||
|
@ -214,54 +275,164 @@ class PyiCloudService(object):
|
||||||
self._files = None
|
self._files = None
|
||||||
self._photos = None
|
self._photos = None
|
||||||
|
|
||||||
def authenticate(self):
|
def authenticate(self, force_refresh=False, service=None):
|
||||||
"""
|
"""
|
||||||
Handles authentication, and persists the X-APPLE-WEB-KB cookie so that
|
Handles authentication, and persists cookies so that
|
||||||
subsequent logins will not cause additional e-mails from Apple.
|
subsequent logins will not cause additional e-mails from Apple.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
LOGGER.info("Authenticating as %s", self.user["apple_id"])
|
login_successful = False
|
||||||
|
if self.session_data.get("session_token") and not force_refresh:
|
||||||
|
LOGGER.debug("Checking session token validity")
|
||||||
|
try:
|
||||||
|
self.data = self._validate_token()
|
||||||
|
login_successful = True
|
||||||
|
except PyiCloudAPIResponseException:
|
||||||
|
LOGGER.debug("Invalid authentication token, will log in from scratch.")
|
||||||
|
|
||||||
|
if not login_successful and service != None:
|
||||||
|
app = self.data["apps"][service]
|
||||||
|
if "canLaunchWithOneFactor" in app and app["canLaunchWithOneFactor"] == True:
|
||||||
|
LOGGER.debug("Authenticating as %s for %s" % (self.user["accountName"], service))
|
||||||
|
try:
|
||||||
|
self._authenticate_with_credentials_service(service)
|
||||||
|
login_successful = True
|
||||||
|
except:
|
||||||
|
LOGGER.debug("Could not log into service. Attempting brand new login.")
|
||||||
|
|
||||||
|
if not login_successful:
|
||||||
|
LOGGER.debug("Authenticating as %s" % self.user["accountName"])
|
||||||
|
|
||||||
data = dict(self.user)
|
data = dict(self.user)
|
||||||
|
|
||||||
# We authenticate every time, so "remember me" is not needed
|
data["rememberMe"] = True
|
||||||
data.update({"extended_login": False})
|
data["trustTokens"] = []
|
||||||
|
if self.session_data.get("trust_token"):
|
||||||
|
data["trustTokens"] = [self.session_data.get("trust_token")]
|
||||||
|
|
||||||
|
headers = self._get_auth_headers()
|
||||||
|
|
||||||
|
if self.session_data.get("scnt"):
|
||||||
|
headers["scnt"] = self.session_data.get("scnt")
|
||||||
|
|
||||||
|
if self.session_data.get("session_id"):
|
||||||
|
headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
req = self.session.post(
|
req = self.session.post(
|
||||||
self._base_login_url, params=self.params, data=json.dumps(data)
|
"%s/signin" % self.AUTH_ENDPOINT,
|
||||||
|
params={"isRememberMeEnabled": "true"},
|
||||||
|
data=json.dumps(data),
|
||||||
|
headers=headers,
|
||||||
)
|
)
|
||||||
except PyiCloudAPIResponseException as error:
|
except PyiCloudAPIResponseException as error:
|
||||||
msg = "Invalid email/password combination."
|
msg = "Invalid email/password combination."
|
||||||
raise PyiCloudFailedLoginException(msg, error)
|
raise PyiCloudFailedLoginException(msg, error)
|
||||||
|
|
||||||
self.data = req.json()
|
self._authenticate_with_token()
|
||||||
self.params.update({"dsid": self.data["dsInfo"]["dsid"]})
|
|
||||||
self._webservices = self.data["webservices"]
|
self._webservices = self.data["webservices"]
|
||||||
|
|
||||||
if not path.exists(self._cookie_directory):
|
LOGGER.debug("Authentication completed successfully")
|
||||||
mkdir(self._cookie_directory)
|
|
||||||
self.session.cookies.save()
|
|
||||||
LOGGER.debug("Cookies saved to %s", self._get_cookiejar_path())
|
|
||||||
|
|
||||||
LOGGER.info("Authentication completed successfully")
|
def _authenticate_with_token(self):
|
||||||
LOGGER.debug(self.params)
|
"""Authenticate using session token."""
|
||||||
|
data = {
|
||||||
|
"accountCountryCode": self.session_data.get("account_country"),
|
||||||
|
"dsWebAuthToken": self.session_data.get("session_token"),
|
||||||
|
"extended_login": True,
|
||||||
|
"trustToken": self.session_data.get("trust_token", ""),
|
||||||
|
}
|
||||||
|
|
||||||
def _get_cookiejar_path(self):
|
try:
|
||||||
|
req = self.session.post(
|
||||||
|
"%s/accountLogin" % self.SETUP_ENDPOINT, data=json.dumps(data)
|
||||||
|
)
|
||||||
|
self.data = req.json()
|
||||||
|
except PyiCloudAPIResponseException as error:
|
||||||
|
msg = "Invalid authentication token."
|
||||||
|
raise PyiCloudFailedLoginException(msg, error)
|
||||||
|
|
||||||
|
def _authenticate_with_credentials_service(self, service):
|
||||||
|
"""Authenticate to a specific service using credentials."""
|
||||||
|
data = {
|
||||||
|
"appName": service,
|
||||||
|
"apple_id": self.user["accountName"],
|
||||||
|
"password": self.user["password"]
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.session.post(
|
||||||
|
"%s/accountLogin" % self.SETUP_ENDPOINT, data=json.dumps(data)
|
||||||
|
)
|
||||||
|
|
||||||
|
self.data = self._validate_token()
|
||||||
|
except PyiCloudAPIResponseException as error:
|
||||||
|
msg = "Invalid email/password combination."
|
||||||
|
raise PyiCloudFailedLoginException(msg, error)
|
||||||
|
|
||||||
|
def _validate_token(self):
|
||||||
|
"""Checks if the current access token is still valid."""
|
||||||
|
LOGGER.debug("Checking session token validity")
|
||||||
|
try:
|
||||||
|
req = self.session.post("%s/validate" % self.SETUP_ENDPOINT, data="null")
|
||||||
|
LOGGER.debug("Session token is still valid")
|
||||||
|
return req.json()
|
||||||
|
except PyiCloudAPIResponseException as err:
|
||||||
|
LOGGER.debug("Invalid authentication token")
|
||||||
|
raise err
|
||||||
|
|
||||||
|
def _get_auth_headers(self, overrides=None):
|
||||||
|
headers = {
|
||||||
|
"Accept": "*/*",
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
"X-Apple-OAuth-Client-Id": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d",
|
||||||
|
"X-Apple-OAuth-Client-Type": "firstPartyAuth",
|
||||||
|
"X-Apple-OAuth-Redirect-URI": "https://www.icloud.com",
|
||||||
|
"X-Apple-OAuth-Require-Grant-Code": "true",
|
||||||
|
"X-Apple-OAuth-Response-Mode": "web_message",
|
||||||
|
"X-Apple-OAuth-Response-Type": "code",
|
||||||
|
"X-Apple-OAuth-State": self.client_id,
|
||||||
|
"X-Apple-Widget-Key": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d",
|
||||||
|
}
|
||||||
|
if overrides:
|
||||||
|
headers.update(overrides)
|
||||||
|
return headers
|
||||||
|
|
||||||
|
@property
|
||||||
|
def cookiejar_path(self):
|
||||||
"""Get path for cookiejar file."""
|
"""Get path for cookiejar file."""
|
||||||
return path.join(
|
return path.join(
|
||||||
self._cookie_directory,
|
self._cookie_directory,
|
||||||
"".join([c for c in self.user.get("apple_id") if match(r"\w", c)]),
|
"".join([c for c in self.user.get("accountName") if match(r"\w", c)]),
|
||||||
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def session_path(self):
|
||||||
|
"""Get path for session data file."""
|
||||||
|
return path.join(
|
||||||
|
self._session_directory,
|
||||||
|
"".join([c for c in self.user.get("accountName") if match(r"\w", c)]),
|
||||||
)
|
)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def requires_2sa(self):
|
def requires_2sa(self):
|
||||||
"""Returns True if two-step authentication is required."""
|
"""Returns True if two-step authentication is required."""
|
||||||
return (
|
return self.data.get("dsInfo", {}).get("hsaVersion", 0) >= 1 and (
|
||||||
self.data.get("hsaChallengeRequired", False)
|
self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session
|
||||||
and self.data["dsInfo"].get("hsaVersion", 0) >= 1
|
|
||||||
)
|
)
|
||||||
# FIXME: Implement 2FA for hsaVersion == 2 # pylint: disable=fixme
|
|
||||||
|
@property
|
||||||
|
def requires_2fa(self):
|
||||||
|
"""Returns True if two-factor authentication is required."""
|
||||||
|
return self.data["dsInfo"].get("hsaVersion", 0) == 2 and (
|
||||||
|
self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session
|
||||||
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_trusted_session(self):
|
||||||
|
"""Returns True if the session is trusted."""
|
||||||
|
return self.data.get("hsaTrustedBrowser", False)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def trusted_devices(self):
|
def trusted_devices(self):
|
||||||
|
@ -298,12 +469,61 @@ class PyiCloudService(object):
|
||||||
return False
|
return False
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# Re-authenticate, which will both update the HSA data, and
|
self.trust_session()
|
||||||
# ensure that we save the X-APPLE-WEBAUTH-HSA-TRUST cookie.
|
|
||||||
self.authenticate()
|
|
||||||
|
|
||||||
return not self.requires_2sa
|
return not self.requires_2sa
|
||||||
|
|
||||||
|
def validate_2fa_code(self, code):
|
||||||
|
"""Verifies a verification code received via Apple's 2FA system (HSA2)."""
|
||||||
|
data = {"securityCode": {"code": code}}
|
||||||
|
|
||||||
|
headers = self._get_auth_headers({"Accept": "application/json"})
|
||||||
|
|
||||||
|
if self.session_data.get("scnt"):
|
||||||
|
headers["scnt"] = self.session_data.get("scnt")
|
||||||
|
|
||||||
|
if self.session_data.get("session_id"):
|
||||||
|
headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id")
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.session.post(
|
||||||
|
"%s/verify/trusteddevice/securitycode" % self.AUTH_ENDPOINT,
|
||||||
|
data=json.dumps(data),
|
||||||
|
headers=headers,
|
||||||
|
)
|
||||||
|
except PyiCloudAPIResponseException as error:
|
||||||
|
if error.code == -21669:
|
||||||
|
# Wrong verification code
|
||||||
|
LOGGER.error("Code verification failed.")
|
||||||
|
return False
|
||||||
|
raise
|
||||||
|
|
||||||
|
LOGGER.debug("Code verification successful.")
|
||||||
|
|
||||||
|
self.trust_session()
|
||||||
|
return not self.requires_2sa
|
||||||
|
|
||||||
|
def trust_session(self):
|
||||||
|
"""Request session trust to avoid user log in going forward."""
|
||||||
|
headers = self._get_auth_headers()
|
||||||
|
|
||||||
|
if self.session_data.get("scnt"):
|
||||||
|
headers["scnt"] = self.session_data.get("scnt")
|
||||||
|
|
||||||
|
if self.session_data.get("session_id"):
|
||||||
|
headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id")
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.session.get(
|
||||||
|
"%s/2sv/trust" % self.AUTH_ENDPOINT,
|
||||||
|
headers=headers,
|
||||||
|
)
|
||||||
|
self._authenticate_with_token()
|
||||||
|
return True
|
||||||
|
except PyiCloudAPIResponseException:
|
||||||
|
LOGGER.error("Session trust failed.")
|
||||||
|
return False
|
||||||
|
|
||||||
def _get_webservice_url(self, ws_key):
|
def _get_webservice_url(self, ws_key):
|
||||||
"""Get webservice URL, raise an exception if not exists."""
|
"""Get webservice URL, raise an exception if not exists."""
|
||||||
if self._webservices.get(ws_key) is None:
|
if self._webservices.get(ws_key) is None:
|
||||||
|
@ -378,7 +598,7 @@ class PyiCloudService(object):
|
||||||
return self._drive
|
return self._drive
|
||||||
|
|
||||||
def __unicode__(self):
|
def __unicode__(self):
|
||||||
return "iCloud API: %s" % self.user.get("apple_id")
|
return "iCloud API: %s" % self.user.get("accountName")
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
as_unicode = self.__unicode__()
|
as_unicode = self.__unicode__()
|
||||||
|
|
|
@ -201,7 +201,22 @@ def main(args=None):
|
||||||
):
|
):
|
||||||
utils.store_password_in_keyring(username, password)
|
utils.store_password_in_keyring(username, password)
|
||||||
|
|
||||||
if api.requires_2sa:
|
if api.requires_2fa:
|
||||||
|
# fmt: off
|
||||||
|
print(
|
||||||
|
"\nTwo-step authentication required.",
|
||||||
|
"\nPlease enter validation code"
|
||||||
|
)
|
||||||
|
# fmt: on
|
||||||
|
|
||||||
|
code = input("(string) --> ")
|
||||||
|
if not api.validate_2fa_code(code):
|
||||||
|
print("Failed to verify verification code")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
print("")
|
||||||
|
|
||||||
|
elif api.requires_2sa:
|
||||||
# fmt: off
|
# fmt: off
|
||||||
print(
|
print(
|
||||||
"\nTwo-step authentication required.",
|
"\nTwo-step authentication required.",
|
||||||
|
|
|
@ -9,13 +9,19 @@ from pyicloud.services.findmyiphone import FindMyiPhoneServiceManager, AppleDevi
|
||||||
|
|
||||||
from .const import (
|
from .const import (
|
||||||
AUTHENTICATED_USER,
|
AUTHENTICATED_USER,
|
||||||
REQUIRES_2SA_USER,
|
REQUIRES_2FA_USER,
|
||||||
|
REQUIRES_2FA_TOKEN,
|
||||||
|
VALID_TOKEN,
|
||||||
VALID_USERS,
|
VALID_USERS,
|
||||||
VALID_PASSWORD,
|
VALID_PASSWORD,
|
||||||
|
VALID_COOKIE,
|
||||||
|
VALID_2FA_CODE,
|
||||||
|
VALID_TOKENS,
|
||||||
)
|
)
|
||||||
from .const_login import (
|
from .const_login import (
|
||||||
|
AUTH_OK,
|
||||||
LOGIN_WORKING,
|
LOGIN_WORKING,
|
||||||
LOGIN_2SA,
|
LOGIN_2FA,
|
||||||
TRUSTED_DEVICES,
|
TRUSTED_DEVICES,
|
||||||
TRUSTED_DEVICE_1,
|
TRUSTED_DEVICE_1,
|
||||||
VERIFICATION_CODE_OK,
|
VERIFICATION_CODE_OK,
|
||||||
|
@ -41,6 +47,7 @@ class ResponseMock(Response):
|
||||||
self.result = result
|
self.result = result
|
||||||
self.status_code = status_code
|
self.status_code = status_code
|
||||||
self.raw = kwargs.get("raw")
|
self.raw = kwargs.get("raw")
|
||||||
|
self.headers = kwargs.get("headers", {})
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def text(self):
|
def text(self):
|
||||||
|
@ -52,21 +59,16 @@ class PyiCloudSessionMock(base.PyiCloudSession):
|
||||||
|
|
||||||
def request(self, method, url, **kwargs):
|
def request(self, method, url, **kwargs):
|
||||||
params = kwargs.get("params")
|
params = kwargs.get("params")
|
||||||
|
headers = kwargs.get("headers")
|
||||||
data = json.loads(kwargs.get("data", "{}"))
|
data = json.loads(kwargs.get("data", "{}"))
|
||||||
|
|
||||||
# Login
|
# Login
|
||||||
if self.service.SETUP_ENDPOINT in url:
|
if self.service.SETUP_ENDPOINT in url:
|
||||||
if "login" in url and method == "POST":
|
if "accountLogin" in url and method == "POST":
|
||||||
if (
|
if data.get("dsWebAuthToken") not in VALID_TOKENS:
|
||||||
data.get("apple_id") not in VALID_USERS
|
|
||||||
or data.get("password") != VALID_PASSWORD
|
|
||||||
):
|
|
||||||
self._raise_error(None, "Unknown reason")
|
self._raise_error(None, "Unknown reason")
|
||||||
if (
|
if data.get("dsWebAuthToken") == REQUIRES_2FA_TOKEN:
|
||||||
data.get("apple_id") == REQUIRES_2SA_USER
|
return ResponseMock(LOGIN_2FA)
|
||||||
and data.get("password") == VALID_PASSWORD
|
|
||||||
):
|
|
||||||
return ResponseMock(LOGIN_2SA)
|
|
||||||
return ResponseMock(LOGIN_WORKING)
|
return ResponseMock(LOGIN_WORKING)
|
||||||
|
|
||||||
if "listDevices" in url and method == "GET":
|
if "listDevices" in url and method == "GET":
|
||||||
|
@ -84,6 +86,35 @@ class PyiCloudSessionMock(base.PyiCloudSession):
|
||||||
return ResponseMock(VERIFICATION_CODE_OK)
|
return ResponseMock(VERIFICATION_CODE_OK)
|
||||||
self._raise_error(None, "FOUND_CODE")
|
self._raise_error(None, "FOUND_CODE")
|
||||||
|
|
||||||
|
if "validate" in url and method == "POST":
|
||||||
|
if headers.get("X-APPLE-WEBAUTH-TOKEN") == VALID_COOKIE:
|
||||||
|
return ResponseMock(LOGIN_WORKING)
|
||||||
|
self._raise_error(None, "Session expired")
|
||||||
|
|
||||||
|
if self.service.AUTH_ENDPOINT in url:
|
||||||
|
if "signin" in url and method == "POST":
|
||||||
|
if (
|
||||||
|
data.get("accountName") not in VALID_USERS
|
||||||
|
or data.get("password") != VALID_PASSWORD
|
||||||
|
):
|
||||||
|
self._raise_error(None, "Unknown reason")
|
||||||
|
if data.get("accountName") == REQUIRES_2FA_USER:
|
||||||
|
self.service.session_data["session_token"] = REQUIRES_2FA_TOKEN
|
||||||
|
return ResponseMock(AUTH_OK)
|
||||||
|
|
||||||
|
self.service.session_data["session_token"] = VALID_TOKEN
|
||||||
|
return ResponseMock(AUTH_OK)
|
||||||
|
|
||||||
|
if "securitycode" in url and method == "POST":
|
||||||
|
if data.get("securityCode", {}).get("code") != VALID_2FA_CODE:
|
||||||
|
self._raise_error(None, "Incorrect code")
|
||||||
|
|
||||||
|
self.service.session_data["session_token"] = VALID_TOKEN
|
||||||
|
return ResponseMock("", status_code=204)
|
||||||
|
|
||||||
|
if "trust" in url and method == "GET":
|
||||||
|
return ResponseMock("", status_code=204)
|
||||||
|
|
||||||
# Account
|
# Account
|
||||||
if "device/getDevices" in url and method == "GET":
|
if "device/getDevices" in url and method == "GET":
|
||||||
return ResponseMock(ACCOUNT_DEVICES_WORKING)
|
return ResponseMock(ACCOUNT_DEVICES_WORKING)
|
||||||
|
|
|
@ -4,8 +4,13 @@ from .const_account_family import PRIMARY_EMAIL, APPLE_ID_EMAIL, ICLOUD_ID_EMAIL
|
||||||
|
|
||||||
# Base
|
# Base
|
||||||
AUTHENTICATED_USER = PRIMARY_EMAIL
|
AUTHENTICATED_USER = PRIMARY_EMAIL
|
||||||
REQUIRES_2SA_USER = "requires_2sa_user"
|
REQUIRES_2FA_TOKEN = "requires_2fa_token"
|
||||||
VALID_USERS = [AUTHENTICATED_USER, REQUIRES_2SA_USER, APPLE_ID_EMAIL, ICLOUD_ID_EMAIL]
|
REQUIRES_2FA_USER = "requires_2fa_user"
|
||||||
|
VALID_USERS = [AUTHENTICATED_USER, REQUIRES_2FA_USER, APPLE_ID_EMAIL, ICLOUD_ID_EMAIL]
|
||||||
VALID_PASSWORD = "valid_password"
|
VALID_PASSWORD = "valid_password"
|
||||||
|
VALID_COOKIE = "valid_cookie"
|
||||||
|
VALID_TOKEN = "valid_token"
|
||||||
|
VALID_2FA_CODE = "000000"
|
||||||
|
VALID_TOKENS = [VALID_TOKEN, REQUIRES_2FA_TOKEN]
|
||||||
|
|
||||||
CLIENT_ID = "client_id"
|
CLIENT_ID = "client_id"
|
||||||
|
|
|
@ -16,6 +16,10 @@ A_DS_ID = "123456-12-12345678-1234-1234-1234-123456789012" + PERSON_ID
|
||||||
WIDGET_KEY = "widget_key" + PERSON_ID
|
WIDGET_KEY = "widget_key" + PERSON_ID
|
||||||
|
|
||||||
# Data
|
# Data
|
||||||
|
AUTH_OK = {
|
||||||
|
"authType": "hsa2"
|
||||||
|
}
|
||||||
|
|
||||||
LOGIN_WORKING = {
|
LOGIN_WORKING = {
|
||||||
"dsInfo": {
|
"dsInfo": {
|
||||||
"lastName": LAST_NAME,
|
"lastName": LAST_NAME,
|
||||||
|
@ -184,7 +188,7 @@ LOGIN_WORKING = {
|
||||||
"settings",
|
"settings",
|
||||||
],
|
],
|
||||||
"version": 2,
|
"version": 2,
|
||||||
"isExtendedLogin": False,
|
"isExtendedLogin": True,
|
||||||
"pcsServiceIdentitiesIncluded": True,
|
"pcsServiceIdentitiesIncluded": True,
|
||||||
"hsaChallengeRequired": False,
|
"hsaChallengeRequired": False,
|
||||||
"requestInfo": {"country": "FR", "timeZone": "GMT+1", "region": "IDF"},
|
"requestInfo": {"country": "FR", "timeZone": "GMT+1", "region": "IDF"},
|
||||||
|
@ -209,7 +213,7 @@ LOGIN_WORKING = {
|
||||||
}
|
}
|
||||||
|
|
||||||
# Setup data
|
# Setup data
|
||||||
LOGIN_2SA = {
|
LOGIN_2FA = {
|
||||||
"dsInfo": {
|
"dsInfo": {
|
||||||
"lastName": LAST_NAME,
|
"lastName": LAST_NAME,
|
||||||
"iCDPEnabled": False,
|
"iCDPEnabled": False,
|
||||||
|
@ -377,7 +381,7 @@ LOGIN_2SA = {
|
||||||
"settings",
|
"settings",
|
||||||
],
|
],
|
||||||
"version": 2,
|
"version": 2,
|
||||||
"isExtendedLogin": False,
|
"isExtendedLogin": True,
|
||||||
"pcsServiceIdentitiesIncluded": False,
|
"pcsServiceIdentitiesIncluded": False,
|
||||||
"hsaChallengeRequired": True,
|
"hsaChallengeRequired": True,
|
||||||
"requestInfo": {"country": "FR", "timeZone": "GMT+1", "region": "IDF"},
|
"requestInfo": {"country": "FR", "timeZone": "GMT+1", "region": "IDF"},
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
"""Cmdline tests."""
|
"""Cmdline tests."""
|
||||||
from pyicloud import cmdline
|
from pyicloud import cmdline
|
||||||
from . import PyiCloudServiceMock
|
from . import PyiCloudServiceMock
|
||||||
from .const import AUTHENTICATED_USER, REQUIRES_2SA_USER, VALID_PASSWORD
|
from .const import AUTHENTICATED_USER, REQUIRES_2FA_USER, VALID_PASSWORD, VALID_2FA_CODE
|
||||||
from .const_findmyiphone import FMI_FAMILY_WORKING
|
from .const_findmyiphone import FMI_FAMILY_WORKING
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
@ -74,16 +74,16 @@ class TestCmdline(TestCase):
|
||||||
|
|
||||||
@patch("keyring.get_password", return_value=None)
|
@patch("keyring.get_password", return_value=None)
|
||||||
@patch("pyicloud.cmdline.input")
|
@patch("pyicloud.cmdline.input")
|
||||||
def test_username_password_requires_2sa(
|
def test_username_password_requires_2fa(
|
||||||
self, mock_input, mock_get_password
|
self, mock_input, mock_get_password
|
||||||
): # pylint: disable=unused-argument
|
): # pylint: disable=unused-argument
|
||||||
"""Test username and password commands."""
|
"""Test username and password commands."""
|
||||||
# Valid connection for the first time
|
# Valid connection for the first time
|
||||||
mock_input.return_value = "0"
|
mock_input.return_value = VALID_2FA_CODE
|
||||||
with pytest.raises(SystemExit, match="0"):
|
with pytest.raises(SystemExit, match="0"):
|
||||||
# fmt: off
|
# fmt: off
|
||||||
self.main([
|
self.main([
|
||||||
'--username', REQUIRES_2SA_USER,
|
'--username', REQUIRES_2FA_USER,
|
||||||
'--password', VALID_PASSWORD,
|
'--password', VALID_PASSWORD,
|
||||||
'--non-interactive',
|
'--non-interactive',
|
||||||
])
|
])
|
||||||
|
|
Loading…
Reference in a new issue