Code for 'Data Manipulation at Scale: Systems and Algorithms'

Как и обещал, выкладываю сниппеты к посту
про курс:

Data Manipulation at Scale: Systems and Algorithms
University of Washington

Assignment: Tweet sentiment analysis
– sentiment of each tweet: sum(sentiments for each word)
– sentiment of new word: numPositiveTweets − numNegativeTweets
– word frequency: wordCount / numWords
– happiest state: tweetsForState; max(sumSentiment / numTweets)
– top ten hash tags: sort(tagCount / totalTags)

# frequency.py
import sys
import json
import re

def parseTweetFile(fp):
    """Return an iterator over the tweets stored in *fp*.

    *fp* is any iterable of lines (e.g. an open file), one JSON document per
    line; each decoded document (normally a dict) is yielded as-is.  Blank
    lines are skipped so a trailing empty line does not raise ValueError.
    """
    for line in fp:
        if not line.strip():
            continue
        yield json.loads(line)

def tweetText(tweet):
    """Return the text of *tweet*, or '' when it has none.

    A missing 'text' key and a present-but-null 'text' value both yield '',
    so callers can always treat the result as a string.
    """
    return tweet.get('text') or ''

def getTerms(text):
    """Return the list of lowercase words in *text*.

    Every run of characters that are not ASCII letters acts as a separator,
    so digits, punctuation and non-Latin letters are dropped.  Returns []
    for empty or None input.
    """
    if not text:
        return []
    # After the substitution only ASCII letters and spaces remain, so a
    # single lower() on the whole string is equivalent to lowering each word,
    # and split() already discards surrounding whitespace.
    return re.sub('[^A-Za-z]+', ' ', text).lower().split()

def wordOccurences(tweets):
    """Count how often each term occurs across *tweets*.

    Returns a tuple ``(counts, total)``: ``counts`` maps each term to its
    number of occurrences, and ``total`` is the number of terms seen
    (equal to ``sum(counts.values())``).
    """
    res = {}
    total = 0
    for tw in tweets:
        for term in getTerms(tweetText(tw)):
            res[term] = res.get(term, 0) + 1
            total += 1
    return res, total

def main():
    """Print "<term> <relative frequency>" for every term in the tweet file.

    The tweet file path is taken from ``sys.argv[1]``.  Nothing is printed
    when the file contains no terms.
    """
    # The generator is consumed inside wordOccurences, i.e. while the file
    # is still open.
    with open(sys.argv[1]) as tweet_file:
        db, total = wordOccurences(parseTweetFile(tweet_file))
    if not total:
        return  # no terms at all; also avoids ZeroDivisionError
    for term, occ in db.items():
        print("%s %f" % (term, float(occ) / total))

if __name__ == '__main__':
    main()

# happiest_state.py
import sys
import json
import re

def getUSStates():
    """Return (abr, states): case-insensitive lookup tables for US states.

    ``abr`` maps a lowercase two-letter code to the uppercase code
    ('tx' -> 'TX'); ``states`` maps a lowercase full state name to the
    uppercase code ('texas' -> 'TX').  Only the 50 states are listed.
    """
    st = {
        'AK': 'Alaska',
        'AL': 'Alabama',
        'AR': 'Arkansas',
        'AZ': 'Arizona',
        'CA': 'California',
        'CO': 'Colorado',
        'CT': 'Connecticut',
        'DE': 'Delaware',
        'FL': 'Florida',
        'GA': 'Georgia',
        'HI': 'Hawaii',
        'IA': 'Iowa',
        'ID': 'Idaho',
        'IL': 'Illinois',
        'IN': 'Indiana',
        'KS': 'Kansas',
        'KY': 'Kentucky',
        'LA': 'Louisiana',
        'MA': 'Massachusetts',
        'MD': 'Maryland',
        'ME': 'Maine',
        'MI': 'Michigan',
        'MN': 'Minnesota',
        'MO': 'Missouri',
        'MS': 'Mississippi',
        'MT': 'Montana',
        'NC': 'North Carolina',
        'ND': 'North Dakota',
        'NE': 'Nebraska',
        'NH': 'New Hampshire',
        'NJ': 'New Jersey',
        'NM': 'New Mexico',
        'NV': 'Nevada',
        'NY': 'New York',
        'OH': 'Ohio',
        'OK': 'Oklahoma',
        'OR': 'Oregon',
        'PA': 'Pennsylvania',
        'RI': 'Rhode Island',
        'SC': 'South Carolina',
        'SD': 'South Dakota',
        'TN': 'Tennessee',
        'TX': 'Texas',
        'UT': 'Utah',
        'VA': 'Virginia',
        'VT': 'Vermont',
        'WA': 'Washington',
        'WI': 'Wisconsin',
        'WV': 'West Virginia',
        'WY': 'Wyoming',
    }
    abr = {key.lower(): key for key in st}
    states = {val.lower(): key for key, val in st.items()}
    return (abr, states)

# Lookup tables built once at import time:
# ST_ABBR maps lowercase state codes to uppercase codes ('tx' -> 'TX'),
# ST_NAMES maps lowercase state names to uppercase codes ('texas' -> 'TX').
ST_ABBR, ST_NAMES = getUSStates()

def parseSentFile(fp):
    """Return a dict mapping each term of a sentiment file to its int score.

    Each line holds a term and an integer score separated by a tab.  The
    line is split on its last tab, so the score is always the final field.
    Blank lines are skipped.
    """
    res = {}
    for line in fp:
        if not line.strip():
            continue
        term, score = line.rsplit('\t', 1)
        res[term] = int(score)  # int() tolerates the trailing newline
    return res

def totalSentiment(terms, scores):
    """Return the summed sentiment score of *terms*.

    Each term is looked up in the *scores* dict; unknown terms count as 0.
    """
    return sum(scores.get(term, 0) for term in terms)

def splitFullName(fn):
    """Split a place name such as "Austin, TX" into ("Austin", "TX").

    Empty comma-separated parts are ignored.  Input that does not have
    exactly two non-empty parts (including '' or None) yields ('', '').
    """
    if not fn:
        return ('', '')
    parts = [part.strip() for part in fn.split(',') if part.strip()]
    if len(parts) != 2:
        return ('', '')
    return (parts[0], parts[1])

def detectState(tweet):
    """Return the two-letter US state code of *tweet*'s place, or ''.

    Only places whose country is 'United States' or whose country code is
    'US' are considered.  ``place['full_name']`` is split as
    "<city>, <state>": the second part is first looked up as a state code
    ("Austin, TX"); failing that, the first part is looked up as a full
    state name ("Texas, USA").
    """
    place = tweet.get('place')
    if not place:
        return ''
    country = place.get('country', '')
    country_code = place.get('country_code', '')
    # The original literal was misspelled ('Unated States') and never matched.
    if country != 'United States' and country_code != 'US':
        return ''
    # full_name may be present but null; normalise to '' before splitting.
    city, state = splitFullName(place.get('full_name') or '')
    return ST_ABBR.get(state.lower()) or ST_NAMES.get(city.lower(), '')

def tweetsForStates(tweets, scores):
    """Aggregate tweet sentiment per US state.

    Returns a dict ``state -> (totalScore, numTweets)``.  Tweets without
    text are skipped.  Tweets whose state cannot be detected are collected
    under the key ''.
    """
    res = {}
    for tw in tweets:
        text = tweetText(tw)
        if not text:
            continue
        tw_score = totalSentiment(getTerms(text), scores)
        state = detectState(tw)
        tot, num = res.get(state, (0, 0))
        res[state] = (tot + tw_score, num + 1)
    return res

def main():
    """Print the code of the US state with the highest average sentiment.

    ``sys.argv[1]`` is the sentiment file and ``sys.argv[2]`` is the tweet
    file.  Prints 'undetected' if no tweet could be attributed to a state.
    """
    with open(sys.argv[1]) as sent_file:
        scores = parseSentFile(sent_file)
    with open(sys.argv[2]) as tweet_file:
        db = tweetsForStates(parseTweetFile(tweet_file), scores)
    # Start below every possible average.  sys.float_info.min is the smallest
    # *positive* float, so using it would skip states whose average is <= 0.
    best = float('-inf')
    happystate = 'undetected'
    for state, (score, num) in db.items():
        if not state:
            continue  # tweets whose state could not be detected
        aver = float(score) / num
        if aver > best:
            best = aver
            happystate = state
    print(happystate)

if __name__ == '__main__':
    main()

# term_sentiment.py
import sys
import json
import re

def parseSentFile(fp):
    """Return a dict mapping each term of a sentiment file to its int score.

    Each line holds a term and an integer score separated by a tab.  The
    line is split on its last tab, so the score is always the final field.
    Blank lines are skipped.
    """
    res = {}
    for line in fp:
        if not line.strip():
            continue
        term, score = line.rsplit('\t', 1)
        res[term] = int(score)  # int() tolerates the trailing newline
    return res

def totalSentiment(terms, scores):
    """Return the summed sentiment score of *terms*.

    Each term is looked up in the *scores* dict; unknown terms count as 0.
    """
    return sum(scores.get(term, 0) for term in terms)

def calcTermsSentiment(tweets, scores):
    """Collect sentiment statistics for terms that are missing from *scores*.

    Returns a dict ``term -> (pos, neg, tot)``.  Counting is per occurrence
    of the term: ``pos`` and ``neg`` count occurrences in tweets with a
    positive or negative total score, and ``tot`` sums those tweet scores.
    Terms that already have a score in *scores* are not included.
    """
    res = {}
    for tw in tweets:
        terms = getTerms(tweetText(tw))
        tw_score = totalSentiment(terms, scores)
        for term in terms:
            if term in scores:
                continue  # known term; only unknown words are estimated
            pos, neg, tot = res.get(term, (0, 0, 0))
            if tw_score > 0:
                pos += 1
            elif tw_score < 0:
                neg += 1
            res[term] = (pos, neg, tot + tw_score)
    return res

def termSentiment(pos, neg, tot):
    """Return the sentiment of a term as ``float(pos - neg)``.

    *pos* and *neg* are the positive/negative occurrence counts produced by
    calcTermsSentiment.  *tot* is accepted for interface compatibility but
    is not used by this metric.
    """
    # The pos/neg ratio variant that used to follow the return was
    # unreachable and has been removed.
    return float(pos - neg)

def main():
    """Print "<term> <sentiment>" for every term absent from the sentiment file.

    ``sys.argv[1]`` is the sentiment file and ``sys.argv[2]`` is the tweet
    file.
    """
    with open(sys.argv[1]) as sent_file:
        scores = parseSentFile(sent_file)
    with open(sys.argv[2]) as tweet_file:
        db = calcTermsSentiment(parseTweetFile(tweet_file), scores)
    for term, (pos, neg, tot) in db.items():
        print("%s %f" % (term, termSentiment(pos, neg, tot)))

if __name__ == '__main__':
    main()

# top_ten.py
import sys
import json
import re
import collections
import heapq

def getTags(tweet):
    """Return the list of hashtag texts in *tweet*, or [] if there are none.

    Reads ``tweet['entities']['hashtags'][i]['text']``.  Missing or null
    ``entities``/``hashtags`` values are treated as empty.
    """
    ents = tweet.get('entities') or {}
    return [tag['text'] for tag in ents.get('hashtags') or []]

def hashtagOccurences(tweets):
    """Count hashtag occurrences across *tweets*.

    Returns ``(counts, total)``: ``counts`` is a defaultdict(int) mapping
    each hashtag text to its number of occurrences, and ``total`` is the
    number of hashtags seen.
    """
    res = collections.defaultdict(int)
    total = 0
    for tw in tweets:
        for tag in getTags(tw):
            res[tag] += 1
            total += 1
    return res, total

def main():
    """Print the ten most frequent hashtags as "<tag> <count>".

    The tweet file path is taken from ``sys.argv[1]``.
    """
    with open(sys.argv[1]) as tweet_file:
        db, _ = hashtagOccurences(parseTweetFile(tweet_file))
    # Ranking by raw count gives the same order as ranking by count / total,
    # so the relative frequency is not computed.  The key uses indexing
    # instead of Py2-only tuple-parameter unpacking.
    top = heapq.nlargest(10, db.items(), key=lambda item: item[1])
    for tag, count in top:
        print("%s %d" % (tag, count))

if __name__ == '__main__':
    main()

# tweet_sentiment.py
import sys
import json
import re

def getTermScore(term, db):
    """Return the sentiment score of *term* from the *db* dict, or 0 if absent."""
    return db.get(term, 0)

def calcTweetSentiment(tweet, db):
    """Return the int sentiment score of *tweet*: the sum of its term scores.

    Returns None when *tweet* is not really a tweet, i.e. it has no 'text'
    key.
    """
    if 'text' not in tweet:
        return None
    return sum(getTermScore(term, db) for term in getTerms(tweet['text']))

def main():
    """Print one sentiment score per tweet record.

    ``sys.argv[1]`` is the sentiment file and ``sys.argv[2]`` is the tweet
    file.  Records without text are printed as 0, so exactly one line is
    printed per record read.
    """
    with open(sys.argv[1]) as sent_file:
        scores = parseSentFile(sent_file)
    with open(sys.argv[2]) as tweet_file:
        for tw in parseTweetFile(tweet_file):
            score = calcTweetSentiment(tw, scores)
            # Previously both the score and an extra 0 were printed for every
            # real tweet; 0 belongs only to records without text.
            print(score if score is not None else 0)

if __name__ == '__main__':
    main()

SQL assignment
– SQL for the RA expression $\pi_{term}(\sigma_{docid='10398\_txt\_earn' \wedge count=1}(frequency))$
SELECT term
  FROM frequency
 WHERE docid = '10398_txt_earn'
   AND count = 1

– RA $\pi_{term}(\sigma_{docid='10398\_txt\_earn' \wedge count=1}(frequency)) \cup \pi_{term}(\sigma_{docid='925\_txt\_trade' \wedge count=1}(frequency))$
 -- The RA expression is a set union of the two projections, so the
 -- compound query needs UNION (which also removes duplicate terms).
 select term from frequency where docid = '10398_txt_earn' and count = 1
 union
 select term from frequency where docid = '925_txt_trade' and count = 1

– Write a SQL statement to count the number of unique documents containing the word "law" or containing the word "legal"
select count(distinct docid)
from frequency
where term in ('law', 'legal')

– Write a SQL statement to find all documents that have more than 300 total terms
-- numterms counts frequency rows per document (presumably one row per
-- distinct (docid, term) pair -- confirm); numwords sums the per-term counts.
-- HAVING repeats the aggregate instead of the SELECT alias, because alias
-- references in HAVING are a SQLite/MySQL extension, not standard SQL.
select docid, count(term) as numterms, sum(count) as numwords
from frequency
group by docid
having count(term) > 300

– count the number of unique documents that contain both the word 'transactions' and the word 'world'
-- Documents containing both words are the INTERSECT of the two docid sets;
-- INTERSECT already removes duplicates, so no DISTINCT is needed.
select count(*) from (
    select docid from frequency where term = 'transactions'
    intersect
    select docid from frequency where term = 'world'
) x

– Matrix multiplication in SQL (sparse matrix, may be very fast and efficient in some DB engines):
-- C[i][j] = sum over k of A[i][k] * B[k][j]; only stored (non-zero) entries
-- take part in the join.
select A.row_num, B.col_num, sum(A.value * B.value)
  from A
  join B on A.col_num = B.row_num
 group by A.row_num, B.col_num;

– Find the best matching document to the keyword query "washington taxes treasury"
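Compute the document similarity matrix $A \cdot A^{T}$ (the dot product of every pair of document vectors); the query is appended to the corpus as an extra document with docid 'q', so its similarity to every document is computed the same way.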
Each row of the matrix is a document vector, with one column for every term in the entire corpus
docid : rownum
term : colnum
count : value
-- The corpus plus the keyword query as pseudo-document 'q'.  The rows must be
-- combined with UNION, and the columns of frequency are named explicitly so
-- they line up with the query rows regardless of the table's column order.
create view corpus as
SELECT docid, term, count FROM frequency
UNION
SELECT 'q' as docid, 'washington' as term, 1 as count
UNION
SELECT 'q' as docid, 'taxes' as term, 1 as count
UNION
SELECT 'q' as docid, 'treasury' as term, 1 as count;

-- term-document matrix A
create view A as
select docid as rownum,
       term  as colnum,
       count as value
  from corpus;

-- term-document matrix transposed (B = A^T)
create view B as
select term  as rownum,
       docid as colnum,
       count as value
  from corpus;

-- matrix C = A · B (document-by-document similarity)
-- Because B is A transposed, C is symmetric; keeping only A.rownum < B.colnum
-- computes each document pair once and skips the diagonal.
create view C as
select A.rownum, B.colnum, sum(A.value * B.value) as value
  from A
  join B on A.colnum = B.rownum
 where A.rownum < B.colnum
 group by A.rownum, B.colnum;

-- find the document most similar to the query pseudo-document 'q'
-- C stores each pair only once, so 'q' may be on either side.  Report the
-- other document of the pair and pick the highest score with ORDER BY/LIMIT,
-- instead of relying on SQLite's bare-column behaviour with max().
select case when rownum = 'q' then colnum else rownum end as docid,
       value as similarity
  from C
 where rownum = 'q' or colnum = 'q'
 order by value desc
 limit 1;

MapReduce assignment
– example
import MapReduce
# Single shared MapReduce engine; mapper and reducer below report their
# results through it (mr.emit_intermediate / mr.emit).
mr = MapReduce.MapReduce()

def mapper(record):
    """Emit (word, 1) for every whitespace-separated word of a document.

    *record* is a pair of document identifier and document contents; only
    the contents are needed for word counting.
    """
    contents = record[1]
    for word in contents.split():
        # Appended to the list stored under *word*; the reducer later
        # receives that whole list.
        mr.emit_intermediate(word, 1)

def reducer(key, list_of_values):
    """Emit (word, total) for one word.

    *key* is the word and *list_of_values* holds the counts the mapper
    emitted for it; the result tuple is appended to the output list.
    """
    mr.emit((key, sum(list_of_values)))

if __name__ == '__main__':
    # This example only imports MapReduce, so sys must be imported here.
    import sys
    # Close the input file once execute() returns.
    with open(sys.argv[1]) as inputdata:
        mr.execute(inputdata, mapper, reducer)

– Create an Inverted index. Given a set of documents, an inverted index is a dictionary where each word is associated with a list of the document identifiers in which that word appears
def mapper(record):
    """Emit (word, docid) for every whitespace-separated word of a document.

    *record* is a (docid, text) pair.
    """
    docid, text = record
    for w in text.split():
        mr.emit_intermediate(w, docid)


def reducer(word, values):
    """Emit (word, [docid, ...]) with every document listed only once.

    Documents keep their order of first appearance so the output is
    deterministic.
    """
    seen = set()
    docs = []
    for docid in values:
        if docid not in seen:
            seen.add(docid)
            docs.append(docid)
    mr.emit((word, docs))

– Implement a relational join as a MapReduce query
# Name of the master ("one") side of the join; every other table is treated
# as a detail ("many") side.  Presumably 'order' for the course's records
# data -- TODO confirm against the input.
MASTER_TABLE = 'order'


def mapper(record):
    """Route a table row to its join key.

    Assumes *record* is a flat list [tabname, joinkey, field, ...] -- TODO
    confirm.  The whole row is kept as the value, tagged with its table name.
    """
    tabname, joinkey = record[0], record[1]
    mr.emit_intermediate(joinkey, (tabname, record))


def reducer(key, values):
    """Emit master_row + detail_row for every detail row that shares *key*.

    Nothing is emitted unless both a master row and a detail row are present.
    """
    masters = [row for tabname, row in values if tabname == MASTER_TABLE]
    details = [row for tabname, row in values if tabname != MASTER_TABLE]
    for masterrow in masters:
        for line in details:
            mr.emit(masterrow + line)

– Consider a simple social network dataset consisting of a set of key-value pairs (person, friend) representing a friend relationship between two people. Count the number of friends for each person
def mapper(record):
    """Emit (person, friend) for one (person, friend) relationship record."""
    pers, frnd = record
    mr.emit_intermediate(pers, frnd)


def reducer(pers, friends):
    """Emit (person, number of distinct friends).

    The person is part of the output; a bare count would not say whose
    count it is.
    """
    mr.emit((pers, len(set(friends))))

– Generate a list of all non-symmetric friend relationships
def mapper(record):
    """Emit both orientations of one (person, friend) record.

    The real direction (person, friend) carries the friend's name.  The
    reversed direction (friend, person) carries '' as a marker that only
    the opposite edge was seen.
    """
    pers, frnd = record
    mr.emit_intermediate((pers, frnd), frnd)
    mr.emit_intermediate((frnd, pers), '')


def reducer(key, values):
    """Emit (name1, name2) when that direction is missing from the data.

    *values* contains name2 only if (name1, name2) itself was a record, so
    its absence means only (name2, name1) exists.
    """
    name1, name2 = key
    if name2 not in values:
        mr.emit((name1, name2))

– Consider a set of key-value pairs where each key is sequence id and each value is a string of nucleotides, e.g., GCTTCCGAAATGCTCGAA.... Remove the last 10 characters from each string of nucleotides, then remove any duplicates generated
def mapper(record):
    """Key each (sequence_id, nucleotides) record by nucleotides minus the last 10 chars."""
    sid, nucs = record
    mr.emit_intermediate(nucs[:-10], sid)


def reducer(key, values):
    """Emit each distinct trimmed nucleotide string exactly once.

    Grouping by key has already merged duplicates, so the ids in *values*
    are not needed.
    """
    mr.emit(key)

– Design a MapReduce algorithm to compute the matrix multiplication A x B
# Matrix dimensions: A is AROWS x K and B is K x BCOLS (0-based indices).
# Presumably 5x5 for the course's matrix input -- TODO confirm / adjust.
AROWS = 5
BCOLS = 5


def mapper(record):
    """Send each matrix entry to every output cell C[i][j] that needs it.

    *record* is (matrix_name, row, col, value) with matrix_name 'a' or 'b'.
    A[i][k] is needed by C[i][j] for every j, and B[k][j] by C[i][j] for
    every i.  Each value is tagged with its matrix name and its inner index k.
    """
    mname, rownum, colnum, val = record
    if mname == 'a':
        for col in range(BCOLS):
            mr.emit_intermediate((rownum, col), (mname, colnum, val))
    elif mname == 'b':
        for row in range(AROWS):
            mr.emit_intermediate((row, colnum), (mname, rownum, val))


def reducer(key, values):
    """Emit (row, col, C[row][col]), the sum over k of A[row][k] * B[k][col].

    An inner index present in only one matrix (an implicit zero in sparse
    input) contributes nothing.
    """
    row, col = key
    a_vals = {}
    b_vals = {}
    for mname, idx, value in values:
        if mname == 'a':
            a_vals[idx] = value
        else:
            b_vals[idx] = value
    val = sum(a_vals[idx] * b_vals[idx] for idx in a_vals if idx in b_vals)
    mr.emit((row, col, val))

original post http://vasnake.blogspot.com/2016/02/code-for-data-manipulation-at-scale.html

