Data shouldn’t be stuck in one place. Whether you’re getting daily sales from a partner’s system or sending invoices to your accountant’s software, manually uploading CSV files becomes slow and inefficient over time.
Automated scripts connect your systems. Instead of importing data by hand every day, you can schedule the job to run daily or hourly, or trigger it whenever a specific event occurs. This saves time and reduces the human errors that creep into manual work.
Usage
For Imports: You typically use Python methods with Odoo's ORM to read data from external sources (JSON, XML, API responses) and create or update records in Odoo. The script can be triggered by scheduled actions (cron jobs) or called manually.
For Exports: You extract data from Odoo models using search and read operations, format it into the desired structure (CSV, Excel, JSON, XML), and send it to external systems via file export, email, FTP, or API calls.
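As a minimal sketch of the export step, the snippet below flattens records shaped like the output of Odoo's search_read into CSV text. It runs without Odoo: the sample records are made up, and flatten_value and records_to_csv are hypothetical helper names. It assumes many2one fields arrive as an [id, name] pair and empty fields as False, which is how search_read typically returns them.

```python
import csv
import io


def flatten_value(value):
    """Turn a search_read-style value into a CSV-friendly scalar."""
    # Empty fields come back as False (assumption stated above).
    # Note: a boolean field set to False would also become '' here.
    if value is False or value is None:
        return ''
    # Many2one values are assumed to be an [id, display_name] pair.
    if isinstance(value, (list, tuple)) and len(value) == 2 and isinstance(value[0], int):
        return value[1]
    return value


def records_to_csv(records, fieldnames):
    """Write a list of dicts to CSV text, keeping only the given fields."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, lineterminator='\n')
    writer.writeheader()
    for record in records:
        writer.writerow({name: flatten_value(record.get(name)) for name in fieldnames})
    return buffer.getvalue()


# Made-up sample data standing in for a search_read result.
sample_records = [
    {'id': 1, 'name': 'Desk', 'default_code': 'FURN-001', 'categ_id': [4, 'Furniture']},
    {'id': 2, 'name': 'Lamp', 'default_code': False, 'categ_id': [4, 'Furniture']},
]
csv_text = records_to_csv(sample_records, ['id', 'name', 'default_code', 'categ_id'])
print(csv_text)
```

Keeping the formatting in a plain function like this makes it easy to test separately from the Odoo model that fetches the records.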
Examples:
1. Scheduled Product Export to CSV
Set up a scheduled action that runs daily, extracts product data, writes it to a CSV file, and saves it to a specific directory or emails it to stakeholders.
File: your_module/models/product_export.py
import base64
import csv
import logging
import os
from datetime import datetime

from odoo import api, models

_logger = logging.getLogger(__name__)

EXPORT_DIR = '/home/odoo/exports/'


class ProductExport(models.Model):
    _name = 'product.export'
    _description = 'Scheduled Product Export'

    @api.model
    def export_products_to_csv(self):
        """
        Exports all active storable/consumable products to a dated CSV file.
        Designed to be called by a Scheduled Action (cron).
        """
        # Ensure the export directory exists
        os.makedirs(EXPORT_DIR, exist_ok=True)

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"products_export_{timestamp}.csv"
        filepath = os.path.join(EXPORT_DIR, filename)

        # Fetch products - adjust domain as needed.
        # Note: recent Odoo versions model storable goods as type 'consu'
        # with is_storable=True, so check which values are valid for yours.
        products = self.env['product.template'].search([
            ('active', '=', True),
            ('type', 'in', ['product', 'consu']),
        ])

        fieldnames = [
            'id', 'name', 'default_code', 'type',
            'list_price', 'standard_price', 'categ_id',
            'qty_available', 'uom_id'
        ]

        rows_written = 0
        with open(filepath, mode='w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for product in products:
                writer.writerow({
                    'id': product.id,
                    'name': product.name,
                    'default_code': product.default_code or '',
                    'type': product.type,
                    'list_price': product.list_price,
                    'standard_price': product.standard_price,
                    'categ_id': product.categ_id.name if product.categ_id else '',
                    'qty_available': product.qty_available,
                    'uom_id': product.uom_id.name if product.uom_id else '',
                })
                rows_written += 1

        _logger.info("Product export complete: %d rows written to %s", rows_written, filepath)

        # Optional: Email the file to stakeholders
        self._email_export_file(filepath, filename, rows_written)
        return filepath

    def _email_export_file(self, filepath, filename, row_count):
        """Attaches the CSV to an email and sends it."""
        recipient_email = self.env['ir.config_parameter'].sudo().get_param(
            'product_export.recipient_email', default='manager@yourcompany.com'
        )

        with open(filepath, 'rb') as f:
            file_content = f.read()

        # Create an ir.attachment ('datas' expects base64-encoded content)
        attachment = self.env['ir.attachment'].create({
            'name': filename,
            'type': 'binary',
            'datas': base64.b64encode(file_content),
            'res_model': 'product.export',
            'mimetype': 'text/csv',
        })

        mail_values = {
            'subject': f'Daily Product Export - {datetime.now().strftime("%Y-%m-%d")}',
            'body_html': f'<p>Please find attached the daily product export report '
                         f'containing <strong>{row_count}</strong> products.</p>',
            'email_to': recipient_email,
            'attachment_ids': [(4, attachment.id)],
        }
        mail = self.env['mail.mail'].create(mail_values)
        mail.send()
        _logger.info("Product export email sent to %s", recipient_email)
Registering the Scheduled Action (XML):
<odoo>
    <record id="cron_export_products" model="ir.cron">
        <field name="name">Daily Product Export to CSV</field>
        <field name="model_id" ref="your_module.model_product_export"/>
        <field name="state">code</field>
        <field name="code">model.export_products_to_csv()</field>
        <field name="interval_number">1</field>
        <field name="interval_type">days</field>
        <!-- The numbercall field was removed from ir.cron in Odoo 18, so it is omitted here -->
        <field name="active">True</field>
    </record>
</odoo>
2. Import from External API
Use Python's requests library to fetch data from a REST API, parse the JSON response, and map it to Odoo model fields for import.
File: your_module/models/api_import.py
import logging

import requests

from odoo import api, models
from odoo.exceptions import UserError

_logger = logging.getLogger(__name__)

# Store these in Odoo System Parameters (Settings > Technical > Parameters)
API_BASE_URL = 'https://api.partnerstore.com/v2'
API_KEY = 'your_api_key_here'


class ApiImport(models.Model):
    _name = 'api.import'
    _description = 'External API Import'

    @api.model
    def import_orders_from_api(self):
        """
        Fetches new orders from an external REST API and creates sale.orders in Odoo.
        """
        # Retrieve API key securely from system parameters
        api_key = self.env['ir.config_parameter'].sudo().get_param(
            'api_import.api_key', default=API_KEY
        )

        headers = {
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        }

        try:
            response = requests.get(
                f'{API_BASE_URL}/orders',
                headers=headers,
                params={'status': 'new', 'per_page': 100},
                timeout=30,  # Always set a timeout
            )
            response.raise_for_status()  # Raises HTTPError for 4xx/5xx
        except requests.exceptions.Timeout:
            _logger.error("API request timed out after 30 seconds.")
            return
        except requests.exceptions.ConnectionError as e:
            _logger.error("Cannot connect to API: %s", e)
            return
        except requests.exceptions.HTTPError as e:
            _logger.error("API returned error: %s", e)
            return

        orders_data = response.json()
        if not isinstance(orders_data, list):
            _logger.error("Unexpected API response format: %s", type(orders_data))
            return

        created_count = 0
        skipped_count = 0

        for order in orders_data:
            try:
                external_ref = str(order.get('id', ''))

                # Skip if already imported (check for duplicate external reference)
                existing = self.env['sale.order'].search(
                    [('client_order_ref', '=', f'EXT-{external_ref}')], limit=1
                )
                if existing:
                    skipped_count += 1
                    continue

                # Resolve or create the customer
                partner = self._get_or_create_partner(order.get('customer', {}))
                if not partner:
                    _logger.warning("Could not resolve customer for order %s", external_ref)
                    continue

                # Build order lines
                order_lines = []
                for item in order.get('line_items', []):
                    product = self.env['product.product'].search(
                        [('default_code', '=', item.get('sku'))], limit=1
                    )
                    if not product:
                        _logger.warning("Product SKU not found: %s", item.get('sku'))
                        continue
                    order_lines.append((0, 0, {
                        'product_id': product.id,
                        'product_uom_qty': float(item.get('quantity', 1)),
                        'price_unit': float(item.get('price', 0.0)),
                        'name': item.get('name', product.name),
                    }))

                if not order_lines:
                    _logger.warning("No valid order lines for order %s", external_ref)
                    continue

                # Create the sale order
                self.env['sale.order'].create({
                    'partner_id': partner.id,
                    'client_order_ref': f'EXT-{external_ref}',
                    'order_line': order_lines,
                    'note': order.get('customer_note', ''),
                })
                created_count += 1

            except Exception as e:
                _logger.error("Failed to import order %s: %s", order.get('id'), e)
                continue

        _logger.info(
            "API Import done - Created: %d | Skipped (duplicate): %d",
            created_count, skipped_count
        )

    def _get_or_create_partner(self, customer_data):
        """Finds an existing partner by email or creates a new one."""
        email = customer_data.get('email', '').strip()
        if not email:
            return None

        partner = self.env['res.partner'].search([('email', '=', email)], limit=1)
        if not partner:
            partner = self.env['res.partner'].create({
                'name': f"{customer_data.get('first_name', '')} {customer_data.get('last_name', '')}".strip(),
                'email': email,
                'phone': customer_data.get('phone', ''),
            })
            _logger.info("Created new partner: %s", partner.name)
        return partner
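The request above asks for per_page: 100 and reads a single response, so if the partner API paginates its results, only the first page is imported. A loop along these lines could collect every page. This is a sketch under assumptions: the page parameter and the rule that a short page marks the last page are hypothetical and must be checked against the real API, and fetch_page stands in for the requests.get call.

```python
def fetch_all_orders(fetch_page, per_page=100, max_pages=50):
    """Collect orders page by page until a short (or empty) page is returned.

    fetch_page is any callable taking page and per_page and returning a list.
    max_pages is a safety cap so a misbehaving API cannot loop forever.
    """
    orders = []
    for page in range(1, max_pages + 1):
        batch = fetch_page(page=page, per_page=per_page)
        orders.extend(batch)
        if len(batch) < per_page:
            break  # Assumed convention: a short page means no more data
    return orders


def make_fake_api(total):
    """Build an in-memory stand-in for the remote API (for illustration only)."""
    data = [{'id': i} for i in range(1, total + 1)]

    def fetch_page(page, per_page):
        start = (page - 1) * per_page
        return data[start:start + per_page]

    return fetch_page


all_orders = fetch_all_orders(make_fake_api(250), per_page=100)
print(len(all_orders))
```

In the Odoo method, fetch_page would wrap the existing requests.get call and return response.json() for the requested page.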
3. Export Invoice Data to Excel
Generate Excel reports using libraries like openpyxl or xlsxwriter, format them with headers and styling, and attach them to email or save to file server.
Install dependency: pip install openpyxl (add to requirements.txt)
File: your_module/models/invoice_export.py
import base64
import io
import logging
from datetime import date

from odoo import api, models

_logger = logging.getLogger(__name__)


class InvoiceExportExcel(models.Model):
    _name = 'invoice.export.excel'
    _description = 'Invoice Excel Export'

    @api.model
    def export_invoices_to_excel(self, date_from=None, date_to=None):
        """
        Generates an Excel report of posted invoices within a date range.
        Creates an ir.attachment holding the file and returns its ID.
        """
        try:
            import openpyxl
            from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
            from openpyxl.utils import get_column_letter
        except ImportError:
            _logger.error("openpyxl is not installed. Run: pip install openpyxl")
            return False

        # Default to current month if no dates provided
        today = date.today()
        if not date_from:
            date_from = today.replace(day=1)
        if not date_to:
            date_to = today

        # Fetch posted (validated) customer invoices
        invoices = self.env['account.move'].search([
            ('move_type', '=', 'out_invoice'),
            ('state', '=', 'posted'),
            ('invoice_date', '>=', date_from),
            ('invoice_date', '<=', date_to),
        ], order='invoice_date asc')

        # -- Build Excel workbook ----------------------------------------------
        wb = openpyxl.Workbook()
        ws = wb.active
        ws.title = 'Invoices'

        # Styles
        header_font = Font(name='Calibri', bold=True, color='FFFFFF', size=11)
        header_fill = PatternFill(start_color='1F4E79', end_color='1F4E79', fill_type='solid')
        header_align = Alignment(horizontal='center', vertical='center', wrap_text=True)
        currency_fmt = '#,##0.00'
        date_fmt = 'YYYY-MM-DD'
        thin_border = Border(
            left=Side(style='thin', color='CCCCCC'),
            right=Side(style='thin', color='CCCCCC'),
            bottom=Side(style='thin', color='CCCCCC'),
        )
        alt_fill = PatternFill(start_color='EBF3FB', end_color='EBF3FB', fill_type='solid')

        # Headers
        headers = [
            'Invoice Number', 'Invoice Date', 'Due Date',
            'Customer', 'Salesperson', 'Currency',
            'Amount Untaxed', 'Tax Amount', 'Amount Total',
            'Amount Due', 'Status'
        ]
        ws.row_dimensions[1].height = 30
        for col_idx, header in enumerate(headers, start=1):
            cell = ws.cell(row=1, column=col_idx, value=header)
            cell.font = header_font
            cell.fill = header_fill
            cell.alignment = header_align
            cell.border = thin_border

        # Data rows
        for row_idx, inv in enumerate(invoices, start=2):
            is_alt = (row_idx % 2 == 0)
            row_data = [
                inv.name,
                inv.invoice_date,
                inv.invoice_date_due,
                inv.partner_id.name if inv.partner_id else '',
                inv.invoice_user_id.name if inv.invoice_user_id else '',
                inv.currency_id.name if inv.currency_id else '',
                inv.amount_untaxed,
                inv.amount_tax,
                inv.amount_total,
                inv.amount_residual,
                dict(inv._fields['payment_state'].selection).get(inv.payment_state, inv.payment_state),
            ]
            for col_idx, value in enumerate(row_data, start=1):
                cell = ws.cell(row=row_idx, column=col_idx, value=value)
                cell.border = thin_border
                if is_alt:
                    cell.fill = alt_fill
                # Apply specific formatting per column
                if col_idx in (2, 3) and value:  # Date columns
                    cell.number_format = date_fmt
                    cell.alignment = Alignment(horizontal='center')
                elif col_idx in (7, 8, 9, 10):  # Currency columns
                    cell.number_format = currency_fmt
                    cell.alignment = Alignment(horizontal='right')

        # Auto-size columns
        for col_idx, _ in enumerate(headers, start=1):
            col_letter = get_column_letter(col_idx)
            max_length = max(
                len(str(ws.cell(row=r, column=col_idx).value or ''))
                for r in range(1, len(invoices) + 2)
            )
            ws.column_dimensions[col_letter].width = min(max_length + 4, 40)

        # Freeze the header row
        ws.freeze_panes = 'A2'

        # -- Save and attach
        buffer = io.BytesIO()
        wb.save(buffer)
        buffer.seek(0)

        filename = f"invoices_{date_from}_to_{date_to}.xlsx"
        attachment = self.env['ir.attachment'].create({
            'name': filename,
            'type': 'binary',
            'datas': base64.b64encode(buffer.read()),
            'res_model': 'invoice.export.excel',
            'mimetype': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        })
        _logger.info("Invoice Excel export created: %s (%d rows)", filename, len(invoices))
        return attachment.id
4. Automated FTP File Import
Connect to an FTP server, download new files, parse them, import data to Odoo, then archive or delete processed files.
File: your_module/models/ftp_import.py
import csv
import ftplib
import io
import logging

from odoo import api, fields, models

_logger = logging.getLogger(__name__)

# Store these in Odoo System Parameters for security
FTP_HOST = 'ftp.yourpartner.com'
FTP_PORT = 21
FTP_USER = 'odoo_user'
FTP_PASSWORD = 'your_password'
FTP_INBOX = '/incoming/orders/'
FTP_ARCHIVE = '/processed/orders/'


class FtpImport(models.Model):
    _name = 'ftp.import'
    _description = 'FTP File Import'

    @api.model
    def run_ftp_import(self):
        """
        Connects to an FTP server, downloads CSV files from the inbox,
        imports the records to Odoo, then archives the processed files.
        """
        ftp = None
        try:
            ftp = ftplib.FTP()
            ftp.connect(host=FTP_HOST, port=FTP_PORT, timeout=30)
            ftp.login(user=FTP_USER, passwd=FTP_PASSWORD)
            _logger.info("Connected to FTP server: %s", FTP_HOST)

            # List files in the inbox directory
            ftp.cwd(FTP_INBOX)
            filenames = ftp.nlst()
            csv_files = [f for f in filenames if f.lower().endswith('.csv')]

            if not csv_files:
                _logger.info("No new CSV files found in FTP inbox.")
                return

            for filename in csv_files:
                _logger.info("Processing FTP file: %s", filename)
                try:
                    # Download the file into memory (no disk I/O needed)
                    buffer = io.BytesIO()
                    ftp.retrbinary(f'RETR {filename}', buffer.write)
                    buffer.seek(0)

                    # Decode and parse the CSV
                    content = buffer.read().decode('utf-8')
                    reader = csv.DictReader(io.StringIO(content))
                    imported = self._process_csv_rows(reader, filename)

                    # Move the processed file out of the inbox
                    archive_path = f'{FTP_ARCHIVE}{filename}'
                    ftp.rename(f'{FTP_INBOX}{filename}', archive_path)
                    _logger.info(
                        "Archived %s -> %s | Records imported: %d",
                        filename, archive_path, imported
                    )
                except Exception as e:
                    _logger.error("Failed to process file %s: %s", filename, e)
                    continue  # Move on to the next file

        except ftplib.all_errors as e:
            _logger.error("FTP connection error: %s", e)
        finally:
            if ftp:
                try:
                    ftp.quit()
                except Exception:
                    pass

    def _process_csv_rows(self, reader, source_filename):
        """Processes rows from a parsed CSV DictReader and creates purchase orders."""
        count = 0
        for row in reader:
            try:
                vendor_name = row.get('vendor_name', '').strip()
                product_sku = row.get('product_sku', '').strip()
                quantity = float(row.get('quantity', 0))
                unit_price = float(row.get('unit_price', 0.0))

                if not all([vendor_name, product_sku, quantity]):
                    continue

                vendor = self.env['res.partner'].search(
                    [('name', 'ilike', vendor_name), ('supplier_rank', '>', 0)],
                    limit=1
                )
                if not vendor:
                    _logger.warning("Vendor not found: %s", vendor_name)
                    continue

                product = self.env['product.product'].search(
                    [('default_code', '=', product_sku)], limit=1
                )
                if not product:
                    _logger.warning("Product SKU not found: %s", product_sku)
                    continue

                # Find or create a draft PO for this vendor
                po = self.env['purchase.order'].search([
                    ('partner_id', '=', vendor.id),
                    ('state', '=', 'draft'),
                    ('origin', '=', f'FTP:{source_filename}'),
                ], limit=1)
                if not po:
                    po = self.env['purchase.order'].create({
                        'partner_id': vendor.id,
                        'origin': f'FTP:{source_filename}',
                    })

                self.env['purchase.order.line'].create({
                    'order_id': po.id,
                    'product_id': product.id,
                    'product_qty': quantity,
                    'price_unit': unit_price,
                    'product_uom': product.uom_po_id.id,
                    'date_planned': fields.Datetime.now(),
                    'name': product.name,
                })
                count += 1

            except Exception as e:
                _logger.error("Error on row %s: %s", row, e)
        return count
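Row validation is the part of this import that is easiest to test in isolation. The sketch below pulls it into a plain function that needs neither Odoo nor an FTP connection. parse_order_row is a hypothetical helper, the sample CSV is made up, and the column names follow the layout that _process_csv_rows assumes.

```python
import csv
import io


def parse_order_row(row):
    """Return cleaned values for one CSV row, or None if the row is unusable."""
    vendor_name = (row.get('vendor_name') or '').strip()
    product_sku = (row.get('product_sku') or '').strip()
    try:
        quantity = float(row.get('quantity') or 0)
        unit_price = float(row.get('unit_price') or 0.0)
    except ValueError:
        return None  # Non-numeric quantity or price
    # Same rule as the import method: vendor, SKU and a non-zero quantity are required
    if not all([vendor_name, product_sku, quantity]):
        return None
    return {
        'vendor_name': vendor_name,
        'product_sku': product_sku,
        'quantity': quantity,
        'unit_price': unit_price,
    }


# Made-up sample file: one good row, one without a vendor, one with a bad quantity.
sample_csv = (
    "vendor_name,product_sku,quantity,unit_price\n"
    "Acme Supplies,PROD-001,5,12.50\n"
    ",PROD-002,3,4.00\n"
    "Acme Supplies,PROD-003,abc,4.00\n"
)
reader = csv.DictReader(io.StringIO(sample_csv))
parsed = [parse_order_row(row) for row in reader]
valid_rows = [p for p in parsed if p is not None]
print(valid_rows)
```

Separating parsing from record creation also makes it possible to report every rejected row before any purchase order is touched.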
5. Real-time Data Sync Using Webhooks
Set up a controller endpoint that receives webhook notifications from external systems and immediately imports the data.
File: your_module/controllers/webhook.py
import hashlib
import hmac
import json
import logging

from odoo import http
from odoo.http import request, Response

_logger = logging.getLogger(__name__)

# Shared secret for verifying the webhook signature
WEBHOOK_SECRET = 'your_shared_webhook_secret'


class WebhookController(http.Controller):

    @http.route(
        '/webhook/orders/receive',
        type='http',
        auth='public',  # No login required; we validate by signature, not session.
                        # 'public' (rather than 'none') gives the handler a usable environment.
        methods=['POST'],
        csrf=False,  # CSRF protection doesn't apply to webhooks
    )
    def receive_order_webhook(self, **kwargs):
        """
        Receives a webhook POST from an external system and creates a sale.order.

        Expected JSON payload:
        {
            "order_id": "EXT-78910",
            "customer_email": "alice@example.com",
            "customer_name": "Alice Johnson",
            "items": [
                { "sku": "PROD-001", "quantity": 2, "unit_price": 49.99 }
            ],
            "total": 99.98
        }
        """
        try:
            # -- 1. Verify the request signature -----------------------------
            raw_body = request.httprequest.get_data()
            signature = request.httprequest.headers.get('X-Webhook-Signature', '')
            if not self._verify_signature(raw_body, signature):
                _logger.warning("Webhook received with invalid signature: rejected.")
                return Response(
                    json.dumps({'error': 'Invalid signature'}),
                    status=401, content_type='application/json'
                )

            # -- 2. Parse the JSON payload ------------------------------------
            try:
                payload = json.loads(raw_body.decode('utf-8'))
            except (json.JSONDecodeError, UnicodeDecodeError) as e:
                _logger.error("Webhook: malformed JSON body: %s", e)
                return Response(
                    json.dumps({'error': 'Invalid JSON'}),
                    status=400, content_type='application/json'
                )

            # -- 3. Prevent duplicate processing -----------------------------
            external_ref = payload.get('order_id', '')
            existing = request.env['sale.order'].sudo().search(
                [('client_order_ref', '=', external_ref)], limit=1
            )
            if existing:
                _logger.info("Webhook: duplicate order %s, skipped.", external_ref)
                return Response(
                    json.dumps({'status': 'skipped', 'reason': 'duplicate'}),
                    status=200, content_type='application/json'
                )

            # -- 4. Resolve or create the customer ---------------------------
            partner = request.env['res.partner'].sudo().search(
                [('email', '=', payload.get('customer_email'))], limit=1
            )
            if not partner:
                partner = request.env['res.partner'].sudo().create({
                    'name': payload.get('customer_name', 'Unknown'),
                    'email': payload.get('customer_email', ''),
                })

            # -- 5. Build and create the sale order ---------------------------
            order_lines = []
            for item in payload.get('items', []):
                product = request.env['product.product'].sudo().search(
                    [('default_code', '=', item.get('sku'))], limit=1
                )
                if not product:
                    _logger.warning("Webhook: SKU not found: %s", item.get('sku'))
                    continue
                order_lines.append((0, 0, {
                    'product_id': product.id,
                    'product_uom_qty': float(item.get('quantity', 1)),
                    'price_unit': float(item.get('unit_price', 0.0)),
                    'name': product.name,
                }))

            if not order_lines:
                return Response(
                    json.dumps({'error': 'No valid products found in payload'}),
                    status=422, content_type='application/json'
                )

            order = request.env['sale.order'].sudo().create({
                'partner_id': partner.id,
                'client_order_ref': external_ref,
                'order_line': order_lines,
            })
            _logger.info("Webhook: Created sale.order %s from external ref %s", order.name, external_ref)
            return Response(
                json.dumps({'status': 'ok', 'odoo_order': order.name}),
                status=200, content_type='application/json'
            )

        except Exception as e:
            _logger.exception("Webhook processing error: %s", e)
            return Response(
                json.dumps({'error': 'Internal server error'}),
                status=500, content_type='application/json'
            )

    def _verify_signature(self, raw_body, provided_signature):
        """
        Validates the HMAC-SHA256 signature of the incoming webhook.
        The external system signs the raw request body with the shared secret.
        """
        if not provided_signature:
            return False
        expected = hmac.new(
            WEBHOOK_SECRET.encode('utf-8'),
            raw_body,
            hashlib.sha256
        ).hexdigest()
        return hmac.compare_digest(expected, provided_signature)
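To exercise the endpoint, the sending side has to produce the same signature. Here is a minimal sketch of that, assuming the sender hex-encodes an HMAC-SHA256 of the exact request bytes and sends it in the X-Webhook-Signature header, as the controller expects. sign_payload and verify_signature are hypothetical names, and the secret and payload are placeholders.

```python
import hashlib
import hmac
import json

# Placeholder secret; in practice both sides load it from secure configuration.
WEBHOOK_SECRET = 'your_shared_webhook_secret'


def sign_payload(raw_body, secret=WEBHOOK_SECRET):
    """Return the hex HMAC-SHA256 signature of the raw request bytes."""
    return hmac.new(secret.encode('utf-8'), raw_body, hashlib.sha256).hexdigest()


def verify_signature(raw_body, provided_signature, secret=WEBHOOK_SECRET):
    """Mirror of the controller-side check, usable in a unit test."""
    if not provided_signature:
        return False
    return hmac.compare_digest(sign_payload(raw_body, secret), provided_signature)


payload = {
    'order_id': 'EXT-78910',
    'customer_email': 'alice@example.com',
    'customer_name': 'Alice Johnson',
    'items': [{'sku': 'PROD-001', 'quantity': 2, 'unit_price': 49.99}],
}
# Sign the exact bytes that will be sent; re-serializing later would change them.
raw_body = json.dumps(payload).encode('utf-8')
signature = sign_payload(raw_body)
headers = {'Content-Type': 'application/json', 'X-Webhook-Signature': signature}

print(verify_signature(raw_body, signature))         # True
print(verify_signature(raw_body + b' ', signature))  # False: body was altered
```

The signature covers the raw bytes, so the sender must transmit raw_body unchanged; even a single added space invalidates it.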
Mastering automated import/export scripts transforms how you manage data in Odoo 19. Instead of spending hours on manual data entry or exports, you can set up reliable, scheduled processes that run in the background. Whether you're syncing with warehouses, accounting systems, or customer portals, automation ensures data consistency and frees your team to focus on higher-value work. Start with simple scheduled CSV exports, then gradually build more sophisticated integrations as your needs grow. The key is proper error handling, logging, and testing to ensure your automated scripts run reliably without manual intervention.
To read more about moving data between versions, refer to our blog How to Transfer Data From Odoo 18 to Odoo 19 Using XML-RPC.