Commit 8d72db78 authored by Daniele Venzano's avatar Daniele Venzano

Enable type checking

parent 4313d121
......@@ -219,10 +219,10 @@ class-attribute-rgx=([A-Za-z_][A-Za-z0-9_]{2,30}|(__.*__))$
class-attribute-name-hint=([A-Za-z_][A-Za-z0-9_]{2,30}|(__.*__))$
# Regular expression matching correct constant names
const-rgx=(([A-Z_][A-Z0-9_]*)|(__.*__)|log)$
const-rgx=(([A-Z_][A-Z0-9_]*)|(__.*__)|log|([A-Z][A-Za-z]*Type))$
# Naming hint for constant names
const-name-hint=(([A-Z_][A-Z0-9_]*)|(__.*__)|log)$
const-name-hint=(([A-Z_][A-Z0-9_]*)|(__.*__)|log|([A-Z][A-Za-z]*Type))$
# Regular expression matching correct attribute names
attr-rgx=([a-z_][a-z0-9_]{2,30}|id)$
......
......@@ -7,4 +7,5 @@ install:
before_script:
script:
- pylint *.py zoe_*
- mypy -s *.py zoe_*
- doc8 docs/
......@@ -16,6 +16,7 @@
"""The client side of the ZeroMQ API."""
import logging
from typing import Dict, Any, Tuple
import zmq
......@@ -23,17 +24,19 @@ import zoe_lib.config as config
log = logging.getLogger(__name__)
APIReturnType = Tuple[bool, str]
class APIManager:
"""Main class for the API."""
REQUEST_TIMEOUT = 2500
REQUEST_RETRIES = 1
REQUEST_TIMEOUT = 2500 # type: int
REQUEST_RETRIES = 1 # type: int
def __init__(self):
self.context = zmq.Context(1)
self.zmq_s = None
self.poll = zmq.Poller()
self.master_uri = config.get_conf().master_url
self.master_uri = config.get_conf().master_url # type: str
self._connect()
def _connect(self):
......@@ -49,7 +52,7 @@ class APIManager:
self.poll.unregister(self.zmq_s)
self.zmq_s = None
def _request_reply(self, message):
def _request_reply(self, message: Dict[str, Any]) -> APIReturnType:
"""
Implements the Lazy Pirate Pattern for a reliable client communication.
"""
......@@ -74,7 +77,7 @@ class APIManager:
log.warning('Reconnecting and retrying request...')
self._connect()
def execution_start(self, exec_id):
def execution_start(self, exec_id: int) -> APIReturnType:
"""Start an execution."""
msg = {
'command': 'execution_start',
......@@ -82,7 +85,7 @@ class APIManager:
}
return self._request_reply(msg)
def execution_terminate(self, exec_id):
def execution_terminate(self, exec_id: int) -> APIReturnType:
"""Terminate an execution."""
msg = {
'command': 'execution_terminate',
......@@ -90,7 +93,7 @@ class APIManager:
}
return self._request_reply(msg)
def execution_delete(self, exec_id):
def execution_delete(self, exec_id) -> APIReturnType:
"""Delete an execution."""
msg = {
'command': 'execution_delete',
......
......@@ -26,7 +26,7 @@ import zoe_api.api_endpoint
class ExecutionAPI(Resource):
"""The Execution API endpoint."""
def __init__(self, api_endpoint: zoe_api.api_endpoint.APIEndpoint):
def __init__(self, api_endpoint: zoe_api.api_endpoint.APIEndpoint) -> None:
self.api_endpoint = api_endpoint
@catch_exceptions
......@@ -58,7 +58,7 @@ class ExecutionAPI(Resource):
class ExecutionDeleteAPI(Resource):
"""The ExecutionDelete API endpoints."""
def __init__(self, api_endpoint: zoe_api.api_endpoint.APIEndpoint):
def __init__(self, api_endpoint: zoe_api.api_endpoint.APIEndpoint) -> None:
self.api_endpoint = api_endpoint
@catch_exceptions
......@@ -81,7 +81,7 @@ class ExecutionDeleteAPI(Resource):
class ExecutionCollectionAPI(Resource):
"""The Execution Collection API endpoints."""
def __init__(self, api_endpoint: zoe_api.api_endpoint.APIEndpoint):
def __init__(self, api_endpoint: zoe_api.api_endpoint.APIEndpoint) -> None:
self.api_endpoint = api_endpoint
@catch_exceptions
......
......@@ -25,7 +25,7 @@ from zoe_lib.version import ZOE_API_VERSION, ZOE_APPLICATION_FORMAT_VERSION, ZOE
class InfoAPI(Resource):
"""The Info API endpoint."""
def __init__(self, api_endpoint: zoe_api.api_endpoint.APIEndpoint):
def __init__(self, api_endpoint: zoe_api.api_endpoint.APIEndpoint) -> None:
self.api_endpoint = api_endpoint
@catch_exceptions
......
......@@ -23,6 +23,7 @@ import logging
import os
import sys
from argparse import ArgumentParser, Namespace, FileType, RawDescriptionHelpFormatter
from typing import Tuple
from zoe_cmd import utils
from zoe_lib.info import ZoeInfoAPI
......@@ -132,7 +133,7 @@ ZOE_USER: the username used for authentication
ZOE_PASS: the password used for authentication'''
def process_arguments() -> Namespace:
def process_arguments() -> Tuple[ArgumentParser, Namespace]:
"""Parse command line arguments."""
parser = ArgumentParser(description="Zoe command-line client", epilog=ENV_HELP_TEXT, formatter_class=RawDescriptionHelpFormatter)
parser.add_argument('--debug', action='store_true', help='Enable debug output')
......
......@@ -16,9 +16,6 @@ ACTION_TYPES_THAT_DONT_NEED_A_VALUE = {argparse._StoreTrueAction,
argparse._StoreFalseAction, argparse._CountAction,
argparse._StoreConstAction, argparse._AppendConstAction}
# global ArgumentParser instances
_parsers = {}
# used while parsing args to keep track of where they came from
_COMMAND_LINE_SOURCE_KEY = "command_line"
......@@ -676,8 +673,8 @@ def already_on_command_line(existing_args, potential_command_line_args):
# wrap ArgumentParser's add_argument(..) method with the one above
argparse._ActionsContainer.original_add_argument_method = argparse._ActionsContainer.add_argument
argparse._ActionsContainer.add_argument = add_argument
argparse._ActionsContainer.original_add_argument_method = argparse._ActionsContainer.add_argument # type: ignore
argparse._ActionsContainer.add_argument = add_argument # type:ignore
# add all public classes and constants from argparse module's namespace to this
# module's namespace so that the 2 modules are truly interchangeable
......
......@@ -25,7 +25,7 @@ log = logging.getLogger(__name__)
def time_diff_ms(start: float, end: float) -> int:
"""Return a time difference in milliseconds."""
return (end - start) * 1000
return int((end - start) * 1000)
class BaseMetricSender:
......
......@@ -15,8 +15,10 @@
"""Interface to the low-level Docker API."""
from argparse import Namespace
import time
import logging
from typing import Iterable, Callable, Dict, Any
import humanfriendly
......@@ -35,7 +37,70 @@ from zoe_lib.exceptions import ZoeException
log = logging.getLogger(__name__)
def zookeeper_swarm(zk_server_list: str, path='/docker'):
class DockerContainerOptions:
"""Wrapper for the Docker container options."""
def __init__(self):
self.env = {}
self.volume_binds = []
self.volumes = []
self.command = ""
self.memory_limit = '2g'
self.name = ''
self.ports = []
self.network_name = 'bridge'
self.restart = True
self.labels = []
self.gelf_log_address = ''
def add_env_variable(self, name: str, value: str) -> None:
"""Adds an environment variable to the container definition."""
if value is not None:
self.env[name] = value
@property
def environment(self) -> Dict[str, str]:
"""Access the environment variables."""
return self.env
def add_volume_bind(self, path: str, mountpoint: str, readonly=False) -> None:
"""Add a volume to the container."""
self.volumes.append(mountpoint)
self.volume_binds.append(path + ":" + mountpoint + ":" + ("ro" if readonly else "rw"))
def get_volumes(self) -> Iterable[str]:
"""Get the volumes in Docker format."""
return self.volumes
def get_volume_binds(self) -> Iterable[str]:
"""Get the volumes in another Docker format."""
return self.volume_binds
def set_command(self, cmd) -> str:
"""Setter for the command to run in the container."""
self.command = cmd
def get_command(self) -> str:
"""Getter for the command to run in the container."""
return self.command
def set_memory_limit(self, limit: int):
"""Setter for the memory limit of the container."""
self.memory_limit = limit
def get_memory_limit(self) -> int:
"""Getter for the memory limit of the container."""
return self.memory_limit
@property
def restart_policy(self) -> Dict[str, str]:
"""Getter for the restart policy of the container."""
if self.restart:
return {'Name': 'always'}
else:
return {}
def zookeeper_swarm(zk_server_list: str, path='/docker') -> str:
"""
Given a Zookeeper server list, find the currently active Swarm master.
:param zk_server_list: Zookeeper server list
......@@ -47,18 +112,17 @@ def zookeeper_swarm(zk_server_list: str, path='/docker'):
zk_client.start()
master, stat_ = zk_client.get(path)
zk_client.stop()
return master
return master.decode('utf-8')
class SwarmClient:
"""The Swarm client class that wraps the Docker API."""
def __init__(self, opts):
def __init__(self, opts: Namespace) -> None:
self.opts = opts
url = opts.swarm
if 'zk://' in url:
url = url[len('zk://'):]
manager = zookeeper_swarm(url)
manager = manager.decode('utf-8')
elif 'http://' or 'https://' in url:
manager = url
else:
......@@ -118,10 +182,10 @@ class SwarmClient:
pl_status.timestamp = time.time()
return pl_status
def spawn_container(self, image, options) -> dict:
def spawn_container(self, image: str, options: DockerContainerOptions) -> Dict[str, Any]:
"""Create and start a new container."""
cont = None
port_bindings = {}
port_bindings = {} # type: Dict[str, Any]
for port in options.ports:
port_bindings[port] = None
......@@ -158,7 +222,7 @@ class SwarmClient:
info = self.inspect_container(cont.get('Id'))
return info
def inspect_container(self, docker_id) -> dict:
def inspect_container(self, docker_id: str) -> Dict[str, Any]:
"""Retrieve information about a running container."""
try:
docker_info = self.cli.inspect_container(container=docker_id)
......@@ -168,7 +232,7 @@ class SwarmClient:
info = {
"docker_id": docker_id,
"ip_address": {}
}
} # type: Dict[str, Any]
for net in docker_info["NetworkSettings"]["Networks"]:
info["ip_address"][net] = docker_info["NetworkSettings"]["Networks"][net]['IPAddress']
......@@ -203,7 +267,7 @@ class SwarmClient:
info['ports'][port] = None
return info
def terminate_container(self, docker_id, delete=False):
def terminate_container(self, docker_id: str, delete=False) -> None:
"""
Terminate a container.
......@@ -224,27 +288,27 @@ class SwarmClient:
except docker.errors.NotFound:
log.warning("cannot remove a non-existent service")
def event_listener(self, callback):
def event_listener(self, callback: Callable[[str], bool]) -> None:
"""An infinite loop that listens for events from Swarm."""
for event in self.cli.events(decode=True):
if not callback(event):
break
def connect_to_network(self, container_id, network_id):
def connect_to_network(self, container_id: str, network_id: str) -> None:
"""Connect a container to a network."""
try:
self.cli.connect_container_to_network(container_id, network_id)
except Exception as e:
log.exception(str(e))
def disconnect_from_network(self, container_id, network_id):
def disconnect_from_network(self, container_id: str, network_id: str) -> None:
"""Disconnects a container from a network."""
try:
self.cli.disconnect_container_from_network(container_id, network_id)
except Exception as e:
log.exception(str(e))
def list(self, only_label=None) -> list:
def list(self, only_label=None) -> Iterable[dict]:
"""
List running or defined containers.
......@@ -272,66 +336,3 @@ class SwarmClient:
'status': cont_info['Status']
})
return conts
class DockerContainerOptions:
"""Wrapper for the Docker container options."""
def __init__(self):
self.env = {}
self.volume_binds = []
self.volumes = []
self.command = ""
self.memory_limit = '2g'
self.name = ''
self.ports = []
self.network_name = 'bridge'
self.restart = True
self.labels = []
self.gelf_log_address = ''
def add_env_variable(self, name, value):
"""Adds an environment variable to the container definition."""
if value is not None:
self.env[name] = value
@property
def environment(self):
"""Access the environment variables."""
return self.env
def add_volume_bind(self, path, mountpoint, readonly=False):
"""Add a volume to the container."""
self.volumes.append(mountpoint)
self.volume_binds.append(path + ":" + mountpoint + ":" + ("ro" if readonly else "rw"))
def get_volumes(self):
"""Get the volumes in Docker format."""
return self.volumes
def get_volume_binds(self):
"""Get the volumes in another Docker format."""
return self.volume_binds
def set_command(self, cmd):
"""Setter for the command to run in the container."""
self.command = cmd
def get_command(self):
"""Getter for the command to run in the container."""
return self.command
def set_memory_limit(self, limit):
"""Setter for the memory limit of the container."""
self.memory_limit = limit
def get_memory_limit(self):
"""Getter for the memory limit of the container."""
return self.memory_limit
@property
def restart_policy(self):
"""Getter for the restart policy of the container."""
if self.restart:
return {'Name': 'always'}
else:
return {}
......@@ -33,7 +33,7 @@ log = logging.getLogger(__name__)
class APIManager:
"""The API Manager."""
def __init__(self, metrics: BaseMetricSender, scheduler: ZoeScheduler, state: SQLManager):
def __init__(self, metrics: BaseMetricSender, scheduler: ZoeScheduler, state: SQLManager) -> None:
self.context = zmq.Context()
self.zmq_s = self.context.socket(zmq.REP)
self.listen_uri = config.get_conf().api_listen_uri
......@@ -43,7 +43,7 @@ class APIManager:
self.scheduler = scheduler
self.state = state
def _reply_error(self, message):
def _reply_error(self, message: str) -> None:
self.zmq_s.send_json({'result': 'error', 'message': message})
self.debug_has_replied = True
......@@ -96,7 +96,7 @@ class APIManager:
self.metrics.metric_api_call(start_time, message['command'])
def quit(self):
def quit(self) -> None:
"""Cleanly close the ZMQ resources."""
self.zmq_s.close()
self.context.term()
......@@ -39,6 +39,10 @@ class SwarmNodeStats(Stats):
self.memory_total = 0
self.memory_reserved = 0
self.labels = {}
self.status = None
self.error = ''
self.last_update = None
self.server_version = None
class SwarmStats(Stats):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment