# Ultralytics YOLO 🚀, GPL-3.0 license

import json
import signal
import sys
from pathlib import Path
from time import sleep, time

import requests

from ultralytics.hub.utils import HUB_API_ROOT, check_dataset_disk_space, smart_request
from ultralytics.yolo.utils import LOGGER, PREFIX, __version__, emojis, is_colab, threaded
from ultralytics.yolo.utils.torch_utils import get_flops, get_num_params

AGENT_NAME = f"python-{__version__}-colab" if is_colab() else f"python-{__version__}-local"
session = None


class HubTrainingSession:
    """
    Manage a single Ultralytics HUB training session.

    Responsibilities: fetch model metadata from HUB, send periodic heartbeats on a
    background thread, and upload metrics/checkpoints to HUB with rate limiting via
    trainer callbacks.
    """

    def __init__(self, model_id, auth):
        """
        Initialize the session, fetch the model from HUB and start heartbeats.

        Args:
            model_id: HUB model identifier.
            auth: auth object providing get_auth_header().

        Raises:
            ConnectionRefusedError: if the HUB server is unreachable.
            ValueError: if the model is already trained or dataset not ready.
        """
        self.agent_id = None  # identifies which instance is communicating with server
        self.model_id = model_id
        self.api_url = f"{HUB_API_ROOT}/v1/models/{model_id}"
        self.auth_header = auth.get_auth_header()
        self._rate_limits = {"metrics": 3.0, "ckpt": 900.0, "heartbeat": 300.0}  # rate limits (seconds)
        self._timers = {}  # rate limit timers (seconds)
        self._metrics_queue = {}  # metrics queue
        # Initialize before the heartbeat thread and signal handlers exist so that
        # _handle_signal can never read an unset attribute (fixes a startup race).
        self.alive = False
        self.model = self._get_model()
        self._start_heartbeat()  # start heartbeats
        self._register_signal_handlers()

    def _register_signal_handlers(self):
        """Register SIGTERM and SIGINT handlers that stop heartbeats before exit."""
        signal.signal(signal.SIGTERM, self._handle_signal)
        signal.signal(signal.SIGINT, self._handle_signal)

    def _handle_signal(self, signum, frame):
        """
        Prevent heartbeats from being sent on Colab after kill.
        This method does not use frame, it is included as it is passed by signal.
        """
        if self.alive is True:
            LOGGER.info(f"{PREFIX}Kill signal received! ❌")
            self._stop_heartbeat()
            sys.exit(signum)

    def _stop_heartbeat(self):
        """End the heartbeat loop."""
        self.alive = False

    def upload_metrics(self):
        """Upload all queued metrics to HUB and leave the queue intact (caller resets it)."""
        payload = {"metrics": self._metrics_queue.copy(), "type": "metrics"}
        smart_request(f"{self.api_url}", json=payload, headers=self.auth_header, code=2)

    def upload_model(self, epoch, weights, is_best=False, map=0.0, final=False):
        """
        Upload a model checkpoint to HUB.

        Delegates to _upload_model so the upload logic exists in exactly one place
        (previously this method duplicated _upload_model and could drift from it).

        Args:
            epoch: current epoch number.
            weights: path to the checkpoint file.
            is_best: whether this checkpoint is the best so far.
            map: final mAP value (only meaningful when final=True).
            final: True for the end-of-training upload ("best.pt"), else "last.pt".
        """
        self._upload_model(epoch, weights, is_best=is_best, map=map, final=final)

    def _get_model(self):
        """
        Fetch model data from HUB by id and populate train_args / input_file.

        Returns:
            dict: the model's "data" payload from HUB.

        Raises:
            ConnectionRefusedError: if the HUB server is not reachable.
            ValueError: if the server returns no data, the model is already
                trained, or the dataset is still processing.
        """
        api_url = f"{HUB_API_ROOT}/v1/models/{self.model_id}"
        headers = self.auth_header
        try:
            response = smart_request(api_url, method="get", headers=headers, thread=False, code=0)
            data = response.json().get("data", None)
            if not data:
                # Guard: previously a None payload raised AttributeError on data.get()
                raise ValueError("No model data received from HUB server.")
            if data.get("status", None) == "trained":
                raise ValueError(
                    emojis(f"Model is already trained and uploaded to "
                           f"https://hub.ultralytics.com/models/{self.model_id} 🚀"))
            if not data.get("data", None):
                raise ValueError("Dataset may still be processing. Please wait a minute and try again.")  # RF fix
            self.model_id = data["id"]
            # TODO: restore when server keys when dataset URL and GPU train is working
            self.train_args = {
                "batch": data["batch_size"],
                "epochs": data["epochs"],
                "imgsz": data["imgsz"],
                "patience": data["patience"],
                "device": data["device"],
                "cache": data["cache"],
                "data": data["data"]}
            self.input_file = data.get("cfg", data["weights"])
            # hack for yolov5 cfg adds u
            if "cfg" in data and "yolov5" in data["cfg"]:
                self.input_file = data["cfg"].replace(".yaml", "u.yaml")
            return data
        except requests.exceptions.ConnectionError as e:
            raise ConnectionRefusedError("ERROR: The HUB server is not online. Please try again later.") from e

    def check_disk_space(self):
        """Raise MemoryError if local disk space is insufficient for the dataset."""
        if not check_dataset_disk_space(self.model["data"]):
            raise MemoryError("Not enough disk space")

    def register_callbacks(self, trainer):
        """Attach this session's upload/timer callbacks to a trainer."""
        trainer.add_callback("on_pretrain_routine_end", self.on_pretrain_routine_end)
        trainer.add_callback("on_fit_epoch_end", self.on_fit_epoch_end)
        trainer.add_callback("on_model_save", self.on_model_save)
        trainer.add_callback("on_train_end", self.on_train_end)

    def on_pretrain_routine_end(self, trainer):
        """
        Start timer for upload rate limit.
        This method does not use trainer. It is passed to all callbacks by default.
        """
        # Start timer for upload rate limit
        LOGGER.info(f"{PREFIX}View model at https://hub.ultralytics.com/models/{self.model_id} 🚀")
        self._timers = {"metrics": time(), "ckpt": time()}  # start timer on self.rate_limit

    def on_fit_epoch_end(self, trainer):
        """Queue epoch metrics and upload them when the metrics rate limit allows."""
        # Upload metrics after val end
        all_plots = {**trainer.label_loss_items(trainer.tloss, prefix="train"), **trainer.metrics}
        if trainer.epoch == 0:
            # Include static model info once, on the first epoch only
            model_info = {
                "model/parameters": get_num_params(trainer.model),
                "model/GFLOPs": round(get_flops(trainer.model), 3),
                "model/speed(ms)": round(trainer.validator.speed[1], 3)}
            all_plots = {**all_plots, **model_info}
        self._metrics_queue[trainer.epoch] = json.dumps(all_plots)
        if time() - self._timers["metrics"] > self._rate_limits["metrics"]:
            self.upload_metrics()
            self._timers["metrics"] = time()  # reset timer
            self._metrics_queue = {}  # reset queue

    def on_model_save(self, trainer):
        """Upload the last checkpoint to HUB, rate-limited by the 'ckpt' interval."""
        # Upload checkpoints with rate limiting
        is_best = trainer.best_fitness == trainer.fitness
        if time() - self._timers["ckpt"] > self._rate_limits["ckpt"]:
            LOGGER.info(f"{PREFIX}Uploading checkpoint {self.model_id}")
            self._upload_model(trainer.epoch, trainer.last, is_best)
            self._timers["ckpt"] = time()  # reset timer

    def on_train_end(self, trainer):
        """Upload the final model and metrics to HUB, then stop heartbeats."""
        # Upload final model and metrics with exponential standoff
        LOGGER.info(f"{PREFIX}Training completed successfully ✅")
        LOGGER.info(f"{PREFIX}Uploading final {self.model_id}")
        # hack for fetching mAP
        mAP = trainer.metrics.get("metrics/mAP50-95(B)", 0)
        self._upload_model(trainer.epoch, trainer.best, map=mAP, final=True)  # results[3] is mAP0.5:0.95
        self.alive = False  # stop heartbeats
        LOGGER.info(f"{PREFIX}View model at https://hub.ultralytics.com/models/{self.model_id} 🚀")

    def _upload_model(self, epoch, weights, is_best=False, map=0.0, final=False):
        """
        Upload a checkpoint file to HUB.

        Final uploads are sent as "best.pt" with aggressive retry/timeout; interim
        uploads are sent as "last.pt".
        """
        file = None
        if Path(weights).is_file():
            with open(weights, "rb") as f:
                file = f.read()
        file_param = {"best.pt" if final else "last.pt": file}
        endpoint = f"{self.api_url}/upload"
        data = {"epoch": epoch}
        if final:
            data.update({"type": "final", "map": map})
        else:
            data.update({"type": "epoch", "isBest": bool(is_best)})
        smart_request(
            endpoint,
            data=data,
            files=file_param,
            headers=self.auth_header,
            retry=10 if final else None,
            timeout=3600 if final else None,
            code=4 if final else 3,
        )

    @threaded
    def _start_heartbeat(self):
        """Send periodic heartbeats to HUB on a background thread while self.alive is True."""
        self.alive = True
        while self.alive:
            r = smart_request(
                f"{HUB_API_ROOT}/v1/agent/heartbeat/models/{self.model_id}",
                json={
                    "agent": AGENT_NAME,
                    "agentId": self.agent_id},
                headers=self.auth_header,
                retry=0,
                code=5,
                thread=False,
            )
            # Server assigns an agent id on first heartbeat; echo it back thereafter
            self.agent_id = r.json().get("data", {}).get("agentId", None)
            sleep(self._rate_limits["heartbeat"])