360 lines
14 KiB
Python
360 lines
14 KiB
Python
import asyncio
|
|
import base64
|
|
import json
|
|
import os
|
|
import random
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Any, Coroutine
|
|
|
|
import requests
|
|
from yandex_music import ClientAsync, Track
|
|
from yandex_music.exceptions import NetworkError
|
|
# from yandex_music.exceptions import NetworkError
|
|
from yandex_music.playlist.playlist import Playlist
|
|
|
|
# --- ntfy read-side settings: module-level placeholders, all unset ---
NTFY_SERVER = None
NTFY_READ_TOPIC = None
NTFY_READ_USER = None
NTFY_READ_PASS = None

# --- ntfy command protocol ---
# Outgoing messages have the form '<command>:<SEARCH_TAG_PREFIX><request id>:<url>'.
SEARCH_TAG_PREFIX = 'req'
MESSAGE_CMD__PLAY_TRACK_RAW = '#play_track'
MESSAGE_CMD__PLAY_FAVORITES_RAW = '#play_favorites'

# --- URL building ---
# Suffix appended to a URL when playback is requested. Currently empty;
# '?play=true' is the alternative value that was tried.
POSTFIX_URL_PLAY = ''
# Title of the service playlist that is used as the remote "now playing" slot.
TARGET_CONTROL_PLAYLIST_NAME = '#play'
TARGET_CONTROL_PLAYLIST_URL_FORMAT = 'https://music.yandex.com/users/{user_id}/playlists/{playlist_id}'
TRACK_URL_FORMAT = 'https://music.yandex.com/album/{album_id}/track/{track_id}'
FAVORITE_PLAYLIST_URL_FORMAT = 'https://music.yandex.com/users/{user_id}/playlists/3'

# Search result type -> human-readable (Russian) label used in console output.
type_to_name = {
    'track': 'трек',
    'artist': 'исполнитель',
    'album': 'альбом',
    'playlist': 'плейлист',
    'video': 'видео',
    'user': 'пользователь',
    'podcast': 'подкаст',
    'podcast_episode': 'эпизод подкаста',
}
|
|
|
|
|
|
class YaMusicCustom:
    """Helper around ``yandex_music.ClientAsync`` for the remote-play workflow.

    It searches for a track, keeps a dedicated "controller" playlist
    (titled ``TARGET_CONTROL_PLAYLIST_NAME``) that holds exactly one track,
    and builds the web URLs for that playlist and for single tracks.

    ``init()`` must be awaited before the other coroutines are used.
    """

    # Upper bound for insert retries on unexpected (non-NetworkError) failures.
    INSERT_MAX_ATTEMPTS = 50
    # Upper bound for polls that wait for the inserted track to become visible.
    CONFIRM_MAX_ATTEMPTS = 300
    # Pause between two retries / polls.
    RETRY_DELAY_IN_SECONDS = .1

    def __init__(self, **kwargs):
        """Create the underlying client.

        Args:
            **kwargs: must contain ``token`` (the Yandex Music OAuth token).

        Raises:
            KeyError: ``token`` was not supplied.
        """
        self.client = ClientAsync(token=kwargs['token'])

    async def init(self):
        """Initialise the underlying ``ClientAsync`` (delegates to ``client.init()``)."""
        await self.client.init()

    def _fix_revision_bug(self, object_with_revision_param):
        """Increment ``revision`` of the given object in place and return the same object."""
        object_with_revision_param.revision += 1
        return object_with_revision_param

    @staticmethod
    def _split_track_id(track: Track):
        """Split ``track.track_id`` ("<track_id>:<album_id>") into its two parts.

        Raises:
            ValueError: the id does not consist of exactly two ':'-separated parts.
        """
        parts = str(track.track_id).split(':')
        if len(parts) != 2:
            raise ValueError(
                f'Track id "{track.track_id}" does not have the form "<track_id>:<album_id>"')
        return parts[0], parts[1]

    async def send_search_request_and_print_result(self, query):
        """Search for ``query`` (with the word "трек" appended) and print a summary.

        Args:
            query: free-text search request.

        Returns:
            ``(best_result_text, best_object)``: a printable description of the
            best match and the best match object itself; ``('', None)`` when the
            search reports no best result.
        """
        # The suffix biases the search towards tracks.
        query = f'{query} трек'

        best_object = None
        search_result = await self.client.search(query)

        text = [f'[*] Результаты по запросу "{query}":']

        best_result_text = ''
        if search_result.best:
            type_ = search_result.best.type
            best_object = search_result.best.result

            if type_ in ('track', 'podcast_episode'):
                artists = ''
                if best_object.artists:
                    artists = ' - ' + ', '.join(artist.name for artist in best_object.artists)
                best_result_text = best_object.title + artists
            elif type_ == 'artist':
                best_result_text = best_object.name
            elif type_ in ('album', 'podcast', 'playlist'):
                best_result_text = best_object.title
            elif type_ == 'video':
                best_result_text = f'{best_object.title} {best_object.text}'

            text.append(f' [+] Лучший результат: <{type_to_name.get(type_)}> {best_result_text}')

        # One "total" line per non-empty result section, in a fixed order.
        sections = (
            ('Исполнителей', search_result.artists),
            ('Альбомов', search_result.albums),
            ('Треков', search_result.tracks),
            ('Плейлистов', search_result.playlists),
            ('Видео', search_result.videos),
        )
        for label, section in sections:
            if section:
                text.append(f' [*] {label}: {section.total}')

        text.append('')
        print('\n'.join(text))

        return best_result_text, best_object

    async def get_create_controller_playlist(self) -> Playlist:
        """Return the controller playlist, creating it when it does not exist.

        A playlist matches when either its own title or the title of its
        ``custom_wave`` equals ``TARGET_CONTROL_PLAYLIST_NAME``.
        """
        all_user_playlist = await self.client.users_playlists_list()
        for user_playlist in all_user_playlist:
            # custom_wave may be absent, so it must not be dereferenced blindly.
            custom_wave = user_playlist.custom_wave
            wave_title = custom_wave.title if custom_wave else None
            if TARGET_CONTROL_PLAYLIST_NAME in (user_playlist.title, wave_title):
                return user_playlist

        print('[*] Playlist was not found, recreate..')
        # The uid of the current account is needed to create a playlist.
        user_account = await self.client.account_status()
        user_uid = user_account.account.uid
        return await self.client.users_playlists_create(user_id=user_uid,
                                                        title=TARGET_CONTROL_PLAYLIST_NAME)

    async def clean_playlist(self, playlist: Playlist):
        """Remove every track from ``playlist`` (no request is sent when it is already empty)."""
        playlist_tracks = await playlist.fetch_tracks_async()
        if playlist_tracks:
            # NOTE(review): the local ``playlist.revision`` is not refreshed here;
            # presumably this is what makes the first insert afterwards fail - confirm.
            await playlist.delete_tracks_async(0, len(playlist_tracks))

    async def add_track_to_playlist(self, playlist: Playlist, track: Track):
        """Make ``track`` the only track of ``playlist`` and wait until it is visible.

        Args:
            playlist: controller playlist to fill.
            track: track to insert; its ``track_id`` must be "<track_id>:<album_id>".

        Raises:
            ValueError: ``track.track_id`` has no album part.
            TimeoutError: the track did not appear in the playlist after
                ``CONFIRM_MAX_ATTEMPTS`` polls.
            Exception: the last unexpected insert error once
                ``INSERT_MAX_ATTEMPTS`` attempts are used up; any error raised
                by the revision-fix retry is propagated immediately.
        """
        await self.clean_playlist(playlist)
        track_id, album_id = self._split_track_id(track)

        async def insert_track_async(i_track_id, i_album_id, i_playlist, fix_revision_bug=False):
            if fix_revision_bug:
                i_playlist = self._fix_revision_bug(object_with_revision_param=i_playlist)
            await i_playlist.insert_track_async(track_id=int(i_track_id),
                                                album_id=int(i_album_id))

        # add track to playlist
        for attempt in range(1, self.INSERT_MAX_ATTEMPTS + 1):
            dt_start = datetime.now()
            try:
                await insert_track_async(i_track_id=track_id, i_album_id=album_id, i_playlist=playlist)
                print(f' [*] Insert track time (common) : {datetime.now() - dt_start}')
                break
            except NetworkError:
                # Presumably a stale-revision rejection: retry once with revision + 1.
                print(f' [*] Insert track time (with error) : {datetime.now() - dt_start}')
                dt_start = datetime.now()
                await insert_track_async(i_track_id=track_id, i_album_id=album_id, i_playlist=playlist,
                                         fix_revision_bug=True)
                print(f' [*] Insert track time (fix revision) : {datetime.now() - dt_start}')
                break
            except Exception as ex:
                print(ex)
                # Give up instead of spinning forever on a persistent failure.
                if attempt == self.INSERT_MAX_ATTEMPTS:
                    raise
                await asyncio.sleep(self.RETRY_DELAY_IN_SECONDS)

        # find track in playlist
        dt_start = datetime.now()
        # Compare as strings so that an int/str id mismatch cannot hide the track.
        wanted_id = str(track.id)
        for _ in range(self.CONFIRM_MAX_ATTEMPTS):
            playlist_tracks = await playlist.fetch_tracks_async()
            if any(str(p_track.id) == wanted_id for p_track in playlist_tracks):
                print(f' [*] Check track in playlist exist time : {datetime.now() - dt_start}')
                return
            await asyncio.sleep(self.RETRY_DELAY_IN_SECONDS)

        raise TimeoutError(
            f'Track {wanted_id} did not appear in the playlist after {self.CONFIRM_MAX_ATTEMPTS} checks')

    def get_controller_playlist_url(self, playlist: Playlist, turn_play=False):
        """Build the web URL of ``playlist``; append ``POSTFIX_URL_PLAY`` when ``turn_play`` is set."""
        url = TARGET_CONTROL_PLAYLIST_URL_FORMAT.format(user_id=playlist.owner.login,
                                                        playlist_id=playlist.kind)
        return f'{url}{POSTFIX_URL_PLAY}' if turn_play else url

    def get_track_url(self, track: Track, turn_play=False):
        """Build the web URL of ``track``; append ``POSTFIX_URL_PLAY`` when ``turn_play`` is set.

        Raises:
            ValueError: ``track.track_id`` has no album part.
        """
        track_id, album_id = self._split_track_id(track)
        url = TRACK_URL_FORMAT.format(track_id=track_id, album_id=album_id)
        return f'{url}{POSTFIX_URL_PLAY}' if turn_play else url
|
|
|
|
|
class NtfyServerInterface:
    """Small HTTP client for an ntfy server.

    It can publish a text message to the "send" topic, poll the "read" topic
    for a message carrying a given tag, and download a message attachment.
    """

    # Timeout (seconds) applied to every HTTP request so that a stalled
    # connection cannot block the caller forever.
    REQUEST_TIMEOUT_IN_SECONDS = 30

    def __init__(self, ntfy_server: str,
                 ntfy_send_topic: str,
                 ntfy_send_user: str,
                 ntfy_send_pass: str,
                 ntfy_read_topic: str,
                 ntfy_read_user: str,
                 ntfy_read_pass: str, ):
        """Store server address, topics and the basic-auth credentials for both directions."""
        self.NTFY_SERVER = ntfy_server
        self.NTFY_SEND_TOPIC = ntfy_send_topic
        self.NTFY_SEND_USER = ntfy_send_user
        self.NTFY_SEND_PASS = ntfy_send_pass
        self.NTFY_READ_TOPIC = ntfy_read_topic
        self.NTFY_READ_USER = ntfy_read_user
        self.NTFY_READ_PASS = ntfy_read_pass

        # Polling cadence and limit used by listen_and_get_media_message().
        self.REPEAT_EVERY_IN_SECONDS = 1
        self.MAX_TRIES_COUNT = 300

    @staticmethod
    def _parse_json_lines(raw_text: str) -> list:
        """Decode a newline-delimited JSON body into a list, skipping empty lines."""
        return [json.loads(line) for line in raw_text.split('\n') if line != '']

    @staticmethod
    def _find_event_with_tag(events, search_text: str):
        """Return the first event whose ``tags`` contain ``search_text``, else ``None``.

        Events without a ``tags`` field are skipped rather than treated as an error.
        """
        for event in events:
            if search_text in (event.get('tags') or ()):
                return event
        return None

    def send_text_notification(self, text_to_send: str):
        """POST ``text_to_send`` (UTF-8) to the send topic.

        Returns:
            The HTTP status code of the successful response.

        Raises:
            requests.HTTPError: the server answered with an error status.
        """
        url = f"{self.NTFY_SERVER}/{self.NTFY_SEND_TOPIC}"
        session_auth = (self.NTFY_SEND_USER, self.NTFY_SEND_PASS)
        r_response = requests.post(url, data=text_to_send.encode('utf-8'), auth=session_auth,
                                   timeout=self.REQUEST_TIMEOUT_IN_SECONDS)
        r_response.raise_for_status()  # Raises stored HTTPError if the request failed
        return r_response.status_code

    def listen_and_get_media_message(self, search_text: str) -> tuple[Any, Any]:
        """Poll the read topic until a message tagged ``search_text`` shows up.

        The topic is polled up to ``MAX_TRIES_COUNT`` times with
        ``REPEAT_EVERY_IN_SECONDS`` seconds of sleep between polls.

        Returns:
            ``(event, None)`` on success, ``(None, exception)`` when a request
            or decoding step failed or when the attempt limit was reached.
            Nothing is raised.
        """
        url = f"{self.NTFY_SERVER}/{self.NTFY_READ_TOPIC}/json?poll=1"
        session_auth = (self.NTFY_READ_USER, self.NTFY_READ_PASS)
        headers = {
            "Accept": "text/event-stream"
        }

        try:
            tries_count = 0
            while tries_count < self.MAX_TRIES_COUNT:
                resp = requests.get(url, auth=session_auth, headers=headers,
                                    timeout=self.REQUEST_TIMEOUT_IN_SECONDS)
                # Fail fast on auth/server errors instead of parsing an error
                # body as if it were a list of messages.
                resp.raise_for_status()
                last_events = self._parse_json_lines(resp.content.decode('utf-8'))

                event = self._find_event_with_tag(last_events, search_text)
                if event is not None:
                    return event, None

                tries_count += 1
                time.sleep(self.REPEAT_EVERY_IN_SECONDS)
        except Exception as ex:
            # Errors are reported through the second tuple element by design.
            return None, ex

        return None, Exception(
            f'Timeout, the maximum number of attempts (max time {self.MAX_TRIES_COUNT * self.REPEAT_EVERY_IN_SECONDS} in sec) was called..')

    def download_photo_bin_and_base64_from_ntfy_message(self, message):
        """Download the attachment referenced by ``message['attachment']['url']``.

        Returns:
            ``(raw_bytes, base64_text)``: the attachment content and the same
            content base64-encoded as a UTF-8 string.

        Raises:
            KeyError: the message has no attachment URL.
            requests.HTTPError: the download answered with an error status.
        """
        url = message['attachment']['url']
        r_response = requests.get(url, timeout=self.REQUEST_TIMEOUT_IN_SECONDS)
        r_response.raise_for_status()  # raise error for bad status

        encoded = base64.b64encode(r_response.content)
        return r_response.content, encoded.decode('utf-8')
|
|
|
|
|
|
def prepare_and_return_result(request_type: str, response_data):
    """Wrap ``response_data`` as ``{'result': {request_type: response_data}}``."""
    payload = {request_type: response_data}
    return {'result': payload}
|
|
|
|
|
|
async def main_asynched(
        i_YA_TOKEN_1,
        i_YA_TOKEN_2,
        i_INPUT_QUERY,
        i_REQUEST_TYPE,
        i_NTFY_SERVER,
        i_NTFY_SEND_TOPIC,
        i_NTFY_SEND_USER,
        i_NTFY_SEND_PASS,
):
    """Handle one remote-play request and publish the matching ntfy command.

    Args:
        i_YA_TOKEN_1: first half of the Yandex Music token.
        i_YA_TOKEN_2: second half of the token (the halves are concatenated).
        i_INPUT_QUERY: search text; only used for ``'play_track'``.
        i_REQUEST_TYPE: ``'play_favorites'``, ``'play_track'`` or ``''``
            (interactive debug loop that never returns).
        i_NTFY_SERVER: base URL of the ntfy server.
        i_NTFY_SEND_TOPIC: topic the command message is published to.
        i_NTFY_SEND_USER: basic-auth user for publishing.
        i_NTFY_SEND_PASS: basic-auth password for publishing.

    Returns:
        ``{'result': {i_REQUEST_TYPE: data}}`` where ``data`` is a status dict
        (``status`` is ``'success'`` or, when no track was found, ``'error'``),
        or ``None`` for an unrecognised request type.
    """
    i_ya_token = f'{i_YA_TOKEN_1}{i_YA_TOKEN_2}'

    # Only publishing is used here, so the read side is left empty.
    ntfy_server = NtfyServerInterface(
        ntfy_server=i_NTFY_SERVER,
        ntfy_send_topic=i_NTFY_SEND_TOPIC,
        ntfy_send_user=i_NTFY_SEND_USER,
        ntfy_send_pass=i_NTFY_SEND_PASS,
        ntfy_read_topic='',
        ntfy_read_user='',
        ntfy_read_pass=''
    )
    client = YaMusicCustom(token=i_ya_token)
    await client.init()

    def send_play_command(command: str, url: str):
        """Publish '<command>:<request tag>:<url>' with a fresh random request id."""
        current_req_id = random.randrange(111111, 999999)
        target_msg_request_id = f'{SEARCH_TAG_PREFIX}{current_req_id}'
        message = f'{command}:{target_msg_request_id}:{url}'
        ntfy_server.send_text_notification(text_to_send=message)

    async def play_single_track_main(request_text: str) -> bool:
        """Search, load the track into the controller playlist and send the play command.

        Returns ``True`` when a command was sent, ``False`` when no track was found.
        """
        search_result_text, search_result_track_object = await client.send_search_request_and_print_result(
            query=request_text)

        # The best match can be an artist, album, ... - only tracks are playable here.
        if not isinstance(search_result_track_object, Track):
            print('[!] No track found, skipping..')
            return False

        # get controller playlist
        dt_start = datetime.now()
        playlist_controller_object = await client.get_create_controller_playlist()
        print(f' [*] Time to get controller playlist : {datetime.now() - dt_start}')

        # add searched track
        dt_start = datetime.now()
        await client.add_track_to_playlist(playlist=playlist_controller_object, track=search_result_track_object)
        print(f'[*] Time to add track to playlist : {datetime.now() - dt_start}')

        # prepare url
        playlist_url = client.get_controller_playlist_url(playlist=playlist_controller_object, turn_play=True)
        track_url = client.get_track_url(track=search_result_track_object, turn_play=False)

        # send remote ntfy command
        send_play_command(MESSAGE_CMD__PLAY_TRACK_RAW, playlist_url)

        print(f'[*] Result : "playlist_url" - {playlist_url}, "track_url" - {track_url}')
        return True

    async def play_favorites_track_main():
        """Send the play command for the current user's favourites playlist."""
        favorite_url = FAVORITE_PLAYLIST_URL_FORMAT.format(user_id=client.client.me.account.login)

        # send remote ntfy command
        send_play_command(MESSAGE_CMD__PLAY_FAVORITES_RAW, favorite_url)

        print(f'[*] Result : "playlist_url" - {favorite_url}')

    result_data = None
    if i_REQUEST_TYPE == 'play_favorites':
        await play_favorites_track_main()
        result_data = {"status": "success", "message": "Playing favorites"}
    elif i_REQUEST_TYPE == 'play_track':
        if await play_single_track_main(request_text=i_INPUT_QUERY):
            result_data = {"status": "success", "message": f"Playing track: {i_INPUT_QUERY}"}
        else:
            # Nothing was sent to the player, so do not report success.
            result_data = {"status": "error", "message": f"Track not found: {i_INPUT_QUERY}"}
    elif i_REQUEST_TYPE == '':
        # manual track play challenge: infinite loop for #debug
        while True:
            input_query = input('Введите название трека для поиска: ')
            await play_single_track_main(request_text=input_query)

    return prepare_and_return_result(request_type=i_REQUEST_TYPE, response_data=result_data)
|
|
|
|
|
|
def main(**kwargs) -> dict[str, dict[Any, Any]]:
    """Synchronous entry point: run ``main_asynched(**kwargs)`` to completion and return its result."""
    request_coroutine = main_asynched(**kwargs)
    return asyncio.run(request_coroutine)
|