143 lines
3.9 KiB
Python
143 lines
3.9 KiB
Python
#!/usr/bin/env python
|
|
# PYTHON_ARGCOMPLETE_OK
|
|
# This file is part of Tryton. The COPYRIGHT file at the top level of
|
|
# this repository contains the full copyright notices and license terms.
|
|
from __future__ import print_function
|
|
|
|
import csv
|
|
import os
|
|
import sys
|
|
|
|
try:
|
|
from urllib.error import HTTPError
|
|
from urllib.request import urlopen
|
|
except ImportError:
|
|
from urllib2 import urlopen, HTTPError
|
|
|
|
import zipfile
|
|
from argparse import ArgumentParser
|
|
from io import BytesIO, TextIOWrapper
|
|
|
|
try:
|
|
import argcomplete
|
|
except ImportError:
|
|
argcomplete = None
|
|
|
|
try:
|
|
from tqdm import tqdm
|
|
except ImportError:
|
|
tqdm = None
|
|
|
|
try:
|
|
from proteus import Model, config
|
|
except ImportError:
|
|
prog = os.path.basename(sys.argv[0])
|
|
sys.exit("proteus must be installed to use %s" % prog)
|
|
|
|
|
|
def _progress(iterable, **kwargs):
|
|
if tqdm:
|
|
return tqdm(iterable, disable=None, **kwargs)
|
|
else:
|
|
return iterable
|
|
|
|
|
|
def clean(code):
|
|
sys.stderr.write('Cleaning')
|
|
PostalCode = Model.get('country.postal_code')
|
|
PostalCode._proxy.delete(
|
|
[c.id for c in PostalCode.find([('country.code', '=', code)])], {})
|
|
print('.', file=sys.stderr)
|
|
|
|
|
|
def fetch(code):
|
|
sys.stderr.write('Fetching')
|
|
url = 'https://downloads.tryton.org/geonames/%s.zip' % code
|
|
try:
|
|
responce = urlopen(url)
|
|
except HTTPError as e:
|
|
sys.exit("\nError downloading %s: %s" % (code, e.reason))
|
|
data = responce.read()
|
|
with zipfile.ZipFile(BytesIO(data)) as zf:
|
|
data = zf.read('%s.txt' % code)
|
|
print('.', file=sys.stderr)
|
|
return data
|
|
|
|
|
|
def import_(data):
|
|
PostalCode = Model.get('country.postal_code')
|
|
Country = Model.get('country.country')
|
|
Subdivision = Model.get('country.subdivision')
|
|
print('Importing', file=sys.stderr)
|
|
|
|
def get_country(code):
|
|
country = countries.get(code)
|
|
if not country:
|
|
try:
|
|
country, = Country.find([('code', '=', code)])
|
|
except ValueError:
|
|
sys.exit("Error missing country with code %s" % code)
|
|
countries[code] = country
|
|
return country
|
|
countries = {}
|
|
|
|
def get_subdivision(country, code):
|
|
code = '%s-%s' % (country, code)
|
|
subdivision = subdivisions.get(code)
|
|
if not subdivision:
|
|
try:
|
|
subdivision, = Subdivision.find([('code', '=', code)])
|
|
except ValueError:
|
|
return
|
|
subdivisions[code] = subdivision
|
|
return subdivision
|
|
subdivisions = {}
|
|
|
|
f = TextIOWrapper(BytesIO(data), encoding='utf-8')
|
|
codes = []
|
|
for row in _progress(csv.DictReader(
|
|
f, fieldnames=_fieldnames, delimiter='\t'),
|
|
total=data.count(b'\n')):
|
|
country = get_country(row['country'])
|
|
for code in ['code1', 'code2', 'code3']:
|
|
subdivision = get_subdivision(row['country'], row[code])
|
|
if code == 'code1' or subdivision:
|
|
codes.append(
|
|
PostalCode(country=country, subdivision=subdivision,
|
|
postal_code=row['postal'], city=row['place']))
|
|
PostalCode.save(codes)
|
|
|
|
|
|
_fieldnames = ['country', 'postal', 'place', 'name1', 'code1',
|
|
'name2', 'code2', 'name3', 'code3', 'latitude', 'longitude', 'accuracy']
|
|
|
|
|
|
def main(database, codes, config_file=None):
|
|
config.set_trytond(database, config_file=config_file)
|
|
do_import(codes)
|
|
|
|
|
|
def do_import(codes):
|
|
for code in codes:
|
|
print(code, file=sys.stderr)
|
|
code = code.upper()
|
|
clean(code)
|
|
import_(fetch(code))
|
|
|
|
|
|
def run():
|
|
parser = ArgumentParser()
|
|
parser.add_argument('-d', '--database', dest='database', required=True)
|
|
parser.add_argument('-c', '--config', dest='config_file',
|
|
help='the trytond config file')
|
|
parser.add_argument('codes', nargs='+')
|
|
if argcomplete:
|
|
argcomplete.autocomplete(parser)
|
|
|
|
args = parser.parse_args()
|
|
main(args.database, args.codes, args.config_file)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
run()
|