Source code for gemstone.util

import sys
import logging
from multiprocessing.pool import ThreadPool

def init_default_logger():
    return logging.getLogger()

[docs]def as_completed(*async_result_wrappers): """ Yields results as they become available from asynchronous method calls. Example usage :: async_calls = [service.call_method_async("do_stuff", (x,)) for x in range(25)] for async_call in gemstone.as_completed(*async_calls): print("just finished with result ", async_call.result()) :param async_result_wrappers: :py:class:`gemstone.client.structs.AsyncMethodCall` instances. :return: a generator that yields items as soon they results become available. .. versionadded:: 0.5.0 """ wrappers_copy = list(async_result_wrappers) while len(wrappers_copy): completed = list(filter(lambda x: x.finished(), wrappers_copy)) if not len(completed): continue for item in completed: wrappers_copy.remove(item) yield item
[docs]def first_completed(*async_result_wrappers): """ Just like :py:func:`gemstone.as_completed`, but returns only the first item and discards the rest. :param async_result_wrappers: :return: .. versionadded:: 0.5.0 """ wrappers_copy = list(async_result_wrappers) while True: completed = list(filter(lambda x: x.finished(), wrappers_copy)) if not len(completed): continue return completed[0]
[docs]def make_callbacks(async_result_wrappers, on_result, on_error, run_in_threads=False): """ Monitors the :py:class:`gemstone.client.remote_service.AsyncMethodCall` instances from `async_result_wrappers` and apply callbacks depending on their outcome. :param async_result_wrappers: An iterable of :py:class:`gemstone.client.remote_service.AsyncMethodCall` :param on_result: a callable that takes a single positional argument (the result) :param on_error: a callabke that takes a single positional argument (the error) :param run_in_threads: flag tha specifies if the callbacks should be called in the current thread or in background threads .. versionadded:: 0.5.0 """ if run_in_threads: pool = ThreadPool() for item in as_completed(*async_result_wrappers): if item.error(): if run_in_threads: pool.apply_async(on_error, args=(item.error(),)) else: on_error(item.error()) else: if run_in_threads: pool.apply_async(on_result, args=(item.result(),)) else: on_result(item.result())