'''
Created on Mar 11, 2017
@author: Wyko
'''
import cryptography, hashlib
from cryptography.fernet import Fernet
import keyring, os, ast
from .. import config
from ..wylog import logging, log
def _get_fernet_key(app_name= 'netcrawl',
username= 'netcrawl'):
proc= 'manage._get_fernet_key'
# Retrieve the encryption key from storage or generate one
key= keyring.get_password(app_name, username)
if key is None:
log('Creating encryption key', v= logging.N, proc= proc)
key = Fernet.generate_key()
keyring.set_password(app_name, username, str(key, encoding='utf-8'))
else:
key= bytes(key, encoding='utf-8')
# Create a Fernet key from the base key
return Fernet(key)
del(key)
def _write_vault_data(data):
'''Overwrites all stored data with the new data'''
proc= 'manage.write_vault_data'
f= _get_fernet_key()
with open(config.cc.vault_path, 'w+b') as outfile:
outfile.write(f.encrypt(bytes(str(data), encoding='utf-8')))
def _get_vault_data():
proc= 'manage.get_vault_data'
# Create the vault if needed
if not os.path.isfile(config.cc.vault_path):
log('Creating Vault in [{}]'.format(
config.cc.vault_path),
proc=proc, v=logging.I)
with open(config.cc.vault_path, 'w+b'): pass
with open(config.cc.vault_path, 'r+b') as outfile:
raw_vault= outfile.read()
# Check for an empty dictionary
if len(raw_vault) <= 1:
log('Vault empty in[{}]'.format(
config.cc.vault_path),
proc=proc, v=logging.I)
return _validate_vault(None)
f= _get_fernet_key()
try: output= f.decrypt(raw_vault)
except cryptography.fernet.InvalidToken:
log('Vault data invalid.',
proc=proc, v=logging.A)
return _validate_vault(None)
else:
# Translate the decrypted text into a dict
# then validate it and return it
return _validate_vault(
ast.literal_eval(
str(output, encoding='utf-8')))
finally:
del(f)
[docs]def get_device_creds():
_vault= _get_vault_data()
if not _vault or ('device_creds' not in _vault):
return None
return _vault['device_creds']
[docs]def get_database_cred():
_vault= _get_vault_data()
if not _vault or ('database' not in _vault):
# Return the default credentials for Travis CI. They're good as any.
return {'username': 'postgres',
'password': ''}
return _vault['database']
[docs]def delete_device_cred(_cred=None, index= None):
_vault= _get_vault_data()
if not _vault or ('device_creds' not in _vault):
return False
if index is not None:
print('Removed ' + _vault['device_creds'][index]['username'])
del _vault['device_creds'][index]
elif _cred is not None:
for x in _vault['device_creds']:
if ((_cred['username'].lower() == x['username'].lower()) and
(_cred['password'] == x['password'])):
_vault['device_creds'].remove(x)
print('Removed ' + _cred['username'])
_write_device_creds(_vault['device_creds'])
[docs]def list_creds():
'''Lists all credentials in secure form'''
output= _get_vault_data()
if output is None: return ''
# List database
if any([output['database']['password'] is None,
output['database']['username'] is None]):
f_output= 'No Database credentials stored.\n'
else:
output['database']['password']= hashlib.md5(output['database']['password'].encode()).hexdigest()[:8]
f_output= '>> Database Username: {}\n Hashed Password: {}\n\n'.format(
output['database']['username'], output['database']['password'])
# List device creds
if len(output['device_creds']) == 0:
f_output+=('No device credentials stored.\n')
else:
for i, c in enumerate(output['device_creds']):
# Hash the password and trim to 8 characters
c['password']= hashlib.md5(c['password'].encode()).hexdigest()[:8]
f_output+= '{}) Username: {}\n Hashed Password: {}\n Type: {}\n'.format(
i, c['username'], c['password'], c['cred_type'])
return f_output
def _write_device_creds(_creds):
_vault= _get_vault_data()
_vault['device_creds']= _creds
_write_vault_data(_vault)
[docs]def add_device_cred(_cred):
# Error checking
assert isinstance(_cred, dict)
assert all (k in _cred for k in ('username',
'password',
'cred_type',))
_vault= _get_vault_data()
_vault['device_creds'].append(_cred)
_write_vault_data(_vault)
[docs]def write_database_cred(_cred):
_vault= _get_vault_data()
_vault['database']= _cred
_write_vault_data(_vault)
def _validate_vault(_vault):
'''Superficially validates the vault data by making
sure it conforms to the expected data types.'''
proc= 'credentials.manage._validate_vault'
if not isinstance(_vault, dict):
log("Whole vault was malformed",
proc=proc, v=logging.I)
return {
'device_creds': [],
'database': {'username': None,
'password': None},
}
if (('device_creds' not in _vault) or
(not isinstance(_vault['device_creds'], list))):
log("'device_creds' was malformed",
proc=proc, v=logging.I)
_vault['device_creds']= []
if (('database' not in _vault) or
(not isinstance(_vault['database'], dict))):
log("'database' was malformed",
proc=proc, v=logging.I)
_vault['database']= {'username': None,
'password': None}
return _vault