
zamba.data.video

Classes

VideoLoaderConfig

Bases: BaseModel

Configuration for load_video_frames.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| crop_bottom_pixels | int | Number of pixels to crop from the bottom of the video (prior to resizing to video_height). | None |
| i_frames | bool | Only load the I-Frames. See https://en.wikipedia.org/wiki/Video_compression_picture_types#Intra-coded_(I)_frames/slices_(key_frames) | False |
| scene_threshold | float | Only load frames that correspond to scene changes. See http://www.ffmpeg.org/ffmpeg-filters.html#select_002c-aselect | None |
| megadetector_lite_config | MegadetectorLiteYoloXConfig | Configuration of MegadetectorLiteYoloX frame selection model. | None |
| frame_selection_height | int | Resize the video to this height in pixels, prior to frame selection. If None, the full size video will be used for frame selection. Using full size images (setting to None) is recommended for MegadetectorLite, especially if your species of interest are smaller. | None |
| frame_selection_width | int | Resize the video to this width in pixels, prior to frame selection. | None |
| total_frames | int | Number of frames that should ultimately be returned. | None |
| ensure_total_frames | bool | Selecting the number of frames by resampling may result in one more or fewer frames due to rounding. If True, ensure the requested number of frames is returned by either clipping or duplicating the final frame. If no frames have been selected, a warning is logged and an all-zero array of the requested shape is returned. If False, return the array unchanged. | True |
| fps | float | Resample the video evenly from the entire duration to a specific number of frames per second. | None |
| early_bias | bool | Resamples to 24 fps and selects 16 frames biased toward the front (strategy used by competition winner). | False |
| frame_indices | list(int) | Select specific frame numbers. Note: frame selection is done after any resampling. | None |
| evenly_sample_total_frames | bool | Reach the total number of frames specified by evenly sampling from the duration of the video. | False |
| pix_fmt | str | ffmpeg pixel format, defaults to 'rgb24' for RGB channels; can be changed to 'bgr24' for BGR. | 'rgb24' |
| model_input_height | int | After frame selection, resize the video to this height in pixels. | None |
| model_input_width | int | After frame selection, resize the video to this width in pixels. | None |
| cache_dir | Path | Cache directory where preprocessed videos will be saved upon first load. Alternatively, can be set with the VIDEO_CACHE_DIR environment variable. Defaults to None, which means videos will not be cached. Provided there is enough space on your machine, caching videos for training is highly encouraged because it speeds up all subsequent epochs. If you are predicting on the same videos with the same video loader configuration, caching also saves time on future runs. | None |
| cleanup_cache | bool | Whether to delete the cache dir after training or predicting ends. | False |
Source code in zamba/data/video.py
class VideoLoaderConfig(BaseModel):
    """
    Configuration for load_video_frames.

    Args:
        crop_bottom_pixels (int, optional): Number of pixels to crop from the bottom of the video
            (prior to resizing to `video_height`).
        i_frames (bool, optional): Only load the I-Frames. See
            https://en.wikipedia.org/wiki/Video_compression_picture_types#Intra-coded_(I)_frames/slices_(key_frames)
        scene_threshold (float, optional): Only load frames that correspond to scene changes.
            See http://www.ffmpeg.org/ffmpeg-filters.html#select_002c-aselect
        megadetector_lite_config (MegadetectorLiteYoloXConfig, optional): Configuration of
            MegadetectorLiteYoloX frame selection model.
        frame_selection_height (int, optional): Resize the video to this height in pixels, prior to
            frame selection. If None, the full size video will be used for frame selection. Using full
            size images (setting to None) is recommended for MegadetectorLite, especially if your
            species of interest are smaller.
        frame_selection_width (int, optional): Resize the video to this width in pixels, prior to
            frame selection.
        total_frames (int, optional): Number of frames that should ultimately be returned.
        ensure_total_frames (bool): Selecting the number of frames by resampling may result in one
            more or fewer frames due to rounding. If True, ensure the requested number of frames
            is returned by either clipping or duplicating the final frame. Raises an error if no
            frames have been selected. Otherwise, return the array unchanged.
        fps (float, optional): Resample the video evenly from the entire duration to a specific
            number of frames per second.
        early_bias (bool, optional): Resamples to 24 fps and selects 16 frames biased toward the
            front (strategy used by competition winner).
        frame_indices (list(int), optional): Select specific frame numbers. Note: frame selection
            is done after any resampling.
        evenly_sample_total_frames (bool, optional): Reach the total number of frames specified by
            evenly sampling from the duration of the video. Defaults to False.
        pix_fmt (str, optional): ffmpeg pixel format, defaults to 'rgb24' for RGB channels; can be
            changed to 'bgr24' for BGR.
        model_input_height (int, optional): After frame selection, resize the video to this height
            in pixels.
        model_input_width (int, optional): After frame selection, resize the video to this width in
            pixels.
        cache_dir (Path, optional): Cache directory where preprocessed videos will be saved
            upon first load. Alternatively, can be set with VIDEO_CACHE_DIR environment variable.
            Defaults to None, which means videos will not be cached. Provided there is enough space
            on your machine, it is highly encouraged to cache videos for training as this will
            speed up all subsequent epochs. If you are predicting on the same videos with the
            same video loader configuration, this will save time on future runs.
        cleanup_cache (bool): Whether to delete the cache dir after training or predicting ends.
            Defaults to False.
    """

    crop_bottom_pixels: Optional[int] = None
    i_frames: Optional[bool] = False
    scene_threshold: Optional[float] = None
    megadetector_lite_config: Optional[MegadetectorLiteYoloXConfig] = None
    frame_selection_height: Optional[int] = None
    frame_selection_width: Optional[int] = None
    total_frames: Optional[int] = None
    ensure_total_frames: Optional[bool] = True
    fps: Optional[float] = None
    early_bias: Optional[bool] = False
    frame_indices: Optional[List[int]] = None
    evenly_sample_total_frames: Optional[bool] = False
    pix_fmt: Optional[str] = "rgb24"
    model_input_height: Optional[int] = None
    model_input_width: Optional[int] = None
    cache_dir: Optional[Path] = None
    cleanup_cache: bool = False

    class Config:
        extra = "forbid"

    @validator("cache_dir", always=True)
    def validate_video_cache_dir(cls, cache_dir):
        """Set up cache directory for preprocessed videos. Config argument takes precedence
        over environment variable.
        """
        if cache_dir is None:
            cache_dir = os.getenv("VIDEO_CACHE_DIR", None)

        if cache_dir is not None:
            cache_dir = Path(cache_dir)
            cache_dir.mkdir(parents=True, exist_ok=True)

        return cache_dir

    @root_validator(skip_on_failure=True)
    def check_height_and_width(cls, values):
        if (values["frame_selection_height"] is None) ^ (values["frame_selection_width"] is None):
            raise ValueError(
                f"Must provide both frame_selection_height and frame_selection_width or neither. Values provided are {values}."
            )
        if (values["model_input_height"] is None) ^ (values["model_input_width"] is None):
            raise ValueError(
                f"Must provide both model_input_height and model_input_width or neither. Values provided are {values}."
            )
        return values

    @root_validator(skip_on_failure=True)
    def check_fps_compatibility(cls, values):
        if values["fps"] and (
            values["evenly_sample_total_frames"] or values["i_frames"] or values["scene_threshold"]
        ):
            raise ValueError(
                f"fps cannot be used with evenly_sample_total_frames, i_frames, or scene_threshold. Values provided are {values}."
            )
        return values

    @root_validator(skip_on_failure=True)
    def check_i_frame_compatibility(cls, values):
        if values["scene_threshold"] and values["i_frames"]:
            raise ValueError(
                f"i_frames cannot be used with scene_threshold. Values provided are {values}."
            )
        return values

    @root_validator(skip_on_failure=True)
    def check_early_bias_compatibility(cls, values):
        if values["early_bias"] and (
            values["i_frames"]
            or values["scene_threshold"]
            or values["total_frames"]
            or values["evenly_sample_total_frames"]
            or values["fps"]
        ):
            raise ValueError(
                f"early_bias cannot be used with i_frames, scene_threshold, total_frames, evenly_sample_total_frames, or fps. Values provided are {values}."
            )
        return values

    @root_validator(skip_on_failure=True)
    def check_frame_indices_compatibility(cls, values):
        if values["frame_indices"] and (
            values["total_frames"]
            or values["scene_threshold"]
            or values["i_frames"]
            or values["early_bias"]
            or values["evenly_sample_total_frames"]
        ):
            raise ValueError(
                f"frame_indices cannot be used with total_frames, scene_threshold, i_frames, early_bias, or evenly_sample_total_frames. Values provided are {values}."
            )
        return values

    @root_validator(skip_on_failure=True)
    def check_megadetector_lite_compatibility(cls, values):
        if values["megadetector_lite_config"] and (
            values["early_bias"] or values["evenly_sample_total_frames"]
        ):
            raise ValueError(
                f"megadetector_lite_config cannot be used with early_bias or evenly_sample_total_frames. Values provided are {values}."
            )
        return values

    @root_validator(skip_on_failure=True)
    def check_evenly_sample_total_frames_compatibility(cls, values):
        if values["evenly_sample_total_frames"] is True and values["total_frames"] is None:
            raise ValueError(
                f"total_frames must be specified if evenly_sample_total_frames is used. Values provided are {values}."
            )
        if values["evenly_sample_total_frames"] and (
            values["scene_threshold"]
            or values["i_frames"]
            or values["fps"]
            or values["early_bias"]
        ):
            raise ValueError(
                f"evenly_sample_total_frames cannot be used with scene_threshold, i_frames, fps, or early_bias. Values provided are {values}."
            )
        return values

    @root_validator(skip_on_failure=True)
    def validate_total_frames(cls, values):
        if values["megadetector_lite_config"] is not None:
            # set n frames for megadetector_lite_config if only specified by total_frames
            if values["megadetector_lite_config"].n_frames is None:
                values["megadetector_lite_config"].n_frames = values["total_frames"]

            # set total frames if only specified in megadetector_lite_config
            if values["total_frames"] is None:
                values["total_frames"] = values["megadetector_lite_config"].n_frames

        return values
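
The compatibility validators above amount to a table of option pairs that may not be set together. As a rough, dependency-free illustration (not zamba's implementation, which uses pydantic validators), the sketch below restates those rules as a lookup table. The names `CONFLICTS` and `find_conflicts` are hypothetical and introduced only for this example.

```python
from typing import Any, Dict, List

# Option -> options it may not be combined with, restated from the validators above.
# Hypothetical helper structure for illustration only.
CONFLICTS = {
    "fps": ["evenly_sample_total_frames", "i_frames", "scene_threshold"],
    "i_frames": ["scene_threshold"],
    "early_bias": ["i_frames", "scene_threshold", "total_frames",
                   "evenly_sample_total_frames", "fps"],
    "frame_indices": ["total_frames", "scene_threshold", "i_frames",
                      "early_bias", "evenly_sample_total_frames"],
    "megadetector_lite_config": ["early_bias", "evenly_sample_total_frames"],
    "evenly_sample_total_frames": ["scene_threshold", "i_frames", "fps", "early_bias"],
}


def find_conflicts(options: Dict[str, Any]) -> List[str]:
    """Return one message per rejected combination of truthy options."""
    problems = []
    for name, others in CONFLICTS.items():
        if not options.get(name):
            continue
        for other in others:
            if options.get(other):
                problems.append(f"{name} cannot be used with {other}")
    # evenly_sample_total_frames additionally requires total_frames to be set.
    if options.get("evenly_sample_total_frames") and options.get("total_frames") is None:
        problems.append("total_frames must be specified if evenly_sample_total_frames is used")
    return problems


print(find_conflicts({"fps": 4, "total_frames": 16}))  # []
print(find_conflicts({"early_bias": True, "fps": 4}))  # ['early_bias cannot be used with fps']
```

Unlike the validators, which raise on the first failing rule, this sketch collects every violation so that all conflicts can be reported at once.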

Attributes

cache_dir: Optional[Path] = None class-attribute
cleanup_cache: bool = False class-attribute
crop_bottom_pixels: Optional[int] = None class-attribute
early_bias: Optional[bool] = False class-attribute
ensure_total_frames: Optional[bool] = True class-attribute
evenly_sample_total_frames: Optional[bool] = False class-attribute
fps: Optional[float] = None class-attribute
frame_indices: Optional[List[int]] = None class-attribute
frame_selection_height: Optional[int] = None class-attribute
frame_selection_width: Optional[int] = None class-attribute
i_frames: Optional[bool] = False class-attribute
megadetector_lite_config: Optional[MegadetectorLiteYoloXConfig] = None class-attribute
model_input_height: Optional[int] = None class-attribute
model_input_width: Optional[int] = None class-attribute
pix_fmt: Optional[str] = 'rgb24' class-attribute
scene_threshold: Optional[float] = None class-attribute
total_frames: Optional[int] = None class-attribute

Classes

Config
Source code in zamba/data/video.py
class Config:
    extra = "forbid"
Attributes
extra = 'forbid' class-attribute

Functions

check_early_bias_compatibility(values)
Source code in zamba/data/video.py
@root_validator(skip_on_failure=True)
def check_early_bias_compatibility(cls, values):
    if values["early_bias"] and (
        values["i_frames"]
        or values["scene_threshold"]
        or values["total_frames"]
        or values["evenly_sample_total_frames"]
        or values["fps"]
    ):
        raise ValueError(
            f"early_bias cannot be used with i_frames, scene_threshold, total_frames, evenly_sample_total_frames, or fps. Values provided are {values}."
        )
    return values
check_evenly_sample_total_frames_compatibility(values)
Source code in zamba/data/video.py
@root_validator(skip_on_failure=True)
def check_evenly_sample_total_frames_compatibility(cls, values):
    if values["evenly_sample_total_frames"] is True and values["total_frames"] is None:
        raise ValueError(
            f"total_frames must be specified if evenly_sample_total_frames is used. Values provided are {values}."
        )
    if values["evenly_sample_total_frames"] and (
        values["scene_threshold"]
        or values["i_frames"]
        or values["fps"]
        or values["early_bias"]
    ):
        raise ValueError(
            f"evenly_sample_total_frames cannot be used with scene_threshold, i_frames, fps, or early_bias. Values provided are {values}."
        )
    return values
check_fps_compatibility(values)
Source code in zamba/data/video.py
@root_validator(skip_on_failure=True)
def check_fps_compatibility(cls, values):
    if values["fps"] and (
        values["evenly_sample_total_frames"] or values["i_frames"] or values["scene_threshold"]
    ):
        raise ValueError(
            f"fps cannot be used with evenly_sample_total_frames, i_frames, or scene_threshold. Values provided are {values}."
        )
    return values
check_frame_indices_compatibility(values)
Source code in zamba/data/video.py
@root_validator(skip_on_failure=True)
def check_frame_indices_compatibility(cls, values):
    if values["frame_indices"] and (
        values["total_frames"]
        or values["scene_threshold"]
        or values["i_frames"]
        or values["early_bias"]
        or values["evenly_sample_total_frames"]
    ):
        raise ValueError(
            f"frame_indices cannot be used with total_frames, scene_threshold, i_frames, early_bias, or evenly_sample_total_frames. Values provided are {values}."
        )
    return values
check_height_and_width(values)
Source code in zamba/data/video.py
@root_validator(skip_on_failure=True)
def check_height_and_width(cls, values):
    if (values["frame_selection_height"] is None) ^ (values["frame_selection_width"] is None):
        raise ValueError(
            f"Must provide both frame_selection_height and frame_selection_width or neither. Values provided are {values}."
        )
    if (values["model_input_height"] is None) ^ (values["model_input_width"] is None):
        raise ValueError(
            f"Must provide both model_input_height and model_input_width or neither. Values provided are {values}."
        )
    return values
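
The "both or neither" rule relies on XOR over two `is None` checks: the expression is true exactly when one value is missing and the other is present. A minimal standalone sketch of that check follows; `both_or_neither` is a hypothetical name used only here.

```python
from typing import Optional


def both_or_neither(height: Optional[int], width: Optional[int]) -> bool:
    """Return True when height and width are both given or both omitted."""
    # XOR is True when exactly one side is None, which is the invalid case.
    return not ((height is None) ^ (width is None))


print(both_or_neither(240, 426))    # True
print(both_or_neither(None, None))  # True
print(both_or_neither(240, None))   # False
```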
check_i_frame_compatibility(values)
Source code in zamba/data/video.py
@root_validator(skip_on_failure=True)
def check_i_frame_compatibility(cls, values):
    if values["scene_threshold"] and values["i_frames"]:
        raise ValueError(
            f"i_frames cannot be used with scene_threshold. Values provided are {values}."
        )
    return values
check_megadetector_lite_compatibility(values)
Source code in zamba/data/video.py
@root_validator(skip_on_failure=True)
def check_megadetector_lite_compatibility(cls, values):
    if values["megadetector_lite_config"] and (
        values["early_bias"] or values["evenly_sample_total_frames"]
    ):
        raise ValueError(
            f"megadetector_lite_config cannot be used with early_bias or evenly_sample_total_frames. Values provided are {values}."
        )
    return values
validate_total_frames(values)
Source code in zamba/data/video.py
@root_validator(skip_on_failure=True)
def validate_total_frames(cls, values):
    if values["megadetector_lite_config"] is not None:
        # set n frames for megadetector_lite_config if only specified by total_frames
        if values["megadetector_lite_config"].n_frames is None:
            values["megadetector_lite_config"].n_frames = values["total_frames"]

        # set total frames if only specified in megadetector_lite_config
        if values["total_frames"] is None:
            values["total_frames"] = values["megadetector_lite_config"].n_frames

    return values
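
The effect of this validator is a two-way fill between `total_frames` and the detector's `n_frames`: whichever is missing is copied from the other. The sketch below reproduces that logic without pydantic. `DetectorConfig` is a hypothetical stand-in for `MegadetectorLiteYoloXConfig` that carries only the `n_frames` field, and `sync_total_frames` is likewise an illustrative name.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DetectorConfig:
    """Hypothetical stand-in for MegadetectorLiteYoloXConfig; only n_frames matters here."""
    n_frames: Optional[int] = None


def sync_total_frames(total_frames: Optional[int],
                      detector: Optional[DetectorConfig]) -> Optional[int]:
    """Fill in whichever of total_frames / detector.n_frames is missing."""
    if detector is not None:
        if detector.n_frames is None:
            detector.n_frames = total_frames
        if total_frames is None:
            total_frames = detector.n_frames
    return total_frames


det = DetectorConfig()
print(sync_total_frames(16, det), det.n_frames)              # 16 16
print(sync_total_frames(None, DetectorConfig(n_frames=8)))   # 8
```

When both values are already set, neither is overwritten, so they can differ.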
validate_video_cache_dir(cache_dir)

Set up cache directory for preprocessed videos. Config argument takes precedence over environment variable.

Source code in zamba/data/video.py
@validator("cache_dir", always=True)
def validate_video_cache_dir(cls, cache_dir):
    """Set up cache directory for preprocessed videos. Config argument takes precedence
    over environment variable.
    """
    if cache_dir is None:
        cache_dir = os.getenv("VIDEO_CACHE_DIR", None)

    if cache_dir is not None:
        cache_dir = Path(cache_dir)
        cache_dir.mkdir(parents=True, exist_ok=True)

    return cache_dir
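
The precedence described above (explicit argument, then `VIDEO_CACHE_DIR`, then no caching) can be tried out in isolation. The following is a sketch of the same resolution order as a plain function, outside of pydantic; `resolve_cache_dir` is a hypothetical name and the directory names are made up for the demonstration.

```python
import os
import tempfile
from pathlib import Path
from typing import Optional


def resolve_cache_dir(cache_dir: Optional[str] = None) -> Optional[Path]:
    """Explicit argument first, then VIDEO_CACHE_DIR, else None (no caching)."""
    if cache_dir is None:
        cache_dir = os.getenv("VIDEO_CACHE_DIR")
    if cache_dir is None:
        return None
    path = Path(cache_dir)
    path.mkdir(parents=True, exist_ok=True)  # created eagerly, as in the validator
    return path


with tempfile.TemporaryDirectory() as tmp:
    os.environ["VIDEO_CACHE_DIR"] = str(Path(tmp) / "from_env")
    print(resolve_cache_dir().name)                             # from_env
    print(resolve_cache_dir(str(Path(tmp) / "from_arg")).name)  # from_arg
    del os.environ["VIDEO_CACHE_DIR"]
    print(resolve_cache_dir())                                  # None
```

Note that the directory is created as a side effect of validation, so merely constructing a config with a cache directory touches the filesystem.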

VideoMetadata

Bases: BaseModel

Source code in zamba/data/video.py
class VideoMetadata(BaseModel):
    height: int
    width: int
    n_frames: int
    duration_s: float
    fps: int

    @classmethod
    def from_video(cls, path: os.PathLike):
        stream = get_video_stream(path)
        return cls(
            height=int(stream["height"]),
            width=int(stream["width"]),
            n_frames=int(stream["nb_frames"]),
            duration_s=float(stream["duration"]),
            fps=int(Fraction(stream["r_frame_rate"])),  # reported, not average
        )

Attributes

duration_s: float class-attribute
fps: int class-attribute
height: int class-attribute
n_frames: int class-attribute
width: int class-attribute

Functions

from_video(path: os.PathLike) classmethod
Source code in zamba/data/video.py
@classmethod
def from_video(cls, path: os.PathLike):
    stream = get_video_stream(path)
    return cls(
        height=int(stream["height"]),
        width=int(stream["width"]),
        n_frames=int(stream["nb_frames"]),
        duration_s=float(stream["duration"]),
        fps=int(Fraction(stream["r_frame_rate"])),  # reported, not average
    )
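
One detail worth illustrating is the `fps` conversion: ffprobe reports `r_frame_rate` as a fraction string, and `int(Fraction(...))` truncates rather than rounds. The small sketch below uses only the standard library; `reported_fps` is a hypothetical helper name and the rate strings are example inputs.

```python
from fractions import Fraction


def reported_fps(r_frame_rate: str) -> int:
    """Parse a rate such as "30000/1001" and truncate it to an integer."""
    return int(Fraction(r_frame_rate))


print(reported_fps("25/1"))        # 25
print(reported_fps("30000/1001"))  # 29 (29.97... is truncated, not rounded)
```

For NTSC-style rates the stored `fps` is therefore 29 rather than 30, which matters if the value is later multiplied by a duration.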

npy_cache

Source code in zamba/data/video.py
class npy_cache:
    def __init__(self, cache_path: Optional[Path] = None, cleanup: bool = False):
        self.cache_path = cache_path
        self.cleanup = cleanup

    def __call__(self, f):
        def _wrapped(*args, **kwargs):
            try:
                vid_path = kwargs["filepath"]
            except Exception:
                vid_path = args[0]
            try:
                config = kwargs["config"].dict()
            except Exception:
                config = kwargs

            # don't include cleanup in the hashed config
            config.pop("cleanup_cache")

            # hash config for inclusion in filename
            hash_str = hashlib.sha1(str(config).encode("utf-8")).hexdigest()
            logger.opt(lazy=True).debug(
                "Generated hash {hash_str} from {config}",
                hash_str=lambda: hash_str,
                config=lambda: str(config),
            )

            # strip leading "/" in absolute path
            vid_path = AnyPath(str(vid_path).lstrip("/"))

            if isinstance(vid_path, S3Path):
                vid_path = AnyPath(vid_path.key)

            npy_path = self.cache_path / hash_str / vid_path.with_suffix(".npy")
            # make parent directories since we're using absolute paths
            npy_path.parent.mkdir(parents=True, exist_ok=True)

            if npy_path.exists():
                logger.debug(f"Loading from cache {npy_path}: size {npy_path.stat().st_size}")
                return np.load(npy_path)
            else:
                logger.debug(f"Loading video from disk: {vid_path}")
                loaded_video = f(*args, **kwargs)
                np.save(npy_path, loaded_video)
                logger.debug(f"Wrote to cache {npy_path}: size {npy_path.stat().st_size}")
                return loaded_video

        if self.cache_path is not None:
            return _wrapped
        else:
            return f

    def __del__(self):
        if hasattr(self, "cache_path") and self.cleanup and self.cache_path.exists():
            if self.cache_path.parents[0] == tempfile.gettempdir():
                logger.info(f"Deleting cache dir {self.cache_path}.")
                rmtree(self.cache_path)
            else:
                logger.warning(
                    "Bravely refusing to delete directory that is not a subdirectory of the "
                    "system temp directory. If you really want to delete, do so manually using:\n "
                    f"rm -r {self.cache_path}"
                )
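
The cache location combines three pieces: the cache root, a SHA-1 hash of the stringified loader configuration (without `cleanup_cache`), and the video path with its leading slash removed and a `.npy` suffix. The sketch below reproduces only that path derivation under a few assumptions: `cache_file_for` is a hypothetical name, it uses `PurePosixPath` so that nothing is touched on disk, and it pops `cleanup_cache` with a default so that a config without that key is tolerated.

```python
import hashlib
from pathlib import PurePosixPath


def cache_file_for(cache_root: str, video_path: str, config: dict) -> PurePosixPath:
    """Derive the .npy cache path for a video under a given loader config."""
    config = dict(config)              # copy so the caller's dict is untouched
    config.pop("cleanup_cache", None)  # the cleanup flag is not part of the cache key
    hash_str = hashlib.sha1(str(config).encode("utf-8")).hexdigest()
    relative = PurePosixPath(video_path.lstrip("/"))  # strip leading "/" of absolute paths
    return PurePosixPath(cache_root) / hash_str / relative.with_suffix(".npy")


a = cache_file_for("/tmp/zamba_cache", "/data/videos/clip.mp4", {"fps": 4, "cleanup_cache": False})
b = cache_file_for("/tmp/zamba_cache", "/data/videos/clip.mp4", {"fps": 4, "cleanup_cache": True})
c = cache_file_for("/tmp/zamba_cache", "/data/videos/clip.mp4", {"fps": 8, "cleanup_cache": False})
print(a == b)  # True: cleanup_cache does not change the cache key
print(a == c)  # False: a different fps yields a different hash directory
print(a.name)  # clip.npy
```

Because the hash is taken over `str(config)`, any change to a loader setting, or to the order of the keys, produces a new cache directory rather than reusing stale arrays.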

Attributes

cache_path = cache_path instance-attribute
cleanup = cleanup instance-attribute

Functions

__init__(cache_path: Optional[Path] = None, cleanup: bool = False)
Source code in zamba/data/video.py
def __init__(self, cache_path: Optional[Path] = None, cleanup: bool = False):
    self.cache_path = cache_path
    self.cleanup = cleanup

Functions

ensure_frame_number(arr, total_frames: int)

Ensures the array contains the requested number of frames either by clipping frames from the end or duplicating the last frame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| arr | np.ndarray | Array of video frames with shape (frames, height, width, channel). | required |
| total_frames | int | Desired number of frames in output array. | required |
Source code in zamba/data/video.py
def ensure_frame_number(arr, total_frames: int):
    """Ensures the array contains the requested number of frames either by clipping frames from
    the end or duplicating the last frame.

    Args:
        arr (np.ndarray): Array of video frames with shape (frames, height, width, channel).
        total_frames (int): Desired number of frames in output array.
    """
    if (total_frames is None) or (arr.shape[0] == total_frames):
        return arr
    elif arr.shape[0] == 0:
        logger.warning(
            "No frames selected. Returning an array in the desired shape with all zeros."
        )
        return np.zeros((total_frames, arr.shape[1], arr.shape[2], arr.shape[3]), dtype="int")
    elif arr.shape[0] > total_frames:
        logger.info(
            f"Clipping {arr.shape[0] - total_frames} frames "
            f"(original: {arr.shape[0]}, requested: {total_frames})."
        )
        return arr[:total_frames]
    elif arr.shape[0] < total_frames:
        logger.info(
            f"Duplicating last frame {total_frames - arr.shape[0]} times "
            f"(original: {arr.shape[0]}, requested: {total_frames})."
        )
        return np.concatenate(
            [arr, np.tile(arr[-1], (total_frames - arr.shape[0], 1, 1, 1))], axis=0
        )
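
The clip-or-pad behavior is easy to see on a small example. The following is a NumPy-free analogue that operates on a plain list of frames rather than an array. `clip_or_pad` is a hypothetical name, and the empty-input branch raises instead of returning zeros, since a list carries no frame shape to build zero frames from.

```python
from typing import List, Optional, TypeVar

Frame = TypeVar("Frame")


def clip_or_pad(frames: List[Frame], total_frames: Optional[int]) -> List[Frame]:
    """Return exactly total_frames frames by clipping the end or repeating the last frame."""
    if total_frames is None or len(frames) == total_frames:
        return frames
    if not frames:
        # Deviation from the function above, which returns an all-zero array here.
        raise ValueError("no frames to duplicate")
    if len(frames) > total_frames:
        return frames[:total_frames]
    return frames + [frames[-1]] * (total_frames - len(frames))


print(clip_or_pad(["f0", "f1", "f2"], 2))  # ['f0', 'f1']
print(clip_or_pad(["f0", "f1", "f2"], 5))  # ['f0', 'f1', 'f2', 'f2', 'f2']
```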

ffprobe(path: os.PathLike) -> pd.Series

Source code in zamba/data/video.py
def ffprobe(path: os.PathLike) -> pd.Series:
    def flatten_json(j, name=""):
        for k in j:
            if isinstance(j[k], dict):
                yield from flatten_json(j[k], f"{name}.{k}")
            elif isinstance(j[k], list):
                for i in range(len(j[k])):
                    yield from flatten_json(j[k][i], f"{name}.{k}[{i}]")
            else:
                yield {f"{name}.{k}".strip("."): j[k]}

    output = subprocess.check_output(
        [
            "ffprobe",
            "-v",
            "quiet",
            "-show_entries",
            "stream:format",
            "-select_streams",
            "v",
            "-of",
            "json",
            path,
        ]
    )
    output = json.loads(output)
    result = reduce(lambda a, b: {**a, **b}, flatten_json(output))
    return pd.Series(result)
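
To see what the flattening step produces, the helper can be run on a small dictionary without invoking ffprobe or pandas. The sample below is a hypothetical, heavily trimmed stand-in for ffprobe's JSON output; the nested helper is copied out as a module-level function for the demonstration.

```python
from functools import reduce


def flatten_json(j, name=""):
    """Yield single-item dicts whose keys are dotted paths into the nested structure."""
    for k in j:
        if isinstance(j[k], dict):
            yield from flatten_json(j[k], f"{name}.{k}")
        elif isinstance(j[k], list):
            for i in range(len(j[k])):
                yield from flatten_json(j[k][i], f"{name}.{k}[{i}]")
        else:
            yield {f"{name}.{k}".strip("."): j[k]}


# Hypothetical sample; real ffprobe output has many more fields.
sample = {
    "streams": [{"codec_type": "video", "width": 1280, "tags": {"language": "eng"}}],
    "format": {"duration": "10.0"},
}
flat = reduce(lambda a, b: {**a, **b}, flatten_json(sample))
for key, value in flat.items():
    print(key, "=", value)
```

List elements are addressed with an index in brackets (for example `streams[0].width`), so each stream's fields end up under distinct keys of the resulting Series.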

get_frame_time_estimates(path: os.PathLike)

Source code in zamba/data/video.py
def get_frame_time_estimates(path: os.PathLike):
    probe = ffmpeg.probe(str(path), show_entries="frame=best_effort_timestamp_time")
    return [float(x["best_effort_timestamp_time"]) for x in probe["frames"]]

get_video_stream(path: Union[os.PathLike, S3Path]) -> dict

Source code in zamba/data/video.py
def get_video_stream(path: Union[os.PathLike, S3Path]) -> dict:
    try:
        probe = ffmpeg.probe(str(path))
    except ffmpeg.Error as exc:
        raise ZambaFfmpegException(exc.stderr)

    return next((stream for stream in probe["streams"] if stream["codec_type"] == "video"), None)
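
The return expression picks the first stream whose `codec_type` is `"video"` and falls back to `None` when there is none. A minimal sketch of that selection on hand-written probe dictionaries follows; the dictionaries are hypothetical and `first_video_stream` is an illustrative name.

```python
from typing import Optional


def first_video_stream(probe: dict) -> Optional[dict]:
    """Return the first video stream in an ffprobe-style result, or None."""
    return next((s for s in probe["streams"] if s["codec_type"] == "video"), None)


probe = {"streams": [{"codec_type": "audio"}, {"codec_type": "video", "width": 1280}]}
print(first_video_stream(probe))                                   # {'codec_type': 'video', 'width': 1280}
print(first_video_stream({"streams": [{"codec_type": "audio"}]}))  # None
```

Callers that index the result directly, such as `stream["width"]`, will raise a `TypeError` for audio-only files, so checking for `None` first is advisable.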

load_video_frames(filepath: os.PathLike, config: Optional[VideoLoaderConfig] = None, **kwargs)

Loads frames from videos using fast ffmpeg commands.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| filepath | os.PathLike | Path to the video. | required |
| config | VideoLoaderConfig | Configuration for video loading. | None |
| **kwargs | | Optionally, arguments for VideoLoaderConfig can be passed in directly. | {} |

Returns:

| Type | Description |
| --- | --- |
| np.ndarray | An array of video frames with dimensions (time x height x width x channels). |

Source code in zamba/data/video.py
def load_video_frames(
    filepath: os.PathLike,
    config: Optional[VideoLoaderConfig] = None,
    **kwargs,
):
    """Loads frames from videos using fast ffmpeg commands.

    Args:
        filepath (os.PathLike): Path to the video.
        config (VideoLoaderConfig, optional): Configuration for video loading.
        **kwargs: Optionally, arguments for VideoLoaderConfig can be passed in directly.

    Returns:
        np.ndarray: An array of video frames with dimensions (time x height x width x channels).
    """
    if not Path(filepath).exists():
        raise FileNotFoundError(f"No file found at {filepath}")

    if config is None:
        config = VideoLoaderConfig(**kwargs)

    video_stream = get_video_stream(filepath)
    w = int(video_stream["width"])
    h = int(video_stream["height"])

    pipeline = ffmpeg.input(str(filepath))
    pipeline_kwargs = {}

    if (config.crop_bottom_pixels is not None) and (config.crop_bottom_pixels > 0):
        # scale to ensure all frames are the same height and we can crop
        pipeline = pipeline.filter("scale", f"{w},{h}")
        pipeline = pipeline.crop("0", "0", "iw", f"ih-{config.crop_bottom_pixels}")
        h = h - config.crop_bottom_pixels

    if config.evenly_sample_total_frames:
        config.fps = config.total_frames / float(video_stream["duration"])

    if config.early_bias:
        config.fps = 24  # competition frame selection assumes 24 frames per second
        config.total_frames = 16  # used for ensure_total_frames

    if config.fps:
        pipeline = pipeline.filter("fps", fps=config.fps, round="up")

    if config.i_frames:
        pipeline = pipeline.filter("select", "eq(pict_type,PICT_TYPE_I)")

    if config.scene_threshold:
        pipeline = pipeline.filter("select", f"gt(scene,{config.scene_threshold})")

    if config.frame_selection_height and config.frame_selection_width:
        pipeline = pipeline.filter(
            "scale", f"{config.frame_selection_width},{config.frame_selection_height}"
        )
        w, h = config.frame_selection_width, config.frame_selection_height

    if config.early_bias:
        config.frame_indices = [2, 8, 12, 18, 24, 36, 48, 60, 72, 84, 96, 108, 120, 132, 144, 156]

    if config.frame_indices:
        pipeline = pipeline.filter("select", "+".join(f"eq(n,{f})" for f in config.frame_indices))
        pipeline_kwargs = {"vsync": 0}

    pipeline = pipeline.output(
        "pipe:", format="rawvideo", pix_fmt=config.pix_fmt, **pipeline_kwargs
    )

    try:
        out, err = pipeline.run(capture_stdout=True, capture_stderr=True)
    except ffmpeg.Error as exc:
        raise ZambaFfmpegException(exc.stderr)

    arr = np.frombuffer(out, np.uint8).reshape([-1, h, w, 3])

    if config.megadetector_lite_config is not None:
        mdlite = MegadetectorLiteYoloX(config=config.megadetector_lite_config)
        detection_probs = mdlite.detect_video(video_arr=arr)

        arr = mdlite.filter_frames(arr, detection_probs)

    if (config.model_input_height is not None) and (config.model_input_width is not None):
        resized_frames = np.zeros(
            (arr.shape[0], config.model_input_height, config.model_input_width, 3), np.uint8
        )
        for ix, f in enumerate(arr):
            if (f.shape[0] != config.model_input_height) or (
                f.shape[1] != config.model_input_width
            ):
                f = cv2.resize(
                    f,
                    (config.model_input_width, config.model_input_height),
                    # https://stackoverflow.com/a/51042104/1692709
                    interpolation=(
                        cv2.INTER_LINEAR
                        if f.shape[1] < config.model_input_width
                        else cv2.INTER_AREA
                    ),
                )
            resized_frames[ix, ...] = f
        arr = np.array(resized_frames)

    if config.ensure_total_frames:
        arr = ensure_frame_number(arr, total_frames=config.total_frames)

    return arr
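
Two of the computed values in this function are simple enough to check by hand: the ffmpeg `select` expression built from `frame_indices`, and the frame rate derived for `evenly_sample_total_frames`. The sketch below isolates both as plain functions with hypothetical names; it builds the strings and numbers only and does not run ffmpeg.

```python
from typing import List


def select_expression(frame_indices: List[int]) -> str:
    """Build a `select` filter expression that keeps only the listed frame numbers."""
    return "+".join(f"eq(n,{f})" for f in frame_indices)


def evenly_sampled_fps(total_frames: int, duration_s: float) -> float:
    """Frames per second needed to spread total_frames over the whole duration."""
    return total_frames / duration_s


print(select_expression([2, 8, 12]))  # eq(n,2)+eq(n,8)+eq(n,12)
print(evenly_sampled_fps(16, 32.0))   # 0.5
```

In the `select` expression each `eq(n,k)` term is 1 for frame `k` and 0 otherwise, so the sum is nonzero exactly for the requested frames.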

num_frames(stream_or_path: Union[dict, os.PathLike, S3Path]) -> Optional[int]

Source code in zamba/data/video.py
def num_frames(stream_or_path: Union[dict, os.PathLike, S3Path]) -> Optional[int]:
    if not isinstance(stream_or_path, dict):
        stream = get_video_stream(stream_or_path)
    else:
        stream = stream_or_path

    if not stream:
        return

    if "nb_frames" in stream:
        return int(stream["nb_frames"])

    if "duration" in stream:
        duration = float(stream["duration"])

        if "r_frame_rate" in stream:
            frame_rate = float(Fraction(stream["r_frame_rate"]))
        elif "avg_frame_rate" in stream:
            # ffprobe reports rates as fraction strings such as "25/1"
            frame_rate = float(Fraction(stream["avg_frame_rate"]))
        duration -= float(stream.get("start_time", 0))

        return floor(duration * frame_rate)
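
The fallback estimate is duration (minus any start offset) multiplied by the frame rate, rounded down. The sketch below works through that arithmetic on hand-written stream dictionaries. `estimate_num_frames` is a hypothetical name, and two choices here are assumptions of the sketch: both rate fields are parsed with `Fraction`, and `None` is returned when no rate is available.

```python
from fractions import Fraction
from math import floor
from typing import Optional


def estimate_num_frames(stream: dict) -> Optional[int]:
    """Use nb_frames when present, otherwise estimate from duration and frame rate."""
    if "nb_frames" in stream:
        return int(stream["nb_frames"])
    if "duration" not in stream:
        return None
    rate = stream.get("r_frame_rate") or stream.get("avg_frame_rate")
    if rate is None:
        return None  # assumption of this sketch: give up rather than fail
    duration = float(stream["duration"]) - float(stream.get("start_time", 0))
    return floor(duration * float(Fraction(rate)))


print(estimate_num_frames({"nb_frames": "250"}))                                  # 250
print(estimate_num_frames({"duration": "10.0", "r_frame_rate": "30000/1001"}))    # 299
print(estimate_num_frames({"duration": "10.5", "start_time": "0.5",
                           "avg_frame_rate": "25/1"}))                            # 250
```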