# Ultralytics YOLO 🚀, GPL-3.0 license

import glob
import math
import os
import time
from dataclasses import dataclass
from pathlib import Path
from threading import Thread
from urllib.parse import urlparse

import cv2
import numpy as np
import requests
import torch
from PIL import Image, ImageOps

from ultralytics.yolo.data.augment import LetterBox
from ultralytics.yolo.data.utils import IMG_FORMATS, VID_FORMATS
from ultralytics.yolo.utils import LOGGER, ROOT, is_colab, is_kaggle, ops
from ultralytics.yolo.utils.checks import check_requirements


@dataclass
class SourceTypes:
    """Boolean flags describing which kind of input source is in use (all default to False)."""
    webcam: bool = False
    screenshot: bool = False
    from_img: bool = False


class LoadStreams:
    """
    YOLOv8 streamloader, i.e. `python detect.py --source 'rtsp://example.com/media.mp4'  # RTSP, RTMP, HTTP streams`

    Opens one `cv2.VideoCapture` per source and keeps the most recent frame of each stream in `self.imgs`,
    refreshed by one daemon thread per stream. Iterating yields `(sources, im, im0, None, '')` where `im` is the
    stacked, preprocessed batch and `im0` is a list of the raw frames.
    """

    def __init__(self, sources='file.streams', imgsz=640, stride=32, auto=True, transforms=None, vid_stride=1):
        """
        Open every stream, grab a first frame from each and start the background reader threads.

        Args:
            sources: A single source string, or the path of an existing text file holding whitespace-separated
                sources. Purely numeric sources are converted to int (i.e. '0' = local webcam index).
            imgsz: Size forwarded to LetterBox.
            stride: Stride forwarded to LetterBox.
            auto: `auto` flag forwarded to LetterBox; only kept enabled if all streams letterbox to one shape.
            transforms: Optional callable applied per frame in place of the LetterBox preprocessing.
            vid_stride: Only every `vid_stride`-th grabbed frame is retrieved by the reader threads.

        Raises:
            ConnectionError: If a stream cannot be opened or its first frame cannot be read.
            AssertionError: If a local webcam (source 0) is requested on Colab or Kaggle.
        """
        torch.backends.cudnn.benchmark = True  # faster for fixed-size inference
        self.mode = 'stream'
        self.imgsz = imgsz
        self.stride = stride
        self.vid_stride = vid_stride  # video frame-rate stride
        sources = Path(sources).read_text().rsplit() if os.path.isfile(sources) else [sources]
        n = len(sources)
        self.sources = [ops.clean_str(x) for x in sources]  # clean source names for later
        self.imgs, self.fps, self.frames, self.threads = [None] * n, [0] * n, [0] * n, [None] * n
        for i, s in enumerate(sources):  # index, source
            # Start thread to read frames from video stream
            st = f'{i + 1}/{n}: {s}... '
            if urlparse(s).hostname in ('www.youtube.com', 'youtube.com', 'youtu.be'):  # if source is YouTube video
                # YouTube format i.e. 'https://www.youtube.com/watch?v=Zgi9g1ksQHc' or 'https://youtu.be/Zgi9g1ksQHc'
                check_requirements(('pafy', 'youtube_dl==2020.12.2'))
                import pafy  # noqa
                s = pafy.new(s).getbest(preftype="mp4").url  # YouTube URL
            # int() rather than eval(): the source string is user input and must never be executed as code
            s = int(s) if s.isnumeric() else s  # i.e. s = '0' local webcam
            if s == 0:
                assert not is_colab(), '--source 0 webcam unsupported on Colab. Rerun command in a local environment.'
                assert not is_kaggle(), '--source 0 webcam unsupported on Kaggle. Rerun command in a local environment.'
            cap = cv2.VideoCapture(s)
            if not cap.isOpened():
                raise ConnectionError(f'{st}Failed to open {s}')
            w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = cap.get(cv2.CAP_PROP_FPS)  # warning: may return 0 or nan
            self.frames[i] = max(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), 0) or float('inf')  # infinite stream fallback
            self.fps[i] = max((fps if math.isfinite(fps) else 0) % 100, 0) or 30  # 30 FPS fallback

            success, self.imgs[i] = cap.read()  # guarantee first frame
            if not success or self.imgs[i] is None:
                # Fail here with a clear message instead of crashing later on a None frame in the shape check
                cap.release()
                raise ConnectionError(f'{st}Failed to read images from {s}')
            self.threads[i] = Thread(target=self.update, args=(i, cap, s), daemon=True)
            LOGGER.info(f"{st} Success ({self.frames[i]} frames {w}x{h} at {self.fps[i]:.2f} FPS)")
            self.threads[i].start()
        LOGGER.info('')  # newline

        # check for common shapes
        s = np.stack([LetterBox(imgsz, auto, stride=stride)(image=x).shape for x in self.imgs])
        self.rect = np.unique(s, axis=0).shape[0] == 1  # rect inference if all shapes equal
        self.auto = auto and self.rect
        self.transforms = transforms  # optional
        self.bs = self.__len__()

        if not self.rect:
            LOGGER.warning('WARNING ⚠️ Stream shapes differ. For optimal performance supply similarly-shaped streams.')

    def update(self, i, cap, stream):
        """
        Read stream `i` frames in daemon thread, storing the latest retrieved frame in `self.imgs[i]`.

        Args:
            i: Index of the stream in `self.imgs` / `self.frames`.
            cap: The opened `cv2.VideoCapture` for this stream.
            stream: The source passed to `cap.open()` to re-open the stream after a failed retrieve.
        """
        n, f = 0, self.frames[i]  # frame number, total frame count (inf for endless streams)
        while cap.isOpened() and n < f:
            n += 1
            cap.grab()  # .read() = .grab() followed by .retrieve()
            if n % self.vid_stride == 0:
                success, im = cap.retrieve()
                if success:
                    self.imgs[i] = im
                else:
                    LOGGER.warning('WARNING ⚠️ Video stream unresponsive, please check your IP camera connection.')
                    self.imgs[i] = np.zeros_like(self.imgs[i])  # blank frame of the same shape as the last one
                    cap.open(stream)  # re-open stream if signal was lost
            time.sleep(0.0)  # wait time

    def __iter__(self):
        """Reset the iteration counter and return self."""
        self.count = -1
        return self

    def __next__(self):
        """
        Return the current batch as `(sources, im, im0, None, '')`.

        Raises:
            StopIteration: If any reader thread has stopped or 'q' was pressed in an OpenCV window.
        """
        self.count += 1
        if not all(x.is_alive() for x in self.threads) or cv2.waitKey(1) == ord('q'):  # q to quit
            cv2.destroyAllWindows()
            raise StopIteration

        im0 = self.imgs.copy()  # shallow copy of the list of latest frames
        if self.transforms:
            im = np.stack([self.transforms(x) for x in im0])  # transforms
        else:
            im = np.stack([LetterBox(self.imgsz, self.auto, stride=self.stride)(image=x) for x in im0])
            im = im[..., ::-1].transpose((0, 3, 1, 2))  # BGR to RGB, BHWC to BCHW
            im = np.ascontiguousarray(im)  # contiguous

        return self.sources, im, im0, None, ''

    def __len__(self):
        """Return the number of streams."""
        return len(self.sources)  # 1E12 frames = 32 streams at 30 FPS for 30 years


class LoadScreenshots:
    """
    YOLOv8 screenshot dataloader, i.e. `python detect.py --source "screen 0 100 100 512 256"`

    Captures a screen region with `mss` on every iteration and yields `(screen, im, im0, None, s)`.
    Iteration never raises StopIteration.
    """

    def __init__(self, source, imgsz=640, stride=32, auto=True, transforms=None):
        """
        Parse the capture region from `source` and set up the `mss` grabber.

        Args:
            source: Whitespace-separated string; the first token is discarded and the rest are
                [screen_number left top width height] (pixels). Accepted parameter counts are
                1 (screen), 4 (left top width height) or 5 (screen left top width height);
                any other count leaves the default of full screen 0.
            imgsz: Size forwarded to LetterBox.
            stride: Stride forwarded to LetterBox.
            auto: `auto` flag forwarded to LetterBox.
            transforms: Optional callable applied to the raw frame in place of the LetterBox preprocessing.
        """
        check_requirements('mss')
        import mss  # noqa

        source, *params = source.split()
        self.screen, left, top, width, height = 0, None, None, None, None  # default to full screen 0
        if len(params) == 1:
            self.screen = int(params[0])
        elif len(params) == 4:
            left, top, width, height = (int(x) for x in params)
        elif len(params) == 5:
            self.screen, left, top, width, height = (int(x) for x in params)
        self.imgsz = imgsz
        self.stride = stride
        self.transforms = transforms
        self.auto = auto
        self.mode = 'stream'
        self.frame = 0
        self.sct = mss.mss()
        self.bs = 1

        # Parse monitor shape: left/top are offsets relative to the selected monitor's origin
        monitor = self.sct.monitors[self.screen]
        self.top = monitor["top"] if top is None else (monitor["top"] + top)
        self.left = monitor["left"] if left is None else (monitor["left"] + left)
        self.width = width or monitor["width"]
        self.height = height or monitor["height"]
        self.monitor = {"left": self.left, "top": self.top, "width": self.width, "height": self.height}

    def __iter__(self):
        """Return self (no state is reset)."""
        return self

    def __next__(self):
        """Grab the configured screen region and return `(screen, im, im0, None, s)`."""
        # mss screen capture: get raw pixels from the screen as np array
        im0 = np.array(self.sct.grab(self.monitor))[:, :, :3]  # [:, :, :3] BGRA to BGR
        s = f"screen {self.screen} (LTWH): {self.left},{self.top},{self.width},{self.height}: "

        if self.transforms:
            im = self.transforms(im0)  # transforms
        else:
            im = LetterBox(self.imgsz, self.auto, stride=self.stride)(image=im0)
            im = im.transpose((2, 0, 1))[::-1]  # HWC to CHW, BGR to RGB
            im = np.ascontiguousarray(im)  # contiguous
        self.frame += 1
        return str(self.screen), im, im0, None, s  # screen, img, original img, im0s, s


class LoadImages:
    """
    YOLOv8 image/video dataloader, i.e. `python detect.py --source image.jpg/vid.mp4`

    Iterates over all images first and then over every frame of all videos, yielding
    `(path, im, im0, cap, s)` per image or video frame.
    """

    def __init__(self, path, imgsz=640, stride=32, auto=True, transforms=None, vid_stride=1):
        """
        Collect image and video files from `path`.

        Args:
            path: A file, directory or glob pattern (containing '*'), a list/tuple of those, or a `*.txt`
                file whose whitespace-separated entries are such paths.
            imgsz: Size forwarded to LetterBox.
            stride: Stride forwarded to LetterBox.
            auto: `auto` flag forwarded to LetterBox.
            transforms: Optional callable applied to the raw image in place of the LetterBox preprocessing.
            vid_stride: Number of frames grabbed per returned video frame.

        Raises:
            FileNotFoundError: If an entry does not exist, or no file with a supported extension is found.
        """
        if isinstance(path, str) and Path(path).suffix == ".txt":  # *.txt file with img/vid/dir on each line
            path = Path(path).read_text().rsplit()
        files = []
        p = path  # keeps `p` bound for the error message below when `path` is an empty list/tuple
        for p in sorted(path) if isinstance(path, (list, tuple)) else [path]:
            p = str(Path(p).resolve())
            if '*' in p:
                files.extend(sorted(glob.glob(p, recursive=True)))  # glob
            elif os.path.isdir(p):
                files.extend(sorted(glob.glob(os.path.join(p, '*.*'))))  # dir
            elif os.path.isfile(p):
                files.append(p)  # files
            else:
                raise FileNotFoundError(f'{p} does not exist')

        # Classify by lower-cased file extension
        images = [x for x in files if x.split('.')[-1].lower() in IMG_FORMATS]
        videos = [x for x in files if x.split('.')[-1].lower() in VID_FORMATS]
        ni, nv = len(images), len(videos)

        self.imgsz = imgsz
        self.stride = stride
        self.files = images + videos  # images always precede videos
        self.nf = ni + nv  # number of files
        self.video_flag = [False] * ni + [True] * nv
        self.mode = 'image'
        self.auto = auto
        self.transforms = transforms  # optional
        self.vid_stride = vid_stride  # video frame-rate stride
        self.bs = 1
        if any(videos):
            self.orientation = None  # rotation degrees
            self._new_video(videos[0])  # new video
        else:
            self.cap = None
        if self.nf == 0:
            raise FileNotFoundError(f'No images or videos found in {p}. '
                                    f'Supported formats are:\nimages: {IMG_FORMATS}\nvideos: {VID_FORMATS}')

    def __iter__(self):
        """Reset the file counter and return self."""
        self.count = 0
        return self

    def __next__(self):
        """
        Return the next image or video frame as `(path, im, im0, cap, s)`.

        Raises:
            StopIteration: When all files have been consumed.
            FileNotFoundError: If `cv2.imread` cannot read an image file.
        """
        if self.count == self.nf:
            raise StopIteration
        path = self.files[self.count]

        if self.video_flag[self.count]:
            # Read video
            self.mode = 'video'
            for _ in range(self.vid_stride):
                self.cap.grab()
            ret_val, im0 = self.cap.retrieve()
            while not ret_val:
                # Current video is exhausted: advance to the next file until a frame can be read
                self.count += 1
                self.cap.release()
                if self.count == self.nf:  # last video
                    raise StopIteration
                path = self.files[self.count]
                self._new_video(path)
                ret_val, im0 = self.cap.read()

            self.frame += 1
            # im0 = self._cv2_rotate(im0)  # for use if cv2 autorotation is False
            s = f'video {self.count + 1}/{self.nf} ({self.frame}/{self.frames}) {path}: '

        else:
            # Read image
            self.count += 1
            im0 = cv2.imread(path)  # BGR
            if im0 is None:
                raise FileNotFoundError(f'Image Not Found {path}')
            s = f'image {self.count}/{self.nf} {path}: '

        if self.transforms:
            im = self.transforms(im0)  # transforms
        else:
            im = LetterBox(self.imgsz, self.auto, stride=self.stride)(image=im0)
            im = im.transpose((2, 0, 1))[::-1]  # HWC to CHW, BGR to RGB
            im = np.ascontiguousarray(im)  # contiguous

        return path, im, im0, self.cap, s

    def _new_video(self, path):
        """Create a new video capture object for `path` and reset the per-video frame counters."""
        self.frame = 0
        self.cap = cv2.VideoCapture(path)
        self.frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT) / self.vid_stride)
        if hasattr(cv2, 'CAP_PROP_ORIENTATION_META'):  # cv2<4.6.0 compatibility
            self.orientation = int(self.cap.get(cv2.CAP_PROP_ORIENTATION_META))  # rotation degrees
            # Disable auto-orientation due to known issues in https://github.com/ultralytics/yolov5/issues/8493
            # self.cap.set(cv2.CAP_PROP_ORIENTATION_AUTO, 0)

    def _cv2_rotate(self, im):
        """
        Rotate a cv2 video frame manually according to `self.orientation`; other values return `im` unchanged.

        NOTE(review): the orientation -> rotation mapping below (0 -> 90 CW, 180 -> 90 CCW, 90 -> 180) is kept
        exactly as found; it is only referenced from a commented-out call, so confirm it before enabling.
        """
        if self.orientation == 0:
            return cv2.rotate(im, cv2.ROTATE_90_CLOCKWISE)
        elif self.orientation == 180:
            return cv2.rotate(im, cv2.ROTATE_90_COUNTERCLOCKWISE)
        elif self.orientation == 90:
            return cv2.rotate(im, cv2.ROTATE_180)
        return im

    def __len__(self):
        """Return the number of files (videos count as one file each)."""
        return self.nf  # number of files


class LoadPilAndNumpy:
    """
    Loader for in-memory PIL images and numpy arrays.

    Iterating yields exactly one batch `(paths, im, im0, None, '')` containing every supplied image.
    """

    def __init__(self, im0, imgsz=640, stride=32, auto=True, transforms=None):
        """
        Validate and store the input images.

        Args:
            im0: A PIL image, a numpy array, or a list of them.
            imgsz: Size forwarded to LetterBox.
            stride: Stride forwarded to LetterBox.
            auto: `auto` flag forwarded to LetterBox; only used if all images share one shape.
            transforms: Optional callable applied per image in place of the LetterBox preprocessing.
        """
        if not isinstance(im0, list):
            im0 = [im0]
        self.im0 = [self._single_check(im) for im in im0]
        self.imgsz = imgsz
        self.stride = stride
        self.auto = auto
        self.transforms = transforms
        self.mode = 'image'
        # generate fake paths
        self.paths = [f"image{i}.jpg" for i in range(len(self.im0))]
        self.bs = 1

    @staticmethod
    def _single_check(im):
        """
        Validate one input image; PIL images are returned as a contiguous 3-channel BGR numpy array.

        Numpy arrays are returned unchanged.

        Raises:
            AssertionError: If `im` is neither a PIL image nor a numpy array.
        """
        assert isinstance(im, (Image.Image, np.ndarray)), f"Expected PIL/np.ndarray image type, but got {type(im)}"
        if isinstance(im, Image.Image):
            if im.mode != 'RGB':
                # Grayscale/palette images are 2-D and RGBA has 4 channels; the channel flip below needs HxWx3
                im = im.convert('RGB')
            im = np.asarray(im)[:, :, ::-1]  # RGB to BGR
            im = np.ascontiguousarray(im)  # contiguous
        return im

    def _single_preprocess(self, im, auto):
        """Apply `self.transforms` if set, otherwise LetterBox + HWC->CHW + BGR->RGB, to a single image."""
        if self.transforms:
            im = self.transforms(im)  # transforms
        else:
            im = LetterBox(self.imgsz, auto=auto, stride=self.stride)(image=im)
            im = im.transpose((2, 0, 1))[::-1]  # HWC to CHW, BGR to RGB
            im = np.ascontiguousarray(im)  # contiguous
        return im

    def __len__(self):
        """Return the number of stored images."""
        return len(self.im0)

    def __next__(self):
        """
        Return the single batch `(paths, im, im0, None, '')`.

        Raises:
            StopIteration: On the second call, since all images are returned at once.
        """
        if self.count == 1:  # loop only once as it's batch inference
            raise StopIteration
        auto = all(x.shape == self.im0[0].shape for x in self.im0) and self.auto
        im = [self._single_preprocess(im, auto) for im in self.im0]
        im = np.stack(im, 0) if len(im) > 1 else im[0][None]  # always add a leading batch dimension
        self.count += 1
        return self.paths, im, self.im0, None, ''

    def __iter__(self):
        """Reset the batch counter and return self."""
        self.count = 0
        return self


def autocast_list(source):
    """
    Merges a list of source of different types into a list of numpy arrays or PIL images

    Strings and Paths are opened with PIL (entries starting with 'http' are fetched with `requests` first);
    PIL images and numpy arrays are passed through unchanged.

    Raises:
        TypeError: If an element is not a str, Path, PIL image or numpy array.
    """
    files = []
    for im in source:
        if isinstance(im, (str, Path)):  # filename or uri
            files.append(Image.open(requests.get(im, stream=True).raw if str(im).startswith('http') else im))
        elif isinstance(im, (Image.Image, np.ndarray)):  # PIL or np Image
            files.append(im)
        else:
            raise TypeError(
                "Unsupported type encountered! See docs for supported types https://docs.ultralytics.com/predict")

    return files


LOADERS = [LoadStreams, LoadPilAndNumpy, LoadImages, LoadScreenshots]

if __name__ == "__main__":
    img = cv2.imread(str(ROOT / "assets/bus.jpg"))
    dataset = LoadPilAndNumpy(im0=img)
    for d in dataset:
        print(d[0])