Source code for gemstone.client.remote_service

import urllib.request
import os
import time
import random

from multiprocessing.pool import ThreadPool
import simplejson as json

from gemstone.client.structs import MethodCall, Notification, Result, BatchResult, AsyncMethodCall
from gemstone.core.structs import GenericResponse
from gemstone.errors import CalledServiceError, MethodNotFoundError, AccessDeniedError, \
    InternalErrorError, InvalidParamsError, UnknownError

[docs]class RemoteService(object): RESPONSE_CODES = { GenericResponse.ACCESS_DENIED.error["code"]: "access_denied", GenericResponse.INTERNAL_ERROR.error["code"]: "internal_error", GenericResponse.METHOD_NOT_FOUND.error["code"]: "method_not_found", GenericResponse.INVALID_PARAMS.error["code"]: "invalid_params" } def __init__(self, service_endpoint, *, authentication_method=None): self.url = service_endpoint self.authentication_method = authentication_method self._thread_pool = None def _get_thread_pool(self): # lazily initialized if not self._thread_pool: self._thread_pool = ThreadPool(os.cpu_count()) return self._thread_pool def handle_single_request(self, request_object): """ Handles a single request object and returns the raw response :param request_object: """ if not isinstance(request_object, (MethodCall, Notification)): raise TypeError("Invalid type for request_object") method_name = request_object.method_name params = request_object.params req_id = request_body = self.build_request_body(method_name, params, id=req_id) http_request = self.build_http_request_obj(request_body) try: response = urllib.request.urlopen(http_request) except urllib.request.HTTPError as e: raise CalledServiceError(e) if not req_id: return response_body = json.loads( return response_body def build_request_body(self, method_name, params, id=None): request_body = { "jsonrpc": "2.0", "method": method_name, "params": params } if id: request_body['id'] = id return request_body def build_http_request_obj(self, request_body): request = urllib.request.Request(self.url) request.add_header("Content-Type", "application/json") request.add_header("User-Agent", "gemstone-client") = json.dumps(request_body).encode() request.method = "POST" return request def call_method(self, method_name_or_object, params=None): """ Calls the ``method_name`` method from the given service and returns a :py:class:`gemstone.client.structs.Result` instance. :param method_name_or_object: The name of te called method or a ``MethodCall`` instance :param params: A list of dict representing the parameters for the request :return: a :py:class:`gemstone.client.structs.Result` instance. """ if isinstance(method_name_or_object, MethodCall): req_obj = method_name_or_object else: req_obj = MethodCall(method_name_or_object, params) raw_response = self.handle_single_request(req_obj) response_obj = Result(result=raw_response["result"], error=raw_response['error'], id=raw_response["id"], method_call=req_obj) return response_obj def call_method_async(self, method_name_or_object, params=None): """ Calls the ``method_name`` method from the given service asynchronously and returns a :py:class:`gemstone.client.structs.AsyncMethodCall` instance. :param method_name_or_object: The name of te called method or a ``MethodCall`` instance :param params: A list of dict representing the parameters for the request :return: a :py:class:`gemstone.client.structs.AsyncMethodCall` instance. """ thread_pool = self._get_thread_pool() if isinstance(method_name_or_object, MethodCall): req_obj = method_name_or_object else: req_obj = MethodCall(method_name_or_object, params) async_result_mp = thread_pool.apply_async(self.handle_single_request, args=(req_obj,)) return AsyncMethodCall(req_obj=req_obj, async_resp_object=async_result_mp) def notify(self, method_name_or_object, params=None): """ Sends a notification to the service by calling the ``method_name`` method with the ``params`` parameters. Does not wait for a response, even if the response triggers an error. :param method_name_or_object: the name of the method to be called or a ``Notification`` instance :param params: a list of dict representing the parameters for the call :return: None """ if isinstance(method_name_or_object, Notification): req_obj = method_name_or_object else: req_obj = Notification(method_name_or_object, params) self.handle_single_request(req_obj) def call_batch(self, *requests): body = [] ids = {} for item in requests: if not isinstance(item, (MethodCall, Notification)): raise TypeError("Invalid type for batch item: {}".format(item)) body.append(self.build_request_body( method_name=item.method_name, params=item.params or {}, )) if isinstance(item, MethodCall): ids[body[-1]["id"]] = item results = self.handle_batch_request(body) batch_result = BatchResult() for result in results: result_obj = Result(result["result"], result["error"], result["id"], method_call=ids[result["id"]]) batch_result.add_response(result_obj) return batch_result def handle_batch_request(self, body): request = self.build_http_request_obj(body) try: response = urllib.request.urlopen(request) except urllib.request.HTTPError as e: raise CalledServiceError(e) resp_body = json.loads( return resp_body