diff --git a/.gitignore b/.gitignore index 861bdc396..f097278d4 100644 --- a/.gitignore +++ b/.gitignore @@ -49,4 +49,5 @@ venv/ ENV/ # etl -/last_synced_block.txt \ No newline at end of file +/last_synced_block.txt +.idea \ No newline at end of file diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 000000000..26d33521a --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/.idea/csv-editor.xml b/.idea/csv-editor.xml new file mode 100644 index 000000000..6da08f0e3 --- /dev/null +++ b/.idea/csv-editor.xml @@ -0,0 +1,44 @@ + + + + + + \ No newline at end of file diff --git a/.idea/ethereum-etl.iml b/.idea/ethereum-etl.iml new file mode 100644 index 000000000..5fdd65ba2 --- /dev/null +++ b/.idea/ethereum-etl.iml @@ -0,0 +1,15 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 000000000..acba12602 --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,63 @@ + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 000000000..105ce2da2 --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 000000000..1a4adf2ca --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 000000000..35eb1ddfb --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/ethereumetl/cli/export_blocks_and_transactions.py b/ethereumetl/cli/export_blocks_and_transactions.py index 88564be96..f8d22e048 100644 --- a/ethereumetl/cli/export_blocks_and_transactions.py +++ b/ethereumetl/cli/export_blocks_and_transactions.py @@ -48,7 +48,7 @@ 'If not provided transactions will not be exported. Use "-" for stdout') @click.option('-c', '--chain', default='ethereum', show_default=True, type=str, help='The chain network to connect to.') def export_blocks_and_transactions(start_block, end_block, batch_size, provider_uri, max_workers, blocks_output, - transactions_output, chain='ethereum'): + transactions_output, chain): """Exports blocks and transactions.""" provider_uri = check_classic_provider_uri(chain, provider_uri) if blocks_output is None and transactions_output is None: @@ -61,6 +61,8 @@ def export_blocks_and_transactions(start_block, end_block, batch_size, provider_ batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)), max_workers=max_workers, item_exporter=blocks_and_transactions_item_exporter(blocks_output, transactions_output), + chain=chain, export_blocks=blocks_output is not None, - export_transactions=transactions_output is not None) + export_transactions=transactions_output is not None, + ) job.run() diff --git a/ethereumetl/cli/export_receipts_and_logs.py b/ethereumetl/cli/export_receipts_and_logs.py index 95ff491a8..0e63eb435 100644 --- a/ethereumetl/cli/export_receipts_and_logs.py +++ b/ethereumetl/cli/export_receipts_and_logs.py @@ -36,8 +36,8 @@ @click.command(context_settings=dict(help_option_names=['-h', '--help'])) @click.option('-b', '--batch-size', default=100, show_default=True, type=int, help='The number of receipts to export at a time.') -@click.option('-t', '--transaction-hashes', required=True, type=str, - help='The file containing transaction hashes, one per line.') +@click.option('-t', '--block-hashes', required=True, type=str, + help='The file containing block hashes, one per line.') @click.option('-p', '--provider-uri', default='https://mainnet.infura.io', show_default=True, type=str, help='The URI of the web3 provider e.g. ' 'file://$HOME/Library/Ethereum/geth.ipc or https://mainnet.infura.io') @@ -48,13 +48,13 @@ help='The output file for receipt logs. ' 'If not provided receipt logs will not be exported. Use "-" for stdout') @click.option('-c', '--chain', default='ethereum', show_default=True, type=str, help='The chain network to connect to.') -def export_receipts_and_logs(batch_size, transaction_hashes, provider_uri, max_workers, receipts_output, logs_output, +def export_receipts_and_logs(batch_size, block_number, provider_uri, max_workers, receipts_output, logs_output, chain='ethereum'): """Exports receipts and logs.""" provider_uri = check_classic_provider_uri(chain, provider_uri) - with smart_open(transaction_hashes, 'r') as transaction_hashes_file: + with smart_open(block_number, 'r') as blocks_file: job = ExportReceiptsJob( - transaction_hashes_iterable=(transaction_hash.strip() for transaction_hash in transaction_hashes_file), + transaction_hashes_iterable=(block_number.strip() for block_number in blocks_file), batch_size=batch_size, batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)), max_workers=max_workers, diff --git a/ethereumetl/cli/stream.py b/ethereumetl/cli/stream.py index 217208dba..7fb7937aa 100644 --- a/ethereumetl/cli/stream.py +++ b/ethereumetl/cli/stream.py @@ -53,7 +53,8 @@ @click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The number of workers') @click.option('--log-file', default=None, show_default=True, type=str, help='Log file') @click.option('--pid-file', default=None, show_default=True, type=str, help='pid file') -def stream(last_synced_block_file, lag, provider_uri, output, start_block, entity_types, +@click.option('--chain', default=None, show_default=True, type=str, help='chain') +def stream(last_synced_block_file, lag, provider_uri, output, start_block, entity_types,chain, period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None): """Streams all data types to console or Google Pub/Sub.""" configure_logging(log_file) @@ -72,7 +73,8 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, entit item_exporter=create_item_exporters(output), batch_size=batch_size, max_workers=max_workers, - entity_types=entity_types + entity_types=entity_types, + chain=chain ) streamer = Streamer( blockchain_streamer_adapter=streamer_adapter, diff --git a/ethereumetl/jobs/export_blocks_job.py b/ethereumetl/jobs/export_blocks_job.py index 6f4eff7a7..846f365af 100644 --- a/ethereumetl/jobs/export_blocks_job.py +++ b/ethereumetl/jobs/export_blocks_job.py @@ -25,7 +25,7 @@ from ethereumetl.executors.batch_work_executor import BatchWorkExecutor from blockchainetl.jobs.base_job import BaseJob -from ethereumetl.json_rpc_requests import generate_get_block_by_number_json_rpc +from ethereumetl.json_rpc_requests import generate_get_block_by_number_json_rpc,generate_get_bor_author_by_number_json_rpc from ethereumetl.mappers.block_mapper import EthBlockMapper from ethereumetl.mappers.transaction_mapper import EthTransactionMapper from ethereumetl.utils import rpc_response_batch_to_results, validate_range @@ -41,8 +41,10 @@ def __init__( batch_web3_provider, max_workers, item_exporter, + chain, export_blocks=True, - export_transactions=True): + export_transactions=True, + ): validate_range(start_block, end_block) self.start_block = start_block self.end_block = end_block @@ -54,6 +56,7 @@ def __init__( self.export_blocks = export_blocks self.export_transactions = export_transactions + self.chain = chain if not self.export_blocks and not self.export_transactions: raise ValueError('At least one of export_blocks or export_transactions must be True') @@ -80,8 +83,16 @@ def _export_batch(self, block_number_batch): self._export_block(block) def _export_block(self, block): - if self.export_blocks: - self.item_exporter.export_item(self.block_mapper.block_to_dict(block)) + if self.chain == 'polygon': + bor_author_rpc = generate_get_bor_author_by_number_json_rpc(block.number) + bor_response = self.batch_web3_provider.make_batch_request(json.dumps(next(bor_author_rpc))) + bor_result = rpc_response_batch_to_results([bor_response]) + if self.export_blocks: + self.item_exporter.export_item(self.block_mapper.block_to_dict_with_author(block, next(bor_result))) + else: + if self.export_blocks: + self.item_exporter.export_item(self.block_mapper.block_to_dict(block)) + if self.export_transactions: for tx in block.transactions: self.item_exporter.export_item(self.transaction_mapper.transaction_to_dict(tx)) diff --git a/ethereumetl/jobs/export_receipts_job.py b/ethereumetl/jobs/export_receipts_job.py index 3dc1622cb..82255dafe 100644 --- a/ethereumetl/jobs/export_receipts_job.py +++ b/ethereumetl/jobs/export_receipts_job.py @@ -25,7 +25,7 @@ from blockchainetl.jobs.base_job import BaseJob from ethereumetl.executors.batch_work_executor import BatchWorkExecutor -from ethereumetl.json_rpc_requests import generate_get_receipt_json_rpc +from ethereumetl.json_rpc_requests import generate_get_receipt_json_rpc,generate_get_receipt_by_block_json_rpc from ethereumetl.mappers.receipt_log_mapper import EthReceiptLogMapper from ethereumetl.mappers.receipt_mapper import EthReceiptMapper from ethereumetl.utils import rpc_response_batch_to_results @@ -62,13 +62,14 @@ def _start(self): def _export(self): self.batch_work_executor.execute(self.transaction_hashes_iterable, self._export_receipts) - def _export_receipts(self, transaction_hashes): - receipts_rpc = list(generate_get_receipt_json_rpc(transaction_hashes)) + def _export_receipts(self, block_number): + receipts_rpc = list(generate_get_receipt_by_block_json_rpc(block_number)) response = self.batch_web3_provider.make_batch_request(json.dumps(receipts_rpc)) - results = rpc_response_batch_to_results(response) - receipts = [self.receipt_mapper.json_dict_to_receipt(result) for result in results] - for receipt in receipts: - self._export_receipt(receipt) + results_raw = rpc_response_batch_to_results(response) + for results in results_raw: + receipts = [self.receipt_mapper.json_dict_to_receipt(result) for result in results] + for receipt in receipts: + self._export_receipt(receipt) def _export_receipt(self, receipt): if self.export_receipts: diff --git a/ethereumetl/json_rpc_requests.py b/ethereumetl/json_rpc_requests.py index 46103da8a..041ab3696 100644 --- a/ethereumetl/json_rpc_requests.py +++ b/ethereumetl/json_rpc_requests.py @@ -29,6 +29,11 @@ def generate_get_block_by_number_json_rpc(block_numbers, include_transactions): request_id=idx ) +def generate_get_bor_author_by_number_json_rpc(block_number): + yield generate_json_rpc( + method='bor_getAuthor', + params=[hex(block_number)], + ) def generate_trace_block_by_number_json_rpc(block_numbers): for block_number in block_numbers: @@ -49,6 +54,15 @@ def generate_get_receipt_json_rpc(transaction_hashes): ) +def generate_get_receipt_by_block_json_rpc(block_number): + for idx, block_number in enumerate(block_number): + yield generate_json_rpc( + method='eth_getBlockReceipts', + params=[hex(int(block_number))], + request_id=idx + ) + + def generate_get_code_json_rpc(contract_addresses, block='latest'): for idx, contract_address in enumerate(contract_addresses): yield generate_json_rpc( diff --git a/ethereumetl/mappers/block_mapper.py b/ethereumetl/mappers/block_mapper.py index d5ae556f5..144136262 100644 --- a/ethereumetl/mappers/block_mapper.py +++ b/ethereumetl/mappers/block_mapper.py @@ -105,3 +105,29 @@ def block_to_dict(self, block): 'withdrawals_root': block.withdrawals_root, 'withdrawals': block.withdrawals, } + + def block_to_dict_with_author(self, block, bor_result): + return { + 'type': 'block', + 'number': block.number, + 'hash': block.hash, + 'parent_hash': block.parent_hash, + 'nonce': block.nonce, + 'sha3_uncles': block.sha3_uncles, + 'logs_bloom': block.logs_bloom, + 'transactions_root': block.transactions_root, + 'state_root': block.state_root, + 'receipts_root': block.receipts_root, + 'miner': bor_result, + 'difficulty': block.difficulty, + 'total_difficulty': block.total_difficulty, + 'size': block.size, + 'extra_data': block.extra_data, + 'gas_limit': block.gas_limit, + 'gas_used': block.gas_used, + 'timestamp': block.timestamp, + 'transaction_count': block.transaction_count, + 'base_fee_per_gas': block.base_fee_per_gas, + 'withdrawals_root': block.withdrawals_root, + 'withdrawals': block.withdrawals + } diff --git a/ethereumetl/mappers/receipt_mapper.py b/ethereumetl/mappers/receipt_mapper.py index 95d1f8c65..4b45c723a 100644 --- a/ethereumetl/mappers/receipt_mapper.py +++ b/ethereumetl/mappers/receipt_mapper.py @@ -34,6 +34,7 @@ def __init__(self, receipt_log_mapper=None): self.receipt_log_mapper = receipt_log_mapper def json_dict_to_receipt(self, json_dict): + receipt = EthReceipt() receipt.transaction_hash = json_dict.get('transactionHash') diff --git a/ethereumetl/streaming/eth_streamer_adapter.py b/ethereumetl/streaming/eth_streamer_adapter.py index 7fcf39377..63236a717 100644 --- a/ethereumetl/streaming/eth_streamer_adapter.py +++ b/ethereumetl/streaming/eth_streamer_adapter.py @@ -24,6 +24,7 @@ def __init__( item_exporter=ConsoleItemExporter(), batch_size=100, max_workers=5, + chain="ethereum", entity_types=tuple(EntityType.ALL_FOR_STREAMING)): self.batch_web3_provider = batch_web3_provider self.item_exporter = item_exporter @@ -32,6 +33,7 @@ def __init__( self.entity_types = entity_types self.item_id_calculator = EthItemIdCalculator() self.item_timestamp_calculator = EthItemTimestampCalculator() + self.chain=chain def open(self): self.item_exporter.open() @@ -44,12 +46,12 @@ def export_all(self, start_block, end_block): # Export blocks and transactions blocks, transactions = [], [] if self._should_export(EntityType.BLOCK) or self._should_export(EntityType.TRANSACTION): - blocks, transactions = self._export_blocks_and_transactions(start_block, end_block) + blocks, transactions = self._export_blocks_and_transactions(start_block, end_block,self.chain) # Export receipts and logs receipts, logs = [], [] if self._should_export(EntityType.RECEIPT) or self._should_export(EntityType.LOG): - receipts, logs = self._export_receipts_and_logs(transactions) + receipts, logs = self._export_receipts_and_logs(blocks) # Extract token transfers token_transfers = [] @@ -102,7 +104,7 @@ def export_all(self, start_block, end_block): self.item_exporter.export_items(all_items) - def _export_blocks_and_transactions(self, start_block, end_block): + def _export_blocks_and_transactions(self, start_block, end_block, chain): blocks_and_transactions_item_exporter = InMemoryItemExporter(item_types=['block', 'transaction']) blocks_and_transactions_job = ExportBlocksJob( start_block=start_block, @@ -112,17 +114,18 @@ def _export_blocks_and_transactions(self, start_block, end_block): max_workers=self.max_workers, item_exporter=blocks_and_transactions_item_exporter, export_blocks=self._should_export(EntityType.BLOCK), - export_transactions=self._should_export(EntityType.TRANSACTION) + export_transactions=self._should_export(EntityType.TRANSACTION), + chain=chain ) blocks_and_transactions_job.run() blocks = blocks_and_transactions_item_exporter.get_items('block') transactions = blocks_and_transactions_item_exporter.get_items('transaction') return blocks, transactions - def _export_receipts_and_logs(self, transactions): + def _export_receipts_and_logs(self, blocks): exporter = InMemoryItemExporter(item_types=['receipt', 'log']) job = ExportReceiptsJob( - transaction_hashes_iterable=(transaction['hash'] for transaction in transactions), + transaction_hashes_iterable=(blocks['number'] for blocks in blocks), batch_size=self.batch_size, batch_web3_provider=self.batch_web3_provider, max_workers=self.max_workers, diff --git a/ethereumetl/utils.py b/ethereumetl/utils.py index 1d2512b64..c1ef7556a 100644 --- a/ethereumetl/utils.py +++ b/ethereumetl/utils.py @@ -23,6 +23,7 @@ import itertools import warnings +import base58 from ethereumetl.misc.retriable_value_error import RetriableValueError @@ -47,6 +48,7 @@ def to_int_or_none(val): except ValueError: return None + def to_float_or_none(val): if isinstance(val, float): return val @@ -58,6 +60,7 @@ def to_float_or_none(val): print("can't cast %s to float" % val) return val + def chunk_string(string, length): return (string[0 + i:length + i] for i in range(0, len(string), length)) @@ -68,6 +71,21 @@ def to_normalized_address(address): return address.lower() +def hex_to_base58(hex_string): + if hex_string is None: + return hex_string + if hex_string[:2] in ["0x", "0X"]: + hex_string = "41" + hex_string[2:] + bytes_str = bytes.fromhex(hex_string) + base58_str = base58.b58encode_check(bytes_str) + return base58_str.decode("UTF-8") + + +def base58_to_hex(self, base58_string): + asc_string = base58.b58decode_check(base58_string) + return asc_string.hex().upper() + + def validate_range(range_start_incl, range_end_incl): if range_start_incl < 0 or range_end_incl < 0: raise ValueError('range_start and range_end must be greater or equal to 0') diff --git a/setup.py b/setup.py index ab6b8843d..619c7157e 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ def read(fname): setup( name='ethereum-etl', - version='2.3.1', + version='2.3.4', author='Evgeny Medvedev', author_email='evge.medvedev@gmail.com', description='Tools for exporting Ethereum blockchain data to CSV or JSON',