Files
ZiCode0 a84e0ea643 ♻️ refactor code ♻️
2025-08-25 13:28:06 +12:00

360 lines
14 KiB
Python

import asyncio
import base64
import json
import os
import random
import time
from datetime import datetime
from typing import Any, Coroutine
import requests
from yandex_music import ClientAsync, Track
from yandex_music.exceptions import NetworkError
# from yandex_music.exceptions import NetworkError
from yandex_music.playlist.playlist import Playlist
# --- ntfy read-side placeholders --------------------------------------------
# All four default to None; nothing in the visible part of this file assigns
# or reads them (NOTE(review): confirm whether other modules rely on them).
NTFY_SERVER = None
NTFY_READ_TOPIC = None
NTFY_READ_USER = None
NTFY_READ_PASS = None

# --- remote-command message protocol ----------------------------------------
# A command message is built as "<command>:<prefix><request id>:<url>".
SEARCH_TAG_PREFIX = 'req'
MESSAGE_CMD__PLAY_TRACK_RAW = '#play_track'
MESSAGE_CMD__PLAY_FAVORITES_RAW = '#play_favorites'

# Suffix appended to a URL when autoplay is requested; currently disabled.
# Alternative value kept for reference: '?play=true'
POSTFIX_URL_PLAY = ''

# --- Yandex Music URLs and names --------------------------------------------
# Title of the playlist used to hand a single track over to the player.
TARGET_CONTROL_PLAYLIST_NAME = '#play'
TARGET_CONTROL_PLAYLIST_URL_FORMAT = (
    'https://music.yandex.com/users/{user_id}/playlists/{playlist_id}'
)
TRACK_URL_FORMAT = 'https://music.yandex.com/album/{album_id}/track/{track_id}'
FAVORITE_PLAYLIST_URL_FORMAT = 'https://music.yandex.com/users/{user_id}/playlists/3'

# Search result type -> human-readable (Russian) label used in printed reports.
type_to_name = {
    'track': 'трек',
    'artist': 'исполнитель',
    'album': 'альбом',
    'playlist': 'плейлист',
    'video': 'видео',
    'user': 'пользователь',
    'podcast': 'подкаст',
    'podcast_episode': 'эпизод подкаста',
}
class YaMusicCustom:
    """Async helper built on top of ``yandex_music.ClientAsync``.

    It searches for a track, keeps a single "controller" playlist (looked up or
    created under ``TARGET_CONTROL_PLAYLIST_NAME``) holding exactly that track,
    and builds web URLs for the playlist and the track.
    """

    # Pause between retries while inserting a track or polling the playlist.
    RETRY_DELAY_SECONDS = 0.1
    # Maximum insert attempts when unexpected (non-NetworkError) errors occur.
    MAX_INSERT_ATTEMPTS = 50
    # Maximum polls while waiting for the inserted track to show up.
    MAX_VERIFY_ATTEMPTS = 300

    def __init__(self, **kwargs):
        """Create the API client.

        Args:
            **kwargs: must contain ``token``, passed to ``ClientAsync``.

        Raises:
            KeyError: if ``token`` is not supplied.
        """
        self.client = ClientAsync(token=kwargs['token'])

    async def init(self):
        """Await the underlying client's own ``init()``."""
        await self.client.init()

    def _fix_revision_bug(self, object_with_revision_param):
        """Increment ``revision`` on the given object in place and return it."""
        object_with_revision_param.revision += 1
        return object_with_revision_param

    @staticmethod
    def _split_track_id(track: "Track"):
        """Split ``track.track_id`` ("<track>:<album>") into its two parts.

        Raises:
            ValueError: if the value does not consist of exactly two
                non-empty parts separated by ``:``.
        """
        parts = str(track.track_id).split(':')
        if len(parts) != 2 or not all(parts):
            raise ValueError(
                f'Unexpected track_id {track.track_id!r}, expected "<track_id>:<album_id>"')
        return parts[0], parts[1]

    @staticmethod
    def _describe_best_result(type_, best_object):
        """Return a one-line description of the best search hit ('' for unknown types)."""
        if type_ in ('track', 'podcast_episode'):
            artists = ''
            if best_object.artists:
                artists = ' - ' + ', '.join(artist.name for artist in best_object.artists)
            return best_object.title + artists
        if type_ == 'artist':
            return best_object.name
        if type_ in ('album', 'podcast', 'playlist'):
            return best_object.title
        if type_ == 'video':
            return f'{best_object.title} {best_object.text}'
        return ''

    async def send_search_request_and_print_result(self, query):
        """Search for ``query`` (with the word "трек" appended) and print a summary.

        Returns:
            A ``(best_result_text, best_object)`` pair; ``('', None)`` when the
            search has no best result.
        """
        query = f'{query} трек'
        search_result = await self.client.search(query)
        text = [f'[*] Результаты по запросу "{query}":']
        best_result_text = ''
        best_object = None
        if search_result.best:
            type_ = search_result.best.type
            best_object = search_result.best.result
            best_result_text = self._describe_best_result(type_, best_object)
            text.append(f' [+] Лучший результат: <{type_to_name.get(type_)}> {best_result_text}')
        sections = (
            ('Исполнителей', search_result.artists),
            ('Альбомов', search_result.albums),
            ('Треков', search_result.tracks),
            ('Плейлистов', search_result.playlists),
            ('Видео', search_result.videos),
        )
        for label, section in sections:
            if section:
                text.append(f' [*] {label}: {section.total}')
        text.append('')
        print('\n'.join(text))
        return best_result_text, best_object

    async def get_create_controller_playlist(self) -> "Playlist":
        """Return the controller playlist, creating it when it does not exist.

        A playlist matches when either its own ``title`` or its
        ``custom_wave.title`` equals ``TARGET_CONTROL_PLAYLIST_NAME``. Both are
        read defensively so a playlist without ``custom_wave`` no longer raises.
        """
        for user_playlist in await self.client.users_playlists_list():
            custom_wave = getattr(user_playlist, 'custom_wave', None)
            candidate_titles = (
                getattr(user_playlist, 'title', None),
                getattr(custom_wave, 'title', None),
            )
            if TARGET_CONTROL_PLAYLIST_NAME in candidate_titles:
                return user_playlist
        print('[*] Playlist was not found, recreate..')
        # The new playlist is created for the uid of the current account.
        user_account = await self.client.account_status()
        return await self.client.users_playlists_create(
            user_id=user_account.account.uid, title=TARGET_CONTROL_PLAYLIST_NAME)

    async def clean_playlist(self, playlist: "Playlist"):
        """Delete every track currently in ``playlist`` (no-op when it is empty)."""
        playlist_tracks = await playlist.fetch_tracks_async()
        if playlist_tracks:
            await playlist.delete_tracks_async(0, len(playlist_tracks))

    async def add_track_to_playlist(self, playlist: "Playlist", track: "Track"):
        """Replace the playlist content with ``track`` and wait until it is visible.

        Steps: validate the track id, clear the playlist, insert the track, then
        poll the playlist until the track is listed.

        Raises:
            ValueError: if ``track.track_id`` has no album part (raised before
                the playlist is touched).
            TimeoutError: if the track is not listed after
                ``MAX_VERIFY_ATTEMPTS`` polls.
            Exception: the last insert error once ``MAX_INSERT_ATTEMPTS`` is
                exhausted, or any error raised by the revision-bumped retry.
        """
        # Validate first so a malformed id cannot wipe the playlist for nothing.
        track_id, album_id = self._split_track_id(track)
        await self.clean_playlist(playlist)

        async def insert(fix_revision_bug=False):
            target = self._fix_revision_bug(playlist) if fix_revision_bug else playlist
            await target.insert_track_async(track_id=int(track_id), album_id=int(album_id))

        for attempt in range(1, self.MAX_INSERT_ATTEMPTS + 1):
            dt_start = datetime.now()
            try:
                await insert()
                print(f' [*] Insert track time (common) : {datetime.now() - dt_start}')
                break
            except NetworkError:
                # A NetworkError is answered with one retry on a revision bumped by 1
                # (presumably the local revision is stale after the cleanup - TODO confirm).
                print(f' [*] Insert track time (with error) : {datetime.now() - dt_start}')
                dt_start = datetime.now()
                await insert(fix_revision_bug=True)
                print(f' [*] Insert track time (fix revision) : {datetime.now() - dt_start}')
                break
            except Exception as ex:
                if attempt == self.MAX_INSERT_ATTEMPTS:
                    raise
                print(ex)
                await asyncio.sleep(self.RETRY_DELAY_SECONDS)

        # Ids are compared as strings so an int/str mismatch between the searched
        # track and the playlist entries cannot keep the loop spinning.
        wanted_id = str(track.id)
        dt_start = datetime.now()
        for _ in range(self.MAX_VERIFY_ATTEMPTS):
            playlist_tracks = await playlist.fetch_tracks_async()
            if any(str(p_track.id) == wanted_id for p_track in playlist_tracks or ()):
                print(f' [*] Check track in playlist exist time : {datetime.now() - dt_start}')
                return
            await asyncio.sleep(self.RETRY_DELAY_SECONDS)
        raise TimeoutError(
            f'Track {wanted_id} did not appear in the playlist after '
            f'{self.MAX_VERIFY_ATTEMPTS} checks')

    def get_controller_playlist_url(self, playlist: "Playlist", turn_play=False):
        """Build the web URL of ``playlist``; append ``POSTFIX_URL_PLAY`` when ``turn_play``."""
        url = TARGET_CONTROL_PLAYLIST_URL_FORMAT.format(user_id=playlist.owner.login,
                                                        playlist_id=playlist.kind)
        return f'{url}{POSTFIX_URL_PLAY}' if turn_play else url

    def get_track_url(self, track: "Track", turn_play=False):
        """Build the web URL of ``track``; append ``POSTFIX_URL_PLAY`` when ``turn_play``.

        Raises:
            ValueError: if ``track.track_id`` has no album part.
        """
        track_id, album_id = self._split_track_id(track)
        url = TRACK_URL_FORMAT.format(track_id=track_id, album_id=album_id)
        return f'{url}{POSTFIX_URL_PLAY}' if turn_play else url
class NtfyServerInterface:
    """Small synchronous client for an ntfy server.

    It can publish a text message to the "send" topic, poll the "read" topic
    for a message carrying a given tag, and download a message attachment.
    """

    # Seconds to wait for a single HTTP request; without it `requests` can block forever.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, ntfy_server: str,
                 ntfy_send_topic: str,
                 ntfy_send_user: str,
                 ntfy_send_pass: str,
                 ntfy_read_topic: str,
                 ntfy_read_user: str,
                 ntfy_read_pass: str, ):
        """Store the server URL plus topic/credentials for sending and reading."""
        self.NTFY_SERVER = ntfy_server
        self.NTFY_SEND_TOPIC = ntfy_send_topic
        self.NTFY_SEND_USER = ntfy_send_user
        self.NTFY_SEND_PASS = ntfy_send_pass
        self.NTFY_READ_TOPIC = ntfy_read_topic
        self.NTFY_READ_USER = ntfy_read_user
        self.NTFY_READ_PASS = ntfy_read_pass
        # Polling cadence and attempt limit used by listen_and_get_media_message.
        self.REPEAT_EVERY_IN_SECONDS = 1
        self.MAX_TRIES_COUNT = 300

    def send_text_notification(self, text_to_send: str):
        """POST ``text_to_send`` (UTF-8) to the send topic using basic auth.

        Returns:
            The HTTP status code of the response.

        Raises:
            requests.HTTPError: if the server answers with an error status.
            requests.RequestException: on connection problems or timeout.
        """
        url = f"{self.NTFY_SERVER}/{self.NTFY_SEND_TOPIC}"
        session_auth = (self.NTFY_SEND_USER, self.NTFY_SEND_PASS)
        r_response = requests.post(url, data=text_to_send.encode('utf-8'), auth=session_auth,
                                   timeout=self.REQUEST_TIMEOUT_SECONDS)
        r_response.raise_for_status()
        return r_response.status_code

    @staticmethod
    def _find_tagged_event(raw_body: str, search_text: str):
        """Return the first event in a newline-delimited JSON body tagged ``search_text``.

        Blank lines are skipped. Events that are not JSON objects or that carry
        no ``tags`` key are ignored instead of raising. Returns ``None`` when
        nothing matches.
        """
        for line in raw_body.split('\n'):
            if not line.strip():
                continue
            event = json.loads(line)
            if isinstance(event, dict) and search_text in event.get('tags', ()):
                return event
        return None

    def listen_and_get_media_message(
            self, search_text: str) -> "tuple[dict | None, Exception | None]":
        """Poll the read topic until an event tagged ``search_text`` is found.

        The topic is polled up to ``MAX_TRIES_COUNT`` times with
        ``REPEAT_EVERY_IN_SECONDS`` between polls.

        Returns:
            ``(event, None)`` on success, otherwise ``(None, error)``. ``error``
            is the exception raised while polling/parsing, or a timeout
            ``Exception`` when every attempt found nothing. This method does
            not raise.
        """
        url = f"{self.NTFY_SERVER}/{self.NTFY_READ_TOPIC}/json?poll=1"
        session_auth = (self.NTFY_READ_USER, self.NTFY_READ_PASS)
        headers = {
            "Accept": "text/event-stream"
        }
        try:
            for _ in range(self.MAX_TRIES_COUNT):
                resp = requests.get(url, auth=session_auth, headers=headers,
                                    timeout=self.REQUEST_TIMEOUT_SECONDS)
                # Report auth/server errors as such instead of parsing the error body.
                resp.raise_for_status()
                event = self._find_tagged_event(resp.content.decode('utf-8'), search_text)
                if event is not None:
                    return event, None
                time.sleep(self.REPEAT_EVERY_IN_SECONDS)
        except Exception as ex:
            return None, ex
        return None, Exception(
            f'Timeout, the maximum number of attempts (max time {self.MAX_TRIES_COUNT * self.REPEAT_EVERY_IN_SECONDS} in sec) was called..')

    def download_photo_bin_and_base64_from_ntfy_message(self, message):
        """Download the attachment referenced by ``message['attachment']['url']``.

        Returns:
            ``(raw_bytes, base64_text)``: the downloaded content and the same
            content base64-encoded as a ``str``.

        Raises:
            KeyError: if ``message`` has no ``attachment``/``url`` entry.
            requests.HTTPError: if the download answers with an error status.
            requests.RequestException: on connection problems or timeout.
        """
        url = message['attachment']['url']
        r_response = requests.get(url, timeout=self.REQUEST_TIMEOUT_SECONDS)
        r_response.raise_for_status()
        encoded = base64.b64encode(r_response.content)
        return r_response.content, encoded.decode('utf-8')
def prepare_and_return_result(request_type: str, response_data):
    """Wrap ``response_data`` as ``{'result': {request_type: response_data}}``."""
    payload = {request_type: response_data}
    return {'result': payload}
async def main_asynched(
        i_YA_TOKEN_1,
        i_YA_TOKEN_2,
        i_INPUT_QUERY,
        i_REQUEST_TYPE,
        i_NTFY_SERVER,
        i_NTFY_SEND_TOPIC,
        i_NTFY_SEND_USER,
        i_NTFY_SEND_PASS,
):
    """Handle one playback request and announce it through ntfy.

    Args:
        i_YA_TOKEN_1: first part of the Yandex Music token.
        i_YA_TOKEN_2: second part of the token (the two are concatenated).
        i_INPUT_QUERY: track search text; used for ``'play_track'`` only.
        i_REQUEST_TYPE: ``'play_favorites'``, ``'play_track'`` or ``''``.
            The empty string starts an interactive debug loop that prompts on
            stdin forever and never returns.
        i_NTFY_SERVER: base URL of the ntfy server.
        i_NTFY_SEND_TOPIC: topic the command message is published to.
        i_NTFY_SEND_USER: ntfy user for publishing.
        i_NTFY_SEND_PASS: ntfy password for publishing.

    Returns:
        ``{'result': {i_REQUEST_TYPE: data}}`` where ``data`` is a dict with
        ``status`` (``'success'`` or ``'error'``) and ``message``, or ``None``
        for an unrecognised request type.
    """
    ntfy_server = NtfyServerInterface(
        ntfy_server=i_NTFY_SERVER,
        ntfy_send_topic=i_NTFY_SEND_TOPIC,
        ntfy_send_user=i_NTFY_SEND_USER,
        ntfy_send_pass=i_NTFY_SEND_PASS,
        # Only the send side is used here, so the read side stays empty.
        ntfy_read_topic='',
        ntfy_read_user='',
        ntfy_read_pass=''
    )
    client = YaMusicCustom(token=f'{i_YA_TOKEN_1}{i_YA_TOKEN_2}')
    await client.init()

    def send_play_command(command: str, url: str) -> None:
        """Publish "<command>:<prefix><random request id>:<url>" to ntfy."""
        current_req_id = random.randrange(111111, 999999)
        target_msg_request_id = f'{SEARCH_TAG_PREFIX}{current_req_id}'
        ntfy_server.send_text_notification(
            text_to_send=f'{command}:{target_msg_request_id}:{url}')

    async def play_single_track_main(request_text: str) -> bool:
        """Search a track, put it into the controller playlist and announce it.

        Returns True when a track was found and announced, False otherwise.
        """
        _, search_result_track_object = await client.send_search_request_and_print_result(
            query=request_text)
        if not isinstance(search_result_track_object, Track):
            print('[!] No track found, skipping..')
            return False
        # get controller playlist
        dt_start = datetime.now()
        playlist_controller_object = await client.get_create_controller_playlist()
        print(f' [*] Time to get controller playlist : {datetime.now() - dt_start}')
        # add searched track
        dt_start = datetime.now()
        await client.add_track_to_playlist(playlist=playlist_controller_object,
                                           track=search_result_track_object)
        print(f'[*] Time to add track to playlist : {datetime.now() - dt_start}')
        # prepare urls and send the remote command
        playlist_url = client.get_controller_playlist_url(playlist=playlist_controller_object,
                                                          turn_play=True)
        track_url = client.get_track_url(track=search_result_track_object, turn_play=False)
        send_play_command(MESSAGE_CMD__PLAY_TRACK_RAW, playlist_url)
        print(f'[*] Result : "playlist_url" - {playlist_url}, "track_url" - {track_url}')
        return True

    async def play_favorites_track_main():
        """Announce the favourites playlist of the logged-in account."""
        favorite_url = FAVORITE_PLAYLIST_URL_FORMAT.format(user_id=client.client.me.account.login)
        send_play_command(MESSAGE_CMD__PLAY_FAVORITES_RAW, favorite_url)
        print(f'[*] Result : "playlist_url" - {favorite_url}')

    result_data = None
    if i_REQUEST_TYPE == 'play_favorites':
        await play_favorites_track_main()
        result_data = {"status": "success", "message": "Playing favorites"}
    elif i_REQUEST_TYPE == 'play_track':
        if not str(i_INPUT_QUERY or '').strip():
            # Do not search for the literal text "None"/"" when no query was given.
            result_data = {"status": "error", "message": "Empty track query"}
        elif await play_single_track_main(request_text=i_INPUT_QUERY):
            result_data = {"status": "success", "message": f"Playing track: {i_INPUT_QUERY}"}
        else:
            # Report the miss instead of claiming success for a track never played.
            result_data = {"status": "error", "message": f"Track not found: {i_INPUT_QUERY}"}
    elif i_REQUEST_TYPE == '':
        # Manual mode for debugging: keep asking for track names on stdin.
        while True:
            input_query = input('Введите название трека для поиска: ')
            await play_single_track_main(request_text=input_query)
    return prepare_and_return_result(request_type=i_REQUEST_TYPE, response_data=result_data)
def main(**kwargs) -> dict[str, dict[Any, Any]]:
    """Synchronous entry point: run ``main_asynched(**kwargs)`` to completion.

    All keyword arguments are forwarded unchanged; the coroutine's result is
    returned as is.
    """
    request_coroutine = main_asynched(**kwargs)
    return asyncio.run(request_coroutine)