#!/usr/bin/env python3.7
# -*- mode: python; mode: elpy; coding: utf-8 -*-
#
# Copyright 2018-2019 Luc Chouinard lumostor@3X0.ca
#
# This file is part of pypartkeepr.
#
# pypartkeepr is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pypartkeepr is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pypartkeepr. If not, see <https://www.gnu.org/licenses/>.
#
import logging
from pathlib import Path
import time
import re
from types import MethodType
from collections import Mapping
from collections import MutableMapping
import ast
import json
import requests
from simple_rest_client.api import API
from simple_rest_client.api import Resource
import simple_rest_client.exceptions
from simple_rest_client.models import Request
from simple_rest_client.request import make_request
import yaml
import appdirs
import octopart
import pypartkeepr.api_actions as api_actions
# from pypartkeepr.dataclasses import PKDataClassBase, PartCategory, Part # noqa: F401
# from pypartkeepr.exceptions import ConfigFileNotFound, exception_handler
import pypartkeepr as pypk
from pypartkeepr.utils import set_logger, logger_border
##
default_config_dir = Path(appdirs.user_config_dir('pypartkeepr'))
default_config_file = default_config_dir / 'config'
default_cookies_file = default_config_dir / 'cookies'
module_logger = logging.getLogger(__name__)
# A more readable exception message for simple_rest_client
[docs]def exception__repr__(self):
""" A more readable exception message for simple_rest_client
:returns: A more readable string
:rtype: str
"""
if isinstance(self.response.body, Mapping):
body = self.response.body['hydra:description']
else:
body = self.response.body
etype = re.match('.*\.(.*)\'>', repr(type(self))).group(1)
return 'ERROR: {} {} - {}'.format(etype, self.response.status_code, body)
# modify the __repr__ and __str__ method of
# simple_rest_client.exceptions.ErrorWithResponse
eo = globals()['simple_rest_client'].exceptions.ErrorWithResponse
setattr(eo, '__repr__', exception__repr__)
setattr(eo, '__str__', exception__repr__)
[docs]class PKResource(Resource):
@set_logger
def __init__(self, *args, **kwargs):
# self.logger = logging.getLogger(__name__+'.PKResource')
self.actions = {}
super().__init__(*args, **kwargs)
for a in self.actions.keys():
delattr(self, a)
@logger_border
def add_action(self, action_name):
# self.logger.debug('({})'.format(action_name))
def action_method(self, *args, body=None, params=None, headers=None,
action_name=action_name, **kwargs):
url = self.get_action_full_url(action_name, *args)
method = self.get_action_method(action_name)
if self.json_encode_body and body:
body = json.dumps(body)
# print('action_method():', params )
request = Request(
url=url,
method=method,
params=params or {},
body=body,
headers=headers or {},
timeout=self.timeout,
kwargs=kwargs
)
request.params.update(self.params)
request.headers.update(self.headers)
r = make_request(self.session, request)
# pp=pprint.PrettyPrinter(indent=4)
# print('PKResource():',type(r.body))
# #print('PKResource():',r.body)
# pp.pprint(r.body)
if isinstance(r.body, str):
return ast.literal_eval(r.body)
else:
return r.body
setattr(self, action_name, MethodType(action_method, self))
@logger_border
def __repr__(self):
"""Return a string containing a printable representation of an object.
:returns: a printable representation of self
:rtype: str
"""
return self.__class__.__name__ +\
'(resource_name=' +\
repr(self.resource_name) +\
', actions=' +\
repr(self.actions) +\
')'
[docs]class PKClass(MutableMapping):
"""Class to pythonize PartKeepr web resources
This is the class for parts, part_categories, projects, users, ... resources found in
the PartKeepr object.
"""
@set_logger
def __init__(self, res):
self.logger = logging.getLogger(__name__+'.PKClass')
self.res = res
self.rmid_pat = re.compile('.*/')
self.backslash_pat = re.compile('\\\\')
# if res.resource_name != 'parts':
# delattr(self,'addStock')
# delattr(self,'removeStock')
# delattr(self,'setStock')
def _key_from_id(self, key):
"""private: return the id number from str, PKDataClassBase
get the id number from string like '/api/parts/[0-9]+' or PKDataClassBase.id
:param key: a int, a string, a PKDataClassBase
:returns: The id number
:rtype: int
"""
# what type of key
if isinstance(key, int):
return int(key)
elif isinstance(key, str):
return int(self.rmid_pat.sub('', key))
elif isinstance(key, pypk.PKDataClassBase) and key.id:
return int(self.rmid_pat.sub('', key.id))
elif isinstance(key, pypk.PKDataClassBase) and not key.id:
return None
elif key is None or key == '':
return key
else:
msg = 'Key Type {} Not Supported'.format(type(key))
raise TypeError(msg)
@logger_border
def addStock(self, key, quantity, price, comment):
"""
:param key: part id (/api/parts/10 or 10)
:param quantity: the quantity to add in PartUnit
:param price: the price
:param comment:
"""
# self.logger.debug('({}, {}, {}, {})'.format(key, quantity, price, comment))
headers = {
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
}
key = self._key_from_id(key)
value = 'foo=0&quantity={}&price={}&comment={}&foo2=0'.format(quantity,
price,
comment)
if key in self:
try:
self.res.add_stock(
key, body=value, params={}, headers=headers)
except simple_rest_client.exceptions.NotFoundError:
raise KeyError('Key Not Found: {}({})'.format(key, type(key))) from None
except Exception as e:
pypk.exception_handler(e)
else:
raise KeyError('Part {} Not Found'.format(key))
@logger_border
def removeStock(self, key, quantity, comment):
"""
:param key: number portion of part id (/api/parts/10 --> 10)
:param quantity: the quantity to add, as a quantity of PartUnit (most of the time
pieces)
:param comment:
"""
# self.logger.debug('({}, {}, {})'.format(key, quantity, comment))
headers = {
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
}
key = self._key_from_id(key)
value = 'foo=0&quantity={}&comment={}&foo2=0'.format(quantity, comment)
if key in self:
try:
self.res.remove_stock(key, body=value, params={}, headers=headers)
except simple_rest_client.exceptions.NotFoundError:
raise KeyError('Key Not Found: {}({})'.format(key, type(key))) from None
except Exception as e:
pypk.exception_handler(e)
else:
raise KeyError('Part {} Not Found'.format(key))
@logger_border
def massRemoveStock(self, pr):
"""Remove parts from stock
*** Not Implemented ***
This is the equivalent of the `Remove part from stock` button at the bottom of the
Project Report window.
:param pr: The ProjectReport that will be used to remove stock.
"""
# self.logger.debug('({})'.format(pr))
# headers = {
# 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
# }
# key = self._key_from_id(key)
# value = 'foo=0&quantity={}&comment={}&foo2=0'.format(quantity, comment)
# if key in self:
# try:
# r = self.res.remove_stock(
# key, body=value, params={}, headers=headers)
# except simple_rest_client.exceptions.NotFoundError:
# raise KeyError(
# 'Key Not Found: {}({})'.format(
# key, type(key))) from None
# except Exception as e:
# exception_handler(e)
# else:
# raise KeyError('Part {} Not Found'.format(key))
raise NotImplementedError
@logger_border
def setStock(self, key, quantity, comment):
"""
:param key: number portion of part id (/api/parts/10 --> 10)
:param quantity: the quantity to add in PartUnit
:param comment:
"""
# self.logger.debug('({}, {}, {})'.format(key, quantity, comment))
headers = {
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
}
key = self._key_from_id(key)
value = 'foo=0&quantity={}&comment={}&foo2=0'.format(quantity, comment)
if key in self:
try:
self.res.set_stock(
key, body=value, params={}, headers=headers)
except simple_rest_client.exceptions.NotFoundError:
raise KeyError(
'Key Not Found: {}({})'.format(
key, type(key))) from None
except Exception as e:
pypk.exception_handler(e)
else:
raise KeyError('Part {} Not Found'.format(key))
@logger_border
def __getitem__(self, key):
""" collections.abc.Mapping class method __getitem__()
D[k] -> v
:param key: the id, support int, str, PKDataClassBase
:returns: the value
:rtype: PKDataClassBase instance
"""
# self.logger.debug('({})'.format(key))
key = self._key_from_id(key)
try:
r = self.res.get(key, body=None, params={}, headers={})
except simple_rest_client.exceptions.NotFoundError as e:
raise KeyError(
'Key Not Found: {}({})'.format(
key, type(key))) from None
except Exception as e:
pypk.exception_handler(e)
else:
class_type = ''
if isinstance(r, str): # dict as a string returned by reports
r = ast.literal_eval(r)
if isinstance(r, Mapping):
class_type = r['@type']
else:
class_type = r[0]['@type']
#print('---->', class_type)
return getattr(globals()['pypk'], class_type).from_dict(r)
def __setitem__(self, key, dc):
"""collections.abc.MutableMapping class method __setitem__()
D[k]=v
:param key: the id, support int, str, PKDataClassBase
:param dc:
:returns: No return
:rtype:
"""
self.logger.debug('({}, {})'.format(key, dc))
key = self._key_from_id(key)
value = dc.asdict()
try:
if dc in self:
self.res.update(key, body=value, params={}, headers={})
else:
self.res.create(body=value, params={}, headers={})
except Exception as e:
pypk.exception_handler(e)
[docs] def add(self, dc):
"""Add PKDataClass dc to the PartKeepr database
Add dc to the PartKeepr database if not in the database else update it. Then
return the object created/updated.
:param dc: PKDataClass to add
:returns: The PKDataClass added
:rtype: PKDataClass
"""
value = dc.asdict()
self.logger.debug('add({})'.format(dc))
try:
if dc in self:
r = self.res.update(self._key_from_id(dc.id),
body=value, params={}, headers={})
else:
r = self.res.create(body=value, params={}, headers={})
except Exception as e:
pypk.exception_handler(e)
if isinstance(r, Mapping):
class_type = r['@type']
else:
class_type = r[0]['@type']
return getattr(globals()['pypk'], class_type).from_dict(r)
def __delitem__(self, key):
"""Delete the item from Partkeepr database
del D[k] Delete self[key].
:param key: the item id
:returns: no return
:rtype:
"""
self.logger.debug('({})'.format(key))
key = self._key_from_id(key)
try:
self.res.delete(key, body=None, params={}, headers={})
except Exception as e:
pypk.exception_handler(e)
def __len__(self):
"""Return the quantity of items
len(D) Return len(self).
:returns: quantity of items
:rtype: int
"""
self.logger.debug('()'.format())
try:
r = self.res.list(body=None, params={'itemsPerPage': 9999999999},
headers={})
except Exception as e:
pypk.exception_handler(e)
else:
return r['hydra:totalItems']
[docs] def items(self):
"""Items iterator, (key, value) pair
D.items() - To iterate over items, (key, value) pair.
:returns: (key, value) tuple
:rtype: (int, PKDataClassBase)
"""
for i in self.values():
yield int(self.rmid_pat.sub('', i.id)), i
[docs] def keys(self):
"""Iterate over keys
D.keys() - To iterate over keys.
:returns: key iterator
:rtype: int
"""
return self.__iter__()
def __iter__(self):
"""Iterate over keys
iter(D) - To iterate over keys.
:returns: key iterator
:rtype: int
"""
for i in self.values():
yield int(self.rmid_pat.sub('', i.id))
[docs] def values(self):
"""Iterate over values.
D.values() - To iterate over values.
:returns: value iterator
:rtype: PKDataClassBase
"""
self.logger.debug('values()')
try:
r = self.res.list(params={'itemsPerPage': 9999999999})
except Exception as e:
pypk.exception_handler(e)
else:
for i in r['hydra:member']:
self.logger.debug('(): type:{}'.format(type(i)))
if isinstance(i, Mapping):
class_type = i['@type']
else:
class_type = i[0]['@type']
yield getattr(globals()['pypk'], class_type).from_dict(i)
[docs] def search(self, prop, op, val):
"""Get items where prop op val (e.g. 'name' '=' '555')
:param prop (str): The name of the attribute to search
:param op (str): The comparison operation of the search. These are SQL
string comparison operators: '=', 'LIKE, 'NOT LIKE', ...
:param val (str): The value to match
:returns: A list generator of PKDataClassBase derived class instance of the
items found or created
:rtype: PKDataClassBase
"""
self.logger.debug('({}, {}, {})'.format(prop, op, val))
try:
r = self.res.list(
params={'itemsPerPage': 9999999999,
'filter':
'{{"property": "{}", "operator": "{}", "value": "{}" }}'.
format(prop, op, val)
}
)
except Exception as e:
pypk.exception_handler(e)
else:
for i in r['hydra:member']:
if isinstance(i, Mapping):
class_type = i['@type']
else:
class_type = i[0]['@type']
if self.res.resource_name == 'part_categories' and i[prop] != val:
pass
else:
# print('####', getattr(globals()['pypk'], class_type), type(i))
yield getattr(globals()['pypk'], class_type).from_dict(i)
[docs] def get_name(self, val, create=False, parent=None):
"""Get item where `name` = val
:param val (str): The name to retreive, the search is by using the 'name'
attribute of the item. Some types don't have that attribute,
there is no warnings.
:param create (bool, optional): create (or not) if `attr`=`val` not found.
Default to False
:param parent (int, optional): if create and a parent needed, use that parent.
It could be a int, str (id str) or PKDataClassBase derived class instance.
:returns: A PKDataClassBase derived class instance of the item found or created
:rtype: PKDataClassBase
TODO:
-Check if the attribute 'name' exists, and tell the user
"""
self.logger.debug('({}, {}, {})'.format(val, create, parent))
return self.get_attr(val, attr='name', create=create, parent=parent)
[docs] def get_attr(self, val, attr='name', create=False, parent=None):
"""Get item where `attr` = val
D.get_name( val, attr, create=False, parent=None) -> obj
:param val (str): The name to retreive, the search is by using the 'name'
attribute of the item. Some types don't have that attribute,
there is no warnings.
:param attr: The attribute name to get
:param create (bool, optional): create (or not) if `attr`=`val` not found.
Default to False
:param parent (int, optional): if create and a parent needed, use that parent.
It could be a int, str(id str) or PKDataClassBase derived class
instance.
:returns: A PKDataClassBase derived class instance of the item found or created
:rtype: PKDataClassBase
TODO:
-Check if the attribute 'name' exists, and tell the user
"""
self.logger.debug('({}, {}, {}, {})'.format(val, attr, create, parent))
lst = list(self.search(attr, '=', val))
if len(lst) == 1:
return lst[0]
elif len(lst) == 0:
if create:
parent_obj = None
if parent and isinstance(parent, str):
parent_obj = list(self.search('name', '=', parent))[0] # only one??
elif parent and isinstance(parent, int):
parent_obj = self[parent]
elif parent and isinstance(parent, pypk.PKDataClassBase):
parent_obj = parent
d = {attr: val, 'parent': parent_obj}
if isinstance(parent_obj, pypk.PartCategory) and attr != 'description':
d['description'] = ''
self[None] = getattr(globals()['pypk'], self.type).from_dict(d)
return list(self.search(attr, '=', val))[0] # should be only one
else:
return None
else:
print("""WARNING: more than 1 {} \
found for {}.\nReturning the first found.""".format(self.type, val))
return lst[0]
def __contains__(self, key):
"""Test if key in PartKeepr database
k in D - To test if key is in self
:param key: id of the item
:returns: contained or not
:rtype: Boolean
"""
key = self._key_from_id(key)
try:
self.res.get(key, body=None, params={}, headers={})
except simple_rest_client.exceptions.NotFoundError:
return False
except Exception as e:
pypk.exception_handler(e)
else:
return True
[docs] def get(self, key, default=None):
"""Get the value referenced by key
D.get(k, default) - Get value of key k if it exists in D else default
:param key: The id of the item
:param default: The value to return if key does not exists, default to None
:returns: The value referenced by key
:rtype: PKDataClassBase
"""
key = self._key_from_id(key)
try:
r = self.res.get(key, body=None, params={}, headers={})
except simple_rest_client.exceptions.NotFoundError:
return default
except Exception as e:
pypk.exception_handler(e)
else:
class_type = ''
if isinstance(r, Mapping):
class_type = r['@type']
else:
class_type = r[0]['@type']
return getattr(globals()['pypk'], class_type).from_dict(r) # r
def __eq__(self, other):
"""Does `other` equate self
self==other - Return whether self equate `other`
It will compare if type and res.api_root_url are equal.
:param other: the other PKDataClassBase to compare
:returns: wether self equate other
:rtype: boolean
"""
return self.type == other.type and self.res.api_root_url == other.res.api_root_url
def __ne__(self, other):
"""Does `other` NOT equate self
**Not Implemented**
self==other - Return wether self NOT equate other
:param other: the other PKDataClassBase to compare
:returns: wether self NOT equate other
:rtype: boolean
"""
return not self.__eq__(other)
[docs] def clear(self):
"""Remove all items from the PartKeepr database. NOT IMPLEMENTED
D.clear() Remove all items from D.
**Not Implemented**
**THIS METHOD MIGHT NEVER BE IMPLEMENTED, SEEMS TOO DANGEROUS FOR
THE USEFULNESS.**
:returns: no return
:rtype:
"""
raise NotImplementedError
[docs] def pop(self, key, default=None):
"""Get the item referenced by key else default
**Not Implemented**
If key is in the dictionary, remove it and return its value, else return
default. If default is not given and key is not in the dictionary,
a KeyError is raised.
:param key: the id of the item
:param default: The value to return if key is in the dictionary
:returns: item
:rtype: PKDataClassBase
"""
raise NotImplementedError
[docs] def popitem(self):
"""Remove and return a (key, value) pair from the dictionary.
**Not Implemented**
D.popitem() -> (k, v), remove and return some (key, value) pair
as a 2-tuple; but raise KeyError if D is empty.
:returns: Key, value pair
:rtype: (key, value) tuple
"""
raise NotImplementedError
[docs] def setdefault(self, key, default=None):
"""set key to default if not present
**Not Implemented**
If key is in the dictionary, return its value. If not, insert key with a
value of default and return default. default defaults to None.
:param key: the id of the item
:param default: The item to insert if key not in dictionary. Default: None
:returns: item referenced by key
:rtype: PKDataClassBase
"""
self.logger.debug('({}, {})'.format(key, default))
# what type of key
if isinstance(key, int):
pass
elif isinstance(key, str):
key = self.rmid_pat.sub('', key)
elif isinstance(key, pypk.PKDataClassBase):
key = self.rmid_pat.sub('', key.id)
else:
raise KeyError('Key ({}) Not Found'.format(type(key)))
raise NotImplementedError
[docs] def update(other):
"""Update PartKeepr database with this `other` PKClass dictionary
**Not Implemented**
Update the dictionary with the key/value pairs from other, overwriting
existing keys. Return None.
update() accepts either another dictionary object or an iterable of key/value
pairs (as tuples or other iterables of length two). If keyword arguments are
specified, the dictionary is then updated with those key/value pairs:
d.update(red=1, blue=2).
:param other: another PKClass dictionnary
:returns: None
:rtype: NoneType
"""
raise NotImplementedError
def __repr__(self):
"""Return a string containing a printable representation of an object.
:returns: a printable representation of self
:rtype: str
"""
return self.__class__.__name__ + \
'(type=' + repr(self.type) + ', res=' + repr(self.res) + ')'
[docs]class PartKeepr:
def __init__(self, config_dir=default_config_dir,
config_file=default_config_file,
cookies_file=default_cookies_file, timeout=30):
self.logger = logging.getLogger(__name__+self.__class__.__name__)
self.logger.debug('({}, {}, {})'.format(config_dir, config_file, cookies_file))
self.config_dir = config_dir
self.config_file = config_file
self.cookies_file = cookies_file
self.timeout = timeout
try:
self.load_config()
except pypk.ConfigFileNotFound as e:
self.logger.error(
'There is no configuration file at {}.'.format(self.config_file))
self.logger.warning("""Create a configuration file, either using pypk-config
or manually. By default it should reside at {}""".format(default_config_file))
raise
self.load_cookies()
self.api = API(
api_root_url='{}://{}:{}/api'.format(
self.conf.protocol,
self.conf.servername,
self.conf.port),
params={},
headers={},
timeout=self.timeout,
append_slash=False,
json_encode_body=True
)
for res in api_actions.ACTIONS.keys():
self.api.add_resource(resource_class=PKResource, resource_name=res)
# resobj is like self.api.parts
resobj = getattr(self.api, res)
# A cookie jar shared between all resources
# ( there is a session for each resource)
resobj.session.cookies = self.cookies
for act in api_actions.ACTIONS[res]['actions'].keys():
resobj.actions = api_actions.ACTIONS[res]['actions']
resobj.add_action(act)
# add self.parts
setattr(self, res, PKClass(getattr(self.api, res)))
setattr(
getattr(
self,
res),
'type',
api_actions.ACTIONS[res]['type'])
ea = {}
da = api_actions.default_actions(res)
for k in api_actions.ACTIONS[res]['actions'].keys():
if k not in da:
ea[k] = api_actions.ACTIONS[res]['actions'][k]
getattr(self, res).extra_actions = ea
getattr(self, res).default_actions = da
# add actions to PKClass object
# for k in api_actions.ACTIONS[res]['actions'].keys():
# setattr(getattr(self, res), k, getattr(getattr(self.api, res),k) )
# print("__init__():", dir(getattr(self, res)))
if self.conf.octopart_api_key:
self.octopart = octopart.client.OctopartClient(
api_key=self.conf.octopart_api_key,
base_url='http://octopart.com/api/v3'
)
octopart.logger.setLevel(logging.ERROR)
[docs] def load_config(self, file=None):
if file:
file = Path(file)
else:
file = Path(self.config_file)
with file.open() as f:
if file.exists():
self.conf = yaml.safe_load(f)
else:
self.conf = None
raise pypk.ConfigFileNotFound(
'No configuration file at {}.'.format(file))
[docs] def save_config(self, file=None):
if file:
file = Path(file)
else:
file = Path(self.config_file)
with file.open('w') as f:
yaml.dump(self.conf, f, default_flow_style=False)
[docs] def load_cookies(self, file=None):
if file:
file = Path(file)
else:
file = Path(self.cookies_file)
if not file.exists() or not file.stat().st_size:
file.touch()
self.cookies = requests.cookies.RequestsCookieJar()
else:
with file.open('r') as f:
self.cookies = yaml.load(f)
[docs] def save_cookies(self, file=None):
if file:
file = Path(file)
else:
file = Path(self.cookies_file)
with file.open('w') as f:
# yaml.dump(requests.utils.dict_from_cookiejar(self.cookies), f,
# default_flow_style=False)
yaml.dump(self.cookies, f, default_flow_style=False)
[docs] def octopart_match(self, queries):
"""Interrogate the octopart database for matching queries
This method support multiple queries, for single query refer to
:func:`~pypartkeepr.partkeepr.octopart_match1`.
The following is the query fields supported: (Taken from
`octopart._PartsMatchQuery schema`)
The following is the query fields supported: (Taken from
`octopart._PartsMatchQuery schema`)
========== ========================================
q Free-form keyword query
mpn MPN search filter
brand Brand search filter
sku SKU search filter
seller Seller search filter
mpn_or_sku MPN or SKU search filter
start Ordinal position of first returned item
limit Maximum number of items to return
reference Arbitrary string for identifying
========== ========================================
e.g.
.. code-block:: python
queries=[{'mpn': 'SN74S74N', brand: 'Texas Instruments'},
{'sku': '595-SN74S74N'}]
:param query (dict): Is a dictionnary with keys taken from below and
the value the str to match. The matching is SQL LIKE style.
:returns: a list of list of matched octopart.Part. Return one list of
matched octopart.Part per queries.
:rtype: List[List[octopart.Part]]
.. _PartsMatchQuery schema:
https://octopart.com/api/docs/v3/rest-api#response-schemas-partsmatchquery
"""
time.sleep(0.3)
# queries=[{'mpn':'LT1638CN8#PBF', 'sku':'LT1638CN8#PBF-ND'},
# {'q': '555',brand: 'TI' }]
results = self.octopart.match(queries,
includes=['category_uids',
'specs',
'descriptions'],)
for r in results['results']:
for i in r['items']:
i['mpn'] = re.sub(" +", "", i['mpn'])
# return [octopart.models.PartsMatchResult(r) for r in
# results['results']].parts
return [[octopart.models.Part(part) for part in resu['items']]
for resu in results['results']]
[docs] def octopart_match1(self, query):
"""Interrogate the octopart database for a matching query
This method support one query, for multiple query refer to
:func:`~pypartkeepr.partkeepr.octopart_match`.
The following is the query fields supported: (Taken from
`octopart._PartsMatchQuery schema`)
========== ========================================
q Free-form keyword query
mpn MPN search filter
brand Brand search filter
sku SKU search filter
seller Seller search filter
mpn_or_sku MPN or SKU search filter
start Ordinal position of first returned item
limit Maximum number of items to return
reference Arbitrary string for identifying
========== ========================================
e.g.
.. code-block:: python
queries={'mpn': 'SN74S74N', brand: 'Texas Instruments'}
:param query (dict): Is a dictionnary with keys taken from below and the
value the str to match. The matching is SQL LIKE style.
:returns: a list of matched octopart.Part
:rtype: List[octopart.Part]:
.. _PartsMatchQuery schema:
https://octopart.com/api/docs/v3/rest-api#response-schemas-partsmatchquery
"""
time.sleep(0.3)
# query={'mpn':'LT1638CN8#PBF', 'sku':'LT1638CN8#PBF-ND'}
results = self.octopart.match([query],
includes=['category_uids',
'specs',
'descriptions'])
r = results['results'][0]
for i in r['items']:
i['mpn'] = re.sub(" +", "", i['mpn'])
return [octopart.models.Part(part) for part in r['items']]
[docs] def octopart_get_category(self, part):
"""Get the OctoPart category of part
Will return a list containing the name of the category and all of its
ancestors. Indice 0 is the root, and indice -1 is the category.
If no category found then return [ 'Root Category', 'uncategorized' ]
:param part: An octopart.Part returned by
:func:`~pypartkeepr.partkeepr.octopart_match` or
:func:`~pypartkeepr.partkeepr.octopart_match1`
:returns List[str]: list of category names
:rtype: List[str]
"""
time.sleep(0.3)
catuids = part.category_uids
if catuids:
r = requests.get(
"http://octopart.com/api/v3/categories/get_multi",
{
'uid[]': part.category_uids,
'apikey': self.conf.octopart_api_key,
'pretty_print': 'true'
}
)
if r.status_code != requests.codes.ok:
print("getOPCategory(): ERROR: ", r.status_code)
return None
rj = json.loads(r.text)
# find leaf
d = {len(rj[k]['ancestor_uids']): k for k in rj.keys()}
cuid = d[sorted(d.keys(), reverse=True)[0]]
cnames = ['Root Category']
cnames.extend(rj[cuid]['ancestor_names'])
cnames.append(rj[cuid]['name'])
else:
cnames = ['Root Category', 'uncategorized']
return cnames
if __name__ == "__main__":
pk = PartKeepr()
exit()