#!/usr/bin/env python3
"""
Acquire a list of work IDs to be disseminated.
Purpose: automatic dissemination at regular intervals of specified works from selected publishers.
For dissemination to Internet Archive, (Loughborough) Figshare, Zenodo, CUL and Google Play:
find newly-published works for upload.
For dissemination to Crossref: find newly-updated works for metadata deposit (including update).
Based on `iabulkupload/obtain_work_ids.py`.
"""
# Both third-party packages already included in thoth-dissemination/requirements.txt
from internetarchive import search_items
from thothlibrary import errors, ThothClient
import argparse
import json
import logging
from datetime import datetime, timedelta, UTC
from os import environ
import sys


class IDFinder():
    """Common logic for retrieving work IDs for all platforms"""

    def __init__(self):
        """Set up Thoth client instance and variables for use in other methods"""
        self.thoth = ThothClient()
        self.thoth_ids = []
        self.work_statuses = None
        self.publishers = None
        self.order = None
        self.updated_at_with_relations = None

    def run(self):
        """
        Retrieve the required set of work IDs and output them
        (as an array of comma-separated, quote-enclosed strings)
        """
        self.get_publishers()
        self.get_query_parameters()
        self.get_thoth_ids()
        self.remove_exceptions()
        self.post_process()
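        # Printing the Python list gives the quote-enclosed, comma-separated
        # array format described above, e.g. ['uuid-1', 'uuid-2']
        # (placeholder IDs shown).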
        print(self.thoth_ids)

    def get_publishers(self):
        """Retrieve IDs for all publishers whose works should be included"""
        # Check that a list of IDs of publishers whose works should be uploaded
        # has been provided as a JSON-formatted environment variable
        try:
            publishers_env = json.loads(environ.get('ENV_PUBLISHERS'))
        except (TypeError, ValueError):
            # TypeError if the variable is unset, ValueError if it isn't valid JSON
            logging.error(
                'Failed to retrieve publisher IDs from environment variable')
            sys.exit(1)
        # Test that the list is not empty - if it is, the Thoth client call would
        # erroneously retrieve the full list of works from all publishers
        if len(publishers_env) < 1:
            logging.error(
                'No publisher IDs found in environment variable: list is empty')
            sys.exit(1)
        # Test that all supplied publisher IDs are valid - if a mistyped ID was passed
        # to the Thoth client call, it would behave the same as a valid ID for which
        # no relevant works exist
        for publisher in publishers_env:
            try:
                self.thoth.publisher(publisher)
            except errors.ThothError:
                # Don't include full error text as it's lengthy (contains full query/response)
                logging.error('No record found for publisher {}: ID may be incorrect'.format(
                    publisher))
                sys.exit(1)
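        # Re-serialise the validated list as the JSON string format which the
        # Thoth client expects for its `publishers` parameter.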
        self.publishers = json.dumps(publishers_env)

    def get_thoth_ids(self):
        """Query Thoth GraphQL API with relevant parameters to retrieve required work IDs"""
        # `books` query includes Monographs, Edited Books, Textbooks and Journal Issues
        # but excludes Chapters and Book Sets. `bookIds` variant only retrieves their workIds.
        thoth_works = self.thoth.bookIds(
            # The default limit is 100; publishers' back catalogues may be bigger than that
            limit='9999',
            work_statuses=self.work_statuses,
            order=self.order,
            publishers=self.publishers,
            updated_at_with_relations=self.updated_at_with_relations,
        )
        # Extract the Thoth work ID strings from the set of results
        self.thoth_ids = [n.workId for n in thoth_works]

    def get_thoth_ids_iteratively(self, start_date, end_date):
        """
        Query Thoth GraphQL API with relevant parameters to retrieve required work IDs,
        iterating through results to select only those published between the specified dates
        """
        # TODO Once https://github.com/thoth-pub/thoth/issues/486 is completed,
        # we can simply construct a standard query filtering by publication date
        offset = 0
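        # Fetch works one at a time in the order set by self.order (publication
        # date descending for the subclasses which call this method), so we can
        # stop as soon as we reach a work published before start_date.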
        while True:
            next_batch = self.thoth.books(
                limit=1,
                offset=offset,
                work_statuses=self.work_statuses,
                order=self.order,
                publishers=self.publishers,
                updated_at_with_relations=self.updated_at_with_relations,
            )
            if len(next_batch) < 1:
                # No more works to be found
                break
            offset += 1
            next_work = next_batch[0]
            next_work_pub_date = datetime.strptime(
                next_work.publicationDate, "%Y-%m-%d").date()
            if next_work_pub_date > end_date:
                # This work will be handled in the next run - don't cause duplication
                continue
            elif next_work_pub_date >= start_date:
                # This work was published in the target period - include it
                self.thoth_ids.append(next_work.workId)
            else:
                # We've reached the first work in the list which was published
                # earlier than the target period - stop
                break

    def remove_exceptions(self):
        """
        If a list of exceptions has been provided, remove these from the results
        (e.g. works that are ineligible for upload due to not being available as PDFs)
        """
        # Omitted exceptions may be represented as None if running locally,
        # or an empty string if passed via GitHub Actions inheritance
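        # The supplied list is lower-cased before comparison, on the assumption
        # that Thoth serialises work IDs in lower case; this guards against
        # mixed-case entries in the environment variable.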
        if environ.get('ENV_EXCEPTIONS'):
            try:
                exceptions = set(json.loads(environ.get('ENV_EXCEPTIONS').lower()))
                # Filter rather than take a raw set difference, so that the
                # ordering requested from Thoth is preserved
                self.thoth_ids = [
                    thoth_id for thoth_id in self.thoth_ids
                    if thoth_id not in exceptions]
            except Exception:
                # Current use case for the exceptions list is just to avoid attempting
                # uploads which are expected to fail. However, an exception here
                # would indicate that the list has been entered incorrectly.
                # Exit early to alert users that it needs to be fixed.
                logging.error(
                    'Failed to retrieve excepted works from environment variable')
                sys.exit(1)


class CrossrefIDFinder(IDFinder):
    """Logic for retrieving work IDs which is specific to Crossref dissemination"""

    def get_query_parameters(self):
        """Construct Thoth work ID query parameters depending on Crossref-specific requirements"""
        # The schedule for finding and depositing updated metadata is once hourly.
        # TODO ideally we could pass this value from the GitHub Action to ensure synchronisation.
        DEPOSIT_INTERVAL_HRS = 1
        # Scheduled GitHub Actions may not start exactly at the specified time.
        # A couple of months of daily runs showed average delay of 10-15 mins.
        # Try to avoid missing any works which were updated in the gap between
        # when the Action should have run and when it actually ran.
        DELAY_BUFFER_HRS = 0.25
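        # With these values, each run therefore looks back 1.25 hours:
        # e.g. a run at 12:10 UTC covers works updated since 10:55 UTC.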
        # Target: all works listed in Thoth (from the selected publishers) which are
        # Active, and which have been updated since the last deposit.
        # Use UTC, as GitHub Actions scheduling runs in UTC.
        current_time = datetime.now(UTC)
        last_deposit_time = current_time - \
            timedelta(hours=(DEPOSIT_INTERVAL_HRS + DELAY_BUFFER_HRS))
        last_deposit_time_str = datetime.strftime(
            last_deposit_time, "%Y-%m-%dT%H:%M:%SZ")
        self.work_statuses = '[ACTIVE]'
        # Start with the most recently updated
        self.order = '{field: UPDATED_AT_WITH_RELATIONS, direction: DESC}'
        self.updated_at_with_relations = '{{timestamp: "{}", expression: GREATER_THAN}}'.format(
            last_deposit_time_str)

    def post_process(self):
        """Amend list of retrieved work IDs depending on Crossref-specific requirements"""
        # Not required for Crossref dissemination - keep full list
        pass


class InternetArchiveIDFinder(IDFinder):
    """Logic for retrieving work IDs which is specific to Internet Archive dissemination"""

    def get_query_parameters(self):
        """Construct Thoth work ID query parameters depending on Internet Archive-specific requirements"""
        # Target: all active (published) works listed in Thoth (from the selected publishers).
        self.work_statuses = '[ACTIVE]'
        # Start with the earliest, so that the upload is logically ordered
        self.order = '{field: PUBLICATION_DATE, direction: ASC}'
        self.updated_at_with_relations = None

    def post_process(self):
        """Amend list of retrieved work IDs depending on Internet Archive-specific requirements"""
        # Obtain all works listed in the Internet Archive's Thoth Archiving Network collection.
        # We only need the identifier; this matches the Thoth work ID.
        # If the collection later grows to include more publishers, we may want to
        # additionally filter the query to only return works from those selected.
        ia_works = search_items(
            query='collection:thoth-archiving-network', fields=['identifier'])
        # Extract the IA identifiers from the set of results
        ia_ids = set(n['identifier'] for n in ia_works)
        # The set of IDs of works that need to be uploaded to the Internet Archive
        # is those which appear as published for the selected publishers in Thoth
        # but do not appear as already uploaded to the IA collection
        # (minus any specified exceptions). Filter rather than take a raw set
        # difference, to preserve the publication-date ordering requested above.
        self.thoth_ids = [
            thoth_id for thoth_id in self.thoth_ids if thoth_id not in ia_ids]


class CatchupIDFinder(IDFinder):
    """
    Logic for retrieving work IDs which is specific to recurring 'catchup'
    dissemination of recent publications to various archiving platforms.
    Currently used for (Loughborough) Figshare, CUL and Zenodo. Internet Archive
    is handled separately, as its API allows a simpler workflow.
    """

    def get_query_parameters(self):
        """
        Construct Thoth work ID query parameters depending on platform-specific
        requirements
        """
        # Target: all active (published) works listed in Thoth (from the selected publishers).
        self.work_statuses = '[ACTIVE]'
        # Start with the most recent, so that we can disregard everything else
        # as soon as we hit the first work published earlier than the desired date range.
        self.order = '{field: PUBLICATION_DATE, direction: DESC}'
        self.updated_at_with_relations = None

    def get_thoth_ids(self):
        """Query Thoth GraphQL API with relevant parameters to retrieve required work IDs"""
        # TODO Once https://github.com/thoth-pub/thoth/issues/486 is completed,
        # we can remove this overriding method and simply construct a standard query
        # filtering by publication date
        # In addition to the conditions of the query parameters, we need to filter the results
        # to obtain only works with a publication date within the previous calendar month.
        # The schedule for finding and depositing newly published works is once monthly
        # (a few days after the start of the month, to allow for delays in updating records).
        current_date = datetime.now(UTC).date()
        current_month_start = current_date.replace(day=1)
        previous_month_end = current_month_start - timedelta(days=1)
        previous_month_start = previous_month_end.replace(day=1)
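        # e.g. a run on 2024-05-03 gives a target window of 2024-04-01 to 2024-04-30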
        self.get_thoth_ids_iteratively(previous_month_start, previous_month_end)

    def post_process(self):
        """
        Amend list of retrieved work IDs depending on platform-specific
        requirements
        """
        # Not required - keep full list
        pass


class GooglePlayIDFinder(IDFinder):
    """Logic for retrieving work IDs which is specific to Google Play dissemination"""

    def get_query_parameters(self):
        """
        Construct Thoth work ID query parameters depending on platform-specific
        requirements
        """
        # Target: all active (published) works listed in Thoth (from the selected publishers).
        self.work_statuses = '[ACTIVE]'
        # Start with the most recent, so that we can disregard everything else
        # as soon as we hit the first work published earlier than the desired date range.
        self.order = '{field: PUBLICATION_DATE, direction: DESC}'
        self.updated_at_with_relations = None

    def get_thoth_ids(self):
        """Query Thoth GraphQL API with relevant parameters to retrieve required work IDs"""
        # TODO Once https://github.com/thoth-pub/thoth/issues/486 is completed,
        # we can remove this overriding method and simply construct a standard query
        # filtering by publication date
        # In addition to the conditions of the query parameters, we need to filter the results
        # to obtain only works with a publication date within the previous day.
        # The schedule for finding and depositing newly published works is once daily.
        current_date = datetime.now(UTC).date()
        previous_day = current_date - timedelta(days=1)
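        # e.g. a run on 2024-05-03 targets works published on 2024-05-02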
        self.get_thoth_ids_iteratively(previous_day, previous_day)

    def post_process(self):
        """
        Amend list of retrieved work IDs depending on platform-specific
        requirements
        """
        # Not required - keep full list
        pass


def get_arguments():
    """Simple argument parsing"""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-p", "--platform",
        help="Target platform: one of InternetArchive, Crossref, Figshare, "
             "Zenodo, CUL or GooglePlay")
    args = parser.parse_args()
    return args


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO,
                        format='%(levelname)s:%(asctime)s: %(message)s')
    args = get_arguments()
    platform = args.platform
    match platform:
        case 'InternetArchive':
            id_finder = InternetArchiveIDFinder()
        case 'Crossref':
            id_finder = CrossrefIDFinder()
        case 'GooglePlay':
            id_finder = GooglePlayIDFinder()
        case 'Figshare' | 'Zenodo' | 'CUL':
            id_finder = CatchupIDFinder()
        case _:
            logging.error(
                'Platform must be one of InternetArchive, Crossref, Figshare, '
                'Zenodo, CUL or GooglePlay')
            sys.exit(1)
    id_finder.run()