# -*- coding: utf-8 -*-
'''Plugin object that routes Kodi/XBMC plugin URLs to view functions.'''
import os
import sys
import pickle
from datetime import timedelta

import xbmc
import xbmcplugin
import xbmcaddon
import xbmcgui

from .request import Request
from .listitem import ListItem
from .urls import UrlRule, NotFoundException, AmbiguousUrlException
from .storage import TimedStorage

if sys.version_info[0] >= 3:
    # Python 3 dropped these names; alias them so the checks below keep working.
    unicode = str
    basestring = (bytes, str)
else:
    # NOTE(review): this rebinds ``long`` on Python 2 only; it looks like it
    # may have been meant for the Python 3 branch -- confirm before moving.
    long = int

try:
    from xbmcvfs import translatePath
except ImportError:
    from xbmc import translatePath


class Plugin(object):
    '''Registers URL rules, dispatches the current request to the matching
    view function and wraps the xbmc/xbmcplugin/xbmcgui calls needed to
    present the result.
    '''

    def __init__(self, name=None, log_func=None, pre_dispatch=None,
                 post_dispatch=None):
        '''Set up routing state, add-on metadata and the storage folder.

        :param name: Accepted but not used; the plugin name is read from the
                     add-on's own ``name`` info.
        :param log_func: (Optional) Callable taking a message; used by
                         :meth:`log`.
        :param pre_dispatch: (Optional) Callable invoked with this plugin
                             before a view runs. A falsy return value other
                             than None skips the view.
        :param post_dispatch: (Optional) Callable invoked with this plugin
                              and the matched rule after the view ran.
        '''
        self._routes = []
        self._request = None
        self._view_functions = {}
        self._log_func = log_func
        self._pre_dispatch = pre_dispatch
        self._post_dispatch = post_dispatch
        self._viewmode = 0
        self._addon = xbmcaddon.Addon()
        self._addon_id = self._addon.getAddonInfo('id')
        self._name = self._addon.getAddonInfo('name')
        self._contenttype = ""
        self._delay = 350
        self._goto_top = True
        self._end_of_directory = False
        self._force_update = False
        self._rule = None
        self._addondata_path = translatePath(
            'special://profile/addon_data/%s/' % self._addon_id)
        self._storage_path = os.path.join(self._addondata_path, ".storage/")
        if not os.path.isdir(self._storage_path):
            os.makedirs(self._storage_path)

        # The second dotted component of the add-on id selects the info type.
        types = {
            'video': 'video',
            'audio': 'music',
            'image': 'pictures',
        }
        id_parts = self._addon_id.split('.')
        # Ids without a dot have no second component; fall back to 'video'
        # instead of raising IndexError.
        kind = id_parts[1] if len(id_parts) > 1 else None
        self._info_type = types.get(kind, 'video')
        self.added_items = []

    @property
    def id(self):
        '''The id for the addon instance.'''
        return self._addon_id

    @property
    def storage_path(self):
        '''A full path to the storage folder for this plugin's addon data.'''
        return self._storage_path

    @property
    def addondata_path(self):
        '''The translated addon_data path for this add-on.'''
        return self._addondata_path

    @property
    def handle(self):
        '''The handle of the current request.'''
        return self.request.handle

    @property
    def addon(self):
        '''The underlying ``xbmcaddon.Addon`` instance.'''
        return self._addon

    @property
    def request(self):
        '''The current request.

        :raises Exception: if no request has been set yet.
        '''
        if self._request is None:
            raise Exception('It seems the current request has not been '
                            'initialized yet. Please ensure that '
                            '`plugin.run()` has been called before attempting '
                            'to access the current request.')
        return self._request

    @property
    def name(self):
        '''The add-on's name as reported by its ``name`` info.'''
        return self._name

    @property
    def info_type(self):
        '''The info type derived from the add-on id ('video', 'music' or
        'pictures').
        '''
        return self._info_type

    def info(self, key):
        '''Returns the add-on info value for the provided key.'''
        return self.addon.getAddonInfo(key)

    def log(self, message):
        '''Passes message to the configured log function, if there is one.'''
        if self._log_func:
            self._log_func(message)

    def get_setting(self, key, converter=None, choices=None):
        '''Returns the settings value for the provided key.

        If converter is str, unicode, bool or int the settings value will be
        returned converted to the provided type. If choices is an instance of
        list or tuple its item at position of the settings value be returned.

        .. note:: It is suggested to always use unicode for text-settings
                  because else xbmc returns utf-8 encoded strings.

        :param key: The id of the setting defined in settings.xml.
        :param converter: (Optional) Choices are str, unicode, bool and int.
        :param choices: (Optional) Choices are instances of list or tuple.
        :raises TypeError: if converter is not one of the accepted values and
                           choices is not a list or tuple.

        Examples:
            * ``plugin.get_setting('per_page', int)``
            * ``plugin.get_setting('password', unicode)``
            * ``plugin.get_setting('force_viewmode', bool)``
            * ``plugin.get_setting('content', choices=('videos', 'movies'))``
        '''
        # TODO: allow pickling of settings items?
        # TODO: STUB THIS OUT ON CLI
        value = self.addon.getSetting(id=key)
        if converter is str:
            return value
        elif converter is unicode:
            # Only byte strings can (and need to) be decoded; a text value
            # has no usable ``decode`` and is returned unchanged.
            if isinstance(value, bytes):
                return value.decode('utf-8')
            return value
        elif converter is bool:
            return value == 'true'
        elif converter is int:
            return int(value)
        elif isinstance(choices, (list, tuple)):
            return choices[int(value)]
        elif converter is None:
            return value
        else:
            raise TypeError('Acceptable converters are str, unicode, bool and '
                            'int. Acceptable choices are instances of list '
                            ' or tuple.')

    def set_setting(self, key, val):
        '''Stores val under the setting id key and returns the result of the
        underlying ``setSetting`` call.
        '''
        return self.addon.setSetting(id=key, value=val)

    def open_settings(self):
        '''Opens the add-on's settings dialog.'''
        self.addon.openSettings()

    def _listitemify(self, item):
        '''Returns item as a list item object.

        Objects exposing ``as_tuple`` are returned unchanged. Anything else
        is treated as a dict of ``ListItem.from_dict`` keyword arguments; an
        ``info_type`` entry is added to it (in place) when missing.
        '''
        info_type = self.info_type if hasattr(self, 'info_type') else 'video'
        if not hasattr(item, 'as_tuple'):
            if 'info_type' not in item:
                item['info_type'] = info_type
            item = ListItem.from_dict(**item)
        return item

    def add_items(self, items):
        '''Adds the items to the current directory listing.

        :param items: An iterable of list items or item dicts.
        :returns: The list of list item objects that were added.
        '''
        _items = [self._listitemify(item) for item in items]
        tuples = [item.as_tuple() for item in _items]
        xbmcplugin.addDirectoryItems(self.handle, tuples, len(tuples))
        # Keep a record of everything added so finish() can return it.
        self.added_items.extend(_items)
        return _items

    def url_for(self, endpoint, **items):
        '''Returns a ``plugin://`` url for the given endpoint.

        :param endpoint: The registered name of a view, or the view function
                         itself.
        :param items: Keyword arguments handed to the rule's
                      ``make_path_qs``.
        :raises NotFoundException: if no rule matches endpoint.
        :raises AmbiguousUrlException: if the name is registered more than
                                       once.
        '''
        try:
            rule = self._view_functions[endpoint]
        except KeyError:
            # Not a registered name: look for a rule whose view function is
            # the endpoint. Ambiguous names are stored as None and must be
            # skipped, they have no ``view_func``.
            candidates = (
                candidate for candidate in self._view_functions.values()
                if candidate is not None and candidate.view_func == endpoint
            )
            try:
                rule = next(candidates)
            except StopIteration:
                raise NotFoundException(
                    '%s doesn\'t match any known patterns.' % endpoint)

        # A None value marks a name shared by more than one view function.
        if not rule:
            raise AmbiguousUrlException

        pathqs = rule.make_path_qs(items)
        return 'plugin://%s%s' % (self._addon_id, pathqs)

    def set_force_update(self, value):
        '''When value is truthy, _dispatch passes ``update_listing=True`` to
        finish() regardless of the matched rule's ``update`` setting.
        '''
        self._force_update = value

    def set_delay(self, value):
        '''Sets the delay (passed to ``xbmc.sleep``) used by
        end_of_directory(). Values that cannot be converted to int reset the
        delay to 350.
        '''
        try:
            self._delay = int(value)
        except (TypeError, ValueError, OverflowError):
            self._delay = 350

    def set_resolved_url(self, item=None, subtitles=None):
        '''Takes a url or a listitem to be played. Used in conjunction with a
        playable list item with a path that calls back into your addon.

        :param item: A playable list item or url. Pass None to alert XBMC of
                     a failure to resolve the item.

                     .. warning:: When using set_resolved_url you should
                                  ensure the initial playable item (which
                                  calls back into your addon) doesn't have a
                                  trailing slash in the URL. Otherwise it
                                  won't work reliably with XBMC's
                                  PlayMedia().
        :param subtitles: A URL to a remote subtitles file or a local
                          filename for a subtitles file to be played along
                          with the item.
                          NOTE(review): this argument is currently not used
                          by the implementation.
        :returns: A one-element list holding the resolved list item.
        :raises Exception: if the handle has already been finalized.
        '''
        if self._end_of_directory:
            raise Exception('Current XBMC handle has been removed. Either '
                            'set_resolved_url(), end_of_directory(), or '
                            'finish() has already been called.')
        self._end_of_directory = True

        succeeded = True
        if item is None:
            # None signals a failed resolution; an empty item is still sent.
            succeeded = False
            item = {}

        if isinstance(item, basestring):
            # caller is passing a url instead of an item dict
            item = {'path': item}

        item = self._listitemify(item)
        item.set_played(True)
        xbmcplugin.setResolvedUrl(self.handle, succeeded,
                                  item.as_xbmc_listitem())
        return [item]

    def _parse_request(self, url=None, handle=None):
        '''Builds a Request from the given url and handle, falling back to
        ``sys.argv`` for whichever is None.
        '''
        if url is None:
            url = sys.argv[0]
            if len(sys.argv) == 3:
                url += sys.argv[2]
        if handle is None:
            handle = sys.argv[1]
        return Request(url, handle)

    def route(self, url_rule, name=None, options=None, cache=True,
              update=False, content_type="", view_mode=0, goto_top=True):
        '''A decorator that registers the decorated function as a view via
        add_url_rule(). The view name defaults to the function's
        ``__name__``; all other arguments are forwarded unchanged.
        '''
        def decorator(f):
            view_name = name or f.__name__
            self.add_url_rule(url_rule, f, name=view_name, options=options,
                              cache=cache, update=update,
                              content_type=content_type, view_mode=view_mode,
                              goto_top=goto_top)
            return f
        return decorator

    def add_url_rule(self, url_rule, view_func, name, options=None,
                     cache=True, update=False, content_type="", view_mode=0,
                     goto_top=True):
        '''Creates a UrlRule from the arguments and registers it.

        The rule is always appended to the routing list. If name is already
        registered, the name's entry is replaced by None so that url_for()
        reports it as ambiguous.
        '''
        rule = UrlRule(url_rule, view_func, name, options=options,
                       cache=cache, update=update, content_type=content_type,
                       view_mode=view_mode, goto_top=goto_top)
        if name in self._view_functions:
            self._view_functions[name] = None
        else:
            self._view_functions[name] = rule
        self._routes.append(rule)

    def set_content(self, type):
        '''Sets the content type applied by end_of_directory().'''
        self._contenttype = type

    def set_view(self, mode):
        '''Sets the view mode (converted to int) applied by
        end_of_directory().
        '''
        self._viewmode = int(mode)

    def set_goto_top(self, value):
        '''Sets whether end_of_directory() may issue ``Action(firstpage)``.'''
        self._goto_top = value

    def redirect(self, url):
        '''Dispatches url using the current request's handle and returns the
        resulting list items.
        '''
        new_request = self._parse_request(url=url, handle=self.request.handle)
        listitems, rule = self._dispatch(new_request.path)
        return listitems

    def get_storage(self, name='main', file_format='pickle', TTL=None):
        '''Returns the TimedStorage registered under name, creating it on
        first use.

        If creating the storage raises ValueError the user is asked whether
        the file should be cleared; on confirmation the file is removed and
        the storage is created again.

        :param name: Storage name; used as the file name inside
                     :attr:`storage_path`.
        :param file_format: Passed through to TimedStorage.
        :param TTL: (Optional) Lifetime in minutes; converted to a timedelta
                    before it is handed to TimedStorage.
        :raises Exception: if the storage is corrupted and the user declines
                           to clear it.
        '''
        if not hasattr(self, '_unsynced_storages'):
            self._unsynced_storages = {}

        # NOTE(review): the check looks at the *second* character; a leading
        # dot check was probably intended, but changing it would rename
        # existing storage files. The slice only avoids an IndexError for
        # one-character names.
        if name[1:2] != ".":
            name = "." + name
        filename = os.path.join(self.storage_path, name)

        try:
            storage = self._unsynced_storages[filename]
        except KeyError:
            if TTL:
                TTL = timedelta(minutes=TTL)
            try:
                storage = TimedStorage(filename, file_format, TTL)
            except ValueError:
                choices = ['Clear storage', 'Cancel']
                ret = xbmcgui.Dialog().select('A storage file is corrupted. It'
                                              ' is recommended to clear it.',
                                              choices)
                if ret == 0:
                    os.remove(filename)
                    storage = TimedStorage(filename, file_format, TTL)
                else:
                    raise Exception('Corrupted storage file at %s' % filename)
            self._unsynced_storages[filename] = storage
        return storage

    def end_of_directory(self, succeeded=True, update_listing=False,
                         cache_to_disc=True):
        '''Finalizes the directory listing for the current handle.

        On success the content type and view mode set on the plugin are
        applied before ``xbmcplugin.endOfDirectory`` is called; afterwards
        the configured delay is slept and, if enabled on both the plugin and
        the matched rule, ``Action(firstpage)`` is executed.

        :returns: The result of ``xbmcplugin.endOfDirectory``.
        :raises AssertionError: if the directory was already finalized.
        '''
        self._update_listing = update_listing
        if self._end_of_directory:
            # Raised explicitly so the check survives ``python -O``.
            raise AssertionError('Already called endOfDirectory.')
        self._end_of_directory = True

        # Finalize the directory items
        if succeeded:
            if self._contenttype:
                xbmcplugin.setContent(self.handle, self._contenttype)
            if self._viewmode > 0:
                xbmc.executebuiltin(
                    "Container.SetViewMode(" + str(self._viewmode) + ")")

        res = xbmcplugin.endOfDirectory(self.handle, succeeded,
                                        update_listing, cache_to_disc)

        if succeeded:
            # The delay is split around the jump to the first page: all but
            # 75 of it before, the remaining 75 after.
            if self._delay > 75:
                xbmc.sleep(self._delay - 75)
            if self._goto_top and (not self._rule or self._rule.goto_top):
                xbmc.executebuiltin("Action(firstpage)")
            if self._delay > 0:
                xbmc.sleep(75)
        return res

    def finish(self, items=None, sort_methods=None, succeeded=True,
               update_listing=False, cache_to_disc=True, view_mode=None):
        '''Adds the given items and sort methods, then ends the directory.

        :param items: (Optional) Items handed to add_items().
        :param sort_methods: (Optional) Iterable of sort methods. Non-string
                             entries that have a length are unpacked as
                             positional arguments.
        :param succeeded: Passed to end_of_directory().
        :param update_listing: Passed to end_of_directory().
        :param cache_to_disc: Passed to end_of_directory().
        :param view_mode: NOTE(review): accepted but currently not used.
        :returns: The list of all list items added so far.
        '''
        if items:
            self.add_items(items)

        if sort_methods:
            # NOTE(review): add_sort_method is not defined in the part of
            # the class visible here -- confirm it exists elsewhere.
            for sort_method in sort_methods:
                if (not isinstance(sort_method, basestring)
                        and hasattr(sort_method, '__len__')):
                    self.add_sort_method(*sort_method)
                else:
                    self.add_sort_method(sort_method)

        # Finalize the directory items
        self.end_of_directory(succeeded, update_listing, cache_to_disc)

        # Return the cached list of all the list items that were added
        return self.added_items

    def _dispatch(self, path):
        '''Runs the view of the first rule matching path.

        The pre-dispatch hook may veto the view by returning a falsy value
        other than None. After the view (and the post-dispatch hook) ran,
        the rule's content type and view mode are applied and, unless the
        directory was already finalized or the handle is negative, finish()
        is called -- as a failure when the view produced None.

        :returns: A ``(listitems, rule)`` tuple.
        :raises NotFoundException: if no rule matches path.
        '''
        for rule in self._routes:
            try:
                view_func, items = rule.match(path)
            except NotFoundException:
                continue

            self._rule = rule

            pre = True
            if self._pre_dispatch:
                pre = self._pre_dispatch(self)
                # A hook that returns nothing does not veto the view.
                if pre is None:
                    pre = True

            if pre:
                listitems = view_func(**items)
            else:
                listitems = None

            if self._post_dispatch:
                self._post_dispatch(self, rule)

            if rule.content_type:
                self.set_content(rule.content_type)
            if rule.view_mode:
                self.set_view(rule.view_mode)

            if not self._end_of_directory and self.handle >= 0:
                update_listing = True if self._force_update else rule.update
                if listitems is None:
                    self.finish(succeeded=False, cache_to_disc=rule.cache,
                                update_listing=update_listing)
                else:
                    listitems = self.finish(listitems,
                                            cache_to_disc=rule.cache,
                                            update_listing=update_listing)
            return listitems, rule

        raise NotFoundException('No matching view found for %s' % path)

    def notify(self, msg='', title=None, delay=5000, image=''):
        '''Shows a notification via the ``Notification`` builtin. title
        defaults to the add-on's name.
        '''
        if title is None:
            title = self._name
        xbmc.executebuiltin(u'Notification("%s", "%s", "%s", "%s")' %
                            (msg, title, delay, image))

    def dialog(self, msg="", title=None):
        '''Shows an OK dialog. title defaults to the add-on's name.'''
        if title is None:
            title = self._name
        xbmcgui.Dialog().ok(title, msg)

    def yesno(self, msg="", title=None):
        '''Shows a yes/no dialog and returns its result. title defaults to
        the add-on's name.
        '''
        if title is None:
            title = self._name
        return xbmcgui.Dialog().yesno(title, msg)

    def keyboard(self, default=None, heading=None, hidden=False):
        '''Displays the keyboard input window to the user. If the user does
        not cancel the modal, the value entered by the user will be returned.

        :param default: The placeholder text used to prepopulate the input
                        field.
        :param heading: The heading for the window. Defaults to the current
                        addon's name. If you require a blank heading, pass an
                        empty string.
        :param hidden: Whether or not the input field should be masked with
                       stars, e.g. a password field.
        :returns: The entered text, or None if the input was not confirmed.
        '''
        if heading is None:
            heading = self._name
        if default is None:
            default = ''
        keyboard = xbmc.Keyboard(default, heading, hidden)
        keyboard.doModal()
        if keyboard.isConfirmed():
            return keyboard.getText()
        return None

    def run(self):
        '''The main entry point: parses the request from ``sys.argv``,
        dispatches it and closes every storage opened through get_storage().

        :returns: The list items produced by the dispatched view.
        '''
        self._request = self._parse_request()
        items, rule = self._dispatch(self.request.path)

        # Close all storages opened during this run (presumably this is
        # what persists them -- see TimedStorage.close) and forget them.
        if hasattr(self, '_unsynced_storages'):
            for storage in self._unsynced_storages.values():
                storage.close()
            del self._unsynced_storages

        return items