Skip to Content
Odoo Menu
  • Sign in
  • Try it free
  • Apps
    Finance
    • Accounting
    • Invoicing
    • Expenses
    • Spreadsheet (BI)
    • Documents
    • Sign
    Sales
    • CRM
    • Sales
    • POS Shop
    • POS Restaurant
    • Subscriptions
    • Rental
    Websites
    • Website Builder
    • eCommerce
    • Blog
    • Forum
    • Live Chat
    • eLearning
    Supply Chain
    • Inventory
    • Manufacturing
    • PLM
    • Purchase
    • Maintenance
    • Quality
    Human Resources
    • Employees
    • Recruitment
    • Time Off
    • Appraisals
    • Referrals
    • Fleet
    Marketing
    • Social Marketing
    • Email Marketing
    • SMS Marketing
    • Events
    • Marketing Automation
    • Surveys
    Services
    • Project
    • Timesheets
    • Field Service
    • Helpdesk
    • Planning
    • Appointments
    Productivity
    • Discuss
    • Approvals
    • IoT
    • VoIP
    • Knowledge
    • WhatsApp
    Third party apps Odoo Studio Odoo Cloud Platform
  • Industries
    Retail
    • Book Store
    • Clothing Store
    • Furniture Store
    • Grocery Store
    • Hardware Store
    • Toy Store
    Food & Hospitality
    • Bar and Pub
    • Restaurant
    • Fast Food
    • Guest House
    • Beverage Distributor
    • Hotel
    Real Estate
    • Real Estate Agency
    • Architecture Firm
    • Construction
    • Estate Management
    • Gardening
    • Property Owner Association
    Consulting
    • Accounting Firm
    • Odoo Partner
    • Marketing Agency
    • Law firm
    • Talent Acquisition
    • Audit & Certification
    Manufacturing
    • Textile
    • Metal
    • Furnitures
    • Food
    • Brewery
    • Corporate Gifts
    Health & Fitness
    • Sports Club
    • Eyewear Store
    • Fitness Center
    • Wellness Practitioners
    • Pharmacy
    • Hair Salon
    Trades
    • Handyman
    • IT Hardware & Support
    • Solar Energy Systems
    • Shoe Maker
    • Cleaning Services
    • HVAC Services
    Others
    • Nonprofit Organization
    • Environmental Agency
    • Billboard Rental
    • Photography
    • Bike Leasing
    • Software Reseller
    Browse all Industries
  • Community
    Learn
    • Tutorials
    • Documentation
    • Certifications
    • Training
    • Blog
    • Podcast
    Empower Education
    • Education Program
    • Scale Up! Business Game
    • Visit Odoo
    Get the Software
    • Download
    • Compare Editions
    • Releases
    Collaborate
    • Github
    • Forum
    • Events
    • Translations
    • Become a Partner
    • Services for Partners
    • Register your Accounting Firm
    Get Services
    • Find a Partner
    • Find an Accountant
    • Meet an advisor
    • Implementation Services
    • Customer References
    • Support
    • Upgrades
    Github Youtube Twitter Linkedin Instagram Facebook Spotify
    +1 (650) 691-3277
    Get a demo
  • Pricing
  • Help

Odoo is the world's easiest all-in-one management software.
It includes hundreds of business apps:

  • CRM
  • e-Commerce
  • Accounting
  • Inventory
  • PoS
  • Project
  • MRP
All apps
You need to be registered to interact with the community.
All Posts People Badges
Tags (View all)
odoo accounting v14 pos v15
About this forum
You need to be registered to interact with the community.
All Posts People Badges
Tags (View all)
odoo accounting v14 pos v15
About this forum
Help

AttributeError: 'NoneType' object has no attribute 'encode' - Custom module for tax register

Subscribe

Get notified when there's activity on this post

This question has been flagged
addonscustomodooV9odooV12
1 Reply
21825 Views
Avatar
Simon

I am getting this error after migrating a custom module for v9 to v12 for the code below after testing connection to external servers. How can I fix this? Thank you

Error:
Odoo Server Error

Traceback (most recent call last):
File "/opt/odoo12/odoo/odoo/http.py", line 654, in _handle_exception
return super(JsonRequest, self)._handle_exception(exception)
File "/opt/odoo12/odoo/odoo/http.py", line 312, in _handle_exception
raise pycompat.reraise(type(exception), exception, sys.exc_info()[2])
File "/opt/odoo12/odoo/odoo/tools/pycompat.py", line 87, in reraise
raise value
File "/opt/odoo12/odoo/odoo/http.py", line 696, in dispatch
result = self._call_function(**self.params)
File "/opt/odoo12/odoo/odoo/http.py", line 344, in _call_function
return checked_call(self.db, *args, **kwargs)
File "/opt/odoo12/odoo/odoo/service/model.py", line 97, in wrapper
return f(dbname, *args, **kwargs)
File "/opt/odoo12/odoo/odoo/http.py", line 337, in checked_call
result = self.endpoint(*a, **kw)
File "/opt/odoo12/odoo/odoo/http.py", line 939, in __call__
return self.method(*args, **kw)
File "/opt/odoo12/odoo/odoo/http.py", line 517, in response_wrap
response = f(*args, **kw)
File "/opt/odoo12/odoo/addons/web/controllers/main.py", line 966, in call_button
action = self._call_kw(model, method, args, {})
File "/opt/odoo12/odoo/addons/web/controllers/main.py", line 954, in _call_kw
return call_kw(request.env[model], method, args, kwargs)
File "/opt/odoo12/odoo/odoo/api.py", line 749, in call_kw
return _call_kw_multi(method, model, args, kwargs)
File "/opt/odoo12/odoo/odoo/api.py", line 736, in _call_kw_multi
result = method(recs, *args, **kwargs)
File "/opt/odoo12/odoo-custom-addons/l10n_si_tax_registry/models/res_company.py", line 31, in action_test_l10n_si_tax_reg_conn
if not s.echo():
File "/opt/odoo12/odoo-custom-addons/l10n_si_tax_registry/SITaxReg/SITaxReg.py", line 127, in echo
sign=False)
File "/opt/odoo12/odoo-custom-addons/l10n_si_tax_registry/SITaxReg/SITaxReg.py", line 265, in send
'---END---' + "\n", self.DEBUG_CLIENT).encode()
AttributeError: 'NoneType' object has no attribute 'encode'



Code v12:


# -*- coding: utf-8 -*-


"""Slovenian tax invoice registration and other commands required by the
system.
"""

import base64
#import Cookie
from http import Cookies
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
import Crypto.Signature.PKCS1_v1_5 as PKCS
import datetime
from datetime import datetime as dt
#from httplib import HTTPResponse
from http.client import HTTPResponse
#from httplib import OK
from http.client import OK
from logging import debug
#import md5
import hashlib
from OpenSSL import crypto
from re import sub
import json
import socket
import ssl
from uuid import uuid4

from . import exceptions
#from __init__ import __version__
from .version import __version__
from . import TmpSock
import logging



_log = logging.getLogger(__name__)


class SITaxReg(object):
"""Slovenian Tax Registry client object."""

DEBUG_OFF = 0
DEBUG_SERVER = 1
DEBUG_CLIENT = 2
DEBUG_OUTPUT_PRINT = 'print'
DEBUG_OUTPUT_LOGGER = 'logger'

SI_TAX_REG_DATE_FORMAT = '%Y-%m-%d'
SI_TAX_REG_DT_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

_CRLF = "\r\n"
_ALGO = 'RS256'

_keyfile = ''
_certfile = ''
_ca_cert = None
_cert_reqs = ssl.CERT_NONE
_jws_header = ''
_key = None
_crypto_private_key = None
_debug_output = DEBUG_OUTPUT_PRINT
_debug = DEBUG_OFF

_scheme = 'https'
_host = 'blagajne.fu.gov.si'
_port = 9003
_query_string = '/v1/'

_user_agent = 'Editor d.o.o. SI Tax Registry Python Library/%s'
_cookies = None

_message_id = ''
_request_datetime = None
_zoi = ''

def __init__(self, keyfile, certfile, ca_certs=None, key=None, dev=False):
"""Return a new class instance with specified certificates."""
self._keyfile = keyfile
self._certfile = certfile
self._ca_certs = ca_certs
self._key = key

# When verifier certificates are provided, we require certificate
# authentication.
if self._ca_certs:
self._cert_reqs = ssl.CERT_REQUIRED

self._user_agent = self._user_agent % (__version__)

if dev:
self._host = 'blagajne-test.fu.gov.si'
self._port = 9002

def set_debug_output(self, output):
"""Set debug output destionation."""
self._debug_output = output

def set_debug(self, level):
"""Set debug level."""
self._debug = level

def get_message_id(self):
"""Get message ID of last sent message if any."""
return self._message_id

def get_request_datetime(self):
"""Get last message request time if any."""
return self._request_datetime

def get_zoi(self):
"""Get last used issuer mark in a request if any."""
return self._zoi

def echo(self):
"""Send echo request and verify echo server response."""
res = self.send('cash_registers/echo', {'EchoRequest': 'furs'},
sign=False)
return 'EchoResponse' in res and res['EchoResponse'] == 'furs'

def register_business_premise(self, data):
"""Register a business premise."""
self.send('cash_registers/invoices/register', {
'BusinessPremiseRequest': {'BusinessPremise': data},
})
return True

def issue_electronic_invoice(self, data):
"""Send invoice request for invoices made with an electronic device."""
res = self.send('cash_registers/invoices', {
'InvoiceRequest': {'Invoice': self._populate_invoice_data(data)},
})
return '' if 'UniqueInvoiceID' not in res else res['UniqueInvoiceID']

def issue_batch_electronic_invoice(self, data):
"""Send a list of invoices request for invoices made with an electronic
device.
"""
data_tmp = []
for i, d in enumerate(data):
data_tmp.append({
'RecordNumber': i + 1,
'Invoice': self._populate_invoice_data(d),
})

res = self.send('cash_registers_batch/invoices', {
'InvoiceListRequest': {'InvoiceList': {'RecordInfo': data_tmp}},
})
data_tmp = []
if 'RecordReply' not in res:
return []
# TODO: Check the response list against the request list.
return [(r['ProtectedID'],
r['UniqueInvoiceID']) for r in res['RecordReply']]

def issue_prenumbered_invoice(self, data):
"""Send invoice request for invoices made with a pre-numbered invoice
book.
"""
if 'IssueDate' in data and \
isinstance(data['IssueDate'], datetime.date):
data['IssueDate'] = \
data['IssueDate'].strftime(self.SI_TAX_REG_DATE_FORMAT)
if 'ReferenceSalesBook' in data and \
isinstance(data['ReferenceSalesBook'], list) and \
len(data['ReferenceSalesBook']) > 0 and \
'ReferenceSalesBookIssueDate' in \
data['ReferenceSalesBook'][0] and \
isinstance(
data['ReferenceSalesBook'][0]
['ReferenceSalesBookIssueDate'],
datetime.date
):
data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] = \
data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] \
.strftime(self.SI_TAX_REG_DATE_FORMAT)
if 'ReferenceInvoice' in data and \
isinstance(data['ReferenceInvoice'], list) and \
len(data['ReferenceInvoice']) > 0 and \
'ReferenceInvoiceIssueDateTime' in \
data['ReferenceInvoice'][0] and \
isinstance(
data['ReferenceInvoice'][0]
['ReferenceInvoiceIssueDateTime'],
datetime.datetime
):
data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] = \
data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] \
.strftime(self.SI_TAX_REG_DT_FORMAT)

res = self.send('cash_registers/invoices', {
'InvoiceRequest': {'SalesBookInvoice': data},
})
return '' if 'UniqueInvoiceID' not in res else res['UniqueInvoiceID']

def send(self, endpoint, data, sign=True):
"""Send request data to registry server."""
self._message_id = ''
self._request_datetime = None
self._zoi = ''

if isinstance(data, str):
data = json.loads(data)

if 'InvoiceRequest' in data and \
'Invoice' in data['InvoiceRequest'] and \
'ProtectedID' in data['InvoiceRequest']['Invoice']:
self._zoi = data['InvoiceRequest']['Invoice']['ProtectedID']
elif 'InvoiceListRequest' in data and \
'InvoiceList' in data['InvoiceListRequest'] and \
'RecordInfo' in data['InvoiceListRequest']['InvoiceList']:
str_tmp = len(data['InvoiceListRequest']['InvoiceList']
['RecordInfo'])
if str_tmp > 0:
str_tmp = (data['InvoiceListRequest']['InvoiceList']
['RecordInfo'][str_tmp - 1])
if 'Invoice' in str_tmp and \
'ProtectedID' in str_tmp['Invoice']:
self._zoi = str_tmp['Invoice']['ProtectedID']

if sign:
data = self._inject_header(data)
str_tmp = data.iterkeys().next()
self._message_id = data[str_tmp]['Header']['MessageID']
self._request_datetime = dt.strptime(
data[str_tmp]['Header']['DateTime'],
self.SI_TAX_REG_DT_FORMAT
)
data = self._sign(data)

conn = ssl.wrap_socket(socket.socket(), keyfile=self._keyfile,
certfile=self._certfile,
cert_reqs=self._cert_reqs,
ca_certs=self._ca_certs)
conn.connect((self._host, self._port))

if not isinstance(data, str):
data = json.dumps(data)
str_tmp = [
'POST %s%s HTTP/1.1' % (self._query_string, endpoint),
'Host: %s:%d' % (self._host, self._port),
'User-Agent: ' + (self._user_agent),
'Content-Type: application/json; charset=UTF-8',
'Connection: close',
'Content-Length: ' + str(len(data))
]
if self._cookies:
str_tmp.append(
'Cookie: ' + '; '.join([
x + '=' + self._cookies[x].value for x in self._cookies]
)
)

data = self._CRLF.join(str_tmp) + self._CRLF + self._CRLF + data
self._e_debug("\n" + '---SENDING---' + "\n" + data + "\n" +
'---END---' + "\n", self.DEBUG_CLIENT).encode()

try:
conn.write(data.encode())
except Exception as e:
conn.close()
raise e

data = ''
str_tmp = True
while str_tmp != '':
str_tmp = conn.read(4096)
data += str_tmp.decode()
conn.close()
str_tmp = None

self._e_debug("\n" + '---GOT---' + "\n" + data + "\n" +
'---END---' + "\n", self.DEBUG_SERVER)

data = self._parse_http_response(data)

# TODO: Improve cookie management.
c = data.getheader('Set-Cookie') or data.getheader('Cookie')
self._cookies = Cookies.Simple.Cookie(c) if c else None

if data.status != OK:
raise exceptions.SITaxServerError(data.read(), data.status)

data = data.read()
data = json.loads(data)
_log.info('\n*--------\n'
'*RECEIVE: %s\n'
'*-----------\n', data)
if sign:
data = self._parse_signed_response(data)
# Take only the payload.
data = data[1]
# Get sub-content.
data = data.itervalues().next()
# Remove header.
del(data['Header'])
# Any potencial errors should be stored here. If found, raise an
# exception.
if 'Error' in data:
raise exceptions.SITaxRegistryError(
data['Error']['ErrorMessage'],
data['Error']['ErrorCode']
)

return data

def get_session_cookie(self):
"""Get session cookie object."""
return self._cookies

def set_session_cookie(self, cookie):
"""Set session cookie object."""
self._cookies = cookie

def _inject_header(self, data, uid=None, dtm=None):
"""Insert message identification in first and only child of dict."""
if not uid:
uid = str(uuid4())
if not dtm:
dtm = dt.utcnow()
if isinstance(data, str):
data = json.loads(data)
if not isinstance(data, dict):
raise TypeError('Data inappropriate to perform signature on')
if len(data) != 1:
raise ValueError('Data inappropriate to perform signature on')
h = data.iterkeys().next()
if not isinstance(data[h], dict):
raise TypeError('Data inappropriate to perform signature on')
data[h]['Header'] = {
'MessageID': uid,
'DateTime': dtm.strftime(self.SI_TAX_REG_DT_FORMAT),
}
return data

def _sign(self, data):
"""Sign the request message."""
h = self._get_jws_header() + '.' + self._jws_base64_encode(
json.dumps(data))
return {'token': h + '.' + self._signature(h)}

def _signature(self, data):
"""Create a signature out of a certificate header and already encoded
and joined message payload.
"""
self._check_and_load_private_key()

h = SHA256.new(data.encode('UTF-8'))
return self._jws_base64_encode(
PKCS.new(self._crypto_private_key).sign(h))

def _parse_http_response(self, response_str):
"""Parse response string into an HTTP object."""
sock = TmpSock(response_str)
res = HTTPResponse(sock)
res.begin()
return res

def _parse_signed_response(self, data):
"""Parse and fix encoded response from the tax registry server."""
res = []
data = data['token'].split('.')
signature = data.pop()
# Parse JWS header and payload.
for d in data:
res.append(json.loads(self._jws_base64_decode(d)))
# TODO: Parse signature.
res.append(signature)
return res

def _get_jws_header(self):
"""Parse if necessary and get public certificate JWS header."""
if self._jws_header == '':
# If header hasn't been made yet, construct it and store in buffer
# for possible later access.
# TODO: This is the only place where we need the OpenSSL module.
# See if we can somehow read the data without it so we can remove
# the dependency.
b = open(self._certfile, 'r').read()
x = crypto.load_certificate(crypto.FILETYPE_PEM, b)

jws_header = {
'alg': self._ALGO,
'subject_name': ','.join(map(lambda x: x[0] + '=' + x[1],
x.get_subject().get_components())),
'issuer_name': ','.join(map(lambda x: x[0] + '=' + x[1],
x.get_issuer().get_components())),
'serial': x.get_serial_number(),
}

self._jws_header = self._jws_base64_encode(json.dumps(jws_header))

return self._jws_header

def _jws_base64_encode(self, s):
"""Encode string into a JWS compliant base64 format."""
return base64.urlsafe_b64encode(s).replace(b'=', b'')

def _jws_base64_decode(self, s):
"""Decode string from a JWS base64 format."""
s += b'=' * (4 - (len(s) % 4))
return base64.urlsafe_b64decode(s.encode('UTF-8'))

def _calculate_protective_mark(self, tax_number, date, invoice_number,
premise, device, amount):
"""Generate "ZOI" signature."""
self._check_and_load_private_key()
sig = str(tax_number) + date + invoice_number + premise + device \
+ str(amount)
sig = SHA256.new(sig.encode('UTF-8')).digest()
sig = self._crypto_private_key.sign(sig, '')[0]
return hashlib.md5.new(str(sig)).hexdigest()

def _populate_invoice_data(self, data):
"""Add missing and correct existing data in invoice dict."""
if 'IssueDateTime' in data and \
isinstance(data['IssueDateTime'], datetime.datetime):
data['IssueDateTime'] = \
data['IssueDateTime'].strftime(self.SI_TAX_REG_DT_FORMAT)
if 'ReferenceInvoice' in data and \
isinstance(data['ReferenceInvoice'], list) and \
len(data['ReferenceInvoice']) > 0 and \
'ReferenceInvoiceIssueDateTime' in \
data['ReferenceInvoice'][0] and \
isinstance(
data['ReferenceInvoice'][0]
['ReferenceInvoiceIssueDateTime'],
datetime.datetime
):
data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] = \
data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] \
.strftime(self.SI_TAX_REG_DT_FORMAT)
if 'ReferenceSalesBook' in data and \
isinstance(data['ReferenceSalesBook'], list) and \
len(data['ReferenceSalesBook']) > 0 and \
'ReferenceSalesBookIssueDate' in \
data['ReferenceSalesBook'][0] and \
isinstance(
data['ReferenceSalesBook'][0]
['ReferenceSalesBookIssueDate'],
datetime.date
):
data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] = \
data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] \
.strftime(self.SI_TAX_REG_DATE_FORMAT)

if 'ProtectedID' not in data:
data['ProtectedID'] = self._calculate_protective_mark(
data['TaxNumber'], data['IssueDateTime'],
data['InvoiceIdentifier']['InvoiceNumber'],
data['InvoiceIdentifier']['BusinessPremiseID'],
data['InvoiceIdentifier']['ElectronicDeviceID'],
data['InvoiceAmount']
)
_log.info('\n*-----------\n'
'*SEND: %s\n'
'*-------------\n', data)
return data

def _check_and_load_private_key(self):
"""Load the private key content into class property."""
if not self._crypto_private_key:
h = open(self._keyfile, 'r').read()
self._crypto_private_key = RSA.importKey(h, passphrase=self._key)

def _e_debug(self, msg, level):
"""Output debug message to user-selected method."""
if level > self._debug:
return

if self._debug_output == 'logger':
debug(msg)
else:
msg = sub('/(\r\n|\r|\n)/ms', "\n", msg)
print (dt.now().strftime('%Y-%m-%d %H:%M:%S') + "\t" \
+ msg.strip() + "\n")


Code v9:

# -*- coding: utf-8 -*-


"""Slovenian tax invoice registration and other commands required by the
system.
"""

import base64
import Cookie
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
import Crypto.Signature.PKCS1_v1_5 as PKCS
import datetime
from datetime import datetime as dt
from httplib import HTTPResponse
from httplib import OK
from logging import debug
import md5
from OpenSSL import crypto
from re import sub
import json
import socket
import ssl
from uuid import uuid4

import exceptions
from __init__ import __version__
from TmpSock import TmpSock
import logging
_log = logging.getLogger(__name__)


class SITaxReg(object):
"""Slovenian Tax Registry client object."""

DEBUG_OFF = 0
DEBUG_SERVER = 1
DEBUG_CLIENT = 2
DEBUG_OUTPUT_PRINT = 'print'
DEBUG_OUTPUT_LOGGER = 'logger'

SI_TAX_REG_DATE_FORMAT = '%Y-%m-%d'
SI_TAX_REG_DT_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

_CRLF = "\r\n"
_ALGO = 'RS256'

_keyfile = ''
_certfile = ''
_ca_cert = None
_cert_reqs = ssl.CERT_NONE
_jws_header = ''
_key = None
_crypto_private_key = None
_debug_output = DEBUG_OUTPUT_PRINT
_debug = DEBUG_OFF

_scheme = 'https'
_host = 'blagajne.fu.gov.si'
_port = 9003
_query_string = '/v1/'

_user_agent = 'Editor d.o.o. SI Tax Registry Python Library/%s'
_cookies = None

_message_id = ''
_request_datetime = None
_zoi = ''

def __init__(self, keyfile, certfile, ca_certs=None, key=None, dev=False):
"""Return a new class instance with specified certificates."""
self._keyfile = keyfile
self._certfile = certfile
self._ca_certs = ca_certs
self._key = key

# When verifier certificates are provided, we require certificate
# authentication.
if self._ca_certs:
self._cert_reqs = ssl.CERT_REQUIRED

self._user_agent = self._user_agent % (__version__)

if dev:
self._host = 'blagajne-test.fu.gov.si'
self._port = 9002

def set_debug_output(self, output):
"""Set debug output destionation."""
self._debug_output = output

def set_debug(self, level):
"""Set debug level."""
self._debug = level

def get_message_id(self):
"""Get message ID of last sent message if any."""
return self._message_id

def get_request_datetime(self):
"""Get last message request time if any."""
return self._request_datetime

def get_zoi(self):
"""Get last used issuer mark in a request if any."""
return self._zoi

def echo(self):
"""Send echo request and verify echo server response."""
res = self.send('cash_registers/echo', {'EchoRequest': 'furs'},
sign=False)
return 'EchoResponse' in res and res['EchoResponse'] == 'furs'

def register_business_premise(self, data):
"""Register a business premise."""
self.send('cash_registers/invoices/register', {
'BusinessPremiseRequest': {'BusinessPremise': data},
})
return True

def issue_electronic_invoice(self, data):
"""Send invoice request for invoices made with an electronic device."""
res = self.send('cash_registers/invoices', {
'InvoiceRequest': {'Invoice': self._populate_invoice_data(data)},
})
return '' if 'UniqueInvoiceID' not in res else res['UniqueInvoiceID']

def issue_batch_electronic_invoice(self, data):
"""Send a list of invoices request for invoices made with an electronic
device.
"""
data_tmp = []
for i, d in enumerate(data):
data_tmp.append({
'RecordNumber': i + 1,
'Invoice': self._populate_invoice_data(d),
})

res = self.send('cash_registers_batch/invoices', {
'InvoiceListRequest': {'InvoiceList': {'RecordInfo': data_tmp}},
})
data_tmp = []
if 'RecordReply' not in res:
return []
# TODO: Check the response list against the request list.
return [(r['ProtectedID'],
r['UniqueInvoiceID']) for r in res['RecordReply']]

def issue_prenumbered_invoice(self, data):
"""Send invoice request for invoices made with a pre-numbered invoice
book.
"""
if 'IssueDate' in data and \
isinstance(data['IssueDate'], datetime.date):
data['IssueDate'] = \
data['IssueDate'].strftime(self.SI_TAX_REG_DATE_FORMAT)
if 'ReferenceSalesBook' in data and \
isinstance(data['ReferenceSalesBook'], list) and \
len(data['ReferenceSalesBook']) > 0 and \
'ReferenceSalesBookIssueDate' in \
data['ReferenceSalesBook'][0] and \
isinstance(
data['ReferenceSalesBook'][0]
['ReferenceSalesBookIssueDate'],
datetime.date
):
data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] = \
data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] \
.strftime(self.SI_TAX_REG_DATE_FORMAT)
if 'ReferenceInvoice' in data and \
isinstance(data['ReferenceInvoice'], list) and \
len(data['ReferenceInvoice']) > 0 and \
'ReferenceInvoiceIssueDateTime' in \
data['ReferenceInvoice'][0] and \
isinstance(
data['ReferenceInvoice'][0]
['ReferenceInvoiceIssueDateTime'],
datetime.datetime
):
data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] = \
data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] \
.strftime(self.SI_TAX_REG_DT_FORMAT)

res = self.send('cash_registers/invoices', {
'InvoiceRequest': {'SalesBookInvoice': data},
})
return '' if 'UniqueInvoiceID' not in res else res['UniqueInvoiceID']

def send(self, endpoint, data, sign=True):
"""Send request data to registry server."""
self._message_id = ''
self._request_datetime = None
self._zoi = ''

if isinstance(data, str):
data = json.loads(data)

if 'InvoiceRequest' in data and \
'Invoice' in data['InvoiceRequest'] and \
'ProtectedID' in data['InvoiceRequest']['Invoice']:
self._zoi = data['InvoiceRequest']['Invoice']['ProtectedID']
elif 'InvoiceListRequest' in data and \
'InvoiceList' in data['InvoiceListRequest'] and \
'RecordInfo' in data['InvoiceListRequest']['InvoiceList']:
str_tmp = len(data['InvoiceListRequest']['InvoiceList']
['RecordInfo'])
if str_tmp > 0:
str_tmp = (data['InvoiceListRequest']['InvoiceList']
['RecordInfo'][str_tmp - 1])
if 'Invoice' in str_tmp and \
'ProtectedID' in str_tmp['Invoice']:
self._zoi = str_tmp['Invoice']['ProtectedID']

if sign:
data = self._inject_header(data)
str_tmp = data.iterkeys().next()
self._message_id = data[str_tmp]['Header']['MessageID']
self._request_datetime = dt.strptime(
data[str_tmp]['Header']['DateTime'],
self.SI_TAX_REG_DT_FORMAT
)
data = self._sign(data)

conn = ssl.wrap_socket(socket.socket(), keyfile=self._keyfile,
certfile=self._certfile,
cert_reqs=self._cert_reqs,
ca_certs=self._ca_certs)
conn.connect((self._host, self._port))

if not isinstance(data, str):
data = json.dumps(data)
str_tmp = [
'POST %s%s HTTP/1.1' % (self._query_string, endpoint),
'Host: %s:%d' % (self._host, self._port),
'User-Agent: ' + (self._user_agent),
'Content-Type: application/json; charset=UTF-8',
'Connection: close',
'Content-Length: ' + str(len(data))
]
if self._cookies:
str_tmp.append(
'Cookie: ' + '; '.join([
x + '=' + self._cookies[x].value for x in self._cookies]
)
)

data = self._CRLF.join(str_tmp) + self._CRLF + self._CRLF + data
self._e_debug("\n" + '---SENDING---' + "\n" + data + "\n" +
'---END---' + "\n", self.DEBUG_CLIENT)

try:
conn.write(data)
except Exception as e:
conn.close()
raise e

data = ''
str_tmp = True
while str_tmp != '':
str_tmp = conn.read(4096)
data += str_tmp
conn.close()
str_tmp = None

self._e_debug("\n" + '---GOT---' + "\n" + data + "\n" +
'---END---' + "\n", self.DEBUG_SERVER)

data = self._parse_http_response(data)

# TODO: Improve cookie management.
c = data.getheader('Set-Cookie') or data.getheader('Cookie')
self._cookies = Cookie.SimpleCookie(c) if c else None

if data.status != OK:
raise exceptions.SITaxServerError(data.read(), data.status)

data = data.read()
data = json.loads(data)
# _log.info('\n*--------\n'
# '*RECEIVE: %s\n'
# '*-----------\n', data)
if sign:
data = self._parse_signed_response(data)
# Take only the payload.
data = data[1]
# Get sub-content.
data = data.itervalues().next()
# Remove header.
del(data['Header'])
# Any potencial errors should be stored here. If found, raise an
# exception.
if 'Error' in data:
raise exceptions.SITaxRegistryError(
data['Error']['ErrorMessage'],
data['Error']['ErrorCode']
)

return data

def get_session_cookie(self):
"""Get session cookie object."""
return self._cookies

def set_session_cookie(self, cookie):
"""Set session cookie object."""
self._cookies = cookie

def _inject_header(self, data, uid=None, dtm=None):
"""Insert message identification in first and only child of dict."""
if not uid:
uid = str(uuid4())
if not dtm:
dtm = dt.utcnow()
if isinstance(data, str):
data = json.loads(data)
if not isinstance(data, dict):
raise TypeError('Data inappropriate to perform signature on')
if len(data) != 1:
raise ValueError('Data inappropriate to perform signature on')
h = data.iterkeys().next()
if not isinstance(data[h], dict):
raise TypeError('Data inappropriate to perform signature on')
data[h]['Header'] = {
'MessageID': uid,
'DateTime': dtm.strftime(self.SI_TAX_REG_DT_FORMAT),
}
return data

def _sign(self, data):
"""Sign the request message."""
h = self._get_jws_header() + '.' + self._jws_base64_encode(
json.dumps(data))
return {'token': h + '.' + self._signature(h)}

def _signature(self, data):
"""Create a signature out of a certificate header and already encoded
and joined message payload.
"""
self._check_and_load_private_key()

h = SHA256.new(data.encode('UTF-8'))
return self._jws_base64_encode(
PKCS.new(self._crypto_private_key).sign(h))

def _parse_http_response(self, response_str):
"""Parse response string into an HTTP object."""
sock = TmpSock(response_str)
res = HTTPResponse(sock)
res.begin()
return res

def _parse_signed_response(self, data):
"""Parse and fix encoded response from the tax registry server."""
res = []
data = data['token'].split('.')
signature = data.pop()
# Parse JWS header and payload.
for d in data:
res.append(json.loads(self._jws_base64_decode(d)))
# TODO: Parse signature.
res.append(signature)
return res

def _get_jws_header(self):
"""Parse if necessary and get public certificate JWS header."""
if self._jws_header == '':
# If header hasn't been made yet, construct it and store in buffer
# for possible later access.
# TODO: This is the only place where we need the OpenSSL module.
# See if we can somehow read the data without it so we can remove
# the dependency.
b = open(self._certfile, 'r').read()
x = crypto.load_certificate(crypto.FILETYPE_PEM, b)

jws_header = {
'alg': self._ALGO,
'subject_name': ','.join(map(lambda x: x[0] + '=' + x[1],
x.get_subject().get_components())),
'issuer_name': ','.join(map(lambda x: x[0] + '=' + x[1],
x.get_issuer().get_components())),
'serial': x.get_serial_number(),
}

self._jws_header = self._jws_base64_encode(json.dumps(jws_header))

return self._jws_header

def _jws_base64_encode(self, s):
"""Encode string into a JWS compliant base64 format."""
return base64.urlsafe_b64encode(s).replace(b'=', b'')

def _jws_base64_decode(self, s):
"""Decode string from a JWS base64 format."""
s += b'=' * (4 - (len(s) % 4))
return base64.urlsafe_b64decode(s.encode('UTF-8'))

def _calculate_protective_mark(self, tax_number, date, invoice_number,
premise, device, amount):
"""Generate "ZOI" signature."""
self._check_and_load_private_key()
sig = str(tax_number) + date + invoice_number + premise + device \
+ str(amount)
sig = SHA256.new(sig.encode('UTF-8')).digest()
sig = self._crypto_private_key.sign(sig, '')[0]
return md5.new(str(sig)).hexdigest()

def _populate_invoice_data(self, data):
"""Add missing and correct existing data in invoice dict."""
if 'IssueDateTime' in data and \
isinstance(data['IssueDateTime'], datetime.datetime):
data['IssueDateTime'] = \
data['IssueDateTime'].strftime(self.SI_TAX_REG_DT_FORMAT)
if 'ReferenceInvoice' in data and \
isinstance(data['ReferenceInvoice'], list) and \
len(data['ReferenceInvoice']) > 0 and \
'ReferenceInvoiceIssueDateTime' in \
data['ReferenceInvoice'][0] and \
isinstance(
data['ReferenceInvoice'][0]
['ReferenceInvoiceIssueDateTime'],
datetime.datetime
):
data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] = \
data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] \
.strftime(self.SI_TAX_REG_DT_FORMAT)
if 'ReferenceSalesBook' in data and \
isinstance(data['ReferenceSalesBook'], list) and \
len(data['ReferenceSalesBook']) > 0 and \
'ReferenceSalesBookIssueDate' in \
data['ReferenceSalesBook'][0] and \
isinstance(
data['ReferenceSalesBook'][0]
['ReferenceSalesBookIssueDate'],
datetime.date
):
data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] = \
data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] \
.strftime(self.SI_TAX_REG_DATE_FORMAT)

if 'ProtectedID' not in data:
data['ProtectedID'] = self._calculate_protective_mark(
data['TaxNumber'], data['IssueDateTime'],
data['InvoiceIdentifier']['InvoiceNumber'],
data['InvoiceIdentifier']['BusinessPremiseID'],
data['InvoiceIdentifier']['ElectronicDeviceID'],
data['InvoiceAmount']
)
# _log.info('\n*-----------\n'
# '*SEND: %s\n'
# '*-------------\n', data)
return data

def _check_and_load_private_key(self):
"""Load the private key content into class property."""
if not self._crypto_private_key:
h = open(self._keyfile, 'r').read()
self._crypto_private_key = RSA.importKey(h, passphrase=self._key)

def _e_debug(self, msg, level):
"""Output debug message to user-selected method."""
if level > self._debug:
return

if self._debug_output == 'logger':
debug(msg)
else:
msg = sub('/(\r\n|\r|\n)/ms', "\n", msg)
print dt.now().strftime('%Y-%m-%d %H:%M:%S') + "\t" \
+ msg.strip() + "\n"

0
Avatar
Discard
Avatar
shalin wilson
Best Answer

Nonetype object comes when u dont have any values at    self.DEBUG_CLIENT
please debug and check what value u have at self.DEBUG_CLIENT

0
Avatar
Discard
Enjoying the discussion? Don't just read, join in!

Create an account today to enjoy exclusive features and engage with our awesome community!

Sign up
Related Posts Replies Views Activity
Odoo 14: custom addons path not working
addons custom v14
Avatar
0
Dec 20
7147
ValueError: Invalid field 'module_l10n_fr_hr_payroll' on model 'res.config.settings'
field addons custom v15
Avatar
Avatar
1
Mar 22
7048
Add a third-party application on odoo 13
addons ubuntu custom terminal Odoo13.0
Avatar
Avatar
2
Oct 20
8513
odoo (V12) Error pops "a mandatory field is not set" when i try a custom module in multi company Solved
custom urgent multicompany mandatory odooV12
Avatar
Avatar
1
Jan 20
4932
Odoo isn't seeing an add-on that I downloaded and placed in the addons folder Solved
addons
Avatar
Avatar
Avatar
Avatar
Avatar
5
Oct 25
2363
Community
  • Tutorials
  • Documentation
  • Forum
Open Source
  • Download
  • Github
  • Runbot
  • Translations
Services
  • Odoo.sh Hosting
  • Support
  • Upgrade
  • Custom Developments
  • Education
  • Find an Accountant
  • Find a Partner
  • Become a Partner
About us
  • Our company
  • Brand Assets
  • Contact us
  • Jobs
  • Events
  • Podcast
  • Blog
  • Customers
  • Legal • Privacy
  • Security
الْعَرَبيّة Català 简体中文 繁體中文 (台灣) Čeština Dansk Nederlands English Suomi Français Deutsch हिंदी Bahasa Indonesia Italiano 日本語 한국어 (KR) Lietuvių kalba Język polski Português (BR) română русский язык Slovenský jazyk slovenščina Español (América Latina) Español ภาษาไทย Türkçe українська Tiếng Việt

Odoo is a suite of open source business apps that cover all your company needs: CRM, eCommerce, accounting, inventory, point of sale, project management, etc.

Odoo's unique value proposition is to be at the same time very easy to use and fully integrated.

Website made with

Odoo Experience on YouTube

1. Use the live chat to ask your questions.
2. The operator answers within a few minutes.

Live support on Youtube
Watch now