agus1111 commited on
Commit
2856a4c
Β·
verified Β·
1 Parent(s): 831018b

Update autotrack.py

Browse files
Files changed (1) hide show
  1. autotrack.py +293 -0
autotrack.py CHANGED
@@ -0,0 +1,293 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # autotrack.py
2
+ # Auto-tracking milestone (mis. x2) dari ENTRY (price atau market cap) untuk sebuah CA.
3
+ # Sumber data: Dexscreener (gratis, tanpa API key) + fallback Jupiter (Solana).
4
+ # Dependensi: aiohttp
5
+
6
+ import os
7
+ import time
8
+ import math
9
+ import asyncio
10
+ import aiohttp
11
+ from dataclasses import dataclass, field
12
+ from typing import Optional, Dict, List, Tuple
13
+
14
+ DEXSCREENER_TOKEN_URL = "https://api.dexscreener.com/latest/dex/tokens/"
15
+ JUPITER_URL = "https://price.jup.ag/v6/price?ids=" # gratis untuk Solana
16
+
17
+ def _fmt_money(x: Optional[float]) -> str:
18
+ if x is None:
19
+ return "?"
20
+ if x >= 1:
21
+ return f"${x:,.4f}"
22
+ return f"${x:.8f}"
23
+
24
+ def _fmt_big(x: Optional[float]) -> str:
25
+ if x is None:
26
+ return "?"
27
+ # Format untuk market cap
28
+ if x >= 1_000_000_000:
29
+ return f"${x/1_000_000_000:.2f}B"
30
+ if x >= 1_000_000:
31
+ return f"${x/1_000_000:.2f}M"
32
+ if x >= 1_000:
33
+ return f"${x/1_000:.2f}K"
34
+ return f"${x:.0f}"
35
+
36
+ @dataclass
37
+ class TrackItem:
38
+ ca: str
39
+ basis: str = "mcap" # "mcap" (default) atau "price"
40
+ entry_price: Optional[float] = None
41
+ entry_mcap: Optional[float] = None
42
+ symbol_hint: Optional[str] = None
43
+ source_link: Optional[str] = None
44
+ milestones: List[float] = field(default_factory=lambda: [2.0]) # contoh: [1.5, 2.0, 3.0]
45
+ hit: Dict[float, bool] = field(default_factory=dict)
46
+ poll_secs: int = 20
47
+ stop_when_hit: bool = True
48
+ started_at: float = field(default_factory=time.time)
49
+
50
+ class PriceTracker:
51
+ """
52
+ Cara pakai (contoh):
53
+ tracker = PriceTracker(client, announce_chat="@MidasTouchSignall")
54
+ await tracker.start(
55
+ ca="GXKC9BCWCiYVeEpQ8cFoFHpgAeXTzX3YCFX4BnaZpump",
56
+ basis="mcap", # atau "price"
57
+ milestones=[2.0], # x2
58
+ poll_secs=20,
59
+ source_link=None, # opsional link ke pesan sumber
60
+ symbol_hint="COIN", # opsional
61
+ stop_when_hit=True
62
+ )
63
+ """
64
+ def __init__(self, client, announce_chat):
65
+ self.client = client
66
+ self.announce_chat = announce_chat
67
+ self._tasks: Dict[str, asyncio.Task] = {}
68
+ self._items: Dict[str, TrackItem] = {}
69
+
70
+ # =========================
71
+ # HTTP helpers
72
+ # =========================
73
+ async def _get(self, url: str, headers: Optional[Dict[str, str]] = None, timeout: int = 8):
74
+ tout = aiohttp.ClientTimeout(total=timeout)
75
+ async with aiohttp.ClientSession(timeout=tout, headers=headers) as sess:
76
+ async with sess.get(url) as r:
77
+ if r.status != 200:
78
+ return None
79
+ return await r.json()
80
+
81
+ async def _dexscreener_price(self, ca: str) -> Optional[Tuple[Optional[float], Optional[str], Optional[float]]]:
82
+ """
83
+ Mengembalikan (priceUsd, symbol, marketCapUsd/FDV) dari pair 'terbaik' (likuiditas USD terbesar).
84
+ Bisa saja price atau mcap None bila tidak tersedia.
85
+ """
86
+ data = await self._get(DEXSCREENER_TOKEN_URL + ca)
87
+ if not data:
88
+ return None
89
+ pairs = data.get("pairs") or []
90
+ if not pairs:
91
+ return None
92
+
93
+ # Pilih pair dengan likuiditas USD terbesar (heuristik stabil)
94
+ best = max(pairs, key=lambda p: (p.get("liquidity", {}) or {}).get("usd", 0))
95
+ price = float(best.get("priceUsd")) if best.get("priceUsd") else None
96
+ symbol = ((best.get("baseToken") or {}).get("symbol")) or None
97
+
98
+ mcap = None
99
+ # Dexscreener kadang expose 'fdv' &/atau 'marketCap' pada pair
100
+ fdv = best.get("fdv")
101
+ if isinstance(fdv, (int, float)) and fdv > 0:
102
+ mcap = float(fdv)
103
+ mc = best.get("marketCap")
104
+ if isinstance(mc, (int, float)) and mc > 0:
105
+ mcap = float(mc)
106
+
107
+ return (price, symbol, mcap)
108
+
109
+ async def _jupiter_price(self, ca: str) -> Optional[float]:
110
+ """
111
+ Fallback harga khusus Solana via Jupiter (gratis).
112
+ """
113
+ try:
114
+ data = await self._get(JUPITER_URL + ca)
115
+ if not data:
116
+ return None
117
+ d = (data.get("data") or {}).get(ca)
118
+ if not d:
119
+ return None
120
+ p = d.get("price")
121
+ return float(p) if p is not None else None
122
+ except:
123
+ return None
124
+
125
+ async def _get_snapshot(self, ca: str) -> Optional[Dict[str, Optional[float]]]:
126
+ """
127
+ Ambil snapshot {price, mcap, symbol_hint}.
128
+ Urutan fallback: Dexscreener -> Jupiter (Solana only).
129
+ """
130
+ res = await self._dexscreener_price(ca)
131
+ if res:
132
+ price, sym, mcap = res
133
+ # kalau price kosong, coba fallback Jupiter
134
+ if price is None:
135
+ jp = await self._jupiter_price(ca)
136
+ price = jp if jp is not None else None
137
+ return {"price": price, "symbol_hint": sym, "mcap": mcap}
138
+
139
+ # kalau Dexscreener gagal total β†’ coba Jupiter (harga saja)
140
+ jp = await self._jupiter_price(ca)
141
+ if jp is not None:
142
+ return {"price": jp, "symbol_hint": None, "mcap": None}
143
+
144
+ return None
145
+
146
+ # =========================
147
+ # Helpers UI
148
+ # =========================
149
+ def _name(self, item: TrackItem) -> str:
150
+ s = item.symbol_hint or ""
151
+ return f"{s} ({item.ca[:4]}…{item.ca[-4:]})" if s else item.ca
152
+
153
+ async def _say(self, text: str):
154
+ try:
155
+ await self.client.send_message(self.announce_chat, text, link_preview=False)
156
+ except Exception as e:
157
+ print(f"[TRACK] announce failed: {e}")
158
+
159
+ # =========================
160
+ # Core loop
161
+ # =========================
162
+ async def _loop(self, item: TrackItem):
163
+ # Inisialisasi entry dari snapshot
164
+ snap = await self._get_snapshot(item.ca)
165
+ if not snap:
166
+ await self._say(f"⚠️ Gagal mulai tracking untuk `{item.ca}` (tidak dapat harga/mcap awal).")
167
+ return
168
+
169
+ if item.entry_price is None:
170
+ item.entry_price = snap.get("price")
171
+ if item.entry_mcap is None:
172
+ item.entry_mcap = snap.get("mcap")
173
+ if not item.symbol_hint and snap.get("symbol_hint"):
174
+ item.symbol_hint = snap.get("symbol_hint")
175
+
176
+ # Jika basis=mcap tapi mcap awal tidak ada β†’ fallback ke basis=price
177
+ if item.basis == "mcap" and not item.entry_mcap:
178
+ item.basis = "price"
179
+
180
+ basis_label = "Market Cap" if item.basis == "mcap" else "Price"
181
+ entry_disp = _fmt_big(item.entry_mcap) if item.basis == "mcap" else _fmt_money(item.entry_price)
182
+
183
+ await self._say(
184
+ f"πŸ›°οΈ Tracking: **{self._name(item)}** | Basis: **{basis_label}**\n"
185
+ f"Entry: {entry_disp} β€’ Milestones: {', '.join([f'{m}Γ—' for m in item.milestones])}"
186
+ + (f"\nπŸ”— {item.source_link}" if item.source_link else "")
187
+ + f"\nCA: `{item.ca}`"
188
+ )
189
+
190
+ # Loop polling
191
+ while True:
192
+ snap = await self._get_snapshot(item.ca)
193
+ if not snap:
194
+ await asyncio.sleep(item.poll_secs)
195
+ continue
196
+
197
+ cur_price = snap.get("price")
198
+ cur_mcap = snap.get("mcap")
199
+ if not item.symbol_hint and snap.get("symbol_hint"):
200
+ item.symbol_hint = snap.get("symbol_hint")
201
+
202
+ # hitung rasio sesuai basis
203
+ if item.basis == "mcap":
204
+ if not (item.entry_mcap and cur_mcap):
205
+ await asyncio.sleep(item.poll_secs)
206
+ continue
207
+ ratio = (cur_mcap / item.entry_mcap) if item.entry_mcap > 0 else 0.0
208
+ else:
209
+ if not (item.entry_price and cur_price):
210
+ await asyncio.sleep(item.poll_secs)
211
+ continue
212
+ ratio = (cur_price / item.entry_price) if item.entry_price > 0 else 0.0
213
+
214
+ # cek milestones
215
+ for m in item.milestones:
216
+ if item.hit.get(m):
217
+ continue
218
+ if ratio >= m:
219
+ item.hit[m] = True
220
+ elapsed = int(time.time() - item.started_at)
221
+ mm, ss = divmod(elapsed, 60)
222
+ if item.basis == "mcap":
223
+ msg = (
224
+ "βœ… **AUTO-TRACK**\n"
225
+ f"{self._name(item)} menembus **{m}Γ—** mcap\n"
226
+ f"{_fmt_big(item.entry_mcap)} β†’ {_fmt_big(cur_mcap)} (~{ratio:.2f}Γ—)\n"
227
+ f"⏱️ {mm}m {ss}s sejak call"
228
+ + (f"\nπŸ”— {item.source_link}" if item.source_link else "")
229
+ + f"\nCA: `{item.ca}`"
230
+ )
231
+ else:
232
+ msg = (
233
+ "βœ… **AUTO-TRACK**\n"
234
+ f"{self._name(item)} menembus **{m}Γ—** harga\n"
235
+ f"{_fmt_money(item.entry_price)} β†’ {_fmt_money(cur_price)} (~{ratio:.2f}Γ—)\n"
236
+ f"⏱️ {mm}m {ss}s sejak call"
237
+ + (f"\nπŸ”— {item.source_link}" if item.source_link else "")
238
+ + f"\nCA: `{item.ca}`"
239
+ )
240
+ await self._say(msg)
241
+
242
+ if item.stop_when_hit and m == max(item.milestones):
243
+ await self._say(f"πŸ›‘ Tracking dihentikan untuk {self._name(item)} (milestone akhir tercapai).")
244
+ return
245
+
246
+ await asyncio.sleep(item.poll_secs)
247
+
248
+ # =========================
249
+ # Public API
250
+ # =========================
251
+ def is_tracking(self, ca: str) -> bool:
252
+ t = self._tasks.get(ca)
253
+ return bool(t) and not t.done()
254
+
255
+ async def start(
256
+ self,
257
+ ca: str,
258
+ *,
259
+ basis: str = "mcap",
260
+ entry_price: Optional[float] = None,
261
+ entry_mcap: Optional[float] = None,
262
+ milestones: List[float] = [2.0],
263
+ poll_secs: int = 20,
264
+ source_link: Optional[str] = None,
265
+ symbol_hint: Optional[str] = None,
266
+ stop_when_hit: bool = True,
267
+ ):
268
+ ca = ca.strip()
269
+ if self.is_tracking(ca):
270
+ return
271
+ item = TrackItem(
272
+ ca=ca,
273
+ basis=basis.lower(),
274
+ entry_price=entry_price,
275
+ entry_mcap=entry_mcap,
276
+ milestones=sorted(set(milestones)),
277
+ poll_secs=poll_secs,
278
+ source_link=source_link,
279
+ symbol_hint=symbol_hint,
280
+ stop_when_hit=stop_when_hit,
281
+ )
282
+ self._items[ca] = item
283
+ self._tasks[ca] = asyncio.create_task(self._loop(item))
284
+
285
+ def stop(self, ca: str):
286
+ t = self._tasks.get(ca)
287
+ if t and not t.done():
288
+ t.cancel()
289
+
290
+ def stop_all(self):
291
+ for t in list(self._tasks.values()):
292
+ if t and not t.done():
293
+ t.cancel()