-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscan_obs80.py
33 lines (26 loc) · 948 Bytes
/
scan_obs80.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import collections
import io
import sys

from . import obs80
# Bind namedtuple straight from the standard library instead of reaching
# through obs80's namespace, which only works while obs80 happens to do
# `import collections`.
namedtuple = collections.namedtuple

# Local record type: every field of obs80.Roving, in the same order, followed
# by one extra trailing field named 'cod'.
Roving = namedtuple('Roving', [*obs80.Roving._fields, 'cod'])
def is_ob(x):
    """Filter predicate for valid 80-char observation records.

    Args:
        x: an indexable item (used here on the items yielded by
            obs80.parse80).

    Returns:
        False when the first element of ``x`` is an Exception instance,
        True otherwise.
    """
    return not isinstance(x[0], Exception)
def gen_obs(s):
    """Yield the observation records parsed from ``s``.

    Items produced by obs80.parse80(s) that fail is_ob() are skipped.
    Records whose type is exactly obs80.Roving are re-created as the local
    Roving type, which carries the same field values plus ``cod='247'``;
    every other record is yielded unchanged.

    Args:
        s: the input handed straight to obs80.parse80 (scan_obs80 passes an
            io.StringIO buffer).

    Yields:
        Named tuples, one per accepted record.
    """
    for ob in filter(is_ob, obs80.parse80(s)):
        if type(ob) is obs80.Roving:
            # The local Roving has obs80.Roving's fields in the same order
            # followed by 'cod', so the existing values map positionally.
            # NOTE(review): '247' is presumably the observatory code used
            # for roving observers -- confirm against obs80.
            ob = Roving(*ob, cod='247')
        yield ob
def scan_obs80(text):
    """Clean and buffer observation text, then parse it via gen_obs.

    Wrapper for obs80.parse80: each input string is cleaned, written to an
    in-memory text buffer with a terminating newline, and the rewound buffer
    is handed to gen_obs().

    Args:
        text: an iterable of strings. Each item is cleaned on its own, so a
            bare ``str`` would be processed one character at a time; pass a
            list of lines/chunks or an open text file instead.

    Returns:
        The generator of named tuples produced by gen_obs().
    """
    def clean(s):
        # Collapse doubled single quotes and doubled backslashes
        # (presumably escaping artifacts of the upstream source -- confirm).
        s = s.replace("''", "'").replace("\\\\", "\\")
        # Drop trailing whitespace, turn literal backslash-n sequences into
        # real newlines, then squeeze pairs of newlines into one.
        s = s.rstrip().replace(r"\n", "\n").replace("\n\n", "\n")
        # rstrip() removed any real trailing newline, but a literal "\n" at
        # the end has just become one; only append when it is missing.
        return s if s.endswith('\n') else s + '\n'

    buf = io.StringIO()
    buf.writelines(clean(s) for s in text)
    buf.seek(0)  # rewind so the parser reads from the start
    return gen_obs(buf)