Commit 53df44a6 authored by Friedrich Weber's avatar Friedrich Weber

Store refill tokens per token serial

parent 006bb9dc
......@@ -102,9 +102,9 @@ class Authenticator(object):
c = conn.cursor()
refilltoken = None
# get all possible serial/tokens for a user
for row in c.execute("SELECT config_name, config_value FROM config where config_name=?",
("refilltoken", )):
refilltoken = row[1]
for row in c.execute("SELECT refilltoken FROM refilltokens WHERE serial=?",
(serial, )):
refilltoken = row[0]
syslog.syslog("Doing refill with token {0!s}".format(refilltoken))
if refilltoken:
......@@ -453,11 +453,11 @@ def save_auth_item(sqlfile, user, serial, tokentype, authitem):
refilltoken = offline.get("refilltoken")
# delete old refilltoken
try:
c.execute('DELETE FROM config where config_name="refilltoken"')
c.execute('DELETE FROM refilltokens WHERE serial=?', (serial,))
except:
pass
c.execute("INSERT INTO config (config_name, config_value) VALUES (?,?)",
("refilltoken", refilltoken))
c.execute("INSERT INTO refilltokens (serial, refilltoken) VALUES (?,?)",
(serial, refilltoken))
# Save (commit) the changes
conn.commit()
......@@ -480,8 +480,8 @@ def _create_table(c):
pass
try:
# create config table
c.execute("CREATE TABLE config (config_name text, config_value text)")
# create refilltokens table
c.execute("CREATE TABLE refilltokens (serial text, refilltoken text)")
except:
pass
......@@ -10,6 +10,8 @@ from privacyidea_pam import (pam_sm_authenticate,
save_auth_item,
check_offline_otp)
REFILL_1 = "a" * 80
REFILL_2 = "b" * 80
SQLFILE = "pam-test.sqlite"
# test100000
......@@ -23,6 +25,16 @@ RESP = {1: '$pbkdf2-sha512$19000$Scl5TwmhtPae856zFgJgLA$ZQAqtqmGTf6IY0t9jg2MCg'
'lwTOEKXMzJ5BTblZsu3bV4KAP1rEW6nUPfqLf6/f2yoNhpX1mCS3dt77EBKtJM.A'
}
# test100003
# test100004
REFILL_RESP = {
4: '$pbkdf2-sha512$25000$SSlF6L2XUurdG.N8LyVkTA$hDscUl2n5H84YjlE0Z8I94Y'
'R0NiCcCrI2weuFPR7XID6mxSzbZOTwMAeYCMPKPritj/VwZAenosNWGhByi16Ng',
5: '$pbkdf2-sha512$25000$NWYMAeDcuzfGOGds7Z1zLg$wOYEQApbmRMVjmEv1hLqi.n'
'4ZeSG0AsSIEIR7TqVuwL64XM0yePEqOn/ur7mOWzuo5ak.vZgwQeHwYM71Cjlfw',
}
# TEST100000
# TEST100001
# TEST100002
......@@ -47,13 +59,27 @@ SUCCESS_BODY = {"detail": {"message": "matching 1 tokens",
"result": {"status": True,
"value": True
},
"auth_items": {"offline": [{"username": "corny",
"auth_items": {"offline": [{"refilltoken": REFILL_1,
"username": "corny",
"response": RESP}
]
},
"version": "privacyIDEA unknown"
}
REFILL_BODY = { "id": 1,
"jsonrpc": "2.0",
"result": {"status": True,
"value": True
},
"auth_items": {"offline": [{"refilltoken": REFILL_2,
"username": "corny",
"response": REFILL_RESP}
]
},
"version": "privacyIDEA unknown"
}
FAIL_BODY = {"detail": {"message": "wrong otp value"},
"id": 1,
"jsonrpc": "2.0",
......@@ -95,8 +121,9 @@ class PAMTestCase(unittest.TestCase):
def test_01_check_offline_otp(self):
# Check with no entries in the database
r = check_offline_otp("cornelius", "test123456", SQLFILE)
r, matching_serial = check_offline_otp("cornelius", "test123456", SQLFILE)
self.assertFalse(r)
self.assertIsNone(matching_serial)
# Save some values to the database
r = save_auth_item(SQLFILE,
......@@ -107,11 +134,13 @@ class PAMTestCase(unittest.TestCase):
"response": RESP}
]
})
r = check_offline_otp("cornelius", "test100000", SQLFILE)
r, matching_serial = check_offline_otp("cornelius", "test100000", SQLFILE)
self.assertTrue(r)
self.assertEqual(matching_serial, "TOK001")
# Authenticating with the same value a second time, fails
r = check_offline_otp("cornelius", "test100000", SQLFILE)
r, matching_serial = check_offline_otp("cornelius", "test100000", SQLFILE)
self.assertFalse(r)
self.assertIsNone(matching_serial)
@responses.activate
def test_02_authenticate_offline(self):
......@@ -209,3 +238,71 @@ class PAMTestCase(unittest.TestCase):
"try_first_pass"]
r = pam_sm_authenticate(pamh, flags, argv)
self.assertEqual(r, PAMH.PAM_SUCCESS)
def test_06_refill(self):
with responses.RequestsMock() as rsps:
# Get offline OTPs + refill token
rsps.add(responses.POST,
"http://my.privacyidea.server/validate/check",
body=json.dumps(SUCCESS_BODY),
content_type="application/json")
pamh = PAMH("cornelius", "test100000")
flags = None
argv = ["url=http://my.privacyidea.server",
"sqlfile=%s" % SQLFILE,
"try_first_pass"]
r = pam_sm_authenticate(pamh, flags, argv)
self.assertEqual(r, PAMH.PAM_SUCCESS)
# OTP value not known yet, online auth does not work
pamh = PAMH("cornelius", "test100004")
flags = None
argv = ["url=http://my.privacyidea.server",
"sqlfile=%s" % SQLFILE,
"try_first_pass"]
r = pam_sm_authenticate(pamh, flags, argv)
self.assertNotEqual(r, PAMH.PAM_SUCCESS)
# now with refill
with responses.RequestsMock() as rsps:
rsps.add(responses.POST,
"http://my.privacyidea.server/validate/offlinerefill",
body=json.dumps(REFILL_BODY),
content_type="application/json")
pamh = PAMH("cornelius", "test100001")
flags = None
argv = ["url=http://my.privacyidea.server",
"sqlfile=%s" % SQLFILE,
"try_first_pass"]
r = pam_sm_authenticate(pamh, flags, argv)
self.assertEqual(r, PAMH.PAM_SUCCESS)
self.assertIn('refilltoken=aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
rsps.calls[0].request.body)
# authenticate with refilled
with responses.RequestsMock() as rsps:
pamh = PAMH("cornelius", "test100004")
flags = None
argv = ["url=http://my.privacyidea.server",
"sqlfile=%s" % SQLFILE,
"try_first_pass"]
r = pam_sm_authenticate(pamh, flags, argv)
self.assertEqual(r, PAMH.PAM_SUCCESS)
# using new refill token
self.assertIn('refilltoken=bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb',
rsps.calls[0].request.body)
# ... but not twice
pamh = PAMH("cornelius", "test100004")
flags = None
argv = ["url=http://my.privacyidea.server",
"sqlfile=%s" % SQLFILE,
"try_first_pass"]
r = pam_sm_authenticate(pamh, flags, argv)
self.assertNotEqual(r, PAMH.PAM_SUCCESS)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment