Added tests

This commit is contained in:
Niccolò Zapponi 2020-10-29 16:51:08 +00:00
parent b3aee79dcb
commit 4adbfb32ec
No known key found for this signature in database
GPG key ID: 328B304DC670A51E
6 changed files with 104 additions and 54 deletions

View file

@ -101,15 +101,13 @@ class PyiCloudSession(Session):
) )
# Save session_data to file # Save session_data to file
with open(self.service._get_sessiondata_path(), "w") as outfile: with open(self.service.session_path, "w") as outfile:
json.dump(self.service.session_data, outfile) json.dump(self.service.session_data, outfile)
LOGGER.debug("Saved session data to file") LOGGER.debug("Saved session data to file")
# Save cookies to file # Save cookies to file
if not path.exists(self.service._cookie_directory):
mkdir(self.service._cookie_directory)
self.cookies.save(ignore_discard=True, ignore_expires=True) self.cookies.save(ignore_discard=True, ignore_expires=True)
LOGGER.debug("Cookies saved to %s", self.service._get_cookiejar_path()) LOGGER.debug("Cookies saved to %s", self.service.cookiejar_path)
if not response.ok and content_type not in json_mimetypes: if not response.ok and content_type not in json_mimetypes:
if has_retried is None and response.status_code == 450: if has_retried is None and response.status_code == 450:
@ -214,16 +212,14 @@ class PyiCloudService(object):
if session_directory: if session_directory:
self._session_directory = session_directory self._session_directory = session_directory
else: else:
self._session_directory = path.join( self._session_directory = path.join(gettempdir(), "pyicloud-session")
gettempdir(), "pyicloud-session" LOGGER.debug("Using session file %s", self.session_path)
)
LOGGER.debug(f"Using session file {self._get_sessiondata_path()}")
try: try:
with open(self._get_sessiondata_path()) as session_f: with open(self.session_path) as session_f:
self.session_data = json.load(session_f) self.session_data = json.load(session_f)
except: except: # pylint: disable=bare-except
LOGGER.warning("Session file does not exist") LOGGER.info("Session file does not exist")
if not path.exists(self._session_directory): if not path.exists(self._session_directory):
mkdir(self._session_directory) mkdir(self._session_directory)
@ -236,6 +232,9 @@ class PyiCloudService(object):
else: else:
self._cookie_directory = path.join(gettempdir(), "pyicloud") self._cookie_directory = path.join(gettempdir(), "pyicloud")
if not path.exists(self._cookie_directory):
mkdir(self._cookie_directory)
if self.session_data.get("client_id"): if self.session_data.get("client_id"):
self.client_id = self.session_data.get("client_id") self.client_id = self.session_data.get("client_id")
@ -245,7 +244,7 @@ class PyiCloudService(object):
{"Origin": self.HOME_ENDPOINT, "Referer": f"{self.HOME_ENDPOINT}/"} {"Origin": self.HOME_ENDPOINT, "Referer": f"{self.HOME_ENDPOINT}/"}
) )
cookiejar_path = self._get_cookiejar_path() cookiejar_path = self.cookiejar_path
self.session.cookies = cookielib.LWPCookieJar(filename=cookiejar_path) self.session.cookies = cookielib.LWPCookieJar(filename=cookiejar_path)
if path.exists(cookiejar_path): if path.exists(cookiejar_path):
try: try:
@ -277,7 +276,7 @@ class PyiCloudService(object):
LOGGER.info("Session token is still valid") LOGGER.info("Session token is still valid")
self.data = req.json() self.data = req.json()
login_successful = True login_successful = True
except: except PyiCloudAPIResponseException:
msg = "Invalid authentication token, will log in from scratch." msg = "Invalid authentication token, will log in from scratch."
if not login_successful: if not login_successful:
@ -345,14 +344,16 @@ class PyiCloudService(object):
self.data = req.json() self.data = req.json()
def _get_cookiejar_path(self): @property
def cookiejar_path(self):
"""Get path for cookiejar file.""" """Get path for cookiejar file."""
return path.join( return path.join(
self._cookie_directory, self._cookie_directory,
"".join([c for c in self.user.get("accountName") if match(r"\w", c)]), "".join([c for c in self.user.get("accountName") if match(r"\w", c)]),
) )
def _get_sessiondata_path(self): @property
def session_path(self):
"""Get path for session data file.""" """Get path for session data file."""
return path.join( return path.join(
self._session_directory, self._session_directory,
@ -362,23 +363,15 @@ class PyiCloudService(object):
@property @property
def requires_2sa(self): def requires_2sa(self):
"""Returns True if two-step authentication is required.""" """Returns True if two-step authentication is required."""
return ( return self.data.get("dsInfo", {}).get("hsaVersion", 0) >= 1 and (
self.data["dsInfo"].get("hsaVersion", 0) >= 1 self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session
and (
self.data.get("hsaChallengeRequired", False)
or not self.is_trusted_session
)
) )
@property @property
def requires_2fa(self): def requires_2fa(self):
"""Returns True if two-factor authentication is required.""" """Returns True if two-factor authentication is required."""
return ( return self.data["dsInfo"].get("hsaVersion", 0) == 2 and (
self.data["dsInfo"].get("hsaVersion", 0) == 2 self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session
and (
self.data.get("hsaChallengeRequired", False)
or not self.is_trusted_session
)
) )
@property @property
@ -451,19 +444,21 @@ class PyiCloudService(object):
headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id") headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id")
try: try:
req = self.session.post( self.session.post(
f"{self.AUTH_ENDPOINT}/verify/trusteddevice/securitycode", f"{self.AUTH_ENDPOINT}/verify/trusteddevice/securitycode",
data=json.dumps(data), data=json.dumps(data),
headers=headers, headers=headers,
) )
except PyiCloudAPIResponseException as error: except PyiCloudAPIResponseException as error:
if error.code == -21669:
# Wrong verification code
LOGGER.error("Code verification failed.") LOGGER.error("Code verification failed.")
return False return False
raise
LOGGER.debug("Code verification successful.") LOGGER.debug("Code verification successful.")
self.trust_session() self.trust_session()
return not self.requires_2sa return not self.requires_2sa
def trust_session(self): def trust_session(self):
@ -487,13 +482,13 @@ class PyiCloudService(object):
headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id") headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id")
try: try:
req = self.session.get( self.session.get(
f"{self.AUTH_ENDPOINT}/2sv/trust", f"{self.AUTH_ENDPOINT}/2sv/trust",
headers=headers, headers=headers,
) )
self._authenticate_with_token() self._authenticate_with_token()
return True return True
except PyiCloudAPIResponseException as error: except PyiCloudAPIResponseException:
LOGGER.error("Session trust failed.") LOGGER.error("Session trust failed.")
return False return False

View file

@ -201,7 +201,22 @@ def main(args=None):
): ):
utils.store_password_in_keyring(username, password) utils.store_password_in_keyring(username, password)
if api.requires_2sa: if api.requires_2fa:
# fmt: off
print(
"\nTwo-step authentication required.",
"\nPlease enter validation code"
)
# fmt: on
code = input("(string) --> ")
if not api.validate_2fa_code(code):
print("Failed to verify verification code")
sys.exit(1)
print("")
elif api.requires_2sa:
# fmt: off # fmt: off
print( print(
"\nTwo-step authentication required.", "\nTwo-step authentication required.",

View file

@ -9,13 +9,19 @@ from pyicloud.services.findmyiphone import FindMyiPhoneServiceManager, AppleDevi
from .const import ( from .const import (
AUTHENTICATED_USER, AUTHENTICATED_USER,
REQUIRES_2SA_USER, REQUIRES_2FA_USER,
REQUIRES_2FA_TOKEN,
VALID_TOKEN,
VALID_USERS, VALID_USERS,
VALID_PASSWORD, VALID_PASSWORD,
VALID_COOKIE,
VALID_2FA_CODE,
VALID_TOKENS,
) )
from .const_login import ( from .const_login import (
AUTH_OK,
LOGIN_WORKING, LOGIN_WORKING,
LOGIN_2SA, LOGIN_2FA,
TRUSTED_DEVICES, TRUSTED_DEVICES,
TRUSTED_DEVICE_1, TRUSTED_DEVICE_1,
VERIFICATION_CODE_OK, VERIFICATION_CODE_OK,
@ -41,6 +47,7 @@ class ResponseMock(Response):
self.result = result self.result = result
self.status_code = status_code self.status_code = status_code
self.raw = kwargs.get("raw") self.raw = kwargs.get("raw")
self.headers = kwargs.get("headers", {})
@property @property
def text(self): def text(self):
@ -52,21 +59,16 @@ class PyiCloudSessionMock(base.PyiCloudSession):
def request(self, method, url, **kwargs): def request(self, method, url, **kwargs):
params = kwargs.get("params") params = kwargs.get("params")
headers = kwargs.get("headers")
data = json.loads(kwargs.get("data", "{}")) data = json.loads(kwargs.get("data", "{}"))
# Login # Login
if self.service.SETUP_ENDPOINT in url: if self.service.SETUP_ENDPOINT in url:
if "login" in url and method == "POST": if "accountLogin" in url and method == "POST":
if ( if data.get("dsWebAuthToken") not in VALID_TOKENS:
data.get("apple_id") not in VALID_USERS
or data.get("password") != VALID_PASSWORD
):
self._raise_error(None, "Unknown reason") self._raise_error(None, "Unknown reason")
if ( if data.get("dsWebAuthToken") == REQUIRES_2FA_TOKEN:
data.get("apple_id") == REQUIRES_2SA_USER return ResponseMock(LOGIN_2FA)
and data.get("password") == VALID_PASSWORD
):
return ResponseMock(LOGIN_2SA)
return ResponseMock(LOGIN_WORKING) return ResponseMock(LOGIN_WORKING)
if "listDevices" in url and method == "GET": if "listDevices" in url and method == "GET":
@ -84,6 +86,35 @@ class PyiCloudSessionMock(base.PyiCloudSession):
return ResponseMock(VERIFICATION_CODE_OK) return ResponseMock(VERIFICATION_CODE_OK)
self._raise_error(None, "FOUND_CODE") self._raise_error(None, "FOUND_CODE")
if "validate" in url and method == "POST":
if headers.get("X-APPLE-WEBAUTH-TOKEN") == VALID_COOKIE:
return ResponseMock(LOGIN_WORKING)
self._raise_error(None, "Session expired")
if self.service.AUTH_ENDPOINT in url:
if "signin" in url and method == "POST":
if (
data.get("accountName") not in VALID_USERS
or data.get("password") != VALID_PASSWORD
):
self._raise_error(None, "Unknown reason")
if data.get("accountName") == REQUIRES_2FA_USER:
self.service.session_data["session_token"] = REQUIRES_2FA_TOKEN
return ResponseMock(AUTH_OK)
self.service.session_data["session_token"] = VALID_TOKEN
return ResponseMock(AUTH_OK)
if "securitycode" in url and method == "POST":
if data.get("securityCode", {}).get("code") != VALID_2FA_CODE:
self._raise_error(None, "Incorrect code")
self.service.session_data["session_token"] = VALID_TOKEN
return ResponseMock("", status_code=204)
if "trust" in url and method == "GET":
return ResponseMock("", status_code=204)
# Account # Account
if "device/getDevices" in url and method == "GET": if "device/getDevices" in url and method == "GET":
return ResponseMock(ACCOUNT_DEVICES_WORKING) return ResponseMock(ACCOUNT_DEVICES_WORKING)

View file

@ -4,8 +4,13 @@ from .const_account_family import PRIMARY_EMAIL, APPLE_ID_EMAIL, ICLOUD_ID_EMAIL
# Base # Base
AUTHENTICATED_USER = PRIMARY_EMAIL AUTHENTICATED_USER = PRIMARY_EMAIL
REQUIRES_2SA_USER = "requires_2sa_user" REQUIRES_2FA_TOKEN = "requires_2fa_token"
VALID_USERS = [AUTHENTICATED_USER, REQUIRES_2SA_USER, APPLE_ID_EMAIL, ICLOUD_ID_EMAIL] REQUIRES_2FA_USER = "requires_2fa_user"
VALID_USERS = [AUTHENTICATED_USER, REQUIRES_2FA_USER, APPLE_ID_EMAIL, ICLOUD_ID_EMAIL]
VALID_PASSWORD = "valid_password" VALID_PASSWORD = "valid_password"
VALID_COOKIE = "valid_cookie"
VALID_TOKEN = "valid_token"
VALID_2FA_CODE = "000000"
VALID_TOKENS = [VALID_TOKEN, REQUIRES_2FA_TOKEN]
CLIENT_ID = "client_id" CLIENT_ID = "client_id"

View file

@ -16,6 +16,10 @@ A_DS_ID = "123456-12-12345678-1234-1234-1234-123456789012" + PERSON_ID
WIDGET_KEY = "widget_key" + PERSON_ID WIDGET_KEY = "widget_key" + PERSON_ID
# Data # Data
AUTH_OK = {
"authType": "hsa2"
}
LOGIN_WORKING = { LOGIN_WORKING = {
"dsInfo": { "dsInfo": {
"lastName": LAST_NAME, "lastName": LAST_NAME,
@ -209,7 +213,7 @@ LOGIN_WORKING = {
} }
# Setup data # Setup data
LOGIN_2SA = { LOGIN_2FA = {
"dsInfo": { "dsInfo": {
"lastName": LAST_NAME, "lastName": LAST_NAME,
"iCDPEnabled": False, "iCDPEnabled": False,

View file

@ -2,7 +2,7 @@
"""Cmdline tests.""" """Cmdline tests."""
from pyicloud import cmdline from pyicloud import cmdline
from . import PyiCloudServiceMock from . import PyiCloudServiceMock
from .const import AUTHENTICATED_USER, REQUIRES_2SA_USER, VALID_PASSWORD from .const import AUTHENTICATED_USER, REQUIRES_2FA_USER, VALID_PASSWORD, VALID_2FA_CODE
from .const_findmyiphone import FMI_FAMILY_WORKING from .const_findmyiphone import FMI_FAMILY_WORKING
import os import os
@ -74,16 +74,16 @@ class TestCmdline(TestCase):
@patch("keyring.get_password", return_value=None) @patch("keyring.get_password", return_value=None)
@patch("pyicloud.cmdline.input") @patch("pyicloud.cmdline.input")
def test_username_password_requires_2sa( def test_username_password_requires_2fa(
self, mock_input, mock_get_password self, mock_input, mock_get_password
): # pylint: disable=unused-argument ): # pylint: disable=unused-argument
"""Test username and password commands.""" """Test username and password commands."""
# Valid connection for the first time # Valid connection for the first time
mock_input.return_value = "0" mock_input.return_value = VALID_2FA_CODE
with pytest.raises(SystemExit, match="0"): with pytest.raises(SystemExit, match="0"):
# fmt: off # fmt: off
self.main([ self.main([
'--username', REQUIRES_2SA_USER, '--username', REQUIRES_2FA_USER,
'--password', VALID_PASSWORD, '--password', VALID_PASSWORD,
'--non-interactive', '--non-interactive',
]) ])