AttributeError: 'NoneType' object has no attribute 'encode' - Custom module for tax register

Tags: addons, custom, odooV9, odooV12
1 Reply
21930 Views
Simon

I am getting this error after migrating a custom module from v9 to v12. The code below raises it when I test the connection to the external server. How can I fix this? Thank you

Error:
Odoo Server Error

Traceback (most recent call last):
  File "/opt/odoo12/odoo/odoo/http.py", line 654, in _handle_exception
    return super(JsonRequest, self)._handle_exception(exception)
  File "/opt/odoo12/odoo/odoo/http.py", line 312, in _handle_exception
    raise pycompat.reraise(type(exception), exception, sys.exc_info()[2])
  File "/opt/odoo12/odoo/odoo/tools/pycompat.py", line 87, in reraise
    raise value
  File "/opt/odoo12/odoo/odoo/http.py", line 696, in dispatch
    result = self._call_function(**self.params)
  File "/opt/odoo12/odoo/odoo/http.py", line 344, in _call_function
    return checked_call(self.db, *args, **kwargs)
  File "/opt/odoo12/odoo/odoo/service/model.py", line 97, in wrapper
    return f(dbname, *args, **kwargs)
  File "/opt/odoo12/odoo/odoo/http.py", line 337, in checked_call
    result = self.endpoint(*a, **kw)
  File "/opt/odoo12/odoo/odoo/http.py", line 939, in __call__
    return self.method(*args, **kw)
  File "/opt/odoo12/odoo/odoo/http.py", line 517, in response_wrap
    response = f(*args, **kw)
  File "/opt/odoo12/odoo/addons/web/controllers/main.py", line 966, in call_button
    action = self._call_kw(model, method, args, {})
  File "/opt/odoo12/odoo/addons/web/controllers/main.py", line 954, in _call_kw
    return call_kw(request.env[model], method, args, kwargs)
  File "/opt/odoo12/odoo/odoo/api.py", line 749, in call_kw
    return _call_kw_multi(method, model, args, kwargs)
  File "/opt/odoo12/odoo/odoo/api.py", line 736, in _call_kw_multi
    result = method(recs, *args, **kwargs)
  File "/opt/odoo12/odoo-custom-addons/l10n_si_tax_registry/models/res_company.py", line 31, in action_test_l10n_si_tax_reg_conn
    if not s.echo():
  File "/opt/odoo12/odoo-custom-addons/l10n_si_tax_registry/SITaxReg/SITaxReg.py", line 127, in echo
    sign=False)
  File "/opt/odoo12/odoo-custom-addons/l10n_si_tax_registry/SITaxReg/SITaxReg.py", line 265, in send
    '---END---' + "\n", self.DEBUG_CLIENT).encode()
AttributeError: 'NoneType' object has no attribute 'encode'
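
The last frame of the traceback shows `.encode()` applied to the value returned by a function call, not to a string. As a minimal sketch of that situation, the snippet below uses a hypothetical stand-in helper `e_debug` (an assumption for illustration, not the module's code): a Python function that never returns a value yields `None`, so chaining a method onto the call fails with the same message.

```python
def e_debug(msg, level, debug_level=0):
    """Hypothetical debug helper: prints or skips, and never returns a value."""
    if level > debug_level:
        return  # bare return, so the caller receives None
    print(msg.strip())


def reproduce():
    """Chain .encode() onto the helper's return value and capture the error text."""
    try:
        e_debug("---SENDING---", 2).encode()
    except AttributeError as exc:
        return str(exc)
    return None
```

Calling `reproduce()` returns the text of the `AttributeError`, which suggests looking at what the call before `.encode()` returns rather than at its arguments.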



Code v12:


# -*- coding: utf-8 -*-


"""Slovenian tax invoice registration and other commands required by the
system.
"""

import base64
#import Cookie
from http import Cookies
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
import Crypto.Signature.PKCS1_v1_5 as PKCS
import datetime
from datetime import datetime as dt
#from httplib import HTTPResponse
from http.client import HTTPResponse
#from httplib import OK
from http.client import OK
from logging import debug
#import md5
import hashlib
from OpenSSL import crypto
from re import sub
import json
import socket
import ssl
from uuid import uuid4

from . import exceptions
#from __init__ import __version__
from .version import __version__
from . import TmpSock
import logging



_log = logging.getLogger(__name__)


class SITaxReg(object):
    """Slovenian Tax Registry client object."""

    DEBUG_OFF = 0
    DEBUG_SERVER = 1
    DEBUG_CLIENT = 2
    DEBUG_OUTPUT_PRINT = 'print'
    DEBUG_OUTPUT_LOGGER = 'logger'

    SI_TAX_REG_DATE_FORMAT = '%Y-%m-%d'
    SI_TAX_REG_DT_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

    _CRLF = "\r\n"
    _ALGO = 'RS256'

    _keyfile = ''
    _certfile = ''
    _ca_cert = None
    _cert_reqs = ssl.CERT_NONE
    _jws_header = ''
    _key = None
    _crypto_private_key = None
    _debug_output = DEBUG_OUTPUT_PRINT
    _debug = DEBUG_OFF

    _scheme = 'https'
    _host = 'blagajne.fu.gov.si'
    _port = 9003
    _query_string = '/v1/'

    _user_agent = 'Editor d.o.o. SI Tax Registry Python Library/%s'
    _cookies = None

    _message_id = ''
    _request_datetime = None
    _zoi = ''

    def __init__(self, keyfile, certfile, ca_certs=None, key=None, dev=False):
        """Return a new class instance with specified certificates."""
        self._keyfile = keyfile
        self._certfile = certfile
        self._ca_certs = ca_certs
        self._key = key

        # When verifier certificates are provided, we require certificate
        # authentication.
        if self._ca_certs:
            self._cert_reqs = ssl.CERT_REQUIRED

        self._user_agent = self._user_agent % (__version__)

        if dev:
            self._host = 'blagajne-test.fu.gov.si'
            self._port = 9002

    def set_debug_output(self, output):
        """Set debug output destination."""
        self._debug_output = output

    def set_debug(self, level):
        """Set debug level."""
        self._debug = level

    def get_message_id(self):
        """Get message ID of last sent message if any."""
        return self._message_id

    def get_request_datetime(self):
        """Get last message request time if any."""
        return self._request_datetime

    def get_zoi(self):
        """Get last used issuer mark in a request if any."""
        return self._zoi

    def echo(self):
        """Send echo request and verify echo server response."""
        res = self.send('cash_registers/echo', {'EchoRequest': 'furs'},
                        sign=False)
        return 'EchoResponse' in res and res['EchoResponse'] == 'furs'

    def register_business_premise(self, data):
        """Register a business premise."""
        self.send('cash_registers/invoices/register', {
            'BusinessPremiseRequest': {'BusinessPremise': data},
        })
        return True

    def issue_electronic_invoice(self, data):
        """Send invoice request for invoices made with an electronic device."""
        res = self.send('cash_registers/invoices', {
            'InvoiceRequest': {'Invoice': self._populate_invoice_data(data)},
        })
        return '' if 'UniqueInvoiceID' not in res else res['UniqueInvoiceID']

    def issue_batch_electronic_invoice(self, data):
        """Send a list of invoices request for invoices made with an electronic
        device.
        """
        data_tmp = []
        for i, d in enumerate(data):
            data_tmp.append({
                'RecordNumber': i + 1,
                'Invoice': self._populate_invoice_data(d),
            })

        res = self.send('cash_registers_batch/invoices', {
            'InvoiceListRequest': {'InvoiceList': {'RecordInfo': data_tmp}},
        })
        data_tmp = []
        if 'RecordReply' not in res:
            return []
        # TODO: Check the response list against the request list.
        return [(r['ProtectedID'],
                 r['UniqueInvoiceID']) for r in res['RecordReply']]

    def issue_prenumbered_invoice(self, data):
        """Send invoice request for invoices made with a pre-numbered invoice
        book.
        """
        if 'IssueDate' in data and \
                isinstance(data['IssueDate'], datetime.date):
            data['IssueDate'] = \
                data['IssueDate'].strftime(self.SI_TAX_REG_DATE_FORMAT)
        if 'ReferenceSalesBook' in data and \
                isinstance(data['ReferenceSalesBook'], list) and \
                len(data['ReferenceSalesBook']) > 0 and \
                'ReferenceSalesBookIssueDate' in \
                data['ReferenceSalesBook'][0] and \
                isinstance(
                    data['ReferenceSalesBook'][0]
                    ['ReferenceSalesBookIssueDate'],
                    datetime.date
                ):
            data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] = \
                data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] \
                .strftime(self.SI_TAX_REG_DATE_FORMAT)
        if 'ReferenceInvoice' in data and \
                isinstance(data['ReferenceInvoice'], list) and \
                len(data['ReferenceInvoice']) > 0 and \
                'ReferenceInvoiceIssueDateTime' in \
                data['ReferenceInvoice'][0] and \
                isinstance(
                    data['ReferenceInvoice'][0]
                    ['ReferenceInvoiceIssueDateTime'],
                    datetime.datetime
                ):
            data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] = \
                data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] \
                .strftime(self.SI_TAX_REG_DT_FORMAT)

        res = self.send('cash_registers/invoices', {
            'InvoiceRequest': {'SalesBookInvoice': data},
        })
        return '' if 'UniqueInvoiceID' not in res else res['UniqueInvoiceID']

    def send(self, endpoint, data, sign=True):
        """Send request data to registry server."""
        self._message_id = ''
        self._request_datetime = None
        self._zoi = ''

        if isinstance(data, str):
            data = json.loads(data)

        if 'InvoiceRequest' in data and \
                'Invoice' in data['InvoiceRequest'] and \
                'ProtectedID' in data['InvoiceRequest']['Invoice']:
            self._zoi = data['InvoiceRequest']['Invoice']['ProtectedID']
        elif 'InvoiceListRequest' in data and \
                'InvoiceList' in data['InvoiceListRequest'] and \
                'RecordInfo' in data['InvoiceListRequest']['InvoiceList']:
            str_tmp = len(data['InvoiceListRequest']['InvoiceList']
                          ['RecordInfo'])
            if str_tmp > 0:
                str_tmp = (data['InvoiceListRequest']['InvoiceList']
                           ['RecordInfo'][str_tmp - 1])
                if 'Invoice' in str_tmp and \
                        'ProtectedID' in str_tmp['Invoice']:
                    self._zoi = str_tmp['Invoice']['ProtectedID']

        if sign:
            data = self._inject_header(data)
            str_tmp = data.iterkeys().next()
            self._message_id = data[str_tmp]['Header']['MessageID']
            self._request_datetime = dt.strptime(
                data[str_tmp]['Header']['DateTime'],
                self.SI_TAX_REG_DT_FORMAT
            )
            data = self._sign(data)

        conn = ssl.wrap_socket(socket.socket(), keyfile=self._keyfile,
                               certfile=self._certfile,
                               cert_reqs=self._cert_reqs,
                               ca_certs=self._ca_certs)
        conn.connect((self._host, self._port))

        if not isinstance(data, str):
            data = json.dumps(data)
        str_tmp = [
            'POST %s%s HTTP/1.1' % (self._query_string, endpoint),
            'Host: %s:%d' % (self._host, self._port),
            'User-Agent: ' + (self._user_agent),
            'Content-Type: application/json; charset=UTF-8',
            'Connection: close',
            'Content-Length: ' + str(len(data))
        ]
        if self._cookies:
            str_tmp.append(
                'Cookie: ' + '; '.join([
                    x + '=' + self._cookies[x].value for x in self._cookies]
                )
            )

        data = self._CRLF.join(str_tmp) + self._CRLF + self._CRLF + data
        self._e_debug("\n" + '---SENDING---' + "\n" + data + "\n" +
                      '---END---' + "\n", self.DEBUG_CLIENT).encode()

        try:
            conn.write(data.encode())
        except Exception as e:
            conn.close()
            raise e

        data = ''
        str_tmp = True
        while str_tmp != '':
            str_tmp = conn.read(4096)
            data += str_tmp.decode()
        conn.close()
        str_tmp = None

        self._e_debug("\n" + '---GOT---' + "\n" + data + "\n" +
                      '---END---' + "\n", self.DEBUG_SERVER)

        data = self._parse_http_response(data)

        # TODO: Improve cookie management.
        c = data.getheader('Set-Cookie') or data.getheader('Cookie')
        self._cookies = Cookies.Simple.Cookie(c) if c else None

        if data.status != OK:
            raise exceptions.SITaxServerError(data.read(), data.status)

        data = data.read()
        data = json.loads(data)
        _log.info('\n*--------\n'
                  '*RECEIVE: %s\n'
                  '*-----------\n', data)
        if sign:
            data = self._parse_signed_response(data)
            # Take only the payload.
            data = data[1]
            # Get sub-content.
            data = data.itervalues().next()
            # Remove header.
            del(data['Header'])
        # Any potential errors should be stored here. If found, raise an
        # exception.
        if 'Error' in data:
            raise exceptions.SITaxRegistryError(
                data['Error']['ErrorMessage'],
                data['Error']['ErrorCode']
            )

        return data

    def get_session_cookie(self):
        """Get session cookie object."""
        return self._cookies

    def set_session_cookie(self, cookie):
        """Set session cookie object."""
        self._cookies = cookie

    def _inject_header(self, data, uid=None, dtm=None):
        """Insert message identification in first and only child of dict."""
        if not uid:
            uid = str(uuid4())
        if not dtm:
            dtm = dt.utcnow()
        if isinstance(data, str):
            data = json.loads(data)
        if not isinstance(data, dict):
            raise TypeError('Data inappropriate to perform signature on')
        if len(data) != 1:
            raise ValueError('Data inappropriate to perform signature on')
        h = data.iterkeys().next()
        if not isinstance(data[h], dict):
            raise TypeError('Data inappropriate to perform signature on')
        data[h]['Header'] = {
            'MessageID': uid,
            'DateTime': dtm.strftime(self.SI_TAX_REG_DT_FORMAT),
        }
        return data

    def _sign(self, data):
        """Sign the request message."""
        h = self._get_jws_header() + '.' + self._jws_base64_encode(
            json.dumps(data))
        return {'token': h + '.' + self._signature(h)}

    def _signature(self, data):
        """Create a signature out of a certificate header and already encoded
        and joined message payload.
        """
        self._check_and_load_private_key()

        h = SHA256.new(data.encode('UTF-8'))
        return self._jws_base64_encode(
            PKCS.new(self._crypto_private_key).sign(h))

    def _parse_http_response(self, response_str):
        """Parse response string into an HTTP object."""
        sock = TmpSock(response_str)
        res = HTTPResponse(sock)
        res.begin()
        return res

    def _parse_signed_response(self, data):
        """Parse and fix encoded response from the tax registry server."""
        res = []
        data = data['token'].split('.')
        signature = data.pop()
        # Parse JWS header and payload.
        for d in data:
            res.append(json.loads(self._jws_base64_decode(d)))
        # TODO: Parse signature.
        res.append(signature)
        return res

    def _get_jws_header(self):
        """Parse if necessary and get public certificate JWS header."""
        if self._jws_header == '':
            # If header hasn't been made yet, construct it and store in buffer
            # for possible later access.
            # TODO: This is the only place where we need the OpenSSL module.
            # See if we can somehow read the data without it so we can remove
            # the dependency.
            b = open(self._certfile, 'r').read()
            x = crypto.load_certificate(crypto.FILETYPE_PEM, b)

            jws_header = {
                'alg': self._ALGO,
                'subject_name': ','.join(map(lambda x: x[0] + '=' + x[1],
                                             x.get_subject().get_components())),
                'issuer_name': ','.join(map(lambda x: x[0] + '=' + x[1],
                                            x.get_issuer().get_components())),
                'serial': x.get_serial_number(),
            }

            self._jws_header = self._jws_base64_encode(json.dumps(jws_header))

        return self._jws_header

    def _jws_base64_encode(self, s):
        """Encode string into a JWS compliant base64 format."""
        return base64.urlsafe_b64encode(s).replace(b'=', b'')

    def _jws_base64_decode(self, s):
        """Decode string from a JWS base64 format."""
        s += b'=' * (4 - (len(s) % 4))
        return base64.urlsafe_b64decode(s.encode('UTF-8'))

    def _calculate_protective_mark(self, tax_number, date, invoice_number,
                                   premise, device, amount):
        """Generate "ZOI" signature."""
        self._check_and_load_private_key()
        sig = str(tax_number) + date + invoice_number + premise + device \
            + str(amount)
        sig = SHA256.new(sig.encode('UTF-8')).digest()
        sig = self._crypto_private_key.sign(sig, '')[0]
        return hashlib.md5.new(str(sig)).hexdigest()

    def _populate_invoice_data(self, data):
        """Add missing and correct existing data in invoice dict."""
        if 'IssueDateTime' in data and \
                isinstance(data['IssueDateTime'], datetime.datetime):
            data['IssueDateTime'] = \
                data['IssueDateTime'].strftime(self.SI_TAX_REG_DT_FORMAT)
        if 'ReferenceInvoice' in data and \
                isinstance(data['ReferenceInvoice'], list) and \
                len(data['ReferenceInvoice']) > 0 and \
                'ReferenceInvoiceIssueDateTime' in \
                data['ReferenceInvoice'][0] and \
                isinstance(
                    data['ReferenceInvoice'][0]
                    ['ReferenceInvoiceIssueDateTime'],
                    datetime.datetime
                ):
            data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] = \
                data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] \
                .strftime(self.SI_TAX_REG_DT_FORMAT)
        if 'ReferenceSalesBook' in data and \
                isinstance(data['ReferenceSalesBook'], list) and \
                len(data['ReferenceSalesBook']) > 0 and \
                'ReferenceSalesBookIssueDate' in \
                data['ReferenceSalesBook'][0] and \
                isinstance(
                    data['ReferenceSalesBook'][0]
                    ['ReferenceSalesBookIssueDate'],
                    datetime.date
                ):
            data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] = \
                data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] \
                .strftime(self.SI_TAX_REG_DATE_FORMAT)

        if 'ProtectedID' not in data:
            data['ProtectedID'] = self._calculate_protective_mark(
                data['TaxNumber'], data['IssueDateTime'],
                data['InvoiceIdentifier']['InvoiceNumber'],
                data['InvoiceIdentifier']['BusinessPremiseID'],
                data['InvoiceIdentifier']['ElectronicDeviceID'],
                data['InvoiceAmount']
            )
        _log.info('\n*-----------\n'
                  '*SEND: %s\n'
                  '*-------------\n', data)
        return data

    def _check_and_load_private_key(self):
        """Load the private key content into class property."""
        if not self._crypto_private_key:
            h = open(self._keyfile, 'r').read()
            self._crypto_private_key = RSA.importKey(h, passphrase=self._key)

    def _e_debug(self, msg, level):
        """Output debug message to user-selected method."""
        if level > self._debug:
            return

        if self._debug_output == 'logger':
            debug(msg)
        else:
            msg = sub('/(\r\n|\r|\n)/ms', "\n", msg)
            print (dt.now().strftime('%Y-%m-%d %H:%M:%S') + "\t" \
                   + msg.strip() + "\n")
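
Apart from the line named in the traceback, the v12 listing still carries several Python 2 idioms that Python 3 does not provide: `iterkeys().next()`, `itervalues().next()`, `hashlib.md5.new(...)`, `Cookies.Simple.Cookie(...)`, and base64 helpers that mix `str` and `bytes`. The sketch below shows standard-library spellings that could replace them. The function names are hypothetical helpers introduced for illustration, not part of the module, and the RSA signing calls are left out because they depend on which crypto package is installed.

```python
import base64
import hashlib
from http.cookies import SimpleCookie


def first_key(d):
    """Python 3 spelling of d.iterkeys().next()."""
    return next(iter(d))


def first_value(d):
    """Python 3 spelling of d.itervalues().next()."""
    return next(iter(d.values()))


def md5_hex(text):
    """hashlib.md5 is called directly and expects bytes, not str."""
    return hashlib.md5(text.encode('UTF-8')).hexdigest()


def jws_b64_encode(s):
    """URL-safe base64 without '=' padding; accepts str or bytes, returns str."""
    if isinstance(s, str):
        s = s.encode('UTF-8')
    return base64.urlsafe_b64encode(s).rstrip(b'=').decode('ascii')


def jws_b64_decode(s):
    """Restore the stripped padding on a str token part, then decode to bytes."""
    s += '=' * (-len(s) % 4)
    return base64.urlsafe_b64decode(s.encode('ascii'))


def parse_cookies(header_value):
    """Parse a Set-Cookie header value (http.cookies replaces the Cookie module)."""
    return SimpleCookie(header_value)
```

Returning `str` from `jws_b64_encode` keeps expressions such as `header + '.' + payload` working, since Python 3 refuses to concatenate `str` and `bytes`.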


Code v9:

# -*- coding: utf-8 -*-


"""Slovenian tax invoice registration and other commands required by the
system.
"""

import base64
import Cookie
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
import Crypto.Signature.PKCS1_v1_5 as PKCS
import datetime
from datetime import datetime as dt
from httplib import HTTPResponse
from httplib import OK
from logging import debug
import md5
from OpenSSL import crypto
from re import sub
import json
import socket
import ssl
from uuid import uuid4

import exceptions
from __init__ import __version__
from TmpSock import TmpSock
import logging
_log = logging.getLogger(__name__)


class SITaxReg(object):
    """Slovenian Tax Registry client object."""

    DEBUG_OFF = 0
    DEBUG_SERVER = 1
    DEBUG_CLIENT = 2
    DEBUG_OUTPUT_PRINT = 'print'
    DEBUG_OUTPUT_LOGGER = 'logger'

    SI_TAX_REG_DATE_FORMAT = '%Y-%m-%d'
    SI_TAX_REG_DT_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

    _CRLF = "\r\n"
    _ALGO = 'RS256'

    _keyfile = ''
    _certfile = ''
    _ca_cert = None
    _cert_reqs = ssl.CERT_NONE
    _jws_header = ''
    _key = None
    _crypto_private_key = None
    _debug_output = DEBUG_OUTPUT_PRINT
    _debug = DEBUG_OFF

    _scheme = 'https'
    _host = 'blagajne.fu.gov.si'
    _port = 9003
    _query_string = '/v1/'

    _user_agent = 'Editor d.o.o. SI Tax Registry Python Library/%s'
    _cookies = None

    _message_id = ''
    _request_datetime = None
    _zoi = ''

    def __init__(self, keyfile, certfile, ca_certs=None, key=None, dev=False):
        """Return a new class instance with specified certificates."""
        self._keyfile = keyfile
        self._certfile = certfile
        self._ca_certs = ca_certs
        self._key = key

        # When verifier certificates are provided, we require certificate
        # authentication.
        if self._ca_certs:
            self._cert_reqs = ssl.CERT_REQUIRED

        self._user_agent = self._user_agent % (__version__)

        if dev:
            self._host = 'blagajne-test.fu.gov.si'
            self._port = 9002

    def set_debug_output(self, output):
        """Set debug output destination."""
        self._debug_output = output

    def set_debug(self, level):
        """Set debug level."""
        self._debug = level

    def get_message_id(self):
        """Get message ID of last sent message if any."""
        return self._message_id

    def get_request_datetime(self):
        """Get last message request time if any."""
        return self._request_datetime

    def get_zoi(self):
        """Get last used issuer mark in a request if any."""
        return self._zoi

    def echo(self):
        """Send echo request and verify echo server response."""
        res = self.send('cash_registers/echo', {'EchoRequest': 'furs'},
                        sign=False)
        return 'EchoResponse' in res and res['EchoResponse'] == 'furs'

    def register_business_premise(self, data):
        """Register a business premise."""
        self.send('cash_registers/invoices/register', {
            'BusinessPremiseRequest': {'BusinessPremise': data},
        })
        return True

    def issue_electronic_invoice(self, data):
        """Send invoice request for invoices made with an electronic device."""
        res = self.send('cash_registers/invoices', {
            'InvoiceRequest': {'Invoice': self._populate_invoice_data(data)},
        })
        return '' if 'UniqueInvoiceID' not in res else res['UniqueInvoiceID']

    def issue_batch_electronic_invoice(self, data):
        """Send a list of invoices request for invoices made with an electronic
        device.
        """
        data_tmp = []
        for i, d in enumerate(data):
            data_tmp.append({
                'RecordNumber': i + 1,
                'Invoice': self._populate_invoice_data(d),
            })

        res = self.send('cash_registers_batch/invoices', {
            'InvoiceListRequest': {'InvoiceList': {'RecordInfo': data_tmp}},
        })
        data_tmp = []
        if 'RecordReply' not in res:
            return []
        # TODO: Check the response list against the request list.
        return [(r['ProtectedID'],
                 r['UniqueInvoiceID']) for r in res['RecordReply']]

    def issue_prenumbered_invoice(self, data):
        """Send invoice request for invoices made with a pre-numbered invoice
        book.
        """
        if 'IssueDate' in data and \
                isinstance(data['IssueDate'], datetime.date):
            data['IssueDate'] = \
                data['IssueDate'].strftime(self.SI_TAX_REG_DATE_FORMAT)
        if 'ReferenceSalesBook' in data and \
                isinstance(data['ReferenceSalesBook'], list) and \
                len(data['ReferenceSalesBook']) > 0 and \
                'ReferenceSalesBookIssueDate' in \
                data['ReferenceSalesBook'][0] and \
                isinstance(
                    data['ReferenceSalesBook'][0]
                    ['ReferenceSalesBookIssueDate'],
                    datetime.date
                ):
            data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] = \
                data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] \
                .strftime(self.SI_TAX_REG_DATE_FORMAT)
        if 'ReferenceInvoice' in data and \
                isinstance(data['ReferenceInvoice'], list) and \
                len(data['ReferenceInvoice']) > 0 and \
                'ReferenceInvoiceIssueDateTime' in \
                data['ReferenceInvoice'][0] and \
                isinstance(
                    data['ReferenceInvoice'][0]
                    ['ReferenceInvoiceIssueDateTime'],
                    datetime.datetime
                ):
            data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] = \
                data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] \
                .strftime(self.SI_TAX_REG_DT_FORMAT)

        res = self.send('cash_registers/invoices', {
            'InvoiceRequest': {'SalesBookInvoice': data},
        })
        return '' if 'UniqueInvoiceID' not in res else res['UniqueInvoiceID']

    def send(self, endpoint, data, sign=True):
        """Send request data to registry server."""
        self._message_id = ''
        self._request_datetime = None
        self._zoi = ''

        if isinstance(data, str):
            data = json.loads(data)

        if 'InvoiceRequest' in data and \
                'Invoice' in data['InvoiceRequest'] and \
                'ProtectedID' in data['InvoiceRequest']['Invoice']:
            self._zoi = data['InvoiceRequest']['Invoice']['ProtectedID']
        elif 'InvoiceListRequest' in data and \
                'InvoiceList' in data['InvoiceListRequest'] and \
                'RecordInfo' in data['InvoiceListRequest']['InvoiceList']:
            str_tmp = len(data['InvoiceListRequest']['InvoiceList']
                          ['RecordInfo'])
            if str_tmp > 0:
                str_tmp = (data['InvoiceListRequest']['InvoiceList']
                           ['RecordInfo'][str_tmp - 1])
                if 'Invoice' in str_tmp and \
                        'ProtectedID' in str_tmp['Invoice']:
                    self._zoi = str_tmp['Invoice']['ProtectedID']

        if sign:
            data = self._inject_header(data)
            str_tmp = data.iterkeys().next()
            self._message_id = data[str_tmp]['Header']['MessageID']
            self._request_datetime = dt.strptime(
                data[str_tmp]['Header']['DateTime'],
                self.SI_TAX_REG_DT_FORMAT
            )
            data = self._sign(data)

        conn = ssl.wrap_socket(socket.socket(), keyfile=self._keyfile,
                               certfile=self._certfile,
                               cert_reqs=self._cert_reqs,
                               ca_certs=self._ca_certs)
        conn.connect((self._host, self._port))

        if not isinstance(data, str):
            data = json.dumps(data)
        str_tmp = [
            'POST %s%s HTTP/1.1' % (self._query_string, endpoint),
            'Host: %s:%d' % (self._host, self._port),
            'User-Agent: ' + (self._user_agent),
            'Content-Type: application/json; charset=UTF-8',
            'Connection: close',
            'Content-Length: ' + str(len(data))
        ]
        if self._cookies:
            str_tmp.append(
                'Cookie: ' + '; '.join([
                    x + '=' + self._cookies[x].value for x in self._cookies]
                )
            )

        data = self._CRLF.join(str_tmp) + self._CRLF + self._CRLF + data
        self._e_debug("\n" + '---SENDING---' + "\n" + data + "\n" +
                      '---END---' + "\n", self.DEBUG_CLIENT)

        try:
            conn.write(data)
        except Exception as e:
            conn.close()
            raise e

        data = ''
        str_tmp = True
        while str_tmp != '':
            str_tmp = conn.read(4096)
            data += str_tmp
        conn.close()
        str_tmp = None

        self._e_debug("\n" + '---GOT---' + "\n" + data + "\n" +
                      '---END---' + "\n", self.DEBUG_SERVER)

        data = self._parse_http_response(data)

        # TODO: Improve cookie management.
        c = data.getheader('Set-Cookie') or data.getheader('Cookie')
        self._cookies = Cookie.SimpleCookie(c) if c else None

        if data.status != OK:
            raise exceptions.SITaxServerError(data.read(), data.status)

        data = data.read()
        data = json.loads(data)
        # _log.info('\n*--------\n'
        # '*RECEIVE: %s\n'
        # '*-----------\n', data)
        if sign:
            data = self._parse_signed_response(data)
            # Take only the payload.
            data = data[1]
            # Get sub-content.
            data = data.itervalues().next()
            # Remove header.
            del(data['Header'])
        # Any potential errors should be stored here. If found, raise an
        # exception.
        if 'Error' in data:
            raise exceptions.SITaxRegistryError(
                data['Error']['ErrorMessage'],
                data['Error']['ErrorCode']
            )

        return data

    def get_session_cookie(self):
        """Get session cookie object."""
        return self._cookies

    def set_session_cookie(self, cookie):
        """Set session cookie object."""
        self._cookies = cookie

    def _inject_header(self, data, uid=None, dtm=None):
        """Insert message identification in first and only child of dict."""
        if not uid:
            uid = str(uuid4())
        if not dtm:
            dtm = dt.utcnow()
        if isinstance(data, str):
            data = json.loads(data)
        if not isinstance(data, dict):
            raise TypeError('Data inappropriate to perform signature on')
        if len(data) != 1:
            raise ValueError('Data inappropriate to perform signature on')
        h = data.iterkeys().next()
        if not isinstance(data[h], dict):
            raise TypeError('Data inappropriate to perform signature on')
        data[h]['Header'] = {
            'MessageID': uid,
            'DateTime': dtm.strftime(self.SI_TAX_REG_DT_FORMAT),
        }
        return data

    def _sign(self, data):
        """Sign the request message."""
        h = self._get_jws_header() + '.' + self._jws_base64_encode(
            json.dumps(data))
        return {'token': h + '.' + self._signature(h)}

    def _signature(self, data):
        """Create a signature out of a certificate header and already encoded
        and joined message payload.
        """
        self._check_and_load_private_key()

        h = SHA256.new(data.encode('UTF-8'))
        return self._jws_base64_encode(
            PKCS.new(self._crypto_private_key).sign(h))

    def _parse_http_response(self, response_str):
        """Parse response string into an HTTP object."""
        sock = TmpSock(response_str)
        res = HTTPResponse(sock)
        res.begin()
        return res

    def _parse_signed_response(self, data):
        """Parse and fix encoded response from the tax registry server."""
        res = []
        data = data['token'].split('.')
        signature = data.pop()
        # Parse JWS header and payload.
        for d in data:
            res.append(json.loads(self._jws_base64_decode(d)))
        # TODO: Parse signature.
        res.append(signature)
        return res

    def _get_jws_header(self):
        """Parse if necessary and get public certificate JWS header."""
        if self._jws_header == '':
            # If header hasn't been made yet, construct it and store in buffer
            # for possible later access.
            # TODO: This is the only place where we need the OpenSSL module.
            # See if we can somehow read the data without it so we can remove
            # the dependency.
            b = open(self._certfile, 'r').read()
            x = crypto.load_certificate(crypto.FILETYPE_PEM, b)

            jws_header = {
                'alg': self._ALGO,
                'subject_name': ','.join(map(lambda x: x[0] + '=' + x[1],
                                             x.get_subject().get_components())),
                'issuer_name': ','.join(map(lambda x: x[0] + '=' + x[1],
                                            x.get_issuer().get_components())),
                'serial': x.get_serial_number(),
            }

            self._jws_header = self._jws_base64_encode(json.dumps(jws_header))

        return self._jws_header

    def _jws_base64_encode(self, s):
        """Encode string into a JWS compliant base64 format."""
        return base64.urlsafe_b64encode(s).replace(b'=', b'')

    def _jws_base64_decode(self, s):
        """Decode string from a JWS base64 format."""
        s += b'=' * (4 - (len(s) % 4))
        return base64.urlsafe_b64decode(s.encode('UTF-8'))

    def _calculate_protective_mark(self, tax_number, date, invoice_number,
                                   premise, device, amount):
        """Generate "ZOI" signature."""
        self._check_and_load_private_key()
        sig = str(tax_number) + date + invoice_number + premise + device \
            + str(amount)
        sig = SHA256.new(sig.encode('UTF-8')).digest()
        sig = self._crypto_private_key.sign(sig, '')[0]
        return md5.new(str(sig)).hexdigest()

    def _populate_invoice_data(self, data):
        """Add missing and correct existing data in invoice dict."""
        if 'IssueDateTime' in data and \
                isinstance(data['IssueDateTime'], datetime.datetime):
            data['IssueDateTime'] = \
                data['IssueDateTime'].strftime(self.SI_TAX_REG_DT_FORMAT)
        if 'ReferenceInvoice' in data and \
                isinstance(data['ReferenceInvoice'], list) and \
                len(data['ReferenceInvoice']) > 0 and \
                'ReferenceInvoiceIssueDateTime' in \
                data['ReferenceInvoice'][0] and \
                isinstance(
                    data['ReferenceInvoice'][0]
                    ['ReferenceInvoiceIssueDateTime'],
                    datetime.datetime
                ):
            data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] = \
                data['ReferenceInvoice'][0]['ReferenceInvoiceIssueDateTime'] \
                .strftime(self.SI_TAX_REG_DT_FORMAT)
        if 'ReferenceSalesBook' in data and \
                isinstance(data['ReferenceSalesBook'], list) and \
                len(data['ReferenceSalesBook']) > 0 and \
                'ReferenceSalesBookIssueDate' in \
                data['ReferenceSalesBook'][0] and \
                isinstance(
                    data['ReferenceSalesBook'][0]
                    ['ReferenceSalesBookIssueDate'],
                    datetime.date
                ):
            data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] = \
                data['ReferenceSalesBook'][0]['ReferenceSalesBookIssueDate'] \
                .strftime(self.SI_TAX_REG_DATE_FORMAT)

        if 'ProtectedID' not in data:
            data['ProtectedID'] = self._calculate_protective_mark(
                data['TaxNumber'], data['IssueDateTime'],
                data['InvoiceIdentifier']['InvoiceNumber'],
                data['InvoiceIdentifier']['BusinessPremiseID'],
                data['InvoiceIdentifier']['ElectronicDeviceID'],
                data['InvoiceAmount']
            )
        # _log.info('\n*-----------\n'
        # '*SEND: %s\n'
        # '*-------------\n', data)
        return data

    def _check_and_load_private_key(self):
        """Load the private key content into class property."""
        if not self._crypto_private_key:
            h = open(self._keyfile, 'r').read()
            self._crypto_private_key = RSA.importKey(h, passphrase=self._key)

    def _e_debug(self, msg, level):
        """Output debug message to user-selected method."""
        if level > self._debug:
            return

        if self._debug_output == 'logger':
            debug(msg)
        else:
            msg = sub('/(\r\n|\r|\n)/ms', "\n", msg)
            print dt.now().strftime('%Y-%m-%d %H:%M:%S') + "\t" \
                + msg.strip() + "\n"

shalin wilson
Best Answer

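A 'NoneType' object error appears when the expression you call a method on has no value. Here that is the expression ending in self.DEBUG_CLIENT) on the line shown in the traceback, so please debug and check what value it gives you before .encode() is called.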
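
Following the traceback, one possible fix is to drop the trailing `.encode()` from the debug call and encode only the data written to the socket. The read loop would also need to compare against `b''`, because a socket read returns bytes in Python 3 and never equals `''`. The sketch below illustrates both points offline; `FakeConn` and `exchange` are hypothetical names introduced here and stand in for the SSL connection and the relevant part of `send`.

```python
class FakeConn(object):
    """Hypothetical stand-in for the SSL socket so the sketch runs offline."""

    def __init__(self, response):
        self._response = response
        self.sent = b''

    def write(self, payload):
        self.sent += payload
        return len(payload)

    def read(self, size):
        chunk, self._response = self._response[:size], self._response[size:]
        return chunk

    def close(self):
        pass


def exchange(conn, request, log=print):
    """Log the request as text, send it as bytes, and read the reply until EOF."""
    # Debug output is a side effect; nothing is chained onto its return value.
    log('---SENDING---\n' + request + '\n---END---')
    try:
        conn.write(request.encode('UTF-8'))
        chunks = []
        while True:
            chunk = conn.read(4096)
            if chunk == b'':  # an empty bytes object marks the end of the stream
                break
            chunks.append(chunk)
    finally:
        conn.close()
    return b''.join(chunks).decode('UTF-8')
```

If the request body can contain non-ASCII characters, the `Content-Length` header should be computed from the encoded bytes rather than from the length of the `str`.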
