Update rofi spotify script to use a better cache architecture
This commit is contained in:
@@ -1,2 +1,2 @@
|
||||
joblib==1.5.1
|
||||
diskcache==5.6.3
|
||||
spotipy==2.25.1
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
#!/home/womax/.config/rofi/scripts/.env/bin/python
|
||||
import joblib
|
||||
from joblib import expires_after
|
||||
from diskcache import Cache
|
||||
import os
|
||||
import os.path as path
|
||||
import shutil
|
||||
import spotipy
|
||||
from spotipy.oauth2 import SpotifyPKCE
|
||||
import sys
|
||||
@@ -20,13 +18,9 @@ cache_dir = path.join(
|
||||
path.join(os.getenv("HOME"), ".cache")),
|
||||
"rofi-spotify")
|
||||
|
||||
if path.exists(cache_dir) and not path.isdir(cache_dir):
|
||||
shutil.rmtree(cache_dir)
|
||||
|
||||
if not path.exists(cache_dir):
|
||||
os.makedirs(cache_dir, mode=0o755)
|
||||
|
||||
memory = joblib.Memory(cache_dir, verbose=0)
|
||||
track_cache = Cache(path.join(cache_dir, "track_cache"),
|
||||
eviction_policy="least-recently-stored",
|
||||
size_limit=int(10e6))
|
||||
|
||||
scopes = ["user-top-read", "user-modify-playback-state"]
|
||||
|
||||
@@ -39,67 +33,111 @@ sp = spotipy.Spotify(
|
||||
cache_path=path.join(cache_dir, "credentials"))
|
||||
))
|
||||
|
||||
@memory.cache(cache_validation_callback=expires_after(days=30))
|
||||
def get_top_tracks(limit=200):
|
||||
offset = 0
|
||||
tracks = []
|
||||
while offset < limit:
|
||||
reading = min(50, limit-offset)
|
||||
tracks += sp.current_user_top_tracks(
|
||||
new_tracks = sp.current_user_top_tracks(
|
||||
limit=reading,
|
||||
offset=offset)["items"]
|
||||
if len(new_tracks) == 0:
|
||||
return tracks
|
||||
tracks += new_tracks
|
||||
offset += reading
|
||||
return tracks
|
||||
|
||||
def simplify_track_info(track):
|
||||
result = {}
|
||||
result["id"] = track["id"]
|
||||
result["fullname"] = f'{track["name"]} - {track["artists"][0]["name"]}'
|
||||
return result
|
||||
|
||||
def get_cached_tracks():
|
||||
if len(track_cache) == 0:
|
||||
tt = get_top_tracks(500)
|
||||
for track in tt:
|
||||
track_cache[track["id"]] = simplify_track_info(track)
|
||||
return [track_cache[k] for k in track_cache]
|
||||
|
||||
def search_track(query: str):
|
||||
search_rs = sp.search(query, limit=10, type='track')["tracks"]["items"]
|
||||
return [simplify_track_info(t) for t in search_rs]
|
||||
|
||||
def play_track(track_id: str, track_fullname: str, now=True):
|
||||
if track_id == "":
|
||||
return
|
||||
try:
|
||||
to_play = {
|
||||
"id": track_id,
|
||||
"fullname": track_fullname
|
||||
}
|
||||
sp.add_to_queue(f"spotify:track:{to_play['id']}")
|
||||
track_cache[to_play['id']] = to_play
|
||||
except spotipy.SpotifyException as err:
|
||||
print(f"{err.msg}", file=sys.stderr)
|
||||
return
|
||||
if now:
|
||||
time.sleep(0.1)
|
||||
sp.next_track()
|
||||
|
||||
def format_track(track):
|
||||
track_format = "{title} - {artist}\0info\x1f{track_id}\n"
|
||||
track_format = "{fullname}\0info\x1f{track_id}\n"
|
||||
return track_format.format(
|
||||
title=track["name"],
|
||||
artist=track["artists"][0]["name"],
|
||||
fullname=track["fullname"],
|
||||
track_id=track["id"]
|
||||
)
|
||||
|
||||
def display_default():
|
||||
liked_tracks = get_top_tracks()
|
||||
for item in liked_tracks:
|
||||
def display_track_list(track_list=None):
|
||||
if track_list is None:
|
||||
track_list = get_cached_tracks()
|
||||
for item in track_list:
|
||||
print(format_track(item), end="")
|
||||
|
||||
def play_track(track_id: str):
|
||||
try:
|
||||
sp.add_to_queue(f"spotify:track:{track_id}")
|
||||
except spotipy.SpotifyException as err:
|
||||
print(f"\0message\x1f{err.msg}")
|
||||
return
|
||||
time.sleep(0.1)
|
||||
sp.next_track()
|
||||
controls = {
|
||||
"Next": sp.next_track,
|
||||
"Play": sp.start_playback,
|
||||
"Pause": sp.pause_playback,
|
||||
"Previous": sp.previous_track,
|
||||
}
|
||||
|
||||
@memory.cache(cache_validation_callback=expires_after(seconds=180))
|
||||
def search_track(query: str):
|
||||
try:
|
||||
query_res = sp.search(query, limit=30, type='track')["tracks"]["items"]
|
||||
if len(query_res) == 0:
|
||||
print("\0message\x1fNo result in search")
|
||||
return
|
||||
for result in query_res:
|
||||
print(format_track(result), end="")
|
||||
except spotipy.SpotifyException as err:
|
||||
print(f"\0message\x1f{err.msg}")
|
||||
return
|
||||
def display_controls():
|
||||
for k, _ in controls.items():
|
||||
print(k)
|
||||
|
||||
def main():
|
||||
match int(os.getenv("ROFI_RETV", default=-1)):
|
||||
rofi_retv = int(os.getenv("ROFI_RETV", default=-1));
|
||||
print(rofi_retv, file=sys.stderr)
|
||||
match rofi_retv:
|
||||
case 0:
|
||||
# Initial call
|
||||
display_default()
|
||||
# enable custom hot key
|
||||
print("\0use-hot-keys\x1ftrue")
|
||||
display_controls()
|
||||
display_track_list()
|
||||
case 1:
|
||||
# Selected entry
|
||||
play_track(os.getenv("ROFI_INFO", ""))
|
||||
exit(0)
|
||||
# Default behavior to play the track immediatly
|
||||
if sys.argv[1] in controls.keys():
|
||||
controls[sys.argv[1]]()
|
||||
return
|
||||
|
||||
play_track(os.getenv("ROFI_INFO", ""), sys.argv[1])
|
||||
|
||||
case 2:
|
||||
# Custom entry
|
||||
track = sys.argv[1]
|
||||
search_track(track)
|
||||
memory.reduce_size(bytes_limit="10M")
|
||||
display_track_list(search_track(track))
|
||||
|
||||
case 10:
|
||||
# Custom hot key 1
|
||||
# Add entry to the queue
|
||||
if sys.argv[1] in controls.keys():
|
||||
return
|
||||
|
||||
play_track(os.getenv("ROFI_INFO", ""),
|
||||
sys.argv[1],
|
||||
now=False)
|
||||
case _:
|
||||
exit(1)
|
||||
|
||||
|
||||
@@ -1,11 +0,0 @@
|
||||
* {
|
||||
main-bg: #11111be6;
|
||||
main-fg: #cdd6f4ff;
|
||||
main-br: #cba6f7ff;
|
||||
main-ex: #f5e0dcff;
|
||||
select-bg: #b4befeff;
|
||||
select-fg: #11111bff;
|
||||
separatorcolor: transparent;
|
||||
border-color: transparent;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user