import logging
import os
import functools
import random
import argparse
import threading
import sys
from abc import ABC
from concurrent.futures import ThreadPoolExecutor
from tornado.web import StaticFileHandler
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.web import Application
from tornado.log import enable_pretty_logging
from gemstone.config.configurable import Configurable
from gemstone.config.configurator import CommandLineConfigurator
from gemstone.core.stats import DefaultStatsContainer, DummyStatsContainer
from gemstone.discovery.cache import ServiceDiscoveryCache
from gemstone.errors import ServiceConfigurationError
from gemstone.core.handlers import TornadoJsonRpcHandler
from gemstone.core.decorators import exposed_method
from gemstone.client.remote_service import RemoteService
from gemstone.core.container import Container
# Public names exported by ``from <module> import *``.
__all__ = ["MicroService"]

# True when ``sys.platform`` starts with "win32"; used to enable the
# Windows-only dummy periodic task.
IS_WINDOWS = sys.platform.startswith("win32")
class MicroService(Container):
    """
    The base class for implementing microservices.

    A service is configured through the class attributes below; subclasses
    override the ones they need.
    """

    #: The name of the service. Is required.
    name = None

    #: The host where the service will listen
    host = "127.0.0.1"

    #: The port where the service will bind
    port = 8000

    #: The url where the service can be accessed by other microservices.
    #: Useful when using a service registry.
    accessible_at = None

    #: The path in the URL where the microservice JSON RPC endpoint will be accessible.
    endpoint = "/api"

    #: Template directory used by the created Tornado Application.
    #: Useful when you plan to add web application functionality
    #: to the microservice.
    template_dir = "."

    #: A list of ``(url, path)`` pairs: static files found under ``path``
    #: will be served under ``url``.
    static_dirs = []

    #: A list of extra Tornado handlers, given as ``(url, handler)`` pairs,
    #: that will be included in the created Tornado application.
    extra_handlers = []

    #: A list of service registry complete URL which will enable service auto-discovery.
    service_registry_urls = []

    #: Interval (in seconds) when the microservice will ping all the service registries.
    service_registry_ping_interval = 30

    #: A list of discovery strategies. They are used to locate remote services
    #: and are pinged periodically with this service's name and address.
    discovery_strategies = []

    #: Class-level cache of the remote services that were already located.
    #: NOTE(review): 3600 is presumably the entry lifetime in seconds - confirm
    #: against ``ServiceDiscoveryCache``.
    _remote_service_cache = ServiceDiscoveryCache(3600)

    #: Specifies if should record statistics
    use_statistics = False

    #: A list of (callable, time_in_seconds) that will enable periodic task execution.
    periodic_tasks = []

    #: A list of Event transports that will enable the Event dispatching feature.
    event_transports = []

    #: Flag that if set to True, will disable the configurable sub-framework.
    skip_configuration = False

    #: A list of configurable objects that allows the service's running parameters to
    #: be changed dynamically without changing its code.
    configurables = [
        # The literal value "random" selects a random port between 8000 and
        # 65000; any other value is converted to an integer. The comparison
        # must be against ``x``: a bare ``"random"`` string is always truthy
        # and would discard every explicitly configured port.
        Configurable(
            "port",
            template=lambda x: random.randint(8000, 65000) if x == "random" else int(x),
        ),
        Configurable("host"),
        Configurable("accessible_at"),
        Configurable("endpoint"),
    ]

    #: A list of configurator objects that will extract in order values for
    #: the defined configurators
    configurators = [
        CommandLineConfigurator(),
    ]

    #: a list of ``gemstone.core.modules.Module`` instances
    modules = []

    # in some situations, on Windows the event loop may hang
    # http://stackoverflow.com/questions/33634956/why-would-a-timeout-avoid-a-tornado-hang/33643631#33643631
    default_periodic_tasks = [(lambda: None, 0.5)] if IS_WINDOWS else []

    #: How many methods can be executed in parallel at the same time. Note that every blocking
    #: method is executed in a ``concurrent.futures.ThreadPoolExecutor``.
    #: ``os.cpu_count()`` returns ``None`` when the number of CPUs cannot be
    #: determined, so fall back to a single worker in that case.
    max_parallel_blocking_tasks = os.cpu_count() or 1

    #: The executor used for blocking methods; created in ``__init__``.
    _executor = None
def __init__(self, io_loop=None):
    """
    The base class for implementing microservices.

    :param io_loop: A :py:class:`tornado.ioloop.IOLoop` instance -
                    can be used to share the same io loop between
                    multiple microservices running from the same process.
    :raises ServiceConfigurationError: when no name is defined for the service
                                       or when ``max_parallel_blocking_tasks``
                                       is not a positive number.
    """
    self.app = None
    self._periodic_tasks_objs = []
    self.logger = self.get_logger()
    self.registries = []

    # name
    if self.name is None:
        raise ServiceConfigurationError("No name defined for the microservice")
    self.logger.debug("Service name: {}".format(self.name))

    # endpoint
    if self.accessible_at is None:
        self.accessible_at = "http://{host}:{port}{endpoint}".format(
            host=self.host, port=self.port, endpoint=self.endpoint
        )

    # methods
    self.methods = {}

    # event handlers
    self.event_handlers = {}

    # executor
    if self.max_parallel_blocking_tasks is None:
        # The class default comes from ``os.cpu_count()``, which returns None
        # when the CPU count cannot be determined. Comparing None with 0 would
        # raise a TypeError, so fall back to a single worker instead.
        self.max_parallel_blocking_tasks = 1
    if self.max_parallel_blocking_tasks <= 0:
        raise ServiceConfigurationError("Invalid max_parallel_blocking_tasks value")
    self._executor = ThreadPoolExecutor(self.max_parallel_blocking_tasks)

    # ioloop
    self.io_loop = io_loop or IOLoop.current()
@exposed_method()
def get_service_specs(self):
    """
    A default exposed method that describes the current microservice.

    The returned dictionary has the following shape:

    ::

        {
            "host": "127.0.0.1",
            "port": 9000,
            "accessible_at": "http://127.0.0.1:9000/api",
            "name": "service.example",
            "max_parallel_blocking_tasks": 8,
            "methods": {
                "get_service_specs": "...",
                "method1": "method1's docstring",
                ...
            },
            "event_transports": ["..."],
            "events_handled": {
                "event_name": "handler's docstring",
                ...
            }
        }

    :return: a ``dict`` with the service specifications
    """
    # exposed method name -> its docstring
    method_docs = {}
    for method_name, method in self.methods.items():
        method_docs[method_name] = method.__doc__

    # handled event name -> docstring of its handler
    event_docs = {}
    for event_name, handler in self.event_handlers.items():
        event_docs[event_name] = handler.__doc__

    transport_names = [str(transport) for transport in self.event_transports]

    return {
        "host": self.host,
        "port": self.port,
        "accessible_at": self.accessible_at,
        "name": self.name,
        "max_parallel_blocking_tasks": self.max_parallel_blocking_tasks,
        "methods": method_docs,
        "event_transports": transport_names,
        "events_handled": event_docs,
    }
# region Can be overridden by user
def on_service_start(self):
    """
    Hook that runs when the service starts; does nothing by default.

    Override it to perform a set of actions at startup.

    :return: ``None``
    """
    return None
def before_method_call(self, request_object):
    """
    Hook called before every RPC method call; does nothing by default.

    :param request_object: a :py:class:`gemstone.core.structs.JsonRpcRequest` instance.
    :return: ``None``
    """
    return None
def after_method_call(self, request_object, response_object):
    """
    Hook called after every **successful** RPC method call; does nothing
    by default.

    Modifying the ``response_object`` instance modifies the response of
    the actual call.

    :param request_object: a :py:class:`gemstone.core.structs.JsonRpcRequest` instance.
    :param response_object: a :py:class:`gemstone.core.structs.JsonRpcResponse` instance.
    :return: ``None``
    """
    return None
def on_failed_method_call(self, request_object, response_object):
    """
    Hook intended for failed RPC method calls; does nothing by default.

    :param request_object: the request of the failed call
    :param response_object: the response of the failed call
    :return: ``None``
    """
    # TODO: make the json rpc handler use this
    return None
def authenticate_request(self, handler):
    """
    Based on the current request handler, checks if the request is valid.

    The default implementation accepts every request.

    :param handler: a JsonRpcRequestHandler instance for the current request
    :return: False or None if the method call should be denied, or something whose
             boolean value is True otherwise.

    .. versionadded:: 0.10
    """
    is_allowed = True
    return is_allowed
def get_logger(self):
    """
    Override this method to designate the logger for the application.

    The default implementation enables Tornado's pretty logging and uses
    the ``"tornado.application"`` logger.

    :return: a :py:class:`logging.Logger` instance
    """
    enable_pretty_logging()
    application_logger = logging.getLogger("tornado.application")
    return application_logger
# endregion
# region Can be called by user
def get_service(self, name):
    """
    Locates a remote service by name. The name can be a glob-like pattern
    (``"project.worker.*"``). If multiple services match the given name, a
    random instance will be chosen. There might be multiple services that
    match a given name if there are multiple services with the same name
    running, or when the pattern matches multiple different services.

    A previously located service is served from the remote service cache.
    Otherwise every discovery strategy is asked in order, and the endpoints
    it returns are tried in random order until a connection succeeds.

    .. todo::

        Make this use self.io_loop to resolve the request. The current
        implementation is blocking and slow

    :param name: a pattern for the searched service.
    :return: a :py:class:`gemstone.RemoteService` instance
    :raises ValueError: when the service can not be located
    :raises ServiceConfigurationError: when there is no configured discovery strategy
    """
    strategies = self.discovery_strategies
    if not strategies:
        raise ServiceConfigurationError("No service registry available")

    cache_entry = self._remote_service_cache.get_entry(name)
    if cache_entry:
        return cache_entry.remote_service

    for strategy in strategies:
        candidate_urls = strategy.locate(name)
        if candidate_urls:
            random.shuffle(candidate_urls)
            for candidate_url in candidate_urls:
                try:
                    remote = RemoteService(candidate_url)
                    self._remote_service_cache.add_entry(name, remote)
                except ConnectionError:
                    # could not establish connection, try next
                    continue
                else:
                    return remote

    raise ValueError("Service could not be located")
def get_io_loop(self):
    """
    Returns the io loop of the service, falling back to
    ``IOLoop.current()`` when ``self.io_loop`` is not set.

    :return: the io loop used by this service
    """
    own_loop = self.io_loop
    if own_loop:
        return own_loop
    return IOLoop.current()
def start_thread(self, target, args, kwargs):
    """
    Shortcut method for starting a daemon thread.

    :param target: The function to be executed.
    :param args: A tuple or list representing the positional arguments for the thread.
    :param kwargs: A dictionary representing the keyword arguments.

    .. versionadded:: 0.5.0
    """
    threading.Thread(target=target, args=args, kwargs=kwargs, daemon=True).start()
def emit_event(self, event_name, event_body, *, broadcast=True):
    """
    Publishes an event of type ``event_name`` to all subscribers, having the body
    ``event_body``. The event is pushed through all available event transports.

    The event body must be a Python object that can be represented as a JSON.

    :param event_name: a ``str`` representing the event type
    :param event_body: a Python object that can be represented as JSON.
    :param broadcast: flag that specifies if the event should be received by
                      all subscribers or only by one

    .. versionadded:: 0.5.0

    .. versionchanged:: 0.10.0
        Added parameter broadcast
    """
    available_transports = self.event_transports
    for event_transport in available_transports:
        event_transport.emit_event(event_name, event_body, broadcast=broadcast)
def start(self):
    """
    The main method that starts the service. This is blocking.

    Runs the initial setup and the ``on_service_start`` hook, creates the
    Tornado application and makes it listen on ``host``/``port``, starts
    the periodic tasks and the event handlers and finally starts the io loop.
    """
    self._initial_setup()
    self.on_service_start()

    self.app = self.make_tornado_app()
    enable_pretty_logging()
    self.app.listen(self.port, address=self.host)

    # log the configuration the service is running with
    current_configuration = self.get_current_configuration()
    for option, option_value in current_configuration.items():
        self.logger.debug("{}={}".format(option, option_value))

    self._start_periodic_tasks()

    # starts the event handlers
    self._initialize_event_handlers()
    self._start_event_handlers()

    try:
        self.io_loop.start()
    except RuntimeError:
        # TODO : find a way to check if the io_loop is running before trying to start it
        # this method to check if the loop is running is ugly
        pass
def _start_periodic_tasks(self):
    """
    Starts every periodic task produced by ``_periodic_task_iter``,
    logging each one before it is started.
    """
    for task in self._periodic_task_iter():
        message = "Starting periodic task {}".format(task)
        self.logger.debug(message)
        task.start()
def get_current_configuration(self):
    """
    Builds a dictionary describing the configuration the service is
    currently running with.

    Handlers, event transports, configurables and configurators are
    reported through their ``str()`` representation.

    :return: a ``dict`` with the current configuration
    """
    autodiscovery_section = {
        "service_registry_urls": self.service_registry_urls,
        "service_registry_ping_interval": self.service_registry_ping_interval,
    }
    webapp_section = {
        "template_dir": self.template_dir,
        "static_dirs": self.static_dirs,
        "extra_handlers": [str(handler) for handler in self.extra_handlers],
    }
    event_section = {
        "event_transports": [str(transport) for transport in self.event_transports],
    }
    configuration_section = {
        "configurables": [str(item) for item in self.configurables],
        "configurators": [str(item) for item in self.configurators],
    }

    configuration = {}
    configuration["name"] = self.name
    configuration["host"] = self.host
    configuration["port"] = self.port
    configuration["endpoint"] = self.endpoint
    configuration["accessible_at"] = self.accessible_at
    configuration["autodiscovery"] = autodiscovery_section
    configuration["max_parallel_blocking_tasks"] = self.max_parallel_blocking_tasks
    configuration["webapp"] = webapp_section
    configuration["event"] = event_section
    configuration["configuration"] = configuration_section
    return configuration
# endregion
def _initial_setup(self):
    """
    Prepares the service before it starts: applies the configurators
    (unless ``skip_configuration`` is set), attaches the modules to this
    service, collects the exposed methods and event handlers and sets up
    the statistics container.
    """
    configuration_enabled = not self.skip_configuration
    if configuration_enabled:
        self._prepare_configurators()
        self._activate_configurators()

    # prepare modules
    for service_module in self.modules:
        service_module.set_microservice(self)

    self._gather_exposed_methods()
    self._gather_event_handlers()

    # initializing statistics handler
    if not self.use_statistics:
        self.stats = DummyStatsContainer()
        return

    self.stats = DefaultStatsContainer()
    self.methods["statistics"] = lambda: self.stats.as_json()
def _initialize_event_handlers(self):
    """
    Registers every known event handler with every event transport,
    logging each transport and each registered event name.
    """
    for transport in self.event_transports:
        self.logger.debug("Initializing transport {}".format(transport))
        for handled_event, handler in self.event_handlers.items():
            self.logger.debug("Setting handler for {}".format(handled_event))
            transport.register_event_handler(handler, handled_event)
def _start_event_handlers(self):
    """
    Starts one thread per event transport, each running the transport's
    ``start_accepting_events``.
    """
    for transport in self.event_transports:
        accept_events = transport.start_accepting_events
        self.start_thread(target=accept_events, args=(), kwargs={})
def make_tornado_app(self):
    """
    Creates a :py:class:`tornado.web.Application` instance that respects the
    JSON RPC 2.0 specs and exposes the designated methods. Can be used
    in tests to obtain the Tornado application.

    The JSON RPC handler is mounted at ``endpoint``, followed by the extra
    handlers and the static file handlers.

    :return: a :py:class:`tornado.web.Application` instance
    """
    rpc_route = (self.endpoint, TornadoJsonRpcHandler, {"microservice": self})
    routes = [rpc_route]

    self._add_extra_handlers(routes)
    self._add_static_handlers(routes)

    return Application(routes, template_path=self.template_dir)
def _add_extra_handlers(self, handlers):
    """
    Adds the extra handlers (defined by the user).

    Only the first two items of every ``extra_handlers`` entry are used;
    each handler is given this service through the ``microservice`` argument.

    :param handlers: the list of Tornado handler tuples, extended in place.
    :return: ``None``
    """
    user_routes = []
    for spec in self.extra_handlers:
        user_routes.append((spec[0], spec[1], {"microservice": self}))
    handlers.extend(user_routes)
def _add_static_handlers(self, handlers):
    """
    Creates and adds the handlers needed for serving static files.

    Every ``(url, path)`` pair from ``static_dirs`` becomes a
    ``StaticFileHandler`` route matching everything under ``url``.

    :param handlers: the list of Tornado handler tuples, extended in place.
    """
    for url_prefix, directory in self.static_dirs:
        url_pattern = url_prefix.rstrip("/") + "/(.*)"
        route = (url_pattern, StaticFileHandler, {"path": directory})
        handlers.append(route)
def _gather_exposed_methods(self):
    """
    Collects the exposed methods of the microservice itself and then of
    every module, in this order, using ``_extract_methods_from_container``.
    """
    extract = self._extract_methods_from_container
    extract(self)
    for service_module in self.modules:
        extract(service_module)
def _extract_methods_from_container(self, container):
    """
    Registers the exposed methods of ``container`` in ``self.methods``.

    A method is registered under its ``_exposed_name`` attribute when it
    has one and under its ``__name__`` otherwise.

    :param container: an object providing ``get_exposed_methods()``
    :raises ValueError: when the name is already taken by another method
    """
    registry = self.methods
    for method in container.get_exposed_methods():
        public_name = getattr(method, '_exposed_name', method.__name__)
        if public_name in registry:
            raise ValueError(
                "Cannot expose two methods under the same name: '{}'".format(public_name))
        registry[public_name] = method
def _gather_event_handlers(self):
    """
    Collects the event handlers of the microservice itself and then of
    every module, in this order, using
    ``_extract_event_handlers_from_container``.

    :return: ``None``
    """
    extract = self._extract_event_handlers_from_container
    extract(self)
    for service_module in self.modules:
        extract(service_module)
def _extract_event_handlers_from_container(self, container):
    """
    Registers the event handlers of ``container`` in ``self.event_handlers``
    under the event name stored in their ``_handled_event`` attribute.

    The first handler registered for an event is kept; later handlers for
    the same event are ignored.

    :param container: an object providing ``get_event_handlers()``
    """
    for handler in container.get_event_handlers():
        handled_event = handler._handled_event
        self.event_handlers.setdefault(handled_event, handler)
def _periodic_task_iter(self):
    """
    Iterates through all the periodic tasks:

    - default dummy task if on Windows
    - the service registry pinging
    - user defined periodic tasks

    Every discovery strategy is also pinged once immediately, when the
    iteration starts.

    :return: a generator of :py:class:`tornado.ioloop.PeriodicCallback`
             instances, one for each task (they are not started here)
    """
    # Work on a copy: ``default_periodic_tasks`` is a class-level list, so
    # appending the ping tasks to it would leak them into every other
    # service instance and duplicate them each time this generator runs.
    tasks = list(self.default_periodic_tasks)

    for strategy in self.discovery_strategies:
        ping = functools.partial(strategy.ping, self.name, self.accessible_at)
        tasks.append((ping, self.service_registry_ping_interval))
        # announce the service right away, without waiting for the first tick
        ping()

    tasks.extend(self.periodic_tasks)

    for func, timer_in_seconds in tasks:
        # PeriodicCallback expects its interval in milliseconds
        timer_millisec = timer_in_seconds * 1000
        # NOTE(review): the ``io_loop`` argument was removed from
        # PeriodicCallback in Tornado 5.0 - confirm the supported version.
        yield PeriodicCallback(func, timer_millisec, io_loop=self.io_loop)
@classmethod
def _set_option_if_available(cls, args, name):
    """
    Copies the attribute ``name`` from ``args`` onto the class, but only
    when ``args`` has that attribute and its value is not ``None``.

    :param args: the object holding the option values
    :param name: the name of the option to copy
    """
    option_value = getattr(args, name, None)
    if option_value is None:
        return
    setattr(cls, name, option_value)
def _prepare_configurators(self):
    """
    Registers every configurable with every configurator.
    """
    for source in self.configurators:
        register = source.register_configurable
        for option in self.configurables:
            register(option)
def _activate_configurators(self):
    """
    Loads all the configurators and then applies their values to the service.

    For every configurable, the value given by a configurator is set as an
    attribute on this service. Falsy values are skipped, and a value from a
    later configurator overrides the one from an earlier configurator.
    """
    for source in self.configurators:
        source.load()

    for source in self.configurators:
        for option in self.configurables:
            option_name = option.name
            option_value = source.get(option_name)
            if option_value:
                setattr(self, option_name, option_value)