Embedded src 0003 - JohnHau/mis GitHub Wiki
//init.py============================================= nothing, empty
//common.py======================================================= #!/usr/bin/env python
import os import hashlib import datetime import random import configparser import binascii
from pynrfjprog import Hex, LowLevel from intelhex import bin2hex, IntelHex, IntelHex16bit
from libs.mlogger import Logger
logger = Logger().get_log()
def get_random_vector(vec_size): return [random.randrange(0,0xff) for x in range(vec_size)]
def get_log(): return logger
def list_padding(list1, list2, offset): if offset > len(list1): logger.error("list1 size need more than offset!" + "list1 size = " + str(len(list1)) + "offset = " + str(offset)) raise Exception("list padding error, list1 size need more than offset!") list1[offset:offset + len(list2)] = list2 return list1
def get_now_date(): # now = datetime.datetime.now() return datetime.datetime.now().strftime('%Y%m%d')
def list_to_string(list_data): if type(list_data) is str: return list_data else: return "".join([chr(x) for x in list_data])
def string_to_list(string_data): if type(string_data) is list: return string_data elif type(string_data) is bytes: return list(string_data) else: return [ord(x) for x in string_data]
def int_to_list(in_data): if type(in_data) is list: return in_data else: return [in_data & 0xff, (in_data >> 8) & 0xff, (in_data >> 16) & 0xff, (in_data >> 24) & 0xff]
def list_to_int(list_data):
if type(list_data) is int:
return list_data
else:
return list_data[0] | (list_data[1] << 8) |
(list_data[2] << 16) | (list_data[3] << 24)
def file_readf(path, r_len=0): if not os.path.exists(path): raise Exception("File not exists!")
logger.info("Open file: " + path)
f = open(path, 'rb')
if r_len > 0:
r_data = f.read(r_len)
else:
r_data = f.read()
f.close()
return r_data
def file_writef(path, wdata): if wdata is None or len(wdata) == 0: raise Exception("Data error!")
if type(wdata) is str:
in_data = bytes(string_to_list(wdata))
elif type(wdata) is list:
in_data = bytes(wdata)
else:
in_data = wdata
logger.info("Write to file: " + path)
f = open(path, 'wb')
f.write(in_data)
f.close()
def calc_digest(file_data, hash_type="sha256"): if type(file_data) is list: in_data = bytes(file_data) elif type(file_data) is str : in_data = bytes(string_to_list(file_data)) else: in_data = file_data
if hash_type == "sha256":
x = hashlib.sha256()
elif hash_type == "sha512":
x = hashlib.sha512()
else:
x = hashlib.sha256()
x.update(in_data)
dig = x.digest()
return list(dig)
def read_file_data(file_name, align=4): if file_name[-4:] == ".bin": file_type = 1 elif file_name[-4:] == ".hex": file_type = 2 else: logger.info(file_name + "is not bin or hex") return None, None, None, -1
if file_type == 1:
file_data = string_to_list(file_readf(file_name))
else:
try:
ih = IntelHex(file_name)
except Exception as e:
raise e
bin_start, bin_end=ih._get_start_end()
file_data = [0xff] * (bin_end - bin_start + 1)
ll = len(file_data)
for start, end in ih.segments():
wordarray = ih.tobinarray(start=start, end=end - 1)
# file_data[start - bin_start:end - bin_start] = wordarray.tolist()
list_padding(file_data, wordarray.tolist(), start - bin_start)
file_size = len(file_data)
if align > 0:
# align
if file_size % align:
left = align - file_size % align
file_data += [0xff]*left
file_size += left
return file_type, file_size, file_data, 0
def load_config(file): if not os.path.exists(file): logger.error(file + "is not exists!") #raise Exception("File not exists!") exit(-1)
if not ".ini" in file:
logger.error(file + " don't have config file, should include config file")
exit(-1)
cf = configparser.ConfigParser()
cf.read(file)
return cf
def mkdir(path): path = path.strip() path = path.rstrip("\") isExists = os.path.exists(path) if not isExists: os.makedirs(path) return path else: return path
def convert_string2int(string): """ Convert string to int format :param string: string of utf-8 format :return: hexlified string of int format """
return int(binascii.hexlify(string), 16)
def encrypt_message(key, iv, plain_text): """ Encrypt message using AES CTR Mode :param key: key for encrypt :param iv:initial vector :param plain_text: message to encrypt :return: encrypted message """
ctr = Counter.new(128, initial_value=convert_string2int(iv)) # Set initial vector
aes = AES.new(key, AES.MODE_CTR, counter=ctr) # Set CTR Mode
return aes.encrypt(plain_text)
def decrypt_message(key, iv, cipher_text): """ Decrypt message using AES CTR Mode :param key: key for decrypt :param iv: initial vector for AES encrypt :param cipher_text: encrypted message :return: utf-8 format string of decrypted message """
ctr = Counter.new(128, initial_value=convert_string2int(iv)) # Set initial vector
aes = AES.new(key, AES.MODE_CTR, counter=ctr) # Also, Set CTR Mode to AES
return aes.decrypt(cipher_text)
def get16_str(list):
bts = binascii.b2a_hex(bytes(list))
str16 = bytes.decode(bts)
return str16
def add_to_16(text): if len(text) % 16: add = 16 - (len(text) % 16) else: add = 0 text = text + b'' + ('\0' * add).encode("utf-8") return text
def encrypt_AESCBC(key, iv, text): mode = AES.MODE_CBC text = add_to_16(text) cryptos = AES.new(key.encode('utf-8'), mode, iv) cipher_text = cryptos.encrypt(text)
# hex string
return bytes.decode(binascii.b2a_hex(cipher_text))
def decrypt_AESCBC(key, iv, text): mode = AES.MODE_CBC cryptos = AES.new(key.encode('utf-8'), mode, iv) plain_text = cryptos.decrypt(binascii.a2b_hex(text)) #print(plain_text)
# take out the supplementary space
return bytes.decode(binascii.b2a_hex(plain_text)).rstrip('\0')
//mlogger.py===================================================================== #!/usr/bin/env python3
import os import sys from loguru import logger
class Logger(object):
tmp_folder = os.path.join(os.environ["TMP"], "GWP_LOG")
def __init__(self, path=tmp_folder):
file_name = os.path.join(path, "log_{time:YYYY_MM_DD_HH_mm_ss}.log")
logger.configure(handlers=[{"sink": sys.stderr, "level": "DEBUG"}])
logger.add(file_name)
self._log = logger
def info(self, message):
self._log.info(message)
def debug(self, message):
self._log.debug(message)
def error(self, message):
self._log.error(message)
def success(self, message):
self._log.success(message)
def exception(self, message):
self.exception(message)
def get_log(self):
return self._log
//buildingMachineTool-tool.py #!/usr/bin/env python3.7
import shutil import sys
from libs.common import * import subprocess
tool_version = "1.0.0.1" #debug_path = ".\output\temp" debug_path = "./output\temp"
#temp_path = "C:\tempFolder" temp_path = "./tempFolder"
l2_signed_file_name = "l2.imx" app_kicker_signed_file_name = "app_kicker.imx" flash_Loader_signed_file_name = "flash_Loader.imx" cst_config_file = "CST.config"
#config_file = "..\Project_Config.ini" config_file = "./Project_Config.ini" xcwd = os.getcwd(); mm = os.path.dirname(os.getcwd()); nn = os.path.abspath(os.path.dirname(os.getcwd()))
zz = os.path.join("abc","123");
print(zz) #print(nn) #print(mm) #print(xcwd) exit(-1)
run_pdt_cmd = os.path.join(os.path.join(os.path.abspath(os.path.dirname(os.getcwd())), "PDT"), "ProvDataTool.py gen_pd")
run_pdt_cmd = os.path.join(os.path.join(os.path.abspath(os.path.dirname(os.getcwd())), "PDT\dist"), "ProvDataTool.exe gen_pd")
run_lsct_cmd = os.path.join(os.path.join(os.path.abspath(os.path.dirname(os.getcwd())), "LSCT"), "L2SigningConfigTool"
run_lsct_cmd = os.path.join(os.path.join(os.path.abspath(os.path.dirname(os.getcwd())), "LSCT\dist"), "L2SigningConfigTool" ".exe gen_cst")
run_ast_cmd = os.path.join(os.path.join(os.path.abspath(os.path.dirname(os.getcwd())), "AST\Exe"), "astt.exe --sign") +
" " + os.path.join(os.path.join(os.path.abspath(os.path.dirname(os.getcwd())), "Project_Config.ini"))
run_cst_base_cmd = "powershell -file " + os.path.join( os.path.join(os.path.abspath(os.path.dirname(os.getcwd())), "nxp-cst"), "create-imxrt-image.ps1")
run_fat_cmd = os.path.join(os.path.join(os.path.abspath(os.path.dirname(os.getcwd())), "FAT"), "fat.py --vsom") + \
" " + os.path.join(os.path.join(os.path.abspath(os.path.dirname(os.getcwd())), "Project_Config.ini"))
run_fat_cmd = os.path.join(os.path.join(os.path.abspath(os.path.dirname(os.getcwd())), "FAT\Exe"), "fat.exe --vsom") +
" " + os.path.join(os.path.join(os.path.abspath(os.path.dirname(os.getcwd())), "Project_Config.ini"))
def init_func(): logger.debug("=============== init function ===========================\n")
# check the required files.
project_cf = load_config(config_file)
# elftosb.exe check
elftosb_path = get_config_data(project_cf, "ProjectConfig", "NXP_tool_path")
elftosb_file = os.path.join(elftosb_path, "elftosb.exe")
if not os.path.exists(elftosb_file):
logger.error(elftosb_file + " is not exists!")
# raise Exception("File not exists!")
exit(-1)
# RawKeySigner.exe check
rawKeySigner_path = get_config_data(project_cf, "ProjectConfig", "ECC_SignTool_Path")
rawKeySigner_file = os.path.join(rawKeySigner_path, "RawKeySigner.exe")
if not os.path.exists(rawKeySigner_file):
logger.error(rawKeySigner_file + " is not exists!")
# raise Exception("File not exists!")
exit(-1)
# OPGT Path check
opgt_path = get_config_data(project_cf, "ProjectConfig", "OPGT_Path")
if not os.path.exists(opgt_path):
logger.error(opgt_path + " OPGT_Path is not exists!")
# raise Exception("File not exists!")
exit(-1)
# Fcb file check
fcb_file = get_config_data(project_cf, "ProjectConfig", "FcbFile")
if not os.path.exists(fcb_file):
logger.error(fcb_file + " FcbFile is not exists!")
# raise Exception("File not exists!")
exit(-1)
# L2 file check
l2_file = get_config_data(project_cf, "L2", "file")
if not os.path.exists(l2_file):
logger.error(l2_file + " L2 file is not exists!")
# raise Exception("File not exists!")
exit(-1)
# App file check
app_file = get_config_data(project_cf, "App", "file")
if not os.path.exists(app_file):
logger.error(app_file + " App file is not exists!")
# raise Exception("File not exists!")
exit(-1)
# App_kicker file check
app_kicker_file = get_config_data(project_cf, "App_kicker", "file")
if not os.path.exists(app_kicker_file):
logger.error(app_kicker_file + " App_kicker file is not exists!")
# raise Exception("File not exists!")
exit(-1)
# App_config file check
app_config_file = get_config_data(project_cf, "App_config", "file")
if not os.path.exists(app_config_file):
logger.error(app_config_file + " App_config file is not exists!")
# raise Exception("File not exists!")
exit(-1)
# FCT file check
fct_file = get_config_data(project_cf, "FCT", "file")
if not os.path.exists(fct_file):
logger.error(fct_file + " FCT file is not exists!")
# raise Exception("File not exists!")
exit(-1)
# FlashLoader check
flashloader_file = get_config_data(project_cf, "FlashLoader", "file")
if not os.path.exists(flashloader_file):
logger.error(flashloader_file + " FlashLoader file is not exists!")
# raise Exception("File not exists!")
exit(-1)
# SRKFuseFile check
srkfuse_file = get_config_data(project_cf, "Efuse", "SRKFuseFile")
if not os.path.exists(srkfuse_file):
logger.error(srkfuse_file + " SRKFuseFile is not exists!")
# raise Exception("File not exists!")
exit(-1)
# check ecc signing certificate file
ecccert_file = get_config_data(project_cf, "ProjectConfig", "ECC_signing_certificate_file")
if not os.path.exists(ecccert_file):
logger.error(ecccert_file + " ecc signing certificate file is not exists!")
# raise Exception("File not exists!")
exit(-1)
# check OPGT.exe file
opgtPath = get_config_data(project_cf, "ProjectConfig", "OPGT_Path")
opgtFile = os.path.join(opgtPath, "OPGT.exe")
if not os.path.exists(opgtFile):
logger.error(opgtFile + " is not exists!")
# raise Exception("File not exists!")
exit(-1)
# create temporary folder
if not os.path.exists(temp_path):
os.mkdir(temp_path)
else:
# remove the temp path
shutil.rmtree(temp_path)
os.mkdir(temp_path)
# create debug folder
if not os.path.exists(debug_path):
os.makedirs(debug_path)
else:
# remove the debug folder
shutil.rmtree(debug_path)
os.makedirs(debug_path)
def finish_func(): logger.debug("=============== finished function ===========================\n") if os.path.exists(temp_path): # remove the temp path shutil.rmtree(temp_path) logger.info("Build Machine Tool run success!!!")
def get_config_data(cf, key, value): try: data = cf.get(key, value) except Exception as e: if value == "dcdFile": return -1 else: logger.error("please add config parameter in current folder! " + key + "->" + value) # raise e exit(-1) return data
def run_pdt(): logger.debug("=============== run Provision Data Tool ===========================\n") logger.debug("run Provision Data Tool CMD : " + run_pdt_cmd) p = subprocess.Popen(run_pdt_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, shell=True) output, _ = p.communicate(b'Y\n') returncode = p.returncode if returncode == 0: logger.info("run Provision Data Tool success!") else: logger.error("run Provision Data Tool failed, build Machine tool exit!!!") # shutil.rmtree(temp_path) sys.exit()
def run_lsct(): logger.debug("=============== run L2 Signing Config Tool ========================\n") logger.debug("run L2 Signing Config Tool CMD : " + run_lsct_cmd) p = subprocess.Popen(run_lsct_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, shell=True) output, _ = p.communicate(b'Y\n') returncode = p.returncode if returncode == 0: logger.info("run L2 Signing Config Tool success!") cst_config_file = os.path.join(temp_path, "CST.config") shutil.copy(cst_config_file, debug_path) else: logger.error("runL2 Signing Config Tool failed, build Machine tool exit!!!") # shutil.rmtree(temp_path) sys.exit()
def run_nxp_cst(): logger.debug("=============== run NXP CST =======================================\n") logger.debug("run NXP CST BASE CMD : " + run_cst_base_cmd)
project_cf = load_config(config_file)
L2_file = get_config_data(project_cf, "L2", "file")
app_kicker_file = get_config_data(project_cf, "App_kicker", "file")
flash_loader_file = get_config_data(project_cf, "FlashLoader", "file")
basePath = get_config_data(project_cf, "ProjectConfig", "Base_path")
imageOutputPath = get_config_data(project_cf, "ProjectConfig", "Factory_Image_output_path")
dcdFile = get_config_data(project_cf, "CST", "dcdFile")
facImageOutputPath = os.path.join(basePath, imageOutputPath)
cstSignConfigFile = os.path.join(temp_path, cst_config_file)
# run NXP CST L2 CMD
if dcdFile == -1:
logger.debug("NXP CST L2 CMD without dcdFile!")
run_cst_L2_cmd = run_cst_base_cmd + " -Type L2" + " -InFile " + L2_file + " -ConfigFile " + cstSignConfigFile + " -OutFile " + os.path.join(
temp_path, l2_signed_file_name)
else:
logger.debug("NXP CST L2 CMD with dcdFile!")
run_cst_L2_cmd = run_cst_base_cmd + " -Type L2" + " -InFile " + L2_file + " -ConfigFile " + cstSignConfigFile + " -OutFile " + os.path.join(
temp_path, l2_signed_file_name) + " -DcdFile " + dcdFile
logger.debug("run NXP CST L2 CMD : " + run_cst_L2_cmd)
if os.system(run_cst_L2_cmd) == 0:
logger.info("run NXP CST L2 CMD success!")
else:
logger.error("run NXP CST L2 CMD failed, build Machine tool exit!!!")
# shutil.rmtree(temp_path)
sys.exit()
# run NXP CST App Kicker CMD
run_cst_app_kicker_cmd = run_cst_base_cmd + " -Type AppKicker" + " -InFile " + app_kicker_file + " -ConfigFile " + cstSignConfigFile + " -OutFile " + os.path.join(
temp_path, app_kicker_signed_file_name)
logger.debug("run NXP CST App Kicker CMD : " + run_cst_app_kicker_cmd)
if os.system(run_cst_app_kicker_cmd) == 0:
logger.info("run NXP CST App Kicker CMD success!")
else:
logger.error("run NXP CST App Kicker CMD failed, build Machine tool exit!!!")
# shutil.rmtree(temp_path)
sys.exit()
# run Flash loader CMD
run_cst_flash_loader_cmd = run_cst_base_cmd + " -Type FlashLoader" + " -InFile " + flash_loader_file + " -ConfigFile " + cstSignConfigFile + " -OutFile " + os.path.join(
facImageOutputPath, flash_Loader_signed_file_name)
logger.debug("run NXP CST Flash loader CMD : " + run_cst_flash_loader_cmd)
if os.system(run_cst_flash_loader_cmd) == 0:
logger.info("run NXP CST Flash loader CMD success!")
else:
logger.error("run NXP CST Flash loader CMD failed, build Machine tool exit!!!")
# shutil.rmtree(temp_path)
sys.exit()
def run_ast(): logger.debug("=============== run App Sign Tool =================================\n") logger.debug("run App Sign Tool CMD : " + run_ast_cmd) if os.system(run_ast_cmd) == 0: logger.info("run App Sign Tool success!")
config_ota_file = os.path.join(temp_path, "config_ota.xml")
config_hybird1_file = os.path.join(temp_path, "config_hybird1.xml")
config_hybird2_file = os.path.join(temp_path, "config_hybird2.xml")
shutil.copy(config_ota_file, debug_path)
shutil.copy(config_hybird1_file, debug_path)
shutil.copy(config_hybird2_file, debug_path)
else:
logger.error("run App Sign Tool failed, build Machine tool exit!!!")
# shutil.rmtree(temp_path)
sys.exit()
def run_opgt(): logger.debug("=============== run OTA Package Generate Tool =====================\n") project_cf = load_config(config_file) opgt_Path = get_config_data(project_cf, "ProjectConfig", "OPGT_Path") base_path = get_config_data(project_cf, "ProjectConfig", "Base_path") ota_Path = os.path.join(base_path, get_config_data(project_cf, "ProjectConfig", "OTA_image_output_path"))
logger.debug("OPGT_PATH is : " + opgt_Path)
opgtFile = os.path.join(opgt_Path, "OPGT.exe")
if not os.path.exists(opgtFile):
logger.error(opgtFile + "is not exists!")
exit(-1)
configOTAFile = os.path.join(temp_path, "config_ota.xml")
configHybird1File = os.path.join(temp_path, "config_hybird1.xml")
configHybird2File = os.path.join(temp_path, "config_hybird2.xml")
run_opgt_ota_cmd = opgtFile + " OPGT " + configOTAFile + " C:\\tempFolder\OTA.bin"
run_opgt_hybird1_cmd = opgtFile + " OPGT " + configHybird1File + " C:\\tempFolder\hybird_app_signed.bin"
run_opgt_hybird2_cmd = opgtFile + " OPGT " + configHybird2File + " C:\\tempFolder\hybird_fct_signed.bin"
logger.debug("run opgt ota cmd is : " + run_opgt_ota_cmd)
if os.system(run_opgt_ota_cmd) == 0:
logger.info("run opgt ota cmd success!")
else:
logger.error("run opgt ota cmd failed, build Machine tool exit!!!")
# shutil.rmtree(temp_path)
sys.exit()
ota_file = os.path.join(ota_Path, "OTA.bin")
if not os.path.exists(ota_Path):
os.makedirs(ota_Path)
shutil.copy('C:\\tempFolder\OTA.bin', ota_file)
logger.debug("run opgt hybird1 cmd is : " + run_opgt_hybird1_cmd)
if os.system(run_opgt_hybird1_cmd) == 0:
logger.info("run opgt hybird1 cmd success!")
else:
logger.error("run opgt hybird1 cmd failed, build Machine tool exit!!!")
# shutil.rmtree(temp_path)
sys.exit()
logger.debug("run opgt hybird2 cmd is : " + run_opgt_hybird2_cmd)
if os.system(run_opgt_hybird2_cmd) == 0:
try:
rt105x_xip = project_cf.get("ProjectConfig", "rt105x_xip")
print(rt105x_xip)
except Exception as e:
rt105x_xip = "0"
if rt105x_xip == "1":
src_file = os.path.join("C:\\tempFolder\\hybird_fct_signed.bin",)
dst_file = os.path.join("C:\\tempFolder\\hybird_fct_signed_no_header.bin",)
if os.path.exists(dst_file):
os.remove(dst_file)
if not os.path.exists(src_file):
logger.error("run opgt hybird2 cmd failed, build Machine tool exit!!!")
sys.exit()
with open(src_file, "rb") as f:
f_data = f.read()
with open(dst_file, "wb") as f:
f.write(f_data[512:])
src_file = os.path.join("C:\\tempFolder\\hybird_app_signed.bin",)
dst_file = os.path.join("C:\\tempFolder\\hybird_app_signed_no_header.bin",)
if os.path.exists(dst_file):
os.remove(dst_file)
if not os.path.exists(src_file):
logger.error("run opgt hybird2 cmd failed, build Machine tool exit!!!")
sys.exit()
with open(src_file, "rb") as f:
f_data = f.read()
with open(dst_file, "wb") as f:
f.write(f_data[512:])
logger.info("xip file create success")
logger.info("run opgt hybird2 cmd success!")
else:
logger.error("run opgt hybird2 cmd failed, build Machine tool exit!!!")
# shutil.rmtree(temp_path)
sys.exit()
logger.info("run OTA Package Generate Tool success!")
def run_fat(): logger.debug("=============== run Firmware Assembly Tool ========================\n") logger.debug("run Firmware Assembly Tool CMD : " + run_fat_cmd) if os.system(run_fat_cmd) == 0: logger.info("run Firmware Assembly Tool success!") else: logger.error("run Firmware Assembly Tool failed, build Machine tool exit!!!") # shutil.rmtree(temp_path) sys.exit()
tools_list = [run_pdt, run_lsct, run_nxp_cst, run_ast, run_opgt, run_fat]
if name == "main": logger.info("Start Build Machine Tool!") logger.info("build Machine tools version is : " + tool_version) init_func() # run each script for i in range(len(tools_list)): tools_listi
finish_func()
//buildingMachineTool-tool-bak.py======================================================= #!/usr/bin/env python3.7
import shutil import sys
from libs.common import * import subprocess
tool_version = "1.0.0.1" debug_path = ".\output\temp" temp_path = "C:\tempFolder" l2_signed_file_name = "l2.imx" app_kicker_signed_file_name = "app_kicker.imx" flash_Loader_signed_file_name = "flash_Loader.imx" cst_config_file = "CST.config"
config_file = "..\Project_Config.ini"
run_pdt_cmd = os.path.join(os.path.join(os.path.abspath(os.path.dirname(os.getcwd())), "PDT"), "ProvDataTool.py gen_pd")
run_pdt_cmd = os.path.join(os.path.join(os.path.abspath(os.path.dirname(os.getcwd())), "PDT\dist"), "ProvDataTool.exe gen_pd")
run_lsct_cmd = os.path.join(os.path.join(os.path.abspath(os.path.dirname(os.getcwd())), "LSCT"), "L2SigningConfigTool"
run_lsct_cmd = os.path.join(os.path.join(os.path.abspath(os.path.dirname(os.getcwd())), "LSCT\dist"), "L2SigningConfigTool" ".exe gen_cst")
run_ast_cmd = os.path.join(os.path.join(os.path.abspath(os.path.dirname(os.getcwd())), "AST\Exe"), "astt.exe --sign") +
" " + os.path.join(os.path.join(os.path.abspath(os.path.dirname(os.getcwd())), "Project_Config.ini"))
run_cst_base_cmd = "powershell -file " + os.path.join( os.path.join(os.path.abspath(os.path.dirname(os.getcwd())), "nxp-cst"), "create-imxrt-image.ps1")
run_fat_cmd = os.path.join(os.path.join(os.path.abspath(os.path.dirname(os.getcwd())), "FAT"), "fat.py --vsom") + \
" " + os.path.join(os.path.join(os.path.abspath(os.path.dirname(os.getcwd())), "Project_Config.ini"))
run_fat_cmd = os.path.join(os.path.join(os.path.abspath(os.path.dirname(os.getcwd())), "FAT\Exe"), "fat.exe --vsom") +
" " + os.path.join(os.path.join(os.path.abspath(os.path.dirname(os.getcwd())), "Project_Config.ini"))
def init_func(): logger.debug("=============== init function ===========================\n")
# check the required files.
project_cf = load_config(config_file)
# elftosb.exe check
elftosb_path = get_config_data(project_cf, "ProjectConfig", "NXP_tool_path")
elftosb_file = os.path.join(elftosb_path, "elftosb.exe")
if not os.path.exists(elftosb_file):
logger.error(elftosb_file + " is not exists!")
# raise Exception("File not exists!")
exit(-1)
# RawKeySigner.exe check
rawKeySigner_path = get_config_data(project_cf, "ProjectConfig", "ECC_SignTool_Path")
rawKeySigner_file = os.path.join(rawKeySigner_path, "RawKeySigner.exe")
if not os.path.exists(rawKeySigner_file):
logger.error(rawKeySigner_file + " is not exists!")
# raise Exception("File not exists!")
exit(-1)
# OPGT Path check
opgt_path = get_config_data(project_cf, "ProjectConfig", "OPGT_Path")
if not os.path.exists(opgt_path):
logger.error(opgt_path + " OPGT_Path is not exists!")
# raise Exception("File not exists!")
exit(-1)
# Fcb file check
fcb_file = get_config_data(project_cf, "ProjectConfig", "FcbFile")
if not os.path.exists(fcb_file):
logger.error(fcb_file + " FcbFile is not exists!")
# raise Exception("File not exists!")
exit(-1)
# L2 file check
l2_file = get_config_data(project_cf, "L2", "file")
if not os.path.exists(l2_file):
logger.error(l2_file + " L2 file is not exists!")
# raise Exception("File not exists!")
exit(-1)
# App file check
app_file = get_config_data(project_cf, "App", "file")
if not os.path.exists(app_file):
logger.error(app_file + " App file is not exists!")
# raise Exception("File not exists!")
exit(-1)
# App_kicker file check
app_kicker_file = get_config_data(project_cf, "App_kicker", "file")
if not os.path.exists(app_kicker_file):
logger.error(app_kicker_file + " App_kicker file is not exists!")
# raise Exception("File not exists!")
exit(-1)
# App_config file check
app_config_file = get_config_data(project_cf, "App_config", "file")
if not os.path.exists(app_config_file):
logger.error(app_config_file + " App_config file is not exists!")
# raise Exception("File not exists!")
exit(-1)
# FCT file check
fct_file = get_config_data(project_cf, "FCT", "file")
if not os.path.exists(fct_file):
logger.error(fct_file + " FCT file is not exists!")
# raise Exception("File not exists!")
exit(-1)
# FlashLoader check
flashloader_file = get_config_data(project_cf, "FlashLoader", "file")
if not os.path.exists(flashloader_file):
logger.error(flashloader_file + " FlashLoader file is not exists!")
# raise Exception("File not exists!")
exit(-1)
# SRKFuseFile check
srkfuse_file = get_config_data(project_cf, "Efuse", "SRKFuseFile")
if not os.path.exists(srkfuse_file):
logger.error(srkfuse_file + " SRKFuseFile is not exists!")
# raise Exception("File not exists!")
exit(-1)
# check ecc signing certificate file
ecccert_file = get_config_data(project_cf, "ProjectConfig", "ECC_signing_certificate_file")
if not os.path.exists(ecccert_file):
logger.error(ecccert_file + " ecc signing certificate file is not exists!")
# raise Exception("File not exists!")
exit(-1)
# check OPGT.exe file
opgtPath = get_config_data(project_cf, "ProjectConfig", "OPGT_Path")
opgtFile = os.path.join(opgtPath, "OPGT.exe")
if not os.path.exists(opgtFile):
logger.error(opgtFile + " is not exists!")
# raise Exception("File not exists!")
exit(-1)
# create temporary folder
if not os.path.exists(temp_path):
os.mkdir(temp_path)
else:
# remove the temp path
shutil.rmtree(temp_path)
os.mkdir(temp_path)
# create debug folder
if not os.path.exists(debug_path):
os.makedirs(debug_path)
else:
# remove the debug folder
shutil.rmtree(debug_path)
os.makedirs(debug_path)
def finish_func(): logger.debug("=============== finished function ===========================\n") if os.path.exists(temp_path): # remove the temp path shutil.rmtree(temp_path) logger.info("Build Machine Tool run success!!!")
def get_config_data(cf, key, value): try: data = cf.get(key, value) except Exception as e: if value == "dcdFile": return -1 else: logger.error("please add config parameter in current folder! " + key + "->" + value) # raise e exit(-1) return data
def run_pdt(): logger.debug("=============== run Provision Data Tool ===========================\n") logger.debug("run Provision Data Tool CMD : " + run_pdt_cmd) p = subprocess.Popen(run_pdt_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, shell=True) output, _ = p.communicate(b'Y\n') returncode = p.returncode if returncode == 0: logger.info("run Provision Data Tool success!") else: logger.error("run Provision Data Tool failed, build Machine tool exit!!!") # shutil.rmtree(temp_path) sys.exit()
def run_lsct(): logger.debug("=============== run L2 Signing Config Tool ========================\n") logger.debug("run L2 Signing Config Tool CMD : " + run_lsct_cmd) p = subprocess.Popen(run_lsct_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, shell=True) output, _ = p.communicate(b'Y\n') returncode = p.returncode if returncode == 0: logger.info("run L2 Signing Config Tool success!") cst_config_file = os.path.join(temp_path, "CST.config") shutil.copy(cst_config_file, debug_path) else: logger.error("runL2 Signing Config Tool failed, build Machine tool exit!!!") # shutil.rmtree(temp_path) sys.exit()
def run_nxp_cst(): logger.debug("=============== run NXP CST =======================================\n") logger.debug("run NXP CST BASE CMD : " + run_cst_base_cmd)
project_cf = load_config(config_file)
L2_file = get_config_data(project_cf, "L2", "file")
app_kicker_file = get_config_data(project_cf, "App_kicker", "file")
flash_loader_file = get_config_data(project_cf, "FlashLoader", "file")
basePath = get_config_data(project_cf, "ProjectConfig", "Base_path")
imageOutputPath = get_config_data(project_cf, "ProjectConfig", "Factory_Image_output_path")
dcdFile = get_config_data(project_cf, "CST", "dcdFile")
facImageOutputPath = os.path.join(basePath, imageOutputPath)
cstSignConfigFile = os.path.join(temp_path, cst_config_file)
# run NXP CST L2 CMD
if dcdFile == -1:
logger.debug("NXP CST L2 CMD without dcdFile!")
run_cst_L2_cmd = run_cst_base_cmd + " -Type L2" + " -InFile " + L2_file + " -ConfigFile " + cstSignConfigFile + " -OutFile " + os.path.join(
temp_path, l2_signed_file_name)
else:
logger.debug("NXP CST L2 CMD with dcdFile!")
run_cst_L2_cmd = run_cst_base_cmd + " -Type L2" + " -InFile " + L2_file + " -ConfigFile " + cstSignConfigFile + " -OutFile " + os.path.join(
temp_path, l2_signed_file_name) + " -DcdFile " + dcdFile
logger.debug("run NXP CST L2 CMD : " + run_cst_L2_cmd)
if os.system(run_cst_L2_cmd) == 0:
logger.info("run NXP CST L2 CMD success!")
else:
logger.error("run NXP CST L2 CMD failed, build Machine tool exit!!!")
# shutil.rmtree(temp_path)
sys.exit()
# run NXP CST App Kicker CMD
run_cst_app_kicker_cmd = run_cst_base_cmd + " -Type AppKicker" + " -InFile " + app_kicker_file + " -ConfigFile " + cstSignConfigFile + " -OutFile " + os.path.join(
temp_path, app_kicker_signed_file_name)
logger.debug("run NXP CST App Kicker CMD : " + run_cst_app_kicker_cmd)
if os.system(run_cst_app_kicker_cmd) == 0:
logger.info("run NXP CST App Kicker CMD success!")
else:
logger.error("run NXP CST App Kicker CMD failed, build Machine tool exit!!!")
# shutil.rmtree(temp_path)
sys.exit()
# run Flash loader CMD
run_cst_flash_loader_cmd = run_cst_base_cmd + " -Type FlashLoader" + " -InFile " + flash_loader_file + " -ConfigFile " + cstSignConfigFile + " -OutFile " + os.path.join(
facImageOutputPath, flash_Loader_signed_file_name)
logger.debug("run NXP CST Flash loader CMD : " + run_cst_flash_loader_cmd)
if os.system(run_cst_flash_loader_cmd) == 0:
logger.info("run NXP CST Flash loader CMD success!")
else:
logger.error("run NXP CST Flash loader CMD failed, build Machine tool exit!!!")
# shutil.rmtree(temp_path)
sys.exit()
def run_ast(): logger.debug("=============== run App Sign Tool =================================\n") logger.debug("run App Sign Tool CMD : " + run_ast_cmd) if os.system(run_ast_cmd) == 0: logger.info("run App Sign Tool success!")
config_ota_file = os.path.join(temp_path, "config_ota.xml")
config_hybird1_file = os.path.join(temp_path, "config_hybird1.xml")
config_hybird2_file = os.path.join(temp_path, "config_hybird2.xml")
shutil.copy(config_ota_file, debug_path)
shutil.copy(config_hybird1_file, debug_path)
shutil.copy(config_hybird2_file, debug_path)
else:
logger.error("run App Sign Tool failed, build Machine tool exit!!!")
# shutil.rmtree(temp_path)
sys.exit()
def run_opgt(): logger.debug("=============== run OTA Package Generate Tool =====================\n") project_cf = load_config(config_file) opgt_Path = get_config_data(project_cf, "ProjectConfig", "OPGT_Path") base_path = get_config_data(project_cf, "ProjectConfig", "Base_path") ota_Path = os.path.join(base_path, get_config_data(project_cf, "ProjectConfig", "OTA_image_output_path"))
logger.debug("OPGT_PATH is : " + opgt_Path)
opgtFile = os.path.join(opgt_Path, "OPGT.exe")
if not os.path.exists(opgtFile):
logger.error(opgtFile + "is not exists!")
exit(-1)
configOTAFile = os.path.join(temp_path, "config_ota.xml")
configHybird1File = os.path.join(temp_path, "config_hybird1.xml")
configHybird2File = os.path.join(temp_path, "config_hybird2.xml")
run_opgt_ota_cmd = opgtFile + " OPGT " + configOTAFile + " C:\\tempFolder\OTA.bin"
run_opgt_hybird1_cmd = opgtFile + " OPGT " + configHybird1File + " C:\\tempFolder\hybird_app_signed.bin"
run_opgt_hybird2_cmd = opgtFile + " OPGT " + configHybird2File + " C:\\tempFolder\hybird_fct_signed.bin"
logger.debug("run opgt ota cmd is : " + run_opgt_ota_cmd)
if os.system(run_opgt_ota_cmd) == 0:
logger.info("run opgt ota cmd success!")
else:
logger.error("run opgt ota cmd failed, build Machine tool exit!!!")
# shutil.rmtree(temp_path)
sys.exit()
ota_file = os.path.join(ota_Path, "OTA.bin")
if not os.path.exists(ota_Path):
os.makedirs(ota_Path)
shutil.copy('C:\\tempFolder\OTA.bin', ota_file)
logger.debug("run opgt hybird1 cmd is : " + run_opgt_hybird1_cmd)
if os.system(run_opgt_hybird1_cmd) == 0:
logger.info("run opgt hybird1 cmd success!")
else:
logger.error("run opgt hybird1 cmd failed, build Machine tool exit!!!")
# shutil.rmtree(temp_path)
sys.exit()
logger.debug("run opgt hybird2 cmd is : " + run_opgt_hybird2_cmd)
if os.system(run_opgt_hybird2_cmd) == 0:
try:
rt105x_xip = project_cf.get("ProjectConfig", "rt105x_xip")
print(rt105x_xip)
except Exception as e:
rt105x_xip = "0"
if rt105x_xip == "1":
src_file = os.path.join("C:\\tempFolder\\hybird_fct_signed.bin",)
dst_file = os.path.join("C:\\tempFolder\\hybird_fct_signed_no_header.bin",)
if os.path.exists(dst_file):
os.remove(dst_file)
if not os.path.exists(src_file):
logger.error("run opgt hybird2 cmd failed, build Machine tool exit!!!")
sys.exit()
with open(src_file, "rb") as f:
f_data = f.read()
with open(dst_file, "wb") as f:
f.write(f_data[512:])
src_file = os.path.join("C:\\tempFolder\\hybird_app_signed.bin",)
dst_file = os.path.join("C:\\tempFolder\\hybird_app_signed_no_header.bin",)
if os.path.exists(dst_file):
os.remove(dst_file)
if not os.path.exists(src_file):
logger.error("run opgt hybird2 cmd failed, build Machine tool exit!!!")
sys.exit()
with open(src_file, "rb") as f:
f_data = f.read()
with open(dst_file, "wb") as f:
f.write(f_data[512:])
logger.info("xip file create success")
logger.info("run opgt hybird2 cmd success!")
else:
logger.error("run opgt hybird2 cmd failed, build Machine tool exit!!!")
# shutil.rmtree(temp_path)
sys.exit()
logger.info("run OTA Package Generate Tool success!")
def run_fat(): logger.debug("=============== run Firmware Assembly Tool ========================\n") logger.debug("run Firmware Assembly Tool CMD : " + run_fat_cmd) if os.system(run_fat_cmd) == 0: logger.info("run Firmware Assembly Tool success!") else: logger.error("run Firmware Assembly Tool failed, build Machine tool exit!!!") # shutil.rmtree(temp_path) sys.exit()
tools_list = [run_pdt, run_lsct, run_nxp_cst, run_ast, run_opgt, run_fat]
if name == "main": logger.info("Start Build Machine Tool!") logger.info("build Machine tools version is : " + tool_version) init_func() # run each script for i in range(len(tools_list)): tools_listi
finish_func()
//main.c======================================================================= #include<stdio.h> #include<stdlib.h> #include<string.h> #include<unistd.h> #include<fcntl.h>
int main(int argc,char* argv[]) {
int fd;
char tbuf[128]={0};
fd = open("t.log",O_WRONLY|O_APPEND);
if(fd < 0)
{
perror("open failed\n");
exit(EXIT_FAILURE);
}
time_t t = time(NULL);
char* ts = ctime(&t);
memcpy(tbuf,ts,strlen(ts));
//tbuf[strlen(ts)] = '\n';
write(fd,tbuf,strlen(ts));
close(fd);
char str[16] = {0};
scanf("%s",str);
printf("str[0] is %02x\n",str[0]);
printf("str[1] is %02x\n",str[1]);
printf("str[2] is %02x\n",str[2]);
printf("str[3] is %02x\n",str[3]);
// if(str[0] == 'Y' && str[1] == '\n') if(str[0] == 'Y' && str[1] == 0) { printf("hello python\n"); } else {
printf("answer wrong\n");
}
return 0;
}
//PpDdTy.py=========================================================================
#!/usr/bin/env python
import re import click
from libs.common import * from collections import deque import OpenSSL import operator
tool_version = "0.1.0.1" Prov_Data_Version = "1.0.0.12"
KEY_FILE_POSTFIX = ".pdb" KEY_FILE_NAME = "Project_Config.ini"
PROV_MAGIC = 0x564F5250 TYPE_ECC608 = 0 MAC_SIZE = 6
ECC_TYPE = "ECC608" serialFD = None
RESERVED_8BYTE00 = "0000000000000000"
RESERVED_8BYTE = "ffffffffffffffff"
RESERVED_32BYTE = RESERVED_8BYTE +
RESERVED_8BYTE +
RESERVED_8BYTE +
RESERVED_8BYTE
RESERVED_32BYTE00 = RESERVED_8BYTE00 +
RESERVED_8BYTE00 +
RESERVED_8BYTE00 +
RESERVED_8BYTE00
AES_KEY = "1234567812345678" ReveiveBufferQueue = deque([])
logger = get_log() key_file = os.path.join(os.path.abspath(os.path.dirname(os.getcwd())), KEY_FILE_NAME)
def get_public_key(cer_file_path): logger.info("cer file path: " + cer_file_path) with open(cer_file_path, "r") as my_cert_file: my_cert_text = my_cert_file.read() x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, my_cert_text) pk = x509.get_pubkey() pubKeyString = OpenSSL.crypto.dump_publickey(OpenSSL.crypto.FILETYPE_ASN1, pk) #print(pubKeyString.hex()) pubKeyHex = pubKeyString.hex() if len(pubKeyHex) < 128: logger.error("public key error") exit(-1) return pubKeyHex[-128:]
def get_config_data(cf, key, value): try: data = cf.get(key, value) except Exception as e: logger.error("please add config parameter in current folder! " + key + "->" + value) # raise e exit(-1) return data
def check_file(folder, key_file_name): if os.path.exists(os.path.join(folder, key_file_name)) is True: while True: choose = input("replace this file or not ,[Y/N]") if choose is "Y": break elif choose is "N": return else: continue
def check_file1(file_name): if os.path.exists(file_name) is True: while True: choose = input("replace this file or not ,[Y/N]") if choose is "Y": break elif choose is "N": return else: continue
def check_file_path(file_path, file_name): if os.path.exists(file_path) is False: os.makedirs(file_path) if os.path.exists(os.path.join(file_path, file_name)) is True: while True: choose = input("replace this file or not ,[Y/N]") if choose is "Y": break elif choose is "N": return else: continue
def check_file_exit(folder, key_file_name): if os.path.exists(os.path.join(folder, key_file_name)) is True: logger.info("file normal!") else: logger.error("key file name don't exit!") exit(-1)
def get_random_vector(vec_size): return [random.randrange(0, 0xff) for x in range(vec_size)]
def str_to_list(a): pass
def update_PD_file(file_name, mac, config_file, sn): logger.info("Provision Data is in " + file_name) logger.info("project config file path is " + config_file)
if not os.path.exists(file_name):
logger.error(file_name + " is not exists!")
# raise Exception("File not exists!")
exit(-1)
elif not KEY_FILE_POSTFIX in file_name:
logger.error(file_name + " don't have Provision Data file!")
exit(-1)
project_cf = load_config(config_file)
fiOutputPath = get_config_data(project_cf, "ProjectConfig", "Factory_Image_output_path")
basePath = get_config_data(project_cf, "ProjectConfig", "Base_path")
upload_file_path = os.path.join(basePath, fiOutputPath)
fi_file_name = os.path.join(upload_file_path, mac + KEY_FILE_POSTFIX)
logger.info("Factory Image file name, include path = " + fi_file_name)
check_file_path(upload_file_path, mac + KEY_FILE_POSTFIX)
if len(mac) is not 12:
logger.error("mac length is error")
exit(-1)
else:
for x in range(len(mac)):
if not (('0' <= mac[x] <= '9') or ('a' <= mac[x] <= 'f') or ('A' <= mac[x] <= 'F')):
logger.error("The format of mac is error!")
exit(-1)
if len(sn) > 32:
logger.error("sn length is error")
exit(-1)
size = os.path.getsize(file_name)
keyfile = open(file_name, 'rb')
a = [] # data before Nonce
b = [] # Nonce data
KLatch = get_random_vector(32)
Kiop = get_random_vector(32)
Kenw = get_random_vector(32)
Kenr = get_random_vector(32)
Kall = KLatch + Kiop + Kenw + Kenr
for i in range(size):
data = keyfile.read(1) # read 1 byte
if i < 0x1E0:
a.append(binascii.b2a_hex(data).decode())
elif i < 0x200:
if i == 0x1E0:
logger.debug("=======data before Nonce read from Provision data file===========")
logger.debug("".join(a)) # cipher text before Nonce
b.append(binascii.b2a_hex(data).decode())
if i == 0x1FF:
logger.debug("==========Nonce data read from Provision data file==============")
logger.debug("".join(b)) # IV / Nonce data
if RESERVED_32BYTE00 == "".join(b): # Nonce data is 0 means plain text.
logger.debug("Provision Data is plain text")
dataBeforeNonce = "".join(a)
dataBeforeNonceList = list(bytearray.fromhex(dataBeforeNonce))
logger.debug("================dataBeforeNonceList=======================")
logger.debug(dataBeforeNonceList)
dataBeforeHashList = dataBeforeNonceList[:448]
logger.debug("================dataBeforeHashList=======================")
logger.debug(dataBeforeHashList)
dataHashList = dataBeforeNonceList[448:480]
logger.debug("================dataHashList=======================")
logger.debug(dataHashList)
if not operator.eq(dataHashList, calc_digest(dataBeforeHashList)):
logger.error("hash error")
exit(-1)
else:
logger.debug("hash OK!")
listAfterPubKey = dataBeforeNonceList[80:] # 80 = 0x50 key address
for x in range(len(Kall)):
listAfterPubKey[x] = Kall[x]
logger.debug("===========Kall=============")
logger.debug(Kall)
logger.debug("===========listAfterPubKey=============")
logger.debug(listAfterPubKey)
dataAddKeyList = dataBeforeNonceList[:80] + listAfterPubKey
logger.debug("===========data add kall=============")
logger.debug(dataAddKeyList)
listAfterDeviceKey = dataAddKeyList[272:448] # 272 = 0x110 mac address 448 = 0x1C0 hash address
for x in range(MAC_SIZE + 2 + len(sn)): # 2 = 8-6
if x < MAC_SIZE:
listAfterDeviceKey[x] = list(bytearray.fromhex(mac))[x]
elif x < (MAC_SIZE + 2):
pass
else:
listAfterDeviceKey[x] = ord(sn[x - MAC_SIZE -2])
logger.debug("=========data add mac + sn============")
logger.debug(listAfterDeviceKey)
dataBeforeHash = dataAddKeyList[:272] + listAfterDeviceKey
logger.debug("==========dataBeforeHash==========")
logger.debug(dataBeforeHash)
# dataHash = calc_digest(get16_str(dataBeforeHash))
dataHash = calc_digest(dataBeforeHash)
logger.debug("==========dataHash==========")
logger.debug(dataHash)
logger.debug("==========Nonce data==========")
logger.debug("".join(b))
logger.debug("==========dataBeforeHash + dataHash + Nonce data==========")
logger.debug(get16_str(dataBeforeHash + dataHash) + "".join(b))
logger.info("==========fi_file_name==========")
logger.info(fi_file_name)
file_writef(fi_file_name, list(bytearray.fromhex(get16_str(dataBeforeHash + dataHash) + "".join(b))))
else:
logger.debug("Provision Data is cipher text")
iv = ("".join(b))[:32] # nonce Data 16 bytes
# dataDecrypt = decrypt_AESCBC(AES_KEY, bytes(list(bytearray.fromhex(iv))), "".join(a))
dataDecrypt = decrypt_message(AES_KEY, bytes(list(bytearray.fromhex(iv))), "".join(a))
logger.debug("============data decrypt before Nonce data===================")
logger.debug(dataDecrypt)
#print(list(bytearray.fromhex(dataDecrypt)))
#print(string_to_list(dataDecrypt))
dataDecryptList = list(bytearray.fromhex(dataDecrypt)) # int list
dataBeforeHashList = dataDecryptList[:448]
logger.debug("================dataBeforeHashList=======================")
logger.debug(dataBeforeHashList)
dataHashList = dataDecryptList[448:480]
logger.debug("================dataHashList=======================")
logger.debug(dataHashList)
if not operator.eq(dataHashList, calc_digest(dataBeforeHashList)):
logger.error("hash error")
exit(-1)
else:
logger.debug("hash OK!")
listAfterPubKey = dataDecryptList[80:] # 80 = 0x50 key address
for x in range(len(Kall)):
listAfterPubKey[x] = Kall[x]
logger.debug("===========Kall=============")
# print(Kall)
logger.debug(get16_str(Kall))
logger.debug("===========listAfterPubKey=============")
# print(listAfterPubKey)
logger.debug(get16_str(listAfterPubKey))
dataDecryptList = dataDecryptList[:80] + listAfterPubKey
logger.debug("===========data Decrypt add kall=============")
# print(dataDecryptList)
logger.debug(get16_str(dataDecryptList))
listAfterDeviceKey = dataDecryptList[272:448] # 272 = 0x110 mac address 448 = 0x1C0 hash address
for x in range(MAC_SIZE + 2 + len(sn)): # 2 = 8 - 6(mac size)
if x < MAC_SIZE:
listAfterDeviceKey[x] = list(bytearray.fromhex(mac))[x]
elif x < (MAC_SIZE + 2):
pass
else:
listAfterDeviceKey[x] = ord(sn[x - MAC_SIZE - 2])
logger.debug(listAfterDeviceKey)
dataBeforeHash = dataDecryptList[:272] + listAfterDeviceKey
logger.debug("==========dataBeforeHash==========")
# print(dataBeforeHash)
# logger.debug(get16_str(dataBeforeHash))
# dataHash = calc_digest(get16_str(dataBeforeHash))
logger.debug(dataBeforeHash)
dataHash = calc_digest(dataBeforeHash)
logger.debug("==========dataHash==========")
# print(dataHash)
#logger.debug(get16_str(dataHash))
logger.debug(dataHash)
# dataEncrypt = encrypt_AESCBC(AES_KEY,bytes(list(bytearray.fromhex(iv))),bytes(dataBeforeHash + dataHash))
dataEncrypt = encrypt_message(AES_KEY,bytes(list(bytearray.fromhex(iv))),bytes(dataBeforeHash + dataHash))
logger.debug("==========dataEncrypt==========")
logger.debug(dataEncrypt)
logger.debug("==========Nonce data==========")
logger.debug("".join(b))
logger.debug("==========dataEncrypt + Nonce data==========")
logger.debug(dataEncrypt + "".join(b))
logger.debug("==========dataDecrypt==========")
# logger.debug(decrypt_AESCBC(AES_KEY, bytes(list(bytearray.fromhex(iv))), dataEncrypt))
logger.debug(decrypt_message(AES_KEY, bytes(list(bytearray.fromhex(iv))), dataEncrypt))
logger.info("==========fi_file_name==========")
logger.info(fi_file_name)
file_writef(fi_file_name, list(bytearray.fromhex(dataEncrypt + "".join(b))))
keyfile.close()
return
@click.group() def main(): logger.info("current PMT tool version is " + tool_version) pass
@main.command("gen_pd", help="Generate Provision Data File, use: ProvDataTool.py gen_pd --help") @click.option('--config_file', default=key_file, type=str, help="config file,if not set using default ./Project_Config.ini") def generatePD(config_file): logger.info("project config file path is " + config_file) project_cf = load_config(config_file)
# Mandatory
projectName = get_config_data(project_cf, "ProjectConfig", "Project_Name")
eccCapacity = int(get_config_data(project_cf, "ProjectConfig", "ECC_chip"))
prdtType = int(get_config_data(project_cf, "ProjectConfig", "Platform_type"))
eccCertFile = get_config_data(project_cf, "ProjectConfig", "ECC_signing_certificate_file")
basePath = get_config_data(project_cf, "ProjectConfig", "Base_path")
pdOutputPath = get_config_data(project_cf, "ProjectConfig", "ProvData_output_path")
rootKey = get_config_data(project_cf, "ProjectConfig", "RootKey")
encPD = int(get_config_data(project_cf, "ProjectConfig", "Enc_ProvData"))
gen_file_path = os.path.join(basePath, pdOutputPath)
file_name = os.path.join(gen_file_path, projectName + KEY_FILE_POSTFIX)
logger.info("Provision Data save in = " + gen_file_path)
logger.info("Provision Data file name, include path = " + file_name)
check_file_path(gen_file_path, projectName + KEY_FILE_POSTFIX)
if os.path.exists(eccCertFile) is False:
logger.error("ECC signing certificate file is not exist")
exit(-1)
else:
pubKey = get_public_key(eccCertFile)
#print(pubKey)
if re.search(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$", Prov_Data_Version):
KprovDataVerValue = Prov_Data_Version.split('.', 4)
for item in list(map(int, KprovDataVerValue)):
if item > 255:
logger.error("Prov Data Version type is error")
exit(-1)
else:
logger.error(" Prov Data Version type is error")
exit(-1)
if len(projectName) > 20 or len(projectName) == 0:
logger.error(config_file + " project name length is error")
exit(-1)
if eccCapacity is not 1 and not 2 and not 3:
logger.error(config_file + " ECC CAPACITY invalid value(1/2/3)")
exit(-1)
elif eccCapacity == 1:
eccCapacityValue = 0x55AA0000
elif eccCapacity == 2:
eccCapacityValue = 0xAA550000
else:
eccCapacityValue = 0xFFFFFFFF
if prdtType is not 1 and not 2 and not 3:
logger.error(config_file + " prdtType invalid value(1/2/3)")
exit(-1)
if len(rootKey) is not 64:
logger.error(config_file + " rootKey value length is error")
exit(-1)
if encPD is not 0 and not 1:
logger.error(config_file + " encrypt Provision data invalid value(0/1)")
exit(-1)
elif encPD == 0:
nonceData = [0 for index in range(32)]
else:
nonceData = get_random_vector(32)
# if nonce is 0, will regain
while operator.eq(nonceData, [0 for index in range(32)]):
logger.info("regain nonce")
nonceData = get_random_vector(32)
# print(nonceData)
# need update
KLatch = RESERVED_32BYTE
Kiop = RESERVED_32BYTE
Kenw = RESERVED_32BYTE
Kenr = RESERVED_32BYTE
mac = RESERVED_8BYTE
# Reserved
devKey = RESERVED_32BYTE
reservedData = RESERVED_32BYTE + \
RESERVED_32BYTE + \
RESERVED_32BYTE + \
RESERVED_32BYTE + \
RESERVED_32BYTE + \
RESERVED_8BYTE
hashData = calc_digest(int_to_list(PROV_MAGIC) + \
list(map(int, KprovDataVerValue)) + \
int_to_list(eccCapacityValue) + \
int_to_list(prdtType) + \
list(bytearray.fromhex(pubKey)) + \
list(bytearray.fromhex(KLatch)) + \
list(bytearray.fromhex(Kiop)) + \
list(bytearray.fromhex(Kenw)) + \
list(bytearray.fromhex(Kenr)) + \
list(bytearray.fromhex(rootKey)) + \
list(bytearray.fromhex(devKey)) + \
list(bytearray.fromhex(mac)) + \
list(bytearray.fromhex(reservedData))
)
logger.debug("=====================hashData======================")
logger.debug(hashData)
'''
hashData = calc_digest(get16_str(int_to_list(PROV_MAGIC)) + \
get16_str(list(map(int, KprovDataVerValue))) + \
get16_str(int_to_list(eccCapacityValue)) + \
get16_str(int_to_list(prdtType)) + \
pubKey + \
KLatch + \
Kiop + \
Kenw + \
Kenr + \
rootKey + \
devKey + \
mac + \
reservedData
)
'''
if encPD == 0:
file_writef(file_name, int_to_list(PROV_MAGIC) + \
list(map(int, KprovDataVerValue)) + \
int_to_list(eccCapacityValue) + \
int_to_list(prdtType) + \
list(bytearray.fromhex(pubKey)) + \
list(bytearray.fromhex(KLatch)) + \
list(bytearray.fromhex(Kiop)) + \
list(bytearray.fromhex(Kenw)) + \
list(bytearray.fromhex(Kenr)) + \
list(bytearray.fromhex(rootKey)) + \
list(bytearray.fromhex(devKey)) + \
list(bytearray.fromhex(mac)) + \
list(bytearray.fromhex(reservedData)) + \
hashData + \
nonceData)
else:
dataBeforeNonce = int_to_list(PROV_MAGIC) + \
list(map(int, KprovDataVerValue)) + \
int_to_list(eccCapacityValue) + \
int_to_list(prdtType) + \
list(bytearray.fromhex(pubKey)) + \
list(bytearray.fromhex(KLatch)) + \
list(bytearray.fromhex(Kiop)) + \
list(bytearray.fromhex(Kenw)) + \
list(bytearray.fromhex(Kenr)) + \
list(bytearray.fromhex(rootKey)) + \
list(bytearray.fromhex(devKey)) + \
list(bytearray.fromhex(mac)) + \
list(bytearray.fromhex(reservedData)) + \
hashData
logger.debug("================plain text=================\r\n")
#print(dataBeforeNonce) # plain
#print(bytes(dataBeforeNonce))
logger.debug(dataBeforeNonce)
# logger.debug(get16_str(dataBeforeNonce))
logger.debug("================IV=========================\r\n")
nonceData_16 = nonceData[:16]
logger.debug(nonceData_16) # IV
logger.debug(bytes(nonceData_16))
logger.debug("================KEY========================\r\n")
logger.debug(AES_KEY)
# dataEncrypt = encrypt_AESCBC(AES_KEY, bytes(nonceData_16), bytes(dataBeforeNonce))
dataEncrypt = encrypt_message(AES_KEY, bytes(nonceData_16), bytes(dataBeforeNonce))
logger.debug("================cipher text================\r\n")
# logger.debug(dataEncrypt)
logger.debug(list(bytearray.fromhex(dataEncrypt)))
logger.debug("================Nonce data================\r\n")
logger.debug(get16_str(nonceData))
logger.debug("================plain text22================\r\n")
# logger.debug(decrypt_AESCBC(AES_KEY, bytes(nonceData_16), dataEncrypt))
# logger.debug(decrypt_message(AES_KEY, bytes(nonceData_16), dataEncrypt))
logger.debug(list(bytearray.fromhex(decrypt_message(AES_KEY, bytes(nonceData_16), dataEncrypt))))
file_writef(file_name, list(bytearray.fromhex(dataEncrypt))+ nonceData)
return
@main.command("update_pd", help="Update Provision Data File In Factory, use : ProvDataTool.py gen_pd --help") @click.option('--file_name', prompt="input Provision Data file name, include path", type=str, nargs=1, help="input Provision Data file name, include path, use --file_name D:\123.pdb ") @click.option('--mac', prompt="input mac number", type=str, nargs=1, help="input mac number , --mac 04786300CCE7") #@click.option('--update_folder_path', default=os.path.dirname(file), type=str,
@click.option('--config_file', default=key_file, type=str, help="config file,if not set using default ./Project_Config.ini") @click.option('--sn', prompt="input sn number", type=str, nargs=1, help="input sn number , --sn 12345678") def UpdatePD(file_name, mac, config_file, sn): update_PD_file(file_name, mac, config_file, sn)
return
if name == "main": main()
//PprrovDdaattaaTool-tool.py======================================================== #!/usr/bin/env python
import re import click
from libs.common import * from collections import deque import OpenSSL import operator
def get_public_key(): print("get public key")
def get_config_data(): print("get config data")
def check_file(): print("check file")
def check_file1(file_name): print("check file1")
def check_file_path(): print("check file path")
def check_file_exit(): print("check file exit")
def get_random_vector(): print("get random vector")
def str_to_list(a): pass
def update_PD_file(): print("update PD file")
def main(): print("main") pass
def generatePD(): print("generatePD")
#main.command()
def UpdatePD(): print("UpdatePD")
if name == "main": main()
//requirements.txt================================================================ aes==1.0.0 certifi==2019.11.28 cffi==1.14.0 chardet==3.0.4 Click==7.0 colorama==0.4.3 common==0.1.2 cryptography==2.8 future==0.18.2 idna==2.9 intelhex==2.2.1 iso8601==0.1.12 loguru==0.3.2 lxml==4.5.0 Naked==0.1.31 NeedForCryptography==1.5 numpy==1.18.2 pkcs7==0.1.2 pycparser==2.20 pycryptodome==3.9.7 pycryptodomex==3.9.0 pynrfjprog==10.2.1 pyOpenSSL==19.1.0 pyserial==3.4 pytils==0.3 PyYAML==5.3 requests==2.23.0 serial==0.0.97 shellescape==3.8.1 six==1.14.0 tools==0.1.9 urllib3==1.25.8 win32-setctime==1.0.1
//test.py============================================================= #!/usr/bin/env python
import re import click import configparser from libs.common import * from collections import deque import OpenSSL import operator
def greet(people): print("greet"+ people)
def get_config_data(cf,key,value): try: data = cf.get(key,value) except Exception as e: if value == "dcdFile": return -1 else: logger.error("please add config parameter in current folder!" + key + value) #raise e exit(-1) return data
def init_func(): project_cf = load_config("Project_Config.ini") testp = get_config_data(project_cf,"ProjectConfig","NXP_tool_path") print(testp)
@click.group() def main(): logger.info("current PMT tool version is v1.0.0.0" ) #hello() init_func() pass
@main.command("hi",help="get help") @click.option('--count',default=1,help='Number of greetings.') @click.option('--name',prompt='Your name',help='The person to greet.')
def hello(count,name): for x in range(count): click.echo(f"Hello {name}!")
if name == "main": main()
//testa.py============================================================= #!/usr/bin/env python
import re import click import configparser from libs.common import * from collections import deque import OpenSSL import operator import shutil import sys import subprocess
def main(): logger.info("current PMT tool version is v1.0.0.0" )
p = subprocess.Popen("./main.exe",stdin=subprocess.PIPE,stdout=subprocess.PIPE,shell=True)
output, _ = p.communicate(b'Y\n')
print(output)
ap = os.path.join("abc","OPGT.COMPRESS.exe")
print(ap)
pass
if name == "main": main()
//ctime.c================================================================================== #include<stdio.h> #include<stdlib.h> #include<time.h>
#include <unistd.h>
int main(int argc,char* argv[]) {
time_t t;
uint32_t cnt =0;
int8_t* time_str;
struct tm *tmi;
printf("sizeof time_t is %d\n",sizeof(time_t));
while(1)
{
t = time(NULL);
printf("epoch is %d\n",t);
time_str = ctime(&t);
printf("date is %s\n",time_str);
//tmi = gmtime(&t);
tmi = localtime(&t);
//printf("%d-%d-%d %d\n",tmi->tm_year + 1900,tmi->tm_mon + 1,tmi->tm_mday,tmi->tm_sec);
printf("%d-%d-%d \n",tmi->tm_hour,tmi->tm_min,tmi->tm_sec);
sleep(1);
cnt++;
if(cnt > 6)
break;
}
return 0;
}
//msg_queue.h================================================================= #ifndef MSG_QUEUE_H #define MSG_QUEUE_H
typedef struct mqueue { long type; uint32_t device_id; float temperature; float moisture; int8_t year; int8_t month; int8_t day; int8_t hour; int8_t minute; int8_t second;
}MQUEUE;
#endif
//msg_recv.c============================================================================ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/msg.h> #include "msg_queue.h"
MQUEUE msq;
int main(int argc,char* argv[]) { int msqid = 0; key_t mkey = 0x9889;
if((msqid = msgget(mkey,0777)) < 0 )
{
perror("msgget failed\n");
exit(EXIT_FAILURE);
}
msq.type = 1;
msq.temperature = 25.0;
msq.moisture = 65.0;
if(msgrcv(msqid,&msq,sizeof(msq)-sizeof(long),0,0) < 0)
{
perror("msgrcv failed\n");
exit(EXIT_FAILURE);
}
printf("one message received\n");
printf("msg.tpye is %d\n",msq.type);
printf("msg.temperature is %f\n",msq.temperature);
printf("msg.moisture is %f\n",msq.moisture);
return 0;
}
//msg_send.c================================================================= #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/msg.h> #include "msg_queue.h"
MQUEUE msq;
int main(int argc,char* argv[]) { int msqid = 0; key_t mkey = 0x9889;
if((msqid = msgget(mkey,IPC_CREAT|IPC_EXCL|0777)) < 0 )
{
perror("msgget failed\n");
exit(EXIT_FAILURE);
}
msq.type = 1;
msq.temperature = 25.0;
msq.moisture = 65.0;
if(msgsnd(msqid,&msq,sizeof(msq)-sizeof(long),0) < 0)
{
perror("msgsnd failed\n");
exit(EXIT_FAILURE);
}
printf("one message sent\n");
return 0;
}