Skip to content

Commit 8a4314d

Browse files
committed
Fix addresses parsing
1 parent 2a4bcf1 commit 8a4314d

File tree

6 files changed

+187
-28
lines changed

6 files changed

+187
-28
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
"""Remove tx addresses index
2+
3+
Revision ID: d2035abc33cf
4+
Revises: f14d9f9ad604
5+
Create Date: 2025-08-04 11:13:34.498234
6+
7+
"""
8+
from typing import Sequence, Union
9+
10+
from alembic import op
11+
import sqlalchemy as sa
12+
13+
14+
# revision identifiers, used by Alembic.
15+
revision: str = 'd2035abc33cf'
16+
down_revision: Union[str, None] = 'f14d9f9ad604'
17+
branch_labels: Union[str, Sequence[str], None] = None
18+
depends_on: Union[str, Sequence[str], None] = None
19+
20+
21+
def upgrade() -> None:
22+
# ### commands auto generated by Alembic - please adjust! ###
23+
op.drop_index(op.f('ix_service_transactions_addresses'), table_name='service_transactions')
24+
# ### end Alembic commands ###
25+
26+
27+
def downgrade() -> None:
28+
# ### commands auto generated by Alembic - please adjust! ###
29+
op.create_index(op.f('ix_service_transactions_addresses'), 'service_transactions', ['addresses'], unique=False)
30+
# ### end Alembic commands ###

app/address/router.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
from app.utils import pagination, paginated_response
22
from sqlalchemy.ext.asyncio import AsyncSession
3+
from fastapi import APIRouter, Depends
34
from app.dependencies import get_page
45
from app.database import get_session
5-
from fastapi.params import Depends
66
from app.address import service
7-
from fastapi import APIRouter
87

98
from app.schemas import (
109
TransactionPaginatedResponse,
@@ -16,9 +15,7 @@
1615
router = APIRouter(prefix="/address", tags=["Addresses"])
1716

1817

19-
@router.get(
20-
"/{address}/outputs/{currency}", response_model=OutputPaginatedResponse
21-
)
18+
@router.get("/{address}/outputs/{currency}", response_model=OutputPaginatedResponse)
2219
async def get_unspent_outputs(
2320
address: str,
2421
currency: str,
@@ -36,8 +33,26 @@ async def get_unspent_outputs(
3633

3734

3835
@router.get(
39-
"/{address}/transactions", response_model=TransactionPaginatedResponse
36+
"/{address}/utxo/{currency}",
37+
summary="Get utxo for specified amount",
38+
response_model=OutputPaginatedResponse,
4039
)
40+
async def get_utxo(
41+
address: str,
42+
currency: str,
43+
amount: float,
44+
session: AsyncSession = Depends(get_session),
45+
page: int = Depends(get_page),
46+
):
47+
limit, offset = pagination(page)
48+
49+
total = await service.count_utxo(session, address, currency, amount)
50+
items = await service.list_utxo(session, address, currency, amount, limit, offset)
51+
52+
return paginated_response(items.all(), total, page, limit)
53+
54+
55+
@router.get("/{address}/transactions", response_model=TransactionPaginatedResponse)
4156
async def get_transactions(
4257
address: str,
4358
session: AsyncSession = Depends(get_session),

app/address/service.py

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@ async def count_unspent_outputs(
2323
session: AsyncSession, address: str, currency: str
2424
) -> int:
2525
return await session.scalar(
26-
unspent_outputs_filters(
27-
select(func.count(Output.id)), address, currency
28-
)
26+
unspent_outputs_filters(select(func.count(Output.id)), address, currency)
2927
)
3028

3129

@@ -38,16 +36,55 @@ async def list_unspent_outputs(
3836
) -> ScalarResult[Output]:
3937
return await session.scalars(
4038
unspent_outputs_filters(
41-
select(Output)
42-
.order_by(Output.amount.desc())
43-
.limit(limit)
44-
.offset(offset),
39+
select(Output).order_by(Output.amount.desc()).limit(limit).offset(offset),
4540
address,
4641
currency,
4742
)
4843
)
4944

5045

46+
def utxo_cte(address: str, currency: str):
47+
return (
48+
select(
49+
Output,
50+
func.sum(Output.amount) # type: ignore
51+
.over(order_by=Output.blockhash)
52+
.label("cumulative_amount"),
53+
)
54+
.filter(Output.address == address, Output.currency == currency, ~Output.spent)
55+
.cte("cumulative_outputs")
56+
)
57+
58+
59+
async def count_utxo(
60+
session: AsyncSession, address: str, currency: str, amount: float
61+
) -> int:
62+
cte = utxo_cte(address, currency)
63+
query = (
64+
select(func.count(1)).select_from(cte).where(cte.c.cumulative_amount < amount)
65+
)
66+
return await session.scalar(query) or 0
67+
68+
69+
async def list_utxo(
70+
session: AsyncSession,
71+
address: str,
72+
currency: str,
73+
amount: float,
74+
limit: int,
75+
offset: int,
76+
):
77+
cte = utxo_cte(address, currency)
78+
query = (
79+
select(cte)
80+
.select_from(cte)
81+
.where(cte.c.cumulative_amount < amount)
82+
.limit(limit)
83+
.offset(offset)
84+
)
85+
return await session.execute(query)
86+
87+
5188
def transactions_filters(query: Select, address: str) -> Select:
5289
query = query.filter(Transaction.addresses.contains([address]))
5390

@@ -74,16 +111,12 @@ async def list_transactions(
74111
address,
75112
)
76113
):
77-
transactions.append(
78-
await load_tx_details(session, transaction, latest_block)
79-
)
114+
transactions.append(await load_tx_details(session, transaction, latest_block))
80115

81116
return transactions
82117

83118

84-
async def list_balances(
85-
session: AsyncSession, address: str
86-
) -> list[AddressBalance]:
119+
async def list_balances(session: AsyncSession, address: str) -> list[AddressBalance]:
87120
balances = []
88121
for balance in await session.scalars(
89122
select(AddressBalance).filter(
@@ -97,18 +130,14 @@ async def list_balances(
97130
return balances
98131

99132

100-
async def list_address_mempool_transactions(
101-
session: AsyncSession, address: str
102-
):
133+
async def list_address_mempool_transactions(session: AsyncSession, address: str):
103134
mempool = await session.scalar(select(MemPool).limit(1))
104135

105136
if mempool is None:
106137
return []
107138

108139
return [
109-
await load_mempool_tx_details(
110-
session, transaction, mempool.raw["outputs"]
111-
)
140+
await load_mempool_tx_details(session, transaction, mempool.raw["outputs"])
112141
for transaction in mempool.raw["transactions"]
113142
if address in transaction["addresses"]
114143
]

app/models/transaction.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class Transaction(Base):
1313
currencies: Mapped[list[str]] = mapped_column(ARRAY(String(64)), index=True)
1414
txid: Mapped[str] = mapped_column(String(64), index=True, unique=True)
1515
blockhash: Mapped[str] = mapped_column(String(64), index=True)
16-
addresses: Mapped[list[str]] = mapped_column(ARRAY(String), index=True)
16+
addresses: Mapped[list[str]] = mapped_column(ARRAY(String))
1717
created: Mapped[datetime]
1818
timestamp: Mapped[int]
1919
size: Mapped[int]

app/parser.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,9 +164,9 @@ async def parse_transactions(txids: list[str]):
164164

165165
addresses = list(
166166
set(
167-
address
167+
vout["scriptPubKey"]["address"]
168168
for vout in transaction_data["vout"]
169-
for address in vout["scriptPubKey"].get("addresses", [])
169+
if vout["scriptPubKey"]["type"] != "nulldata"
170170
)
171171
)
172172
timestamp = transaction_data.get("time", None)

scripts/add_addresses_to_txs.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import asyncio
2+
import typing
3+
4+
from sqlalchemy import func, select, update
5+
6+
from app.database import sessionmanager
7+
from app.settings import get_settings
8+
from app.parser import make_request
9+
from app.models import Transaction
10+
11+
settings: typing.Any = get_settings()
12+
13+
14+
async def main():
15+
sessionmanager.init(settings.database.endpoint)
16+
17+
async with sessionmanager.session() as session:
18+
19+
total = (
20+
await session.scalar(
21+
select(func.count(Transaction.id)).filter(Transaction.addresses == ())
22+
)
23+
or 0
24+
)
25+
limit = 100
26+
27+
offset = 0
28+
query = (
29+
select(Transaction.id, Transaction.txid)
30+
.limit(limit)
31+
.filter(Transaction.addresses == ())
32+
.limit(limit)
33+
)
34+
35+
total /= limit
36+
37+
while txs := (await session.execute(query.offset(offset))).all():
38+
offset += limit
39+
40+
transactions_result = await make_request(
41+
settings.blockchain.endpoint,
42+
[
43+
{
44+
"id": str(dbid),
45+
"method": "getrawtransaction",
46+
"params": [txid, True],
47+
}
48+
for dbid, txid in txs
49+
],
50+
)
51+
52+
addresses: dict[str, list[str]] = {
53+
tx["id"]: (
54+
list(
55+
set(
56+
vout["scriptPubKey"]["address"]
57+
for vout in tx["result"]["vout"]
58+
if vout["scriptPubKey"]["type"] != "nulldata"
59+
)
60+
)
61+
)
62+
for tx in transactions_result
63+
}
64+
65+
await session.execute(
66+
update(Transaction),
67+
[
68+
{"id": dbid, "addresses": addresses}
69+
for dbid, addresses in addresses.items()
70+
],
71+
execution_options={"synchronize_session": False},
72+
)
73+
await session.commit()
74+
print(
75+
f"Progress: {offset // limit}/{total} ({(offset // limit/total)*100:.2f})",
76+
end="\r",
77+
flush=True,
78+
)
79+
break
80+
81+
print("\nDone")
82+
83+
84+
if __name__ == "__main__":
85+
asyncio.run(main())

0 commit comments

Comments
 (0)