RE: [ENG/ITA] Python & Hive: My Scripts are Ready! My First Project is Completed :)



Hmm... interesting. It appears I didn't know how to use the session that I opened in 'main': I was still calling the requests.post() function instead of session.post(), and that's what made my script slower. So my script still fetched data the old way, even though it had opened a session for faster communication. I timed my old script with your enhancements:

➜ time python arc7ic_fetch.py spswhale ALL
...
Fetching pools required 10.768758773803711 seconds

real    0m10.913s
user    0m0.196s
sys 0m0.017s

My current script, which I spruced up with your enhancements:

➜ time python fetch_liquidity_pools-add_session.py spswhale ALL
...
real    0m0.336s
user    0m0.150s
sys 0m0.018s
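
So the whole difference was really one method call. Here's a minimal sketch of the old way versus the new way; the node URL and the one-token query are just examples borrowed from the script, not the full logic:

import requests

url = "https://api2.hive-engine.com/rpc/contracts"  # example node
payload = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "find",
    "params": {"contract": "tokens", "table": "tokens", "query": {"symbol": "BTC"}, "limit": 1},
}

# Old way: every requests.post() opens a brand-new TCP/TLS connection
r = requests.post(url, json=payload, timeout=10)

# New way: a Session keeps the connection alive between calls
with requests.Session() as session:
    r = session.post(url, json=payload, timeout=10)  # first call opens the connection
    r = session.post(url, json=payload, timeout=10)  # later calls reuse it, no new handshake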

Oh, I notice you added a timer inside the main loop, nice touch. I am lazy and didn't bother, since I could 'time' it in the terminal. But yep, there were some repeating requests in the original script that made it way slower.
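
Judging by the "Fetching pools required ... seconds" line in the output above, I'd guess your in-loop timer looks roughly like this (a sketch with time.perf_counter(), not necessarily your exact code):

import time

start = time.perf_counter()
pools = fetch_all_pools(session)  # the call being measured
print(f"Fetching pools required {time.perf_counter() - start} seconds")

Anyway, here's the current one: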

# fetch_liquidity_pools.py
import json
import os
import argparse
import requests

# from time import sleep
import time
from random import choice  # To randomly choose a node from the list

# Hive-Engine API Node
# HIVE_ENGINE_NODE = 'https://api2.hive-engine.com/rpc/contracts'
NODES_FILE = "nodes.json"
retry_delay = 5  # seconds to wait between retries
max_retries = 3  # Maximum number of retries

# Default values
DEFAULT_ACCOUNT_NAME = "hive-engine"  # Replace with your actual Hive account name
DEFAULT_FILTER_TOKEN = (
    "BTC"  # Replace with the desired token to filter, or use 'ALL' to list all tokens
)

# File to store token details with precision
TOKEN_CACHE_FILE = "token_details_cache.json"
cached_token_details = {}
hive_engine_nodes = []


def load_nodes():
    global hive_engine_nodes
    # Check if the nodes file exists
    if os.path.exists(NODES_FILE):
        try:
            with open(NODES_FILE, "r") as f:
                hive_engine_nodes = json.load(f)
                print("Loaded Hive-Engine nodes from file.")
        except (ValueError, IOError):
            print(
                "Error: Hive-Engine nodes file is corrupted or not readable. Please re-create 'nodes.json' with the list of nodes."
            )
    else:
        print(
            "Error: Hive-Engine nodes file not found. Please create 'nodes.json' with the list of nodes."
        )
        hive_engine_nodes = []  # Ensure nodes list is empty on error


def get_node():
    # Choose a random node from the list
    if hive_engine_nodes:
        selected_node = choice(hive_engine_nodes)
        print(f"Using Hive-Engine node: {selected_node}")  # Print the current node
        return selected_node
    else:
        print("Error: No Hive-Engine nodes available.")
        return None


def load_token_cache():
    global cached_token_details
    # Check if the token cache file exists
    if os.path.exists(TOKEN_CACHE_FILE):
        try:
            with open(TOKEN_CACHE_FILE, "r") as f:
                cached_token_details = json.load(f)
                print("Loaded cached token details from file.")
        except (ValueError, IOError):
            print(
                "Error: Failed to load token cache file. Starting with an empty cache."
            )


def save_token_cache():
    # Save the current token details cache to a file
    try:
        with open(TOKEN_CACHE_FILE, "w") as f:
            json.dump(cached_token_details, f)
            print("Saved token details to cache file.")
    except IOError:
        print("Error: Failed to save token cache file.")


def fetch_token_details(symbol, session):
    # Check if token details are already cached
    if symbol in cached_token_details:
        # print(f"Token details for {symbol} found in cache.")
        return cached_token_details[symbol]

    print(f"Fetching token details for {symbol}...")
    # Fetch token details for the given symbol
    for attempt in range(max_retries):
        url = get_node()
        if not url:
            return {}

        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "find",
            "params": {
                "contract": "tokens",
                "table": "tokens",
                "query": {"symbol": symbol},
                "limit": 1,
            },
        }

        try:
            response = session.post(url, json=payload, timeout=10)
        except requests.exceptions.RequestException as e:
            print(f"Request exception occurred for {symbol} from {url}: {e}")
            if attempt < max_retries - 1:
                time.sleep(retry_delay)
            continue

        # print(f"Attempt {attempt+1}: Status Code: {response.status_code}, Response: {response.text}")

        if response.status_code == 200:
            try:
                data = response.json()
            except ValueError:
                print("Error: Failed to parse JSON response.")
                return {}

            if "result" in data and data["result"]:
                cached_token_details[symbol] = data["result"][
                    0
                ]  # Cache the token details
                save_token_cache()  # Save cache after updating
                return data["result"][0]

        print(
            f"Error: Failed to fetch token details for {symbol}. Status Code: {response.status_code}"
        )
        if attempt < max_retries - 1:
            time.sleep(retry_delay)
        else:
            print(f"Max retries exceeded for {symbol}. Skipping.")

    return {}


def fetch_pool_details(token_pair, session):
    # Fetch details of the specified liquidity pool
    for attempt in range(max_retries):
        url = get_node()
        if not url:
            print("Error: No node URL available, exiting fetch_pool_details.")
            return {}

        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "find",
            "params": {
                "contract": "marketpools",
                "table": "pools",
                "query": {"tokenPair": token_pair},
                "limit": 1,
            },
        }

        print(
            f"Attempt {attempt + 1} to fetch pool details for {token_pair} from {url}..."
        )  # Debugging statement

        try:
            response = session.post(
                url, json=payload, timeout=10
            )  # Set a timeout for the request
            # print(
            #     f"Received response status code: {response.status_code} for {token_pair} from {url}"
            # )

            if response.status_code == 200:
                try:
                    data = response.json()
                    print(
                        f"Data received for {token_pair}: {data}"
                    )  # Debugging the received data
                    if "result" in data and data["result"]:
                        print(f"Successfully fetched pool details for {token_pair}")
                        return data["result"][0]
                    else:
                        print(
                            f"Unexpected response format or empty result for {token_pair} from {url}: {data}"
                        )
                except ValueError as e:
                    print(f"Error: Failed to parse JSON response: {e}.")
                    # print(f"Response content: {response.text}") # Print the actual response content
            else:
                print(
                    f"Error: Failed to fetch pool details for {token_pair}. Status Code: {response.status_code}"
                )
        except requests.exceptions.RequestException as e:
            print(f"Request exception occurred for {token_pair} from {url}: {e}")

        # Handle retries
        if attempt < max_retries - 1:
            print(f"Retrying after {retry_delay} seconds...")
            time.sleep(retry_delay)
        else:
            print(f"Max retries exceeded for {token_pair}. Skipping to next.")

    print(f"Returning empty details for {token_pair} after all attempts.")
    return {}


def fetch_all_pools(session):
    url = get_node()
    if not url:
        return []

    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "find",
        "params": {
            "contract": "marketpools",
            "table": "pools",
            "query": {},
            "limit": 1000,  # Adjust limit based on how many pools exist
        },
    }

    try:
        response = session.post(url, json=payload, timeout=10)
        if response.status_code == 200:
            data = response.json()
            return data.get("result", [])
        else:
            print(
                f"Error: Failed to fetch all pools. Status Code: {response.status_code}"
            )
            return []
    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")
        return []


def fetch_liquidity_positions(account_name, session, retries=5, backoff_factor=5):
    # Fetch liquidity positions for the given account
    url = get_node()
    if not url:
        return []

    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "find",
        "params": {
            "contract": "marketpools",
            "table": "liquidityPositions",
            "query": {"account": account_name},
            "limit": 1000,
        },
    }

    for attempt in range(retries):
        try:
            response = session.post(url, json=payload, timeout=10)
            # print("Response Status Code: ", response.status_code)

            # Print the entire raw response text for debugging purposes
            # print("Raw response text: ", response.text)

            if response.status_code == 200:
                try:
                    data = response.json()
                    return data.get("result", [])
                except ValueError:
                    print("Error: Failed to parse JSON response.")
                    return []
            else:
                print(
                    f"Error: Failed to fetch data. Status Code: {response.status_code}"
                )
                return []

        except requests.exceptions.ConnectionError as e:
            print(
                f"Attempt {attempt + 1}: Connection error: {e}, retrying in {backoff_factor} seconds..."
            )
            time.sleep(backoff_factor)
        except requests.exceptions.Timeout as e:
            print(
                f"Attempt {attempt + 1}: Request timed out: {e}, retrying in {backoff_factor} seconds..."
            )
            time.sleep(backoff_factor)
        except requests.exceptions.RequestException as e:
            print(f"Attempt {attempt + 1}: An error occurred: {e}")
            return []

    print(f"Max retries exceeded. Could not fetch data for account: {account_name}")
    return []


def get_filtered_pools(account_name, filter_token, session):
    # Fetch all pools in one go
    all_pools = fetch_all_pools(session)

    # Check if pools were fetched successfully
    if not all_pools:
        print("Error: Failed to fetch all pools.")
        return []

    pool_dict = {pool["tokenPair"]: pool for pool in all_pools}

    # Get and filter pools by the specified token
    positions = fetch_liquidity_positions(account_name, session)

    # Debug: Check fetched positions
    print(f"Fetched {len(positions)} liquidity positions for account {account_name}.")
    # print("Test print of all the fetched positions:")
    # print(json.dumps(positions, indent=4))  # Pretty-print the positions

    if not positions:
        print("No liquidity positions found for this account.")
        return []

    filtered_pools = []

    for position in positions:
        token_pair = position.get("tokenPair", "Unknown")

        # Debug: Print each position being processed
        # print(f"Processing position: {position}")

        # If filter_token is 'ALL', skip filtering; otherwise, check for the token in the pair
        if (
            filter_token.upper() != "ALL"
            and filter_token.upper() not in token_pair.upper()
        ):
            # print(f"Skipping position {token_pair} as it does not match filter token {filter_token.upper()}")
            continue

        # Additional debug to see which positions pass the filter
        # print(
        #    f"Including position {token_pair} with filter token {filter_token.upper()}"
        # )

        # Fetch the pool details from the all_pools dictionary
        pool_details = pool_dict.get(token_pair)
        if not pool_details:
            print(f"Warning: No pool details found for {token_pair}")
            continue

        # Calculate user balances
        shares = float(position.get("shares", "0"))
        base_quantity = float(pool_details.get("baseQuantity", "0"))
        quote_quantity = float(pool_details.get("quoteQuantity", "0"))
        total_shares = float(pool_details.get("totalShares", "0"))

        if total_shares == 0:
            print(f"Skipping position {token_pair} due to total shares being 0.")
            continue

        # Calculate user balances
        user_base_balance = (shares / total_shares) * base_quantity
        user_quote_balance = (shares / total_shares) * quote_quantity

        if ":" in token_pair:
            base_symbol, quote_symbol = token_pair.split(":")
        else:
            base_symbol, quote_symbol = "Unknown", "Unknown"

        # Fetch token details to get precision
        base_token_details = fetch_token_details(base_symbol, session)
        quote_token_details = fetch_token_details(quote_symbol, session)
        base_precision = base_token_details.get("precision", 0)
        quote_precision = quote_token_details.get("precision", 0)

        filtered_pools.append(
            {
                "token_pair": token_pair,
                "base_symbol": base_symbol,
                "quote_symbol": quote_symbol,
                "base_balance": user_base_balance,
                "quote_balance": user_quote_balance,
                "base_precision": base_precision,
                "quote_precision": quote_precision,
            }
        )

    # Debug: Print the number of filtered pools
    print(f"Number of filtered pools: {len(filtered_pools)}")

    return filtered_pools


def main(account_name, filter_token):
    # Load nodes from the external file
    load_nodes()

    # Load cached token details
    load_token_cache()

    # Create a session object
    with requests.Session() as session:
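        # Every request below goes through this one session and reuses its keep-alive connection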
        # Fetch and print filtered pools
        pools = get_filtered_pools(account_name, filter_token, session)
        print(f"\nLiquidity Pool Positions with {filter_token.upper()} token:")
        for pool in pools:
            print(
                f"Token Pair: {pool['token_pair']} | Base Balance: {pool['base_balance']:.{pool['base_precision']}f} {pool['base_symbol']} | "
                f"Quote Balance: {pool['quote_balance']:.{pool['quote_precision']}f} {pool['quote_symbol']}"
            )

        # Debug: If no pools were printed
        if not pools:
            print("No matching liquidity pools found for the given filter.")


if __name__ == "__main__":
    # When run as a standalone script; main() opens and closes its own session,
    # so no extra session object is needed here
    parser = argparse.ArgumentParser(
        description="Fetch Hive-Engine liquidity pools."
    )
    parser.add_argument(
        "account_name",
        nargs="?",
        default=DEFAULT_ACCOUNT_NAME,
        help="Hive account name to fetch liquidity pools for.",
    )
    parser.add_argument(
        "filter_token",
        nargs="?",
        default=DEFAULT_FILTER_TOKEN,
        help="Token to filter by, or 'ALL' to list all tokens.",
    )

    args = parser.parse_args()

    main(args.account_name, args.filter_token)

Edit: I noticed that I still hadn't added 'session' to the arguments of all the functions. So I did, and updated the script here to reflect that. The problem is that now it's slower again. The 'broken' script was a tenth of a second faster, but this one correctly establishes the session. According to ChatGPT, I might as well not use 'session' anymore, since I only make a couple of requests per run. Session only makes things faster if you do multiple calls to the API. But anyway, it was a fun experiment.
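
If I ever want to double-check that claim, a quick benchmark sketch like this should show it (the node URL and repeat count here are just examples, not from the actual script):

import time
import requests

URL = "https://api2.hive-engine.com/rpc/contracts"  # example node
PAYLOAD = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "find",
    "params": {"contract": "marketpools", "table": "pools", "query": {}, "limit": 1},
}
N = 10

# Without a session: N separate connections, one handshake each
start = time.perf_counter()
for _ in range(N):
    requests.post(URL, json=PAYLOAD, timeout=10)
print(f"requests.post x{N}: {time.perf_counter() - start:.3f}s")

# With a session: one connection, reused N times
start = time.perf_counter()
with requests.Session() as s:
    for _ in range(N):
        s.post(URL, json=PAYLOAD, timeout=10)
print(f"session.post x{N}: {time.perf_counter() - start:.3f}s")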

Oh, and I forgot to comment about the main script I'm working on. It's actually a balancer for liquidity pairs. I might write about it as soon as I iron out the wrinkles.




I am lazy and didn't bother, since I could 'time' it in the terminal.

Well, I had no idea you could do that 😅 What you can do through the terminal is still a bit obscure to me, because I'm not used to it, so I have no idea of its flexibility and power!

My current script, that I spruced up with your enhancements:

Wow, this one is crazy fast! Now I'm going to read your new post :)

Session only makes things faster if you do multiple calls to the API

It's the only thing I understand of this "Session" stuff 😂 because I still have to figure out what parameters/values it keeps, and whether it can be used whenever I make more calls to an API or only if the calls are somehow linked or else... I have no idea tbh ahahah


Heh, it wasn't immediately clear to me either, although I had an inkling. Here's how I eventually understood the concept myself: the idea with sessions is that when you use one, you're keeping the connection to the server open, so you don't need to create a new connection for each API call. It's like having an uninterrupted "conversation" with the server, rather than hanging up and redialing for every single message. This can save time and resources, especially when you're making multiple API calls within the same session.
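
And about what it keeps: besides holding the connection open, a Session also remembers a few things between calls, like default headers you set on it and any cookies the server sends back. The calls don't have to be linked in any way, they just have to go through the same session object. A tiny sketch (using httpbin.org purely as a demo endpoint):

import requests

with requests.Session() as s:
    # Headers set here ride along on every request made through this session
    s.headers.update({"User-Agent": "my-hive-scripts/0.1"})

    # Cookies the server sets are stored and sent back automatically
    s.get("https://httpbin.org/cookies/set/flavor/pizza")
    r = s.get("https://httpbin.org/cookies")
    print(r.json())  # -> {'cookies': {'flavor': 'pizza'}}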

!PIZZA
