
Updated 4 months ago

Hi everyone!! I’ve been playing around with the latest LlamaIndex workflow features and had a question. How can I trigger parallel sub-events? Like, if I break down a task into issues A, B, and C, is there a way to have all these events fire off at the same time for fetching and generating answers, then combine the results at the end?
I saw examples where a single event triggers another, but nothing showing how to trigger multiple events and then merge their results. I did find something called “collection_event”, but I’m not sure how to use it for this.
5 comments
Good question! As long as you use proper async function calls (e.g. for LLMs or query engines or whatever), this is pretty possible. It's not entirely documented at the moment though, I can probably add this to the docs 🙂

Plain Text
class MyWorkflow(Workflow):
  @step(pass_context=True)
  async def kickoff(self, ctx: Context, ev: StartEvent) -> GatherEvent:
    self.send_event(LLMEvent(input="one"))
    self.send_event(LLMEvent(input="two"))
    self.send_event(LLMEvent(input="three"))

    ctx.data['num_to_collect'] = 3

    return GatherEvent()
   
  @step()
  async def llm_call(self, ev: LLMEvent) -> LLMResultEvent:
    response = await llm.acomplete(ev.input)
    return LLMResultEvent(result=str(response))
   
  @step(pass_context=True)
  async def gather(self, ctx: Context, ev: GatherEvent | LLMResultEvent) -> StopEvent | None:
    events = ctx.collect_events(ev, [LLMResultEvent]*ctx.data['num_to_collect'])
    if events is None:
        return None

    # do something with each event result, e.g. combine them
    return StopEvent(result=[e.result for e in events])
(I did not run the above lol let me know if it works out)
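Conceptually, the fan-out/merge above is just async concurrency. Here's a dependency-free sketch of the same pattern, where `fake_llm_call` is a made-up stand-in for an async LLM call like `llm.acomplete`:

```python
import asyncio

# fake_llm_call is a hypothetical stand-in for an async LLM call
# such as llm.acomplete; it just simulates I/O latency.
async def fake_llm_call(prompt: str) -> str:
    await asyncio.sleep(0.01)
    return f"answer to {prompt}"

async def main() -> list[str]:
    # Fire off all three sub-tasks at once (the send_event fan-out),
    # then merge the results (the collect_events step).
    return list(await asyncio.gather(
        fake_llm_call("issue A"),
        fake_llm_call("issue B"),
        fake_llm_call("issue C"),
    ))

if __name__ == "__main__":
    print(asyncio.run(main()))
```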
Thanks for the reply! Here are the issues I ran into:

  1. The self._validate() raises a WorkflowValidationError:
llama_index.core.workflow.errors.WorkflowValidationError: The following events are consumed but never produced: {<class 'main.LLMEvent'>}
I suspect this is because no step's return annotation includes LLMEvent (it's only ever produced via send_event). After I commented out that validation, it ran normally.
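The produced/consumed check boils down to set arithmetic over the step annotations. A rough sketch of the idea (not the library's actual implementation):

```python
# Rough sketch of the validation idea, not llama_index's actual code.
# Return annotations are what the validator sees as "produced"; events
# emitted via send_event are invisible to it.
produced = {"GatherEvent", "LLMResultEvent", "StopEvent"}
consumed = {"StartEvent", "LLMEvent", "GatherEvent", "LLMResultEvent"}

# StartEvent is injected by run(), so it doesn't need a producer.
orphans = consumed - produced - {"StartEvent"}
print(orphans)  # {'LLMEvent'}
```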

  2. There's an issue with the implementation of collect_events:
Since self._events_buffer[type(ev)] = ev keys the buffer by event type rather than something like an event_id, later LLMResultEvents overwrite earlier results. I'm not sure if this is intentional.
Moreover, the following code:
Plain Text
for e_type in expected:
    e_instance = self._events_buffer.get(e_type)
    if e_instance:
        retval.append(e_instance)

leads to the first incoming LLMResultEvent triggering a StopEvent: because expected contains the same LLMResultEvent type three times, the one buffered event is appended to retval once per expected slot, so len(self._events_retval) == len(expected) after just a single event.
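Both problems can be reproduced without llama_index at all; this sketch mimics the buffering logic with a local stand-in event class:

```python
class LLMResultEvent:  # local stand-in for the workflow event class
    def __init__(self, result: str) -> None:
        self.result = result

events_buffer = {}
events_retval = []
expected = [LLMResultEvent] * 3  # three results expected

# Problem 1: keying by type means a second event overwrites the first.
first = LLMResultEvent("first")
second = LLMResultEvent("second")
events_buffer[type(first)] = first
events_buffer[type(second)] = second
assert events_buffer[LLMResultEvent] is second  # "first" is gone

# Problem 2: with only one event buffered, the loop matches every
# expected slot, so collection looks complete after a single event.
events_buffer = {type(first): first}
for e_type in expected:
    e_instance = events_buffer.get(e_type)
    if e_instance:
        events_retval.append(e_instance)
print(len(events_retval) == len(expected))  # True after just one event
```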
Here's my attempt at a modified, runnable function:
Plain Text
from typing import Any, Dict, List, Optional, Type

from llama_index.core.workflow.events import Event

class Context:
    def __init__(self, parent: Optional["Context"] = None) -> None:
        # Global state
        if parent:
            self.data = parent.data
        else:
            self.data: Dict[str, Any] = {}
        
        # Step-specific instance
        self.parent = parent
        self._events_buffer: Dict[Type[Event], Event] = {}
        self._events_retval: List[Event] = []

    def collect_events(
        self, ev: Event, expected: List[Type[Event]]
    ) -> Optional[List[Event]]:
        self._events_buffer[type(ev)] = ev
        
        for e_type in expected:
            e_instance = self._events_buffer.get(e_type)
            if e_instance:
                self._events_retval.append(e_instance)
                break
        
        if len(self._events_retval) == len(expected):
            return self._events_retval
        
        return None

Note that I used break to avoid appending duplicates to retval when the same event type is expected multiple times. Also, I made _events_retval an instance attribute so it isn't reset each time the step fires.
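As a sanity check, the modified collect_events behaves as intended on a minimal stand-in (local classes, no llama_index needed):

```python
from typing import Dict, List, Optional, Type

class Event:  # minimal stand-in for the workflow Event base class
    pass

class LLMResultEvent(Event):
    def __init__(self, result: str) -> None:
        self.result = result

class Context:
    def __init__(self) -> None:
        self._events_buffer: Dict[Type[Event], Event] = {}
        self._events_retval: List[Event] = []

    def collect_events(
        self, ev: Event, expected: List[Type[Event]]
    ) -> Optional[List[Event]]:
        self._events_buffer[type(ev)] = ev
        for e_type in expected:
            e_instance = self._events_buffer.get(e_type)
            if e_instance:
                self._events_retval.append(e_instance)
                break  # append at most once per incoming event
        if len(self._events_retval) == len(expected):
            return self._events_retval
        return None

ctx = Context()
expected = [LLMResultEvent] * 3
print(ctx.collect_events(LLMResultEvent("A"), expected))  # None: 1 of 3
print(ctx.collect_events(LLMResultEvent("B"), expected))  # None: 2 of 3
done = ctx.collect_events(LLMResultEvent("C"), expected)
print([e.result for e in done])  # ['A', 'B', 'C']
```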
I hope my suggestions are helpful to you.
Great feedback! I can make a PR to make this more supported and fix a few of these issues