Add the functionality from v2 by reorganizing compilation into source file groups

This commit is contained in:
Cosmin Ciocan
2024-01-02 10:28:43 +01:00
parent 0d2ab99ccc
commit 665654aff6
+274 -38
View File
@@ -1,62 +1,298 @@
import argparse
import glob
import os import os
import shutil
import subprocess import subprocess
import tempfile import tempfile
import uuid import uuid
from typing import List, Optional
from IPython.core.interactiveshell import InteractiveShell
from IPython.core.magic import Magics, cell_magic, line_magic, magics_class
from IPython.core.magic import Magics, cell_magic, magics_class
from common import helper from common import helper
compiler = '/usr/local/cuda/bin/nvcc' DEFAULT_EXEC_FNAME = "cuda_exec.out"
profiler = '/usr/local/cuda/bin/ncu' SHARED_GROUP_NAME = "shared"
ext = '.cu'
@magics_class @magics_class
class NVCCPlugin(Magics): class NVCCPlugin(Magics):
def __init__(self, shell: InteractiveShell):
def __init__(self, shell):
super(NVCCPlugin, self).__init__(shell) super(NVCCPlugin, self).__init__(shell)
self.shell: InteractiveShell # type hint not provided by parent class
self.argparser = helper.get_argparser() self.parser_cuda = helper.get_parser_cuda()
self.parser_cuda_group_save = helper.get_parser_cuda_group_save()
self.parser_cuda_group_delete = helper.get_parser_cuda_group_delete()
self.parser_cuda_group_run = helper.get_parser_cuda_group_run()
@staticmethod self.workdir = tempfile.mkdtemp()
def compile(file_path): print(f'Source files will be saved in "{self.workdir}".')
subprocess.check_output(
[compiler, file_path + ext, "-o", file_path + ".out", '-Wno-deprecated-gpu-targets'], stderr=subprocess.STDOUT)
def run(self, file_path, timeit=False, profile=False, profiler_args=[]): def _save_source(
self, source_name: str, source_code: str, group_name: str
) -> None:
"""
Save source code as a .cu or .h file in the group directory where
files can be compiled together. Saving a source file to the group
named "shared" will make those source files available when compiling
any group.
Args:
source_name: The name of the source file. Must end in ".cu" or
".h".
source_code: The source code to be written to the source file.
group_name: The name of the group directory where the file will be
saved.
Raises:
ValueError: If the source name does not have a proper extension.
"""
_, ext = os.path.splitext(source_name)
if ext != ".cu" and ext != ".h":
raise ValueError(
f'Given source name "{source_name}" must end in ".h" or ".cu".'
)
group_dirpath = os.path.join(self.workdir, group_name)
os.makedirs(group_dirpath, exist_ok=True)
source_fpath = os.path.join(group_dirpath, source_name)
with open(source_fpath, "w", encoding="utf-8") as f:
f.write(source_code)
def _delete_group(self, group_name: str) -> None:
"""
Removes all source files from the given group.
Args:
group_name: The name of the source files group.
"""
group_dirpath = os.path.join(self.workdir, group_name)
if os.path.exists(group_dirpath):
shutil.rmtree(group_dirpath)
def _compile(
self, group_name: str, executable_fname: str = DEFAULT_EXEC_FNAME
) -> str:
"""
Compiles all source files in a given group together with all source
files from the group named "shared".
Args:
group_name: The name of the source file group to be compiled.
executable_fname: The output executable file name. Defaults to
"cuda_exec.out".
Raises:
RuntimeError: If the group does not exist or if does not have any
source files associated with it.
Returns:
The file path of the resulted executable file.
"""
shared_dirpath = os.path.join(self.workdir, SHARED_GROUP_NAME)
group_dirpath = os.path.join(self.workdir, group_name)
if not os.path.exists(group_dirpath):
raise RuntimeError(f'Group "{group_name}" does not exist.')
source_files = list(glob.glob(os.path.join(group_dirpath, "*.cu")))
if len(source_files) == 0:
raise RuntimeError(
f'Group "{group_name}" does not have any source files.'
)
source_files.extend(
list(glob.glob(os.path.join(shared_dirpath, "*.cu")))
)
executable_fpath = os.path.join(group_dirpath, executable_fname)
args = [
"nvcc",
"-I" + shared_dirpath + "," + group_dirpath,
]
args.extend(source_files)
args.extend(
[
"-o",
executable_fpath,
"-Wno-deprecated-gpu-targets",
]
)
subprocess.check_output(args, stderr=subprocess.STDOUT)
return executable_fpath
def _run(
self,
exec_fpath: str,
timeit: bool = False,
profile: bool = False,
profiler_args: str = "",
) -> str:
"""
Runs a CUDA executable.
Args:
exec_fpath: The file path of the executable.
timeit: If True, returns the result of the "timeit" magic instead
of the standard output of the CUDA process. Defaults to False.
profile: If True, the executable is profiled with NVIDIA Nsight
Compute profiling tool and its output is added to stdout.
Defaults to False.
profiler_args: The profiler arguments used to customize the
information gathered by it and its overall behaviour. Defaults
to an empty string.
Returns:
The standard output of the CUDA process or the "timeit" magic
output.
"""
if timeit: if timeit:
stmt = f"subprocess.check_output(['{file_path}.out'], stderr=subprocess.STDOUT)" stmt = (
f"subprocess.check_output(['{exec_fpath}'],"
" stderr=subprocess.STDOUT)"
)
output = self.shell.run_cell_magic( output = self.shell.run_cell_magic(
magic_name="timeit", line="-q -o import subprocess", cell=stmt) magic_name="timeit", line="-q -o import subprocess", cell=stmt
output = str(output) # convert TimeitResult object to human readable string )
# convert TimeitResult object to human readable string
output = str(output)
else: else:
run_args = [] run_args = []
if profile: if profile:
run_args.extend([profiler] + profiler_args) run_args.extend(["ncu"] + profiler_args.split())
run_args.append(file_path + ".out") run_args.append(exec_fpath)
output = subprocess.check_output(run_args, stderr=subprocess.STDOUT) output = subprocess.check_output(
output = output.decode('utf8') run_args, stderr=subprocess.STDOUT
)
helper.print_out(output) output = output.decode("utf8")
return None
return output
def _compile_and_run(
self, group_name: str, args: argparse.Namespace
) -> str:
try:
exec_fpath = self._compile(group_name)
output = self._run(
exec_fpath=exec_fpath,
timeit=args.timeit,
profile=args.profile,
profiler_args=args.profiler_args,
)
except subprocess.CalledProcessError as e:
output = e.output.decode("utf8")
return output
def _read_args(
self, line: str, parser: argparse.ArgumentParser
) -> Optional[argparse.Namespace]:
"""
Read arguments from the magic line. Makes sure to keep arguments
between double quotes together for use with profiler arguments or
compiler arguments.
Args:
line: The arguments on the line of the magic call in the jupyter
cell.
parser: The parser which will process the arguments after they are
correctly tokenized.
Returns:
The parsed arguments.
"""
tokens = line.strip().split('"')
args_tokenized: List[str] = []
for index, tok in enumerate(tokens):
if index % 2 == 0:
# tokens found outside double quotes are split at whitespace
args_tokenized.extend(tok.split(" "))
else:
# anything found between double quotes will not be split
args_tokenized.append(tok)
args_tokenized = [arg for arg in args_tokenized if len(arg) > 0]
try:
return parser.parse_args(args_tokenized)
except SystemExit:
parser.print_help()
return None
@cell_magic @cell_magic
def cu(self, line, cell): def cuda(self, line: str, cell: str) -> None:
try: """Compile and run the CUDA code in the cell.
args = self.argparser.parse_args(line.split())
except SystemExit as e: Args:
self.argparser.print_help() line: The arguments on the line of the magic call in the jupyter
cell.
cell: All of the lines in the jupyter cell besides the magic call
itself. It should contain all of the source code to be
compiled and run.
"""
args = self._read_args(line, self.parser_cuda)
if args is None:
return return
with tempfile.TemporaryDirectory() as tmp_dir: group_name = str(uuid.uuid4())
file_path = os.path.join(tmp_dir, str(uuid.uuid4())) self._save_source(
with open(file_path + ext, "w") as f: source_name="single_file.cu",
f.write(cell) source_code=cell,
try: group_name=group_name,
self.compile(file_path) )
output = self.run(file_path, timeit=args.timeit, profile=args.profile, profiler_args=args.profiler_args)
except subprocess.CalledProcessError as e: output = self._compile_and_run(group_name, args)
helper.print_out(e.output.decode("utf8")) helper.print_out(output)
output = None
return output @cell_magic
def cuda_group_save(self, line: str, cell: str) -> None:
"""
Save the CUDA code in the cell in a group of source files to be later
compiled and executed by the "cuda_group_run" line magic.
Args:
line: The arguments on the line of the magic call in the jupyter
cell.
cell: All of the lines in the jupyter cell besides the magic call
itself. It should contain all of the source code to be
saved.
"""
args = self._read_args(line, self.parser_cuda_group_save)
if args is None:
return
self._save_source(
source_name=args.name,
source_code=cell,
group_name=args.group,
)
@line_magic
def cuda_group_run(self, line: str) -> None:
"""
Compile and run all source files inside a specific source file group.
Args:
line: The arguments on the line of the magic call in the jupyter
cell.
"""
args = self._read_args(line, self.parser_cuda_group_run)
if args is None:
return
output = self._compile_and_run(args.group, args)
helper.print_out(output)
@line_magic
def cuda_group_delete(self, line: str) -> None:
"""
Remove all source files inside a specific source file group.
Args:
line: The arguments on the line of the magic call in the jupyter
cell.
"""
args = self._read_args(line, self.parser_cuda_group_delete)
if args is None:
return
self._delete_group(args.group)