"""A stateful event scanner for Ethereum-based blockchains using Web3.py.
With the stateful mechanism, you can do one batch scan or incremental scans,
where events are added wherever the scanner left off.
"""
import datetime
import time
import logging
from abc import ABC, abstractmethod
from typing import Tuple, Optional, Callable, List, Iterable
from web3 import Web3
from web3.contract import Contract
from web3.datastructures import AttributeDict
from web3.exceptions import BlockNotFound
from eth_abi.codec import ABICodec
# Currently this method is not exposed over official web3 API,
# but we need it to construct eth_getLogs parameters
from web3._utils.filters import construct_event_filter_params
from web3._utils.events import get_event_data
logger = logging.getLogger(__name__)
class EventScannerState(ABC):
"""Application state that remembers what blocks we have scanned in the case of crash.
"""
@abstractmethod
def get_last_scanned_block(self) -> int:
"""Number of the last block we have scanned on the previous cycle.
:return: 0 if no blocks scanned yet
"""
@abstractmethod
def start_chunk(self, block_number: int, chunk_size: int):
"""Scanner is about to ask for data of multiple blocks over JSON-RPC.
Start a database session if needed.
"""
@abstractmethod
def end_chunk(self, block_number: int):
"""Scanner finished a number of blocks.
Persist any data in your state now.
"""
@abstractmethod
def process_event(self, block_when: datetime.datetime, event: AttributeDict) -> object:
"""Process incoming events.
This function takes raw events from Web3, transforms them to your application internal
format, then saves them in a database or some other state.
:param block_when: When this block was mined
:param event: Symbolic dictionary of the event data
:return: Internal state structure that is the result of event transformation.
"""
@abstractmethod
def delete_data(self, since_block: int) -> int:
"""Delete any data since this block was scanned.
Purges any potential minor reorg data.
"""
class EventScanner:
"""Scan blockchain for events and try not to abuse JSON-RPC API too much.
Can be used for real-time scans, as it detects minor chain reorganisation and rescans.
Unlike the easy web3.contract.Contract, this scanner can scan events from multiple contracts at once.
For example, you can get all transfers from all tokens in the same scan.
You *should* disable the default `http_retry_request_middleware` on your provider for Web3,
because it cannot correctly throttle and decrease the `eth_getLogs` block number range.
"""
def __init__(self, web3: Web3, contract: Contract, state: EventScannerState, events: List, filters: dict,
max_chunk_scan_size: int = 10000, max_request_retries: int = 30, request_retry_seconds: float = 3.0):
"""
:param contract: Contract
:param events: List of web3 Event we scan
:param filters: Filters passed to getLogs
:param max_chunk_scan_size: JSON-RPC API limit in the number of blocks we query. (Recommendation: 10,000 for mainnet, 500,000 for testnets)
:param max_request_retries: How many times we try to reattempt a failed JSON-RPC call
:param request_retry_seconds: Delay between failed requests to let the JSON-RPC server recover
"""
self.logger = logger
self.contract = contract
self.web3 = web3
self.state = state
self.events = events
self.filters = filters
# Our JSON-RPC throttling parameters
self.min_scan_chunk_size = 10 # 12 s/block = 120 seconds period
self.max_scan_chunk_size = max_chunk_scan_size
self.max_request_retries = max_request_retries
self.request_retry_seconds = request_retry_seconds
# Factor by which we decrease the chunk size when results are found
# (slow down the scan after we start getting hits)
self.chunk_size_decrease = 0.5
# Factor by which we increase the chunk size if no results are found
self.chunk_size_increase = 2.0
# How many blocks to rescan when resuming, to cover minor chain reorganisations
# (see get_suggested_scan_start_block)
self.NUM_BLOCKS_RESCAN_FOR_FORKS = 10
@property
def address(self):
# The address we filter events for (the bridge contract address), if one was given
return self.filters.get("address")
def get_block_timestamp(self, block_num) -> Optional[datetime.datetime]:
"""Get Ethereum block timestamp, or None if the block is not available."""
try:
block_info = self.web3.eth.get_block(block_num)
except BlockNotFound:
# Block was not mined yet,
# minor chain reorganisation?
return None
last_time = block_info["timestamp"]
return datetime.datetime.utcfromtimestamp(last_time)
def get_suggested_scan_start_block(self):
"""Get where we should start to scan for new token events.
If there are no prior scans, start from block 1.
Otherwise, start from the last end block minus ten blocks.
We rescan the last ten scanned blocks in case there were forks, to avoid
misaccounting due to minor single-block reorganisations (these happen roughly once an hour on Ethereum).
These heuristics could be made more robust, but this is kept simple for the sake of a reference implementation.
"""
end_block = self.get_last_scanned_block()
if end_block:
return max(1, end_block - self.NUM_BLOCKS_RESCAN_FOR_FORKS)
return 1
def get_suggested_scan_end_block(self):
"""Get the last mined block on Ethereum chain we are following."""
# Do not scan all the way to the final block, as this
# block might not be mined yet
return self.web3.eth.block_number - 1
def get_last_scanned_block(self) -> int:
return self.state.get_last_scanned_block()
def delete_potentially_forked_block_data(self, after_block: int):
"""Purge old data in the case of blockchain reorganisation."""
self.state.delete_data(after_block)
def scan_chunk(self, start_block, end_block) -> Tuple[int, datetime.datetime, list]:
"""Read and process events between to block numbers.
Dynamically decrease the size of the chunk if the case JSON-RPC server pukes out.
:return: tuple(actual end block number, when this block was mined, processed events)
"""
block_timestamps = {}
get_block_timestamp = self.get_block_timestamp
# Cache block timestamps to reduce some RPC overhead
# A real solution might include smarter caching of block data
def get_block_when(block_num):
if block_num not in block_timestamps:
block_timestamps[block_num] = get_block_timestamp(block_num)
return block_timestamps[block_num]
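# Collect the processed results of every event type scanned in this chunk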
all_processed = []
for event_type in self.events:
# Callable that takes care of the underlying web3 call
def _fetch_events(_start_block, _end_block):
return _fetch_events_for_all_contracts(self.web3,
event_type,
self.filters,
from_block=_start_block,
to_block=_end_block)
# Do `n` retries on `eth_getLogs`,
# throttle down block range if needed
end_block, events = _retry_web3_call(
_fetch_events,
start_block=start_block,
end_block=end_block,
retries=self.max_request_retries,
delay=self.request_retry_seconds)
for evt in events:
idx = evt["logIndex"] # Integer of the log index position in the block, null when its pending
# We cannot avoid minor chain reorganisations, but
# at least we must avoid blocks that are not mined yet
assert idx is not None, "Somehow tried to scan a pending block"
block_number = evt["blockNumber"]
# Get UTC time when this event happened (block mined timestamp)
# from our in-memory cache
block_when = get_block_when(block_number)
logger.debug("Processing event %s, block:%d count:%d", evt["event"], evt["blockNumber"])
processed = self.state.process_event(block_when, evt)
all_processed.append(processed)
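# Look up the timestamp of the chunk's last block so the caller can report scan progress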
end_block_timestamp = get_block_when(end_block)
return end_block, end_block_timestamp, all_processed
def estimate_next_chunk_size(self, current_chunk_size: int, event_found_count: int):
"""Try to figure out optimal chunk size
Our scanner might need to scan the whole blockchain for all events
* We want to minimize API calls over empty blocks
* We want to make sure that one scan chunk does not try to process too many entries at once, as we try to control the commit buffer size and a potentially asynchronous busy loop
* Do not overload the node serving the JSON-RPC API by asking data for too many events at a time
Currently the Ethereum JSON-RPC API does not have a way to tell when the first event occurred on the blockchain,
and our heuristics try to accelerate block fetching (chunk size) until we see the first event.
These heuristics exponentially increase the scan chunk size depending on whether we are seeing events or not.
When any transfers are encountered, we are back to scanning only a few blocks at a time.
It does not make sense to do a full chain scan starting from block 1, doing one JSON-RPC call per 20 blocks.
"""
if event_found_count > 0:
# When we encounter the first events, reset the chunk size window
current_chunk_size = self.min_scan_chunk_size
else:
current_chunk_size *= self.chunk_size_increase
current_chunk_size = max(self.min_scan_chunk_size, current_chunk_size)
current_chunk_size = min(self.max_scan_chunk_size, current_chunk_size)
return int(current_chunk_size)
def scan(self, start_block, end_block, start_chunk_size=20, progress_callback: Optional[Callable] = None) -> Tuple[list, int]:
"""Perform a scan.
:param start_block: The first block included in the scan
:param end_block: The last block included in the scan
:param start_chunk_size: How many blocks we try to fetch over JSON-RPC on the first attempt
:param progress_callback: If this is a UI application, update the progress of the scan
:return: [All processed events, number of chunks used]
"""
assert start_block <= end_block
current_block = start_block
# Scan in chunks, commit between
chunk_size = start_chunk_size
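# Diagnostics for the progress log line: duration of the last chunk scan and how many events it returned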
last_scan_duration = last_logs_found = 0
total_chunks_scanned = 0
# All processed entries we got on this scan cycle
all_processed = []
while current_block <= end_block:
self.state.start_chunk(current_block, chunk_size)
# Log some diagnostics to help tune real-world JSON-RPC API performance
estimated_end_block = current_block + chunk_size
logger.debug(
"Scanning token transfers for blocks: %d - %d, chunk size %d, last chunk scan took %f, last logs found %d",
current_block, estimated_end_block, chunk_size, last_scan_duration, last_logs_found)
start = time.time()
actual_end_block, end_block_timestamp, new_entries = self.scan_chunk(current_block, estimated_end_block)
# Where does our current chunk scan end - are we out of chain yet?
current_end = actual_end_block
last_scan_duration = time.time() - start
all_processed += new_entries
last_logs_found = len(new_entries)
# Print progress bar
if progress_callback:
progress_callback(start_block, end_block, current_block, end_block_timestamp, chunk_size, len(new_entries))
# Try to guess how many blocks to fetch over `eth_getLogs` API next time
chunk_size = self.estimate_next_chunk_size(chunk_size, len(new_entries))
# Set where the next chunk starts
current_block = current_end + 1
total_chunks_scanned += 1
self.state.end_chunk(current_end)
return all_processed, total_chunks_scanned
def _retry_web3_call(func, start_block, end_block, retries, delay) -> Tuple[int, list]:
"""A custom retry loop to throttle down block range.
If our JSON-RPC server cannot serve all incoming `eth_getLogs` in a single request,
we retry and throttle down block range for every retry.
For example, Go Ethereum does not indicate what is an acceptable response size.
It just fails on the server-side with a "context was cancelled" warning.
:param func: A callable that triggers Ethereum JSON-RPC, as func(start_block, end_block)
:param start_block: The initial start block of the block range
:param end_block: The initial end block of the block range
:param retries: How many times we retry
:param delay: Time to sleep between retries
"""
for i in range(retries):
try:
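# On success, return the possibly narrowed-down end block together with the fetched events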
return end_block, func(start_block, end_block)
except Exception as e:
# Assume this is HTTPConnectionPool(host='localhost', port=8545): Read timed out. (read timeout=10)
# from Go Ethereum. This translates to the error "context was cancelled" on the server side:
# https://github.com/ethereum/go-ethereum/issues/20426
if i < retries - 1:
# Give some more verbose info than the default middleware
logger.warning(
"Retrying events for block range %d - %d (%d) failed with %s, retrying in %s seconds",
start_block,
end_block,
end_block-start_block,
e,
delay)
# Decrease the `eth_getLogs` block range
end_block = start_block + ((end_block - start_block) // 2)
# Let the JSON-RPC server recover, e.g. from a restart
time.sleep(delay)
continue
else:
logger.warning("Out of retries")
raise
def _fetch_events_for_all_contracts(
web3,
event,
argument_filters: dict,
from_block: int,
to_block: int) -> Iterable:
"""Get events using eth_getLogs API.
This method is detached from any contract instance.
This is a stateless method, as opposed to createFilter.
It can be safely called against nodes which do not provide `eth_newFilter` API, like Infura.
"""
if from_block is None:
raise TypeError("Missing mandatory keyword argument to getLogs: fromBlock")
# Currently no way to poke this using a public Web3.py API.
# This will return raw underlying ABI JSON object for the event
abi = event._get_event_abi()
# Depending on the Solidity version used to compile
# the contract that uses the ABI,
# it might have Solidity ABI encoding v1 or v2.
# We just assume the default that you set on Web3 object here.
# More information here https://eth-abi.readthedocs.io/en/latest/index.html
codec: ABICodec = web3.codec
# Here we need to poke a bit into Web3 internals, as this
# functionality is not exposed by default.
# Construct JSON-RPC raw filter presentation based on human readable Python descriptions
# Namely, convert event names to their keccak signatures
# More information here:
# https://github.com/ethereum/web3.py/blob/e176ce0793dafdd0573acc8d4b76425b6eb604ca/web3/_utils/filters.py#L71
data_filter_set, event_filter_params = construct_event_filter_params(
abi,
codec,
address=argument_filters.get("address"),
argument_filters=argument_filters,
fromBlock=from_block,
toBlock=to_block
)
logger.debug("Querying eth_getLogs with the following parameters: %s", event_filter_params)
# Call JSON-RPC API on your Ethereum node.
# get_logs() returns raw AttributeDict entries
logs = web3.eth.get_logs(event_filter_params)
# Convert raw binary data to Python proxy objects as described by ABI
all_events = []
for log in logs:
# Convert raw JSON-RPC log result to human readable event by using ABI data
# More information on how event log processing works here:
# https://github.com/ethereum/web3.py/blob/fbaf1ad11b0c7fac09ba34baff2c256cffe0a148/web3/_utils/events.py#L200
evt = get_event_data(codec, abi, log)
# Note: This was originally yield,
# but deferring the timeout exception caused the throttle logic not to work
all_events.append(evt)
return all_events
if __name__ == "__main__":
# Simple demo that scans all Cross and AcceptedCrossTransfer events of the token bridge contract.
# The demo supports persistent state by using a JSON file.
# You will need an Ethereum node for this.
# Running a full scan from the bridge deployment block consumes a large number of JSON-RPC calls
# and can take several minutes even against a locally running Geth node.
# The resulting JSON state file can grow to a few megabytes.
import sys
import json
from web3.providers.rpc import HTTPProvider
# We use tqdm library to render a nice progress bar in the console
# https://pypi.org/project/tqdm/
from tqdm import tqdm
# Bridge contract on Ethereum
# https://etherscan.io/address/0x0fa0b4cc33d5a4f0ed073ca7f88259ab75c7a98b
BRIDGE_ADDRESS = "0x0fa0B4CC33d5a4f0ed073cA7F88259aB75C7A98b"
# Load bridge ABI from file
ABI = open("Bridge.json", "r")
class JSONifiedState(EventScannerState):
"""Store the state of scanned blocks and all events.
All state is an in-memory dict.
The whole state is loaded from and saved to a single JSON file.
"""
def __init__(self):
self.state = None
self.fname = "ETH_tokenbridge_events.json"
# UNIX timestamp of when we last saved the JSON file
self.last_save = 0
def reset(self):
"""Create initial state of nothing scanned."""
self.state = {
"last_scanned_block": 13343730, #Block where the bridge was deployed
"blocks": {},
}
def restore(self):
"""Restore the last scan state from a file."""
try:
with open(self.fname, "rt") as f:
self.state = json.load(f)
print(f"Restored the state, previously scanned up to block {self.state['last_scanned_block']}")
except (IOError, json.decoder.JSONDecodeError):
print("State starting from scratch")
self.reset()
def save(self):
"""Save everything we have scanned so far in a file."""
with open(self.fname, "wt") as f:
json.dump(self.state, f)
self.last_save = time.time()
#
# EventScannerState methods implemented below
#
def get_last_scanned_block(self):
"""The number of the last block we have stored."""
return self.state["last_scanned_block"]
def delete_data(self, since_block):
"""Remove potentially reorganised blocks from the scan data."""
for block_num in range(since_block, self.get_last_scanned_block()):
# Block keys are ints in memory, but become strings after a JSON save/restore round trip
for key in (block_num, str(block_num)):
if key in self.state["blocks"]:
del self.state["blocks"][key]
def start_chunk(self, block_number, chunk_size):
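# Nothing to prepare here - state lives in memory and is only persisted in end_chunk()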
pass
def end_chunk(self, block_number):
"""Save at the end of each block, so we can resume in the case of a crash or CTRL+C"""
# Next time the scanner is started we will resume from this block
self.state["last_scanned_block"] = block_number
# Save the state file at most once a minute
if time.time() - self.last_save > 60:
self.save()
def process_event(self, block_when: datetime.datetime, event: AttributeDict) -> str:
"""Record Cross and AcceptedCrossTransfer in our database."""
# Events are keyed by their transaction hash and log index
# One transaction may contain multiple events
# and each one of those gets their own log index
event_name = event.event # "Cross" or "AcceptedCrossTransfer"
log_index = event.logIndex # Log index within the block
# transaction_index = event.transactionIndex # Transaction index within the block
txhash = event.transactionHash.hex() # Transaction hash
block_number = event.blockNumber
args = event["args"]
# Convert AcceptedCrossTransfer event to our internal format
if event_name == "AcceptedCrossTransfer":
acceptedcrosstransfer = {
"originalTokenAddress": args["_originalTokenAddress"],
"from": args["_from"],
"to": args["_to"],
"amount": args["_amount"],
"timestamp": block_when.isoformat(),
}
# Create an empty dict for this block to hold its transactions, keyed by txhash
if block_number not in self.state["blocks"]:
self.state["blocks"][block_number] = {}
block = self.state["blocks"][block_number]
if txhash not in block:
# We have not yet recorded any transfers in this transaction
# (One transaction may contain multiple events if executed by a smart contract).
# Create a tx entry that contains all events by a log index
self.state["blocks"][block_number][txhash] = {}
# Record the accepted cross transfer in our database
self.state["blocks"][block_number][txhash][event_name] = acceptedcrosstransfer
# Return a pointer that allows us to look up this event later if needed
return f"{block_number}-{txhash}-{log_index}"
# Convert bridge Cross event to our internal format
if event_name == "Cross":
cross = {
"tokenAddress": args["_tokenAddress"],
"from": args["_from"],
"to": args["_to"],
"amount": args["_amount"],
"timestamp": block_when.isoformat(),
}
# Create an empty dict for this block to hold its transactions, keyed by txhash
if block_number not in self.state["blocks"]:
self.state["blocks"][block_number] = {}
block = self.state["blocks"][block_number]
if txhash not in block:
# We have not yet recorded any transfers in this transaction
# (One transaction may contain multiple events if executed by a smart contract).
# Create a tx entry that contains all events by a log index
self.state["blocks"][block_number][txhash] = {}
# Record every cross in our database
self.state["blocks"][block_number][txhash][event_name] = cross
# Return a pointer that allows us to look up this event later if needed
return f"{block_number}-{txhash}-{log_index}"
def run():
if len(sys.argv) < 2:
print("Usage: eth_eventscanner.py http://your-node-url")
sys.exit(1)
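# The first command line argument is the JSON-RPC URL of the Ethereum node to use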
api_url = sys.argv[1]
# Enable logs to the stdout.
# DEBUG is very verbose level
logging.basicConfig(level=logging.INFO)
provider = HTTPProvider(api_url)
# Remove the default JSON-RPC retry middleware
# as it cannot correctly handle the eth_getLogs block range
# throttling done by this scanner.
provider.middlewares.clear()
web3 = Web3(provider)
# Prepare stub BRIDGE contract object
abi = json.loads(ABI.read())
BRIDGE = web3.eth.contract(abi=abi)
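# Note: no contract address is bound here - the scanner filters logs on BRIDGE_ADDRESS
# via the eth_getLogs filters instead, so one ABI can serve the scan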
# Restore/create our persistent state
state = JSONifiedState()
state.restore()
# Set up the scanner for the bridge's Cross and AcceptedCrossTransfer events
scanner = EventScanner(
web3=web3,
contract=BRIDGE,
state=state,
events=[BRIDGE.events.Cross, BRIDGE.events.AcceptedCrossTransfer],
filters={"address": BRIDGE_ADDRESS},
# The maximum number of blocks we request from JSON-RPC at a time;
# keep it low enough to stay within the response size limit of the JSON-RPC server
max_chunk_scan_size=10000
)
# Assume we might have scanned the blocks all the way to the last Ethereum block
# that was mined a few seconds before the previous scan run ended.
# Because there might have been minor Ethereum chain reorganisations
# since the last scan ended, we need to discard
# the last few blocks from the previous scan results.
chain_reorg_safety_blocks = 10
scanner.delete_potentially_forked_block_data(state.get_last_scanned_block() - chain_reorg_safety_blocks)
# Scan from [last block scanned] - [latest ethereum block]
# Note that the start block cannot go negative after subtracting the reorg safety blocks
start_block = max(state.get_last_scanned_block() - chain_reorg_safety_blocks, 0)
end_block = scanner.get_suggested_scan_end_block()
blocks_to_scan = end_block - start_block
print(f"Scanning events from blocks {start_block} - {end_block}")
# Render a progress bar in the console
start = time.time()
with tqdm(total=blocks_to_scan) as progress_bar:
def _update_progress(start, end, current, current_block_timestamp, chunk_size, events_count):
if current_block_timestamp:
formatted_time = current_block_timestamp.strftime("%d-%m-%Y")
else:
formatted_time = "no block time available"
progress_bar.set_description(f"Current block: {current} ({formatted_time}), blocks in a scan batch: {chunk_size}, events processed in a batch {events_count}")
progress_bar.update(chunk_size)
# Run the scan
result, total_chunks_scanned = scanner.scan(start_block, end_block, progress_callback=_update_progress)
state.save()
duration = time.time() - start
print(f"Scanned total {len(result)} Transfer events, in {duration} seconds, total {total_chunks_scanned} chunk scans performed")
run()