[ENG/ITA] Python & Hive: My Scripts are Ready! My First Project is Completed :)


cover





After almost 30 days, my project is finally finished!

Both scripts I was working on are ready, polished and improved thanks to the precious advice I got from @felixxx, whose knowledge was crucial in rewriting most of the code I had originally created.

Thanks to his help, I have greatly reduced my use of the BEEM library, which is no longer maintained and therefore no longer very reliable, at least in its more complex functions.

The first of the two scripts I created, i.e. the one that reads information from the chain and identifies what I am interested in - posts written in Italian, with a specific tag and a minimum length - has at its heart the custom client written by @felixxx, which I slightly adapted to my needs.

The second script - whose task is to comment on and/or upvote the selected posts, as well as to publish a summary post - still relies on BEEM, but now uses only the Hive module to sign the transactions, whereas before it used three or four modules... a big difference that makes the code not only much more readable, but also more robust, because there are fewer places where something could go wrong.
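
To give a concrete idea, this is the pattern the second script now follows: a single Hive instance, created once, signs everything (a minimal sketch with a placeholder key, mirroring the calls used in the full script further down):

from beem import Hive

# One Hive instance holds the API node and the posting key...
hive = Hive(node=["https://api.deathwing.me"], keys=["<posting key>"])

# ...and signs transactions directly, with no other BEEM modules involved
hive.vote(weight=50, account="my-account", identifier="some-author/some-permlink")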


And now?

Now that the scripts are ready, I can consider my project completed, at least in terms of ‘core’ features.

It would be possible to go even further and create, for example, a small interface that allows one to interact with it without having any programming knowledge.

The code itself could be tweaked to suit the needs of a real user.

In short, if I wanted to, there would still be plenty I could do, but since the code will presumably never be used by anyone other than myself, and considering that I can only work on it occasionally - so my brain is a bit melted from all this 'start-stop-start-try to remember what I was doing-I have no idea-stop-start again' 😂 - I'd say it's finished, for now.

In terms of testing, I must admit I haven't done much - I don't know how to start or connect to a testnet, so every time I have to upvote, comment on and publish random posts, which isn't great - but everything seems to be working, and that alone is a huge win for me!


Learn, learn, learn!

Working on this project has taught me a lot, even though my code is definitely very basic, so a real developer will look at it, feel disgusted and think I have just murdered Python... but you have to start somewhere, and creating something gives me much, much more gratification than grinding through exhausting exercises with no real use case.

Looking around Hive I am also starting to get to know users who are much more experienced than me, whose scripts and suggestions are invaluable, giving me hints to improve my code and ideas for trying to create something new.

The end of this small project is therefore only the beginning of a new challenge :)


Finally, here are both scripts!

FIRST SCRIPT


import requests
import time
import datetime
import csv
import os
import re
import json
import markdown
from bs4 import BeautifulSoup
from langdetect import detect_langs, LangDetectException as lang_e
import logging
from logging.handlers import TimedRotatingFileHandler

# logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

handler = TimedRotatingFileHandler("main.log", when="D", interval=1)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)

logger.addHandler(handler)


# Check if the target language is among the top detected languages
def text_language(text):
    try:
        languages = detect_langs(text)
    except lang_e as e:
        logger.error(f"Language error: {e}")
        return False, 0

    num_languages = len(languages)
    languages_sorted = sorted(languages, key=lambda x: x.prob, reverse=True)
    top_languages = (
        languages_sorted[:2] if len(languages_sorted) > 1 else languages_sorted
    )

    contains_target_lang = any(lang.lang == "it" for lang in top_languages)
    return contains_target_lang, num_languages


# Convert text from markdown to HTML and count words
def convert_and_count_words(md_text):
    html = markdown.markdown(md_text)

    soup = BeautifulSoup(html, "html.parser")
    text = soup.get_text()

    words = re.findall(r"\b\w+\b", text)
    return len(words)


# Send request to HIVE API and return response
def get_response(data, url, session: requests.Session):
    request = requests.Request("POST", url=url, data=data).prepare()
    response = session.send(request, allow_redirects=False)
    return response


# Get properties and find last block
def get_properties(url, session: requests.Session):
    data = '{"jsonrpc":"2.0", "method":"database_api.get_dynamic_global_properties", "id":1}'
    response = get_response(data, url, session)
    properties = response.json()["result"]
    return properties


def get_ops_in_block(num, url, session: requests.Session):
    data = f'{{"jsonrpc":"2.0", "method":"condenser_api.get_ops_in_block", "params":[{num},false], "id":1}}'
    response = get_response(data, url, session)
    ops_in_block = response.json()["result"]
    return ops_in_block


def get_post(ops):
    comment_list = []
    for op in ops:
        if (
            op["op"][0] == "comment" and op["op"][1]["parent_author"] == ""
        ):  # Posts, not comments

            try:
                json_metadata = json.loads(op["op"][1]["json_metadata"])
            except (json.JSONDecodeError, KeyError) as e:
                logger.error(f"JSON decode error or missing key: {e}")
                continue

            # Check if there's the tag we are looking for
            if "ita" not in json_metadata.get("tags", []):
                continue

            # Check post language
            valid_language, lang_num = text_language(op["op"][1]["body"])

            if valid_language == False:
                continue

            # Check post length
            word_count = convert_and_count_words(op["op"][1]["body"])

            if (lang_num == 1 and word_count < 400) or (
                lang_num > 1 and word_count < 800
            ):
                continue

            author = op["op"][1]["author"]
            permlink = op["op"][1]["permlink"]
            link = f"https://peakd.com/@{author}/{permlink}"
            comment_list.append(link)
            logger.info(f"Found eligible post: {link}")
    return comment_list


def load_last_block():
    if os.path.exists("last_block.txt"):
        with open("last_block.txt", "r") as file:
            return int(file.read())
    return None


def save_last_block(block_num):
    with open("last_block.txt", "w") as file:
        file.write(str(block_num))


# return the eligible posts in a list
def get_post_list(url):
    with requests.Session() as session:
        last_hive_block_num = get_properties(url, session)[
            "last_irreversible_block_num"
        ]
        last_block_num = load_last_block()
        if last_block_num is None:
            last_block_num = last_hive_block_num
        if int(last_block_num) == int(last_hive_block_num):
            time.sleep(60)  # always stay behind the last Hive block
        ops = get_ops_in_block(last_block_num, url, session)
        post_list = get_post(ops)
        save_last_block(int(last_block_num) + 1)
        return post_list


# Get date and generate csv file name
def get_filename():
    current_date = datetime.datetime.now().strftime("%Y-%m-%d")
    return f"urls_{current_date}.csv"


def main():
    url = "https://api.deathwing.me"

    filename = get_filename()
    last_filename = filename
    i = 1

    if not os.path.exists(filename):
        with open(filename, "w", newline="", encoding="utf-8") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(["ID", "URL", "Upvote_Value"])

    logger.info(f"Current file: {filename}")

    while True:
        post_list = get_post_list(url)

        current_filename = get_filename()

        if current_filename != last_filename:
            filename = current_filename
            last_filename = current_filename
            i = 1  # Reset counter
            logger.info(f"Started writing to a new file: {filename}")

        # Add posts to the current file
        with open(filename, "a", newline="", encoding="utf-8") as csvfile:
            writer = csv.writer(csvfile)
            for post in post_list:
                writer.writerow([i, post, ""])
                i += 1

        post_list.clear()


if __name__ == "__main__":

    main()

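For reference, this is the shape of the CSV the first script produces (and the second script consumes, once the Upvote_Value column has been filled in by hand - the URLs here are made up):

ID,URL,Upvote_Value
1,https://peakd.com/@someauthor/some-permlink,50
2,https://peakd.com/@anotherauthor/another-post,
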

SECOND SCRIPT


#!/usr/bin/env python3
"""A script to upvote and comment posts from a .csv list"""
import os
import shutil
import jinja2
import configparser
import time
import re
import json  # needed to build json_metadata for the summary post
import logging
from logging.handlers import TimedRotatingFileHandler
import pandas as pd
from beem import Hive, exceptions as beem_e
from beemapi import exceptions as beemapi_e


# Global configuration
config = configparser.ConfigParser()
config.read("config")

ENABLE_COMMENTS = config["Global"]["ENABLE_COMMENTS"] == "True"
ENABLE_UPVOTES = config["Global"]["ENABLE_UPVOTES"] == "True"
ENABLE_POST = config["Global"]["ENABLE_POST"] == "True"

ACCOUNT_NAME = config["Global"]["ACCOUNT_NAME"]
ACCOUNT_POSTING_KEY = config["Global"]["ACCOUNT_POSTING_KEY"]
HIVE_API_NODE = config["Global"]["HIVE_API_NODE"]

# Data for the summary post, expected in the same [Global] section of the config
TITLE = config["Global"]["TITLE"]
TAGS = config["Global"]["TAGS"].split(",")
CATEGORY = config["Global"]["CATEGORY"]

hive = Hive(node=[HIVE_API_NODE], keys=[ACCOUNT_POSTING_KEY])

# Logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

handler = TimedRotatingFileHandler("main.log", when="D", interval=1)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)

logger.addHandler(handler)

logger.info("Configuration loaded:")
for section in config.keys():
    for key in config[section].keys():
        if "_key" in key:
            continue  # don't log posting keys
        logger.info(f"{section}, {key}, {config[section][key]}")


# Jinja2 templates for the comment and the summary post
with open(os.path.join("template", "comment_curation.template"), "r") as template_file:
    comment_curation_template = jinja2.Template(template_file.read())
with open(os.path.join("template", "post.template"), "r") as template_file:
    post_template = jinja2.Template(template_file.read())


def give_upvote(vote_weight, voter, authorperm):
    print(f"Upvoting with weight {vote_weight}!")
    try:
        hive.vote(weight=vote_weight, account=voter, identifier=authorperm)
    except beem_e.VotingInvalidOnArchivedPost:
        logger.error("Post is too old to be upvoted")
    except beemapi_e.UnhandledRPCError:
        logger.error("Vote changed too many times")
    time.sleep(3)  # sleep 3s


def post_comment(author, replier, authorperm):
    print("Commenting!")
    comment_body = comment_curation_template.render(
        target_account=author, replier=replier
    )
    hive.post(
        title="",
        body=comment_body,
        author=replier,
        reply_identifier=authorperm,
    )
    time.sleep(3)  # sleep 3s


def post(posts):
    title = TITLE
    post_body = post_template.render(author=ACCOUNT_NAME, posts=posts)
    author = ACCOUNT_NAME
    tags = TAGS
    category = CATEGORY

    hive.post(
        title=title,
        body=post_body,
        author=author,
        permlink=None,
        json_metadata=json.dumps({"app": "leothreads/0.3", "tags": tags}),
        category=category,
    )

    logger.info(f"Post published with title: {title}")


def process_file(file_to_process):
    posts = []  # created before the try so it always exists when we return
    try:
        df = pd.read_csv(file_to_process)

        for _, row in df.iterrows():
            url = row["URL"]
            vote_weight = row["Upvote_Value"]
            print(f"Work in progress on {url}...")

            if pd.isna(vote_weight):
                print(f"No upvote value for {url}, skipping...")
                continue

            try:
                vote_weight = int(vote_weight)
            except ValueError:
                print(f"Invalid vote weight: {vote_weight}")
                continue

            if (vote_weight < 1) or (vote_weight > 100):
                print(f"Invalid vote weight: {vote_weight}%")
                continue

            # data of the post to be upvoted and/or replied
            permlink = re.search(r".+@([\w.-]+)/([\w-]+)", url)
            if not permlink:
                logger.error(f"Invalid URL format: {url}")
                continue
            post_author = permlink.group(1)
            permlink = permlink.group(2)
            post_url = f"{post_author}/{permlink}"
            logger.info(f"{post_author} is getting a {vote_weight}% upvote!")

            posts.append(
                {
                    "author": post_author,
                    "upvote_value": vote_weight,
                    "post_link": post_url,
                }
            )

            # leave an upvote
            if ENABLE_UPVOTES:
                give_upvote(vote_weight, ACCOUNT_NAME, post_url)
            else:
                print("Upvoting is disabled")

            # leave a comment
            if ENABLE_COMMENTS:
                post_comment(post_author, ACCOUNT_NAME, post_url)
            else:
                print("Posting is disabled")

    except pd.errors.EmptyDataError:
        logger.error(f"File {file_to_process} is empty. Skipping...")

    finally:
        # Once done, move the file to the "posts_done" directory
        directory_done = "posts_done"
        destination = os.path.join(directory_done, os.path.basename(file_to_process))
        shutil.move(file_to_process, destination)
        logger.info(
            f"File {os.path.basename(file_to_process)} moved to '{directory_done}' directory."
        )

    # returning outside the finally block, so exceptions aren't silently swallowed
    return posts


def main():

    directory_to_do = "posts_to_do"

    file_to_process = None
    posts = []

    for filename in os.listdir(directory_to_do):
        if filename.endswith(".csv"):  # Only look for csv files
            file_to_process = os.path.join(directory_to_do, filename)
            break  # One file at a time

    if file_to_process:
        posts = process_file(file_to_process)
    else:
        logger.info("No files found in the 'urls_to_do' directory.")

    if posts and ENABLE_POST:
        post(posts)


if __name__ == "__main__":

    main()

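For completeness, this is the shape of the config file the second script reads (all values here are placeholders; TITLE, TAGS and CATEGORY are the keys used for the summary post):

[Global]
ENABLE_COMMENTS = True
ENABLE_UPVOTES = True
ENABLE_POST = True
ACCOUNT_NAME = my-account
ACCOUNT_POSTING_KEY = <posting key>
HIVE_API_NODE = https://api.deathwing.me
TITLE = Curation summary
TAGS = ita,curation
CATEGORY = ita
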

I tag @gamer00 because he was curious to see what I was working on... and so now he can be horrified 🤣

Someday I'll be able to write decent, neat and readable code... but this is not the day! ahahah

I'm also tagging @slobberchops because he encouraged me to move away from BEEM, thus giving me the initial push I needed to commit to better study how Hive API works.

I have already tagged @felixxx several times... but I'm tagging him once more because the heart of my project rests on his work, and I feel it's right to say it again :)


cover made with Bing AI and edited with GIMP

to support the #OliodiBalena community, @balaenoptera is a 3% beneficiary of this post


If you've read this far, thank you! If you want to leave an upvote, a reblog, a follow, a comment... well, any sign of life is really much appreciated!





65 comments

You never stop learning, and what's worth even more is the desire to keep learning!
Well done, congratulations!
I'm only sorry I don't have any tipu available, but your effort and work deserve all the support I can give!
!PIMP
!discovery 50
!hiqvote


Thanks a lot! I hadn't missed this lovely comment of yours, I had just forgotten to reply and let you know I'd read it :) and your support has always been more than I could hope for, so, tipu or not, thank you as always!

!LOL !PIZZA !LUV


Well done! I'd like to start working on the blockchain too, but through PHP, which I already know, rather than Python. I already have material ready to study, but I never manage to really get into it because of other commitments. Sooner or later I'll get my hands on my projects though: I'd already know how to build them, I've been working with PHP for years, but the problem for the moment is precisely interfacing with the Hive blockchain through the existing library.


Thanks! I know it's a modest project, but at least I have the satisfaction of being able to say I created something... that does something 😂

As for PHP, you've probably already looked into it, but just in case: I saw there are a few tutorials on the Hive Developers Portal, even though there really are very few... still better than nothing, and maybe they'll help you figure out how to interface with Hive :)

At the beginning I used the BEEM library, but then I was told it's no longer maintained and that I'd be better off interfacing directly with the Hive APIs, a procedure for which, luckily, a user gave me ready-made code, because otherwise I'd still be completely at sea 😅


Yes yes, I've already seen what little material there is. I've also already found the PHP library on GitHub to use for interfacing with the Hive APIs, but for now I've yet to start getting my hands dirty, also because I already know I wouldn't have time to develop my projects, which include a couple of games. At the moment the thing that worries me most is how to interface with Keychain for the login: I haven't yet checked whether they too have documentation for doing it via API and PHP.


If you're interested, a user I follow here on Hive called "felixxx" has recently started a series of posts that should cover various topics related to programming on Hive, including how to implement Keychain in your projects. He works with Python, but it could still be useful to get an idea :)

In any case, Keychain's GitHub is this one.


Thanks a lot for the Keychain GitHub. That saves me the search.

I had already seen felixxx's profile thanks to your posts. I've already added him to my links, although I don't think it will be very useful for my projects: it will probably be quicker to study the PHP libraries directly rather than trying to 'translate' from Python.


I imagine translating from one programming language to another is no small job 😅 so I understand why you'd rather avoid it. One thing though: from Keychain's GitHub it looks to me like it's written in JS, so maybe felixxx will also have to adapt it to Python somehow? And if that's the case, maybe he'll explain how to do it, and the process could be replicated, with due care, in other languages too?

I don't know, maybe I'm just talking nonsense 🤣


No, JavaScript, js to its friends ;), integrates with both PHP and Python, you just include it with the appropriate tags. That's not a problem, I know JavaScript too, although just to include it you don't even need to know it.


I didn't know that! Very interesting :) if I ever learn Python well enough, JavaScript would be next on the list, so that maybe I could also develop something on the frontend side.

Out of curiosity, did you follow a particular course of study to learn programming, or are you self-taught?


While poking around the repository I saw that Keychain isn't written in JavaScript but in TypeScript, which you could call an extension of JavaScript. I don't handle that one adequately. But it isn't necessary, because Keychain too is used via API, if only there were documentation worthy of the name.

I'm self-taught: first HTML and CSS to build static sites (many years ago now) and a bit of JavaScript to add some more dynamism and interaction. Then I abandoned it all for many years, since work took me in a completely different direction.

Then about ten years ago I picked everything up again, refreshed my knowledge of HTML and CSS, studied PHP and JavaScript for dynamic sites and MySQL to integrate databases, and then landed on WordPress for projects for clients with small budgets.

Now I'd like, though I don't know if I'll ever have the time, to take a couple of steps forward in different directions: the Laravel framework, to work on PHP projects larger than websites, and integration with the Hive blockchain, to build a few projects I have in mind.

Years ago I already filled in the forms to have my days extended to 72 hours instead of the usual 24, but they still haven't approved the upgrade ;)


Wow, so many of you have studied on your own... and with results that look nothing short of astonishing to me!

While I'm at it, since you used the term, may I ask what a "framework" is? It's a term I've often seen used in Python too, for what, in my abysmal ignorance, look like a sort of library for handling very specific tasks, like Django, which in Python is used to create interfaces, or SQLite, which I imagine is used to create and manage databases. Is that more or less it, or is there more to it when people talk about frameworks?

Years ago I already filled in the forms to have my days extended to 72 hours instead of the usual 24, but they still haven't approved the upgrade ;)

If they ever approve your application, let me know, because I'd like to submit one too ahahah

!LOL


Yes, exactly, a framework is a kind of very large library. It's better, though, to see it as a working environment: literally it would be a 'work frame', which you can picture as the frame of a painting within which you create your work.

A framework already contains many tools, backed by bundled libraries, that let you implement functionality in a much faster, more organized, more effective and more secure way. You can also add further libraries on top.

Let me give a rough example to get the concept across.

Imagine you have to build a login form. You have to write the code for the form itself, then run consistency and security checks on the submitted data, both client-side and server-side, to prevent incorrect input (usually a username or email and a password), for example with unauthorized characters (or badly written emails, say without the @) or, worse, input that threatens the site's security. Then you have to set up the connection to the database to compare the credentials entered in the form with those stored there, and handle the success case, with access to the desired page or feature, but also the failure case, with error messages for the user. You also have to build the whole password recovery or reset flow for when it's forgotten, with all the structures that requires. You may also need to update a log file that keeps track of logins, or a logout system. Almost certainly you'll also need to create a session so the user can keep browsing without logging in on every page. And you might need yet more functionality. All of this from a simple login form.

A framework gives you ready-made, organized structures which, on the one hand, introduce a certain rigidity (you do it this way, full stop :D if you want to change some aspects you have to put in quite some effort) but which shorten the work enormously. It's also much safer, because it has already been built and tested very thoroughly, so you no longer have to worry so much about whether the code is solid.

The login form is just an example, one of the simplest. When building an application there are many aspects to manage that are a bit harder for a beginner to grasp, such as routing (how are users directed safely to the app's various pages/features, and how do you prevent them from ending up 'off the road'?). Keep in mind that what look like web pages, in applications of a certain complexity, actually aren't: they are 'assembled on the fly' from code 'scattered' here and there :D

Using frameworks competently when you develop an application (for a simple site it makes no sense to use them) therefore lets you concentrate on the application's implementation logic and structural workings, rather than slaving away over the thousand features of the login form. :)

I've gone on long, but I hope I've made it clear, because I know very well, having been through it, that it's not easy to understand what a framework is until you get your hands on one.


You've been very clear, and the example fits perfectly too! Thank you so much :)

The things to learn really are endless, I suspect 72 hours a day wouldn't be enough for me... can we make it at least 96? 😅

Anyway, I hope I'll have the time to keep practicing and learning new things: I'd really love, one day, to be able to converse with people far more expert than me, like you, without feeling so lost in this vast ocean of knowledge.


If it's any consolation, I'm in exactly the same situation. The more you study, the more you discover you still have to study. And you realize that a great many people know a great deal more than you do. Add to that the fact that, because of my age, I also have more difficulty absorbing new notions: if only I had the mental freshness I had in my youth. The experience and skills I've acquired aren't enough to make up the difference. And I carry another burden: I don't speak English well. With written text I get by in the end, I can understand it fairly well, at least in the IT field. But, for example, I find it very hard, almost impossible, to make use of video material. In the end I understand better straight from written code than from a video's audio :D


That's definitely true and, as with most things, the further you go, the more you generally discover there's still much, much more to learn... but when you get to the point of giving shape to your own ideas and projects, that in my view is already a real turning point: for now, that's my goal!

I don't speak English well.

Look, English and I have never gotten along: my brain flatly rejects it... but by 'forcing' myself to watch videos in English over the years I've made enormous progress. Especially with subtitles I can now understand everything, glancing down every so often when someone speaks too fast or with an improbable accent. In the end it's a very passive way to learn, and one that's easy to build into your routine (if, say, you usually follow some entertainment channel on YT, you can replace it with something similar but in English, so when you take a little break to 'switch off' your brain you're actually improving your English... sure, that way the break rather goes out the window, but little by little the results come!).

!LOL !PIZZA !HUG


Just now I see the latest post by @felixxx, which talks about exactly this.

In his code, under the heading "Code Example with Hive Keychain", there's the integration of JavaScript into the HTML code via the <script> tag.

In PHP, files written in that language are processed on the server, which executes the PHP instructions and produces the HTML code that is then displayed by the browser. Browsers, in fact, cannot process PHP directly (nor Python). They can only render HTML and CSS (which are not programming languages, since they don't let you express logic) and execute JavaScript.

I don't know Python, but I believe it works the same way, so if you study it in depth I think you can build the front end of sites with it too.

JavaScript remains useful in any case for manipulating the DOM (Document Object Model, the object structure every web page, and more, is organized into) in real time and client-side (PHP and Python are server-side).


In his code, under the heading "Code Example with Hive Keychain", there's the integration of JavaScript into the HTML code via the <script> tag.

Okok, I had guessed that was what you were referring to, but I still have to study it properly, because it's a totally new concept for me.

So after Python maybe I should go for HTML and CSS, rather than JS? HTML and CSS, I imagine, are both worth knowing, or are they alternatives too (like, say, Python and PHP)?

Forgive all the questions 😅


Yes, HTML and CSS are absolutely indispensable, especially if you want to build something on the front end. They should be studied first. But you can catch up quickly: you don't need to go too deep to achieve excellent results, even though nowadays, with the latest versions, they too have reached a fair level of complexity, greatly extending their capabilities.

As I wrote, browsers don't understand PHP or Python. They only read HTML and CSS which, I repeat, are not programming languages. They also have a built-in JavaScript engine that lets them execute JavaScript.

That's why JavaScript is called a client-side programming language (think of it as 'working' on the user's PC, where the user can normally see its code), while PHP and Python are server-side programming languages (they work on the server: the user has no access to the written code, and they produce the appropriate HTML and CSS, which is then passed to the user's browser).

HTML and CSS are both necessary because they have different roles and are not interchangeable.

HTML creates the structure of the page, the DOM, creating the containers the information (text, images, etc...) goes into, and how it is displayed.

CSS tells the browser the style things should be shown with: the Arial font rather than Times New Roman, a red border rather than a green one, two side-by-side columns instead of a single one, etc...

Don't worry, ask away. If I had the time I could write a whole series of posts on this.


Careful about giving me the green light on questions, because technically I'd have an infinite number to ask ahahahah

I had some minimal knowledge of HTML many years ago, back when forums were still widespread; CSS, on the other hand, I've never looked into, hence the doubt you've now cleared up :)

So basically I'd need to know Python, HTML and CSS to create complete web applications... I'll never make it 😂

Anyway, I'd really love to read some posts of yours on HTML and CSS! If one day you feel like writing something, count me in already as an attentive and enthusiastic reader :)

!PIZZA !LUV !LOL


I really don't think I'll have the time to write organic posts. But you really can find loads of material on HTML and CSS on the web.

Start here, where you'll find both and can experiment:

https://www.w3schools.com/html/

You'll also find PHP, JavaScript, Python and much, much more there.


Okok, thanks! I know the site from Python, but - as soon as possible - I'll start using it for HTML and CSS too :)


Oof... you got me curious, so I went and took a look. The documentation is terrible, it's hard to understand much, because that repository holds all of Keychain's services, and for each one the uses aren't clearly laid out.

keychain-sdk, which should be the one I'm interested in for integrating the login flow via Keychain (though I say that having only skimmed it), has a very short README with scant instructions. And the very few examples, once again, use Python and not PHP.


I'm sorry, I was hoping it would be a bit more helpful :/


It was helpful all the same. I'll have a chance to dig deeper later on, now that I have a starting point. In any case, it was precisely by reading the felixxx post I linked you in the other comment that I found much more information, with better documentation, even if still not quite complete.

I think this is Hive's real big problem. There is no clear, complete documentation in the various languages for working with the blockchain. In Python it's a bit more extensive, but in PHP, for example, which 75% of today's websites are built with, it's very scarce, and the same goes for JavaScript.


I saw that felixxx wrote a post a while back complaining about exactly this: from what I understood, it's partly because they want to move to something called "HAF" (no idea what that is, though 😅) and partly because, as usual, many developers are jealous of what they create and, even though they often build it with money paid by the DHF, they then 'forget' to make what they've created available to everyone...

This too is, unfortunately, one of the many weak points I'm discovering about Hive over time... who knows whether one day it will somehow be fixed, making it easier for outside developers to bring their projects here.


PIZZA!

$PIZZA slices delivered:
bencwarmer tipped arc7icwolf
@gamer00(1/15) tipped @arc7icwolf (x2)
arc7icwolf tipped gamer00
arc7icwolf tipped libertycrypto27
arc7icwolf tipped bencwarmer
arc7icwolf tipped garlet (x2)
pousinha tipped arc7icwolf


Hi!

One thing you could consider: instead of checking one block at a time and waiting a full minute in between, you could fetch multiple blocks in one go, then immediately process them. This could be way faster if some of the blocks don't contain posts relevant to your query.

Something like this:

post_list = []
for block_num in range(last_block_num, last_hive_block_num):
    ops = get_ops_in_block(block_num, url, session)
    post_list.extend(get_post(ops))

Hope it helps. !WINE


If I haven't done something wrong - which I may well have 😅 - the 60-second delay should only be triggered when the block I just processed is the same as the current last Hive block. So there shouldn't be any delay when I'm checking older blocks, since in that case there's no risk of checking a block that has yet to be produced by the chain.

But besides that, my script isn't as fast as I hoped, so your suggestion may help it check even older blocks faster, right? I have no access to the code right now, but as soon as I'm back I'm going to implement it and see if I can make it work!

Many thanks!!!

!LUV


I'm so tired I have no idea what I've done, but working from your suggestion I was able to improve the script, which is now much faster (around 3 times faster, I think!).

This is the part I changed:

# return the eligible posts in a list
def get_post_list(url):
    with requests.Session() as session:
        last_hive_block_num = get_properties(url, session)[
            "last_irreversible_block_num"
        ]
        last_block_num = load_last_block()
        if last_block_num is None:
            last_block_num = last_hive_block_num
            save_last_block(last_block_num)
        print(
            f"working on {last_block_num}. Last hive block is {last_hive_block_num}"
        )

        post_list = []

        if last_block_num < last_hive_block_num:
            diff = int(last_hive_block_num) - int(last_block_num)
            max_iterations = 100
            iterations = min(diff, max_iterations)

            for _ in range(iterations):

                ops = get_ops_in_block(last_block_num, url, session)
                new_posts = get_post(ops)
                post_list.extend(new_posts)

                last_block_num += 1
                save_last_block(last_block_num)

        elif last_block_num == last_hive_block_num:
            print("All blocks are up-to-date, waiting for new blocks...")
            time.sleep(3)

        else:
            sleep = (last_block_num - last_hive_block_num) * 3
            print(f"Selected block is too high: sleeping for {sleep} seconds")
            time.sleep(sleep)

        return post_list

It could probably have been implemented better, but I'm still very happy with the improvement!

Thanks for your awesome suggestion :)

EDIT: made a slight improvement!

!PIZZA


You are now successfully processing multiple blocks in one go, instead of fetching them one-by-one and processing them separately. The minute-long pause sort of added insult to injury and made it look like the script made even less progress.

Btw. I like the idea of a dynamic sleep time based on the difference in block numbers! How'd you come up with that? I hate to admit I wouldn't have thought of something like that.

About your original script...

I had to add some print statements to see what it did though. I think seeing on-screen what the script does is helpful in many cases to understand what is going on.

Since I saw your script wrote logs, I also put 'tail -f' to follow them, but it looks like I didn't catch any Italian posts this time as nothing much got written on them.

I wrote a really long rant about double conversions from:

def convert_and_count_words(md_text):
    html = markdown.markdown(md_text)  # Convert Markdown to HTML
    soup = BeautifulSoup(html, "html.parser")
    text = soup.get_text()  # Extract plain text from the HTML
    words = re.findall(r"\b\w+\b", text)  # Find all words in the text
    return len(words)

...to only using Regex to strip out Markdown elements:

import re

# Convert markdown text to plain text and count the number of words
def convert_and_count_words(md_text):
    # Strip markdown syntax and convert directly to plain text
    # Removing common markdown elements like headers, lists, and formatting
    plain_text = re.sub(r'[#*\[\]\(\)`>]', '', md_text)
    
    # Find all words in the text
    words = re.findall(r"\b\w+\b", plain_text)
    
    return len(words)

...because it would've been faster and removed the need for extra libraries, but then I realized Hive posts quite often contain html tags. Even I use them quite often to make my posts look better. So maybe it's important to keep the double conversions after all.

So here I would've been completely off base again.

Oh, btw, my editor (I currently use Neovim) complained about this line in your code:

            if valid_language == False:
                continue

The warning was: "Avoid equality comparisons to 'False'; use if not valid_language: for false checks"

The problem is that you made a comparison to a True/False boolean, and I guess it is not considered proper Python code. So the corrected code is like the warning said:

            if not valid_language:
                continue

Well... I think I might have to go through the other script too. This one was fun.


How'd you come up with that?

My biggest concern was that I might start checking Hive blocks before they even existed, so I asked ChatGPT - yeah, I often ask it a lot of questions !LOL - whether I could size the range dynamically, switching between a default amount and a max amount, whichever was lower. And then it suggested using "min()" to pick the lower of the two... that's how it went 😅

it looks like I didn't catch any Italian posts

Totally possible! For testing purposes I set the language to "en", lower the minimum word count to just a few words and often switch from posts to comments.

Hive posts quite often contain html tags

That's exactly why I came up with that solution: I soon realized I was counting a lot of strings that weren't words, so I looked for a fix... and Beautiful Soup is such a cool name ahahah

and I guess it is not considered proper Python code

Well, I'm still happy that the rest is considered Python ahahah

Thanks for this correction! It makes total sense, and I have no idea why I keep forgetting things I know... ahhhh it's exactly as you said in the other comment: code is an illusion!


ChatGPT seems to be a very good teacher, although quite often it just slams me with the complete solution and doesn't let me think for myself. I don't know what to think about that. But I guess it does sometimes come up with quite elegant solutions. A friend of mine once said that with ChatGPT, you are not really chatting with a computer, but the rest of the world. It's like you are chatting with the knowledge of everyone, so that's why it comes up with interesting solutions too.

Oh, I will try changing the language to see whether I can catch any English posts with the script, yes that might be a workable idea! :) Btw. the script needs some error-checking:

Lastest block 89122762 written to the file.
Last block file exists.
Lastest block read from the file: 89122762
Lastest block 89122763 written to the file.
Last block file exists.
Lastest block read from the file: 89122763
Traceback (most recent call last):
  File "/home/ambience/Projects/Code/Python/Hive/Arc7icWolf/venv/lib/python3.12/site-packages/requests/models.py", line 974, in json
    return complexjson.loads(self.text, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ambience/Projects/Code/Python/Hive/Arc7icWolf/oma_fetch_italian_posts.py", line 193, in <module>
    main()
  File "/home/ambience/Projects/Code/Python/Hive/Arc7icWolf/oma_fetch_italian_posts.py", line 172, in main
    post_list = get_post_list(url)
                ^^^^^^^^^^^^^^^^^^
  File "/home/ambience/Projects/Code/Python/Hive/Arc7icWolf/oma_fetch_italian_posts.py", line 145, in get_post_list
    ops = get_ops_in_block(last_block_num, url, session)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ambience/Projects/Code/Python/Hive/Arc7icWolf/oma_fetch_italian_posts.py", line 72, in get_ops_in_block
    ops_in_block = response.json()["result"]
                   ^^^^^^^^^^^^^^^
  File "/home/ambience/Projects/Code/Python/Hive/Arc7icWolf/venv/lib/python3.12/site-packages/requests/models.py", line 978, in json
    raise RequestsJSONDecodeError(e.msg, e.doc, e.pos)
requests.exceptions.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

Since I have no idea what this error is trying to tell me, I asked ChatGPT, and since its answer was quite long, I'll condense it here:

The error json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0) occurs when the response received from the API is empty or not a valid JSON.

I guess that for one reason or another, the server borked the output, and the script didn't quite know what to do.

These were the suggested ways to fix it:

  • Check for Empty Responses: Modify your get_ops_in_block function to handle the case where the response is empty or not JSON.
  • Add Retry Mechanism: Sometimes the issue can be temporary, and retrying after a small delay can resolve it.
  • Log the Response: Add logging to see exactly what the response contains when it fails. This could help you pinpoint whether it’s an HTML error page, an empty response, or something else.
  • Use a Different API Node: Sometimes, API nodes can experience downtime or performance issues. Try using a different Hive API node.
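Condensed into code, the first two suggestions might look something like this sketch (the wrapper name and parameters are made up, not from the actual script):

import requests
from time import sleep

def safe_json_post(url, payload, session, retries=3, delay=5):
    # retry when the node answers with an empty or non-JSON body
    for attempt in range(retries):
        try:
            response = session.post(url, json=payload, timeout=10)
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            print(f"Attempt {attempt + 1} failed: {e}")  # log what actually happened
            if attempt < retries - 1:
                sleep(delay)
    return None  # caller can fall back to a different API node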

In case you want to be adventurous, check my script for error handling. If you get lost, ask ChatGPT and it will come up with an excellent example that's the complete answer and leaves nothing to actually learn. ;)

Personally, if I am being adventurous, I like to pose my questions in a way that forces the bot to give a detailed explanation of how to fix the problem without showing me any code. Usually I just get frustrated and cheat again. But maybe, just maybe it is a good idea to at least try.

Cheers!

!BEER

avatar

I like to pose my questions in a way that forces the bot to give a detailed explanation of how to fix the problem without showing me any code

Ahahah love it! I noticed the same problem with ChatGPT, which is why I usually try to avoid it... unless I'm so clueless that I have no idea what to do 😅 and this happens too often lately...

Btw. the script needs some error-checking

I saw you inserted a lot of error-checking in your script, so, if you don't mind, I'll take some inspiration from yours :)

There's plenty of room for improvement in my script, and I only have to find the will to start working on it again 🤣

avatar

Huge congratulations, my friend!!!!
In my opinion your scripts are really useful for all curation accounts, especially the one that creates a periodic report!

!LOLZ
!PIZZA
!BEER

avatar

Thanks a lot!

The automatic report one could even be "extracted" and saved separately, since it's the simplest one and has the fewest critical points.

In my initial idea the scripts were in fact supposed to be separate, but when I started building the automatic report one I realized it was so short that it made more sense to incorporate it into the second script.

!PIZZA !LOL !LUV

avatar

How is the report laid out? What elements do you extract?

!LOLZ

avatar

As for the layout, nothing special for now: in the end it wasn't about Python, so I wasn't that interested in spending time on it.

As for the extracted data: the link to the post, the author of the post and the upvote percentage, but more data could easily be saved too :) I chose these because they seemed the most basic.

avatar

I finally found the time to read this post a little further.

while True:

That's not nice.
I haven't really understood the bot, but infinite loops are ass.
There are nicer ways to loop things.

I'll show better options on my blog.
And I'll look more into what your bot above does and try to cover it in the future.

Anyways, cool to see how you are building things.

avatar

The idea is that it should read the blocks - starting from the last irreversible block if launched for the first time or from the last checked/saved block if already used in the past - and look for posts that have specific requirements (language, tags, length).

If you look through the comments for those of @gamer00, you'll see he suggested a very cool change that improved the workflow of the bot, and he also pointed out that my scripts are very poor in terms of error checking 😅 but I guess they are very poor in general 🤣

Anyways, cool to see how you are building things.

I know they suck, but that was exactly my goal: to show everyone that I'm trying to do something with what I learn and to get suggestions from more experienced coders :)

I haven't really understood the bot, but infinite loops are ass.

I thought it was an easy way to create something that could work "forever", but I'm not surprised it could have been done better 🤣

avatar

I know they suck, but that was exactly my goal: to show everyone that I'm trying to do something with what I learn and to get suggestions from more experienced coders

I never said they suck.
I just couldn't read it in one pass.
If it works, it's ok.
The infinite loop is an eyesore, though :D

error checking

error checking is lame. Don't worry about it.
I'll blog about that soon, too.
You know this meme?:

[https://www.reddit.com/r/ProgrammerHumor/comments/16nhu63/printbellcurve/]

avatar

They seem to work, but they crash sometimes (at the very least I have to catch some errors related to JSON responses, as they seem to cause trouble from time to time... or should I say from block to block? 😂), and I didn't test them extensively enough to be sure there aren't bugs I'm not aware of.

The infinite loop is an eyesore, though :D

Stupid infinite loops 🤣 and I thought they were cool ahahahah

You know this meme?

Nope, but I get it ahahah

avatar

Infinite loops are not always stupid. You are also right about catching errors that may have nothing to do with your code. And there can always be some bug you couldn't have seen unless you had made those print statements. Talking about catching bugs, I must soon release a new version of my 'fetch_liquidity_pools.py' script, because I happened to catch an enormous bug and also made the script blazingly fast by fixing it.

avatar

Super cool! I'm very curious to see your updated version and learn new stuff from your work!

On a side note, I also experimented a bit with your script and tried to introduce a "Session" (not sure how to describe it 😅 it's part of the "requests" library) to see if it could make the script even faster... and it worked! Not sure if you'd be interested in seeing it :) it's a very small change and it may help you make it (ultra)blazingly fast!

avatar
(Edited)

"Session"

Oh cool, a new toy!

I bet it made the original script faster, but now that I've tried it with the improved one, it surprisingly made it slower. I guess it has something to do with the fact that when I originally caught the script fetching pool details for each token pair separately, I decided that, since it wasn't possible to retrieve pool details for certain accounts only, I would make it retrieve details for every pool in one go. That made the script faster, but now that I've introduced requests.Session, it adds a tad of overhead, making it somewhat slower than before. Weird.
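If I distill that change into a sketch (the helper name is hypothetical, but the payload mirrors the real script below), it's basically one bulk request and then local lookups:

import requests

def build_pool_index(session, url):
    # one bulk request for every pool on the exchange...
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "find",
        "params": {"contract": "marketpools", "table": "pools", "query": {}, "limit": 1000},
    }
    response = session.post(url, json=payload, timeout=10)
    pools = response.json().get("result", [])
    # ...then each token pair becomes a local dictionary lookup, no extra round trips
    return {pool["tokenPair"]: pool for pool in pools}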

But I already know what I will use requests.Session for: my main script, which controls, among other things, the fetch_liquidity_pools.py script. Thank you for possibly solving a serious problem for me, as the main script was giving me some serious gray hair!

I'll be checking this out more thoroughly for sure! :)

!PIZZA !BEER !WINE

avatar

:)))))

I'd be extremely happy if it helps you even a tiny bit, as it would be my first contribution ever to someone else's script! Just a very, very, very, veeeeeery small one, but that would still be something :)

That is my main script

Is this the script you shared in your latest post? Or an even bigger and more complex one?

Btw, here's your original code with the "Session" included, just to prove that I really experimented with it 🤣 (I don't know how to highlight my edits, but I only opened a session, added it as a parameter to the API calls and "prepared" the calls with the slightly different syntax of a session... but I have no idea if what I just said makes sense to you or to anyone at all!)

# fetch_liquidity_pools.py
import json
import os
import argparse
import requests
from time import sleep, time
from random import choice

# Hive-Engine API Nodes
NODES_FILE = 'nodes.json'
retry_delay = 5  # seconds to wait between retries
max_retries = 3  # Maximum number of retries

# Default values
DEFAULT_ACCOUNT_NAME = 'hive-engine'  # Replace with your actual Hive account name
DEFAULT_FILTER_TOKEN = 'BTC'  # Replace with the desired default token to filter, or use 'ALL' to list all tokens

# File to store token details with precision
TOKEN_CACHE_FILE = 'token_details_cache.json'
cached_token_details = {}
hive_engine_nodes = []

def load_nodes():
    global hive_engine_nodes
    # Check if the nodes file exists
    if os.path.exists(NODES_FILE):
        try:
            with open(NODES_FILE, 'r') as f:
                hive_engine_nodes = json.load(f)
                print("Loaded Hive-Engine nodes from file.")
        except (ValueError, IOError):
            print("Error: Hive-Engine nodes file is corrupted or not readable. Please re-create 'nodes.json' with the list of nodes.")
    else:
        print("Error: Hive-Engine nodes file not found. Please create 'nodes.json' with the list of nodes.")
        hive_engine_nodes = [] # Ensure nodes list is empty on error


def get_node():
    # Choose a random node from the list
    if hive_engine_nodes:
        selected_node = choice(hive_engine_nodes)
        print(f"Using Hive-Engine node: {selected_node}") # Print the current node
        return selected_node
    else:
        print("Error: No Hive-Engine nodes available.")
        return None


def load_token_cache():
    global cached_token_details
    # Check if the token cache file exists
    if os.path.exists(TOKEN_CACHE_FILE):
        try:
            with open(TOKEN_CACHE_FILE, 'r') as f:
                cached_token_details = json.load(f)
                print("Loaded cached token details from file.")
        except (ValueError, IOError):
            print("Error: Failed to load token cache file. Starting with an empty cache.")


def save_token_cache():
    # Save the current token details cache to a file
    try:
        with open(TOKEN_CACHE_FILE, 'w') as f:
            json.dump(cached_token_details, f)
            print("Saved token details to cache file.")
    except IOError:
        print("Error: Failed to save token cache file.")


def fetch_token_details(symbol, session: requests.Session):
    # Check if token details are already cached
    if symbol in cached_token_details:
        print(f"Token details for {symbol} found in cache.")
        return cached_token_details[symbol]

    print (f"Fetching token details for {symbol}...")
    # Fetch token details for the given symbol
    for attempt in range(max_retries):
        url = get_node()
        if not url:
            return {}

        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "find",
            "params": {
                "contract": "tokens",
                "table": "tokens",
                "query": {"symbol": symbol},
                "limit": 1
            }
        }

        request = requests.Request("POST", url=url, json=payload).prepare()
        response = session.send(request, allow_redirects=False)

        print(f"Attempt {attempt+1}: Status Code: {response.status_code}, Response: {response.text}")

        if response.status_code == 200:
            try:
                data = response.json()
            except ValueError:
                print("Error: Failed to parse JSON response.")
                return {}

            if 'result' in data and data['result']:
                cached_token_details[symbol] = data['result'][0]  # Cache the token details
                save_token_cache()  # Save cache after updating
                return data['result'][0]

        print(f"Error: Failed to fetch token details for {symbol}. Status Code: {response.status_code}")
        if attempt < max_retries - 1:
            sleep(retry_delay)
        else:
            print(f"Max retries exceeded for {symbol}. Skipping.")

    return {}


def fetch_pool_details(token_pair, session: requests.Session):
    # Fetch details of the specified liquidity pool
    for attempt in range(max_retries):
        url = get_node()
        if not url:
            print("Error: No node URL available, exiting fetch_pool_details.")
            return {}

        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "find",
            "params": {
                "contract": "marketpools",
                "table": "pools",
                "query": {"tokenPair": token_pair},
                "limit": 1
            }
        }

        # print(f"Attempt {attempt + 1} to fetch pool details for {token_pair} from {url}...") # Debugging statement

        try:
            request = requests.Request("POST", url=url, json=payload).prepare()
            response = session.send(request, timeout=10, allow_redirects=False)
            print(f"Received response status code: {response.status_code} for {token_pair} from {url}")

            if response.status_code == 200:
                try:
                    data = response.json()
                    # print(f"Data received for {token_pair}: {data}") # Debugging the received data
                    if 'result' in data and data['result']:
                        print(f"Successfully fetched pool details for {token_pair}")
                        return data['result'][0]
                    else:
                        print(f"Unexpected response format or empty result for {token_pair} from {url}: {data}")
                except ValueError as e:
                    print("Error: Failed to parse JSON response.")
                    print(f"Response content: {response.text}") # Print the actual response content
            else:
                print(f"Error: Failed to fetch pool details for {token_pair}. Status Code: {response.status_code}")
        except requests.exceptions.RequestException as e:
            print(f"Request exception occurred for {token_pair} from {url}: {e}")

        # Handle retries
        if attempt < max_retries - 1:
            print(f"Retrying after {retry_delay} seconds...")
            sleep(retry_delay)
        else:
            print(f"Max retries exceeded for {token_pair}. Skipping to next.")

    print(f"Returning empty details for {token_pair} after all attempts.")
    return {}


def fetch_liquidity_positions(account_name, session: requests.Session):
    # Fetch liquidity positions for the given account
    url = get_node()
    if not url:
        return {}

    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "find",
        "params": {
            "contract": "marketpools",
            "table": "liquidityPositions",
            "query": {"account": account_name},
            "limit": 1000
        }
    }

    request = requests.Request("POST", url=url, json=payload).prepare()
    response = session.send(request, allow_redirects=False)

    # print("Response Status Code: ", response.status_code)
    # print("Response Content: ", response.text) # Debug

    if response.status_code != 200:
        print(f"Error: Failed to fetch data. Status Code: {response.status_code}")
        return []

    try:
        data = response.json()
    except ValueError:
        print("Error: Failed to parse JSON response.")
        return []

    return data.get('result', [])


def get_filtered_pools(account_name, filter_token):
    with requests.Session() as session:
        # Get and filter pools by the specified token
        positions = fetch_liquidity_positions(account_name, session)

        # Debug: Check fetched positions
        print(f"Fetched {len(positions)} liquidity positions for account {account_name}.")

        if not positions:
            print("No liquidity positions found for this account.")
            return []

        filtered_pools = []

        for position in positions:
            token_pair = position.get('tokenPair', 'Unknown')

            # Debug: Print each position being processed
            print(f"Processing position: {position}")

            # If filter_token is 'ALL', skip filtering; otherwise, check for the token in the pair
            if filter_token.upper() != 'ALL' and filter_token.upper() not in token_pair.upper():
                print(f"Skipping position {token_pair} as it does not match filter token {filter_token.upper()}")
                continue

            # Additional debug to see which positions pass the filter
            print(f"Including position {token_pair} with filter token {filter_token.upper()}")

            # Fetch balances and calculate user share
            shares = float(position.get('shares', '0'))
            pool_details = fetch_pool_details(token_pair, session)
            if not pool_details:
                continue

            total_shares = float(pool_details.get('totalShares', '0'))
            base_quantity = float(pool_details.get('baseQuantity', '0'))
            quote_quantity = float(pool_details.get('quoteQuantity', '0'))

            if total_shares == 0:
                print(f"Skipping position {token_pair} due to total shares being 0.")
                continue

            # Calculate user balances
            user_base_balance = (shares / total_shares) * base_quantity
            user_quote_balance = (shares / total_shares) * quote_quantity

            if ':' in token_pair:
                base_symbol, quote_symbol = token_pair.split(':')
            else:
                base_symbol, quote_symbol = "Unknown", "Unknown"

            # Fetch token details to get precision
            base_token_details = fetch_token_details(base_symbol, session)
            quote_token_details = fetch_token_details(quote_symbol, session)
            base_precision = base_token_details.get('precision', 0)
            quote_precision = quote_token_details.get('precision', 0)

            filtered_pools.append({
                "token_pair": token_pair,
                "base_symbol": base_symbol,
                "quote_symbol": quote_symbol,
                "base_balance": user_base_balance,
                "quote_balance": user_quote_balance,
                "base_precision": base_precision,
                "quote_precision": quote_precision
            })

        # Debug: Print the number of filtered pools
        # print(f"Number of filtered pools: {len(filtered_pools)}")

        return filtered_pools


def main(account_name, filter_token):
    start = time()
    # Load nodes from the external file
    load_nodes()

    # Load cached token details
    load_token_cache()

    # Fetch and print filtered pools
    pools = get_filtered_pools(account_name, filter_token)
    if pools:
        print(f"\nLiquidity Pool Positions with {filter_token.upper()} token:")
    for pool in pools:
        print(f"Token Pair: {pool['token_pair']} | Base Balance: {pool['base_balance']:.{pool['base_precision']}f} {pool['base_symbol']} | "
              f"Quote Balance: {pool['quote_balance']:.{pool['quote_precision']}f} {pool['quote_symbol']}")

    # Debug: If no pools were printed
    # if not pools:
    #    print("No matching liquidity pools found for the given filter.")
    end = time()
    elapsed_time = end - start
    print(f"Fetching pools required {elapsed_time} seconds")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Fetch Hive-Engine liquidity pools.')
    parser.add_argument('account_name', nargs='?', default=DEFAULT_ACCOUNT_NAME, help='Hive account name to fetch liquidity pools for.')
    parser.add_argument('filter_token', nargs='?', default=DEFAULT_FILTER_TOKEN, help="Token to filter by, or 'ALL' to list all tokens.")

    args = parser.parse_args()

    main(args.account_name, args.filter_token)

avatar
(Edited)

Hmm... interesting. It appears I didn't know how to use the session that I opened in 'main', and only used the requests.post() function instead of session.post(); that's what made my script slower. So my script still used the old way of fetching data, even though it had opened a session for faster communication. I timed my old script with your enhancements:

➜ time python arc7ic_fetch.py spswhale ALL
...
Fetching pools required 10.768758773803711 seconds

real    0m10.913s
user    0m0.196s
sys 0m0.017s

My current script, that I spruced up with your enhancements:

➜ time python fetch_liquidity_pools-add_session.py spswhale ALL
...
real    0m0.336s
user    0m0.150s
sys 0m0.018s

Oh, I notice you added a timer inside the main loop, nice touch. I am lazy and didn't bother, since I could 'time' it in the terminal. But yep, there were some repeated requests in the original script that made it way slower. Here's the current one:

# fetch_liquidity_pools.py
import json
import os
import argparse
import requests

# from time import sleep
import time
from random import choice  # To randomly choose a node from the list

# Hive-Engine API Node
# HIVE_ENGINE_NODE = 'https://api2.hive-engine.com/rpc/contracts'
NODES_FILE = "nodes.json"
retry_delay = 5  # seconds to wait between retries
max_retries = 3  # Maximum number of retries

# Default values
DEFAULT_ACCOUNT_NAME = "hive-engine"  # Replace with your actual Hive account name
DEFAULT_FILTER_TOKEN = (
    "BTC"  # Replace with the desired token to filter, or use 'ALL' to list all tokens
)

# File to store token details with precision
TOKEN_CACHE_FILE = "token_details_cache.json"
cached_token_details = {}
hive_engine_nodes = []


def load_nodes():
    global hive_engine_nodes
    # Check if the nodes file exists
    if os.path.exists(NODES_FILE):
        try:
            with open(NODES_FILE, "r") as f:
                hive_engine_nodes = json.load(f)
                print("Loaded Hive-Engine nodes from file.")
        except (ValueError, IOError):
            print(
                "Error: Hive-Engine nodes file is corrupted or not readable. Please re-create 'nodes.json' with the list of nodes."
            )
    else:
        print(
            "Error: Hive-Engine nodes file not found. Please create 'nodes.json' with the list of nodes."
        )
        hive_engine_nodes = []  # Ensure nodes list is empty on error


def get_node():
    # Choose a random node from the list
    if hive_engine_nodes:
        selected_node = choice(hive_engine_nodes)
        print(f"Using Hive-Engine node: {selected_node}")  # Print the current node
        return selected_node
    else:
        print("Error: No Hive-Engine nodes available.")
        return None


def load_token_cache():
    global cached_token_details
    # Check if the token cache file exists
    if os.path.exists(TOKEN_CACHE_FILE):
        try:
            with open(TOKEN_CACHE_FILE, "r") as f:
                cached_token_details = json.load(f)
                print("Loaded cached token details from file.")
        except (ValueError, IOError):
            print(
                "Error: Failed to load token cache file. Starting with an empty cache."
            )


def save_token_cache():
    # Save the current token details cache to a file
    try:
        with open(TOKEN_CACHE_FILE, "w") as f:
            json.dump(cached_token_details, f)
            print("Saved token details to cache file.")
    except IOError:
        print("Error: Failed to save token cache file.")


def fetch_token_details(symbol, session):
    # Check if token details are already cached
    if symbol in cached_token_details:
        # print(f"Token details for {symbol} found in cache.")
        return cached_token_details[symbol]

    print(f"Fetching token details for {symbol}...")
    # Fetch token details for the given symbol
    for attempt in range(max_retries):
        url = get_node()
        if not url:
            return {}

        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "find",
            "params": {
                "contract": "tokens",
                "table": "tokens",
                "query": {"symbol": symbol},
                "limit": 1,
            },
        }

        response = session.post(url, json=payload)

        # print(f"Attempt {attempt+1}: Status Code: {response.status_code}, Response: {response.text}")

        if response.status_code == 200:
            try:
                data = response.json()
            except ValueError:
                print("Error: Failed to parse JSON response.")
                return {}

            if "result" in data and data["result"]:
                cached_token_details[symbol] = data["result"][
                    0
                ]  # Cache the token details
                save_token_cache()  # Save cache after updating
                return data["result"][0]

        print(
            f"Error: Failed to fetch token details for {symbol}. Status Code: {response.status_code}"
        )
        if attempt < max_retries - 1:
            time.sleep(retry_delay)
        else:
            print(f"Max retries exceeded for {symbol}. Skipping.")

    return {}


def fetch_pool_details(token_pair, session):
    # Fetch details of the specified liquidity pool
    for attempt in range(max_retries):
        url = get_node()
        if not url:
            print("Error: No node URL available, exiting fetch_pool_details.")
            return {}

        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "find",
            "params": {
                "contract": "marketpools",
                "table": "pools",
                "query": {"tokenPair": token_pair},
                "limit": 1,
            },
        }

        print(
            f"Attempt {attempt + 1} to fetch pool details for {token_pair} from {url}..."
        )  # Debugging statement

        try:
            response = session.post(
                url, json=payload, timeout=10
            )  # Set a timeout for the request
            # print(
            #     f"Received response status code: {response.status_code} for {token_pair} from {url}"
            # )

            if response.status_code == 200:
                try:
                    data = response.json()
                    print(
                        f"Data received for {token_pair}: {data}"
                    )  # Debugging the received data
                    if "result" in data and data["result"]:
                        print(f"Successfully fetched pool details for {token_pair}")
                        return data["result"][0]
                    else:
                        print(
                            f"Unexpected response format or empty result for {token_pair} from {url}: {data}"
                        )
                except ValueError as e:
                    print(f"Error: Failed to parse JSON response: {e}.")
                    # print(f"Response content: {response.text}") # Print the actual response content
            else:
                print(
                    f"Error: Failed to fetch pool details for {token_pair}. Status Code: {response.status_code}"
                )
        except requests.exceptions.RequestException as e:
            print(f"Request exception occurred for {token_pair} from {url}: {e}")

        # Handle retries
        if attempt < max_retries - 1:
            print(f"Retrying after {retry_delay} seconds...")
            time.sleep(retry_delay)
        else:
            print(f"Max retries exceeded for {token_pair}. Skipping to next.")

    print(f"Returning empty details for {token_pair} after all attempts.")
    return {}


def fetch_all_pools(session):
    url = get_node()
    if not url:
        return []

    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "find",
        "params": {
            "contract": "marketpools",
            "table": "pools",
            "query": {},
            "limit": 1000,  # Adjust limit based on how many pools exist
        },
    }

    try:
        response = session.post(url, json=payload, timeout=10)
        if response.status_code == 200:
            data = response.json()
            return data.get("result", [])
        else:
            print(
                f"Error: Failed to fetch all pools. Status Code: {response.status_code}"
            )
            return []
    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")
        return []


def fetch_liquidity_positions(account_name, session, retries=5, backoff_factor=5):
    # Fetch liquidity positions for the given account
    url = get_node()
    if not url:
        return {}

    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "find",
        "params": {
            "contract": "marketpools",
            "table": "liquidityPositions",
            "query": {"account": account_name},
            "limit": 1000,
        },
    }

    for attempt in range(retries):
        try:
            response = session.post(url, json=payload, timeout=10)
            # print("Response Status Code: ", response.status_code)

            # Print the entire raw response text for debugging purposes
            # print("Raw response text: ", response.text)

            if response.status_code == 200:
                try:
                    data = response.json()
                    return data.get("result", [])
                except ValueError:
                    print("Error: Failed to parse JSON response.")
                    return []
            else:
                print(
                    f"Error: Failed to fetch data. Status Code: {response.status_code}"
                )
                return []

        except requests.exceptions.ConnectionError as e:
            print(
                f"Attempt {attempt + 1}: Connection error: {e}, retrying in {backoff_factor} seconds..."
            )
            time.sleep(backoff_factor)
        except requests.exceptions.Timeout as e:
            print(
                f"Attempt {attempt + 1}: Request timed out: {e}, retrying in {backoff_factor} seconds..."
            )
            time.sleep(backoff_factor)
        except requests.exceptions.RequestException as e:
            print(f"Attempt {attempt + 1}: An error occurred: {e}")
            return []

    print(f"Max retries exceeded. Could not fetch data for account: {account_name}")
    return []


def get_filtered_pools(account_name, filter_token, session):
    # Fetch all pools in one go
    all_pools = fetch_all_pools(session)

    # Check if pools were fetched successfully
    if not all_pools:
        print("Error: Failed to fetch all pools.")
        return []

    pool_dict = {pool["tokenPair"]: pool for pool in all_pools}

    # Get and filter pools by the specified token
    positions = fetch_liquidity_positions(account_name, session)

    # Debug: Check fetched positions
    print(f"Fetched {len(positions)} liquidity positions for account {account_name}.")
    # print("Test print of all the fetched positions:")
    # print(json.dumps(positions, indent=4))  # Pretty-print the positions

    if not positions:
        print("No liquidity positions found for this account.")
        return []

    filtered_pools = []

    for position in positions:
        token_pair = position.get("tokenPair", "Unknown")

        # Debug: Print each position being processed
        # print(f"Processing position: {position}")

        # If filter_token is 'ALL', skip filtering; otherwise, check for the token in the pair
        if (
            filter_token.upper() != "ALL"
            and filter_token.upper() not in token_pair.upper()
        ):
            # print(f"Skipping position {token_pair} as it does not match filter token {filter_token.upper()}")
            continue

        # Additional debug to see which positions pass the filter
        # print(
        #    f"Including position {token_pair} with filter token {filter_token.upper()}"
        # )

        # Fetch the pool details from the all_pools dictionary
        pool_details = pool_dict.get(token_pair)
        if not pool_details:
            print(f"Warning: No pool details found for {token_pair}")
            continue

        # Pull the share and quantity figures for this position
        shares = float(position.get("shares", "0"))
        base_quantity = float(pool_details.get("baseQuantity", "0"))
        quote_quantity = float(pool_details.get("quoteQuantity", "0"))
        total_shares = float(pool_details.get("totalShares", "0"))

        if total_shares == 0:
            print(f"Skipping position {token_pair} due to total shares being 0.")
            continue

        # Calculate user balances
        user_base_balance = (shares / total_shares) * base_quantity
        user_quote_balance = (shares / total_shares) * quote_quantity

        if ":" in token_pair:
            base_symbol, quote_symbol = token_pair.split(":")
        else:
            base_symbol, quote_symbol = "Unknown", "Unknown"

        # Fetch token details to get precision
        base_token_details = fetch_token_details(base_symbol, session)
        quote_token_details = fetch_token_details(quote_symbol, session)
        base_precision = base_token_details.get("precision", 0)
        quote_precision = quote_token_details.get("precision", 0)

        filtered_pools.append(
            {
                "token_pair": token_pair,
                "base_symbol": base_symbol,
                "quote_symbol": quote_symbol,
                "base_balance": user_base_balance,
                "quote_balance": user_quote_balance,
                "base_precision": base_precision,
                "quote_precision": quote_precision,
            }
        )

    # Debug: Print the number of filtered pools
    print(f"Number of filtered pools: {len(filtered_pools)}")

    return filtered_pools


def main(account_name, filter_token):
    # Load nodes from the external file
    load_nodes()

    # Load cached token details
    load_token_cache()

    # Create a session object
    with requests.Session() as session:
        # Fetch and print filtered pools
        pools = get_filtered_pools(account_name, filter_token, session)
        print(f"\nLiquidity Pool Positions with {filter_token.upper()} token:")
        for pool in pools:
            print(
                f"Token Pair: {pool['token_pair']} | Base Balance: {pool['base_balance']:.{pool['base_precision']}f} {pool['base_symbol']} | "
                f"Quote Balance: {pool['quote_balance']:.{pool['quote_precision']}f} {pool['quote_symbol']}"
            )

        # Debug: If no pools were printed
        if not pools:
            print("No matching liquidity pools found for the given filter.")


if __name__ == "__main__":
    # When run as a standalone script: main() opens its own session,
    # so no module-level session is needed here
    parser = argparse.ArgumentParser(
        description="Fetch Hive-Engine liquidity pools."
    )
    parser.add_argument(
        "account_name",
        nargs="?",
        default=DEFAULT_ACCOUNT_NAME,
        help="Hive account name to fetch liquidity pools for.",
    )
    parser.add_argument(
        "filter_token",
        nargs="?",
        default=DEFAULT_FILTER_TOKEN,
        help="Token to filter by, or 'ALL' to list all tokens.",
    )

    args = parser.parse_args()

    main(args.account_name, args.filter_token)

Edit: I noticed that I still hadn't added the 'session' in the arguments of the functions. So I did, and updated the script here to reflect that. The problem is that now it's slower again. The 'broken' script was a tenth of a second faster, but this one is correctly establishing the session. According to ChatGPT, I might as well not use 'session' anymore, since I only make one request. Session only makes things faster if you do multiple calls to the API. But anyway, it was a fun experiment.

Oh, and I forgot to comment about the main script I'm working on. It's actually a balancer for liquidity pairs. I might write about it as soon as I iron out the wrinkles.

avatar

I am lazy and didn't bother, since I could 'time' it in the terminal.

Well, I had no idea you could do that 😅 what you can do through the terminal is still a bit obscure to me, because I'm not used to it, so I have no idea of its flexibility and power!

My current script, that I spruced up with your enhancements:

Wow, this one is crazy fast! Now I'm going to read your new post :)

Session only makes things faster if you do multiple calls to the API

It's the only thing I understand about this "Session" stuff 😂 because I still have to figure out what parameters/values it keeps, and whether it helps whenever I make multiple calls to an API or only when the calls are somehow linked... I have no idea tbh ahahah

avatar

Heh, it wasn't immediately clear to me either, although I had an inkling. Here's how I eventually understood the concept myself: the idea with sessions is that when you use one, you're keeping the connection to the server open, so you don't need to create a new connection for each API call. It's like having an uninterrupted "conversation" with the server, rather than hanging up and redialing for every single message. This can save time and resources, especially when you're making multiple API calls within the same session.
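In code, the difference is roughly this (placeholder URL and payload, just to show the shape):

import requests

URL = "https://example.com/api"  # placeholder endpoint, not a real node
payload = {"jsonrpc": "2.0", "id": 1, "method": "find", "params": {}}

# requests.post(URL, json=payload) would open a brand-new connection every call;
# a Session keeps the underlying connection alive and reuses it across calls:
with requests.Session() as session:
    for _ in range(3):
        response = session.post(URL, json=payload, timeout=10)
        print(response.status_code)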

!PIZZA

avatar

Hey felixxx,

I think you might have misunderstood the meme. It’s showing that both people on the low and high ends of the spectrum often stick with simple techniques, like adding print statements, because they actually work. The middle of the curve, though, is where people tend to overcomplicate things by going through function descriptions or stepping through debuggers — which sometimes doesn't reveal the root of the problem. Print statements, though, can give you a direct, clear view of what’s actually going on in the code.

But beyond just print statements, error handling ensures that your script doesn't crash when something goes wrong — like an API failing or invalid data sneaking in. Without it, you're left in the dark, which is why it’s far from being 'lame.' It's what gives you control and the ability to recover when things don't go as expected.

As for infinite loops — they aren’t bad by nature. When you're working with something like continuously reading blockchain data, they can be the best option. The key is to manage them properly with safeguards like rate limiting and exit conditions, so they don’t end up causing performance issues. It’s all about using the right tool for the job.
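Something like this sketch is what I mean by managing the loop (the helpers passed in are hypothetical, not your actual bot's functions):

import time

def poll_blocks(get_head_block, process_block, poll_interval=3.0, max_errors=5):
    # a rate-limited loop with an explicit exit condition instead of a bare while True
    errors = 0
    next_block = get_head_block()
    while errors < max_errors:
        if next_block > get_head_block():
            time.sleep(poll_interval)  # rate limiting: wait for new blocks to appear
            continue
        try:
            process_block(next_block)
            next_block += 1
            errors = 0  # reset the counter after a successful block
        except Exception as e:
            errors += 1
            print(f"Error on block {next_block}: {e}")
            time.sleep(poll_interval)
    print("Too many consecutive errors, stopping cleanly.")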

Looking forward to seeing more of your insights in your upcoming blog!
