Page MenuHomePhabricator
Authored By
ArthurPSmith
Oct 9 2015, 5:08 PM
Size
6 KB
Referenced Files
None
Subscribers
None

nuclides.py

import json
import operator
from units import time_quantity_in_seconds
from urllib import urlencode
from urllib2 import urlopen
from collections import defaultdict
try:
from functools import ttl_cache
except ImportError:
from cachetools import ttl_cache
# cachetools does not support maxsize=None
@ttl_cache(maxsize=20, ttl=21600)
def get_json_cached(url, data):
"""The information is cached for 6 hours."""
return json.load(urlopen(url, data))
def get_json(url, data):
return get_json_cached(url, urlencode(data))
class NuclideProvider(object):
"""Base class for nuclide providers."""
WD_API = 'http://www.wikidata.org/w/api.php'
API_LIMIT = 50
WDQ_API = 'http://wdq.wmflabs.org/api'
def __init__(self, language):
self.language = language
@classmethod
def get_available_languages(cls):
query = dict(action='query', format='json', meta='siteinfo', siprop='languages')
result = get_json(cls.WD_API, query).get('query', {}).get('languages', [])
return [lang['code'] for lang in result]
@classmethod
def get_entities(cls, ids, **kwargs):
entities = {}
query = dict(action='wbgetentities', format='json', **kwargs)
for index in range(0, len(ids), cls.API_LIMIT):
query['ids'] = '|'.join(ids[index:index + cls.API_LIMIT])
new_entities = get_json(cls.WD_API, query).get('entities', {})
entities.update(new_entities)
return entities
def iter_good(self):
iterator = iter(self)
while True:
try:
yield next(iterator)
except StopIteration:
raise
def get_table(self):
table = {}
nuclides = []
incomplete = []
lastanum = -1
lastnnum = -1
for nuclide in self.iter_good():
if nuclide.atomic_number is not None and nuclide.neutron_number is not None:
if nuclide.atomic_number > lastanum:
lastanum = nuclide.atomic_number
if nuclide.neutron_number > lastnnum:
lastnnum = nuclide.neutron_number
if nuclide.atomic_number not in table:
table[nuclide.atomic_number] = {}
table[nuclide.atomic_number][nuclide.neutron_number] = nuclide
nuclides.append(nuclide)
else:
incomplete.append(nuclide)
nuclides.sort(key=operator.attrgetter('atomic_number', 'neutron_number'))
for anum in range(0, lastanum+1):
if anum not in table:
table[anum] = {}
for nnum in range(0,lastnnum+1):
if nnum in table[anum]:
table[anum][nnum].__class__ = NuclideCell
else:
table[anum][nnum] = EmptyCell()
return nuclides, table, incomplete
class WdqNuclideProvider(NuclideProvider):
"""Load nuclides from Wikidata Query."""
def __iter__(self):
wdq = self.get_wdq()
ids = ['Q%d' % item_id for item_id in wdq['items']]
entities = self.get_entities(ids, props='labels|claims',
languages=self.language, languagefallback=1)
nuclides = defaultdict(Nuclide)
wdq['props'] = defaultdict(list, wdq.get('props', {}))
for item_id, datatype, value in wdq['props'][str(Nuclide.atomic_number_pid)]:
if datatype != 'quantity':
continue
value = value.split('|')
if len(value) == 4:
value = map(float, value)
if len(set(value[:3])) == 1 and value[3] == 1 and value[0] == int(value[0]):
nuclides[item_id].atomic_number = int(value[0])
for item_id, datatype, value in wdq['props'][str(Nuclide.neutron_number_pid)]:
if datatype != 'quantity':
continue
value = value.split('|')
if len(value) == 4:
value = map(float, value)
if len(set(value[:3])) == 1 and value[3] == 1 and value[0] == int(value[0]):
nuclides[item_id].neutron_number = int(value[0])
for item_id, datatype, value in wdq['props'][str(Nuclide.decay_mode_pid)]:
if datatype != 'item':
continue
nuclides[item_id].decay_modes.append(value)
for item_id, nuclide in nuclides.items():
nuclide.item_id = 'Q%d' % item_id
for prop in ('atomic_number', 'neutron_number'):
if not hasattr(nuclide, prop):
setattr(nuclide, prop, None)
# ?? nuclide.load_data_from_superclasses(subclass_of[item_id])
label = None
entity = entities.get(nuclide.item_id)
if entity and 'labels' in entity and len(entity['labels']) == 1:
label = entity['labels'].values()[0]['value']
nuclide.label = label
half_life = "unknown";
if entity:
claims = entity['claims']
hlprop = 'P%d' % Nuclide.half_life_pid
if hlprop in claims:
hl_claims = claims[hlprop]
for hl_claim in hl_claims:
half_life = time_quantity_in_seconds(hl_claim)
nuclide.half_life = half_life
yield nuclide
@classmethod
def get_wdq(cls):
pids = [str(getattr(Nuclide, name))
for name in ('atomic_number_pid', 'neutron_number_pid', 'decay_mode_pid')]
query = {
'q': 'claim[%d:(tree[%d][][%d])]' % (Nuclide.instance_pid, Nuclide.isotope_qid, Nuclide.subclass_pid),
'props': ','.join(pids)
}
return get_json(cls.WDQ_API, query)
class PropertyAlreadySetException(Exception):
"""Property already set."""
class Nuclide(object):
props = ('atomic_number', 'neutron_number', 'item_id', 'label', 'half_life', 'decay_modes')
atomic_number_pid = 1086
neutron_number_pid = 1148
half_life_pid = 2114
decay_mode_pid = 817
instance_pid = 31
subclass_pid = 279
isotope_qid = 25276 # top-level class under which all isotopes to be found
# isotope_qid = 471790
def __init__(self, **kwargs):
self.decay_modes = []
for key, val in kwargs.items():
if key in self.props:
setattr(self, key, val)
self.classes = []
def __setattr__(self, key, value):
if (key in self.props and hasattr(self, key) and
getattr(self, key) is not None and getattr(self, key) != value):
raise PropertyAlreadySetException
super(Nuclide, self).__setattr__(key, value)
def __iter__(self):
for key in self.props:
yield (key, getattr(self, key))
class TableCell(object):
"""A table cell."""
class NuclideCell(Nuclide, TableCell):
"""A nuclide cell."""
class EmptyCell(TableCell):
"""An empty cell."""

File Metadata

Mime Type
text/x-python
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2686722
Default Alt Text
nuclides.py (6 KB)

Event Timeline