"""L2P is compatible with FastDownward: https://www.fast-downward.orgFor usage, users must clone or download the submodule /downward separately and direct the`planner_path` to the folder. This module is not necessary to use L2P, but for ease of useto produce plans from generated domain and problem PDDL specifications via LLMs."""importsubprocess,re# Define the exit codesSUCCESS=0SEARCH_PLAN_FOUND_AND_OUT_OF_MEMORY=1SEARCH_PLAN_FOUND_AND_OUT_OF_TIME=2SEARCH_PLAN_FOUND_AND_OUT_OF_MEMORY_AND_TIME=3TRANSLATE_UNSOLVABLE=10SEARCH_UNSOLVABLE=11SEARCH_UNSOLVED_INCOMPLETE=12TRANSLATE_OUT_OF_MEMORY=20TRANSLATE_OUT_OF_TIME=21SEARCH_OUT_OF_MEMORY=22SEARCH_OUT_OF_TIME=23SEARCH_OUT_OF_MEMORY_AND_TIME=24TRANSLATE_CRITICAL_ERROR=30TRANSLATE_INPUT_ERROR=31SEARCH_CRITICAL_ERROR=32SEARCH_INPUT_ERROR=33SEARCH_UNSUPPORTED=34DRIVER_CRITICAL_ERROR=35DRIVER_INPUT_ERROR=36DRIVER_UNSUPPORTED=37
class FastDownward:
    """Run the Fast Downward planner as a subprocess and interpret its result.

    Attributes:
        planner_path (str): value used as the first element of the planner
            command line (see `run_fast_downward`).
    """

    def __init__(self, planner_path: str):
        """Store the planner location.

        Args:
            planner_path (str): path used to launch the FastDownward planner
        """
        self.planner_path = planner_path  # directory of FastDownward planner

    def run_fast_downward(
        self, domain_file: str, problem_file: str, search_alg: str = "lama-first"
    ):
        """
        Main function to run planner.

        Args:
            domain_file (str): PDDL domain file path
            problem_file (str): PDDL problem file path
            search_alg (str): search algorithm/heuristic to use
                + refer to: https://www.fast-downward.org/PlannerUsage

        Returns:
            success (bool): True only when the planner exits with SUCCESS and
                plan steps could be extracted from its output; False otherwise
                (including "plan found but a resource limit was hit" codes).
            plan_output (str): the extracted plan on success, otherwise a
                human-readable description of what went wrong.
        """
        try:
            result = subprocess.run(
                [self.planner_path, "--alias", search_alg, domain_file, problem_file],
                capture_output=True,
                text=True,
            )

            exitcode = result.returncode

            if exitcode == SUCCESS:
                # Planning succeeded
                print("Planning succeeded!")
                print(
                    "All run components successfully terminated (translator: completed, search: found a plan, validate: validated a plan)"
                )

                # Extract the plan steps from the output
                plan_output = self.extract_plan_steps(result.stdout)
                if plan_output:
                    return True, plan_output
                return False, "No plan found in the output."

            # Planning failed.
            # The process return code is a single, final verdict, so it is
            # classified directly. Feeding it to generate_portfolio_exitcode
            # (which only understands per-configuration search codes) rejected
            # valid codes such as 1-3, 10, 20, 21 and 24 and hid the specific
            # message handle_error has for them.
            print("Exit codes: {}".format([exitcode]))
            if self.is_unrecoverable(exitcode):
                print("Error: Unexpected exit codes: {}".format([exitcode]))

            plan_found = exitcode in (
                SEARCH_PLAN_FOUND_AND_OUT_OF_MEMORY,
                SEARCH_PLAN_FOUND_AND_OUT_OF_TIME,
                SEARCH_PLAN_FOUND_AND_OUT_OF_MEMORY_AND_TIME,
            )
            return False, self.handle_error(exitcode, plan_found)
        except Exception as e:
            # Boundary handler: callers always get a (bool, str) pair, e.g. when
            # the planner executable cannot be launched.
            print("An error occurred while running the planner.")
            return False, str(e)

    def handle_error(self, exitcode: int, plan_found: bool) -> str:
        """Translate a planner exit code into a human-readable message.

        Args:
            exitcode (int): exit code to describe
            plan_found (bool): selects the message table; True for the
                "plan found but a limit was hit" codes, False for failures

        Returns:
            str: the message for `exitcode`, or an "Unknown ..." message that
                embeds the code when it is not in the selected table.
        """
        # The tables are built inside the method so the module-level constants
        # are only looked up at call time.
        if plan_found:
            messages = {
                SEARCH_PLAN_FOUND_AND_OUT_OF_MEMORY: "Plan found but the search ran out of memory.",
                SEARCH_PLAN_FOUND_AND_OUT_OF_TIME: "Plan found but the search ran out of time.",
                SEARCH_PLAN_FOUND_AND_OUT_OF_MEMORY_AND_TIME: "Plan found but the search ran out of memory and time.",
            }
            fallback = f"Unknown plan occurred with exit code: {exitcode}"
        else:
            messages = {
                TRANSLATE_UNSOLVABLE: "Translate phase determined the problem is unsolvable.",
                SEARCH_UNSOLVABLE: "Search phase determined the problem is unsolvable.",
                SEARCH_UNSOLVED_INCOMPLETE: "Search phase was incomplete and did not solve the problem.",
                TRANSLATE_OUT_OF_MEMORY: "Translate phase ran out of memory.",
                TRANSLATE_OUT_OF_TIME: "Translate phase ran out of time.",
                SEARCH_OUT_OF_MEMORY: "Search phase ran out of memory.",
                SEARCH_OUT_OF_TIME: "Search phase ran out of time.",
                SEARCH_OUT_OF_MEMORY_AND_TIME: "Search phase ran out of memory and time.",
                TRANSLATE_CRITICAL_ERROR: "Critical error in translate phase.",
                TRANSLATE_INPUT_ERROR: "Input error in translate phase.",
                SEARCH_CRITICAL_ERROR: "Critical error in search phase.",
                SEARCH_INPUT_ERROR: "Input error in search phase.",
                SEARCH_UNSUPPORTED: "Search phase encountered an unsupported feature.",
                DRIVER_CRITICAL_ERROR: "Critical error in the driver.",
                DRIVER_INPUT_ERROR: "Input error in the driver.",
                DRIVER_UNSUPPORTED: "Driver encountered an unsupported feature.",
            }
            fallback = f"Unknown error occurred with exit code: {exitcode}"
        return messages.get(exitcode, fallback)

    def is_unrecoverable(self, exitcode) -> bool:
        """Return True when `exitcode` lies in the range 30-39 (inclusive)."""
        # Exit codes in the range from 30 to 39 represent unrecoverable failures.
        return 30 <= exitcode < 40

    def generate_portfolio_exitcode(self, exitcodes):
        """Combine several exit codes into one `(exitcode, plan_found)` verdict.

        The checks are applied in this order:

        1. Any code in 30-39: that code if it is the only such code, otherwise
           SEARCH_CRITICAL_ERROR; no plan.
        2. SUCCESS present: a plan was found; the code also reflects whether
           SEARCH_OUT_OF_MEMORY and/or SEARCH_OUT_OF_TIME are present.
        3. SEARCH_UNSOLVABLE, then SEARCH_UNSOLVED_INCOMPLETE; no plan.
        4. SEARCH_OUT_OF_MEMORY and/or SEARCH_OUT_OF_TIME; no plan.

        Args:
            exitcodes: iterable of exit codes

        Returns:
            tuple: `(exitcode, plan_found)`

        Raises:
            AssertionError: if none of the rules above matches.
        """
        print("Exit codes: {}".format(exitcodes))
        exitcodes = set(exitcodes)
        unrecoverable_codes = [
            code for code in exitcodes if self.is_unrecoverable(code)
        ]

        # There are unrecoverable exit codes.
        if unrecoverable_codes:
            print("Error: Unexpected exit codes: {}".format(unrecoverable_codes))
            if len(unrecoverable_codes) == 1:
                return (unrecoverable_codes[0], False)
            return (SEARCH_CRITICAL_ERROR, False)

        out_of_memory = SEARCH_OUT_OF_MEMORY in exitcodes
        out_of_time = SEARCH_OUT_OF_TIME in exitcodes

        # At least one plan was found.
        if SUCCESS in exitcodes:
            if out_of_memory and out_of_time:
                return (SEARCH_PLAN_FOUND_AND_OUT_OF_MEMORY_AND_TIME, True)
            if out_of_memory:
                return (SEARCH_PLAN_FOUND_AND_OUT_OF_MEMORY, True)
            if out_of_time:
                return (SEARCH_PLAN_FOUND_AND_OUT_OF_TIME, True)
            return (SUCCESS, True)

        # A config proved unsolvability or did not find a plan.
        for code in (SEARCH_UNSOLVABLE, SEARCH_UNSOLVED_INCOMPLETE):
            if code in exitcodes:
                return (code, False)

        # No plan was found due to hitting resource limits.
        if out_of_memory and out_of_time:
            return (SEARCH_OUT_OF_MEMORY_AND_TIME, False)
        if out_of_memory:
            return (SEARCH_OUT_OF_MEMORY, False)
        if out_of_time:
            return (SEARCH_OUT_OF_TIME, False)

        # Raised explicitly: an `assert` statement is removed under `python -O`,
        # which would make this method fall through and return None.
        raise AssertionError("Error: Unhandled exit codes: {}".format(exitcodes))