Minor changes

parent 66b6f10f
......@@ -29,20 +29,12 @@ enabled = yes
[date_time]
enabled = yes
[pam_authentication]
enabled = yes
[package_updates]
enabled = yes
[network]
enabled = yes
[postfix]
enabled = yes
[ssl]
enabled = yes
[maild]
enabled = yes
[ssl]
certificate_dir = /etc/opt/drweb.com/certs
# -*- coding: utf-8 -*-
import socket
import os
from yaml import dump, safe_load
from settingsd import const
from settingsd import config
from settingsd import service
from settingsd import shared
from settingsd import logger
import settingsd.tools as tools
from settingsd.tools.process import execProcess
import settingsd.tools.editors
from os import path
from jinja2 import Template
from jinja2.exceptions import TemplateSyntaxError
##### Private constants #####
SERVICE_NAME = "maild"
MAILD_METHODS_NAMESPACE = "maild"
MAILD_DRWEB_INI = "/etc/opt/drweb.com/drweb.ini"
MAILD_MILTER_HOOK = "/etc/opt/drweb.com/milterhook.lua"
##### Private classes #####
class MailD(service.FunctionObject) :
### DBus methods ###
@service.functionMethod(MAILD_METHODS_NAMESPACE, in_signature="sss", out_signature="s")
def regenerateDrwebIni(self, config_filename, drweb_ini_templates_dir, lua_templates_dir):
with open(config_filename, 'r') as maild_config:
config = safe_load(maild_config.read())
try:
lists_ctx = {
'links_blacklist': config['links_blacklist'],
'links_whitelist': config['links_whitelist']
}
self.renderTemplate(
config['template'], drweb_ini_templates_dir, config['settings'], MAILD_DRWEB_INI,
context=lists_ctx
)
self.renderTemplate(
config['template'], lua_templates_dir, config['settings'], MAILD_MILTER_HOOK,
context=lists_ctx
)
return ''
except TemplateSyntaxError as exc:
return str(exc)
def renderTemplate(self, template, templates_dir, settings, output_filename, context={}):
template_filename = path.join(templates_dir, template + '.tpl')
with open(template_filename, 'r') as template_text:
template = Template(template_text.read())
content = template.render(settings=settings, **context)
with open(output_filename, 'w+') as outfile:
outfile.write(content)
##### Public classes #####
class Service(service.Service) :
### Public ###
def initService(self) :
shared.Functions.addSharedObject(SERVICE_NAME, MailD(SERVICE_NAME, self))
### Private ###
@classmethod
def serviceName(self) :
return SERVICE_NAME
import os
from settingsd import const
from settingsd import config
from settingsd import service
from settingsd import shared
from pamela import authenticate, change_password, PAMError
SERVICE_NAME = "pam_authentication"
PAM_SERVICE_NAME = "settingsd"
class PamAuthentication(service.FunctionObject) :
@service.functionMethod("pam", in_signature="ss", out_signature="b")
def authenticate(self, login, password):
try:
authenticate(login, password, service=PAM_SERVICE_NAME)
return True
except PAMError:
return False
@service.functionMethod("pam", in_signature="sss", out_signature="b")
def change_password(self, login, old_password, new_password):
if len(new_password) == 0:
return False
if not self.authenticate(login, old_password):
return False
try:
change_password(login, new_password, service=PAM_SERVICE_NAME)
return True
except PAMError as err:
return False
@service.functionMethod("pam")
def restore_testing_user(self):
try:
with open('/etc/testing-user', 'r') as file:
login, password = file.read().split(' ', 1)
print(login, password)
change_password(login.strip(), password.strip(), service=PAM_SERVICE_NAME)
except FileNotFoundError:
pass
class Service(service.Service) :
def initService(self):
shared.Functions.addSharedObject(SERVICE_NAME, PamAuthentication(SERVICE_NAME, self))
@classmethod
def serviceName(self):
return SERVICE_NAME
# -*- coding: utf-8 -*-
import socket
import os
from yaml import dump, safe_load
from settingsd import const
from settingsd import config
from settingsd import service
from settingsd import shared
from settingsd import logger
import settingsd.tools as tools
from settingsd.tools.process import execProcess
import settingsd.tools.editors
from os import path
from jinja2 import Template
from jinja2.exceptions import TemplateSyntaxError
##### Private constants #####
SERVICE_NAME = "postfix"
POSTFIX_METHODS_NAMESPACE = "postfix"
POSTFIX_MAIN_CF = "/etc/postfix/main.cf"
##### Private classes #####
class Postfix(service.FunctionObject) :
### DBus methods ###
@service.functionMethod(POSTFIX_METHODS_NAMESPACE, in_signature="ss")
def generateAccessTable(self, filters_file_path, access_file_path):
with open(filters_file_path, 'r') as filters_file:
lists = safe_load(filters_file.read())
entries = []
for item in lists['whitelist']:
entries.append([item['key'], 'OK'])
for item in lists['blacklist']:
entries.append([item['key'], item['action'].upper()])
with open(access_file_path, 'w+') as access_file:
access_file.write('\n'.join([ ' '.join(pair) for pair in entries ]))
@service.functionMethod(POSTFIX_METHODS_NAMESPACE, in_signature="ss", out_signature="s")
def regenerateMainCf(self, config_filename, templates_dir):
with open(config_filename, 'r') as postfix_config:
config = safe_load(postfix_config.read())
try:
template_filename = path.join(templates_dir, config['template'] + '.tpl')
with open(template_filename, 'r') as template_text:
template = Template(template_text.read())
config_text = template.render(settings=config['settings'])
with open(POSTFIX_MAIN_CF, 'w+') as main_cf:
main_cf.write(config_text)
return ''
except TemplateSyntaxError as exc:
return str(exc)
##### Public classes #####
class Service(service.Service) :
### Public ###
def initService(self) :
shared.Functions.addSharedObject(SERVICE_NAME, Postfix(SERVICE_NAME, self))
### Private ###
@classmethod
def serviceName(self) :
return SERVICE_NAME
Metadata-Version: 2.1
Name: pamela
Version: 1.0.0
Summary: PAM interface using ctypes
Home-page: https://github.com/minrk/pamela
Author: Min RK
Author-email: benjaminrk@gmail.com
License: MIT
Keywords: pam,authentication
Platform: UNKNOWN
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: POSIX :: Linux
Classifier: Operating System :: MacOS :: MacOS X
Classifier: Programming Language :: Python
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Systems Administration :: Authentication/Directory
Description-Content-Type: text/markdown
# Pamela: yet another Python wrapper for PAM
There seems to be a glut of Python wrappers for PAM that have since been abandoned.
This repo merges two separate efforts:
- [gnosek/python-pam](https://github.com/gnosek/python-pam)
- adds wrappers for a few more calls, e.g. opening sessions
- raises PamError on failure instead of returning False, with informative error messages
- [simplepam](https://github.com/leonnnn/python3-simplepam)
- adds Python 3 support
- resets credentials after authentication, apparently for kerberos users
## Why?
Both projects appear to be abandoned, with no response to issues or pull requests in at least a year, and I need it for [JupyterHub](https://github.com/jupyter/jupyterhub).
## Use it
Install:
pip install pamela
Test:
python -m pamela -a `whoami`
__pycache__/pamela.cpython-36.pyc,,
pamela-1.0.0.dist-info/INSTALLER,sha256=zuuue4knoyJ-UwPPXg8fezS7VCrXJQrAP7zeNuwvFQg,4
pamela-1.0.0.dist-info/METADATA,sha256=Jq2Zs9sa58NCeMbwM8d6Oz_cfPEV6eIMLQqQogRgP90,1527
pamela-1.0.0.dist-info/RECORD,,
pamela-1.0.0.dist-info/WHEEL,sha256=J3CsTk7Mf2JNUyhImI-mjX-fmI4oDjyiXgWT4qgZiCE,110
pamela-1.0.0.dist-info/top_level.txt,sha256=sYmDCHiuiyLrWh33a_Rn49nqOVTmFg_MAtDQDbAWwwg,7
pamela.py,sha256=Q-4JPrPImmAArr2hDneNO2Wc40amVY0d8QmHK1XAF-c,15137
Wheel-Version: 1.0
Generator: bdist_wheel (0.31.0)
Root-Is-Purelib: true
Tag: py2-none-any
Tag: py3-none-any
# (c) 2007 Chris AtLee <chris@atlee.ca>
# (c) 2010 Grzegorz Nosek <root@localdomain.pl>
# (c) 2015 Min RK <benjaminrk@gmail.com>
# Licensed under the MIT license:
# http://www.opensource.org/licenses/mit-license.php
"""
PAM module for python
Provides an authenticate function that will allow the caller to authenticate
a user against the Pluggable Authentication Modules (PAM) on the system.
Implemented using ctypes, so no compilation is necessary.
"""
from __future__ import print_function
__version__ = '1.0.0'
__all__ = [
'PAMError',
'authenticate',
'open_session',
'close_session',
'check_account',
'change_password',
]
from ctypes import CDLL, POINTER, Structure, CFUNCTYPE, cast, pointer, sizeof, byref
from ctypes import c_void_p, c_uint, c_char_p, c_char, c_int
from ctypes.util import find_library
import getpass
import sys
# Python 3 bytes/unicode compat
if sys.version_info >= (3,):
unicode = str
raw_input = input
def _bytes_to_str(s, encoding='utf8'):
return s.decode(encoding)
else:
def _bytes_to_str(s, encoding='utf8'):
return s
def _cast_bytes(s, encoding='utf8'):
if isinstance(s, unicode):
return s.encode(encoding)
return s
LIBPAM = CDLL(find_library("pam"))
LIBC = CDLL(find_library("c"))
CALLOC = LIBC.calloc
CALLOC.restype = c_void_p
CALLOC.argtypes = [c_uint, c_uint]
STRDUP = LIBC.strdup
STRDUP.argstypes = [c_char_p]
STRDUP.restype = POINTER(c_char) # NOT c_char_p !!!!
# Various constants
PAM_PROMPT_ECHO_OFF = 1
PAM_PROMPT_ECHO_ON = 2
PAM_ERROR_MSG = 3
PAM_TEXT_INFO = 4
# These constants are libpam-specific
PAM_ESTABLISH_CRED = 0x0002
PAM_DELETE_CRED = 0x0004
PAM_REINITIALIZE_CRED = 0x0008
PAM_REFRESH_CRED = 0x0010
# constants for PAM_ variables for pam_set_item()
PAM_SERVICE = 1
PAM_USER = 2
PAM_TTY = 3
PAM_RHOST = 4
PAM_RUSER = 8
# PAM error codes
PAM_SUCCESS = 0
PAM_BAD_ITEM = 29
class PamHandle(Structure):
"""wrapper class for pam_handle_t"""
_fields_ = [
("handle", c_void_p)
]
def __init__(self):
Structure.__init__(self)
self.handle = 0
def get_item(self, item_type, encoding='utf-8'):
voidPointer = c_void_p()
retval = PAM_GET_ITEM(self, item_type, byref(voidPointer))
if retval == PAM_BAD_ITEM:
return None
if retval != PAM_SUCCESS:
raise PAMError(errno=retval)
s = cast(voidPointer, c_char_p)
if s.value is None:
return None
return _bytes_to_str(s.value, encoding)
def set_item(self, item_type, item, encoding='utf-8'):
retval = PAM_SET_ITEM(self, item_type, item.encode(encoding))
if retval != PAM_SUCCESS:
raise PAMError(errno=retval)
def get_env(self, var, encoding='utf-8'):
ret = PAM_GETENV(self, var.encode(encoding))
if ret is None:
raise PAMError()
else:
return ret.decode(encoding)
def put_env(self, k, v, encoding='utf-8'):
retval = PAM_PUTENV(
self,
('%s=%s' % (k, v)).encode(encoding))
if retval != PAM_SUCCESS:
raise PAMError(errno=retval)
def del_env(self, k, encoding='utf-8'):
retval = PAM_PUTENV(
self,
k.encode(encoding))
if retval != PAM_SUCCESS:
raise PAMError(errno=retval)
def get_envlist(self, encoding='utf-8'):
ret = PAM_GETENVLIST(self)
if ret is None:
raise PAMError()
parsed = {}
for i in PAM_GETENVLIST(self):
if i:
k, v = i.decode(encoding).split('=', 1)
parsed[k] = v
else:
break
return parsed
def open_session(self):
retval = PAM_OPEN_SESSION(self, 0)
if retval != PAM_SUCCESS:
raise PAMError(errno=retval)
def close_session(self):
retval = PAM_CLOSE_SESSION(self, 0)
if retval != PAM_SUCCESS:
raise PAMError(errno=retval)
PAM_STRERROR = LIBPAM.pam_strerror
PAM_STRERROR.restype = c_char_p
PAM_STRERROR.argtypes = [PamHandle, c_int]
def pam_strerror(handle, errno):
"""Wrap bytes-only PAM_STRERROR in native str"""
return _bytes_to_str(PAM_STRERROR(handle, errno))
class PAMError(Exception):
errno = None
message = ''
def __init__(self, message='', errno=None):
self.errno = errno
if message:
self.message = message
else:
if errno is None:
self.message = "Unknown"
else:
self.message = pam_strerror(PamHandle(), errno)
def __repr__(self):
en = '' if self.errno is None else ' %i' % self.errno
return "<PAM Error%s: '%s'>" % (en, self.message)
def __str__(self):
en = '' if self.errno is None else ' %i' % self.errno
return '[PAM Error%s] %s' % (en, self.message)
class PamMessage(Structure):
"""wrapper class for pam_message structure"""
_fields_ = [
("msg_style", c_int),
("msg", POINTER(c_char)),
]
def __repr__(self):
return "<PamMessage %i '%s'>" % (self.msg_style, _bytes_to_str(self.msg))
class PamResponse(Structure):
"""wrapper class for pam_response structure"""
_fields_ = [
("resp", POINTER(c_char)),
("resp_retcode", c_int),
]
def __repr__(self):
return "<PamResponse %i '%s'>" % (self.resp_retcode, _bytes_to_str(self.resp))
CONV_FUNC = CFUNCTYPE(c_int,
c_int, POINTER(POINTER(PamMessage)),
POINTER(POINTER(PamResponse)), c_void_p)
class PamConv(Structure):
"""wrapper class for pam_conv structure"""
_fields_ = [
("conv", CONV_FUNC),
("appdata_ptr", c_void_p)
]
PAM_START = LIBPAM.pam_start
PAM_START.restype = c_int
PAM_START.argtypes = [c_char_p, c_char_p, POINTER(PamConv),
POINTER(PamHandle)]
PAM_END = LIBPAM.pam_end
PAM_END.restype = c_int
PAM_END.argtypes = [PamHandle, c_int]
PAM_AUTHENTICATE = LIBPAM.pam_authenticate
PAM_AUTHENTICATE.restype = c_int
PAM_AUTHENTICATE.argtypes = [PamHandle, c_int]
PAM_OPEN_SESSION = LIBPAM.pam_open_session
PAM_OPEN_SESSION.restype = c_int
PAM_OPEN_SESSION.argtypes = [PamHandle, c_int]
PAM_CLOSE_SESSION = LIBPAM.pam_close_session
PAM_CLOSE_SESSION.restype = c_int
PAM_CLOSE_SESSION.argtypes = [PamHandle, c_int]
PAM_ACCT_MGMT = LIBPAM.pam_acct_mgmt
PAM_ACCT_MGMT.restype = c_int
PAM_ACCT_MGMT.argtypes = [PamHandle, c_int]
PAM_CHAUTHTOK = LIBPAM.pam_chauthtok
PAM_CHAUTHTOK.restype = c_int
PAM_CHAUTHTOK.argtypes = [PamHandle, c_int]
PAM_SETCRED = LIBPAM.pam_setcred
PAM_SETCRED.restype = c_int
PAM_SETCRED.argtypes = [PamHandle, c_int]
PAM_GETENV = LIBPAM.pam_getenv
PAM_GETENV.restype = c_char_p
PAM_GETENV.argtypes = [PamHandle, c_char_p]
PAM_GETENVLIST = LIBPAM.pam_getenvlist
PAM_GETENVLIST.restype = POINTER(c_char_p)
PAM_GETENVLIST.argtypes = [PamHandle]
PAM_PUTENV = LIBPAM.pam_putenv
PAM_PUTENV.restype = c_int
PAM_PUTENV.argtypes = [PamHandle, c_char_p]
PAM_SET_ITEM = LIBPAM.pam_set_item
PAM_SET_ITEM.restype = c_int
PAM_SET_ITEM.argtypes = [PamHandle, c_int, c_char_p]
PAM_GET_ITEM = LIBPAM.pam_get_item
PAM_GET_ITEM.restype = c_int
PAM_GET_ITEM.argtypes = [PamHandle, c_int, POINTER(c_void_p)]
@CONV_FUNC
def default_conv(n_messages, messages, p_response, app_data):
addr = CALLOC(n_messages, sizeof(PamResponse))
p_response[0] = cast(addr, POINTER(PamResponse))
if not sys.stdin.isatty():
return 0
for i in range(n_messages):
msg = messages[i].contents
style = msg.msg_style
msg_string = _bytes_to_str(cast(msg.msg, c_char_p).value)
if style == PAM_TEXT_INFO:
# back from POINTER(c_char) to c_char_p
print(msg_string)
elif style == PAM_ERROR_MSG:
print(msg_string, file=sys.stderr)
elif style in (PAM_PROMPT_ECHO_ON, PAM_PROMPT_ECHO_OFF):
if style == PAM_PROMPT_ECHO_ON:
read_pw = raw_input
else:
read_pw = getpass.getpass
pw_copy = STRDUP(_cast_bytes(read_pw(msg_string)))
p_response.contents[i].resp = pw_copy
p_response.contents[i].resp_retcode = 0
else:
print(repr(messages[i].contents))
return 0
def new_simple_password_conv(passwords, encoding):
passwords = [_cast_bytes(password, encoding) for password in passwords]
passwords.reverse()
@CONV_FUNC
def conv_func(n_messages, messages, p_response, app_data):
"""Simple conversation function that responds to any
prompt where the echo is off with the supplied password"""
# Create an array of n_messages response objects
addr = CALLOC(n_messages, sizeof(PamResponse))
p_response[0] = cast(addr, POINTER(PamResponse))
for i in range(n_messages):
if messages[i].contents.msg_style == PAM_PROMPT_ECHO_OFF:
if not passwords:
return 1
pw_copy = STRDUP(passwords.pop())
p_response.contents[i].resp = pw_copy
p_response.contents[i].resp_retcode = 0
return 0
return conv_func
def pam_start(service, username, conv_func=default_conv, encoding='utf8'):
service = _cast_bytes(service, encoding)
username = _cast_bytes(username, encoding)
handle = PamHandle()
conv = pointer(PamConv(conv_func, 0))
retval = PAM_START(service, username, conv, pointer(handle))
if retval != 0:
PAM_END(handle, retval)
raise PAMError(errno=retval)
return handle
def pam_end(handle, retval=0):
e = PAM_END(handle, retval)
if retval == 0 and e == 0:
return
if retval == 0:
retval = e
raise PAMError(errno=retval)
def authenticate(username, password=None, service='login', encoding='utf-8',
resetcred=PAM_REINITIALIZE_CRED, close=True):
"""Returns None if the given username and password authenticate for the
given service. Raises PAMError otherwise
``username``: the username to authenticate
``password``: the password in plain text
Defaults to None to use PAM's conversation interface
``service``: the PAM service to authenticate against.
Defaults to 'login'
The above parameters can be strings or bytes. If they are strings,
they will be encoded using the encoding given by:
``encoding``: the encoding to use for the above parameters if they
are given as strings. Defaults to 'utf-8'
``resetcred``: Use the pam_setcred() function to
reinitialize the credentials.
Defaults to 'PAM_REINITIALIZE_CRED'.
``close``: If True (default) the transaction will be closed after
authentication; if False the (open) PamHandle instance
will be returned.
"""
if password is None:
conv_func = default_conv
else:
password = _cast_bytes(password, encoding)
conv_func = new_simple_password_conv((password, ), encoding)
handle = pam_start(service, username, conv_func=conv_func, encoding=encoding)
retval = PAM_AUTHENTICATE(handle, 0)
# Re-initialize credentials (for Kerberos users, etc)
# Don't check return code of pam_setcred(), it shouldn't matter
# if this fails
if retval == 0 and resetcred:
PAM_SETCRED(handle, resetcred)
if close:
return pam_end(handle, retval)
elif retval != 0:
raise PAMError(errno=retval)
else:
return handle
def open_session(username, service='login', encoding='utf-8'):
handle = pam_start(service, username, encoding=encoding)
return pam_end(handle, PAM_OPEN_SESSION(handle, 0))
def close_session(username, service='login', encoding='utf-8'):
handle = pam_start(service, username, encoding=encoding)
return pam_end(handle, PAM_CLOSE_SESSION(handle, 0))
def check_account(username, service='login', encoding='utf-8'):
handle = pam_start(service, username, encoding=encoding)
return pam_end(handle, PAM_ACCT_MGMT(handle, 0))
def change_password(username, password=None, service='login', encoding='utf-8'):
if password is None:
conv_func = default_conv
else:
# Password x2 to answer the "Retype new UNIX password:" prompt
# TODO: If we're not running as root the first prompt will be
# 'current password' which we will not answer, so this will not work
# in that case.
conv_func = new_simple_password_conv((password, password), encoding)
handle = pam_start(service, username, conv_func=conv_func, encoding=encoding)
return pam_end(handle, PAM_CHAUTHTOK(handle, 0))
if __name__ == "__main__":
import optparse
usage = "usage: %prog [options] [username]"
parser = optparse.OptionParser(usage=usage)
parser.add_option('-a', '--authenticate', dest='authenticate',
action='store_true', help='authenticate user')
parser.add_option('-o', '--open-session', dest='open_session',
action='store_true', help='open session')
parser.add_option('-c', '--close-session', dest='close_session',
action='store_true', help='close session')
parser.add_option('-v', '--validate-account', dest='validate_account',
action='store_true', help='check account validity')
parser.add_option('-p', '--change-password', dest='change_password',
action='store_true', help='change password')
parser.add_option('-s', '--service', dest='service',
action='store', default='login',
help='PAM service to use [default: %default]')
parser.add_option('-P', '--ask-password', dest='ask_password',
action='store_true', help="own password prompt instead of PAM's")
(o, a) = parser.parse_args()
if not (o.authenticate or \
o.open_session or \
o.close_session or \
o.validate_account or \
o.change_password):
parser.error("One of -a, -o, -c, -v or -p is mandatory")
try:
user = a[0]
except IndexError:
user = getpass.getuser()
if o.authenticate:
if o.ask_password:
password = getpass.getpass()
else:
password = None
try:
authenticate(user, password, o.service)
except PAMError as e:
sys.exit(e)
if o.open_session:
try:
open_session(user, o.service)
except PAMError as e:
sys.exit(e)
if o.close_session:
try:
close_session(user, o.service)
except PAMError as e:
sys.exit(e)
if o.validate_account:
try:
check_account(user, o.service)
except PAMError as e:
sys.exit(e)
if o.change_password:
if o.ask_password:
password = getpass.getpass()
else:
password = None
try:
change_password(user, password, o.service)
except PAMError as e:
sys.exit(e)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment