class JSONOutputParser(BaseOutputParser):
    def parse(self, text: str):
        """Parse an LLM response into JSON, stripping Markdown code fences first.

        Gemini frequently wraps its JSON answer in ```json ... ``` fences;
        those are removed before decoding. strict=False tolerates control
        characters inside the decoded strings.
        """
        cleaned = text.replace('```json', '').replace('```', '')
        return json.loads(cleaned, strict=False)
In [8]:
def classify_article(headline, summary, body, id=None, stage_1=True, relation=None):
    """
    Send a news article to Google's Gemini model for Rule-of-Law classification.

    Stage 1 (stage_1=True): asks whether the article is related or unrelated
    to the Rule of Law and returns [relation_label, country].
    Stage 2 (stage_1=False): rates how closely the article relates to each of
    the eight Rule-of-Law pillars and returns the ratings as a JSON string.

    Parameters
    ----------
    headline, summary, body : str
        Article text interpolated into the prompt template.
    id : optional
        Article identifier; currently unused (kept for caller compatibility).
    stage_1 : bool
        Selects which prompt/classification stage to run.
    relation : str, optional
        Stage-1 outcome for this article. In stage 2, articles whose relation
        contains none of "Related"/"Justice"/"Governance" are short-circuited
        to "Unrelated" without spending an API call.

    Returns
    -------
    list | str
        Stage 1: [relation_label, country]. Stage 2: JSON string of pillar
        ratings, or "Unrelated" for pre-filtered articles. Either stage:
        "Skipped article" when the prompt was blocked or the response could
        not be decoded as JSON.
    """
    # Pick the prompt pair for the requested stage.
    if stage_1:
        system_prompt = ptc.context_stage_1
        human_prompt = ptc.instructions_stage_1
    else:
        system_prompt = ptc.context_stage_2
        human_prompt = ptc.instructions_stage_2

    chat_prompt = ChatPromptTemplate.from_messages([
        # ("system", system_prompt),
        ("human", human_prompt),
    ])

    # Chain: prompt -> Gemini -> JSON parser.
    chain_gemini = (
        chat_prompt
        | ChatGoogleGenerativeAI(
            model="gemini-pro",
            temperature=0.1,
            safety_settings=safety_settings,
            convert_system_message_to_human=True,
        )
        | JSONOutputParser()
    )

    # Stage-2 short-circuit: articles already classified as unrelated in
    # stage 1 are not sent to the model. The `relation is not None` guard
    # fixes a TypeError the original raised when relation was omitted.
    if (
        not stage_1
        and relation is not None
        and all(keyword not in relation for keyword in ["Related", "Justice", "Governance"])
    ):
        return "Unrelated"

    status = False
    llm_response = None
    # Invoke the chain, retrying once when the model returns malformed JSON.
    for attempt in range(2):
        try:
            llm_response = chain_gemini.invoke({
                "headline": headline,
                "summary": summary,
                "body": body,
            })
            status = True
            time.sleep(1)  # slow down: the Gemini API enforces a call-rate limit
            break
        except (BlockedPromptException, StopCandidateException):
            # The API can block prompts for undisclosed reasons; nothing we
            # can do about it, so the article is skipped.
            print("Prompt BLOCKED")
            break
        except JSONDecodeError:
            if attempt == 0:
                print("Decode error... trying again...")
            else:
                print("Failed. Skipping article.")

    if not status:
        return "Skipped article"

    if stage_1:
        # Fold the finer-grained "Governance"/"Justice" labels into "Related".
        # (Bug fix: the original used '==', a no-op comparison, instead of
        # '=', so the remapping never happened.)
        if "Governance" in llm_response["rol_related"] or "Justice" in llm_response["rol_related"]:
            llm_response["rol_related"] = "Related"
        return [llm_response["rol_related"], llm_response["country"]]

    return json.dumps(llm_response["pillars_relation"])
def extract_score(string, pillar, t=7):
    """
    Turn a stage-2 pillar-score string into a binary indicator for one pillar.

    Parameters
    ----------
    string : str
        Either a literal list of one-item dicts such as
        "[{'1. Pillar name': 8}, {'2. Other pillar': 3}]", or one of the
        sentinel values "Unrelated" / "Skipped article" produced upstream
        by classify_article.
    pillar : int
        Pillar number (1-8) whose score to extract.
    t : int, default 7
        Threshold: scores >= t map to 1, lower scores to 0.

    Returns
    -------
    int
        1 when the pillar's score meets the threshold, 0 otherwise —
        including missing pillars, sentinel strings, and unparseable input.
    """
    try:
        scores_dicts = ast.literal_eval(string)
    except (ValueError, SyntaxError):
        # Sentinels ("Unrelated" raises ValueError, "Skipped article" raises
        # SyntaxError) carry no scores. The original returned an implicit
        # None for unexpected strings; returning 0 keeps the output binary.
        return 0

    prefix = str(pillar) + ". "
    for entry in scores_dicts:
        for key, rating in entry.items():
            # startswith (not substring containment) so pillar 1 cannot
            # accidentally match a key such as "11. ..." — the original
            # `pattern in element` check was vulnerable to that.
            if key.startswith(prefix):
                return 1 if rating >= t else 0

    return 0  # pillar not present in the ratings
In [10]:
# Add one binary column per Rule-of-Law pillar (pillar_1 ... pillar_8),
# derived from the "pillars_score" string via extract_score.
for pillar in range(1, 9):
    column = "pillar_" + str(pillar)
    classified_data[column] = classified_data["pillars_score"].apply(
        lambda scores, p=pillar: extract_score(scores, p)
    )
Cleaning the location of events and the topic-relation fields
In [11]:
def loc2bin(location, country):
    """Return True when *country* appears within *location*.

    Missing locations (NaN/None per pandas.isna) yield False, so the result
    is always a plain boolean suitable for a binary indicator column.
    """
    if pd.isna(location):
        return False
    return country in location