Skip to Content
Odoo Меню
  • Увійти
  • Спробуйте це безкоштовно
  • Додатки
    Фінанси
    • Бухоблік
    • Виставлення рахунку
    • Витрати
    • Електронні таблиці (BI)
    • Документи
    • Підпис
    Продажі
    • CRM
    • Продажі
    • POS Магазин
    • POS Ресторан
    • Підписки
    • Оренда
    Веб-сайти
    • Конструктор веб-сайту
    • Електронна комерція
    • Блог
    • Форум
    • Живий чат
    • Електронне навчання
    Ланцюг поставок
    • Склад
    • Виробництво
    • PLM
    • Купівлі
    • Технічне обслуговування
    • Якість
    Кадри
    • Співробітники
    • Рекрутинг
    • Відпустки
    • Оцінювання
    • Рекомендації
    • Автотранспорт
    Маркетинг
    • Маркетинг соцмереж
    • Email-маркетинг
    • SMS-маркетинг
    • Події
    • Автом. маркетингу
    • Опитування
    Послуги
    • Проект
    • Табелі
    • Виїзне обслуговування
    • Служба підтримки
    • Планування
    • Призначення
    Продуктивність
    • Обговорення
    • Схвалення
    • IoT
    • IP-телефонія
    • База знань
    • WhatsApp
    Сторонні модулі Odoo Studio Платформа Odoo Cloud
  • Сфери
    Роздрібна торгівля
    • Книжковий магазин
    • Магазин одягу
    • Магазин меблів
    • Продуктовий магазин
    • Магазин будівельних матеріалів
    • Магазин іграшок
    Food & Hospitality
    • Бар та паб
    • Ресторан
    • Фастфуд
    • Guest House
    • Дистриб'ютор напоїв
    • Hotel
    Нерухомість
    • Real Estate Agency
    • Архітектурна фірма
    • Будівництво
    • Управління нерухомістю
    • Садівництво
    • Асоціація власників нерухомості
    Консалтинг
    • Бухгалтерська компанія
    • Партнер Odoo
    • Агенція маркетингу
    • Юридична фірма
    • Придбання Талантів
    • Аудит та сертифікація
    Виробництво
    • Textile
    • Metal
    • Меблі
    • Їжа
    • Brewery
    • Корпоративні подарунки
    Здоров'я & Фітнес
    • Спортивний клуб
    • Оптика
    • Фітнес-центр
    • Практики здоров'я
    • Аптека
    • Салон краси
    Trades
    • Ремонтник
    • IT-обладнання та Підтримка
    • Системи сонячної енергії
    • Shoe Maker
    • Cleaning Services
    • HVAC Services
    Інші
    • Nonprofit Organization
    • Екологічна агенція
    • Оренда білбордів
    • Фотографія
    • Лізинг велосипедів
    • Реселлер програмного забезпечення
    Browse all Industries
  • Спільнота
    Навчання
    • Навчальний посібник
    • Документація
    • Сертифікації
    • Тренування
    • Блог
    • Подкаст
    Сприяйте Освіті
    • Програма навчання
    • Бізнес гра Scale Up!
    • Відвідайте Odoo
    Отримайте програмне забезпечення
    • Завантаження
    • Порівняйте версії
    • Релізи
    Співпрацюйте
    • Github
    • Форум
    • Події
    • Переклади
    • Стати партнером
    • Services for Partners
    • Зареєструйте вашу бухгалтерську фірму
    Отримайте послуги
    • Знайдіть партнера
    • Знайдіть бухгалтера
    • Зустріньтеся з консультантом
    • Послуги з впровадження
    • Референси клієнтів
    • Підтримка
    • Оновлення
    Github Youtube Twitter Linkedin Instagram Facebook Spotify
    +1 (650) 691-3277
    Отримати демо
  • Ціни
  • Допомога

Odoo is the world's easiest all-in-one management software.
It includes hundreds of business apps:

  • CRM
  • e-Commerce
  • Бухоблік
  • Склад
  • PoS
  • Проект
  • MRP
All apps
Вам необхідно зареєструватися, щоб взаємодіяти зі спільнотою.
All Posts Люди Значки
Мітки (View all)
odoo accounting v14 pos v15
Про цей форум
Вам необхідно зареєструватися, щоб взаємодіяти зі спільнотою.
All Posts Люди Значки
Мітки (View all)
odoo accounting v14 pos v15
Про цей форум
Допомога

AttributeError: 'NoneType' object has no attribute 'encode' - Custom module for tax register

Підписатися

Отримуйте сповіщення про активність щодо цієї публікації

Це запитання позначене
addonscustomodooV9odooV12
1 Відповісти
21913 Переглядів
Аватар
Simon

I am getting this error after migrating a custom module for v9 to v12 for the code below after testing connection to external servers. How can I fix this? Thank you

Error:
Odoo Server Error

Traceback (most recent call last):
File "/opt/odoo12/odoo/odoo/http.py", line 654, in _handle_exception
return super(JsonRequest, self)._handle_exception(exception)
File "/opt/odoo12/odoo/odoo/http.py", line 312, in _handle_exception
raise pycompat.reraise(type(exception), exception, sys.exc_info()[2])
File "/opt/odoo12/odoo/odoo/tools/pycompat.py", line 87, in reraise
raise value
File "/opt/odoo12/odoo/odoo/http.py", line 696, in dispatch
result = self._call_function(**self.params)
File "/opt/odoo12/odoo/odoo/http.py", line 344, in _call_function
return checked_call(self.db, *args, **kwargs)
File "/opt/odoo12/odoo/odoo/service/model.py", line 97, in wrapper
return f(dbname, *args, **kwargs)
File "/opt/odoo12/odoo/odoo/http.py", line 337, in checked_call
result = self.endpoint(*a, **kw)
File "/opt/odoo12/odoo/odoo/http.py", line 939, in __call__
return self.method(*args, **kw)
File "/opt/odoo12/odoo/odoo/http.py", line 517, in response_wrap
response = f(*args, **kw)
File "/opt/odoo12/odoo/addons/web/controllers/main.py", line 966, in call_button
action = self._call_kw(model, method, args, {})
File "/opt/odoo12/odoo/addons/web/controllers/main.py", line 954, in _call_kw
return call_kw(request.env[model], method, args, kwargs)
File "/opt/odoo12/odoo/odoo/api.py", line 749, in call_kw
return _call_kw_multi(method, model, args, kwargs)
File "/opt/odoo12/odoo/odoo/api.py", line 736, in _call_kw_multi
result = method(recs, *args, **kwargs)
File "/opt/odoo12/odoo-custom-addons/l10n_si_tax_registry/models/res_company.py", line 31, in action_test_l10n_si_tax_reg_conn
if not s.echo():
File "/opt/odoo12/odoo-custom-addons/l10n_si_tax_registry/SITaxReg/SITaxReg.py", line 127, in echo
sign=False)
File "/opt/odoo12/odoo-custom-addons/l10n_si_tax_registry/SITaxReg/SITaxReg.py", line 265, in send
'---END---' + "\n", self.DEBUG_CLIENT).encode()
AttributeError: 'NoneType' object has no attribute 'encode'



Code v12:


# -*- coding: utf-8 -*-


"""Slovenian tax invoice registration and other commands required by the
system.
"""

import base64
#import Cookie
from http import Cookies
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
import Crypto.Signature.PKCS1_v1_5 as PKCS
import datetime
from datetime import datetime as dt
#from httplib import HTTPResponse
from http.client import HTTPResponse
#from httplib import OK
from http.client import OK
from logging import debug
#import md5
import hashlib
from OpenSSL import crypto
from re import sub
import json
import socket
import ssl
from uuid import uuid4

from . import exceptions
#from __init__ import __version__
from .version import __version__
from . import TmpSock
import logging



_log = logging.getLogger(__name__)


class SITaxReg(object):
"""Slovenian Tax Registry client object."""

DEBUG_OFF = 0
DEBUG_SERVER = 1
DEBUG_CLIENT = 2
DEBUG_OUTPUT_PRINT = 'print'
DEBUG_OUTPUT_LOGGER = 'logger'

SI_TAX_REG_DATE_FORMAT = '%Y-%m-%d'
SI_TAX_REG_DT_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

_CRLF = "\r\n"
_ALGO = 'RS256'

_keyfile = ''
_certfile = ''
_ca_cert = None
_cert_reqs = ssl.CERT_NONE
_jws_header = ''
_key = None
_crypto_private_key = None
_debug_output = DEBUG_OUTPUT_PRINT
_debug = DEBUG_OFF

_scheme = 'https'
_host = 'blagajne.fu.gov.si'
_port = 9003
_query_string = '/v1/'

_user_agent = 'Editor d.o.o. SI Tax Registry Python Library/%s'
_cookies = None

_message_id = ''
_request_datetime = None
_zoi = ''

def __init__(self, keyfile, certfile, ca_certs=None, key=None, dev=False):
"""Return a new class instance with specified certificates."""
self._keyfile = keyfile
self._certfile = certfile
self._ca_certs = ca_certs
self._key = key

# When verifier certificates are provided, we require certificate
# authentication.
if self._ca_certs:
self._cert_reqs = ssl.CERT_REQUIRED

self._user_agent = self._user_agent % (__version__)

if dev:
self._host = 'blagajne-test.fu.gov.si'
self._port = 9002

def set_debug_output(self, output):
"""Set debug output destionation."""
self._debug_output = output

def set_debug(self, level):
"""Set debug level."""
self._debug = level

def get_message_id(self):
"""Get message ID of last sent message if any."""
return self._message_id

def get_request_datetime(self):
"""Get last message request time if any."""
return self._request_datetime

def get_zoi(self):
"""Get last used issuer mark in a request if any."""
return self._zoi

def echo(self):
"""Send echo request and verify echo server response."""
res = self.send('cash_registers/echo', {'EchoRequest': 'furs'},
sign=False)
return 'EchoResponse' in res and res['EchoResponse'] == 'furs'

def register_business_premise(self, data):
"""Register a business premise."""
self.send('cash_registers/invoices/register', {
'BusinessPremiseRequest': {'BusinessPremise': data},
})
return True

def issue_electronic_invoice(self, data):
"""Send invoice request for invoices made with an electronic device."""
res = self.send('cash_registers/invoices', {
'InvoiceRequest': {'Invoice': self._populate_invoice_data(data)},
})
return '' if 'UniqueInvoiceID' not in res else res['UniqueInvoiceID']

def issue_batch_electronic_invoice(self, data):
"""Send a list of invoices request for invoices made with an electronic
device.
"""
data_tmp = []
for i, d in enumerate(data):
data_tmp.append({
'RecordNumber': i + 1,
'Invoice': self._populate_invoice_data(d),
})

res = self.send('cash_registers_batch/invoices', {
'InvoiceListRequest': {'InvoiceList': {'RecordInfo': data_tmp}},
})
data_tmp = []
if 'RecordReply' not in res:
return []
# TODO: Check the response list against the request list.
return [(r['ProtectedID'],
r['UniqueInvoiceID']) for r in res['RecordReply']]

def issue_prenumbered_invoice(self, data):
"""Send invoice request for invoices made with a pre-numbered invoice
book.
"""
if 'IssueDate' in data and \
isinstance(data['IssueDate'], datetime.date):
data['IssueDate'] = \
data['IssueDate'].strftime(self.SI_TAX_REG_DATE_FORMAT)
if 'ReferenceSalesBook' in data and \
isinstance(data['ReferenceSalesBook'], list) and \
len(data['ReferenceSalesBook']) > 0 and \
'ReferenceSalesBookIssueDate' in \
data['ReferenceSalesBook'][0] and \
isinstance(
data['ReferenceSalesBook'][0]
['ReferenceSalesBookIssueDate'],
datetime.date
):
data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] = \
data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] \
.strftime(self.SI_TAX_REG_DATE_FORMAT)
if 'ReferenceInvoice' in data and \
isinstance(data['ReferenceInvoice'], list) and \
len(data['ReferenceInvoice']) > 0 and \
'ReferenceInvoiceIssueDateTime' in \
data['ReferenceInvoice'][0] and \
isinstance(
data['ReferenceInvoice'][0]
['ReferenceInvoiceIssueDateTime'],
datetime.datetime
):
data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] = \
data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] \
.strftime(self.SI_TAX_REG_DT_FORMAT)

res = self.send('cash_registers/invoices', {
'InvoiceRequest': {'SalesBookInvoice': data},
})
return '' if 'UniqueInvoiceID' not in res else res['UniqueInvoiceID']

def send(self, endpoint, data, sign=True):
"""Send request data to registry server."""
self._message_id = ''
self._request_datetime = None
self._zoi = ''

if isinstance(data, str):
data = json.loads(data)

if 'InvoiceRequest' in data and \
'Invoice' in data['InvoiceRequest'] and \
'ProtectedID' in data['InvoiceRequest']['Invoice']:
self._zoi = data['InvoiceRequest']['Invoice']['ProtectedID']
elif 'InvoiceListRequest' in data and \
'InvoiceList' in data['InvoiceListRequest'] and \
'RecordInfo' in data['InvoiceListRequest']['InvoiceList']:
str_tmp = len(data['InvoiceListRequest']['InvoiceList']
['RecordInfo'])
if str_tmp > 0:
str_tmp = (data['InvoiceListRequest']['InvoiceList']
['RecordInfo'][str_tmp - 1])
if 'Invoice' in str_tmp and \
'ProtectedID' in str_tmp['Invoice']:
self._zoi = str_tmp['Invoice']['ProtectedID']

if sign:
data = self._inject_header(data)
str_tmp = data.iterkeys().next()
self._message_id = data[str_tmp]['Header']['MessageID']
self._request_datetime = dt.strptime(
data[str_tmp]['Header']['DateTime'],
self.SI_TAX_REG_DT_FORMAT
)
data = self._sign(data)

conn = ssl.wrap_socket(socket.socket(), keyfile=self._keyfile,
certfile=self._certfile,
cert_reqs=self._cert_reqs,
ca_certs=self._ca_certs)
conn.connect((self._host, self._port))

if not isinstance(data, str):
data = json.dumps(data)
str_tmp = [
'POST %s%s HTTP/1.1' % (self._query_string, endpoint),
'Host: %s:%d' % (self._host, self._port),
'User-Agent: ' + (self._user_agent),
'Content-Type: application/json; charset=UTF-8',
'Connection: close',
'Content-Length: ' + str(len(data))
]
if self._cookies:
str_tmp.append(
'Cookie: ' + '; '.join([
x + '=' + self._cookies[x].value for x in self._cookies]
)
)

data = self._CRLF.join(str_tmp) + self._CRLF + self._CRLF + data
self._e_debug("\n" + '---SENDING---' + "\n" + data + "\n" +
'---END---' + "\n", self.DEBUG_CLIENT).encode()

try:
conn.write(data.encode())
except Exception as e:
conn.close()
raise e

data = ''
str_tmp = True
while str_tmp != '':
str_tmp = conn.read(4096)
data += str_tmp.decode()
conn.close()
str_tmp = None

self._e_debug("\n" + '---GOT---' + "\n" + data + "\n" +
'---END---' + "\n", self.DEBUG_SERVER)

data = self._parse_http_response(data)

# TODO: Improve cookie management.
c = data.getheader('Set-Cookie') or data.getheader('Cookie')
self._cookies = Cookies.Simple.Cookie(c) if c else None

if data.status != OK:
raise exceptions.SITaxServerError(data.read(), data.status)

data = data.read()
data = json.loads(data)
_log.info('\n*--------\n'
'*RECEIVE: %s\n'
'*-----------\n', data)
if sign:
data = self._parse_signed_response(data)
# Take only the payload.
data = data[1]
# Get sub-content.
data = data.itervalues().next()
# Remove header.
del(data['Header'])
# Any potencial errors should be stored here. If found, raise an
# exception.
if 'Error' in data:
raise exceptions.SITaxRegistryError(
data['Error']['ErrorMessage'],
data['Error']['ErrorCode']
)

return data

def get_session_cookie(self):
"""Get session cookie object."""
return self._cookies

def set_session_cookie(self, cookie):
"""Set session cookie object."""
self._cookies = cookie

def _inject_header(self, data, uid=None, dtm=None):
"""Insert message identification in first and only child of dict."""
if not uid:
uid = str(uuid4())
if not dtm:
dtm = dt.utcnow()
if isinstance(data, str):
data = json.loads(data)
if not isinstance(data, dict):
raise TypeError('Data inappropriate to perform signature on')
if len(data) != 1:
raise ValueError('Data inappropriate to perform signature on')
h = data.iterkeys().next()
if not isinstance(data[h], dict):
raise TypeError('Data inappropriate to perform signature on')
data[h]['Header'] = {
'MessageID': uid,
'DateTime': dtm.strftime(self.SI_TAX_REG_DT_FORMAT),
}
return data

def _sign(self, data):
"""Sign the request message."""
h = self._get_jws_header() + '.' + self._jws_base64_encode(
json.dumps(data))
return {'token': h + '.' + self._signature(h)}

def _signature(self, data):
"""Create a signature out of a certificate header and already encoded
and joined message payload.
"""
self._check_and_load_private_key()

h = SHA256.new(data.encode('UTF-8'))
return self._jws_base64_encode(
PKCS.new(self._crypto_private_key).sign(h))

def _parse_http_response(self, response_str):
"""Parse response string into an HTTP object."""
sock = TmpSock(response_str)
res = HTTPResponse(sock)
res.begin()
return res

def _parse_signed_response(self, data):
"""Parse and fix encoded response from the tax registry server."""
res = []
data = data['token'].split('.')
signature = data.pop()
# Parse JWS header and payload.
for d in data:
res.append(json.loads(self._jws_base64_decode(d)))
# TODO: Parse signature.
res.append(signature)
return res

def _get_jws_header(self):
"""Parse if necessary and get public certificate JWS header."""
if self._jws_header == '':
# If header hasn't been made yet, construct it and store in buffer
# for possible later access.
# TODO: This is the only place where we need the OpenSSL module.
# See if we can somehow read the data without it so we can remove
# the dependency.
b = open(self._certfile, 'r').read()
x = crypto.load_certificate(crypto.FILETYPE_PEM, b)

jws_header = {
'alg': self._ALGO,
'subject_name': ','.join(map(lambda x: x[0] + '=' + x[1],
x.get_subject().get_components())),
'issuer_name': ','.join(map(lambda x: x[0] + '=' + x[1],
x.get_issuer().get_components())),
'serial': x.get_serial_number(),
}

self._jws_header = self._jws_base64_encode(json.dumps(jws_header))

return self._jws_header

def _jws_base64_encode(self, s):
"""Encode string into a JWS compliant base64 format."""
return base64.urlsafe_b64encode(s).replace(b'=', b'')

def _jws_base64_decode(self, s):
"""Decode string from a JWS base64 format."""
s += b'=' * (4 - (len(s) % 4))
return base64.urlsafe_b64decode(s.encode('UTF-8'))

def _calculate_protective_mark(self, tax_number, date, invoice_number,
premise, device, amount):
"""Generate "ZOI" signature."""
self._check_and_load_private_key()
sig = str(tax_number) + date + invoice_number + premise + device \
+ str(amount)
sig = SHA256.new(sig.encode('UTF-8')).digest()
sig = self._crypto_private_key.sign(sig, '')[0]
return hashlib.md5.new(str(sig)).hexdigest()

def _populate_invoice_data(self, data):
"""Add missing and correct existing data in invoice dict."""
if 'IssueDateTime' in data and \
isinstance(data['IssueDateTime'], datetime.datetime):
data['IssueDateTime'] = \
data['IssueDateTime'].strftime(self.SI_TAX_REG_DT_FORMAT)
if 'ReferenceInvoice' in data and \
isinstance(data['ReferenceInvoice'], list) and \
len(data['ReferenceInvoice']) > 0 and \
'ReferenceInvoiceIssueDateTime' in \
data['ReferenceInvoice'][0] and \
isinstance(
data['ReferenceInvoice'][0]
['ReferenceInvoiceIssueDateTime'],
datetime.datetime
):
data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] = \
data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] \
.strftime(self.SI_TAX_REG_DT_FORMAT)
if 'ReferenceSalesBook' in data and \
isinstance(data['ReferenceSalesBook'], list) and \
len(data['ReferenceSalesBook']) > 0 and \
'ReferenceSalesBookIssueDate' in \
data['ReferenceSalesBook'][0] and \
isinstance(
data['ReferenceSalesBook'][0]
['ReferenceSalesBookIssueDate'],
datetime.date
):
data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] = \
data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] \
.strftime(self.SI_TAX_REG_DATE_FORMAT)

if 'ProtectedID' not in data:
data['ProtectedID'] = self._calculate_protective_mark(
data['TaxNumber'], data['IssueDateTime'],
data['InvoiceIdentifier']['InvoiceNumber'],
data['InvoiceIdentifier']['BusinessPremiseID'],
data['InvoiceIdentifier']['ElectronicDeviceID'],
data['InvoiceAmount']
)
_log.info('\n*-----------\n'
'*SEND: %s\n'
'*-------------\n', data)
return data

def _check_and_load_private_key(self):
"""Load the private key content into class property."""
if not self._crypto_private_key:
h = open(self._keyfile, 'r').read()
self._crypto_private_key = RSA.importKey(h, passphrase=self._key)

def _e_debug(self, msg, level):
"""Output debug message to user-selected method."""
if level > self._debug:
return

if self._debug_output == 'logger':
debug(msg)
else:
msg = sub('/(\r\n|\r|\n)/ms', "\n", msg)
print (dt.now().strftime('%Y-%m-%d %H:%M:%S') + "\t" \
+ msg.strip() + "\n")


Code v9:

# -*- coding: utf-8 -*-


"""Slovenian tax invoice registration and other commands required by the
system.
"""

import base64
import Cookie
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
import Crypto.Signature.PKCS1_v1_5 as PKCS
import datetime
from datetime import datetime as dt
from httplib import HTTPResponse
from httplib import OK
from logging import debug
import md5
from OpenSSL import crypto
from re import sub
import json
import socket
import ssl
from uuid import uuid4

import exceptions
from __init__ import __version__
from TmpSock import TmpSock
import logging
_log = logging.getLogger(__name__)


class SITaxReg(object):
"""Slovenian Tax Registry client object."""

DEBUG_OFF = 0
DEBUG_SERVER = 1
DEBUG_CLIENT = 2
DEBUG_OUTPUT_PRINT = 'print'
DEBUG_OUTPUT_LOGGER = 'logger'

SI_TAX_REG_DATE_FORMAT = '%Y-%m-%d'
SI_TAX_REG_DT_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

_CRLF = "\r\n"
_ALGO = 'RS256'

_keyfile = ''
_certfile = ''
_ca_cert = None
_cert_reqs = ssl.CERT_NONE
_jws_header = ''
_key = None
_crypto_private_key = None
_debug_output = DEBUG_OUTPUT_PRINT
_debug = DEBUG_OFF

_scheme = 'https'
_host = 'blagajne.fu.gov.si'
_port = 9003
_query_string = '/v1/'

_user_agent = 'Editor d.o.o. SI Tax Registry Python Library/%s'
_cookies = None

_message_id = ''
_request_datetime = None
_zoi = ''

def __init__(self, keyfile, certfile, ca_certs=None, key=None, dev=False):
"""Return a new class instance with specified certificates."""
self._keyfile = keyfile
self._certfile = certfile
self._ca_certs = ca_certs
self._key = key

# When verifier certificates are provided, we require certificate
# authentication.
if self._ca_certs:
self._cert_reqs = ssl.CERT_REQUIRED

self._user_agent = self._user_agent % (__version__)

if dev:
self._host = 'blagajne-test.fu.gov.si'
self._port = 9002

def set_debug_output(self, output):
"""Set debug output destionation."""
self._debug_output = output

def set_debug(self, level):
"""Set debug level."""
self._debug = level

def get_message_id(self):
"""Get message ID of last sent message if any."""
return self._message_id

def get_request_datetime(self):
"""Get last message request time if any."""
return self._request_datetime

def get_zoi(self):
"""Get last used issuer mark in a request if any."""
return self._zoi

def echo(self):
"""Send echo request and verify echo server response."""
res = self.send('cash_registers/echo', {'EchoRequest': 'furs'},
sign=False)
return 'EchoResponse' in res and res['EchoResponse'] == 'furs'

def register_business_premise(self, data):
"""Register a business premise."""
self.send('cash_registers/invoices/register', {
'BusinessPremiseRequest': {'BusinessPremise': data},
})
return True

def issue_electronic_invoice(self, data):
"""Send invoice request for invoices made with an electronic device."""
res = self.send('cash_registers/invoices', {
'InvoiceRequest': {'Invoice': self._populate_invoice_data(data)},
})
return '' if 'UniqueInvoiceID' not in res else res['UniqueInvoiceID']

def issue_batch_electronic_invoice(self, data):
"""Send a list of invoices request for invoices made with an electronic
device.
"""
data_tmp = []
for i, d in enumerate(data):
data_tmp.append({
'RecordNumber': i + 1,
'Invoice': self._populate_invoice_data(d),
})

res = self.send('cash_registers_batch/invoices', {
'InvoiceListRequest': {'InvoiceList': {'RecordInfo': data_tmp}},
})
data_tmp = []
if 'RecordReply' not in res:
return []
# TODO: Check the response list against the request list.
return [(r['ProtectedID'],
r['UniqueInvoiceID']) for r in res['RecordReply']]

def issue_prenumbered_invoice(self, data):
"""Send invoice request for invoices made with a pre-numbered invoice
book.
"""
if 'IssueDate' in data and \
isinstance(data['IssueDate'], datetime.date):
data['IssueDate'] = \
data['IssueDate'].strftime(self.SI_TAX_REG_DATE_FORMAT)
if 'ReferenceSalesBook' in data and \
isinstance(data['ReferenceSalesBook'], list) and \
len(data['ReferenceSalesBook']) > 0 and \
'ReferenceSalesBookIssueDate' in \
data['ReferenceSalesBook'][0] and \
isinstance(
data['ReferenceSalesBook'][0]
['ReferenceSalesBookIssueDate'],
datetime.date
):
data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] = \
data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] \
.strftime(self.SI_TAX_REG_DATE_FORMAT)
if 'ReferenceInvoice' in data and \
isinstance(data['ReferenceInvoice'], list) and \
len(data['ReferenceInvoice']) > 0 and \
'ReferenceInvoiceIssueDateTime' in \
data['ReferenceInvoice'][0] and \
isinstance(
data['ReferenceInvoice'][0]
['ReferenceInvoiceIssueDateTime'],
datetime.datetime
):
data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] = \
data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] \
.strftime(self.SI_TAX_REG_DT_FORMAT)

res = self.send('cash_registers/invoices', {
'InvoiceRequest': {'SalesBookInvoice': data},
})
return '' if 'UniqueInvoiceID' not in res else res['UniqueInvoiceID']

def send(self, endpoint, data, sign=True):
"""Send request data to registry server."""
self._message_id = ''
self._request_datetime = None
self._zoi = ''

if isinstance(data, str):
data = json.loads(data)

if 'InvoiceRequest' in data and \
'Invoice' in data['InvoiceRequest'] and \
'ProtectedID' in data['InvoiceRequest']['Invoice']:
self._zoi = data['InvoiceRequest']['Invoice']['ProtectedID']
elif 'InvoiceListRequest' in data and \
'InvoiceList' in data['InvoiceListRequest'] and \
'RecordInfo' in data['InvoiceListRequest']['InvoiceList']:
str_tmp = len(data['InvoiceListRequest']['InvoiceList']
['RecordInfo'])
if str_tmp > 0:
str_tmp = (data['InvoiceListRequest']['InvoiceList']
['RecordInfo'][str_tmp - 1])
if 'Invoice' in str_tmp and \
'ProtectedID' in str_tmp['Invoice']:
self._zoi = str_tmp['Invoice']['ProtectedID']

if sign:
data = self._inject_header(data)
str_tmp = data.iterkeys().next()
self._message_id = data[str_tmp]['Header']['MessageID']
self._request_datetime = dt.strptime(
data[str_tmp]['Header']['DateTime'],
self.SI_TAX_REG_DT_FORMAT
)
data = self._sign(data)

conn = ssl.wrap_socket(socket.socket(), keyfile=self._keyfile,
certfile=self._certfile,
cert_reqs=self._cert_reqs,
ca_certs=self._ca_certs)
conn.connect((self._host, self._port))

if not isinstance(data, str):
data = json.dumps(data)
str_tmp = [
'POST %s%s HTTP/1.1' % (self._query_string, endpoint),
'Host: %s:%d' % (self._host, self._port),
'User-Agent: ' + (self._user_agent),
'Content-Type: application/json; charset=UTF-8',
'Connection: close',
'Content-Length: ' + str(len(data))
]
if self._cookies:
str_tmp.append(
'Cookie: ' + '; '.join([
x + '=' + self._cookies[x].value for x in self._cookies]
)
)

data = self._CRLF.join(str_tmp) + self._CRLF + self._CRLF + data
self._e_debug("\n" + '---SENDING---' + "\n" + data + "\n" +
'---END---' + "\n", self.DEBUG_CLIENT)

try:
conn.write(data)
except Exception as e:
conn.close()
raise e

data = ''
str_tmp = True
while str_tmp != '':
str_tmp = conn.read(4096)
data += str_tmp
conn.close()
str_tmp = None

self._e_debug("\n" + '---GOT---' + "\n" + data + "\n" +
'---END---' + "\n", self.DEBUG_SERVER)

data = self._parse_http_response(data)

# TODO: Improve cookie management.
c = data.getheader('Set-Cookie') or data.getheader('Cookie')
self._cookies = Cookie.SimpleCookie(c) if c else None

if data.status != OK:
raise exceptions.SITaxServerError(data.read(), data.status)

data = data.read()
data = json.loads(data)
# _log.info('\n*--------\n'
# '*RECEIVE: %s\n'
# '*-----------\n', data)
if sign:
data = self._parse_signed_response(data)
# Take only the payload.
data = data[1]
# Get sub-content.
data = data.itervalues().next()
# Remove header.
del(data['Header'])
# Any potencial errors should be stored here. If found, raise an
# exception.
if 'Error' in data:
raise exceptions.SITaxRegistryError(
data['Error']['ErrorMessage'],
data['Error']['ErrorCode']
)

return data

def get_session_cookie(self):
"""Get session cookie object."""
return self._cookies

def set_session_cookie(self, cookie):
"""Set session cookie object."""
self._cookies = cookie

def _inject_header(self, data, uid=None, dtm=None):
"""Insert message identification in first and only child of dict."""
if not uid:
uid = str(uuid4())
if not dtm:
dtm = dt.utcnow()
if isinstance(data, str):
data = json.loads(data)
if not isinstance(data, dict):
raise TypeError('Data inappropriate to perform signature on')
if len(data) != 1:
raise ValueError('Data inappropriate to perform signature on')
h = data.iterkeys().next()
if not isinstance(data[h], dict):
raise TypeError('Data inappropriate to perform signature on')
data[h]['Header'] = {
'MessageID': uid,
'DateTime': dtm.strftime(self.SI_TAX_REG_DT_FORMAT),
}
return data

def _sign(self, data):
"""Sign the request message."""
h = self._get_jws_header() + '.' + self._jws_base64_encode(
json.dumps(data))
return {'token': h + '.' + self._signature(h)}

def _signature(self, data):
"""Create a signature out of a certificate header and already encoded
and joined message payload.
"""
self._check_and_load_private_key()

h = SHA256.new(data.encode('UTF-8'))
return self._jws_base64_encode(
PKCS.new(self._crypto_private_key).sign(h))

def _parse_http_response(self, response_str):
"""Parse response string into an HTTP object."""
sock = TmpSock(response_str)
res = HTTPResponse(sock)
res.begin()
return res

def _parse_signed_response(self, data):
"""Parse and fix encoded response from the tax registry server."""
res = []
data = data['token'].split('.')
signature = data.pop()
# Parse JWS header and payload.
for d in data:
res.append(json.loads(self._jws_base64_decode(d)))
# TODO: Parse signature.
res.append(signature)
return res

def _get_jws_header(self):
"""Parse if necessary and get public certificate JWS header."""
if self._jws_header == '':
# If header hasn't been made yet, construct it and store in buffer
# for possible later access.
# TODO: This is the only place where we need the OpenSSL module.
# See if we can somehow read the data without it so we can remove
# the dependency.
b = open(self._certfile, 'r').read()
x = crypto.load_certificate(crypto.FILETYPE_PEM, b)

jws_header = {
'alg': self._ALGO,
'subject_name': ','.join(map(lambda x: x[0] + '=' + x[1],
x.get_subject().get_components())),
'issuer_name': ','.join(map(lambda x: x[0] + '=' + x[1],
x.get_issuer().get_components())),
'serial': x.get_serial_number(),
}

self._jws_header = self._jws_base64_encode(json.dumps(jws_header))

return self._jws_header

def _jws_base64_encode(self, s):
"""Encode string into a JWS compliant base64 format."""
return base64.urlsafe_b64encode(s).replace(b'=', b'')

def _jws_base64_decode(self, s):
"""Decode string from a JWS base64 format."""
s += b'=' * (4 - (len(s) % 4))
return base64.urlsafe_b64decode(s.encode('UTF-8'))

def _calculate_protective_mark(self, tax_number, date, invoice_number,
premise, device, amount):
"""Generate "ZOI" signature."""
self._check_and_load_private_key()
sig = str(tax_number) + date + invoice_number + premise + device \
+ str(amount)
sig = SHA256.new(sig.encode('UTF-8')).digest()
sig = self._crypto_private_key.sign(sig, '')[0]
return md5.new(str(sig)).hexdigest()

def _populate_invoice_data(self, data):
"""Add missing and correct existing data in invoice dict."""
if 'IssueDateTime' in data and \
isinstance(data['IssueDateTime'], datetime.datetime):
data['IssueDateTime'] = \
data['IssueDateTime'].strftime(self.SI_TAX_REG_DT_FORMAT)
if 'ReferenceInvoice' in data and \
isinstance(data['ReferenceInvoice'], list) and \
len(data['ReferenceInvoice']) > 0 and \
'ReferenceInvoiceIssueDateTime' in \
data['ReferenceInvoice'][0] and \
isinstance(
data['ReferenceInvoice'][0]
['ReferenceInvoiceIssueDateTime'],
datetime.datetime
):
data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] = \
data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] \
.strftime(self.SI_TAX_REG_DT_FORMAT)
if 'ReferenceSalesBook' in data and \
isinstance(data['ReferenceSalesBook'], list) and \
len(data['ReferenceSalesBook']) > 0 and \
'ReferenceSalesBookIssueDate' in \
data['ReferenceSalesBook'][0] and \
isinstance(
data['ReferenceSalesBook'][0]
['ReferenceSalesBookIssueDate'],
datetime.date
):
data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] = \
data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] \
.strftime(self.SI_TAX_REG_DATE_FORMAT)

if 'ProtectedID' not in data:
data['ProtectedID'] = self._calculate_protective_mark(
data['TaxNumber'], data['IssueDateTime'],
data['InvoiceIdentifier']['InvoiceNumber'],
data['InvoiceIdentifier']['BusinessPremiseID'],
data['InvoiceIdentifier']['ElectronicDeviceID'],
data['InvoiceAmount']
)
# _log.info('\n*-----------\n'
# '*SEND: %s\n'
# '*-------------\n', data)
return data

def _check_and_load_private_key(self):
"""Load the private key content into class property."""
if not self._crypto_private_key:
h = open(self._keyfile, 'r').read()
self._crypto_private_key = RSA.importKey(h, passphrase=self._key)

def _e_debug(self, msg, level):
"""Output debug message to user-selected method."""
if level > self._debug:
return

if self._debug_output == 'logger':
debug(msg)
else:
msg = sub('/(\r\n|\r|\n)/ms', "\n", msg)
print dt.now().strftime('%Y-%m-%d %H:%M:%S') + "\t" \
+ msg.strip() + "\n"

0
Аватар
Відмінити
Аватар
shalin wilson
Найкраща відповідь

Nonetype object comes when u dont have any values at    self.DEBUG_CLIENT
please debug and check what value u have at self.DEBUG_CLIENT

0
Аватар
Відмінити
Enjoying the discussion? Don't just read, join in!

Create an account today to enjoy exclusive features and engage with our awesome community!

Реєстрація
Related Posts Відповіді Переглядів Дія
Odoo 14: custom addons path not working
addons custom v14
Аватар
0
груд. 20
7221
ValueError: Invalid field 'module_l10n_fr_hr_payroll' on model 'res.config.settings'
field addons custom v15
Аватар
Аватар
1
бер. 22
7135
Add a third-party application on odoo 13
addons ubuntu custom terminal Odoo13.0
Аватар
Аватар
2
жовт. 20
8611
odoo (V12) Error pops "a mandatory field is not set" when i try a custom module in multi company Вирішено
custom urgent multicompany mandatory odooV12
Аватар
Аватар
1
січ. 20
5006
Odoo isn't seeing an add-on that I downloaded and placed in the addons folder Вирішено
addons
Аватар
Аватар
Аватар
Аватар
Аватар
5
жовт. 25
2803
Спільнота
  • Навчальний посібник
  • Документація
  • Форум
Open Source
  • Завантаження
  • Github
  • Runbot
  • Переклади
Послуги
  • Хостинг Odoo.sh
  • Підтримка
  • Оновлення
  • Кастомні доробки
  • Навчання
  • Знайдіть бухгалтера
  • Знайдіть партнера
  • Стати партнером
Про нас
  • Наша компанія
  • Торгові активи
  • Зв'яжіться з нами
  • Вакансії
  • Події
  • Подкаст
  • Блог
  • Клієнти
  • Юридичні документи • Конфіденційність
  • Безпека
الْعَرَبيّة Català 简体中文 繁體中文 (台灣) Čeština Dansk Nederlands English Suomi Français Deutsch हिंदी Bahasa Indonesia Italiano 日本語 한국어 (KR) Lietuvių kalba Język polski Português (BR) română русский язык Slovenský jazyk slovenščina Español (América Latina) Español ภาษาไทย Türkçe українська Tiếng Việt

Odoo - це набір програм для роботи з відкритим кодом, які охоплюють всі ваші потреби компанії: CRM, електронна комерція, бухгалтерський облік, склад, точка продажу, управління проектами тощо.

Унікальна пропозиція Odoo - це одночасно дуже проста у використанні та повністю інтегрована.

Website made with

Odoo Experience on YouTube

1. Use the live chat to ask your questions.
2. The operator answers within a few minutes.

Live support on Youtube
Watch now