#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Rename misnamed last.fm tags
#
# Copyright (C) 2007 - 2008 Timur Izhbulatov
#
# $LastChangedDate$
#
# Date: 2008-08-15
# Version: 0.0.4
# Changes:
#   Updated to work with new Last.fm site.
#   Added command line options for user name and password.
#   Refactoring: Got rid of many global variables.
#
# Date: 2008-05-26
# Version: 0.0.3
# Changes: Fixed NameError and some typos.
#
# Date: 2007-01-28
# Version: 0.0.2
# Changes: Fixed typo
#
# Date: 2007-01-22
# Version: 0.0.1
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; version 2 only.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# Doubles as the optparse usage text; both positional arguments are required.
__doc__ = '''
python %prog [options] OLD_TAG NEW_TAG

Retag mistagged Last.fm tracks
'''.strip()

import getpass
import re
import os
import sys
import time
from optparse import OptionParser
from xml.parsers.expat import ExpatError

# Several standard modules were renamed in Python 3. Each import below tries
# the Python 2 name first and falls back to the Python 3 location, binding
# the same local name either way.
try:
    import httplib
except ImportError:
    import http.client as httplib

try:
    from HTMLParser import HTMLParser
except ImportError:
    from html.parser import HTMLParser

try:
    from hashlib import md5
except ImportError:
    from md5 import md5

try:
    from urllib import quote_plus as q
except ImportError:
    from urllib.parse import quote_plus as q

try:
    from xmlrpclib import ServerProxy, MultiCall, ProtocolError
except ImportError:
    from xmlrpc.client import ServerProxy, MultiCall, ProtocolError

try:
    set
except NameError:
    # Fallback for interpreters without the builtin; the module is ``sets``.
    from sets import Set as set

try:
    _text_type = unicode
except NameError:
    _text_type = str

try:
    _read_line = raw_input
except NameError:
    _read_line = input

WS_HOST = 'ws.audioscrobbler.com'
LASTFM_HOST = 'www.last.fm'


def _log(message='', newline=True):
    ''' Write a progress message to stderr, optionally without a newline '''
    sys.stderr.write(message)
    if newline:
        sys.stderr.write('\n')


def _to_bytes(value):
    ''' Encode text as UTF-8 so it can be hashed; pass byte strings through '''
    if isinstance(value, _text_type):
        return value.encode('utf-8')
    return value


def _to_native_str(data):
    '''
    Convert data to the interpreter's native ``str`` type.

    Python 3 byte strings are decoded as UTF-8 and Python 2 ``unicode``
    objects are encoded as UTF-8; native strings are returned unchanged.
    '''
    if isinstance(data, str):
        return data
    if isinstance(data, _text_type):
        return data.encode('utf-8')
    return data.decode('utf-8')


class TracksParser(HTMLParser):
    '''
    Finds links to tracks

    Every ``<td class="subjectCell">`` cell is scanned for an artist link
    followed by a track link. The text of those two links is collected as
    an ``(artist, title)`` tuple in ``self.tracks``.
    '''
    artist_href_pattern = re.compile('^/music/[^/]+$')
    track_href_pattern = re.compile('^/music/[^/]+/_/[^/]+$')
    title_pattern = re.compile('^(.+) – (.+)$')

    def __init__(self, *args, **kwargs):
        # ``track`` is the cell currently being parsed (None outside a cell).
        self.track = None
        self.tracks = []
        HTMLParser.__init__(self, *args, **kwargs)

    def handle_starttag(self, tag, attrs):
        ''' Open a new track cell or mark which link is being entered '''
        attributes = dict(attrs)
        if tag == 'td' and attributes.get('class') == 'subjectCell':
            self.track = dict()
        elif self.track is not None and tag == 'a':
            # A valueless ``href`` attribute is reported as None.
            href = attributes.get('href') or ''
            # setdefault keeps a value that has already been captured if
            # the same kind of link shows up again inside the cell.
            if self.artist_href_pattern.match(href):
                self.track.setdefault('artist', None)
            elif self.track_href_pattern.match(href):
                self.track.setdefault('title', None)

    def handle_endtag(self, tag):
        ''' Close the current cell, keeping it only if it is complete '''
        if tag != 'td' or self.track is None:
            return
        artist = self.track.get('artist')
        title = self.track.get('title')
        self.track = None
        # Cells without both an artist and a title are not tracks; skipping
        # them avoids a KeyError here and a failure when quoting None later.
        if artist and title:
            self.tracks.append((artist, title))

    def handle_data(self, data):
        ''' Store link text as the artist first, then as the title '''
        if self.track is None:
            return
        text = data.strip()
        if not text:
            # Whitespace between tags must not be taken for a name.
            return
        keys = sorted(self.track.keys())
        if keys == ['artist'] and self.track['artist'] is None:
            self.track['artist'] = text
        elif (keys == ['artist', 'title'] and self.track['artist']
                and self.track['title'] is None):
            self.track['title'] = text


class TagsParser(HTMLParser):
    '''
    Parses XML response with track tags

    The text of every ``<name>`` element is appended to ``self.tags``.
    '''

    def __init__(self, *args, **kwargs):
        # True while the parser is between <name> and </name>.
        self.found_tag = False
        self.tags = []
        HTMLParser.__init__(self, *args, **kwargs)

    def handle_starttag(self, tag, attrs):
        if tag == 'name':
            self.found_tag = True

    def handle_endtag(self, tag):
        if tag == 'name' and self.found_tag:
            self.found_tag = False

    def handle_data(self, data):
        if self.found_tag:
            # Each tag name is echoed to stderr as it is collected.
            _log(data)
            self.tags.append(data)


def make_xml_rpc_proxy():
    ''' Return an XML-RPC proxy for the read/write endpoint on WS_HOST '''
    schema = 'http'
    url_template = '%(schema)s://%(host)s%(path)s'
    path = '/1.0/rw/xmlrpc.php'
    xmlrpc_url = url_template % dict(schema=schema, host=WS_HOST, path=path)
    return ServerProxy(xmlrpc_url)


class TagRenamer(object):
    '''
    Replaces one tag with another on every track the user tagged with it.

    Set ``user``, ``password`` and ``debug`` before calling ``rename_tag``;
    missing credentials are asked for interactively.
    '''
    track_tags_uri = '/1.0/user/%(user)s/tracktags.xml?%(query_string)s'
    tracktags_query_template = 'artist=%(artist)s&track=%(track)s'
    tag_uri_template = '/user/%(user)s/library/tags?tag=%(tag)s'

    def __init__(self):
        self.user = None
        self.password = None
        self.debug = False

    def read_credentials(self):
        ''' Prompt for the user name and password until both are non-empty '''
        _log('Logging in to Last.fm')
        # Neither prompt ever returns None, so test for emptiness instead;
        # otherwise an empty answer would be accepted as a credential.
        while not self.user:
            self.user = _read_line('Last.fm user: ')
        while not self.password:
            prompt = "Last.fm password (characters won't be displayed): "
            self.password = getpass.getpass(prompt)

    def make_challenge_auth(self):
        '''
        Build the (challenge, auth) pair used to sign XML-RPC calls.

        The challenge is the current Unix time as a decimal string and the
        auth token is md5(md5(password).hexdigest() + challenge).hexdigest().

        Raises ValueError if no password has been set.
        '''
        if self.password is None:
            raise ValueError('Last.fm password not set!')
        # This is very important! The challenge must be a string.
        challenge = str(int(time.time()))
        password_hash = md5(_to_bytes(self.password)).hexdigest()
        auth = md5(_to_bytes(password_hash + challenge)).hexdigest()
        return challenge, auth

    def rename_tag(self, old_tag, new_tag):
        '''
        Replace old_tag with new_tag on every track tagged with old_tag.

        Progress is reported on stderr. All changes are queued on a single
        XML-RPC multicall that is sent only if at least one track changes.
        '''
        self.read_credentials()
        challenge, auth = self.make_challenge_auth()
        proxy = make_xml_rpc_proxy()

        _log('Retrieving track list for "%s" ...\t\t' % old_tag,
             newline=False)
        tracks = self.get_tracks_by_tag(old_tag)
        if not tracks:
            _log('No tracks found')
            return
        _log('OK')

        multicall = MultiCall(proxy)
        pending = 0
        _log('Preparing conversion ...\t\t\n')
        for artist, track in tracks:
            tags = set(self.get_track_tags(artist, track))
            new_tags = set(tags)
            if old_tag in tags:
                new_tags.remove(old_tag)
                new_tags.add(new_tag)
            current = sorted(tags)
            if new_tags != tags:
                wanted = sorted(new_tags)
                _log('%s - %s:\n\tFrom: %s \n\tTo:  %s'
                     % (artist, track, current, wanted))
                multicall.tagTrack(self.user, challenge, auth, artist, track,
                                   wanted, 'set')
                pending += 1
            else:
                _log('%s - %s:\n\t%s\n\tNothing to do. Skipping.'
                     % (artist, track, current))
            # Blank separator line; kept on stderr with the other progress.
            _log()
        _log()

        if not pending:
            # Nothing was queued, so skip the pointless empty request.
            _log('No changes to apply')
            return

        _log('Applying changes ... \t\t', newline=False)
        # Iterating the result is what surfaces a failure of any queued call.
        for _result in multicall():
            pass
        _log('Done\n')

    def get_tracks_by_tag(self, tag):
        ''' Return a list of (artist, title) tuples the user tagged with tag '''
        uri = self.tag_uri_template % dict(user=q(self.user), tag=q(tag))
        parser = TracksParser()
        parser.feed(self._fetch(LASTFM_HOST, uri))
        parser.close()
        return parser.tracks

    def get_track_tags(self, artist, track):
        ''' Return the list of tags the user has applied to a track '''
        query_string = self.tracktags_query_template % dict(
            artist=q(artist), track=q(track))
        uri = self.track_tags_uri % dict(user=q(self.user),
                                         query_string=query_string)
        parser = TagsParser()
        parser.feed(self._fetch(WS_HOST, uri))
        parser.close()
        return parser.tags

    def _fetch(self, host, uri):
        ''' GET uri from host, return the body as str, close the connection '''
        http = self.connect(host)
        try:
            return _to_native_str(self.retrieve(http, uri).read())
        finally:
            http.close()

    def connect(self, host):
        '''
        Create an HTTP connection
        '''
        try:
            http = httplib.HTTPConnection(host=host, strict=True)
        except TypeError:
            # Newer http.client versions no longer accept ``strict``.
            http = httplib.HTTPConnection(host=host)
        if self.debug:
            http.debuglevel = 1
        http.connect()
        return http

    def retrieve(self, http, uri):
        '''
        Request a URI and return file-like object with retrieved content.
        '''
        http.request('GET', uri)
        return http.getresponse()


def main():
    ''' Parse the command line and rename OLD_TAG to NEW_TAG '''
    parser = OptionParser(usage=__doc__)
    parser.add_option("-d", "--debug", dest='debug', action='store_true',
                      default=False, help='Enable debugging output')
    parser.add_option("-u", "--user", dest='user', action='store',
                      type='str', metavar='USER', default=None,
                      help='Log in to Last.fm as USER')
    parser.add_option("-p", "--password", dest='password', action='store',
                      metavar='PASSWORD', default=None, type='str',
                      help='[INSECURE!] Use specified Last.fm password')
    options, args = parser.parse_args()
    if len(args) != 2:
        parser.print_help()
        # os.EX_USAGE is not defined on every platform; 64 is its usual value.
        sys.exit(getattr(os, 'EX_USAGE', 64))
    old_tag, new_tag = args

    renamer = TagRenamer()
    renamer.user = options.user
    renamer.password = options.password
    renamer.debug = options.debug
    renamer.rename_tag(old_tag, new_tag)


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        sys.exit()