Decouple TritonInferenceStage from pipeline mode (nv-morpheus#1402)
* Add `needs_logits` & `inout_mapping` as constructor arguments to `TritonInferenceStage` (see the sketch after this list).
* Consolidate the Triton inference worker implementations into a single `TritonInferenceWorker` class.
* For backwards compatibility, the `needs_logits` & `inout_mapping` arguments default to values inferred from the pipeline mode.
* Refactor the custom Triton inference stage in the `examples/log_parsing` example as a subclass of `TritonInferenceStage`, removing several lines of redundant code.
* The `examples/log_parsing` example now works in C++ mode.
* Add the ability to subclass a stage registered with the CLI and register the subclass under a new CLI name.
* Add the ability to parse dictionary types with click.
* Remove `inf-triton` from the AE mode in the CLI.
* Apply miscellaneous pylint linting/formatting suggestions.
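
As a rough illustration of the first and third bullets, here is a minimal sketch of configuring the stage explicitly rather than by pipeline mode; the model name, server URL, and mapping values are placeholders, not part of this change:

```python
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage

config = Config()
config.mode = PipelineModes.NLP

# needs_logits and inout_mapping are passed directly instead of being inferred
# from the pipeline mode; omitting them falls back to the mode-based defaults.
inference = TritonInferenceStage(config,
                                 model_name="my-nlp-model",    # placeholder model name
                                 server_url="localhost:8001",  # placeholder Triton gRPC endpoint
                                 force_convert_inputs=True,
                                 needs_logits=True,
                                 inout_mapping={"attention_mask": "input_mask"})
```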

Closes nv-morpheus#1378

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Devin Robison (https://github.com/drobison00)
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: nv-morpheus#1402
dagardner-nv authored Jan 8, 2024
1 parent bbc1cda commit 2e3f4e4
Showing 16 changed files with 402 additions and 815 deletions.
examples/log_parsing/README.md: 2 changes (1 addition, 1 deletion)
@@ -110,7 +110,7 @@ PYTHONPATH="examples/log_parsing" \
morpheus --log_level INFO \
--plugin "inference" \
--plugin "postprocessing" \
run --num_threads 1 --use_cpp False --pipeline_batch_size 1024 --model_max_batch_size 32 \
run --num_threads 1 --pipeline_batch_size 1024 --model_max_batch_size 32 \
pipeline-nlp \
from-file --filename ./models/datasets/validation-data/log-parsing-validation-data-input.csv \
deserialize \
examples/log_parsing/inference.py: 244 changes (81 additions, 163 deletions)
@@ -13,33 +13,28 @@
# limitations under the License.

import logging
import typing
from functools import partial

import cupy as cp
import mrc
import numpy as np
import tritonclient.grpc as tritonclient
from mrc.core import operators as ops
from scipy.special import softmax

from messages import MultiPostprocLogParsingMessage # pylint: disable=no-name-in-module
from messages import PostprocMemoryLogParsing # pylint: disable=no-name-in-module
from messages import ResponseMemoryLogParsing # pylint: disable=no-name-in-module
from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.messages import MultiInferenceMessage
from morpheus.messages import MultiInferenceNLPMessage
from morpheus.messages import MultiResponseMessage
from morpheus.messages import TensorMemory
from morpheus.pipeline.stage_schema import StageSchema
from morpheus.stages.inference.inference_stage import InferenceStage
from morpheus.stages.inference.inference_stage import InferenceWorker
from morpheus.stages.inference.triton_inference_stage import _TritonInferenceWorker
from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage
from morpheus.stages.inference.triton_inference_stage import TritonInferenceWorker
from morpheus.utils.producer_consumer_queue import ProducerConsumerQueue

logger = logging.getLogger(__name__)


class TritonInferenceLogParsing(_TritonInferenceWorker):
class TritonInferenceLogParsing(TritonInferenceWorker):
"""
This class extends TritonInference to deal with scenario-specific NLP models inference requests like building
response.
@@ -59,78 +59,47 @@ class TritonInferenceLogParsing(_TritonInferenceWorker):
use_shared_memory: bool, default = True
Whether or not to use CUDA Shared IPC Memory for transferring data to Triton. Using CUDA IPC reduces network
transfer time but requires that Morpheus and Triton are located on the same machine
needs_logits : bool, default = True
Determines whether a logits calculation is needed for the value returned by the Triton inference response.
"""

def __init__(self,
inf_queue: ProducerConsumerQueue,
c: Config,
model_name: str,
server_url: str,
force_convert_inputs: bool,
use_shared_memory: bool,
inout_mapping: typing.Dict[str, str] = None):
# Some models use different names for the same thing. Set that here but allow user customization
default_mapping = {
"attention_mask": "input_mask",
}

default_mapping.update(inout_mapping if inout_mapping is not None else {})

super().__init__(inf_queue,
c,
model_name=model_name,
server_url=server_url,
force_convert_inputs=force_convert_inputs,
use_shared_memory=use_shared_memory,
inout_mapping=default_mapping)

@classmethod
def needs_logits(cls):
return True

@classmethod
def default_inout_mapping(cls) -> typing.Dict[str, str]:
# Some models use different names for the same thing. Set that here but allow user customization
return {"attention_mask": "input_mask"}
def build_output_message(self, x: MultiInferenceMessage) -> MultiResponseMessage:
seq_ids = cp.zeros((x.count, 3), dtype=cp.uint32)
seq_ids[:, 0] = cp.arange(x.mess_offset, x.mess_offset + x.count, dtype=cp.uint32)
seq_ids[:, 2] = x.get_tensor('seq_ids')[:, 2]

def build_output_message(self, x: MultiInferenceMessage) -> MultiPostprocLogParsingMessage:

memory = PostprocMemoryLogParsing(
memory = TensorMemory(
count=x.count,
confidences=cp.zeros((x.count, self._inputs[list(self._inputs.keys())[0]].shape[1])),
labels=cp.zeros((x.count, self._inputs[list(self._inputs.keys())[0]].shape[1])),
input_ids=cp.zeros((x.count, x.input_ids.shape[1])),
seq_ids=cp.zeros((x.count, x.seq_ids.shape[1])),
)

output_message = MultiPostprocLogParsingMessage(meta=x.meta,
mess_offset=x.mess_offset,
mess_count=x.mess_count,
memory=memory,
offset=0,
count=x.count)
return output_message

def _build_response(self, batch: MultiInferenceMessage,
result: tritonclient.InferResult) -> ResponseMemoryLogParsing:

output = {output.mapped_name: result.as_numpy(output.name) for output in self._outputs.values()}
output = {key: softmax(val, axis=2) for key, val in output.items()}
confidences = {key: np.amax(val, axis=2) for key, val in output.items()}
labels = {key: np.argmax(val, axis=2) for key, val in output.items()}

mem = ResponseMemoryLogParsing(
count=output[list(output.keys())[0]].shape[0],
confidences=cp.array(confidences[list(output.keys())[0]]),
labels=cp.array(labels[list(output.keys())[0]]),
)

return mem
tensors={
'confidences': cp.zeros((x.count, self._inputs[list(self._inputs.keys())[0]].shape[1])),
'labels': cp.zeros((x.count, self._inputs[list(self._inputs.keys())[0]].shape[1])),
'input_ids': cp.zeros((x.count, x.get_tensor('input_ids').shape[1])),
'seq_ids': seq_ids
})

return MultiResponseMessage(meta=x.meta,
mess_offset=x.mess_offset,
mess_count=x.mess_count,
memory=memory,
offset=0,
count=x.count)

def _build_response(self, batch: MultiInferenceMessage, result: tritonclient.InferResult) -> TensorMemory:

outputs = {output.mapped_name: result.as_numpy(output.name) for output in self._outputs.values()}
outputs = {key: softmax(val, axis=2) for key, val in outputs.items()}
confidences = {key: np.amax(val, axis=2) for key, val in outputs.items()}
labels = {key: np.argmax(val, axis=2) for key, val in outputs.items()}

return TensorMemory(count=outputs[list(outputs.keys())[0]].shape[0],
tensors={
'confidences': cp.array(confidences[list(outputs.keys())[0]]),
'labels': cp.array(labels[list(outputs.keys())[0]])
})


@register_stage("inf-logparsing", modes=[PipelineModes.NLP])
class LogParsingInferenceStage(InferenceStage):
class LogParsingInferenceStage(TritonInferenceStage):
"""
NLP Triton inference stage for log parsing pipeline.
@@ -149,117 +149,71 @@ class LogParsingInferenceStage(InferenceStage):
use_shared_memory: bool, default = False, is_flag = True
Whether or not to use CUDA Shared IPC Memory for transferring data to Triton. Using CUDA IPC reduces network
transfer time but requires that Morpheus and Triton are located on the same machine
needs_logits : bool, default = True, is_flag = True
Determines whether a logits calculation is needed for the value returned by the Triton inference response.
inout_mapping : dict[str, str], optional
Dictionary used to map pipeline input/output names to Triton input/output names. Use this if the
Morpheus names do not match the model.
"""

def __init__(self,
c: Config,
model_name: str,
server_url: str,
force_convert_inputs: bool = False,
use_shared_memory: bool = False):
super().__init__(c)

self._config = c

self._kwargs = {
"model_name": model_name,
"server_url": server_url,
"force_convert_inputs": force_convert_inputs,
"use_shared_memory": use_shared_memory,
}

self._requires_seg_ids = False
use_shared_memory: bool = False,
needs_logits: bool = True,
inout_mapping: dict[str, str] = None):
super().__init__(c,
model_name=model_name,
server_url=server_url,
force_convert_inputs=force_convert_inputs,
use_shared_memory=use_shared_memory,
needs_logits=needs_logits,
inout_mapping=inout_mapping)

def supports_cpp_node(self):
# Get the value from the worker class
def supports_cpp_node(self) -> bool:
return False

def compute_schema(self, schema: StageSchema):
schema.output_schema.set_type(MultiPostprocLogParsingMessage)

def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:

def py_inference_fn(obs: mrc.Observable, sub: mrc.Subscriber):

worker = self._get_inference_worker(self._inf_queue)

worker.init()

outstanding_requests = 0

def on_next(x: MultiInferenceMessage):
nonlocal outstanding_requests

batches = self._split_batches(x, self._max_batch_size)

output_message = worker.build_output_message(x)

memory = output_message.memory

fut_list = []

for batch in batches:
outstanding_requests += 1
schema.output_schema.set_type(MultiResponseMessage)

fut = mrc.Future()

def set_output_fut(resp: ResponseMemoryLogParsing, inner_b, inner_f: mrc.Future):
nonlocal outstanding_requests
inner_memory = self._convert_one_response(memory, inner_b, resp)

inner_f.set_result(inner_memory)

outstanding_requests -= 1

fut_list.append(fut)

worker.process(batch, partial(set_output_fut, inner_b=batch, inner_f=fut))

for f in fut_list:
f.result()

return output_message

obs.pipe(ops.map(on_next)).subscribe(sub)

assert outstanding_requests == 0, "Not all inference requests were completed"
@staticmethod
def _convert_one_response(output: MultiResponseMessage, inf: MultiInferenceNLPMessage,
res: TensorMemory) -> MultiResponseMessage:
memory = output.memory

if (self._build_cpp_node()):
node = self._get_cpp_inference_node(builder)
else:
node = builder.make_node(self.unique_name, ops.build(py_inference_fn))
out_seq_ids = memory.get_tensor('seq_ids')
input_ids = memory.get_tensor('input_ids')
confidences = memory.get_tensor('confidences')
labels = memory.get_tensor('labels')

# Set the concurrency level to be up with the thread count
node.launch_options.pe_count = self._thread_count
builder.make_edge(input_node, node)
seq_ids = inf.get_id_tensor()

return node
seq_offset = seq_ids[0, 0].item() - output.mess_offset
seq_count = (seq_ids[-1, 0].item() + 1 - seq_offset) - output.mess_offset

@staticmethod
def _convert_one_response(output: PostprocMemoryLogParsing,
inf: MultiInferenceMessage,
res: ResponseMemoryLogParsing):
input_ids[inf.offset:inf.count + inf.offset, :] = inf.get_tensor('input_ids')
out_seq_ids[inf.offset:inf.count + inf.offset, :] = seq_ids

output.input_ids[inf.offset:inf.count + inf.offset, :] = inf.input_ids
output.seq_ids[inf.offset:inf.count + inf.offset, :] = inf.seq_ids
resp_confidences = res.get_tensor('confidences')
resp_labels = res.get_tensor('labels')

# Two scenarios:
if (inf.mess_count == inf.count):
output.confidences[inf.offset:inf.count + inf.offset, :] = res.confidences
output.labels[inf.offset:inf.count + inf.offset, :] = res.labels
assert seq_count == res.count
confidences[inf.offset:inf.offset + inf.count, :] = resp_confidences
labels[inf.offset:inf.offset + inf.count, :] = resp_labels
else:
assert inf.count == res.count

mess_ids = inf.seq_ids[:, 0].get().tolist()
mess_ids = seq_ids[:, 0].get().tolist()

# Out message has more responses, so we have to do key based blending of probs
for i, idx in enumerate(mess_ids):
output.confidences[idx, :] = cp.maximum(output.confidences[idx, :], res.confidences[i, :])
output.labels[idx, :] = cp.maximum(output.labels[idx, :], res.labels[i, :])

return MultiPostprocLogParsingMessage.from_message(inf, memory=output, offset=inf.offset, count=inf.mess_count)
confidences[idx, :] = cp.maximum(confidences[idx, :], resp_confidences[i, :])
labels[idx, :] = cp.maximum(labels[idx, :], resp_labels[i, :])

def _get_inference_worker(self, inf_queue: ProducerConsumerQueue) -> InferenceWorker:
return MultiResponseMessage.from_message(inf, memory=memory, offset=inf.offset, count=inf.mess_count)

return TritonInferenceLogParsing(inf_queue, self._config, **self._kwargs)
def _get_inference_worker(self, inf_queue: ProducerConsumerQueue) -> TritonInferenceLogParsing:
return TritonInferenceLogParsing(inf_queue=inf_queue, c=self._config, **self._kwargs)
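
The diff above follows the new subclass-and-register pattern. Distilled into a standalone, hypothetical sketch (the CLI name, mode, and default mapping below are illustrative only):

```python
from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage


@register_stage("inf-custom", modes=[PipelineModes.NLP])  # hypothetical CLI name
class CustomTritonInferenceStage(TritonInferenceStage):
    """Reuses the base Triton inference stage while baking in model-specific defaults."""

    def __init__(self,
                 c: Config,
                 model_name: str,
                 server_url: str,
                 force_convert_inputs: bool = False,
                 use_shared_memory: bool = False,
                 needs_logits: bool = True,
                 inout_mapping: dict[str, str] = None):
        super().__init__(c,
                         model_name=model_name,
                         server_url=server_url,
                         force_convert_inputs=force_convert_inputs,
                         use_shared_memory=use_shared_memory,
                         needs_logits=needs_logits,
                         inout_mapping=inout_mapping or {"attention_mask": "input_mask"})
```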