#!/usr/bin/python

'''
SimpleHTTPServer with basic and digest auth.

(c) 2012-2014 Jan ONDREJ (SAL)

This program is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 2 of the License, or (at your
option) any later version.
'''

from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
from SocketServer import ThreadingMixIn, ForkingMixIn
import binascii
import hashlib
import random
import string
import urllib2

from paste.auth.digest import digest_password, \
    AuthDigestAuthenticator, HTTPUnauthorized


class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTPServer combined with ThreadingMixIn."""
    pass


class ForkingHTTPServer(ForkingMixIn, HTTPServer):
    """HTTPServer combined with ForkingMixIn."""
    pass


class AuthHTTPRequestHandler(BaseHTTPRequestHandler):
    """Request handler offering HTTP basic and digest authentication checks.

    Both ``check_*`` methods return True when the request carries valid
    credentials.  Otherwise they send a complete 401 response (status line
    and headers) themselves and return False.
    """

    # Realm announced in the WWW-Authenticate challenge.
    realm = "AuthHTTPServer"
    # Credential table keyed by user name.  check_basic_auth compares the
    # value with the password sent by the client, while check_digest_auth
    # passes the value to the authenticator as ``ha1``.
    users = {}
    # Set by check_digest_auth on success to the value returned by the
    # authenticator; check_basic_auth does not touch it.
    remote_user = None
    ada = None  # initialized later

    # Parameters check_digest_auth reads from the Authorization header.
    _DIGEST_FIELDS = ('username', 'response', 'nonce', 'nc', 'cnonce', 'qop')

    # basic auth
    def check_basic_auth(self):
        """Validate an ``Authorization: Basic`` header against ``users``.

        Returns True if the header holds a known user name with a matching
        password.  In every other case (no header, other scheme, malformed
        or undecodable credentials, wrong password) a 401 response with a
        Basic challenge is sent and False is returned.
        """
        auth_hdr = self.headers.getheader('Authorization')
        if auth_hdr and self._basic_credentials_ok(auth_hdr):
            return True
        self.send_response(401)
        self.send_header('WWW-Authenticate',
                         'Basic realm="%s"' % self.realm)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        return False

    def _basic_credentials_ok(self, auth_hdr):
        """Return True if auth_hdr is a Basic header matching ``users``."""
        try:
            method, auth = auth_hdr.split(" ", 1)
            if method.lower() != "basic":
                return False
            username, password = auth.decode("base64").split(":", 1)
        except (ValueError, binascii.Error):
            # Header without a space, invalid base64 or credentials without
            # a ':' separator: treat as failed authentication rather than
            # letting the exception escape the handler.
            return False
        return username in self.users and self.users[username] == password

    # digest auth
    def check_digest_auth(self):
        """Validate an ``Authorization`` header using digest authentication.

        Returns True and stores the authenticator's result in
        ``remote_user`` when the authenticator accepts the request.
        Otherwise the headers of the HTTPUnauthorized response produced by
        the authenticator are sent with a 401 status and False is returned.
        """
        # NOTE(review): this assigns on the instance, so every handler
        # instance builds its own authenticator -- confirm that is intended.
        if self.ada is None:
            self.ada = AuthDigestAuthenticator(self.realm, digest_password)

        response = None
        auth_hdr = self.headers.getheader('Authorization')
        chal = self._parse_digest_header(auth_hdr) if auth_hdr else None
        if chal is not None and chal['username'] in self.users:
            response = self.ada.compute(
                # The stored value is handed over as HA1 unchanged; it is
                # not derived here via digest_password().
                ha1=self.users[chal['username']],
                username=chal['username'],
                response=chal['response'],
                # Use the real request method: a hard-coded 'GET' makes
                # every other method fail the digest comparison.
                method=self.command,
                path=self.path,
                nonce=chal['nonce'],
                nc=chal['nc'],
                cnonce=chal['cnonce'],
                qop=chal['qop'],
            )
            if not isinstance(response, HTTPUnauthorized):
                self.remote_user = response
                return True

        # No usable credentials, or an unknown user: issue a new challenge.
        # When compute() rejected the request, its own response is reused.
        if response is None:
            response = self.ada.build_authentication()
        self.send_response(401)
        for header in response.headers:
            self.send_header(header[0], header[1])
        self.end_headers()
        return False

    def _parse_digest_header(self, auth_hdr):
        """Return the header's key/value parameters, or None if unusable.

        None is returned when the parameter list cannot be parsed or when
        any parameter needed by check_digest_auth is missing.
        """
        try:
            chal = urllib2.parse_keqv_list(urllib2.parse_http_list(
                auth_hdr.split(' ', 1)[-1]))
        except ValueError:
            # parse_keqv_list() fails on items without '='.
            return None
        for field in self._DIGEST_FIELDS:
            if field not in chal:
                return None
        return chal