Good night, I have a mistake that I can't identify. What could be the problem, I would appreciate your ideas.
The error appears when I try an incorrect password.
addons: auth_brute_force
odoo:12
Traceback (most recent call last):
File "/usr/lib/python3/dist-packages/odoo/addons/base/models/ir_http.py", line 203, in _dispatch
result = request.dispatch()
File "/usr/lib/python3/dist-packages/odoo/http.py", line 833, in dispatch
r = self._call_function(**self.params)
File "/usr/lib/python3/dist-packages/odoo/http.py", line 344, in _call_function
return checked_call(self.db, *args, **kwargs)
File "/usr/lib/python3/dist-packages/odoo/service/model.py", line 97, in wrapper
return f(dbname, *args, **kwargs)
File "/usr/lib/python3/dist-packages/odoo/http.py", line 337, in checked_call
result = self.endpoint(*a, **kw)
File "/usr/lib/python3/dist-packages/odoo/http.py", line 939, in __call__
return self.method(*args, **kw)
File "/usr/lib/python3/dist-packages/odoo/http.py", line 517, in response_wrap
response = f(*args, **kw)
File "/usr/lib/python3/dist-packages/odoo/addons/website/controllers/main.py", line 96, in web_login
response = super(Website, self).web_login(redirect=redirect, *args, **kw)
File "/usr/lib/python3/dist-packages/odoo/http.py", line 517, in response_wrap
response = f(*args, **kw)
File "/usr/lib/python3/dist-packages/odoo/addons/auth_signup/controllers/main.py", line 21, in web_login
response = super(AuthSignupHome, self).web_login(*args, **kw)
File "/usr/lib/python3/dist-packages/odoo/http.py", line 517, in response_wrap
response = f(*args, **kw)
File "/usr/lib/python3/dist-packages/odoo/addons/web/controllers/main.py", line 491, in web_login
uid = request.session.authenticate(request.session.db, request.params['login'], request.params['password'])
File "/usr/lib/python3/dist-packages/odoo/http.py", line 1039, in authenticate
uid = odoo.registry(db)['res.users'].authenticate(db, login, password, env)
File "/mnt/extra-addons/auth_brute_force/models/res_users.py", line 122, in authenticate
lambda: super(ResUsers, cls).authenticate(
File "/mnt/extra-addons/auth_brute_force/models/res_users.py", line 67, in _auth_attempt_force_raise
result = method()
File "/mnt/extra-addons/auth_brute_force/models/res_users.py", line 123, in <lambda>
db, login, password, user_agent_env),
File "/usr/lib/python3/dist-packages/odoo/addons/base/models/res_users.py", line 581, in authenticate
uid = cls._login(db, login, password)
File "/mnt/extra-addons/auth_brute_force/models/res_users.py", line 115, in _login
lambda: super(ResUsers, cls)._login(db, login, password),
File "/mnt/extra-addons/auth_brute_force/models/res_users.py", line 73, in _auth_attempt_force_raise
return result
UnboundLocalError: local variable 'result' referenced before assignment
import logging
from contextlib import contextmanager
from threading import current_thread
from odoo import api, models, SUPERUSER_ID
from odoo.exceptions import AccessDenied
from odoo.service import wsgi_server
_logger = logging.getLogger(__name__)
class ResUsers(models.Model):
_inherit = "res.users"
# HACK https://github.com/odoo/odoo/issues/24183
# TODO Remove in v12, and use normal odoo.http.request to get details
@api.model_cr
def _register_hook(self):
"""🐒-patch XML-RPC controller to know remote address."""
original_fn = wsgi_server.application_unproxied
def _patch(environ, start_response):
current_thread().environ = environ
return original_fn(environ, start_response)
wsgi_server.application_unproxied = _patch
# Helpers to track authentication attempts
@classmethod
@contextmanager
def _auth_attempt(cls, login):
"""Start an authentication attempt and track its state."""
try:
# Check if this call is nested
attempt_id = current_thread().auth_attempt_id
except AttributeError:
# Not nested; create a new attempt
attempt_id = cls._auth_attempt_new(login)
if not attempt_id:
# No attempt was created, so there's nothing to do here
yield
return
try:
current_thread().auth_attempt_id = attempt_id
result = "successful"
try:
yield
except AccessDenied as error:
result = getattr(error, "reason", "failed")
raise
finally:
cls._auth_attempt_update({"result": result})
finally:
try:
del current_thread().auth_attempt_id
except AttributeError:
pass # It was deleted already
@classmethod
def _auth_attempt_force_raise(cls, login, method):
"""Force a method to raise an AccessDenied on falsey return."""
try:
with cls._auth_attempt(login):
result = method()
if not result:
# Force exception to record auth failure
raise AccessDenied()
except AccessDenied:
pass # `_auth_attempt()` did the hard part already
return result
@classmethod
def _auth_attempt_new(cls, login):
"""Store one authentication attempt, not knowing the result."""
# Get the right remote address
try:
remote_addr = current_thread().environ["REMOTE_ADDR"]
except (KeyError, AttributeError):
remote_addr = False
# Exit if it doesn't make sense to store this attempt
if not remote_addr:
return False
# Use a separate cursor to keep changes always
with cls.pool.cursor() as cr:
env = api.Environment(cr, SUPERUSER_ID, {})
attempt = env["res.authentication.attempt"].create({
"login": login,
"remote": remote_addr,
})
return attempt.id
@classmethod
def _auth_attempt_update(cls, values):
"""Update a given auth attempt if we still ignore its result."""
auth_id = getattr(current_thread(), "auth_attempt_id", False)
if not auth_id:
return {} # No running auth attempt; nothing to do
# Use a separate cursor to keep changes always
with cls.pool.cursor() as cr:
env = api.Environment(cr, SUPERUSER_ID, {})
attempt = env["res.authentication.attempt"].browse(auth_id)
# Update only on 1st call
if not attempt.result:
attempt.write(values)
return attempt.copy_data()[0] if attempt else {}
# Override all auth-related core methods
@classmethod
def _login(cls, db, login, password):
return cls._auth_attempt_force_raise(
login,
lambda: super(ResUsers, cls)._login(db, login, password),
)
@classmethod
def authenticate(cls, db, login, password, user_agent_env):
return cls._auth_attempt_force_raise(
login,
lambda: super(ResUsers, cls).authenticate(
db, login, password, user_agent_env),
)
@api.model
def check_credentials(self, password):
login = self.env.user.login
with self._auth_attempt(login):
# Update login, just in case we stored the UID before
attempt = self._auth_attempt_update({"login": login})
remote = attempt.get("remote")
# Fail if the remote is banned
trusted = self.env["res.authentication.attempt"]._trusted(
remote,
login,
)
if not trusted:
error = AccessDenied()
error.reason = "banned"
raise error
# Continue with other auth systems
return super(ResUsers, self).check_credentials(password)