#!/usr/bin/env python3

"""
Keep the Zoe scheduler loaded with BOINC clients.

Usage: boinc_load.py <zapp_json_file>
"""

import csv
import json
import os
import sys
import time
import random

from zoe_lib.statistics import ZoeStatisticsAPI
from zoe_lib.executions import ZoeExecutionsAPI

# Scheduler queue length at or above which start_batches() pauses submissions.
TARGET_QUEUE_LENGTH = 10
# Sizing inputs for TOTAL_JOBS (presumably the cluster size and the number of
# ZApps each host should run -- confirm against the deployment).
NUMBER_OF_HOSTS = 10
ZAPP_PER_HOST = 8
# Number of executions the loader tries to keep submitted.
TOTAL_JOBS = NUMBER_OF_HOSTS * ZAPP_PER_HOST
# Upper bound of the random batch size drawn on each start_batches() iteration.
MAX_TO_START_PER_LOOP = 10


def zoe_url():
    """Return the Zoe API URL taken from the ZOE_URL environment variable.

    Raises KeyError when the variable is not set.
    """
    api_url = os.environ['ZOE_URL']
    return api_url


def zoe_user():
    """Return the Zoe API user name taken from the ZOE_USER environment variable.

    Raises KeyError when the variable is not set.
    """
    user_name = os.environ['ZOE_USER']
    return user_name


def zoe_pass():
    """Return the Zoe API password taken from the ZOE_PASS environment variable.

    Raises KeyError when the variable is not set.
    """
    password = os.environ['ZOE_PASS']
    return password


def check_queue_length():
    """Return the 'queue_length' value reported by the Zoe scheduler statistics.

    A new ZoeStatisticsAPI client is built on every call from the ZOE_URL,
    ZOE_USER and ZOE_PASS environment variables.
    """
    client = ZoeStatisticsAPI(zoe_url(), zoe_user(), zoe_pass())
    return client.scheduler()['queue_length']


def load_zapp(filename):
    """Load and parse the ZApp JSON description file.

    filename is the path of the JSON file to read. The file is decoded as
    UTF-8 and is closed before returning, including when parsing fails.

    Returns the decoded JSON document.
    Raises OSError if the file cannot be opened and json.JSONDecodeError
    (a ValueError) if its content is not valid JSON.
    """
    with open(filename, 'r', encoding='utf-8') as zapp_file:
        return json.load(zapp_file)


def submit_zapp(zapp):
    """Submit one ZApp for execution under the name 'boinc-loader'.

    zapp is the ZApp description to submit. Returns whatever
    ZoeExecutionsAPI.start() returns; callers in this file treat it as the
    ID of the new execution.
    """
    client = ZoeExecutionsAPI(zoe_url(), zoe_user(), zoe_pass())
    return client.start('boinc-loader', zapp)


def count_jobs(all=False):
    """Count the executions that are not in the 'terminated' state.

    When all is false (the default) only executions named 'boinc-loader'
    are counted; when it is true every execution returned by the API is
    counted. Entries for which the API returns None are skipped.

    The parameter name shadows the builtin all(); it is kept because callers
    pass it by keyword.
    """
    client = ZoeExecutionsAPI(zoe_url(), zoe_user(), zoe_pass())
    active = 0
    for exec_id in client.list():
        details = client.get(exec_id)
        if details is None:
            continue
        if not all and details['name'] != 'boinc-loader':
            continue
        if details['status'] == 'terminated':
            continue
        active += 1
    return active


def delete_finished():
    """Delete every 'boinc-loader' execution whose status is 'terminated'.

    Entries for which the API returns None are skipped. A line is printed
    for each execution that gets deleted.
    """
    client = ZoeExecutionsAPI(zoe_url(), zoe_user(), zoe_pass())
    for exec_id in client.list():
        details = client.get(exec_id)
        if details is None:
            continue
        finished = details['name'] == 'boinc-loader' and details['status'] == 'terminated'
        if not finished:
            continue
        print('Execution {} has finished, deleting...'.format(exec_id))
        client.delete(details['id'])


def start_batches(zapp, log):
    """Submit ZApps in random-sized batches until TOTAL_JOBS are active.

    zapp is the ZApp description passed to submit_zapp(). log is a
    csv.DictWriter-like object; one row with the keys 'zapp_id' and
    'queue_length' is written per submission.

    Before each batch the scheduler queue length is checked; while it is at
    or above TARGET_QUEUE_LENGTH the function sleeps for a random 1-60
    seconds and checks again. The queue length logged for every ZApp of a
    batch is the value sampled before that batch started.
    """
    zapps_to_start = TOTAL_JOBS - count_jobs()
    print('I need to start {} zapps'.format(zapps_to_start))
    while zapps_to_start > 0:
        queue_length = check_queue_length()
        if queue_length >= TARGET_QUEUE_LENGTH:
            to_sleep = random.randint(1, 60)
            print("Target scheduler queue length reached, sleeping for {} seconds".format(to_sleep))
            time.sleep(to_sleep)
            continue
        # Cap the random batch at what is still missing; otherwise the last
        # batch could overshoot TOTAL_JOBS and drive the counter negative.
        to_start_now = min(random.randint(1, MAX_TO_START_PER_LOOP), zapps_to_start)
        print('Will submit {} new zapps'.format(to_start_now))
        for _ in range(to_start_now):
            zapp_id = submit_zapp(zapp)
            zapps_to_start -= 1
            print("ZApp submitted with ID {}, queue length {}, {} zapps to go".format(zapp_id, queue_length, zapps_to_start))
            log.writerow({'zapp_id': zapp_id, 'queue_length': queue_length})


def start_continuous(zapp, log):
    """Submit ZApps one at a time until TOTAL_JOBS are active.

    zapp is the ZApp description passed to submit_zapp(). log is a
    csv.DictWriter-like object; one row with the keys 'zapp_id' and
    'queue_length' is written per submission.

    After each submission the scheduler queue length is sampled and the
    function sleeps for a random interval between 0 and 1 second.
    """
    report = "ZApp submitted with ID {}, queue length {}, will sleep for {}, {} zapps to go"
    remaining = TOTAL_JOBS - count_jobs()
    print('I need to start {} zapps'.format(remaining))
    while remaining > 0:
        zapp_id = submit_zapp(zapp)
        remaining -= 1
        pause = random.uniform(0, 1)
        queue_length = check_queue_length()
        print(report.format(zapp_id, queue_length, pause, remaining))
        log.writerow(dict(zapp_id=zapp_id, queue_length=queue_length))
        time.sleep(pause)


def keep_some_running(zapp):
    """Top the scheduler up to TOTAL_JOBS executions once a minute, forever.

    zapp is the ZApp description passed to submit_zapp().

    Each round counts all non-terminated executions (not only those named
    'boinc-loader'), submits as many ZApps as are missing, sleeps 60 seconds
    and then deletes the finished 'boinc-loader' executions. The loop exits
    only on a KeyboardInterrupt raised during the sleep; an interrupt at any
    other point propagates to the caller.
    """
    while True:
        running_zapps = count_jobs(all=True)
        zapps_to_start = TOTAL_JOBS - running_zapps
        print('I need to start {} zapps ({} running)'.format(zapps_to_start, running_zapps))
        # range() is empty when nothing is missing, so no extra guard is needed.
        for submitted in range(1, zapps_to_start + 1):
            queue_length = check_queue_length()
            zapp_id = submit_zapp(zapp)
            # Report how many are still missing after this submission.
            print("ZApp submitted with ID {}, queue length {}, {} zapps to go".format(zapp_id, queue_length, zapps_to_start - submitted))
        try:
            print('Sleeping')
            time.sleep(60)
        except KeyboardInterrupt:
            print('Exiting infinite loop')
            break
        delete_finished()


def main():
    """Script entry point: load the ZApp given on the command line and keep it running.

    Expects the path of the ZApp JSON file as the first command-line
    argument; exits with the usage message when it is missing. Creates (or
    truncates) 'run.csv' in the current directory and writes the CSV header
    to it.
    """
    if len(sys.argv) < 2:
        # sys.exit() with a string prints it to stderr and exits with status 1.
        sys.exit('Usage: boinc_load.py <zapp_json_file>')

    log_fieldnames = ['zapp_id', 'queue_length']
    # newline='' is what the csv module requires for files it writes to; the
    # with-block guarantees the log is flushed and closed on exit.
    with open('run.csv', 'w', newline='') as log_file:
        # keep_some_running() does not log; the writer is kept ready for
        # start_batches() / start_continuous(), which take it as an argument.
        log = csv.DictWriter(log_file, fieldnames=log_fieldnames)
        log.writeheader()
        zapp = load_zapp(sys.argv[1])
        keep_some_running(zapp)

    print('All Zapps submitted, my work is done.')

# Run the loader only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()