muvs tutorial - WolframRhodium/muvsfunc GitHub Wiki

muvs is an experimental interface to VapourSynth. It features

Most of the existing VapourSynth-Python code is legacy code in this interface, except:

  1. the syntax for clip concatenation and multiplication is abandoned.

    Use std.Splice([clip1, clip2, clip3]) for clip1 + clip2 + clip3 and std.Loop(clip, times=N) for clip * N instead.

  2. Ambiguous argument passing is abandoned.

    In particular, existing Vapoursynth implementation allows function call like

    res = core.std.Interleave([src1, src2], 0, extend=1)

    The function specification in official document says

    std.Interleave(clip[] clips[, bint extend=0, bint mismatch=0, bint modify_duration=True])
    

    but the code evaluates to

    res = core.std.Interleave(clips=[src1, src2], mismatch=0, extend=1)

    This goes against general Python practice. In this interface, a TypeError exception is raised, as specified in Python Language Reference.

Usage

The following header is suggested in .vpy file.

import vapoursynth as vs
import muvs
from muvs import core

# functions for expression
from muvs import Abs, Exp, Not, And, Or, Xor, Log, Sqrt, Min, Max, Conditional

# import mvsfunc as mvf
# ...

muvs.pollute()

pollute is used for automatic pollution to modules using vapoursynth. It must be executed after all import statements. __main__ and __vapoursynth__ will never be polluted.

(Although automatic pollution is feasible, it is encouraged to pollute modules manually for clarity, e.g.

import havsfunc as haf
import mvsfunc as mvf

muvs.pollute(haf, mvf)

Unified infix expression syntax

Instead of writting postfix expression

# from havsfunc.DeHalo_alpha
darkstr = 1.0
brightstr = 1.0
them = core.std.Expr([clp, remove], expr=[f'x y < x x y - {darkstr} * - x x y - {brightstr} * - ?'])

you can write

darkstr = 1.0
brightstr = 1.0

muvs.arithmetic_expr = True
them = muvs.Expr([Conditional(clp < remove, clp - (clp - remove) * darkstr, clp - (clp - remove) * brightstr)])
muvs.arithmetic_expr = False

or simply

darkstr = 1.0
brightstr = 1.0

with muvs.expr():
    them = Conditional(clp < remove, clp - (clp - remove) * darkstr, clp - (clp - remove) * brightstr)

Note that in the latter case, them is computed lazily. .compute() method can be used to get the lazily computed value

them = them.compute(planes=0)

Laziness encourages the fusion of expression evaluations, as demonstrated in the following polynomial evaluation using Horner’s scheme

from typing import Iterable
import numbers

def polyval(p: Iterable[numbers.Real], x: muvs._VideoNode) -> muvs._VideoNode:
    p = iter(p)

    y = next(p)

    with muvs.expr():
        for coeff in p:
            y = y * x + coeff

        return y.compute()

res = polyval([1, 2, 3], src)
# res = core.std.Expr(clips=[src], expr='1.0 x * 2.0 + x * 3.0 +')

std.Lut/std.Lut2 is used if available, which can also be specified by the parameter use_lut:

with muvs.expr(), muvs.record("test_output.vpy", include_header=True):
    clip_a = core.std.BlankClip(format=vs.GRAY8)

    flt_a = ((clip_a ** 3.5) + 0.75 * clip_a) / 2
    flt_a = flt_a.compute(use_lut=True)

    flt_a.set_output()

executes

# ...... (headers)

# output: 640 x 480, Gray8, 240 frames, 24/1 fps
clip0 = core.std.BlankClip(format=vs.PresetFormat.GRAY8)

clip1 = core.std.Lut(clip=clip0, function=lambda x: min(max(int((((x ** 3.5) + (0.75 * x)) / 2.0) + 0.5), 0), 255), bits=8)

clip1.set_output()

And

with muvs.record("test_output.vpy"):
    clip_a = core.std.BlankClip(format=vs.GRAY8)
    clip_b = core.std.BlankClip(format=vs.GRAY8)

    with muvs.expr():
        flt = (clip_b * clip_a / 255 + 0.75 * clip_a) / 2
        flt = flt.compute(use_lut=True)

    flt.set_output()

executes

# output: 640 x 480, Gray8, 240 frames, 24/1 fps
clip0 = core.std.BlankClip(format=vs.PresetFormat.GRAY8)

# output: 640 x 480, Gray8, 240 frames, 24/1 fps
clip1 = core.std.BlankClip(format=vs.PresetFormat.GRAY8)

clip2 = core.std.Lut2(clipa=clip1, clipb=clip0, function=lambda x, y: min(max(int(((((x * y) / 255.0) + (0.75 * y)) / 2.0) + 0.5), 0), 255), bits=8)

clip2.set_output()

Example: Ridge Detection

Source from OpenCV (default params)

import mvsfunc as mvf
pollute(mvf)

def ridge_detection(clip):
    bits = clip.format.bits_per_sample
    clip = mvf.Depth(clip, 32)

    Sobel_x = lambda clip: clip.std.Convolution([-1, 0, 1, -2, 0, 2, -1, 0, 1])
    Sobel_y = lambda clip: clip.std.Convolution([-1, -2, -1, 0, 0, 0, 1, 2, 1])

    sbx = Sobel_x(clip)
    sby = Sobel_y(clip)
    sbxx = Sobel_x(sbx)
    sbyy = Sobel_y(sby)
    sbxy = Sobel_y(sbx)

    with muvs.expr():
        sb2xx = sbxx * sbxx
        sb2yy = sbyy * sbyy
        sb2xy = sbxy * sbxy
        sbxxyy = sbxx * sbyy
        rootex = sb2xx + (sb2xy + sb2xy + sb2xy + sb2xy) - (sbxxyy + sbxxyy) + sb2yy
        root = Sqrt(rootex)
        ridgexp = (sbxx + sbyy) + root
        res = (0.5 * ridgexp).compute()

    return mvf.Depth(res, bits)

Common sub-expression elimination

Common sub-expressions in std.Expr are eliminated. In the following example,

flt = (clip_a + clip_b * clip_b) * (clip_a + clip_b * clip_b)

x y y * + x y y * + * is translated to x y dup * + dup *.

Simple plane extraction syntax

clip.Y can be used to extract the Y channel from a YUV clip. clip.U, clip.V can also be used to extract chrominance components.

It is not legal to use clip.Y in an RGB clip.

Code transformation

recorder = muvs.Recorder() # multiple recorders may co-exist
recorder.start_recording()

# code to be recorded
# ...

recorder.end_recording("transformed_code.vpy")

Or

with muvs.record("transformed_code.vpy") as recorder:
    # vs code to be recorded
    # ...
    recorder.write("# label") # leave a label in the generated code
    # ...

Calls to VapourSynth functions will be written to the file transformed_code.vpy.

For example,

with muvs.record("hqdering_mod.vpy", include_header=True):
    src = core.std.BlankClip(width=1920, height=1080, format=vs.YUV420P8)
    flt = core.resize.Bicubic(src, 1280, 720, format=vs.YUV420P16)
    res = haf.HQDeringmod(flt).resize.Bicubic(format=vs.YUV420P10)
    res.set_output()

writes

import vapoursynth as vs
from vapoursynth import core

core.add_cache = True
core.num_threads = 8
core.max_cache_size = 4096

# output: 1920 x 1080, YUV420P8, 240 frames, 24/1 fps
clip0 = core.std.BlankClip(width=1920, height=1080, format=vs.PresetFormat.YUV420P8)

# width: 1920 -> 1280, height: 1080 -> 720, format: YUV420P8 -> YUV420P16
clip1 = core.resize.Bicubic(clip=clip0, width=1280, height=720, format=vs.PresetFormat.YUV420P16)

clip2 = core.std.Convolution(clip=clip1, matrix=[1, 2, 1, 2, 4, 2, 1, 2, 1], planes=[0])

clip3 = core.std.Convolution(clip=clip2, matrix=[1, 1, 1, 1, 1, 1, 1, 1, 1], planes=[0])

clip4 = core.ctmf.CTMF(clip=clip1, radius=2, planes=[0])

# output: 1280 x 720, YUV420P16, 240 frames, 24/1 fps
clip5 = core.std.Expr(clips=[clip1, clip3, clip4], expr=['x y - x z - * 0 < x x y - abs x z - abs < y z ? ?', '', ''])

clip6 = core.std.Median(clip=clip5, planes=[0])

clip7 = core.std.Convolution(clip=clip6, matrix=[1, 2, 1, 2, 4, 2, 1, 2, 1], planes=[0])

clip8 = core.std.MakeDiff(clipa=clip6, clipb=clip7, planes=[0])

clip9 = core.std.MakeDiff(clipa=clip1, clipb=clip5, planes=[0])

clip10 = core.rgvs.Repair(clip=clip8, repairclip=clip9, mode=[1, 0, 0])

# output: 1280 x 720, YUV420P16, 240 frames, 24/1 fps
clip11 = core.std.Expr(clips=[clip10, clip8], expr=['x 32768 - abs y 32768 - abs <= x y ?', '', ''])

clip12 = core.std.MergeDiff(clipa=clip5, clipb=clip11, planes=[0])

clip13 = core.rgvs.Repair(clip=clip1, repairclip=clip12, mode=[24, 0, 0])

# output: 1280 x 720, YUV420P16, 240 frames, 24/1 fps
clip14 = core.std.Expr(clips=[clip13, clip1], expr=['  x   y  >    x   y  -  abs  771.0 <=  x    x   y  -  abs  1542.0 >=  y    y    x   y  -  1542.0   x   y  -  abs  - * 0.0012970168612191958 * +  ? ?     x   y  -  abs  3084.0 <=  x    x   y  -  abs  6168.0 >=  y    y    x   y  -  6168.0   x   y  -  abs  - * 0.00032425421530479895 * +  ? ?  ? ', '', ''])

clip15 = core.std.Sobel(clip=clip1, planes=[0])

clip16 = core.std.Expr(clips=clip15, expr=['x 15420 < 0 x ?', ''])

clip17 = core.std.Median(clip=clip16, planes=[0])

clip18 = core.misc.Hysteresis(clipa=clip17, clipb=clip16, planes=[0])

clip19 = core.std.Maximum(clip=clip18, planes=[0], coordinates=[1, 1, 1, 1, 1, 1, 1, 1])

clip20 = core.std.Inflate(clip=clip19, planes=[0])

clip21 = core.std.Inflate(clip=clip18, planes=[0])

clip22 = core.std.Minimum(clip=clip21, planes=[0])

# output: 1280 x 720, YUV420P16, 240 frames, 24/1 fps
clip23 = core.std.Expr(clips=[clip20, clip22], expr=['x 65535 y - * 65535 /', ''])

clip24 = core.std.MaskedMerge(clipa=clip1, clipb=clip14, mask=clip23, planes=[0], first_plane=True)

# format: YUV420P16 -> YUV420P10
clip25 = core.resize.Bicubic(clip=clip24, format=vs.PresetFormat.YUV420P10)

clip25.set_output()

Dynamic-method

Dynamically-created custom method.

# simple registration
core.RG = core.rgvs.RemoveGrain # 'name' must start with uppercase letter
core.GaussianBlur = lambda clip, sigma=1.0: core.tcanny.TCanny(clip, sigma=sigma, mode=-1)


# register all functions under 'std' namespace
namespace = core.std

# VS-API4
func_dict = {func.name: func for func in namespace.functions()}

# VS-API3:
# func_dict = {func_name: getattr(namespace, func_name) for func_name in namespace.get_functions().keys()}
core.register_functions(**func_dict)


# usage
blur = clip.RG(4).GaussianBlur(0.5)
sharp_diff = clip.MakeDiff(blur)
sharp = clip.MergeDiff(sharp_diff)

Known issues

Callable objects with no proper definitions of __repr__ method can not be exported currently.

# exporting haf.LSFmod()

clip22 = core.std.Lut(clip=clip21, function=<function LSFmod.<locals>.get_lut3 at 0x0000026C0DE022F0>)

Other examples

import array
from functools import partial

def meshgrid_core(n, f, low, high, horizontal):
    """Auxiliary function"""
    assert low < high, f"{low} < {high}"

    f = f.copy()
    mem_view = f.get_write_array(0)
    height, width = mem_view.shape

    if horizontal:
        data = array.array('f', (((high - low) * j / (width - 1) + low) for j in range(width)))

        for i in range(height):
            mem_view[i, :] = data
    else:
        for i in range(height):
            mem_view[i, :] = array.array('f', [(low - high) * i / (height - 1) + high]) * width

    return f

def mandelbrot(width=1920, height=1280, iterations=100, 
    x_range=(-2, 1), y_range=(-1, 1), c_real=0, c_img=0):

    blank = core.std.BlankClip(format=vs.GRAYS, width=width, height=height, length=1)

    xs = core.std.ModifyFrame(
        blank, blank, 
        partial(meshgrid_core, horizontal=True, low=x_range[0], high=x_range[1]))

    ys = core.std.ModifyFrame(
        blank, blank, 
        partial(meshgrid_core, horizontal=False, low=y_range[0], high=y_range[1]))
    
    count = 0
    x, y = c_real, c_img    

    with muvs.expr():
        for _ in range(iterations):
            x_temp = (x * x - y * y + xs)#.compute()
            y_temp = (x * y + x * y + ys)#.compute()
            mask = (x_temp * x_temp + y_temp * y_temp <= 4)#.compute()
    
            count = Conditional(mask, count + 1 / iterations, count).compute()
            x = Conditional(mask, x_temp, 2).compute()
            y = Conditional(mask, y_temp, 2).compute()
    
        return (1 - count).compute()

Souce

import functools
import math

def AnisGF(clip, guidance=None, radius=4, gamma=0.01, alpha=None, 
    epsilon=2**-8, gaussian=False, sigma=None, adaptive=True):
    """ Anisotropic Guided Filtering

    Args:
        clip: Input clip. (GRAYS)

        guidance: (clip) Guidance clip used to compute the coefficient of the linear translation on 'clip'.
            It must has the same clip properties as 'clip'.
            Default is None.
 
        radius: (int) Box / Gaussian filter's radius.
            Default is 4.

        gamma: (float) Positive scalar value that controls the smoothness of the guided filter. 
            A small value preserves more detail while a large value promotes smoothing.
            Default is 0.01.

        epsilon: (float) Positive scalar value that regularizes the anisotropic weights. 
            A large value will promote more isotropy 
            while a small value will emphasize the anisotropic behavior.
            Default is 2^(-8).

        use_gauss: (bool) Whether to use gaussian guided filter.
            Default is False.

        sigma: (float) Scaled noise variance of the image.
            If it is None, it is estimated from the image.
            Default is None.

        adaptive: (bool) Boolean flag that controls the behavior of the guided filter. 
            It enables the adaptation of the guided filter regularizer (gamma) 
            to better preserve details in the image.
            Default is True.
 
    Ref:
        [1] C. N. Ochotorena and Y. Yamashita, "Anisotropic Guided Filtering," 
            in IEEE Transactions on Image Processing, vol. 29, pp. 1397-1412, 2020, 
            doi: 10.1109/TIP.2019.2941326.
    """

    assert clip.format.id == vs.GRAYS

    if guidance is not None:
        assert clip.format == guidance.format

    if alpha is None:
        alpha = max(math.log10(gamma) + 3, 0)

    if sigma is None:
        def f(clip):
            return clip.std.Convolution(
                [1, -2, 1, -2, 4, -2, 1, -2, 1], 
                divisor=6/math.sqrt(math.pi/2), saturate=False)

        def gen(n, f, clip, core):
            sigma_val = f.props.PlaneStatsAverage ** 2
            return core.std.BlankClip(clip, color=sigma_val)

        if guidance is None:
            stats = f(clip)
        else:
            stats = f(guidance)

        stats = stats.std.PlaneStats()
        sigma = core.std.FrameEval(stats, functools.partial(gen, clip=stats, core=core), stats)

    Filter1 = functools.partial(core.std.BoxBlur, hradius=radius, vradius=radius)

    if gaussian:
        Filter2 = functools.partial(core.tcanny.TCanny, sigma=radius/2+0.25, mode=-1)
    else:
        Filter2 = Filter1

    with muvs.expr():
        if guidance is None:
            X1 = Filter1(clip)
            X2 = Filter1(clip * clip)
            W = (X2 - (X1 * X1)).compute()
        else:
            G1 = Filter1(guidance)
            G2 = Filter1(guidance * guidance)
            X1 = Filter1(clip)
            XG = Filter1(clip * guidance)
            MG1 = G1
            W = (G2 - (G1 * G1)).compute()

        if adaptive:
            gamma = 0.0002 * (2 * radius + 1) * gamma / (W + 1e-6)
 
        if guidance is None:
            A = W / (W + gamma)
            B = (1 - A) * X1
        else:
            A = (XG - (X1 * MG1)) / (W + gamma)
            B = X1 - (A * G1)

        W = Max(W - sigma, 0)
        W = (epsilon / ((W * ((2 * radius + 1) ** 2)) ** alpha + epsilon)).compute()

        A = Filter2(W * A)
        B = Filter2(W * B)
        W = Filter2(W)

        if guidance is None:
            res = (A * clip + B) / W
        else:
            res = (A * guidance + B) / W

        return res.compute()
⚠️ **GitHub.com Fallback** ⚠️