add code
This commit is contained in:
parent
bbdbcec327
commit
e24bf17515
|
@ -0,0 +1,136 @@
|
|||
import os
|
||||
|
||||
import requests as rest
|
||||
from tqdm import tqdm
|
||||
|
||||
from utils.audio import decode, download
|
||||
from utils.target import getSongsMetadata
|
||||
|
||||
|
||||
# Substrings that mark a Bandcamp result as a remix / DJ set rather than the
# plain track. They are compared against lower-cased text, so every entry
# must itself be lower-case (the former "DJ" entry could never match).
_NG_WORDS = ("mix", "remix", "dj", "set")


def _ask_spotify_username():
    """Prompt until the entered name resolves to a Spotify profile page.

    Returns:
        str: The username whose profile page answered with HTTP 200.

    Raises:
        RuntimeError: If Spotify answers with anything other than 200 or 404.
    """
    failures = 0
    while True:
        # Show the "where do I find it" hint on the first and every third retry.
        hint = (
            "(see https://www.spotify.com/account/profile/) "
            if failures % 3 == 0
            else ""
        )
        spotify_username = input(f"what is your spotify username? {hint}")
        r = rest.get(f"https://open.spotify.com/user/{spotify_username}")
        if r.status_code == 200:
            return spotify_username
        if r.status_code != 404:
            raise RuntimeError(
                f"Something's wrong. ({r.status_code} spotify user check)"
            )
        print(f"uh-oh. user {spotify_username} doesn't seem to exist on spotify. ")
        if failures % 3 == 2:
            print()
        failures += 1


def _has_ng_word(text):
    """Return True if ``text`` contains one of the remix/DJ-set marker words."""
    lowered = (text or "").lower()
    return any(word in lowered for word in _NG_WORDS)


def _is_acceptable_hit(song, hit):
    """Decide whether a Bandcamp search result plausibly is the wanted song.

    At least two of title / artist / album must match exactly. With only two
    matches, results that look like a remix or DJ set are rejected unless the
    Spotify track carries the same marker in the corresponding field.

    Raises:
        KeyError: If ``hit`` lacks one of the expected keys; the original code
            treated that as "item is an album, not a song".
    """
    title_hit = song["title"] == hit["name"]
    artist_hit = song["artist"] == hit["band_name"]
    album_hit = song["album"] == hit["album_name"]
    matched = sum((title_hit, artist_hit, album_hit))

    if matched < 2:
        return False  # likely incorrect audio
    if matched == 3:
        return True
    # Hard-coded exemption kept from the original implementation.
    if song["title"] == "Momus" and song["artist"] == "Aleksandir":
        return True

    unexpected_title = not _has_ng_word(song["title"]) and _has_ng_word(hit["name"])
    unexpected_album = not _has_ng_word(song["album"]) and _has_ng_word(
        hit["album_name"]
    )
    return not (unexpected_title or unexpected_album)


def main():
    """Interactively download a Spotify user's playlist songs from Bandcamp.

    Asks for a Spotify username, lets ``getSongsMetadata`` collect the songs of
    the chosen playlists, asks for a target directory, then searches Bandcamp
    for every song and downloads the first acceptable search result.
    """
    ### GET SPOTIFY USER ###
    spotify_username = _ask_spotify_username()

    ### GET TARGET SONGS FROM USER PLAYLISTS ###
    target = getSongsMetadata(
        spotifyUser=spotify_username, verbose=False, findPrivate=False
    )

    print()
    for playlist, content in target.items():
        print(f"{playlist}: {len(content['songs'])} songs")
    print()

    dirname = input(f"Where do you want the songs? {os.getcwd()}/")
    # NOTE: os.path.join honours an absolute answer as-is instead of gluing it
    # behind the current working directory.
    destination = os.path.join(os.getcwd(), dirname)

    ### CRAWL THROUGH BANDCAMP ###
    for pl, content in tqdm(target.items()):
        print(f"downloading {pl}")
        for s in content["songs"]:
            success = False

            res = rest.post(
                "https://bandcamp.com/api/bcsearch_public_api/1/autocomplete_elastic",
                data={
                    "search_text": f"{s['title']} {s['artist']}",
                    "search_filter": "",
                    "full_page": False,
                    "fan_id": None,
                },
            )

            if res.status_code >= 400:
                print(
                    f"ERROR ({res.status_code}): no elastic search response from bandcamp"
                )
                continue

            for hit in res.json()["auto"]["results"]:
                try:
                    if not _is_acceptable_hit(s, hit):
                        continue
                    songUrl = hit["item_url_path"]
                    print(f"{s['artist']} - {s['title']} => {songUrl}")
                    page = rest.get(songUrl)
                except Exception:
                    # Best effort per candidate (e.g. the item is an album,
                    # not a song, or the page request failed): try the next
                    # result. Ctrl-C is no longer swallowed here.
                    continue

                try:
                    audio = decode(page.text)
                    download(audio, destination=destination)
                    success = True
                except (Exception, SystemExit):
                    # SystemExit is caught on purpose: decode() reports a page
                    # without track data through sys.exit().
                    print(f"ERROR decoding {s['artist']} - {s['title']}")

                # Only the first acceptable candidate is ever attempted.
                break

            if not success:
                print(
                    f"{s['artist']} - {s['title']} / {s['album']} not found on bandcamp"
                )

        print()


if __name__ == "__main__":
    main()
|
|
@ -0,0 +1,153 @@
|
|||
import argparse
|
||||
import html
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from collections import namedtuple
|
||||
|
||||
import requests as rest
|
||||
|
||||
# Matches "<artist>.bandcamp.com", optionally followed by "/album" or
# "/album/<slug>", with an optional scheme in front and an optional trailing
# slash. Group 1 is the scheme-less host/path part. Both dots of the host are
# escaped so that e.g. "artist.bandcampxcom" is rejected.
URL_PATTERN = r"^(?:https?://)?((?:[^./]+)\.bandcamp\.com(?:/album(?:/[^\s/]+)?)?)/?$"

# Plain records produced by decode() and consumed by download().
Album = namedtuple("Album", "artist title cover release_date tracks")
Track = namedtuple("Track", "number title url duration unreleased")
|
||||
|
||||
|
||||
def decode(content):
    """Decode the content of a Bandcamp page.

    Extracts the cover link and the JSON stored in the page's ``data-tralbum``
    attribute and turns them into an ``Album`` with its ``Track`` entries.

    Args:
        content (str): HTML content.

    Returns:
        Album: Album data. ``cover`` is None when the page has no
        ``popupImage`` link; a track's ``url`` is None when the page offers no
        ``mp3-128`` file for it.

    Raises:
        SystemExit: If the page carries no ``data-tralbum`` attribute.
    """
    # Search the cover. A page without one must not abort the whole decode.
    cover_match = re.search('<a class="popupImage" href="([^"]*)', content)
    cover = cover_match.group(1) if cover_match else None

    # Search album data.
    matches = re.search('data-tralbum="([^"]*)"', content)
    if not matches:
        sys.exit("error: could not find any tracks.")

    # The attribute value is HTML-escaped JSON.
    data = json.loads(html.unescape(matches.group(1)))

    tracks = tuple(
        Track(
            number=track["track_num"],
            title=track["title"],
            # "file" may be absent or present-but-null; both mean "no stream".
            url=(track.get("file") or {}).get("mp3-128"),
            duration=track["duration"],
            unreleased=track["unreleased_track"],
        )
        for track in data["trackinfo"]
    )

    return Album(
        artist=data["artist"],
        title=data["current"]["title"],
        cover=cover,
        release_date=data["current"]["release_date"],
        tracks=tracks,
    )
|
||||
|
||||
|
||||
def download(album, destination, cover=True):
    """Download an album (or song).

    Every released track that has a stream URL is stored in ``destination`` as
    ``"<number>. <title>.mp3"``; the cover, if wanted and known, is stored as
    ``cover.jpg``.

    Args:
        album (Album/song): Album/song data.
        destination (str): Destination of the file(s).
        cover (bool): Allow cover downloading (default: True).
    """
    # Create folder.
    os.makedirs(destination, exist_ok=True)

    print("Downloading song into %s" % destination)

    # Notify for unreleased tracks.
    if any(track.unreleased for track in album.tracks):
        print("\nWARNING: some tracks are not released yet! " "I will ignore them.\n")

    # Download tracks.
    for track in album.tracks:
        if track.unreleased:
            continue
        title = re.sub(r"[\:\/\\]", "", track.title)  # Strip unwanted chars.
        file = "%s. %s.mp3" % (track.number, title)
        if not track.url:
            # No stream URL for this track: report it instead of handing
            # None to the HTTP client.
            print("%s (unavailable)" % file)
            continue
        download_file(track.url, os.path.join(destination, file), file)

    # Download album cover (only when the album actually has a cover URL).
    if cover and album.cover:
        download_file(
            album.cover, os.path.join(destination, "cover.jpg"), "Album cover"
        )
|
||||
|
||||
|
||||
def download_file(url, target, name):
    """Download a file while drawing a 20-character progress bar.

    Adapted from https://stackoverflow.com/q/15644964/9322103.

    The download is reported as "(unavailable)" and nothing is written when
    the URL is empty, the server answers with an error status, or the
    response has no ``content-length`` header.

    Args:
        url (str): URL of the file.
        target (str): Target path.
        name (str): Title of the download.
    """
    if not url:
        print("%s (unavailable)" % name)
        return

    # Request first and open the file second, so an unavailable download does
    # not leave an empty file behind; the context manager releases the
    # streamed connection.
    with rest.get(url, stream=True) as response:
        size = response.headers.get("content-length")
        if not response.ok or size is None:
            print("%s (unavailable)" % name)
            return

        size = int(size)
        downloaded = 0
        with open(target, "wb") as f:
            for data in response.iter_content(chunk_size=4096):
                downloaded += len(data)
                f.write(data)
                # Guard against a zero content-length and clamp the bar width.
                progress = min(20, int(20 * downloaded / size)) if size else 20
                sys.stdout.write(
                    "\r[%s%s] %s" % ("#" * progress, " " * (20 - progress), name)
                )
                sys.stdout.flush()

    sys.stdout.write("\n")
|
||||
|
||||
|
||||
def validate_url(url):
    """Normalize a Bandcamp URL to its ``https://`` form.

    Args:
        url (str): URL as typed by the user, with or without scheme and
            trailing slash.

    Returns:
        str: ``"https://"`` followed by the host/path captured by
        ``URL_PATTERN``.

    Raises:
        ValueError: If ``url`` does not match ``URL_PATTERN``.
    """
    matches = re.search(URL_PATTERN, url)
    if matches is None:
        raise ValueError("not a valid Bandcamp URL: %r" % (url,))
    # Group 1 excludes the optional scheme and trailing slash; group 0 would
    # yield "https://https://..." for input that already carries a scheme.
    return "https://" + matches.group(1)
|
||||
|
||||
|
||||
def parse(argv=None):
    """Parse arguments.

    Args:
        argv (list[str] | None): Argument list to parse. When None (the
            default) argparse falls back to ``sys.argv[1:]``, as before.

    Returns:
        argparse.Namespace: Namespace with ``url``, ``destination`` (current
        folder by default) and ``cover`` (False when ``-c``/``--no-cover`` is
        given).
    """
    parser = argparse.ArgumentParser(
        description="Download an album from a Bandcamp page URL."
    )
    parser.add_argument("url", type=str, help="URL of the page")
    parser.add_argument(
        "-d",
        "--destination",
        default=os.getcwd(),
        dest="destination",
        help="destination of the files (current folder by default)",
    )
    parser.add_argument(
        "-c",
        "--no-cover",
        action="store_false",
        dest="cover",
        help="ignore album cover",
    )
    return parser.parse_args(argv)
|
|
@ -0,0 +1,180 @@
|
|||
import os
|
||||
|
||||
import spotipy
|
||||
from dotenv import load_dotenv
|
||||
from spotipy.oauth2 import SpotifyClientCredentials
|
||||
|
||||
|
||||
def _store_spotify_credentials(client_id: str, client_secret: str) -> None:
    """Write the credential pair to ``.env`` and mirror it into ``os.environ``.

    NOTE: ``.env`` is opened in "w" mode, so any other content of that file
    is replaced.
    """
    with open(".env", "w") as env:
        env.writelines(
            [
                f"SPOTIFY_CLIENT_ID={client_id}\n",
                f"SPOTIFY_CLIENT_SECRET={client_secret}\n",
            ]
        )
    os.environ["SPOTIFY_CLIENT_ID"] = client_id
    os.environ["SPOTIFY_CLIENT_SECRET"] = client_secret


def loadSpotifyCredentials(username: str):
    """Return a working Spotify client ID / secret pair, prompting if needed.

    Credentials are read from the environment (after ``load_dotenv()``). If
    either is missing the user is asked for them. A pair is accepted once a
    ``user_playlists`` call for ``username`` succeeds with it; a rejected pair
    is wiped from ``.env`` and the environment, and the user is asked again.

    Args:
        username: Spotify user used for the test request and named in the
            instructions shown to the user.

    Returns:
        tuple: ``(client_id, client_secret)``.
    """
    load_dotenv()

    while True:
        spotify_client_id = os.getenv("SPOTIFY_CLIENT_ID")
        spotify_client_secret = os.getenv("SPOTIFY_CLIENT_SECRET")
        # The former debug prints of the ID and the plain-text secret were
        # removed: the secret must not end up in terminal scrollback or logs.

        if not spotify_client_id or not spotify_client_secret:
            intro = (
                "\n"
                "You must provide a Spotify credential to make this app work.\n"
                "\n"
                "Go to the dashboard at https://developer.spotify.com/ "
                f"and log in as {username}.\n"
                "If you do not already have an app with Web API permission, "
                "create one. (Any name, description, redirect URIs etc is fine)\n"
                "Check the app settings for your client ID and secret.\n"
            )
            print(intro, "\n")

            spotify_client_id = input("Enter client ID: ")
            spotify_client_secret = input("Enter client secret: ")
            _store_spotify_credentials(spotify_client_id, spotify_client_secret)
            # Loop again so the freshly entered pair gets validated below.
            continue

        try:
            spotipy.Spotify(
                auth_manager=SpotifyClientCredentials(
                    client_id=spotify_client_id, client_secret=spotify_client_secret
                )
            ).user_playlists(user=username)  # test auth
        except Exception:
            print(
                f"Provided ID ({spotify_client_id}) and secret ({'*' * len(spotify_client_secret)}) is invalid."
            )
            # Forget the rejected pair so the next iteration prompts again.
            _store_spotify_credentials("", "")
            continue

        # Persisting happens outside the try block so that a failure to write
        # .env is not misreported as invalid credentials.
        _store_spotify_credentials(spotify_client_id, spotify_client_secret)
        return spotify_client_id, spotify_client_secret
|
||||
|
||||
|
||||
def _confirm_playlist_download(playlist_name: str) -> bool:
    """Ask until the user answers yes or no for one playlist.

    Only the first non-blank character counts, case-insensitively. Empty or
    unrecognised input re-prompts instead of raising.
    """
    while True:
        answer = input(f'download playlist "{playlist_name}"? (y/n)')
        choice = answer.strip()[:1].lower()
        if choice == "y":
            return True
        if choice == "n":
            return False
        print(f"Invalid response: {answer}. Please answer with a yes or no.")


def getSongsMetadata(
    spotifyUser: str,
    findPrivate: bool,
    verbose: bool,
):
    """Let the user pick playlists of ``spotifyUser`` and list their songs.

    Args:
        spotifyUser: Spotify username whose playlists are offered.
        findPrivate: When True, real credentials are obtained through
            ``loadSpotifyCredentials``; otherwise placeholder values are used.
        verbose: Print progress information.

    Returns:
        dict: Maps each chosen playlist name to
        ``{"uri": ..., "songs": [{"title", "artist", "album", "isrc"}, ...]}``.
        Playlists are keyed by name, so two chosen playlists with the same
        name share one entry (the later one wins).
    """
    if findPrivate:
        client_id, client_secret = loadSpotifyCredentials(username=spotifyUser)
    else:
        # Placeholder values kept from the original, which states that the
        # credentials are not needed unless private playlists are targeted.
        # NOTE(review): confirm that the Spotify API really accepts these.
        client_id, client_secret = 1234, 5678

    # authorize spotify
    auth_manager = SpotifyClientCredentials(
        client_id=client_id, client_secret=client_secret
    )
    sp = spotipy.Spotify(auth_manager=auth_manager)

    # get user playlists
    playlists = sp.user_playlists(user=spotifyUser)

    ### Pick playlist to download ###
    target_playlists = {}
    while playlists:
        for playlist in playlists["items"]:
            if _confirm_playlist_download(playlist["name"]):
                if verbose:
                    print(f"downloading: {playlist['name']}")
                target_playlists[playlist["name"]] = {"uri": playlist["uri"]}

        # pagination
        playlists = sp.next(playlists) if playlists["next"] else None

    if verbose:
        print("downloading songs from the following playlists:")
        for playlist_name in target_playlists:
            print(playlist_name)

    ### record songs from each playlist ###
    for pl_name, pl_entry in target_playlists.items():
        pl_content = sp.playlist_tracks(pl_entry["uri"])
        collected = pl_entry["songs"] = []

        while pl_content:
            songs = pl_content["items"]

            for s in songs:
                track = s.get("track")
                # Skip entries without track data and local downloads.
                if not track or track["is_local"]:
                    continue

                title = track["name"]
                artist = track["artists"][0]["name"]
                album = track["album"]["name"]
                isrc = (track.get("external_ids") or {}).get("isrc")
                if isrc is None:
                    print(f"WARN: no ISRC found ({artist} - {title})")
                    isrc = ""

                collected.append(
                    {"title": title, "artist": artist, "album": album, "isrc": isrc}
                )

            if pl_content["next"]:
                pl_content = sp.next(pl_content)
            else:
                if verbose:
                    print(
                        f"{pl_name}: {len(songs) + pl_content['offset']}/{pl_content['total']} \n"
                    )
                pl_content = None

    return target_playlists
|
Loading…
Reference in New Issue