Code snippet - yiliu30/yi GitHub Wiki

Gerrit

git push origin HEAD:refs/heads/dev/y/q3
Enumerating objects: 36, done.
Counting objects: 100% (36/36), done.
Delta compression using up to 12 threads
Compressing objects: 100% (17/17), done.
Writing objects: 100% (21/21), 6.04 KiB | 6.04 MiB/s, done.
Total 21 (delta 13), reused 5 (delta 4), pack-reused 0
remote: Resolving deltas: 100% (13/13)
remote: Processing changes: refs: 1, done    
remote: warning: xxx: subject >50 characters; use shorter first paragraph
To ssh://xxx/proj-name
 * [new branch]          HEAD -> dev/yi/qdq3

HPU

Run HPU test

python3 -m pytest  --mode lazy  -sv ./any_mode/test_hpu_fp8_ops_any_mode.py -k test_fp8_gemm_v2_quick
More details: see `env_var_in_scope`.
  • For example, `mark_step`, which is used to trigger execution of accumulated graphs in Lazy mode.
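A minimal sketch of how this is typically used in Lazy mode (assuming the `habana_frameworks.torch.core` import path; the toy linear model is only for illustration):

import torch
import habana_frameworks.torch.core as htcore

device = torch.device("hpu")
model = torch.nn.Linear(16, 16).to(device)
x = torch.randn(4, 16, device=device)

for _ in range(3):
    out = model(x)       # ops are only accumulated into the lazy graph here
    htcore.mark_step()   # trigger execution of the accumulated graph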

Python

Tensor mem size

mem = lambda a: a.element_size() * a.nelement() / (1024 ** 2)  # size in MiB
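Quick sanity check of the helper above (the 1024x1024 fp32 tensor is just an example; the result is in MiB):

import torch
t = torch.zeros(1024, 1024, dtype=torch.float32)
print(mem(t))  # 4.0 -> 1024 * 1024 elements * 4 bytes each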

breakpoint

# main.py
a = 1
breakpoint()  # built-in function since Python 3.7
b = a + 1
# debug it
# python main.py
# ignore breakpoint
# PYTHONBREAKPOINT=0 python main.py
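# use a custom debugger instead of pdb (assuming ipdb is installed)
# PYTHONBREAKPOINT=ipdb.set_trace python main.py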
# https://stackoverflow.com/a/71541003/23445462

Mem

def see_memory_usage(message, force=True):
    # Modified from DeepSpeed
    import gc
    import logging

    import torch
    import torch.distributed as dist

    if not force:
        return
    if dist.is_initialized() and not dist.get_rank() == 0:
        return

    # python doesn't do real-time garbage collection so do it explicitly to get the correct RAM reports
    gc.collect()

    # Print message except when distributed but not rank 0
    logging.info(message)
    logging.info(
        f"AllocatedMem {round(torch.cuda.memory_allocated() / (1024**3), 2)} GB "
        f"MaxAllocatedMem {round(torch.cuda.max_memory_allocated() / (1024**3), 2)} GB "
        f"ReservedMem {round(torch.cuda.memory_reserved() / (1024**3), 2)} GB "
        f"MaxReservedMem {round(torch.cuda.max_memory_reserved() / (1024**3), 2)} GB"
    )

    # get the peak memory to report correct data, so reset the counter for the next call
    torch.cuda.reset_peak_memory_stats()
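Typical usage around a suspected allocation; `model` and `batch` are placeholders:

see_memory_usage("before forward")
out = model(batch)
see_memory_usage("after forward")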

LLM

import torch


@torch.no_grad()
def batch_gen_text(model, tokenizer, msg="", prompt="What's AI?", max_tokens=50, device="cpu"):
    model = model.to(device)
    prompts = prompt if isinstance(prompt, (list, tuple)) else [prompt]
    inputs = tokenizer(prompts, return_tensors="pt", padding=True, truncation=True)
    inputs = {k: v.to(device) for k, v in inputs.items()}
    new_tokens = model.generate(**inputs, max_length=max_tokens)
    text = tokenizer.batch_decode(new_tokens, skip_special_tokens=True)
    for t in text:
        print(f"Generated text ({msg}): {t}")


def get_example_inputs(tokenizer):
    iters = 4
    prompt = "What are we having for dinner?"
    example_inputs = tokenizer(prompt, return_tensors="pt")
    for i in range(iters):
        yield example_inputs


def check_package(package_name: str):
    try:
        __import__(package_name)
        return True
    except ImportError:
        print(f"Package {package_name} not found.")
        return False

Workaround for logger issue

import logging
logging.basicConfig(level=logging.INFO)
### Your code
...

https://stackoverflow.com/a/57234760/23445462
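If another library has already installed handlers, the basicConfig call above can be a no-op; on Python 3.8+, `force=True` resets existing handlers first (a minimal sketch):

import logging
logging.basicConfig(level=logging.INFO, force=True)  # drop handlers installed by other libraries
logging.getLogger(__name__).info("now visible")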

Great registry

https://github.com/neuralmagic/compressed-tensors/blob/main/src/compressed_tensors/registry/registry.py

class RegistryMixin:
    """
    Universal registry to support registration and loading of child classes and plugins
    of neuralmagic utilities.

    Classes that require a registry or plugins may add the `RegistryMixin` and use
    `register` and `load` as the main entrypoints for adding new implementations and
    loading requested values from its registry.

    If a class should only have its child classes in its registry, the class should
    set the static attribute `registry_requires_subclass` to True

    example
    ```python
    class Dataset(RegistryMixin):
        pass


    # register with default name
    @Dataset.register()
    class ImageNetDataset(Dataset):
        pass

    # load as "ImageNetDataset"
    imagenet = Dataset.load("ImageNetDataset")

    # register with custom name
    @Dataset.register(name="cifar-dataset")
    class Cifar(Dataset):
        pass

    Note: the name will be standardized for lookup in the registry.
    For example, if a class is registered as "cifar_dataset" or
    "cifar dataset", it will be stored as "cifar-dataset". The user
    will be able to load the class with any of the three name variants.

    # register with multiple aliases
    @Dataset.register(alias=["cifar-10-dataset", "cifar_100_dataset"])
    class Cifar(Dataset):
        pass

    # load as "cifar-dataset"
    cifar = Dataset.load_from_registry("cifar-dataset")

    # load from custom file that implements a dataset
    mnist = Dataset.load_from_registry("/path/to/mnnist_dataset.py:MnistDataset")
    ```
    """

Measure mem

def measure_mem():
    import os
    import psutil
    # Get the process ID of the current process
    pid = os.getpid()

    # Create a Process object for the current process
    process = psutil.Process(pid)

    # Get the memory usage in bytes
    memory_info = process.memory_info()

    print(f"RAM mem: {memory_info.rss / 1024 ** 3} GB")
    print(f"Virtual mem: {memory_info.vms / 1024 ** 3} GB")
    print(f"Shared mem: {memory_info.shared / 1024 ** 3} GB")

Let's time it

import timeit

import numpy as np
import pytest
import torch

import neural_compressor.torch.algorithms.weight_only.modules as inc_modules

fn1 = inc_modules.WeightOnlyLinear.pack_tensor_with_numpy_static
fn2 = inc_modules.WeightOnlyLinear.pack_tensor_with_numpy_opt_np_numba
fn3 = torch.compile(inc_modules.WeightOnlyLinear.pack_tensor_with_torch_static)  # optional: compiled torch variant


@pytest.mark.parametrize("out_features", [128, 1024, 5120, 13824])
@pytest.mark.parametrize("in_features", [1024, 13824])
def test_pack(in_features, out_features):
    bits = 4

    raw_tensor = torch.randint(0, 15, (out_features, in_features), dtype=torch.int8)
    n_pack = 32 // bits  # number of 4-bit values packed into one int32
    compression_dtype: torch.dtype = torch.int32
    iters = 20
    raw_np = raw_tensor.numpy()
    time_ref = timeit.timeit(lambda: fn1(raw_tensor, n_pack, bits, compression_dtype), number=iters)
    time_res = timeit.timeit(lambda: fn2(raw_tensor, n_pack, bits, compression_dtype), number=iters)

    print(f"ref : {time_ref},  res: {time_res}, speed up: {time_ref / time_res}")

    # print(f"ref_dur:{ref_dur}, res_dur:{res_dur} res_np")

    # assert np.array_equal(ref.numpy(), res), f"ref:{ref}, res:{res}"
    # assert torch.allclose(ref, torch.from_numpy(res)), f"ref:{ref}, res:{res}"

Compare two objs

import torch
from typing import Optional, Callable, Any, List, Tuple, Dict
def assert_same(
    a: Tuple[torch.FloatTensor, Optional[Tuple[torch.FloatTensor, torch.FloatTensor]]],
    b: Tuple[torch.FloatTensor, Optional[Tuple[torch.FloatTensor, torch.FloatTensor]]],
):
    assert len(a) == len(b), f"len: {len(a)} != {len(b)}"
    for i in range(len(a)):
        assert type(a[i]) == type(b[i]), f"type: {type(a[i])} != {type(b[i])}"
        if isinstance(a[i], torch.Tensor):
            torch.testing.assert_close(a[i], b[i])
        elif isinstance(a[i], tuple):
            assert_same(a[i], b[i])
        elif isinstance(a[i], dict):
            for k in a[i].keys():
                assert k in b[i], f"key: {k} not in {b[i]}"
                assert_same(a[i][k], b[i].get(k))
        elif a[i] is None:
            assert b[i] is None
        else:
            raise ValueError(f"Unsupported type: {type(a[i])}")
    print("Same!")

Manual seed

seed = 0
import random
random.seed(seed)
import torch
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
import numpy as np
np.random.seed(seed)

torch.use_deterministic_algorithms(True)
def seed_worker(worker_id):
    worker_seed = torch.initial_seed() % 2**32
    np.random.seed(worker_seed)
    random.seed(worker_seed)

g = torch.Generator()
g.manual_seed(0)

from torch.utils.data import DataLoader

DataLoader(
    train_dataset,
    batch_size=batch_size,
    num_workers=num_workers,
    worker_init_fn=seed_worker,
    generator=g,
)

conda

  • Update g++/gcc
# update g++/gcc
conda install -c conda-forge cxx-compiler
  • conda replacement
curl -L -O "https://github.com/conda-forge/miniforge/releases/latest/download/Miniforge3-$(uname)-$(uname -m).sh"
bash Miniforge3-$(uname)-$(uname -m).sh

Shell

Query NV Capability

nvidia-smi --query-gpu=compute_cap --format=csv

Some useful aliases

gitrmb() {
  # Loop through all provided branch names
  for branch in "$@"
  do
    # Try to delete the local branch
    if git branch -d "$branch" 2>/dev/null; then
      echo "Successfully deleted local branch: $branch"
    else
      # If the branch was not merged, force delete it
      if git branch -D "$branch" 2>/dev/null; then
        echo "Forcefully deleted local branch: $branch"
      else
        echo "Failed to delete local branch: $branch"
      fi
    fi

    # Try to delete the remote branch
    if git push origin --delete "$branch" 2>/dev/null; then
      echo "Successfully deleted remote branch: $branch"
    else
      echo "Failed to delete remote branch or branch does not exist remotely: $branch"
    fi
  done
}
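Usage (the branch names are just examples): `gitrmb old-feature stale-fix`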

alias condact='conda activate'
alias p='python'
alias exportpath='export PYTHONPATH=$PYTHONPATH:$PWD'

NUMA for benchmarks

OMP_NUM_THREADS=24 numactl -l -C 0-11,24-35  python main.py
# specify the memory nodes from which memory should be allocated.
# OMP_NUM_THREADS=<num_threads> numactl -m <node_index> -C <start_core>-<end_core>  python main.py
OMP_NUM_THREADS=12 numactl -m 0 -C 0-11  python main.py
# allocate memory only on the local node (-l / --localalloc)
# OMP_NUM_THREADS=<num_threads> numactl -l -C <start_core>-<end_core>  python main.py
OMP_NUM_THREADS=12 numactl -l -C 0-11  python main.py
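To check which cores belong to which memory node before picking the ranges above, `numactl --hardware` (or `lscpu`) prints the node/CPU layout.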

taskset

taskset -c 0-11 python xxx

Benchmark LLMs

import time

import torch

# DEVICE ("cpu" or "cuda") is assumed to be defined elsewhere in the benchmark script
def generate_torch(model, input_ids, n_generate):
    context_time = 0
    generate_time = []

    with torch.inference_mode():
        for i in range(n_generate):
            if DEVICE != "cpu":
                torch.cuda.synchronize()
            start = time.time()

            if i == 0:
                # prefill context
                inputs = torch.as_tensor(input_ids, device=next(model.parameters()).device)
            else:
                # decode tokens
                inputs = torch.as_tensor(token, device=next(model.parameters()).device)

            out = model(inputs, use_cache=True)

            if DEVICE != "cpu":
                torch.cuda.synchronize()
            token = out[0][:, -1].max(1)[1].unsqueeze(1)

            if i == 0:
                context_time += time.time() - start
            else:
                generate_time.append(time.time() - start)

    return context_time, generate_time
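The two return values map directly to the usual metrics; a small sketch of turning them into numbers, where `model`, `input_ids`, and `n_generate=32` are placeholders for whatever the benchmark uses:

context_time, generate_time = generate_torch(model, input_ids, n_generate=32)
print(f"prefill latency: {context_time:.3f} s")
print(f"decode throughput: {len(generate_time) / sum(generate_time):.2f} tokens/s")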

Win-related

:: export current directory to PYTHONPATH (cmd.exe; '#' is not a comment character in batch)
set PYTHONPATH=%PYTHONPATH%;%CD%
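The PowerShell equivalent, if that is the shell in use:

$env:PYTHONPATH = "$env:PYTHONPATH;$PWD"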