# trytonpsk-sale_web_channel/rappi.py
#
# 291 lines
# 11 KiB
# Python

import json
import logging
from collections import namedtuple
from datetime import datetime
from decimal import Decimal

import requests
from trytond.modules.product import round_price
from trytond.pool import Pool
from trytond.transaction import Transaction

from . import endpoints_rappi
# OAuth token endpoints (hosted on auth0.com) for the dev and production
# Rappi integrations tenants.
URL_AUTH_DEV = 'https://rests-integrations-dev.auth0.com/oauth/token'
URL_AUTH_PRODUCTION = 'https://rests-integrations.auth0.com/oauth/token'

# Base URL of the restaurants-integrations public API (the "dev" host).
URL_ENV = 'https://microservices.dev.rappi.com/api/v2/restaurants-integrations-public-api'

# Value sent as "audience" in the client-credentials token request.
AUDIENCE = 'https://int-public-api-v2/api'

# JSON request headers. This dict used to be defined twice with identical
# content; a single definition is kept.
HEADERS = {
    'Accept': 'application/json',
    'Content-type': 'application/json',
}

# Older names for the token endpoints, kept as aliases so any existing
# importer keeps working without repeating the URL literals.
URL_DEV = URL_AUTH_DEV
URL_PRODUCTION = URL_AUTH_PRODUCTION
class Rappi:
    """Client for the Rappi restaurants-integrations API, bound to a web shop.

    Holds a ``requests.Session`` carrying the shop's access token and offers
    helpers to request a token, publish the menu, manage orders and create
    Tryton sales from the orders returned by the API.
    """

    # Debug output goes through logging instead of print() so response
    # bodies (which may contain tokens) never reach stdout.
    _logger = logging.getLogger(__name__)

    _SUPPORTED_METHODS = ('GET', 'POST', 'PUT', 'DELETE')

    def __init__(self, web_shop):
        """Copy the connection settings from ``web_shop`` and open a session.

        ``web_shop`` must expose ``id``, ``price_list``, ``company``,
        ``seller_id``, ``secret_key``, ``app_id`` and ``access_token``.
        """
        self.id = web_shop.id
        self.price_list = web_shop.price_list
        self.company = web_shop.company
        self.seller_id = web_shop.seller_id
        self.secret_key = web_shop.secret_key
        self.app_id = web_shop.app_id
        # NOTE(review): the store integration id is read from ``app_id``,
        # exactly like ``self.app_id`` -- confirm this is intended.
        self.store_integration_id = web_shop.app_id
        self.url_base = URL_ENV
        self.url_auth = URL_AUTH_DEV
        self.token = web_shop.access_token or ''
        self.session = requests.Session()
        self.session.headers.update({
            'x-authorization': 'bearer ' + self.token,
            'Content-Type': 'application/json',
        })

    def _get_context(self):
        """Return a context dict with the company, user and language ('es').

        ``__init__`` never sets ``self.user``, so the attribute is used only
        when a caller assigned it; otherwise the user of the current Tryton
        transaction is used instead of failing with ``AttributeError``.
        """
        user = getattr(self, 'user', None)
        user_id = user.id if user is not None else Transaction().user
        return {
            'company': self.company.id,
            'user': user_id,
            'language': 'es',
        }

    def get_token_access(self):
        """Request an access token with the client-credentials grant.

        Returns whatever ``_send_request`` returns for the auth endpoint.
        """
        data = {
            "client_id": self.seller_id,
            "client_secret": self.secret_key,
            "audience": AUDIENCE,
            "grant_type": "client_credentials"
        }
        return self._send_request(method='POST', data=data, auth=True)

    def _send_request(self, endpoint=None, method='GET', data: dict = None,
                      auth: bool = False):
        """Send an HTTP request through the session and decode the answer.

        :param endpoint: path appended to ``self.url_base``; required unless
            ``auth`` is true.
        :param method: one of 'GET', 'POST', 'PUT' or 'DELETE'.
        :param data: payload serialised as JSON for POST/PUT. When ``None``
            no body is sent (previously a literal ``null`` was sent).
        :param auth: when true the request goes to ``self.url_auth``.
        :return: the decoded JSON when the response Content-Type starts with
            'application/json', otherwise the response text.
        :raises ValueError: unsupported method, or missing endpoint.
        :raises RuntimeError: when ``requests`` reports a request error.
        """
        if method not in self._SUPPORTED_METHODS:
            raise ValueError(f'Method {method} not supported')
        if auth:
            url = self.url_auth
        elif endpoint is None:
            raise ValueError('endpoint is required for non-auth requests')
        else:
            url = self.url_base + endpoint

        kwargs = {'timeout': 10}
        if method in ('POST', 'PUT') and data is not None:
            kwargs['data'] = json.dumps(data)

        try:
            response = self.session.request(method, url, **kwargs)
            # Only the status is logged: the body of an auth response
            # contains the access token.
            self._logger.debug(
                'Rappi %s %s -> %s', method, url, response.status_code)
            content_type = response.headers.get('Content-Type', '')
            if content_type.startswith('application/json'):
                return response.json()
            return response.text
        except requests.exceptions.RequestException as e:
            raise RuntimeError(f'Request error: {str(e)}') from e

    def synchronize_menu_rappi(self, products, categories):
        """Build the menu payload and POST it to the '/menu' endpoint.

        Items are collected from ``products`` first and then from the
        products of each category in ``categories``.

        Per the original author's note, when the menu is not created the
        response is a dict of the form ``{message: 'reason'}``.
        """
        items = self.get_products(products)
        items = self.get_products_by_category(categories, items)
        data = {
            'storeId': self.store_integration_id,
            'items': list(items.values()),
        }
        return self._send_request(endpoint='/menu', method='POST', data=data)

    def _build_item(self, product, category, children=None):
        """Return the menu item dict for ``product`` under ``category``.

        The "children" key is added only when ``children`` is not ``None``.
        """
        item = {
            "name": product.name,
            "description": product.description if product.description else 'not available',
            "price": float(self.compute_unit_price(product)),
            "sku": product.code,
            "sortingPosition": 0,
            "type": "PRODUCT",
            "category": {
                "id": category.id,
                "maxQty": 0,
                "minQty": 0,
                "name": category.name,
                "sortingPosition": 0
            },
        }
        if children is not None:
            item["children"] = children
        return item

    def get_products(self, products):
        """Return ``{product.code: item}`` for every product with categories.

        Products without categories are skipped. Each item uses the
        product's first category and carries its toppings as "children".
        """
        return {
            product.code: self._build_item(
                product, product.categories[0],
                children=self.get_children(product))
            for product in products
            if product.categories
        }

    def get_products_by_category(self, categories, items):
        """Add the products of each category to ``items`` and return it.

        Entries are keyed by product code and carry no "children" key.
        NOTE(review): an entry already present in ``items`` (e.g. built by
        ``get_products`` with children) is overwritten, as before -- confirm
        that this is intended.
        """
        for category in categories:
            for product in category.products:
                # The product's own first category is used as before; fall
                # back to the category being walked instead of raising
                # IndexError when the product lists none.
                item_category = (
                    product.categories[0] if product.categories else category)
                items[product.code] = self._build_item(product, item_category)
        return items

    def get_children(self, product):
        """Return the topping dicts built from ``product.products_mix``.

        Mix products without categories are skipped. The description comes
        from the mix product (or 'not available') rather than the sample
        text that used to be hard-coded for every topping.
        """
        list_children = []
        for mix in product.products_mix:
            if not mix.categories:
                continue
            list_children.append({
                "category": {
                    "id": mix.categories[0].id,
                    "maxQty": 1,
                    "minQty": 0,
                    "name": "Do you want to add?",
                    "sortingPosition": 0
                },
                "name": mix.name,
                "description": getattr(mix, 'description', None) or 'not available',
                "price": 0,
                "sku": mix.code,
                "maxLimit": 1,
                "sortingPosition": 1,
                "type": "TOPPING"
            })
        return list_children

    def get_orders(self):
        """GET the orders of the store identified by ``self.app_id``."""
        endpoint = f'/orders?storeId={self.app_id}'
        return self._send_request(endpoint=endpoint, method='GET')

    def take_order(self, orderId, cookingTime):
        """PUT '/orders/{orderId}/take/{cookingTime}' and return the answer."""
        endpoint = f'/orders/{orderId}/take/{cookingTime}'
        return self._send_request(endpoint=endpoint, method='PUT')

    def reject(self, orderId, reason=None, cancel_type=None, items_skus=None):
        """PUT a rejection for ``orderId`` and return the answer.

        :param reason: text sent as "reason"; defaults to the original
            hard-coded message.
        :param cancel_type: value sent as "cancel_type"; defaults to
            'ITEM_NOT_FOUND'.
        :param items_skus: SKUs sent as "items_skus".
            NOTE(review): when omitted, the original placeholder list
            ["sku1", "sku2"] is still sent for backward compatibility --
            callers should pass the real SKUs.
        """
        endpoint = f'/orders/{orderId}/reject'
        payload = {
            "reason": (
                "The order has invalid items" if reason is None else reason),
            "cancel_type": (
                "ITEM_NOT_FOUND" if cancel_type is None else cancel_type),
            "items_skus": (
                ["sku1", "sku2"] if items_skus is None else list(items_skus)),
        }
        return self._send_request(endpoint=endpoint, method='PUT', data=payload)

    def ready_for_pickup(self, orderId):
        """POST '/orders/{orderId}/ready-for-pickup' and return the answer."""
        endpoint = f'/orders/{orderId}/ready-for-pickup'
        return self._send_request(endpoint=endpoint, method='POST')

    def create_sales(self):
        """Create and save one ``sale.sale`` (with lines) per fetched order.

        Every sale is assigned to the party whose ``id_number`` is
        '222222222'.

        :return: the list of saved sales (previously nothing was returned).
        :raises RuntimeError: when the orders response is not a list.
        :raises ValueError: when the party lookup does not match exactly one
            record.
        """
        orders = self.get_orders()
        if not isinstance(orders, list):
            # _send_request returns a dict or plain text for non-list
            # answers; iterating those would fail with an obscure error.
            raise RuntimeError(f'Unexpected orders response: {orders!r}')
        if not orders:
            return []

        pool = Pool()
        Sale = pool.get('sale.sale')
        Party = pool.get('party.party')
        parties = Party.search([
            ('id_number', '=', '222222222')
        ])
        if len(parties) != 1:
            raise ValueError(
                'Expected exactly one party with id_number 222222222, '
                f'found {len(parties)}')
        pos_client = parties[0]

        sales = []
        for order in orders:
            order_detail = order['order_detail']
            sale = Sale(
                web_shop=self.id,
                invoice_type='P',
                company=self.company,
                party=pos_client.id,
                sale_device=None,
                payment_method='cash',
                payment_term=None,
                kind='take_away',
                web_id=order_detail['order_id'],
            )
            sale.save()
            self.create_sale_lines(order_detail['items'], sale.id)
            sales.append(sale)
        return sales

    def create_sale_lines(self, items, sale, sub_items=False):
        """Create and save a ``sale.line`` for each item and its sub-items.

        :param items: item dicts with 'sku', 'quantity',
            'unit_price_without_discount' and 'percentage_discount', plus
            an optional 'subitems' list of the same shape.
        :param sale: value assigned to the ``sale`` field of each line.
        :param sub_items: when true every price is stored as zero.
        :return: the saved lines, sub-item lines included.
        :raises ValueError: when a SKU does not match exactly one product.
        """
        pool = Pool()
        Product = pool.get('product.product')
        SaleLine = pool.get('sale.line')
        lines = []
        for item in items:
            products = Product.search([('code', '=', item['sku'])])
            if len(products) != 1:
                raise ValueError(
                    f"Expected exactly one product with code {item['sku']!r}, "
                    f'found {len(products)}')
            product = products[0]
            line = SaleLine(
                sale=sale,
                product=product.id,
                description=product.description,
            )
            line.on_change_product()
            line.quantity = item['quantity']
            base_price = self.get_price(
                item['unit_price_without_discount'], sub_items)
            line.base_price = base_price
            line.discount_rate = self.get_price(
                item['percentage_discount'], sub_items)
            line.unit_price = base_price
            line.unit = product.default_uom.id
            line.save()
            lines.append(line)
            # Nested items may not carry a 'subitems' key at all.
            subitems = item.get('subitems') or []
            if subitems:
                lines.extend(
                    self.create_sale_lines(subitems, sale, sub_items=True))
        return lines

    def get_price(self, price, subitem):
        """Return ``price`` rounded to an integer Decimal, or zero for sub-items.

        Zero is returned as ``Decimal('0')`` so both branches yield the same
        type; it still compares equal to the ``0`` returned before.
        """
        if subitem:
            return Decimal('0')
        return Decimal(str(round(price)))

    def compute_unit_price(self, product):
        """Return the sale price of ``product`` for quantity 1.

        The lookup runs with ``self.price_list`` in the transaction context
        when one is set. A truthy price is passed through ``round_price``;
        a falsy one is returned unchanged.
        """
        Product = Pool().get('product.product')
        context = {}
        if self.price_list:
            context['price_list'] = self.price_list.id
        with Transaction().set_context(context):
            unit_price = Product.get_sale_price([product], 1)[product.id]
        if unit_price:
            unit_price = round_price(unit_price)
        return unit_price

    def check_consumer(self, billing_information, delivery_information, shipping):
        """Update or create the ``party.consumer`` matching the billing phone.

        The search result is a list: the first match is updated, or a new
        consumer is created with the billing phone when there is none. The
        record is saved in both cases so a real id is returned.

        :return: the id of the saved consumer.
        """
        Consumer = Pool().get('party.consumer')
        phone = billing_information['phone']
        matches = Consumer.search([('phone', '=', phone)])
        if matches:
            consumer = matches[0]
            # Keep an id number that is already stored on the consumer.
            if not consumer.id_number:
                consumer.id_number = billing_information['document_number']
        else:
            consumer = Consumer(
                phone=phone,
                id_number=billing_information['document_number'],
            )
        consumer.address = delivery_information['complete_address']
        consumer.name = billing_information['name']
        consumer.delivery = shipping
        consumer.notes = (
            billing_information['billing_type'] + ' / ' + 'indicaciones: '
            + delivery_information['complement'])
        consumer.save()
        return consumer.id