Unverified Commit 6f3365c9 authored by Cornelius Kölbel's avatar Cornelius Kölbel Committed by GitHub

Merge pull request #7 from privacyidea/offline-refill

Store refill tokens per serial
parents 006bb9dc da636a4e
......@@ -102,9 +102,9 @@ class Authenticator(object):
c = conn.cursor()
refilltoken = None
# get all possible serial/tokens for a user
for row in c.execute("SELECT config_name, config_value FROM config where config_name=?",
("refilltoken", )):
refilltoken = row[1]
for row in c.execute("SELECT refilltoken FROM refilltokens WHERE serial=?",
(serial, )):
refilltoken = row[0]
syslog.syslog("Doing refill with token {0!s}".format(refilltoken))
if refilltoken:
......@@ -127,10 +127,12 @@ class Authenticator(object):
if result.get("value"):
save_auth_item(self.sqlfile, self.user, serial, tokentype,
auth_item)
return True
else:
syslog.syslog(syslog.LOG_ERR,
"%s: %s" % (__name__,
result.get("error").get("message")))
return False
def authenticate(self, password):
rval = self.pamh.PAM_SYSTEM_ERR
......@@ -453,11 +455,11 @@ def save_auth_item(sqlfile, user, serial, tokentype, authitem):
refilltoken = offline.get("refilltoken")
# delete old refilltoken
try:
c.execute('DELETE FROM config where config_name="refilltoken"')
except:
c.execute('DELETE FROM refilltokens WHERE serial=?', (serial,))
except sqlite3.OperationalError:
pass
c.execute("INSERT INTO config (config_name, config_value) VALUES (?,?)",
("refilltoken", refilltoken))
c.execute("INSERT INTO refilltokens (serial, refilltoken) VALUES (?,?)",
(serial, refilltoken))
# Save (commit) the changes
conn.commit()
......@@ -476,12 +478,12 @@ def _create_table(c):
c.execute("CREATE TABLE authitems "
"(counter int, user text, serial text, tokenowner text,"
"otp text, tokentype text)")
except:
except sqlite3.OperationalError:
pass
try:
# create config table
c.execute("CREATE TABLE config (config_name text, config_value text)")
except:
# create refilltokens table
c.execute("CREATE TABLE refilltokens (serial text, refilltoken text)")
except sqlite3.OperationalError:
pass
......@@ -10,6 +10,8 @@ from privacyidea_pam import (pam_sm_authenticate,
save_auth_item,
check_offline_otp)
REFILL_1 = "a" * 80
REFILL_2 = "b" * 80
SQLFILE = "pam-test.sqlite"
# test100000
......@@ -23,6 +25,16 @@ RESP = {1: '$pbkdf2-sha512$19000$Scl5TwmhtPae856zFgJgLA$ZQAqtqmGTf6IY0t9jg2MCg'
'lwTOEKXMzJ5BTblZsu3bV4KAP1rEW6nUPfqLf6/f2yoNhpX1mCS3dt77EBKtJM.A'
}
# test100003
# test100004
REFILL_RESP = {
4: '$pbkdf2-sha512$25000$SSlF6L2XUurdG.N8LyVkTA$hDscUl2n5H84YjlE0Z8I94Y'
'R0NiCcCrI2weuFPR7XID6mxSzbZOTwMAeYCMPKPritj/VwZAenosNWGhByi16Ng',
5: '$pbkdf2-sha512$25000$NWYMAeDcuzfGOGds7Z1zLg$wOYEQApbmRMVjmEv1hLqi.n'
'4ZeSG0AsSIEIR7TqVuwL64XM0yePEqOn/ur7mOWzuo5ak.vZgwQeHwYM71Cjlfw',
}
# TEST100000
# TEST100001
# TEST100002
......@@ -47,13 +59,27 @@ SUCCESS_BODY = {"detail": {"message": "matching 1 tokens",
"result": {"status": True,
"value": True
},
"auth_items": {"offline": [{"username": "corny",
"auth_items": {"offline": [{"refilltoken": REFILL_1,
"username": "corny",
"response": RESP}
]
},
"version": "privacyIDEA unknown"
}
REFILL_BODY = { "id": 1,
"jsonrpc": "2.0",
"result": {"status": True,
"value": True
},
"auth_items": {"offline": [{"refilltoken": REFILL_2,
"username": "corny",
"response": REFILL_RESP}
]
},
"version": "privacyIDEA unknown"
}
FAIL_BODY = {"detail": {"message": "wrong otp value"},
"id": 1,
"jsonrpc": "2.0",
......@@ -95,8 +121,9 @@ class PAMTestCase(unittest.TestCase):
def test_01_check_offline_otp(self):
# Check with no entries in the database
r = check_offline_otp("cornelius", "test123456", SQLFILE)
r, matching_serial = check_offline_otp("cornelius", "test123456", SQLFILE)
self.assertFalse(r)
self.assertIsNone(matching_serial)
# Save some values to the database
r = save_auth_item(SQLFILE,
......@@ -107,11 +134,13 @@ class PAMTestCase(unittest.TestCase):
"response": RESP}
]
})
r = check_offline_otp("cornelius", "test100000", SQLFILE)
r, matching_serial = check_offline_otp("cornelius", "test100000", SQLFILE)
self.assertTrue(r)
self.assertEqual(matching_serial, "TOK001")
# Authenticating with the same value a second time, fails
r = check_offline_otp("cornelius", "test100000", SQLFILE)
r, matching_serial = check_offline_otp("cornelius", "test100000", SQLFILE)
self.assertFalse(r)
self.assertIsNone(matching_serial)
@responses.activate
def test_02_authenticate_offline(self):
......@@ -209,3 +238,71 @@ class PAMTestCase(unittest.TestCase):
"try_first_pass"]
r = pam_sm_authenticate(pamh, flags, argv)
self.assertEqual(r, PAMH.PAM_SUCCESS)
def test_06_refill(self):
with responses.RequestsMock() as rsps:
# Get offline OTPs + refill token
rsps.add(responses.POST,
"http://my.privacyidea.server/validate/check",
body=json.dumps(SUCCESS_BODY),
content_type="application/json")
pamh = PAMH("cornelius", "test100000")
flags = None
argv = ["url=http://my.privacyidea.server",
"sqlfile=%s" % SQLFILE,
"try_first_pass"]
r = pam_sm_authenticate(pamh, flags, argv)
self.assertEqual(r, PAMH.PAM_SUCCESS)
# OTP value not known yet, online auth does not work
pamh = PAMH("cornelius", "test100004")
flags = None
argv = ["url=http://my.privacyidea.server",
"sqlfile=%s" % SQLFILE,
"try_first_pass"]
r = pam_sm_authenticate(pamh, flags, argv)
self.assertNotEqual(r, PAMH.PAM_SUCCESS)
# now with refill
with responses.RequestsMock() as rsps:
rsps.add(responses.POST,
"http://my.privacyidea.server/validate/offlinerefill",
body=json.dumps(REFILL_BODY),
content_type="application/json")
pamh = PAMH("cornelius", "test100001")
flags = None
argv = ["url=http://my.privacyidea.server",
"sqlfile=%s" % SQLFILE,
"try_first_pass"]
r = pam_sm_authenticate(pamh, flags, argv)
self.assertEqual(r, PAMH.PAM_SUCCESS)
self.assertIn('refilltoken=aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
rsps.calls[0].request.body)
# authenticate with refilled
with responses.RequestsMock() as rsps:
pamh = PAMH("cornelius", "test100004")
flags = None
argv = ["url=http://my.privacyidea.server",
"sqlfile=%s" % SQLFILE,
"try_first_pass"]
r = pam_sm_authenticate(pamh, flags, argv)
self.assertEqual(r, PAMH.PAM_SUCCESS)
# using new refill token
self.assertIn('refilltoken=bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb',
rsps.calls[0].request.body)
# ... but not twice
pamh = PAMH("cornelius", "test100004")
flags = None
argv = ["url=http://my.privacyidea.server",
"sqlfile=%s" % SQLFILE,
"try_first_pass"]
r = pam_sm_authenticate(pamh, flags, argv)
self.assertNotEqual(r, PAMH.PAM_SUCCESS)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment