Sentiment Analysis with VADER

Sentiment analysis (also known as opinion mining) refers to the use of natural language processing, text analysis, and computational linguistics to systematically identify, extract, quantify, and study affective states and subjective information. [1] In short, sentiment analysis gives an objective idea of whether a text uses mostly positive, negative, or neutral language. [2]

Sentiment analysis software can help estimate people's opinions on events in the financial world, generate reports with the relevant information, and analyze correlations between events and stock prices.



The problem

In this post we investigate how to extract information about a company and detect its sentiment. For each text sentence or paragraph we will detect how positive or negative it is by calculating a sentiment score. This is also called polarity. Polarity in sentiment analysis refers to identifying the sentiment orientation (positive, neutral, or negative) of a text: for example, a sentence like "revenue grew strongly" would typically be scored as positive, while "the stock plunged" would be scored as negative.

Given a list of companies, we want to find the polarity of sentiment in text that mentions company names from the list. Below is a description of how this can be implemented.

Getting Data

We will use Google to collect data. For this we search Google via a script for documents matching some predefined keywords.
The search returns links that we save to an array.

try: 
    from googlesearch import search 
except ImportError:  
    print("No module named 'google' found") 
  
# search query
query = "financial_news Warren Buffett 2019"

links = []
# num/stop limit the number of results; pause inserts a delay (in seconds)
# between requests to reduce the chance of being blocked by Google
for j in search(query, tld="co.in", num=10, stop=10, pause=6):
    print(j)
    links.append(j)
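
The search() call above, with the tld, num, stop and pause arguments, matches the googlesearch module provided by the google package on PyPI (pip install google), which is the assumed dependency here. If running live Google queries is not an option, the rest of the pipeline only needs an iterable of URLs, so a hand-picked list works just as well (the URLs below are placeholders):

# fallback: skip the live search and provide article URLs manually
links = ["https://example.com/financial-news-article-1",
         "https://example.com/financial-news-article-2"]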

Preprocessing

After we have the links, we need to fetch the text documents and remove unneeded text and characters. In this step we remove HTML tags and invalid characters. However, we keep the paragraph tags at first: using the paragraph tags we divide each document into smaller text units, and only after that do we strip the p tags. The helper functions used below (tag_visible, remove_tags, clean_txt) are defined in the full source code at the end of the post.

para_data=[]


def get_text(url):
   print(url)

   try:
      req = requests.get(url, timeout=5)
   except requests.exceptions.RequestException:
      # covers timeouts, connection errors, invalid URLs, ...
      return "TIMEOUT ERROR"

   data = req.text
   soup = BeautifulSoup(data, "html.parser")

   # keep only the visible <p> elements
   paras = []
   paras_ = soup.find_all('p')
   filtered_paras = filter(tag_visible, paras_)
   for s in filtered_paras:
       paras.append(s)

   # strip the tags, clean each paragraph and store it in para_data
   for para in paras:
       para = remove_tags(para)
       # remove non-text characters
       para_data.append(clean_txt(para))

Calculating Sentiment

Now we calculate the sentiment score using VADER (Valence Aware Dictionary and sEntiment Reasoner). VADER is a lexicon- and rule-based sentiment analysis tool that is specifically attuned to sentiments expressed in social media. [3] Based on the calculated sentiment we build a plot. In this example we only build the plot for the first company name, which is Coca Cola.
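
VADER itself is available on PyPI as vaderSentiment (pip install vaderSentiment), which is the package behind the import used below.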

from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer 
  

def sentiment_scores(sentence): 
    
    # Create a SentimentIntensityAnalyzer object. 
    sid_obj = SentimentIntensityAnalyzer() 
  
    # polarity_scores method of SentimentIntensityAnalyzer 
    # object gives a sentiment dictionary 
    # which contains pos, neg, neu, and compound scores. 
    sentiment_dict = sid_obj.polarity_scores(sentence) 
      
    print("Overall sentiment dictionary is : ", sentiment_dict) 
    print("sentence was rated as ", sentiment_dict['neg']*100, "% Negative") 
    print("sentence was rated as ", sentiment_dict['neu']*100, "% Neutral") 
    print("sentence was rated as ", sentiment_dict['pos']*100, "% Positive")

    
    # decide sentiment as positive, negative or neutral
    # (the +/-0.05 thresholds on the compound score are the ones recommended by the VADER authors)
    if sentiment_dict['compound'] >= 0.05:
        print("Positive")

    elif sentiment_dict['compound'] <= -0.05:
        print("Negative")

    else:
        print("Neutral")
    return sentiment_dict['compound'] 
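
For illustration, this is how the function might be called on a single sentence. The exact numbers depend on the lexicon shipped with your vaderSentiment version, but a clearly positive sentence like the one below should get a positive compound score:

# example call: prints the score breakdown and returns the compound score
compound = sentiment_scores("Coca Cola reported strong earnings and the stock rallied.")
print("compound score:", compound)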

Below you can find the full source code.

# -*- coding: utf-8 -*-
from bs4 import BeautifulSoup, NavigableString
from bs4.element import Comment
import requests
import re

TAG_RE = re.compile(r'<[^>]+>')

def remove_tags(text_string):
    print (text_string)
    
    return TAG_RE.sub('', str(text_string))

MIN_LENGTH_of_document = 40
MIN_LENGTH_of_word = 2

from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer 
  
# function to print sentiments 
# of the sentence. 
# below function based on function from https://www.geeksforgeeks.org/python-sentiment-analysis-using-vader/
def sentiment_scores(sentence): 
    
    # Create a SentimentIntensityAnalyzer object. 
    sid_obj = SentimentIntensityAnalyzer() 
  
    # polarity_scores method of SentimentIntensityAnalyzer 
    # object gives a sentiment dictionary 
    # which contains pos, neg, neu, and compound scores. 
    sentiment_dict = sid_obj.polarity_scores(sentence) 
      
    print("Overall sentiment dictionary is : ", sentiment_dict) 
    print("sentence was rated as ", sentiment_dict['neg']*100, "% Negative") 
    print("sentence was rated as ", sentiment_dict['neu']*100, "% Neutral") 
    print("sentence was rated as ", sentiment_dict['pos']*100, "% Positive")
    print("Sentence Overall Rated As", end = " ") 
  
    return sentiment_dict['compound']    

def remove_min_words(txt):
   # https://www.w3resource.com/python-exercises/re/python-re-exercise-49.php
   shortword = re.compile(r'\W*\b\w{1,1}\b')
   return(shortword.sub('', txt))        
    
        
def clean_txt(text):
  
   text = re.sub('[^A-Za-z.  ]', ' ', str(text))
   text=' '.join(text.split())
   text = remove_min_words(text)
   text=text.lower()
   text = text if  len(text) >= MIN_LENGTH_of_document else ""
   return text
        

def between(cur, end):
    while cur and cur != end:
        if isinstance(cur, NavigableString):
            text = cur.strip()
            if len(text):
                yield text
        cur = cur.next_element
        
def next_element(elem):
    while elem is not None:
        # Find next element, skip NavigableString objects
        elem = elem.next_sibling
        if hasattr(elem, 'name'):
            return elem        

def tag_visible(element):
    if element.parent.name in ['style', 'script', 'head',  'meta', '[document]']:
        return False
    if isinstance(element, Comment):
        return False
    return True
    
para_data=[]


def get_text(url):
   print(url)

   try:
      req = requests.get(url, timeout=5)
   except requests.exceptions.RequestException:
      # covers timeouts, connection errors, invalid URLs, ...
      return "TIMEOUT ERROR"

   data = req.text
   soup = BeautifulSoup(data, "html.parser")

   # keep only the visible <p> elements
   paras = []
   paras_ = soup.find_all('p')
   filtered_paras = filter(tag_visible, paras_)
   for s in filtered_paras:
       paras.append(s)

   # strip the tags, clean each paragraph and store it in para_data
   for para in paras:
       para = remove_tags(para)
       # remove non-text characters
       para_data.append(clean_txt(para))
           
     

try: 
    from googlesearch import search 
except ImportError:  
    print("No module named 'google' found") 
  
# to search 
query = "coca cola 2019"

links=[]  
for j in search(query, tld="co.in", num=25, stop=25, pause=6): 
    print(j) 
    links.append(j)
  
# Here our list consists of one company name, but it can include more than one.  
orgs=["coca cola" ]    
 
results=[] 
count=0  


def update_dict_value( dict, key, value):
    if key in dict:
           dict[key]= dict[key]+value
    else:
           dict[key] =value
    return dict

# fetch every link first; get_text appends the cleaned paragraphs to para_data
for link in links:
    get_text(link)

# score each paragraph that mentions one of the companies
for pr in para_data:
    for org in orgs:
        if pr.find(org) >= 0:
            results.append([org, sentiment_scores(pr), pr])


positive={}
negative={}
positive_sentiment={}
negative_sentiment={}

  
for i in range(len(results)):
    org = results[i][0]
   
    if (results[i][1] >=0):
        positive = update_dict_value( positive, org, 1)
        positive_sentiment =  update_dict_value( positive_sentiment, org,results[i][1])

    else:
        negative = update_dict_value( negative, org, 1)
        negative_sentiment =  update_dict_value( negative_sentiment, org,results[i][1])

# average the scores per company; default to 0 if a company has no
# positive or no negative paragraphs at all
for org in orgs:
    positive_sentiment[org] = positive_sentiment[org] / positive[org] if org in positive else 0
    negative_sentiment[org] = negative_sentiment[org] / negative[org] if org in negative else 0

import matplotlib.pyplot as plt 


# bar categories
labels = ['negative', 'positive'] 
  
# heights of bars: average negative score (sign flipped so the bar points up)
# and average positive score for the first company in the list
sentiment = [(-1)*negative_sentiment[orgs[0]], positive_sentiment[orgs[0]]] 


# labels for bars 
tick_label = ['negative', 'positive'] 
  
# plotting a bar chart 
plt.bar(labels, sentiment, tick_label = tick_label, 
        width = 0.8, color = ['red', 'green']) 
  
# naming the x-axis 
plt.xlabel('sentiment polarity') 
# naming the y-axis 
plt.ylabel('average sentiment score') 
# plot title 
plt.title('Sentiment Analysis') 
  
# function to show the plot 
plt.show() 

References
1. Sentiment analysis Wikipedia
2. What is a “Sentiment Score” and how is it measured?
3. VADER-Sentiment-Analysis

How to Search Text Documents with Whoosh

Whoosh is a Python library of classes and functions for indexing text and then searching the index. If an application requires text document search functionality, the Whoosh module can be used for this task. This post summarizes the main steps needed to implement search with Whoosh.

Text Search

Using Whoosh consists of indexing documents and then querying (searching) the index.
First we need to import the needed modules:

from whoosh.fields import Schema, TEXT, ID
from whoosh import index

To index documents we need to define a folder where the index files will be saved.

import os.path
if not os.path.exists("indexdir"):
    os.mkdir("indexdir")

We also need to define a Schema – the set of all possible fields in a document.

The schema specifies the fields of documents in an index. Each document can have multiple fields, such as title, content, url, date, etc.

schema = Schema(title=TEXT(stored=True), path=ID(stored=True), content=TEXT(stored = True))


ix = index.create_in("indexdir", schema)

writer = ix.writer()
writer.add_document(title=u"My document", content=u"This is my python document! hello big world",
                    path=u"/a")
writer.add_document(title=u"Second try", content=u"This is the second example hello world.",
                    path=u"/b")
writer.add_document(title=u"Third time's the charm", content=u"More examples. Examples are many.",
                    path=u"/c")

writer.commit()
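
Note that index.create_in creates a fresh index, overwriting whatever is already in the directory. If an index built in a previous run should be reused instead of recreated, it can be reopened (this assumes "indexdir" already contains a valid index):

from whoosh import index

# reopen an existing index instead of rebuilding it
ix = index.open_dir("indexdir")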

Once the index is created, we can search documents using the index:

from whoosh.qparser import QueryParser

with ix.searcher() as searcher:
     query = QueryParser("content", ix.schema).parse("hello world")
     results = searcher.search(query, terms=True)
    
     for r in results:
         print (r, r.score)
         # Was this results object created with terms=True?
         if results.has_matched_terms():
            # What terms matched in the results?
            print(results.matched_terms())
        
     # What terms matched in each hit?
     print ("matched terms")
     for hit in results:
        print(hit.matched_terms())

The output that we get:

<Hit {'path': '/b', 'title': 'Second try', 'content': 'This is the second example hello world.'}>
<Hit {'path': '/b', 'title': 'Second try', 'content': 'This is the second example hello world.'}> 2.124137931034483
{('content', b'hello'), ('content', b'world')}
<Hit {'path': '/a', 'title': 'My document', 'content': 'This is my python document! hello big world'}> 1.7906976744186047
{('content', b'hello'), ('content', b'world')}
matched terms
[('content', b'hello'), ('content', b'world')]
[('content', b'hello'), ('content', b'world')]
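
By default search() scores and returns only the top-ranked hits; if all matching documents are needed, the limit argument can be set to None:

results = searcher.search(query, terms=True, limit=None)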

Whoosh has many features that can enhance searching. We can get more documents that are like a certain search hit. This requires that the field you want to match on is vectored or stored, or that you have access to the original text (such as from a database). Here is an example; more_like_this() is used for this.

print ("more_results")
     first_hit = results[0]
     more_results = first_hit.more_like_this("content")
     print (more_results)   

Output:

more_results
<Top 1 Results for Or([Term('content', 'example', boost=0.6588835188105945), Term('content', 'second', boost=0.6588835188105945), Term('content', 'hello', boost=0.5617184491361429), Term('content', 'world', boost=0.5617184491361429)]) runtime=0.0038603000000136944>  

If we want to know the number of matched documents we can call len(results), but on very large indexes this can cause a delay. There is a way to avoid this by getting just a low and high estimate.

found = results.scored_length()
if results.has_exact_length():
    print("Scored", found, "of exactly", len(results), "documents")
else:
    low = results.estimated_min_length()
    high = results.estimated_length()

    print("Scored", found, "of between", low, "and", high, "documents")    

Below you can find the full Python source code for the above, and references to the Whoosh documentation and other articles about Whoosh. You will find how to use Whoosh with pandas and how to use Whoosh with web2py for a web crawling project.

# -*- coding: utf-8 -*-

from whoosh.fields import Schema, TEXT, ID
from whoosh import index

#To create an index in a directory, use index.create_in:

import os.path

if not os.path.exists("indexdir"):
    os.mkdir("indexdir")
    
schema = Schema(title=TEXT(stored=True), path=ID(stored=True), content=TEXT(stored = True))


ix = index.create_in("indexdir", schema)

writer = ix.writer()
writer.add_document(title=u"My document", content=u"This is my python document! hello big world",
                    path=u"/a")
writer.add_document(title=u"Second try", content=u"This is the second example hello world.",
                    path=u"/b")
writer.add_document(title=u"Third time's the charm", content=u"More examples. Examples are many.",
                    path=u"/c")

writer.commit()


from whoosh.qparser import QueryParser

with ix.searcher() as searcher:
     query = QueryParser("content", ix.schema).parse("hello world")
     results = searcher.search(query, terms=True)
     print(results[0])

     for r in results:
         print (r, r.score)
         # Was this results object created with terms=True?
         if results.has_matched_terms():
            # What terms matched in the results?
            print(results.matched_terms())
        
     # What terms matched in each hit?
     print ("matched terms")
     for hit in results:
        print(hit.matched_terms())

     

     print ("more_results")
     first_hit = results[0]
     more_results = first_hit.more_like_this("content")
     print (more_results)     
        
    
     # keep this inside the with block so the searcher (and its reader) is still open
     found = results.scored_length()
     if results.has_exact_length():
         print("Scored", found, "of exactly", len(results), "documents")
     else:
         low = results.estimated_min_length()
         high = results.estimated_length()

         print("Scored", found, "of between", low, "and", high, "documents")    

References

1. Quickstart
2. Developing a fast Indexing and Full text Search Engine with Whoosh: A Pure-Python Library
3. Whoosh , Pandas, and Redshift: Implementing Full Text Search in a Relational Database
4. USING WHOOSH WITH WEB2PY