# Source code for fvm.drom2psl.generator

"""
Functions to actually convert the .json wavedroms into PSL sequences
"""

# To use the JSON encoder and decoder
import json

# Allow to obtain basename of input files
from pathlib import Path
import os

# To use sys.exit and be able to print to sys.stderr
import sys

# To easily get command-line arguments
import argparse

# To allow pretty cool debug prints that can be disabled after development
from icecream import ic

# Since JSON is a subset of YAML, use pyyaml since it accepts missing quotes
import yaml

# Allow to render wavedrom signals from inside python
import wavedrom

# Import the wavedrom definitions we need
from fvm.drom2psl.definitions import GROUP, WAVE, NAME

# Import our own functions to traverse the dictionary
from fvm.drom2psl.traverse import traverse

# Import our own functions to interpret the dictionary
from fvm.drom2psl.interpret import (get_signal, check_wavelane, get_type,
                                    get_group_name, flatten, get_wavelane_wave,
                                    get_wavelane_name, get_wavelane_data,
                                    get_group_arguments, get_clock_value,
                                    is_pipe, gen_sere_repetition,
                                    get_signal_value, adapt_value_to_hdltype)

# Import our own logging functions
from fvm.drom2psl.basiclogging import info, error #, warning

def generator(filename, outdir = None, verbose_psl = True, debug = False, do_traverse = False):
    """
    Actually generate the PSL file with the sequences

    The input file is parsed as YAML (a superset of JSON, so missing quotes
    around keys are accepted), flattened into a list of wavelanes, checked,
    and converted into one PSL sequence per top-level wavedrom group. When
    there are exactly two groups, a sample property relating both sequences
    is also generated. Finally, the input is rendered to an .svg file next to
    the generated .psl file.

    :param filename: input wavedrom file (JSON)
    :type filename: string
    :param outdir: output directory (if not specified, each .psl file will be
                   generated in the same directory of each input file)
    :type outdir: string
    :param verbose_psl: add extra comments to generated psl file
    :type verbose_psl: bool
    :param debug: print debug messages
    :type debug: bool
    :param do_traverse: traverse the wavedrom file, printing structural debug
                        information
    :type do_traverse: bool

    :returns: 0 if no errors detected, 1 if errors were detected, 2 if the dict
              extracted from the json file was empty
    :rtype: int
    """
    # Disable icecream if we are not debugging
    if debug is False:
        ic.disable()

    ic("generator")
    ic(filename, outdir, verbose_psl, debug, do_traverse)

    # Set custom prefix for icecream
    ic.configureOutput(prefix='generator | ')

    # Read the input file once, as text. The same string is used both for the
    # YAML parser and, later on, for wavedrompy (which reads a string and not
    # a dict). Using a context manager guarantees the handle is closed.
    with open(filename, encoding="utf-8") as f:
        string = f.read()

    full_filename = Path(filename).resolve()
    ic(full_filename)

    # Extract the dictionary from the file. This should detect any syntax
    # errors.
    # We use the YAML parser instead of the JSON parser since the JSON parser
    # is very strict with the double quotes (expects all keys to be inside
    # double quotes), but the wavedrom format doesn't require those double
    # quotes.
    # We do this because the online wavedrom website also accepts
    # mostly-correct json: even its examples are not strictly correct (they do
    # not put the keys in quotes, which is valid YAML but invalid JSON).
    ic("Extracting dictionary interpreting it as YAML")

    # NOTE(review): this passes the file *name*, not the file contents, to
    # wavedrom.fixQuotes, and the result is only used for the debug print
    # below. Presumably a development leftover -- confirm before removing.
    source = json.loads(wavedrom.fixQuotes(filename))
    ic(source)

    ok = True
    try:
        dictionary = yaml.load(string, Loader=yaml.SafeLoader)
    except yaml.YAMLError:
        ok = False

    if not ok:
        error("Invalid YAML syntax in file: "+str(filename))

    if ok:
        ic(dictionary)

    # An empty document is parsed as None
    # NOTE(review): processing continues with dictionary=None after this
    # point, so whether 2 is actually returned depends on what get_signal()
    # does with None -- confirm against fvm.drom2psl.interpret.
    if ok:
        if dictionary is None:
            error("Input JSON file is empty!")
            empty_json = True
        else:
            empty_json = False

    # Let's process the dict now
    if ok:
        ic("Traversing dictionary")
        if do_traverse:
            print("TRAVERSE: Interpreting dict")
            traverse(" ", dictionary)

    # Process dictionary
    if ok:
        ic("Getting the signal list")
        signal, ok = get_signal(dictionary)

    if ok:
        ic("Counting the number of primary groups")
        num_groups = 0
        groups = []
        for value in signal:
            if get_type(value) == GROUP:
                num_groups += 1
                groups.append(get_group_name(value))
        ic(num_groups)
        ic(groups)

    if ok:
        ic("Flattening signal")
        flattened_signal, ok = flatten("", signal, None)

    if ok:
        ic(flattened_signal)
        ic("Detected", len(flattened_signal), "wavelanes")

    if ok:
        ic("Checking wavelanes in flattened signal")
        # Accumulate the result: a single failing wavelane must make the whole
        # check fail, even if later wavelanes are correct
        for wavelane in flattened_signal:
            if not check_wavelane(wavelane):
                error("At least a wavelane error")
                ok = False

    if ok:
        ic("Checking all non-empty wavelanes' waves have the same length")
        lengths = []
        for wavelane in flattened_signal :
            if len(wavelane) != 0 :
                lengths.append(len(wavelane.get(WAVE)))
        ic("Wavelane lengths", lengths)
        if len(set(lengths)) != 1 :
            error("Not all wavelanes' wave fields have the same length!")
            for wavelane in flattened_signal :
                # Empty wavelanes have no wave, so they were not taken into
                # account above and cannot be reported here either
                if len(wavelane) == 0 :
                    continue
                error(" wavelane "+str(wavelane.get(NAME))+
                      " has a wave with length "+str(len(wavelane.get(WAVE)))+
                      " (wave is "+str(wavelane.get(WAVE))+" )")
            ok = False
        else:
            ic("detected", lengths[0], "clock cycles")
            clock_cycles = lengths[0]

    if ok:
        ic("Counting wavelanes")
        allwavelanes = 0
        nonemptywavelanes = 0
        for wavelane in flattened_signal:
            allwavelanes += 1
            if len(wavelane) != 0 :
                nonemptywavelanes += 1
        ic("detected", allwavelanes, "wavelanes")
        ic("from which", nonemptywavelanes, "are non-empty")

    if ok:
        ic("Creating a psl vunit")
        vunit_name = full_filename.stem
        if outdir is not None:
            os.makedirs(outdir, exist_ok=True)
            output_file = os.path.join(outdir, os.path.basename(Path(full_filename).with_suffix('.psl')))
        else:
            output_file = Path(full_filename).with_suffix('.psl')
        ic(output_file)

        vunit = ''
        vunit += '-- Automatically created by drom2psl\n'
        vunit += f'-- Input file: {full_filename}\n'
        vunit += ('-- These sequences and/or properties can be reused from'
                  ' other PSL files by doing:\n')
        vunit += f'-- inherit {vunit_name};\n\n'
        vunit += f'vunit {vunit_name} ' + '{\n\n'

        # We are assuming a number of things to make this usable, see the
        # module docstring

        # To cover the special case where we have no groups, in that case let's
        # define a group whose name is the empty string
        if num_groups == 0:
            groups.append('')

        for groupname in groups:
            if groupname == '':
                sequence_name = f'{vunit_name}'
            else:
                sequence_name = f'{vunit_name}_{groupname}'

            # Get group arguments
            group_arguments = get_group_arguments(groupname, flattened_signal)
            ic(group_arguments)

            # If there are no group arguments, we don't want to print the
            # parentheses
            if len(group_arguments) == 0:
                vunit += f' sequence {sequence_name}\n'
            else:
                vunit += f' sequence {sequence_name} (\n'
                vunit += format_group_arguments(group_arguments)

            # If we are in the last element, we don't want a semicolon
            # so we remove the last two characters: ';\n', then we add the \n
            # again
            if vunit[-2:] == ";\n":
                vunit = vunit[:-2]
                vunit += '\n'

            if len(group_arguments) == 0:
                vunit += ' is {\n'
            else:
                vunit += ' ) is {\n'

            prev_line = ''
            prev_cycles = 0
            prev_or_more = False
            sequence_start = None
            sequence_end = None

            # First pass: find the first and last cycles in which at least one
            # signal of this group has a value other than '-', which
            # delimit the sequence.
            # The clock wavelane (the first one) is processed apart from the
            # rest of the wavelanes, so it is skipped here.
            for cycle in range(clock_cycles):
                for wavelane in flattened_signal[1:]:
                    name = get_wavelane_name(wavelane)
                    if name[:len(groupname)] == groupname:
                        wave = get_wavelane_wave(wavelane)
                        data = get_wavelane_data(wavelane)
                        value = get_signal_value(wave, data, cycle)
                        value = adapt_value_to_hdltype(value)
                        if value != "'-'":
                            if sequence_start is None:
                                sequence_start = cycle
                            sequence_end = cycle

            # Second pass: generate one SERE line per run of equal cycles
            for cycle in range(clock_cycles):
                # The clock wavelane is processed a bit different and apart
                # from the rest of the wavelanes
                line = ''
                line += ' ('
                for wavelane in flattened_signal[1:]:
                    name = get_wavelane_name(wavelane)
                    if name[:len(groupname)] == groupname:
                        wave = get_wavelane_wave(wavelane)
                        data = get_wavelane_data(wavelane)
                        value = get_signal_value(wave, data, cycle)
                        value = adapt_value_to_hdltype(value)
                        if value != "'-'":
                            line += f'({name} = {value}) and '

                # The last one doesn't need the ' and ' so we'll remove 5
                # characters if they are ' and '
                if line[-5:] == ' and ':
                    line = line[:-5]
                line += ')'

                # And now to compute how many cycles we have to indicate, we
                # have to do two things:
                #   1. Check if the clock is '|' (will mean zero or more)
                #   2. Compare against the previous line
                cycles = get_clock_value(flattened_signal[0], cycle)
                or_more = is_pipe(flattened_signal[0], cycle)

                if sequence_start is not None:
                    if sequence_start > cycle:
                        # We are before the start of the sequence, so ignore
                        continue

                if line == ' ()':
                    line = ' (true)'

                # If lines are different, then just:
                #   1. Finish the previous line with the cycles
                #   2. Write the current line, except the cycles
                #   3. The actual current line will be the next prev_line
                if line != prev_line:
                    if prev_line != '':
                        prev_cycles_text = gen_sere_repetition(prev_cycles, prev_or_more, True)
                        vunit += prev_cycles_text + '\n'
                    vunit += line
                    prev_line = line
                    prev_cycles = cycles
                    prev_or_more = or_more

                # If lines are equal:
                #   Do not finish the previous line, just add the cycles to
                #   prev_cycles and compute the relevant 'or_more': both lines
                #   are equal so if at least one of them allows repeat, then
                #   the merged line must allow repeat
                else:
                    prev_cycles += cycles
                    prev_or_more = bool(prev_or_more or or_more)

                if sequence_end == cycle:
                    # Finish the sequence here
                    break

            # After the for loop finishes, we will have the last cycles to
            # write, so let's write them:
            prev_cycles_text = gen_sere_repetition(prev_cycles, prev_or_more, False)
            if prev_line != '':
                vunit += prev_cycles_text + '\n'
            vunit += ' };\n'
            vunit += '\n'

        # If we have exactly two sequences (two wavedrom groups), let's create
        # a sample property relating them
        if num_groups == 2:
            if verbose_psl:
                vunit += " -- Relational operands between sequences may be, among others:\n"
                vunit += " -- && : both must happen and last exactly the same number of cycles\n"
                vunit += " -- & : both must happen, without any requirement on their durations\n"
                vunit += (" -- |-> : implication: both must happen, with"
                          " the first cycle of the second occurring during the"
                          " last cycle of the first\n")
                vunit += (" -- |=> : non-overlapping implication: both must"
                          " happen, with the first cycle of the second occuring"
                          " the cycle after the last cycle of the first\n")
            vunit += f' property {vunit_name} (\n'
            group_arguments = []
            for groupname in groups:
                group_arguments += get_group_arguments(groupname, flattened_signal)
            ic(group_arguments)
            vunit += format_group_arguments(group_arguments)
            vunit += format_group_arguments([['fvm_rst', 'std_ulogic']])

            # Again, remove the unneeded semicolon and restore the deleted \n
            if vunit[-2:] == ";\n":
                vunit = vunit[:-2]
                vunit += '\n'
            vunit += ' ) is\n' # {\n'

            group0_args = format_group_arguments_in_call(get_group_arguments(groups[0], flattened_signal))
            group1_args = format_group_arguments_in_call(get_group_arguments(groups[1], flattened_signal))

            # The group names "Precond"/"Cond" and "Precond"/"Cond_non_overlap"
            # select an (overlapping or non-overlapping) implication; the
            # group prefixes are removed from the whole generated text
            if groups[0] == "Precond" and groups[1] == "Cond":
                vunit = vunit.replace("Precond.", "")
                vunit = vunit.replace("Cond.", "")
                group0_args = format_arguments_suffix_implication(group0_args)
                group1_args = format_arguments_suffix_implication(group1_args)
                vunit += f' always ( {vunit_name}_{groups[0]}{group0_args}'
                vunit += f' |-> {vunit_name}_{groups[1]}{group1_args} ) abort (fvm_rst = \'1\');\n'
            elif groups[0] == "Precond" and groups[1] == "Cond_non_overlap":
                vunit = vunit.replace("Precond.", "")
                vunit = vunit.replace("Cond_non_overlap.", "")
                group0_args = format_arguments_suffix_implication(group0_args)
                group1_args = format_arguments_suffix_implication(group1_args)
                vunit += f' always ( {vunit_name}_{groups[0]}{group0_args}'
                vunit += f' |=> {vunit_name}_{groups[1]}{group1_args} ) abort (fvm_rst = \'1\');\n'
            else:
                vunit += f' always {{ {{{vunit_name}_{groups[0]}{group0_args}}}'
                vunit += f' && {{{vunit_name}_{groups[1]}{group1_args}}} }} abort (fvm_rst = \'1\');\n'
            vunit += '\n'

        vunit += '}\n'
        ic(flattened_signal[0])

        with open(output_file, 'w', encoding="utf-8") as f:
            f.write(vunit)

    ic("Was the execution correct?")
    ic(ok)

    # Render the json using wavedrompy. This way we should receive an error if
    # there are any wavedrom-specific errors in an otherwise correct JSON
    if ok:
        if (not empty_json):
            ic("Rendering the JSON into an .svg")
            render = wavedrom.render(string)
            ic(render)
            ic(output_file)
            svgfilename = Path(output_file).with_suffix('.svg')
            ic(svgfilename)
            render.saveas(svgfilename)

    # Return different values to the shell, depending on the type of error
    if not ok:
        ret = 1
    elif empty_json:
        ret = 2
    else:
        ret = 0

    if ret != 0 :
        error("At least one error!")
    else:
        if debug:
            info("No errors detected!")

    return ret
def format_group_arguments(group_arguments):
    """Returns the group arguments as PSL formal parameter declarations.

    Every declaration ends with a semicolon, including the last one, which
    should be removed separately by the caller. Avoids duplicated arguments.

    :param group_arguments: iterable of (argument, datatype) pairs
    :returns: one ' hdltype <datatype> <argument>;' line per unique argument
    :rtype: string
    """
    seen = set()
    declarations = []
    for argument, datatype in group_arguments:
        # In case we have a record type, only keep the base name
        argument = argument.split('.')[0]
        if argument in seen:
            continue
        seen.add(argument)
        declarations.append(f' hdltype {datatype} {argument};\n')
    return ''.join(declarations)


def format_arguments_suffix_implication(group_arguments):
    """Returns the group arguments ready to parameterize an implication, for
    example: (addr, data)

    Only the name before the first dot of each argument is kept, and
    duplicated names are removed preserving their order.

    :param group_arguments: arguments already formatted for a call, for
                            example '(rec.addr, rec.data, valid)', or an empty
                            string if there are no arguments
    :type group_arguments: string
    :returns: the reduced argument list between parentheses, or an empty
              string if there are no arguments
    :rtype: string
    """
    args = group_arguments.strip("()")

    # With no arguments we must not return '()': the sequences are declared
    # without parentheses in that case, so they must be instantiated without
    # them too (same convention as format_group_arguments_in_call)
    if not args.strip():
        return ''

    # Extract name before the first dot
    bases = [item.strip().split(".")[0] for item in args.split(",")]

    # Remove duplicates while preserving order (dicts keep insertion order)
    unique_bases = list(dict.fromkeys(bases))

    # Convert back to a string with parentheses
    return "(" + ", ".join(unique_bases) + ")"


def format_group_arguments_in_call(group_arguments):
    """Returns the group arguments ready to parameterize a property or
    sequence, for example: (addr, data)

    :param group_arguments: sequence of items whose first element is the
                            argument name
    :returns: the argument names between parentheses, or an empty string if
              there are no arguments
    :rtype: string
    """
    # Only return a non-empty string if there is at least one argument
    if len(group_arguments) == 0:
        return ''
    return '(' + ', '.join(f'{j[0]}' for j in group_arguments) + ')'


def create_parser():
    """Configure drom2psl's argument parser

    :returns: parser accepting one or more input files plus the --outdir,
              -d/--debug, -t/--traverse and -q/--quiet_psl options
    :rtype: argparse.ArgumentParser
    """
    parser = argparse.ArgumentParser(description=('Generate PSL sequences from'
                                                  ' .json wavedrom descriptions.'))
    parser.add_argument('inputfiles', nargs='+',
                        help='.json input file(s) (must be wavedrom compatible)')
    parser.add_argument('--outdir', default=None,
                        help=('Output directory for generated files. By'
                              ' default, outputs are generated in the same'
                              ' directories where the input files are.'))
    parser.add_argument('-d', '--debug', default=False, action='store_true',
                        help='Show debug messages. (default: %(default)s)')
    parser.add_argument('-t', '--traverse', default=False, action='store_true',
                        help=('Traverse the wavedrom file, printing even more'
                              ' debug information. (default: %(default)s)'))
    parser.add_argument('-q', '--quiet_psl', default=False, action='store_true',
                        help=('Do not include extra comments in generated PSL'
                              ' files. (default: %(default)s)'))
    return parser


def main():
    """
    main() function for drom2psl generator. To be used when called from the
    command-line

    Processes the input files in order, stops at the first one for which
    generator() reports an error, and exits with that return value.
    """
    parser = create_parser()
    args = parser.parse_args()

    if not args.debug:
        ic.disable()

    ic(args)

    # nargs='+' guarantees at least one input file, so retval is always set
    for file in args.inputfiles:
        retval = generator(file, outdir=args.outdir,
                           verbose_psl=not args.quiet_psl, debug=args.debug,
                           do_traverse=args.traverse)
        if retval != 0:
            break

    sys.exit(retval)


if __name__ == "__main__":
    main()