RE: [ENG/ITA] Python & Hive: My Scripts are Ready! My First Project is Completed :)
You are viewing a single comment's thread:
:)))))
I'd be extremely happy if it will help you at least a tiny bit, as it would be my first contributione ever to someone's else script! Just a very, very, very, veeeeeery small one, but that would still be something :)
That is my main script
Is this the script you shared in your latest post? Or an even bigger and more complex one?
Btw, here's your original code with the "Session" included, just to prove that I really experimented with it 🤣 (I don't know how to highlight my edits, but I only opened a session, added it as a parameter to the API calls and "prepared" the calls with the slightly different sintax of a session... but I have no idea if what I just said makes sense for you or for anyone at all!)
# fetch_liquidity_pools.py
import json
import os
import argparse
import requests
from time import sleep, time
from random import choice
# Hive-Engine API Nodes
NODES_FILE = 'nodes.json'
retry_delay = 5 # seconds to wait between retries
max_retries = 3 # Maximum number of retries
# Default values
DEFAULT_ACCOUNT_NAME = 'hive-engine' # Replace with your actual Hive account name
DEFAULT_FILTER_TOKEN = 'BTC' # Replace with the desired default token to filter, or use 'ALL' to list all tokens
# File to store token details with precision
TOKEN_CACHE_FILE = 'token_details_cache.json'
cached_token_details = {}
hive_engine_nodes = []
def load_nodes():
global hive_engine_nodes
# Check if the nodes file exists
if os.path.exists(NODES_FILE):
try:
with open(NODES_FILE, 'r') as f:
hive_engine_nodes = json.load(f)
print("Loaded Hive-Engine nodes from file.")
except (ValueError, IOError):
print("Error: Hive-Engine nodes file is corrupted or not readable. Please re-create 'nodes.json' with the list of nodes.")
else:
print("Error: Hive-Engine nodes file not found. Please create 'nodes.json' with the list of nodes.")
hive_engine_nodes = [] # Ensure nodes list is empty on error
def get_node():
# Choose a random node from the list
if hive_engine_nodes:
selected_node = choice(hive_engine_nodes)
print(f"Using Hive-Engine node: {selected_node}") # Print the current node
return selected_node
else:
print("Error: No Hive-Engine nodes available.")
return None
def load_token_cache():
global cached_token_details
# Check if the token cache file exists
if os.path.exists(TOKEN_CACHE_FILE):
try:
with open(TOKEN_CACHE_FILE, 'r') as f:
cached_token_details = json.load(f)
print("Loaded cached token details from file.")
except (ValueError, IOError):
print("Error: Failed to load token cache file. Starting with an empty cache.")
def save_token_cache():
# Save the current token details cache to a file
try:
with open(TOKEN_CACHE_FILE, 'w') as f:
json.dump(cached_token_details, f)
print("Saved token details to cache file.")
except IOError:
print("Error: Failed to save token cache file.")
def fetch_token_details(symbol, session: requests.Session):
# Check if token details are already cached
if symbol in cached_token_details:
print(f"Token details for {symbol} found in cache.")
return cached_token_details[symbol]
print (f"Fetching token details for {symbol}...")
# Fetch token details for the given symbol
for attempt in range(max_retries):
url = get_node()
if not url:
return {}
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": "find",
"params": {
"contract": "tokens",
"table": "tokens",
"query": {"symbol": symbol},
"limit": 1
}
}
request = requests.Request("POST", url=url, json=payload).prepare()
response = session.send(request, allow_redirects=False)
print(f"Attempt {attempt+1}: Status Code: {response.status_code}, Response: {response.text}")
if response.status_code == 200:
try:
data = response.json()
except ValueError:
print("Error: Failed to parse JSON response.")
return {}
if 'result' in data and data['result']:
cached_token_details[symbol] = data['result'][0] # Cache the token details
save_token_cache() # Save cache after updating
return data['result'][0]
print(f"Error: Failed to fetch token details for {symbol}. Status Code: {response.status_code}")
if attempt < max_retries - 1:
sleep(retry_delay)
else:
print(f"Max retries exceeded for {symbol}. Skipping.")
return {}
def fetch_pool_details(token_pair, session: requests.Session):
# Fetch details of the specified liquidity pool
for attempt in range(max_retries):
url = get_node()
if not url:
print("Error: No node URL available, exiting fetch_pool_details.")
return {}
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": "find",
"params": {
"contract": "marketpools",
"table": "pools",
"query": {"tokenPair": token_pair},
"limit": 1
}
}
# print(f"Attempt {attempt + 1} to fetch pool details for {token_pair} from {url}...") # Debugging statement
try:
request = requests.Request("POST", url=url, json=payload).prepare()
response = session.send(request, timeout=10, allow_redirects=False)
print(f"Received response status code: {response.status_code} for {token_pair} from {url}")
if response.status_code == 200:
try:
data = response.json()
# print(f"Data received for {token_pair}: {data}") # Debugging the received data
if 'result' in data and data['result']:
print(f"Successfully fetched pool details for {token_pair}")
return data['result'][0]
else:
print(f"Unexpected response format or empty result for {token_pair} from {url}: {data}")
except ValueError as e:
print("Error: Failed to parse JSON response.")
print(f"Response content: {response.text}") # Print the actual response content
else:
print(f"Error: Failed to fetch pool details for {token_pair}. Status Code: {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"Request exception occurred for {token_pair} from {url}: {e}")
# Handle retries
if attempt < max_retries - 1:
print(f"Retrying after {retry_delay} seconds...")
sleep(retry_delay)
else:
print(f"Max retries exceeded for {token_pair}. Skipping to next.")
print(f"Returning empty details for {token_pair} after all attempts.")
return {}
def fetch_liquidity_positions(account_name, session: requests.Session):
# Fetch liquidity positions for the given account
url = get_node()
if not url:
return {}
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": "find",
"params": {
"contract": "marketpools",
"table": "liquidityPositions",
"query": {"account": account_name},
"limit": 1000
}
}
request = requests.Request("POST", url=url, json=payload).prepare()
response = session.send(request, allow_redirects=False)
# print("Response Status Code: ", response.status_code)
# print("Response Content: ", response.text) # Debug
if response.status_code != 200:
print(f"Error: Failed to fetch data. Status Code: {response.status_code}")
return []
try:
data = response.json()
except ValueError:
print("Error: Failed to parse JSON response.")
return []
return data.get('result', [])
def get_filtered_pools(account_name, filter_token):
with requests.Session() as session:
# Get and filter pools by the specified token
positions = fetch_liquidity_positions(account_name, session)
# Debug: Check fetched positions
print(f"Fetched {len(positions)} liquidity positions for account {account_name}.")
if not positions:
print("No liquidity positions found for this account.")
return []
filtered_pools = []
for position in positions:
token_pair = position.get('tokenPair', 'Unknown')
# Debug: Print each position being processed
print(f"Processing position: {position}")
# If filter_token is 'ALL', skip filtering; otherwise, check for the token in the pair
if filter_token.upper() != 'ALL' and filter_token.upper() not in token_pair.upper():
print(f"Skipping position {token_pair} as it does not match filter token {filter_token.upper()}")
continue
# Additional debug to see which positions pass the filter
print(f"Including position {token_pair} with filter token {filter_token.upper()}")
# Fetch balances and calculate user share
shares = float(position.get('shares', '0'))
pool_details = fetch_pool_details(token_pair, session)
if not pool_details:
continue
total_shares = float(pool_details.get('totalShares', '0'))
base_quantity = float(pool_details.get('baseQuantity', '0'))
quote_quantity = float(pool_details.get('quoteQuantity', '0'))
if total_shares == 0:
print(f"Skipping position {token_pair} due to total shares being 0.")
continue
# Calculate user balances
user_base_balance = (shares / total_shares) * base_quantity
user_quote_balance = (shares / total_shares) * quote_quantity
if ':' in token_pair:
base_symbol, quote_symbol = token_pair.split(':')
else:
base_symbol, quote_symbol = "Unknown", "Unknown"
# Fetch token details to get precision
base_token_details = fetch_token_details(base_symbol, session)
quote_token_details = fetch_token_details(quote_symbol, session)
base_precision = base_token_details.get('precision', 0)
quote_precision = quote_token_details.get('precision', 0)
filtered_pools.append({
"token_pair": token_pair,
"base_symbol": base_symbol,
"quote_symbol": quote_symbol,
"base_balance": user_base_balance,
"quote_balance": user_quote_balance,
"base_precision": base_precision,
"quote_precision": quote_precision
})
# Debug: Print the number of filtered pools
# print(f"Number of filtered pools: {len(filtered_pools)}")
return filtered_pools
def main(account_name, filter_token):
start = time()
# Load nodes from the external file
load_nodes()
# Load cached token details
load_token_cache()
# Fetch and print filtered pools
pools = get_filtered_pools(account_name, filter_token)
if pools:
print(f"\nLiquidity Pool Positions with {filter_token.upper()} token:")
for pool in pools:
print(f"Token Pair: {pool['token_pair']} | Base Balance: {pool['base_balance']:.{pool['base_precision']}f} {pool['base_symbol']} | "
f"Quote Balance: {pool['quote_balance']:.{pool['quote_precision']}f} {pool['quote_symbol']}")
# Debug: If no pools were printed
# if not pools:
# print("No matching liquidity pools found for the given filter.")
end = time()
elapsed_time = end - start
print(f"Fetching pools required {elapsed_time} seconds")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Fetch Hive-Engine liquidity pools.')
parser.add_argument('account_name', nargs='?', default=DEFAULT_ACCOUNT_NAME, help='Hive account name to fetch liquidity pools for.')
parser.add_argument('filter_token', nargs='?', default=DEFAULT_FILTER_TOKEN, help="Token to filter by, or 'ALL' to list all tokens.")
args = parser.parse_args()
main(args.account_name, args.filter_token)
Hmm... interesting. It appears I didn't know how to use the session that I opened in 'main', and only used the
requests.post()
function instead ofsession.post()
, this is what made my script slower. So my script still used the old way of fetching data, while it had opened a session for faster communication. I timed my old script with your enhancements:My current script, that I spruced up with your enhancements:
Oh, I notice you had added a timer inside the main loop, nice touch. I am lazy and didn't bother, since I could 'time' it in the terminal. But yep, there were some repeating reuquests in the original script that made it way slower. Here's the current one:
Edit: I noticed that I still hadn't added the 'session' in the arguments of the functions. So I did, and updated the script here to reflect that. The problem is, that now it's slower again. The 'broken' script was a tenth of a second faster, but this one is correctly establishing the session. According to ChatGPT, I might as well not use 'session' anymore, since I only make one request. Session only makes things faster if you do multiple calls to the API. But anyway, it was a fun experiment.
Oh, and I forgot to comment about the main script I'm working on. It's actually a balancer for liquidity pairs. I might write about it as soon as I iron out the wrinkles.
Well, I had no idea you could do that 😅 what you can do through terminal is still a bit oscure to me, because I'm not used to it, so I have no idea of its flexibility and power!
Wow, this one is crazy fast! Now I'm going to read your new post :)
It's the only thing I understand of this "Session" stuff 😂 because I still have to figure what parameters/values it keeps and if it can be used whatever I make more calls to an API or only if the calls are somehow linked or else... I have no idea tbh ahahah
Heh, it wasn't immediately clear for me either, although I had an inkling. Here's how I eventually understood the concept myself: The idea with sessions is that when you use one, you're keeping the connection to the server open, so you don't need to create a new connection for each API call. It’s like having an uninterrupted "conversation" with the server, rather than hanging up and redialing for every single message. This can save time and resources, especially when you’re making multiple API calls within the same session.
!PIZZA