diff --git a/README.rst b/README.rst index 0d3b709..8dd69c0 100644 --- a/README.rst +++ b/README.rst @@ -36,6 +36,17 @@ Anyway, this took us very little time to write, but we figure we'd save you the trouble of writing it yourself, because maybe you are busy and have other things to do. Enjoy. +Security +-------- + +Passwords are encrypted using `Fernet`_ symmetric encryption, from the `cryptography`_ library. +A random unique key is generated for each password, and is never stored; +it is rather sent as part of the password link. +This means that even if someone has access to the Redis store, the passwords are still safe. + +.. _Fernet: https://cryptography.io/en/latest/fernet/ +.. _cryptography: https://cryptography.io/en/latest/ + Requirements ------------ diff --git a/requirements.txt b/requirements.txt index 2de30ad..eb11347 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ MarkupSafe==0.18 Werkzeug==0.9.4 itsdangerous==0.23 redis==2.8.0 +cryptography==1.8.1 diff --git a/snappass/main.py b/snappass/main.py index 77ed7bf..15de24e 100644 --- a/snappass/main.py +++ b/snappass/main.py @@ -4,9 +4,10 @@ import sys import uuid import redis -from redis.exceptions import ConnectionError +from cryptography.fernet import Fernet from flask import abort, Flask, render_template, request +from redis.exceptions import ConnectionError SNEAKY_USER_AGENTS = ('Slackbot', 'facebookexternalhit', 'Twitterbot', @@ -14,6 +15,7 @@ SNEAKY_USER_AGENTS = ('Slackbot', 'facebookexternalhit', 'Twitterbot', 'Iframely') SNEAKY_USER_AGENTS_RE = re.compile('|'.join(SNEAKY_USER_AGENTS)) NO_SSL = os.environ.get('NO_SSL', False) +TOKEN_SEPARATOR = '~' app = Flask(__name__) @@ -50,20 +52,72 @@ def check_redis_alive(fn): return inner +def encrypt(password): + """ + Take a password string, encrypt it with Fernet symmetric encryption, + and return the result (bytes), with the decryption key (bytes) + """ + encryption_key = Fernet.generate_key() + fernet = Fernet(encryption_key) + encrypted_password = fernet.encrypt(password.encode('utf-8')) + return encrypted_password, encryption_key + + +def decrypt(password, decryption_key): + """ + Decrypt a password (bytes) using the provided key (bytes), + and return the plain-text password (bytes). + """ + fernet = Fernet(decryption_key) + return fernet.decrypt(password) + + +def parse_token(token): + token_fragments = token.split(TOKEN_SEPARATOR, 1) # Split once, not more. + storage_key = token_fragments[0] + + try: + decryption_key = token_fragments[1].encode('utf-8') + except IndexError: + decryption_key = None + + return storage_key, decryption_key + + @check_redis_alive def set_password(password, ttl): - key = uuid.uuid4().hex - redis_client.setex(key, ttl, password) - return key + """ + Encrypt and store the password for the specified lifetime. + + Returns a token comprised of the key where the encrypted password + is stored, and the decryption key. + """ + storage_key = uuid.uuid4().hex + encrypted_password, encryption_key = encrypt(password) + redis_client.setex(storage_key, ttl, encrypted_password) + encryption_key = encryption_key.decode('utf-8') + token = TOKEN_SEPARATOR.join([storage_key, encryption_key]) + return token @check_redis_alive -def get_password(key): - password = redis_client.get(key) +def get_password(token): + """ + From a given token, return the initial password. + + If the token is tilde-separated, we decrypt the password fetched from Redis. + If not, the password is simply returned as is. + """ + storage_key, decryption_key = parse_token(token) + password = redis_client.get(storage_key) + redis_client.delete(storage_key) + if password is not None: - password = password.decode('utf-8') - redis_client.delete(key) - return password + + if decryption_key is not None: + password = decrypt(password, decryption_key) + + return password.decode('utf-8') def empty(value): @@ -105,13 +159,13 @@ def index(): @app.route('/', methods=['POST']) def handle_password(): ttl, password = clean_input() - key = set_password(password, ttl) + token = set_password(password, ttl) if NO_SSL: base_url = request.url_root else: base_url = request.url_root.replace("http://", "https://") - link = base_url + key + link = base_url + token return render_template('confirm.html', password_link=link) diff --git a/tests.py b/tests.py index f25cc5b..c94332c 100644 --- a/tests.py +++ b/tests.py @@ -1,7 +1,9 @@ import time import unittest +import uuid from unittest import TestCase +from cryptography.fernet import Fernet from werkzeug.exceptions import BadRequest # noinspection PyPep8Naming @@ -12,11 +14,6 @@ __author__ = 'davedash' class SnapPassTestCase(TestCase): - def test_set_password(self): - """Ensure we return a 32-bit key.""" - key = snappass.set_password("foo", 30) - self.assertEqual(32, len(key)) - def test_get_password(self): password = "melatonin overdose 1337!$" key = snappass.set_password(password, 30) @@ -24,6 +21,42 @@ class SnapPassTestCase(TestCase): # Assert that we can't look this up a second time. self.assertEqual(None, snappass.get_password(key)) + def test_password_is_not_stored_in_plaintext(self): + password = "trustno1" + token = snappass.set_password(password, 30) + redis_key = token.split(snappass.TOKEN_SEPARATOR)[0] + stored_password_text = snappass.redis_client.get(redis_key).decode('utf-8') + self.assertFalse(password in stored_password_text) + + def test_returned_token_format(self): + password = "trustsome1" + token = snappass.set_password(password, 30) + token_fragments = token.split(snappass.TOKEN_SEPARATOR) + self.assertEqual(2, len(token_fragments)) + redis_key, encryption_key = token_fragments + self.assertEqual(32, len(redis_key)) + try: + Fernet(encryption_key.encode('utf-8')) + except ValueError: + self.fail('the encryption key is not valid') + + def test_encryption_key_is_returned(self): + password = "trustany1" + token = snappass.set_password(password, 30) + token_fragments = token.split(snappass.TOKEN_SEPARATOR) + redis_key, encryption_key = token_fragments + stored_password = snappass.redis_client.get(redis_key) + fernet = Fernet(encryption_key.encode('utf-8')) + decrypted_password = fernet.decrypt(stored_password).decode('utf-8') + self.assertEqual(password, decrypted_password) + + def test_unencrypted_passwords_still_work(self): + unencrypted_password = "trustevery1" + storage_key = uuid.uuid4().hex + snappass.redis_client.setex(storage_key, 30, unencrypted_password) + retrieved_password = snappass.get_password(storage_key) + self.assertEqual(unencrypted_password, retrieved_password) + def test_password_is_decoded(self): password = "correct horse battery staple" key = snappass.set_password(password, 30) @@ -92,7 +125,7 @@ class SnapPassRoutesTestCase(TestCase): for ua in a_few_sneaky_bots: rv = self.app.get('/{0}'.format(key), headers={ 'User-Agent': ua }) - self.assertEquals(rv.status_code, 404) + self.assertEqual(rv.status_code, 404) if __name__ == '__main__':