Overslaan naar inhoud
Odoo Menu
  • Aanmelden
  • Probeer het gratis
  • Apps
    Financiën
    • Boekhouding
    • Facturatie
    • Onkosten
    • Spreadsheet (BI)
    • Documenten
    • Ondertekenen
    Verkoop
    • CRM
    • Verkoop
    • Kassasysteem winkel
    • Kassasysteem Restaurant
    • Abonnementen
    • Verhuur
    Websites
    • Websitebouwer
    • E-commerce
    • Blog
    • Forum
    • Live Chat
    • eLearning
    Bevoorradingsketen
    • Voorraad
    • Productie
    • PLM
    • Inkoop
    • Onderhoud
    • Kwaliteit
    Personeelsbeheer
    • Werknemers
    • Werving & Selectie
    • Verlof
    • Evaluaties
    • Aanbevelingen
    • Wagenpark
    Marketing
    • Social media Marketing
    • E-mailmarketing
    • SMS Marketing
    • Evenementen
    • Marketingautomatisering
    • Enquêtes
    Diensten
    • Project
    • Urenstaten
    • Buitendienst
    • Helpdesk
    • Planning
    • Afspraken
    Productiviteit
    • Chat
    • Goedkeuringen
    • IoT
    • VoIP
    • Kennis
    • WhatsApp
    Apps van derden Odoo Studio Odoo Cloud Platform
  • Bedrijfstakken
    Detailhandel
    • Boekhandel
    • kledingwinkel
    • Meubelzaak
    • Supermarkt
    • Bouwmarkt
    • Speelgoedwinkel
    Food & Hospitality
    • Bar en Pub
    • Restaurant
    • Fastfood
    • Gastenverblijf
    • Drankenhandelaar
    • Hotel
    Vastgoed
    • Makelaarskantoor
    • Architectenbureau
    • Bouw
    • Vastgoedbeheer
    • Tuinieren
    • Vereniging van eigenaren
    Consulting
    • Accountantskantoor
    • Odoo Partner
    • Marketingbureau
    • Advocatenkantoor
    • Talentenwerving
    • Audit & Certificering
    Productie
    • Textiel
    • Metaal
    • Meubels
    • Eten
    • Brewery
    • Relatiegeschenken
    Gezondheid & Fitness
    • Sportclub
    • Opticien
    • Fitnesscentrum
    • Wellness-medewerkers
    • Apotheek
    • Kapper
    Trades
    • Klusjesman
    • IT-hardware & support
    • Zonne-energiesystemen
    • Schoenmaker
    • Schoonmaakdiensten
    • HVAC-diensten
    Andere
    • Non-profitorganisatie
    • Milieuagentschap
    • Verhuur van Billboards
    • Fotograaf
    • Fietsleasing
    • Softwareverkoper
    Browse all Industries
  • Community
    Leren
    • Tutorials
    • Documentatie
    • Certificeringen
    • Training
    • Blog
    • Podcast
    Versterk het onderwijs
    • Onderwijs- programma
    • Scale Up! Business Game
    • Bezoek Odoo
    Download de Software
    • Downloaden
    • Vergelijk edities
    • Releases
    Werk samen
    • Github
    • Forum
    • Evenementen
    • Vertalingen
    • Word een Partner
    • Services for Partners
    • Registreer je accountantskantoor
    Diensten
    • Vind een partner
    • Vind een boekhouder
    • Een adviseur ontmoeten
    • Implementatiediensten
    • Klantreferenties
    • Ondersteuning
    • Upgrades
    Github Youtube Twitter Linkedin Instagram Facebook Spotify
    +1 (650) 691-3277
    Vraag een demo aan
  • Prijzen
  • Help

Odoo is the world's easiest all-in-one management software.
It includes hundreds of business apps:

  • CRM
  • e-Commerce
  • Boekhouding
  • Voorraad
  • PoS
  • Project
  • MRP
All apps
Je moet geregistreerd zijn om te kunnen communiceren met de community.
Alle posts Personen Badges
Labels (Bekijk alle)
odoo accounting v14 pos v15
Over dit forum
Je moet geregistreerd zijn om te kunnen communiceren met de community.
Alle posts Personen Badges
Labels (Bekijk alle)
odoo accounting v14 pos v15
Over dit forum
Help

AttributeError: 'NoneType' object has no attribute 'encode' - Custom module for tax register

Inschrijven

Ontvang een bericht wanneer er activiteit is op deze post

Deze vraag is gerapporteerd
addonscustomodooV9odooV12
1 Beantwoorden
21914 Weergaven
Avatar
Simon

I am getting this error after migrating a custom module for v9 to v12 for the code below after testing connection to external servers. How can I fix this? Thank you

Error:
Odoo Server Error

Traceback (most recent call last):
File "/opt/odoo12/odoo/odoo/http.py", line 654, in _handle_exception
return super(JsonRequest, self)._handle_exception(exception)
File "/opt/odoo12/odoo/odoo/http.py", line 312, in _handle_exception
raise pycompat.reraise(type(exception), exception, sys.exc_info()[2])
File "/opt/odoo12/odoo/odoo/tools/pycompat.py", line 87, in reraise
raise value
File "/opt/odoo12/odoo/odoo/http.py", line 696, in dispatch
result = self._call_function(**self.params)
File "/opt/odoo12/odoo/odoo/http.py", line 344, in _call_function
return checked_call(self.db, *args, **kwargs)
File "/opt/odoo12/odoo/odoo/service/model.py", line 97, in wrapper
return f(dbname, *args, **kwargs)
File "/opt/odoo12/odoo/odoo/http.py", line 337, in checked_call
result = self.endpoint(*a, **kw)
File "/opt/odoo12/odoo/odoo/http.py", line 939, in __call__
return self.method(*args, **kw)
File "/opt/odoo12/odoo/odoo/http.py", line 517, in response_wrap
response = f(*args, **kw)
File "/opt/odoo12/odoo/addons/web/controllers/main.py", line 966, in call_button
action = self._call_kw(model, method, args, {})
File "/opt/odoo12/odoo/addons/web/controllers/main.py", line 954, in _call_kw
return call_kw(request.env[model], method, args, kwargs)
File "/opt/odoo12/odoo/odoo/api.py", line 749, in call_kw
return _call_kw_multi(method, model, args, kwargs)
File "/opt/odoo12/odoo/odoo/api.py", line 736, in _call_kw_multi
result = method(recs, *args, **kwargs)
File "/opt/odoo12/odoo-custom-addons/l10n_si_tax_registry/models/res_company.py", line 31, in action_test_l10n_si_tax_reg_conn
if not s.echo():
File "/opt/odoo12/odoo-custom-addons/l10n_si_tax_registry/SITaxReg/SITaxReg.py", line 127, in echo
sign=False)
File "/opt/odoo12/odoo-custom-addons/l10n_si_tax_registry/SITaxReg/SITaxReg.py", line 265, in send
'---END---' + "\n", self.DEBUG_CLIENT).encode()
AttributeError: 'NoneType' object has no attribute 'encode'



Code v12:


# -*- coding: utf-8 -*-


"""Slovenian tax invoice registration and other commands required by the
system.
"""

import base64
#import Cookie
from http import Cookies
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
import Crypto.Signature.PKCS1_v1_5 as PKCS
import datetime
from datetime import datetime as dt
#from httplib import HTTPResponse
from http.client import HTTPResponse
#from httplib import OK
from http.client import OK
from logging import debug
#import md5
import hashlib
from OpenSSL import crypto
from re import sub
import json
import socket
import ssl
from uuid import uuid4

from . import exceptions
#from __init__ import __version__
from .version import __version__
from . import TmpSock
import logging



_log = logging.getLogger(__name__)


class SITaxReg(object):
"""Slovenian Tax Registry client object."""

DEBUG_OFF = 0
DEBUG_SERVER = 1
DEBUG_CLIENT = 2
DEBUG_OUTPUT_PRINT = 'print'
DEBUG_OUTPUT_LOGGER = 'logger'

SI_TAX_REG_DATE_FORMAT = '%Y-%m-%d'
SI_TAX_REG_DT_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

_CRLF = "\r\n"
_ALGO = 'RS256'

_keyfile = ''
_certfile = ''
_ca_cert = None
_cert_reqs = ssl.CERT_NONE
_jws_header = ''
_key = None
_crypto_private_key = None
_debug_output = DEBUG_OUTPUT_PRINT
_debug = DEBUG_OFF

_scheme = 'https'
_host = 'blagajne.fu.gov.si'
_port = 9003
_query_string = '/v1/'

_user_agent = 'Editor d.o.o. SI Tax Registry Python Library/%s'
_cookies = None

_message_id = ''
_request_datetime = None
_zoi = ''

def __init__(self, keyfile, certfile, ca_certs=None, key=None, dev=False):
"""Return a new class instance with specified certificates."""
self._keyfile = keyfile
self._certfile = certfile
self._ca_certs = ca_certs
self._key = key

# When verifier certificates are provided, we require certificate
# authentication.
if self._ca_certs:
self._cert_reqs = ssl.CERT_REQUIRED

self._user_agent = self._user_agent % (__version__)

if dev:
self._host = 'blagajne-test.fu.gov.si'
self._port = 9002

def set_debug_output(self, output):
"""Set debug output destionation."""
self._debug_output = output

def set_debug(self, level):
"""Set debug level."""
self._debug = level

def get_message_id(self):
"""Get message ID of last sent message if any."""
return self._message_id

def get_request_datetime(self):
"""Get last message request time if any."""
return self._request_datetime

def get_zoi(self):
"""Get last used issuer mark in a request if any."""
return self._zoi

def echo(self):
"""Send echo request and verify echo server response."""
res = self.send('cash_registers/echo', {'EchoRequest': 'furs'},
sign=False)
return 'EchoResponse' in res and res['EchoResponse'] == 'furs'

def register_business_premise(self, data):
"""Register a business premise."""
self.send('cash_registers/invoices/register', {
'BusinessPremiseRequest': {'BusinessPremise': data},
})
return True

def issue_electronic_invoice(self, data):
"""Send invoice request for invoices made with an electronic device."""
res = self.send('cash_registers/invoices', {
'InvoiceRequest': {'Invoice': self._populate_invoice_data(data)},
})
return '' if 'UniqueInvoiceID' not in res else res['UniqueInvoiceID']

def issue_batch_electronic_invoice(self, data):
"""Send a list of invoices request for invoices made with an electronic
device.
"""
data_tmp = []
for i, d in enumerate(data):
data_tmp.append({
'RecordNumber': i + 1,
'Invoice': self._populate_invoice_data(d),
})

res = self.send('cash_registers_batch/invoices', {
'InvoiceListRequest': {'InvoiceList': {'RecordInfo': data_tmp}},
})
data_tmp = []
if 'RecordReply' not in res:
return []
# TODO: Check the response list against the request list.
return [(r['ProtectedID'],
r['UniqueInvoiceID']) for r in res['RecordReply']]

def issue_prenumbered_invoice(self, data):
"""Send invoice request for invoices made with a pre-numbered invoice
book.
"""
if 'IssueDate' in data and \
isinstance(data['IssueDate'], datetime.date):
data['IssueDate'] = \
data['IssueDate'].strftime(self.SI_TAX_REG_DATE_FORMAT)
if 'ReferenceSalesBook' in data and \
isinstance(data['ReferenceSalesBook'], list) and \
len(data['ReferenceSalesBook']) > 0 and \
'ReferenceSalesBookIssueDate' in \
data['ReferenceSalesBook'][0] and \
isinstance(
data['ReferenceSalesBook'][0]
['ReferenceSalesBookIssueDate'],
datetime.date
):
data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] = \
data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] \
.strftime(self.SI_TAX_REG_DATE_FORMAT)
if 'ReferenceInvoice' in data and \
isinstance(data['ReferenceInvoice'], list) and \
len(data['ReferenceInvoice']) > 0 and \
'ReferenceInvoiceIssueDateTime' in \
data['ReferenceInvoice'][0] and \
isinstance(
data['ReferenceInvoice'][0]
['ReferenceInvoiceIssueDateTime'],
datetime.datetime
):
data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] = \
data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] \
.strftime(self.SI_TAX_REG_DT_FORMAT)

res = self.send('cash_registers/invoices', {
'InvoiceRequest': {'SalesBookInvoice': data},
})
return '' if 'UniqueInvoiceID' not in res else res['UniqueInvoiceID']

def send(self, endpoint, data, sign=True):
"""Send request data to registry server."""
self._message_id = ''
self._request_datetime = None
self._zoi = ''

if isinstance(data, str):
data = json.loads(data)

if 'InvoiceRequest' in data and \
'Invoice' in data['InvoiceRequest'] and \
'ProtectedID' in data['InvoiceRequest']['Invoice']:
self._zoi = data['InvoiceRequest']['Invoice']['ProtectedID']
elif 'InvoiceListRequest' in data and \
'InvoiceList' in data['InvoiceListRequest'] and \
'RecordInfo' in data['InvoiceListRequest']['InvoiceList']:
str_tmp = len(data['InvoiceListRequest']['InvoiceList']
['RecordInfo'])
if str_tmp > 0:
str_tmp = (data['InvoiceListRequest']['InvoiceList']
['RecordInfo'][str_tmp - 1])
if 'Invoice' in str_tmp and \
'ProtectedID' in str_tmp['Invoice']:
self._zoi = str_tmp['Invoice']['ProtectedID']

if sign:
data = self._inject_header(data)
str_tmp = data.iterkeys().next()
self._message_id = data[str_tmp]['Header']['MessageID']
self._request_datetime = dt.strptime(
data[str_tmp]['Header']['DateTime'],
self.SI_TAX_REG_DT_FORMAT
)
data = self._sign(data)

conn = ssl.wrap_socket(socket.socket(), keyfile=self._keyfile,
certfile=self._certfile,
cert_reqs=self._cert_reqs,
ca_certs=self._ca_certs)
conn.connect((self._host, self._port))

if not isinstance(data, str):
data = json.dumps(data)
str_tmp = [
'POST %s%s HTTP/1.1' % (self._query_string, endpoint),
'Host: %s:%d' % (self._host, self._port),
'User-Agent: ' + (self._user_agent),
'Content-Type: application/json; charset=UTF-8',
'Connection: close',
'Content-Length: ' + str(len(data))
]
if self._cookies:
str_tmp.append(
'Cookie: ' + '; '.join([
x + '=' + self._cookies[x].value for x in self._cookies]
)
)

data = self._CRLF.join(str_tmp) + self._CRLF + self._CRLF + data
self._e_debug("\n" + '---SENDING---' + "\n" + data + "\n" +
'---END---' + "\n", self.DEBUG_CLIENT).encode()

try:
conn.write(data.encode())
except Exception as e:
conn.close()
raise e

data = ''
str_tmp = True
while str_tmp != '':
str_tmp = conn.read(4096)
data += str_tmp.decode()
conn.close()
str_tmp = None

self._e_debug("\n" + '---GOT---' + "\n" + data + "\n" +
'---END---' + "\n", self.DEBUG_SERVER)

data = self._parse_http_response(data)

# TODO: Improve cookie management.
c = data.getheader('Set-Cookie') or data.getheader('Cookie')
self._cookies = Cookies.Simple.Cookie(c) if c else None

if data.status != OK:
raise exceptions.SITaxServerError(data.read(), data.status)

data = data.read()
data = json.loads(data)
_log.info('\n*--------\n'
'*RECEIVE: %s\n'
'*-----------\n', data)
if sign:
data = self._parse_signed_response(data)
# Take only the payload.
data = data[1]
# Get sub-content.
data = data.itervalues().next()
# Remove header.
del(data['Header'])
# Any potencial errors should be stored here. If found, raise an
# exception.
if 'Error' in data:
raise exceptions.SITaxRegistryError(
data['Error']['ErrorMessage'],
data['Error']['ErrorCode']
)

return data

def get_session_cookie(self):
"""Get session cookie object."""
return self._cookies

def set_session_cookie(self, cookie):
"""Set session cookie object."""
self._cookies = cookie

def _inject_header(self, data, uid=None, dtm=None):
"""Insert message identification in first and only child of dict."""
if not uid:
uid = str(uuid4())
if not dtm:
dtm = dt.utcnow()
if isinstance(data, str):
data = json.loads(data)
if not isinstance(data, dict):
raise TypeError('Data inappropriate to perform signature on')
if len(data) != 1:
raise ValueError('Data inappropriate to perform signature on')
h = data.iterkeys().next()
if not isinstance(data[h], dict):
raise TypeError('Data inappropriate to perform signature on')
data[h]['Header'] = {
'MessageID': uid,
'DateTime': dtm.strftime(self.SI_TAX_REG_DT_FORMAT),
}
return data

def _sign(self, data):
"""Sign the request message."""
h = self._get_jws_header() + '.' + self._jws_base64_encode(
json.dumps(data))
return {'token': h + '.' + self._signature(h)}

def _signature(self, data):
"""Create a signature out of a certificate header and already encoded
and joined message payload.
"""
self._check_and_load_private_key()

h = SHA256.new(data.encode('UTF-8'))
return self._jws_base64_encode(
PKCS.new(self._crypto_private_key).sign(h))

def _parse_http_response(self, response_str):
"""Parse response string into an HTTP object."""
sock = TmpSock(response_str)
res = HTTPResponse(sock)
res.begin()
return res

def _parse_signed_response(self, data):
"""Parse and fix encoded response from the tax registry server."""
res = []
data = data['token'].split('.')
signature = data.pop()
# Parse JWS header and payload.
for d in data:
res.append(json.loads(self._jws_base64_decode(d)))
# TODO: Parse signature.
res.append(signature)
return res

def _get_jws_header(self):
"""Parse if necessary and get public certificate JWS header."""
if self._jws_header == '':
# If header hasn't been made yet, construct it and store in buffer
# for possible later access.
# TODO: This is the only place where we need the OpenSSL module.
# See if we can somehow read the data without it so we can remove
# the dependency.
b = open(self._certfile, 'r').read()
x = crypto.load_certificate(crypto.FILETYPE_PEM, b)

jws_header = {
'alg': self._ALGO,
'subject_name': ','.join(map(lambda x: x[0] + '=' + x[1],
x.get_subject().get_components())),
'issuer_name': ','.join(map(lambda x: x[0] + '=' + x[1],
x.get_issuer().get_components())),
'serial': x.get_serial_number(),
}

self._jws_header = self._jws_base64_encode(json.dumps(jws_header))

return self._jws_header

def _jws_base64_encode(self, s):
"""Encode string into a JWS compliant base64 format."""
return base64.urlsafe_b64encode(s).replace(b'=', b'')

def _jws_base64_decode(self, s):
"""Decode string from a JWS base64 format."""
s += b'=' * (4 - (len(s) % 4))
return base64.urlsafe_b64decode(s.encode('UTF-8'))

def _calculate_protective_mark(self, tax_number, date, invoice_number,
premise, device, amount):
"""Generate "ZOI" signature."""
self._check_and_load_private_key()
sig = str(tax_number) + date + invoice_number + premise + device \
+ str(amount)
sig = SHA256.new(sig.encode('UTF-8')).digest()
sig = self._crypto_private_key.sign(sig, '')[0]
return hashlib.md5.new(str(sig)).hexdigest()

def _populate_invoice_data(self, data):
"""Add missing and correct existing data in invoice dict."""
if 'IssueDateTime' in data and \
isinstance(data['IssueDateTime'], datetime.datetime):
data['IssueDateTime'] = \
data['IssueDateTime'].strftime(self.SI_TAX_REG_DT_FORMAT)
if 'ReferenceInvoice' in data and \
isinstance(data['ReferenceInvoice'], list) and \
len(data['ReferenceInvoice']) > 0 and \
'ReferenceInvoiceIssueDateTime' in \
data['ReferenceInvoice'][0] and \
isinstance(
data['ReferenceInvoice'][0]
['ReferenceInvoiceIssueDateTime'],
datetime.datetime
):
data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] = \
data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] \
.strftime(self.SI_TAX_REG_DT_FORMAT)
if 'ReferenceSalesBook' in data and \
isinstance(data['ReferenceSalesBook'], list) and \
len(data['ReferenceSalesBook']) > 0 and \
'ReferenceSalesBookIssueDate' in \
data['ReferenceSalesBook'][0] and \
isinstance(
data['ReferenceSalesBook'][0]
['ReferenceSalesBookIssueDate'],
datetime.date
):
data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] = \
data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] \
.strftime(self.SI_TAX_REG_DATE_FORMAT)

if 'ProtectedID' not in data:
data['ProtectedID'] = self._calculate_protective_mark(
data['TaxNumber'], data['IssueDateTime'],
data['InvoiceIdentifier']['InvoiceNumber'],
data['InvoiceIdentifier']['BusinessPremiseID'],
data['InvoiceIdentifier']['ElectronicDeviceID'],
data['InvoiceAmount']
)
_log.info('\n*-----------\n'
'*SEND: %s\n'
'*-------------\n', data)
return data

def _check_and_load_private_key(self):
"""Load the private key content into class property."""
if not self._crypto_private_key:
h = open(self._keyfile, 'r').read()
self._crypto_private_key = RSA.importKey(h, passphrase=self._key)

def _e_debug(self, msg, level):
"""Output debug message to user-selected method."""
if level > self._debug:
return

if self._debug_output == 'logger':
debug(msg)
else:
msg = sub('/(\r\n|\r|\n)/ms', "\n", msg)
print (dt.now().strftime('%Y-%m-%d %H:%M:%S') + "\t" \
+ msg.strip() + "\n")


Code v9:

# -*- coding: utf-8 -*-


"""Slovenian tax invoice registration and other commands required by the
system.
"""

import base64
import Cookie
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
import Crypto.Signature.PKCS1_v1_5 as PKCS
import datetime
from datetime import datetime as dt
from httplib import HTTPResponse
from httplib import OK
from logging import debug
import md5
from OpenSSL import crypto
from re import sub
import json
import socket
import ssl
from uuid import uuid4

import exceptions
from __init__ import __version__
from TmpSock import TmpSock
import logging
_log = logging.getLogger(__name__)


class SITaxReg(object):
"""Slovenian Tax Registry client object."""

DEBUG_OFF = 0
DEBUG_SERVER = 1
DEBUG_CLIENT = 2
DEBUG_OUTPUT_PRINT = 'print'
DEBUG_OUTPUT_LOGGER = 'logger'

SI_TAX_REG_DATE_FORMAT = '%Y-%m-%d'
SI_TAX_REG_DT_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

_CRLF = "\r\n"
_ALGO = 'RS256'

_keyfile = ''
_certfile = ''
_ca_cert = None
_cert_reqs = ssl.CERT_NONE
_jws_header = ''
_key = None
_crypto_private_key = None
_debug_output = DEBUG_OUTPUT_PRINT
_debug = DEBUG_OFF

_scheme = 'https'
_host = 'blagajne.fu.gov.si'
_port = 9003
_query_string = '/v1/'

_user_agent = 'Editor d.o.o. SI Tax Registry Python Library/%s'
_cookies = None

_message_id = ''
_request_datetime = None
_zoi = ''

def __init__(self, keyfile, certfile, ca_certs=None, key=None, dev=False):
"""Return a new class instance with specified certificates."""
self._keyfile = keyfile
self._certfile = certfile
self._ca_certs = ca_certs
self._key = key

# When verifier certificates are provided, we require certificate
# authentication.
if self._ca_certs:
self._cert_reqs = ssl.CERT_REQUIRED

self._user_agent = self._user_agent % (__version__)

if dev:
self._host = 'blagajne-test.fu.gov.si'
self._port = 9002

def set_debug_output(self, output):
"""Set debug output destionation."""
self._debug_output = output

def set_debug(self, level):
"""Set debug level."""
self._debug = level

def get_message_id(self):
"""Get message ID of last sent message if any."""
return self._message_id

def get_request_datetime(self):
"""Get last message request time if any."""
return self._request_datetime

def get_zoi(self):
"""Get last used issuer mark in a request if any."""
return self._zoi

def echo(self):
"""Send echo request and verify echo server response."""
res = self.send('cash_registers/echo', {'EchoRequest': 'furs'},
sign=False)
return 'EchoResponse' in res and res['EchoResponse'] == 'furs'

def register_business_premise(self, data):
"""Register a business premise."""
self.send('cash_registers/invoices/register', {
'BusinessPremiseRequest': {'BusinessPremise': data},
})
return True

def issue_electronic_invoice(self, data):
"""Send invoice request for invoices made with an electronic device."""
res = self.send('cash_registers/invoices', {
'InvoiceRequest': {'Invoice': self._populate_invoice_data(data)},
})
return '' if 'UniqueInvoiceID' not in res else res['UniqueInvoiceID']

def issue_batch_electronic_invoice(self, data):
"""Send a list of invoices request for invoices made with an electronic
device.
"""
data_tmp = []
for i, d in enumerate(data):
data_tmp.append({
'RecordNumber': i + 1,
'Invoice': self._populate_invoice_data(d),
})

res = self.send('cash_registers_batch/invoices', {
'InvoiceListRequest': {'InvoiceList': {'RecordInfo': data_tmp}},
})
data_tmp = []
if 'RecordReply' not in res:
return []
# TODO: Check the response list against the request list.
return [(r['ProtectedID'],
r['UniqueInvoiceID']) for r in res['RecordReply']]

def issue_prenumbered_invoice(self, data):
"""Send invoice request for invoices made with a pre-numbered invoice
book.
"""
if 'IssueDate' in data and \
isinstance(data['IssueDate'], datetime.date):
data['IssueDate'] = \
data['IssueDate'].strftime(self.SI_TAX_REG_DATE_FORMAT)
if 'ReferenceSalesBook' in data and \
isinstance(data['ReferenceSalesBook'], list) and \
len(data['ReferenceSalesBook']) > 0 and \
'ReferenceSalesBookIssueDate' in \
data['ReferenceSalesBook'][0] and \
isinstance(
data['ReferenceSalesBook'][0]
['ReferenceSalesBookIssueDate'],
datetime.date
):
data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] = \
data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] \
.strftime(self.SI_TAX_REG_DATE_FORMAT)
if 'ReferenceInvoice' in data and \
isinstance(data['ReferenceInvoice'], list) and \
len(data['ReferenceInvoice']) > 0 and \
'ReferenceInvoiceIssueDateTime' in \
data['ReferenceInvoice'][0] and \
isinstance(
data['ReferenceInvoice'][0]
['ReferenceInvoiceIssueDateTime'],
datetime.datetime
):
data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] = \
data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] \
.strftime(self.SI_TAX_REG_DT_FORMAT)

res = self.send('cash_registers/invoices', {
'InvoiceRequest': {'SalesBookInvoice': data},
})
return '' if 'UniqueInvoiceID' not in res else res['UniqueInvoiceID']

def send(self, endpoint, data, sign=True):
"""Send request data to registry server."""
self._message_id = ''
self._request_datetime = None
self._zoi = ''

if isinstance(data, str):
data = json.loads(data)

if 'InvoiceRequest' in data and \
'Invoice' in data['InvoiceRequest'] and \
'ProtectedID' in data['InvoiceRequest']['Invoice']:
self._zoi = data['InvoiceRequest']['Invoice']['ProtectedID']
elif 'InvoiceListRequest' in data and \
'InvoiceList' in data['InvoiceListRequest'] and \
'RecordInfo' in data['InvoiceListRequest']['InvoiceList']:
str_tmp = len(data['InvoiceListRequest']['InvoiceList']
['RecordInfo'])
if str_tmp > 0:
str_tmp = (data['InvoiceListRequest']['InvoiceList']
['RecordInfo'][str_tmp - 1])
if 'Invoice' in str_tmp and \
'ProtectedID' in str_tmp['Invoice']:
self._zoi = str_tmp['Invoice']['ProtectedID']

if sign:
data = self._inject_header(data)
str_tmp = data.iterkeys().next()
self._message_id = data[str_tmp]['Header']['MessageID']
self._request_datetime = dt.strptime(
data[str_tmp]['Header']['DateTime'],
self.SI_TAX_REG_DT_FORMAT
)
data = self._sign(data)

conn = ssl.wrap_socket(socket.socket(), keyfile=self._keyfile,
certfile=self._certfile,
cert_reqs=self._cert_reqs,
ca_certs=self._ca_certs)
conn.connect((self._host, self._port))

if not isinstance(data, str):
data = json.dumps(data)
str_tmp = [
'POST %s%s HTTP/1.1' % (self._query_string, endpoint),
'Host: %s:%d' % (self._host, self._port),
'User-Agent: ' + (self._user_agent),
'Content-Type: application/json; charset=UTF-8',
'Connection: close',
'Content-Length: ' + str(len(data))
]
if self._cookies:
str_tmp.append(
'Cookie: ' + '; '.join([
x + '=' + self._cookies[x].value for x in self._cookies]
)
)

data = self._CRLF.join(str_tmp) + self._CRLF + self._CRLF + data
self._e_debug("\n" + '---SENDING---' + "\n" + data + "\n" +
'---END---' + "\n", self.DEBUG_CLIENT)

try:
conn.write(data)
except Exception as e:
conn.close()
raise e

data = ''
str_tmp = True
while str_tmp != '':
str_tmp = conn.read(4096)
data += str_tmp
conn.close()
str_tmp = None

self._e_debug("\n" + '---GOT---' + "\n" + data + "\n" +
'---END---' + "\n", self.DEBUG_SERVER)

data = self._parse_http_response(data)

# TODO: Improve cookie management.
c = data.getheader('Set-Cookie') or data.getheader('Cookie')
self._cookies = Cookie.SimpleCookie(c) if c else None

if data.status != OK:
raise exceptions.SITaxServerError(data.read(), data.status)

data = data.read()
data = json.loads(data)
# _log.info('\n*--------\n'
# '*RECEIVE: %s\n'
# '*-----------\n', data)
if sign:
data = self._parse_signed_response(data)
# Take only the payload.
data = data[1]
# Get sub-content.
data = data.itervalues().next()
# Remove header.
del(data['Header'])
# Any potencial errors should be stored here. If found, raise an
# exception.
if 'Error' in data:
raise exceptions.SITaxRegistryError(
data['Error']['ErrorMessage'],
data['Error']['ErrorCode']
)

return data

def get_session_cookie(self):
"""Get session cookie object."""
return self._cookies

def set_session_cookie(self, cookie):
"""Set session cookie object."""
self._cookies = cookie

def _inject_header(self, data, uid=None, dtm=None):
"""Insert message identification in first and only child of dict."""
if not uid:
uid = str(uuid4())
if not dtm:
dtm = dt.utcnow()
if isinstance(data, str):
data = json.loads(data)
if not isinstance(data, dict):
raise TypeError('Data inappropriate to perform signature on')
if len(data) != 1:
raise ValueError('Data inappropriate to perform signature on')
h = data.iterkeys().next()
if not isinstance(data[h], dict):
raise TypeError('Data inappropriate to perform signature on')
data[h]['Header'] = {
'MessageID': uid,
'DateTime': dtm.strftime(self.SI_TAX_REG_DT_FORMAT),
}
return data

def _sign(self, data):
"""Sign the request message."""
h = self._get_jws_header() + '.' + self._jws_base64_encode(
json.dumps(data))
return {'token': h + '.' + self._signature(h)}

def _signature(self, data):
"""Create a signature out of a certificate header and already encoded
and joined message payload.
"""
self._check_and_load_private_key()

h = SHA256.new(data.encode('UTF-8'))
return self._jws_base64_encode(
PKCS.new(self._crypto_private_key).sign(h))

def _parse_http_response(self, response_str):
"""Parse response string into an HTTP object."""
sock = TmpSock(response_str)
res = HTTPResponse(sock)
res.begin()
return res

def _parse_signed_response(self, data):
"""Parse and fix encoded response from the tax registry server."""
res = []
data = data['token'].split('.')
signature = data.pop()
# Parse JWS header and payload.
for d in data:
res.append(json.loads(self._jws_base64_decode(d)))
# TODO: Parse signature.
res.append(signature)
return res

def _get_jws_header(self):
"""Parse if necessary and get public certificate JWS header."""
if self._jws_header == '':
# If header hasn't been made yet, construct it and store in buffer
# for possible later access.
# TODO: This is the only place where we need the OpenSSL module.
# See if we can somehow read the data without it so we can remove
# the dependency.
b = open(self._certfile, 'r').read()
x = crypto.load_certificate(crypto.FILETYPE_PEM, b)

jws_header = {
'alg': self._ALGO,
'subject_name': ','.join(map(lambda x: x[0] + '=' + x[1],
x.get_subject().get_components())),
'issuer_name': ','.join(map(lambda x: x[0] + '=' + x[1],
x.get_issuer().get_components())),
'serial': x.get_serial_number(),
}

self._jws_header = self._jws_base64_encode(json.dumps(jws_header))

return self._jws_header

def _jws_base64_encode(self, s):
"""Encode string into a JWS compliant base64 format."""
return base64.urlsafe_b64encode(s).replace(b'=', b'')

def _jws_base64_decode(self, s):
"""Decode string from a JWS base64 format."""
s += b'=' * (4 - (len(s) % 4))
return base64.urlsafe_b64decode(s.encode('UTF-8'))

def _calculate_protective_mark(self, tax_number, date, invoice_number,
premise, device, amount):
"""Generate "ZOI" signature."""
self._check_and_load_private_key()
sig = str(tax_number) + date + invoice_number + premise + device \
+ str(amount)
sig = SHA256.new(sig.encode('UTF-8')).digest()
sig = self._crypto_private_key.sign(sig, '')[0]
return md5.new(str(sig)).hexdigest()

def _populate_invoice_data(self, data):
"""Add missing and correct existing data in invoice dict."""
if 'IssueDateTime' in data and \
isinstance(data['IssueDateTime'], datetime.datetime):
data['IssueDateTime'] = \
data['IssueDateTime'].strftime(self.SI_TAX_REG_DT_FORMAT)
if 'ReferenceInvoice' in data and \
isinstance(data['ReferenceInvoice'], list) and \
len(data['ReferenceInvoice']) > 0 and \
'ReferenceInvoiceIssueDateTime' in \
data['ReferenceInvoice'][0] and \
isinstance(
data['ReferenceInvoice'][0]
['ReferenceInvoiceIssueDateTime'],
datetime.datetime
):
data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] = \
data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] \
.strftime(self.SI_TAX_REG_DT_FORMAT)
if 'ReferenceSalesBook' in data and \
isinstance(data['ReferenceSalesBook'], list) and \
len(data['ReferenceSalesBook']) > 0 and \
'ReferenceSalesBookIssueDate' in \
data['ReferenceSalesBook'][0] and \
isinstance(
data['ReferenceSalesBook'][0]
['ReferenceSalesBookIssueDate'],
datetime.date
):
data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] = \
data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] \
.strftime(self.SI_TAX_REG_DATE_FORMAT)

if 'ProtectedID' not in data:
data['ProtectedID'] = self._calculate_protective_mark(
data['TaxNumber'], data['IssueDateTime'],
data['InvoiceIdentifier']['InvoiceNumber'],
data['InvoiceIdentifier']['BusinessPremiseID'],
data['InvoiceIdentifier']['ElectronicDeviceID'],
data['InvoiceAmount']
)
# _log.info('\n*-----------\n'
# '*SEND: %s\n'
# '*-------------\n', data)
return data

def _check_and_load_private_key(self):
"""Load the private key content into class property."""
if not self._crypto_private_key:
h = open(self._keyfile, 'r').read()
self._crypto_private_key = RSA.importKey(h, passphrase=self._key)

def _e_debug(self, msg, level):
"""Output debug message to user-selected method."""
if level > self._debug:
return

if self._debug_output == 'logger':
debug(msg)
else:
msg = sub('/(\r\n|\r|\n)/ms', "\n", msg)
print dt.now().strftime('%Y-%m-%d %H:%M:%S') + "\t" \
+ msg.strip() + "\n"

0
Avatar
Annuleer
Avatar
shalin wilson
Beste antwoord

Nonetype object comes when u dont have any values at    self.DEBUG_CLIENT
please debug and check what value u have at self.DEBUG_CLIENT

0
Avatar
Annuleer
Geniet je van het gesprek? Blijf niet alleen lezen, doe ook mee!

Maak vandaag nog een account aan om te profiteren van exclusieve functies en deel uit te maken van onze geweldige community!

Aanmelden
Gerelateerde posts Antwoorden Weergaven Activiteit
Odoo 14: custom addons path not working
addons custom v14
Avatar
0
dec. 20
7221
ValueError: Invalid field 'module_l10n_fr_hr_payroll' on model 'res.config.settings'
field addons custom v15
Avatar
Avatar
1
mrt. 22
7135
Add a third-party application on odoo 13
addons ubuntu custom terminal Odoo13.0
Avatar
Avatar
2
okt. 20
8611
odoo (V12) Error pops "a mandatory field is not set" when i try a custom module in multi company Opgelost
custom urgent multicompany mandatory odooV12
Avatar
Avatar
1
jan. 20
5006
Odoo isn't seeing an add-on that I downloaded and placed in the addons folder Opgelost
addons
Avatar
Avatar
Avatar
Avatar
Avatar
5
okt. 25
2806
Community
  • Tutorials
  • Documentatie
  • Forum
Open Source
  • Downloaden
  • Github
  • Runbot
  • Vertalingen
Diensten
  • Odoo.sh Hosting
  • Ondersteuning
  • Upgrade
  • Gepersonaliseerde ontwikkelingen
  • Onderwijs
  • Vind een boekhouder
  • Vind een partner
  • Word een Partner
Over ons
  • Ons bedrijf
  • Merkelementen
  • Neem contact met ons op
  • Vacatures
  • Evenementen
  • Podcast
  • Blog
  • Klanten
  • Juridisch • Privacy
  • Beveiliging
الْعَرَبيّة Català 简体中文 繁體中文 (台灣) Čeština Dansk Nederlands English Suomi Français Deutsch हिंदी Bahasa Indonesia Italiano 日本語 한국어 (KR) Lietuvių kalba Język polski Português (BR) română русский язык Slovenský jazyk slovenščina Español (América Latina) Español ภาษาไทย Türkçe українська Tiếng Việt

Odoo is een suite van open source zakelijke apps die aan al je bedrijfsbehoeften voldoet: CRM, E-commerce, boekhouding, inventaris, kassasysteem, projectbeheer, enz.

Odoo's unieke waardepropositie is om tegelijkertijd zeer gebruiksvriendelijk en volledig geïntegreerd te zijn.

Website made with

Odoo Experience on YouTube

1. Use the live chat to ask your questions.
2. The operator answers within a few minutes.

Live support on Youtube
Watch now