RE: [ENG/ITA] Python & Hive: My Scripts are Ready! My First Project is Completed :)

You are viewing a single comment's thread:

The idea is that it should read blocks - starting from the last irreversible block if launched for the first time, or from the last checked/saved block if it has already been used in the past - and look for posts that meet specific requirements (language, tags, length).
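Just to give an idea of the concept, here's a minimal sketch of that loop. It's not my actual code: it assumes a public condenser_api node and a made-up last_block.json state file.

# block_scanner_sketch.py - illustrative only
import json
import os
import requests

NODE = "https://api.hive.blog"  # assumed public Hive API node
STATE_FILE = "last_block.json"  # hypothetical state file


def rpc(method, params):
    # Minimal JSON-RPC helper for the condenser API
    payload = {"jsonrpc": "2.0", "id": 1, "method": method, "params": params}
    return requests.post(NODE, json=payload, timeout=10).json()["result"]


def get_start_block():
    # Resume from the last checked block, or fall back to the last irreversible one
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE) as f:
            return json.load(f)["last_block"] + 1
    props = rpc("condenser_api.get_dynamic_global_properties", [])
    return props["last_irreversible_block_num"]


def save_state(block_num):
    # Remember the last checked block for the next run
    with open(STATE_FILE, "w") as f:
        json.dump({"last_block": block_num}, f)


def scan_block(block_num, required_tag="ita", min_length=500):
    # Look for root posts that meet the requirements (tags, length)
    block = rpc("condenser_api.get_block", [block_num])
    for tx in block.get("transactions", []):
        for op_name, op in tx["operations"]:
            if op_name == "comment" and op["parent_author"] == "":
                try:
                    meta = json.loads(op.get("json_metadata") or "{}")
                except ValueError:
                    continue  # malformed metadata, skip the post
                if required_tag in meta.get("tags", []) and len(op["body"]) >= min_length:
                    print(f"Match: @{op['author']}/{op['permlink']}")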

If you look through the comments for those of @gamer00, you'll see that he suggested a very cool change that improved the bot's workflow, and he also pointed out that my scripts are very poor in terms of error checking 😅 but I guess they are very poor in general 🤣

Anyways, cool to see how you are building things.

I know that they suck, but that was exactly my goal: to show everyone that I'm trying to do something with what I learn and to get suggestions from more experienced coders :)

I haven't really understood the bot, but infinite loops are ass.

I thought that it was an easy way to create something that could work "forever", but I'm not surprised it could have been done better 🤣




I know that they suck, but that was exactly my goal: to show everyone that I'm trying to do something with what I learn and to get suggestions from more experienced coders

I never said they suck.
I just couldn't read it in one pass.
If it works, it's ok.
The infinite loop is an eyesore, though :D

error checking

Error checking is lame. Don't worry about it.
I'll blog about that soon, too.
You know this meme?:

[https://www.reddit.com/r/ProgrammerHumor/comments/16nhu63/printbellcurve/]


They seem to work, but they crash sometimes (at the very least I have to catch some errors related to JSON files, as they seem to create trouble from time to time... or should I say from block to block? 😂), and I didn't test them extensively enough to be sure that there aren't some bugs I'm not aware of.

The infinite loop is an eyesore, though :D

Stupid infinite loops 🤣 and I thought that they were cool ahahahah

You know this meme?

Nope, but I get it ahahah


Infinite loops are not always stupid. You are also right about catching errors that may have nothing to do with your code. And there can always be some bug you couldn't have seen unless you had added those print statements. Speaking of catching bugs, I must soon release a new version of my 'fetch_liquidity_pools.py' script, because I happened to catch an enormous bug, and fixing it also made the script blazingly fast.


Super cool! I'm very curious to see your updated version and learn new stuff from your work!

On a side note, I also experimented a bit with your script and tried to introduce a "Session" (not sure how I can describe it 😅 it's part of the "requests" library) to see if it could make the script even faster... and it worked! Not sure if you may be interested in seeing it :) it's a very small change and it may help you make it (ultra)blazingly fast!


"Session"

Oh cool, a new toy!

I bet it made the original script faster, but now that I've tried it with the improved one, it surprisingly made it slower. I guess it has something to do with the fact that when I originally caught the script fetching pool details for each token pair separately, I decided that, since it wasn't possible to retrieve pool details for certain accounts only, I would set it to retrieve details for every pool in one go. This made the script faster, but now that I've introduced requests.Session to my script, it added just a tad of overhead, making it somewhat slower than before. Weird.

But I already know what I will use requests.Session for: my main script, which controls, among others, the fetch_liquidity_pools.py script. Thank you for possibly solving a serious problem for me, as the main script was giving me some serious gray hairs!

I'll be checking this out more thoroughly for sure! :)

!PIZZA !BEER !WINE


:)))))

I'd be extremely happy if it helps you even a tiny bit, as it would be my first contribution ever to someone else's script! Just a very, very, very, veeeeeery small one, but that would still be something :)

That is my main script

Is this the script you shared in your latest post? Or an even bigger and more complex one?

Btw, here's your original code with the "Session" included, just to prove that I really experimented with it 🤣 (I don't know how to highlight my edits, but I only opened a session, added it as a parameter to the API calls and "prepared" the calls with the slightly different syntax of a session... but I have no idea if what I just said makes sense to you or to anyone at all!)

# fetch_liquidity_pools.py
import json
import os
import argparse
import requests
from time import sleep, time
from random import choice

# Hive-Engine API Nodes
NODES_FILE = 'nodes.json'
retry_delay = 5  # seconds to wait between retries
max_retries = 3  # Maximum number of retries

# Default values
DEFAULT_ACCOUNT_NAME = 'hive-engine'  # Replace with your actual Hive account name
DEFAULT_FILTER_TOKEN = 'BTC'  # Replace with the desired default token to filter, or use 'ALL' to list all tokens

# File to store token details with precision
TOKEN_CACHE_FILE = 'token_details_cache.json'
cached_token_details = {}
hive_engine_nodes = []

def load_nodes():
    global hive_engine_nodes
    # Check if the nodes file exists
    if os.path.exists(NODES_FILE):
        try:
            with open(NODES_FILE, 'r') as f:
                hive_engine_nodes = json.load(f)
                print("Loaded Hive-Engine nodes from file.")
        except (ValueError, IOError):
            print("Error: Hive-Engine nodes file is corrupted or not readable. Please re-create 'nodes.json' with the list of nodes.")
    else:
        print("Error: Hive-Engine nodes file not found. Please create 'nodes.json' with the list of nodes.")
        hive_engine_nodes = [] # Ensure nodes list is empty on error


def get_node():
    # Choose a random node from the list
    if hive_engine_nodes:
        selected_node = choice(hive_engine_nodes)
        print(f"Using Hive-Engine node: {selected_node}") # Print the current node
        return selected_node
    else:
        print("Error: No Hive-Engine nodes available.")
        return None


def load_token_cache():
    global cached_token_details
    # Check if the token cache file exists
    if os.path.exists(TOKEN_CACHE_FILE):
        try:
            with open(TOKEN_CACHE_FILE, 'r') as f:
                cached_token_details = json.load(f)
                print("Loaded cached token details from file.")
        except (ValueError, IOError):
            print("Error: Failed to load token cache file. Starting with an empty cache.")


def save_token_cache():
    # Save the current token details cache to a file
    try:
        with open(TOKEN_CACHE_FILE, 'w') as f:
            json.dump(cached_token_details, f)
            print("Saved token details to cache file.")
    except IOError:
        print("Error: Failed to save token cache file.")


def fetch_token_details(symbol, session: requests.Session):
    # Check if token details are already cached
    if symbol in cached_token_details:
        print(f"Token details for {symbol} found in cache.")
        return cached_token_details[symbol]

    print (f"Fetching token details for {symbol}...")
    # Fetch token details for the given symbol
    for attempt in range(max_retries):
        url = get_node()
        if not url:
            return {}

        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "find",
            "params": {
                "contract": "tokens",
                "table": "tokens",
                "query": {"symbol": symbol},
                "limit": 1
            }
        }

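        # Build the request by hand and send it through the session,
        # so the session's open connection is reused on each attempt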
        request = requests.Request("POST", url=url, json=payload).prepare()
        response = session.send(request, allow_redirects=False)

        print(f"Attempt {attempt+1}: Status Code: {response.status_code}, Response: {response.text}")

        if response.status_code == 200:
            try:
                data = response.json()
            except ValueError:
                print("Error: Failed to parse JSON response.")
                return {}

            if 'result' in data and data['result']:
                cached_token_details[symbol] = data['result'][0]  # Cache the token details
                save_token_cache()  # Save cache after updating
                return data['result'][0]

        print(f"Error: Failed to fetch token details for {symbol}. Status Code: {response.status_code}")
        if attempt < max_retries - 1:
            sleep(retry_delay)
        else:
            print(f"Max retries exceeded for {symbol}. Skipping.")

    return {}


def fetch_pool_details(token_pair, session: requests.Session):
    # Fetch details of the specified liquidity pool
    for attempt in range(max_retries):
        url = get_node()
        if not url:
            print("Error: No node URL available, exiting fetch_pool_details.")
            return {}

        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "find",
            "params": {
                "contract": "marketpools",
                "table": "pools",
                "query": {"tokenPair": token_pair},
                "limit": 1
            }
        }

        # print(f"Attempt {attempt + 1} to fetch pool details for {token_pair} from {url}...") # Debugging statement

        try:
            request = requests.Request("POST", url=url, json=payload).prepare()
            response = session.send(request, timeout=10, allow_redirects=False)
            print(f"Received response status code: {response.status_code} for {token_pair} from {url}")

            if response.status_code == 200:
                try:
                    data = response.json()
                    # print(f"Data received for {token_pair}: {data}") # Debugging the received data
                    if 'result' in data and data['result']:
                        print(f"Successfully fetched pool details for {token_pair}")
                        return data['result'][0]
                    else:
                        print(f"Unexpected response format or empty result for {token_pair} from {url}: {data}")
                except ValueError as e:
                    print("Error: Failed to parse JSON response.")
                    print(f"Response content: {response.text}") # Print the actual response content
            else:
                print(f"Error: Failed to fetch pool details for {token_pair}. Status Code: {response.status_code}")
        except requests.exceptions.RequestException as e:
            print(f"Request exception occurred for {token_pair} from {url}: {e}")

        # Handle retries
        if attempt < max_retries - 1:
            print(f"Retrying after {retry_delay} seconds...")
            sleep(retry_delay)
        else:
            print(f"Max retries exceeded for {token_pair}. Skipping to next.")

    print(f"Returning empty details for {token_pair} after all attempts.")
    return {}


def fetch_liquidity_positions(account_name, session: requests.Session):
    # Fetch liquidity positions for the given account
    url = get_node()
    if not url:
        return {}

    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "find",
        "params": {
            "contract": "marketpools",
            "table": "liquidityPositions",
            "query": {"account": account_name},
            "limit": 1000
        }
    }

    request = requests.Request("POST", url=url, json=payload).prepare()
    response = session.send(request, allow_redirects=False)

    # print("Response Status Code: ", response.status_code)
    # print("Response Content: ", response.text) # Debug

    if response.status_code != 200:
        print(f"Error: Failed to fetch data. Status Code: {response.status_code}")
        return []

    try:
        data = response.json()
    except ValueError:
        print("Error: Failed to parse JSON response.")
        return []

    return data.get('result', [])


def get_filtered_pools(account_name, filter_token):
    with requests.Session() as session:
        # Get and filter pools by the specified token
        positions = fetch_liquidity_positions(account_name, session)

        # Debug: Check fetched positions
        print(f"Fetched {len(positions)} liquidity positions for account {account_name}.")

        if not positions:
            print("No liquidity positions found for this account.")
            return []

        filtered_pools = []

        for position in positions:
            token_pair = position.get('tokenPair', 'Unknown')

            # Debug: Print each position being processed
            print(f"Processing position: {position}")

            # If filter_token is 'ALL', skip filtering; otherwise, check for the token in the pair
            if filter_token.upper() != 'ALL' and filter_token.upper() not in token_pair.upper():
                print(f"Skipping position {token_pair} as it does not match filter token {filter_token.upper()}")
                continue

            # Additional debug to see which positions pass the filter
            print(f"Including position {token_pair} with filter token {filter_token.upper()}")

            # Fetch balances and calculate user share
            shares = float(position.get('shares', '0'))
            pool_details = fetch_pool_details(token_pair, session)
            if not pool_details:
                continue

            total_shares = float(pool_details.get('totalShares', '0'))
            base_quantity = float(pool_details.get('baseQuantity', '0'))
            quote_quantity = float(pool_details.get('quoteQuantity', '0'))

            if total_shares == 0:
                print(f"Skipping position {token_pair} due to total shares being 0.")
                continue

            # Calculate user balances
            user_base_balance = (shares / total_shares) * base_quantity
            user_quote_balance = (shares / total_shares) * quote_quantity

            if ':' in token_pair:
                base_symbol, quote_symbol = token_pair.split(':')
            else:
                base_symbol, quote_symbol = "Unknown", "Unknown"

            # Fetch token details to get precision
            base_token_details = fetch_token_details(base_symbol, session)
            quote_token_details = fetch_token_details(quote_symbol, session)
            base_precision = base_token_details.get('precision', 0)
            quote_precision = quote_token_details.get('precision', 0)

            filtered_pools.append({
                "token_pair": token_pair,
                "base_symbol": base_symbol,
                "quote_symbol": quote_symbol,
                "base_balance": user_base_balance,
                "quote_balance": user_quote_balance,
                "base_precision": base_precision,
                "quote_precision": quote_precision
            })

        # Debug: Print the number of filtered pools
        # print(f"Number of filtered pools: {len(filtered_pools)}")

        return filtered_pools


def main(account_name, filter_token):
    start = time()
    # Load nodes from the external file
    load_nodes()

    # Load cached token details
    load_token_cache()

    # Fetch and print filtered pools
    pools = get_filtered_pools(account_name, filter_token)
    if pools:
        print(f"\nLiquidity Pool Positions with {filter_token.upper()} token:")
    for pool in pools:
        print(f"Token Pair: {pool['token_pair']} | Base Balance: {pool['base_balance']:.{pool['base_precision']}f} {pool['base_symbol']} | "
              f"Quote Balance: {pool['quote_balance']:.{pool['quote_precision']}f} {pool['quote_symbol']}")

    # Debug: If no pools were printed
    # if not pools:
    #    print("No matching liquidity pools found for the given filter.")
    end = time()
    elapsed_time = end - start
    print(f"Fetching pools required {elapsed_time} seconds")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Fetch Hive-Engine liquidity pools.')
    parser.add_argument('account_name', nargs='?', default=DEFAULT_ACCOUNT_NAME, help='Hive account name to fetch liquidity pools for.')
    parser.add_argument('filter_token', nargs='?', default=DEFAULT_FILTER_TOKEN, help="Token to filter by, or 'ALL' to list all tokens.")

    args = parser.parse_args()

    main(args.account_name, args.filter_token)


Hmm... interesting. It appears I didn't know how to use the session that I opened in 'main': I only used the requests.post() function instead of session.post(), and this is what made my script slower. So my script still used the old way of fetching data, even though it had opened a session for faster communication. I timed my old script with your enhancements:

➜ time python arc7ic_fetch.py spswhale ALL
...
Fetching pools required 10.768758773803711 seconds

real    0m10.913s
user    0m0.196s
sys 0m0.017s

My current script, which I spruced up with your enhancements:

➜ time python fetch_liquidity_pools-add_session.py spswhale ALL
...
real    0m0.336s
user    0m0.150s
sys 0m0.018s

Oh, I notice you had added a timer inside main, nice touch. I am lazy and didn't bother, since I could 'time' it in the terminal. But yep, there were some repeating requests in the original script that made it way slower. Here's the current one:

# fetch_liquidity_pools.py
import json
import os
import argparse
import requests

# from time import sleep
import time
from random import choice  # To randomly choose a node from the list

# Hive-Engine API Node
# HIVE_ENGINE_NODE = 'https://api2.hive-engine.com/rpc/contracts'
NODES_FILE = "nodes.json"
retry_delay = 5  # seconds to wait between retries
max_retries = 3  # Maximum number of retries

# Default values
DEFAULT_ACCOUNT_NAME = "hive-engine"  # Replace with your actual Hive account name
DEFAULT_FILTER_TOKEN = (
    "BTC"  # Replace with the desired token to filter, or use 'ALL' to list all tokens
)

# File to store token details with precision
TOKEN_CACHE_FILE = "token_details_cache.json"
cached_token_details = {}
hive_engine_nodes = []


def load_nodes():
    global hive_engine_nodes
    # Check if the nodes file exists
    if os.path.exists(NODES_FILE):
        try:
            with open(NODES_FILE, "r") as f:
                hive_engine_nodes = json.load(f)
                print("Loaded Hive-Engine nodes from file.")
        except (ValueError, IOError):
            print(
                "Error: Hive-Engine nodes file is corrupted or not readable. Please re-create 'nodes.json' with the list of nodes."
            )
    else:
        print(
            "Error: Hive-Engine nodes file not found. Please create 'nodes.json' with the list of nodes."
        )
        hive_engine_nodes = []  # Ensure nodes list is empty on error


def get_node():
    # Choose a random node from the list
    if hive_engine_nodes:
        selected_node = choice(hive_engine_nodes)
        print(f"Using Hive-Engine node: {selected_node}")  # Print the current node
        return selected_node
    else:
        print("Error: No Hive-Engine nodes available.")
        return None


def load_token_cache():
    global cached_token_details
    # Check if the token cache file exists
    if os.path.exists(TOKEN_CACHE_FILE):
        try:
            with open(TOKEN_CACHE_FILE, "r") as f:
                cached_token_details = json.load(f)
                print("Loaded cached token details from file.")
        except (ValueError, IOError):
            print(
                "Error: Failed to load token cache file. Starting with an empty cache."
            )


def save_token_cache():
    # Save the current token details cache to a file
    try:
        with open(TOKEN_CACHE_FILE, "w") as f:
            json.dump(cached_token_details, f)
            print("Saved token details to cache file.")
    except IOError:
        print("Error: Failed to save token cache file.")


def fetch_token_details(symbol, session):
    # Check if token details are already cached
    if symbol in cached_token_details:
        # print(f"Token details for {symbol} found in cache.")
        return cached_token_details[symbol]

    print(f"Fetching token details for {symbol}...")
    # Fetch token details for the given symbol
    for attempt in range(max_retries):
        url = get_node()
        if not url:
            return {}

        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "find",
            "params": {
                "contract": "tokens",
                "table": "tokens",
                "query": {"symbol": symbol},
                "limit": 1,
            },
        }

        response = session.post(url, json=payload)

        # print(f"Attempt {attempt+1}: Status Code: {response.status_code}, Response: {response.text}")

        if response.status_code == 200:
            try:
                data = response.json()
            except ValueError:
                print("Error: Failed to parse JSON response.")
                return {}

            if "result" in data and data["result"]:
                cached_token_details[symbol] = data["result"][
                    0
                ]  # Cache the token details
                save_token_cache()  # Save cache after updating
                return data["result"][0]

        print(
            f"Error: Failed to fetch token details for {symbol}. Status Code: {response.status_code}"
        )
        if attempt < max_retries - 1:
            time.sleep(retry_delay)
        else:
            print(f"Max retries exceeded for {symbol}. Skipping.")

    return {}


def fetch_pool_details(token_pair, session):
    # Fetch details of the specified liquidity pool
    for attempt in range(max_retries):
        url = get_node()
        if not url:
            print("Error: No node URL available, exiting fetch_pool_details.")
            return {}

        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "find",
            "params": {
                "contract": "marketpools",
                "table": "pools",
                "query": {"tokenPair": token_pair},
                "limit": 1,
            },
        }

        print(
            f"Attempt {attempt + 1} to fetch pool details for {token_pair} from {url}..."
        )  # Debugging statement

        try:
            response = session.post(
                url, json=payload, timeout=10
            )  # Set a timeout for the request
            # print(
            #     f"Received response status code: {response.status_code} for {token_pair} from {url}"
            # )

            if response.status_code == 200:
                try:
                    data = response.json()
                    print(
                        f"Data received for {token_pair}: {data}"
                    )  # Debugging the received data
                    if "result" in data and data["result"]:
                        print(f"Successfully fetched pool details for {token_pair}")
                        return data["result"][0]
                    else:
                        print(
                            f"Unexpected response format or empty result for {token_pair} from {url}: {data}"
                        )
                except ValueError as e:
                    print(f"Error: Failed to parse JSON response: {e}.")
                    # print(f"Response content: {response.text}") # Print the actual response content
            else:
                print(
                    f"Error: Failed to fetch pool details for {token_pair}. Status Code: {response.status_code}"
                )
        except requests.exceptions.RequestException as e:
            print(f"Request exception occurred for {token_pair} from {url}: {e}")

        # Handle retries
        if attempt < max_retries - 1:
            print(f"Retrying after {retry_delay} seconds...")
            time.sleep(retry_delay)
        else:
            print(f"Max retries exceeded for {token_pair}. Skipping to next.")

    print(f"Returning empty details for {token_pair} after all attempts.")
    return {}


def fetch_all_pools(session):
    url = get_node()
    if not url:
        return []

    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "find",
        "params": {
            "contract": "marketpools",
            "table": "pools",
            "query": {},
            "limit": 1000,  # Adjust limit based on how many pools exist
        },
    }

    try:
        response = session.post(url, json=payload, timeout=10)
        if response.status_code == 200:
            data = response.json()
            return data.get("result", [])
        else:
            print(
                f"Error: Failed to fetch all pools. Status Code: {response.status_code}"
            )
            return []
    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")
        return []


def fetch_liquidity_positions(account_name, session, retries=5, backoff_factor=5):
    # Fetch liquidity positions for the given account
    url = get_node()
    if not url:
        return {}

    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "find",
        "params": {
            "contract": "marketpools",
            "table": "liquidityPositions",
            "query": {"account": account_name},
            "limit": 1000,
        },
    }

    for attempt in range(retries):
        try:
            response = session.post(url, json=payload, timeout=10)
            # print("Response Status Code: ", response.status_code)

            # Print the entire raw response text for debugging purposes
            # print("Raw response text: ", response.text)

            if response.status_code == 200:
                try:
                    data = response.json()
                    return data.get("result", [])
                except ValueError:
                    print("Error: Failed to parse JSON response.")
                    return []
            else:
                print(
                    f"Error: Failed to fetch data. Status Code: {response.status_code}"
                )
                return []

        except requests.exceptions.ConnectionError as e:
            print(
                f"Attempt {attempt + 1}: Connection error: {e}, retrying in {backoff_factor} seconds..."
            )
            time.sleep(backoff_factor)
        except requests.exceptions.Timeout as e:
            print(
                f"Attempt {attempt + 1}: Request timed out: {e}, retrying in {backoff_factor} seconds..."
            )
            time.sleep(backoff_factor)
        except requests.exceptions.RequestException as e:
            print(f"Attempt {attempt + 1}: An error occurred: {e}")
            return []

    print(f"Max retries exceeded. Could not fetch data for account: {account_name}")
    return []


def get_filtered_pools(account_name, filter_token, session):
    # Fetch all pools in one go
    all_pools = fetch_all_pools(session)

    # Check if pools were fetched successfully
    if not all_pools:
        print("Error: Failed to fetch all pools.")
        return []

    pool_dict = {pool["tokenPair"]: pool for pool in all_pools}

    # Get and filter pools by the specified token
    positions = fetch_liquidity_positions(account_name, session)

    # Debug: Check fetched positions
    print(f"Fetched {len(positions)} liquidity positions for account {account_name}.")
    # print("Test print of all the fetched positions:")
    # print(json.dumps(positions, indent=4))  # Pretty-print the positions

    if not positions:
        print("No liquidity positions found for this account.")
        return []

    filtered_pools = []

    for position in positions:
        token_pair = position.get("tokenPair", "Unknown")

        # Debug: Print each position being processed
        # print(f"Processing position: {position}")

        # If filter_token is 'ALL', skip filtering; otherwise, check for the token in the pair
        if (
            filter_token.upper() != "ALL"
            and filter_token.upper() not in token_pair.upper()
        ):
            # print(f"Skipping position {token_pair} as it does not match filter token {filter_token.upper()}")
            continue

        # Additional debug to see which positions pass the filter
        # print(
        #    f"Including position {token_pair} with filter token {filter_token.upper()}"
        # )

        # Fetch the pool details from the all_pools dictionary
        pool_details = pool_dict.get(token_pair)
        if not pool_details:
            print(f"Warning: No pool details found for {token_pair}")
            continue

        # Calculate user balances
        shares = float(position.get("shares", "0"))
        base_quantity = float(pool_details.get("baseQuantity", "0"))
        quote_quantity = float(pool_details.get("quoteQuantity", "0"))
        total_shares = float(pool_details.get("totalShares", "0"))

        if total_shares == 0:
            print(f"Skipping position {token_pair} due to total shares being 0.")
            continue

        # Calculate user balances
        user_base_balance = (shares / total_shares) * base_quantity
        user_quote_balance = (shares / total_shares) * quote_quantity

        if ":" in token_pair:
            base_symbol, quote_symbol = token_pair.split(":")
        else:
            base_symbol, quote_symbol = "Unknown", "Unknown"

        # Fetch token details to get precision
        base_token_details = fetch_token_details(base_symbol, session)
        quote_token_details = fetch_token_details(quote_symbol, session)
        base_precision = base_token_details.get("precision", 0)
        quote_precision = quote_token_details.get("precision", 0)

        filtered_pools.append(
            {
                "token_pair": token_pair,
                "base_symbol": base_symbol,
                "quote_symbol": quote_symbol,
                "base_balance": user_base_balance,
                "quote_balance": user_quote_balance,
                "base_precision": base_precision,
                "quote_precision": quote_precision,
            }
        )

    # Debug: Print the number of filtered pools
    print(f"Number of filtered pools: {len(filtered_pools)}")

    return filtered_pools


def main(account_name, filter_token):
    # Load nodes from the external file
    load_nodes()

    # Load cached token details
    load_token_cache()

    # Create a session object
    with requests.Session() as session:
        # Fetch and print filtered pools
        pools = get_filtered_pools(account_name, filter_token, session)
        print(f"\nLiquidity Pool Positions with {filter_token.upper()} token:")
        for pool in pools:
            print(
                f"Token Pair: {pool['token_pair']} | Base Balance: {pool['base_balance']:.{pool['base_precision']}f} {pool['base_symbol']} | "
                f"Quote Balance: {pool['quote_balance']:.{pool['quote_precision']}f} {pool['quote_symbol']}"
            )

        # Debug: If no pools were printed
        if not pools:
            print("No matching liquidity pools found for the given filter.")


if __name__ == "__main__":
    # When run as a standalone script
    session = requests.Session()
    try:
        parser = argparse.ArgumentParser(
            description="Fetch Hive-Engine liquidity pools."
        )
        parser.add_argument(
            "account_name",
            nargs="?",
            default=DEFAULT_ACCOUNT_NAME,
            help="Hive account name to fetch liquidity pools for.",
        )
        parser.add_argument(
            "filter_token",
            nargs="?",
            default=DEFAULT_FILTER_TOKEN,
            help="Token to filter by, or 'ALL' to list all tokens.",
        )

        args = parser.parse_args()

        main(args.account_name, args.filter_token)
    finally:
        session.close()

Edit: I noticed that I still hadn't added 'session' to the arguments of the functions. So I did, and updated the script here to reflect that. The problem is that now it's slower again. The 'broken' script was a tenth of a second faster, but this one correctly establishes the session. According to ChatGPT, I might as well not use 'session' anymore, since I only make one request: a session only makes things faster if you do multiple calls to the API. But anyway, it was a fun experiment.
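If anyone wants to reproduce that observation, here's a rough timing sketch. The node URL is just the example from the script, and the payload is a placeholder; with N = 1 the two approaches are effectively identical.

# session_timing_sketch.py - illustrative only
import time
import requests

URL = "https://api2.hive-engine.com/rpc/contracts"  # example node
PAYLOAD = {
    "jsonrpc": "2.0", "id": 1, "method": "find",
    "params": {"contract": "marketpools", "table": "pools", "query": {}, "limit": 1},
}
N = 10  # number of identical calls

start = time.time()
for _ in range(N):
    requests.post(URL, json=PAYLOAD, timeout=10)  # opens a new connection each time
print(f"plain requests: {time.time() - start:.2f} s")

start = time.time()
with requests.Session() as session:
    for _ in range(N):
        session.post(URL, json=PAYLOAD, timeout=10)  # reuses one open connection
print(f"with session:   {time.time() - start:.2f} s")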

Oh, and I forgot to comment about the main script I'm working on. It's actually a balancer for liquidity pairs. I might write about it as soon as I iron out the wrinkles.


I am lazy and didn't bother, since I could 'time' it in the terminal.

Well, I had no idea you could do that 😅 what you can do through the terminal is still a bit obscure to me, because I'm not used to it, so I have no idea of its flexibility and power!

My current script, which I spruced up with your enhancements:

Wow, this one is crazy fast! Now I'm going to read your new post :)

Session only makes things faster if you do multiple calls to the API

It's the only thing I understand of this "Session" stuff 😂 because I still have to figure out what parameters/values it keeps, and whether it can be used whenever I make more calls to an API, or only if the calls are somehow linked, or else... I have no idea tbh ahahah


Heh, it wasn't immediately clear for me either, although I had an inkling. Here's how I eventually understood the concept myself: The idea with sessions is that when you use one, you're keeping the connection to the server open, so you don't need to create a new connection for each API call. It’s like having an uninterrupted "conversation" with the server, rather than hanging up and redialing for every single message. This can save time and resources, especially when you’re making multiple API calls within the same session.
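In requests terms, the "conversation" looks something like this (a toy sketch; the User-Agent value and token symbols are just made-up examples):

import requests

with requests.Session() as session:
    # Anything set on the session is remembered for every call made through it
    session.headers.update({"User-Agent": "pool-fetcher-demo/0.1"})  # hypothetical
    for symbol in ("BTC", "HIVE", "BEE"):
        # Each post() below travels over the same open connection
        response = session.post(
            "https://api2.hive-engine.com/rpc/contracts",
            json={"jsonrpc": "2.0", "id": 1, "method": "find",
                  "params": {"contract": "tokens", "table": "tokens",
                             "query": {"symbol": symbol}, "limit": 1}},
            timeout=10,
        )
        print(symbol, response.status_code)
# When the with-block ends, the session "hangs up" and closes the connection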

!PIZZA


Hey felixxx,

I think you might have misunderstood the meme. It's showing that people on both the low and high ends of the spectrum often stick with simple techniques, like adding print statements, because they actually work. The middle of the curve is where people tend to overcomplicate things by going through function descriptions or stepping through debuggers, which sometimes doesn't reveal the root of the problem. Print statements, on the other hand, can give you a direct, clear view of what's actually going on in the code.

But beyond just print statements, error handling ensures that your script doesn't crash when something goes wrong — like an API failing or invalid data sneaking in. Without it, you're left in the dark, which is why it’s far from being 'lame.' It's what gives you control and the ability to recover when things don't go as expected.
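A minimal sketch of what that can look like around a typical API call (fetch_safely and its arguments are placeholder names, not anyone's real code):

import requests

def fetch_safely(url, payload):
    # One bad response shouldn't kill the whole run
    try:
        response = requests.post(url, json=payload, timeout=10)
        response.raise_for_status()  # turns 4xx/5xx status codes into exceptions
        return response.json()       # raises ValueError on invalid JSON
    except ValueError as e:
        print(f"Invalid JSON in response: {e}")
    except requests.exceptions.RequestException as e:
        print(f"Request failed: {e}")  # timeouts, connection errors, bad status
    return None  # the caller decides how to recover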

As for infinite loops — they aren’t bad by nature. When you're working with something like continuously reading blockchain data, they can be the best option. The key is to manage them properly with safeguards like rate limiting and exit conditions, so they don’t end up causing performance issues. It’s all about using the right tool for the job.
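And a sketch of a "managed" loop with those safeguards in place; fetch_next_block and handle_block are hypothetical stand-ins for the bot's real functions:

import time

def watch_blocks(fetch_next_block, handle_block, max_failures=5, poll_interval=3):
    # Not a bare `while True`: consecutive failures become an exit condition
    failures = 0
    while failures < max_failures:
        try:
            block = fetch_next_block()
            if block is None:
                failures += 1  # nothing new yet, or the node misbehaved
            else:
                failures = 0   # reset the counter on success
                handle_block(block)
        except Exception as e:
            failures += 1
            print(f"Error ({failures}/{max_failures}): {e}")
        time.sleep(poll_interval)  # rate limiting between polls
    print("Too many consecutive failures, shutting down cleanly.")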

Looking forward to seeing more of your insights in your upcoming blog!
