Code for 'Hadoop Platform and Application Framework'

Yesterday I published a post about the course Hadoop Platform and Application Framework by University of California, San Diego (San Diego Supercomputer Center).

I promised to post the pieces of code separately.
Here they are: illustrative material in the form of snippets.

Cloudera QuickStart VM, practice

username: cloudera, password: cloudera, hostname: quickstart

Take the virtual machine image (the fully loaded one), start it and log in to the desktop. After that, everything is done through the web interface and the terminal. There is a detailed cheat sheet inside the VM.

Load data from SQL to HDFS, Sqoop:
sqoop import-all-tables \
    -m 1 \
    --connect jdbc:mysql://quickstart:3306/retail_db \
    --username=retail_dba \
    --password=cloudera \
    --compression-codec=snappy \
    --as-avrodatafile \
    --warehouse-dir=/user/hive/warehouse

hadoop fs -ls /user/hive/warehouse
Found 6 items
drwxr-xr-x   - cloudera hive          0 2016-01-04 11:50 /user/hive/warehouse/categories
drwxr-xr-x   - cloudera hive          0 2016-01-04 11:51 /user/hive/warehouse/customers
drwxr-xr-x   - cloudera hive          0 2016-01-04 11:51 /user/hive/warehouse/departments
drwxr-xr-x   - cloudera hive          0 2016-01-04 11:52 /user/hive/warehouse/order_items
drwxr-xr-x   - cloudera hive          0 2016-01-04 11:52 /user/hive/warehouse/orders
drwxr-xr-x   - cloudera hive          0 2016-01-04 11:52 /user/hive/warehouse/products

# Avro schema files (.avsc) that Sqoop writes to the local working directory
ls -l *.avsc
-rw-rw-r-- 1 cloudera cloudera  541 Jan  4 11:50 sqoop_import_categories.avsc
-rw-rw-r-- 1 cloudera cloudera 1324 Jan  4 11:50 sqoop_import_customers.avsc

sudo -u hdfs hadoop fs -mkdir /user/examples
sudo -u hdfs hadoop fs -chmod +rw /user/examples
hadoop fs -copyFromLocal ~/*.avsc /user/examples/

Query Structured Data using Hue + Impala
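CREATE EXTERNAL TABLE categories STORED AS AVRO
LOCATION 'hdfs:///user/hive/warehouse/categories'
TBLPROPERTIES ('avro.schema.url'='hdfs://quickstart/user/examples/sqoop_import_categories.avsc');
-- repeat for the other imported tables (customers, departments, order_items, orders, products)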
invalidate metadata;
show tables;

-- Most popular product categories
select c.category_name, count(order_item_quantity) as count
from order_items oi
inner join products p on oi.order_item_product_id = p.product_id
inner join categories c on c.category_id = p.product_category_id
group by c.category_name
order by count desc
limit 10;

-- top 10 revenue generating products
select p.product_id, p.product_name, r.revenue
from products p inner join
(select oi.order_item_product_id, sum(cast(oi.order_item_subtotal as float)) as revenue
from order_items oi inner join orders o
on oi.order_item_order_id = o.order_id
where o.order_status <> 'CANCELED'
and o.order_status <> 'SUSPECTED_FRAUD'
group by order_item_product_id) r
on p.product_id = r.order_item_product_id
order by r.revenue desc
limit 10;
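To make the join-then-aggregate logic of the revenue query concrete, here is a plain-Python sketch over a few hypothetical rows (the row contents and product names are made up; only the column names come from retail_db). It drops CANCELED and SUSPECTED_FRAUD orders, sums subtotals per product, joins product names and sorts by revenue.

```python
from collections import defaultdict

# Hypothetical toy rows; column names follow the retail_db query above
orders = [
    {"order_id": 1, "order_status": "COMPLETE"},
    {"order_id": 2, "order_status": "CANCELED"},
    {"order_id": 3, "order_status": "SUSPECTED_FRAUD"},
    {"order_id": 4, "order_status": "PENDING"},
]
order_items = [
    {"order_item_order_id": 1, "order_item_product_id": 10, "order_item_subtotal": "59.98"},
    {"order_item_order_id": 2, "order_item_product_id": 10, "order_item_subtotal": "500.00"},
    {"order_item_order_id": 3, "order_item_product_id": 20, "order_item_subtotal": "300.00"},
    {"order_item_order_id": 4, "order_item_product_id": 20, "order_item_subtotal": "129.99"},
    {"order_item_order_id": 4, "order_item_product_id": 10, "order_item_subtotal": "29.99"},
]
products = {10: "Product A", 20: "Product B"}  # product_id -> product_name

def top_revenue(limit=10):
    excluded = {"CANCELED", "SUSPECTED_FRAUD"}
    status = {o["order_id"]: o["order_status"] for o in orders}
    revenue = defaultdict(float)
    # inner join order_items with orders, skip excluded statuses, sum per product
    for oi in order_items:
        s = status.get(oi["order_item_order_id"])
        if s is not None and s not in excluded:
            revenue[oi["order_item_product_id"]] += float(oi["order_item_subtotal"])
    # inner join with products, then ORDER BY revenue DESC LIMIT n
    rows = [(pid, products[pid], rev) for pid, rev in revenue.items() if pid in products]
    rows.sort(key=lambda r: r[2], reverse=True)
    return rows[:limit]

for row in top_revenue():
    print(row)
```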

Correlate Structured Data with Unstructured Data (DB records + web logs)
Bulk upload, web log data
sudo -u hdfs hadoop fs -mkdir /user/hive/warehouse/original_access_logs
sudo -u hdfs hadoop fs -copyFromLocal /opt/examples/log_files/access.log.2 /user/hive/warehouse/original_access_logs
hadoop fs -ls /user/hive/warehouse/original_access_logs
Found 1 items
-rw-r--r--   1 hdfs hive   39593868 2016-01-05 08:06 /user/hive/warehouse/original_access_logs/access.log.2

Build a table in Hive (Beeline)
First, you'll take advantage of Hive's flexible SerDe (serializer/deserializer) to parse the logs into individual fields using a regular expression:
beeline -u jdbc:hive2://quickstart:10000/default -n admin -d org.apache.hive.jdbc.HiveDriver
CREATE EXTERNAL TABLE intermediate_access_logs (
 ip STRING,
 date STRING,
 method STRING,
 url STRING,
 http_version STRING,
 code1 STRING,
 code2 STRING,
 dash STRING,
 user_agent STRING)
 ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
 WITH SERDEPROPERTIES (
 'input.regex' = '([^ ]*) - - \\[([^\\]]*)\\] "([^\ ]*) ([^\ ]*) ([^\ ]*)" (\\d*) (\\d*) "([^"]*)" "([^"]*)"',
 'output.format.string' = "%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s"
 )
 LOCATION '/user/hive/warehouse/original_access_logs';
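To see what the RegexSerDe extracts, here is the same pattern in Python's `re` (the doubled backslashes of the Hive string literal become single ones), applied with a full-string match to one hypothetical combined-log-format line. The sample line and the `FIELDS` names are illustrative assumptions; the point is that the pattern has nine capture groups, one per column.

```python
import re

# The Hive 'input.regex' above, rewritten as a Python raw string
LOG_RE = re.compile(
    r'([^ ]*) - - \[([^\]]*)\] "([^ ]*) ([^ ]*) ([^ ]*)" (\d*) (\d*) "([^"]*)" "([^"]*)"'
)
# Illustrative names for the nine capture groups, in order
FIELDS = ["ip", "date", "method", "url", "http_version",
          "code1", "code2", "dash", "user_agent"]

def parse_line(line):
    """Return a dict of fields, or None if the whole line does not match."""
    m = LOG_RE.fullmatch(line)
    return dict(zip(FIELDS, m.groups())) if m else None

# Hypothetical sample line, not taken from access.log.2
sample = ('10.0.0.1 - - [14/Jun/2014:10:30:13 -0400] '
          '"GET /department/apparel/products HTTP/1.1" 200 1671 "-" "Mozilla/5.0"')
print(parse_line(sample))
```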

Second, you'll transfer the data from this intermediate table to one that does not require any special SerDe:
CREATE EXTERNAL TABLE tokenized_access_logs (
 ip STRING,
 date STRING,
 method STRING,
 url STRING,
 http_version STRING,
 code1 STRING,
 code2 STRING,
 dash STRING,
 user_agent STRING)
 LOCATION '/user/hive/warehouse/tokenized_access_logs';

ADD JAR /usr/lib/hive/lib/hive-contrib.jar;
INSERT OVERWRITE TABLE tokenized_access_logs SELECT * FROM intermediate_access_logs;

query in Hue + Impala
invalidate metadata;
show tables;

select count(*),url from tokenized_access_logs
where url like '%\/product\/%'
group by url order by count(*) desc;
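The Impala query is a filter plus a group-by count. A plain-Python equivalent over a few hypothetical `url` values (made-up paths) looks like this; `LIKE '%/product/%'` becomes a substring test, so `/products` without a trailing slash is not counted.

```python
from collections import Counter

# Hypothetical values of the url column of tokenized_access_logs
urls = [
    "/department/apparel/products",
    "/product/1001",
    "/product/1002",
    "/product/1001",
    "/home",
]

# WHERE url LIKE '%/product/%'  ->  substring test
counts = Counter(u for u in urls if "/product/" in u)

# GROUP BY url ORDER BY count(*) DESC
for url, n in counts.most_common():
    print(n, url)
```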

Pig, PigLatin

– platform for data processing
– PigLatin: high level language
– pig execution: local, MR, Tez
– extensible
– usage: ETL; manipulating, analysing raw data
– can run as scripts, embedded programs

interactive shell example
hdfs dfs -put /etc/passwd /user/cloudera/
pig -x mapreduce
… >
grunt> A = load '/user/cloudera/passwd' using PigStorage(':');
grunt> B = foreach A generate $0, $4, $5 ;
grunt> dump B;

grunt> store B into 'userinfo.out';

[cloudera@quickstart ~]$ hdfs dfs -ls /user/cloudera/userinfo.out
Found 2 items
-rw-r--r--   1 cloudera cloudera          0 2016-01-07 09:00 /user/cloudera/userinfo.out/_SUCCESS
-rw-r--r--   1 cloudera cloudera       1459 2016-01-07 09:00 /user/cloudera/userinfo.out/part-m-00000
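`foreach A generate $0, $4, $5` is a projection of positional fields after each line has been split on ':'. In plain Python, over two typical /etc/passwd-style lines (sample data only), the same steps look roughly like this:

```python
# Two /etc/passwd-style sample lines (illustrative)
lines = [
    "root:x:0:0:root:/root:/bin/bash",
    "adm:x:3:4:adm:/var/adm:/sbin/nologin",
]

# A = load ... using PigStorage(':')  ->  split each line into fields
A = [line.split(":") for line in lines]
# B = foreach A generate $0, $4, $5  ->  user name, full name, home directory
B = [(f[0], f[4], f[5]) for f in A]

# dump B prints one tuple per line, e.g. (root,root,/root)
for t in B:
    print("(" + ",".join(t) + ")")
```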

Hive

– query, manage data using HiveQL (SQL-like)
– run interactively using Beeline; other run options: HCatalog, WebHCat
– warehouse tools
– execution engines: MR, Tez, Spark
– data in HDFS, HBase
– custom mappers/reducers
– usage: data mining, analytics; ML; ad hoc analysis

interactive shell example
$ beeline -u jdbc:hive2://
> create table userinfo (uname STRING, pswd STRING, uid INT, gid INT, fullname STRING, hdir STRING, shell STRING)
 row format delimited fields terminated by ':' stored as textfile;
> load data inpath '/tmp/passwd' overwrite into table userinfo;
> select uname, fullname, hdir from userinfo order by uname;
|     uname      |           fullname            |             hdir              |
| abrt           |                               | /etc/abrt                     |
| adm            | adm                           | /var/adm                      |
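The `userinfo` table is schema-on-read: `load data inpath` only moves the file into the table's directory, and each ':'-delimited line is split and cast to the declared column types when it is queried; a field that cannot be read as INT comes back as NULL. A rough plain-Python model of that, with sample lines (the third is deliberately malformed; `to_int` and `read_rows` are hypothetical helpers):

```python
COLUMNS = ["uname", "pswd", "uid", "gid", "fullname", "hdir", "shell"]

def to_int(s):
    # Hive yields NULL for a value that is not a valid INT; None plays that role here
    try:
        return int(s)
    except ValueError:
        return None

def read_rows(lines):
    # the schema is applied only at read time: split on ':' and cast uid/gid
    for line in lines:
        row = dict(zip(COLUMNS, line.split(":")))
        row["uid"] = to_int(row.get("uid", ""))
        row["gid"] = to_int(row.get("gid", ""))
        yield row

lines = [
    "adm:x:3:4:adm:/var/adm:/sbin/nologin",
    "abrt:x:173:173::/etc/abrt:/sbin/nologin",
    "broken:x:notanumber:0:Broken Entry:/tmp:/bin/false",
]
rows = list(read_rows(lines))

# select uname, fullname, hdir from userinfo order by uname;
for r in sorted(rows, key=lambda r: r["uname"]):
    print(r["uname"], r["fullname"], r["hdir"])
```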

HBase

– non-relational distributed database on top of HDFS
– compression, in-memory operations (memstore, blockcache)
– Consistency, high Availability, Partitioning (auto sharding)
– replication, security
– SQL-like access (using Hive, Spark, Impala)
– HBase API, external API, MR

example, interactive HBase shell
$ hbase shell
create 'userinfotable', {NAME => 'username'}, {NAME => 'fullname'}, {NAME => 'homedir'} 
put 'userinfotable', 'r1', 'username', 'vcsa' 
put 'userinfotable', 'r1', 'fullname', 'VirtualMachine Admin'
put 'userinfotable', 'r2', 'username', 'sasuser'
put 'userinfotable', 'r3', 'username', 'postfix'

hbase(main):009:0> scan 'userinfotable'
ROW                                  COLUMN+CELL                                                                                              
 r1                                  column=fullname:, timestamp=1452193130567, value=VirtualMachine Admin                                    
 r1                                  column=username:, timestamp=1452193080935, value=vcsa                                                    
 r2                                  column=username:, timestamp=1452193088826, value=sasuser                                                 
 r3                                  column=username:, timestamp=1452193096044, value=postfix  

hbase(main):011:0> scan 'userinfotable', {COLUMNS => 'fullname'}
ROW                                  COLUMN+CELL                                                                                              
 r1                                  column=fullname:, timestamp=1452193130567, value=VirtualMachine Admin
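HBase stores every cell under (row key, family:qualifier, timestamp), and rows do not have to share columns, which is why r2 and r3 have no fullname cell. Below is a toy in-memory model of the puts and scans above: a sketch of the data model only, not the HBase client API. `put` and `scan` are hypothetical helpers, and the timestamps are made up.

```python
from collections import defaultdict

# row key -> column ("family:qualifier") -> list of (timestamp, value), newest first
# (assuming timestamps only increase)
table = defaultdict(dict)

def put(row, column, value, ts):
    if ":" not in column:
        column += ":"          # a bare family name means an empty qualifier
    table[row].setdefault(column, []).insert(0, (ts, value))

def scan(columns=None):
    """Yield (row, column, timestamp, latest value), rows and columns in sorted order."""
    for row in sorted(table):
        for column in sorted(table[row]):
            family = column.split(":", 1)[0]
            if columns is None or family in columns or column in columns:
                ts, value = table[row][column][0]
                yield row, column, ts, value

put("r1", "username", "vcsa", 1)
put("r1", "fullname", "VirtualMachine Admin", 2)
put("r2", "username", "sasuser", 3)
put("r3", "username", "postfix", 4)

for cell in scan():
    print(cell)
print(list(scan(columns={"fullname"})))
```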

HDFS

– bin/hdfs script:
– user commands, filesystem shell commands
– administrator commands
– debug commands
– native Java API, org.apache.hadoop.fs.FileSystem
– C API, libhdfs
– NFS gateway
– other options: Flume, Sqoop, …

commands example
hdfs dfs -ls /
sudo -u hdfs hdfs dfs -mkdir /user/test
dd if=/dev/zero of=sample.txt bs=64M count=16
sudo -u hdfs hdfs dfs -chown -R cloudera /user/test
sudo -u hdfs hdfs dfs -put sample.txt /user/test/
hdfs fsck /user/test/sample.txt
sudo -u hdfs hdfs dfsadmin -report
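The `dd` call writes 16 chunks of 64 MiB, i.e. a 1 GiB file, and `fsck` then reports how HDFS split it into blocks. Assuming the Hadoop 2 default `dfs.blocksize` of 128 MiB and a replication factor of 1 (both are assumptions about the VM; `hdfs getconf -confKey dfs.blocksize` prints the real value), the expected block count is simple arithmetic:

```python
import math

MiB = 1024 * 1024
file_size = 64 * MiB * 16          # dd bs=64M count=16 (GNU dd: M = 1024*1024)
block_size = 128 * MiB             # assumed dfs.blocksize (Hadoop 2 default)
replication = 1                    # assumed dfs.replication on a single-node VM

blocks = math.ceil(file_size / block_size)
print(file_size // MiB, "MiB ->", blocks, "blocks,", blocks * replication, "block replicas")
```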

HDFS Java API
– org.apache.hadoop.fs.FileSystem
– FSDataInputStream, FSDataOutputStream
– methods: get, open, create
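String uri = args[0];                                   // an hdfs:// path
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
try (FSDataInputStream in = fs.open(new Path(uri))) {   // stream closed automatically
    IOUtils.copyBytes(in, System.out, 4096, false);     // org.apache.hadoop.io.IOUtils
}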

WebHDFS / HttpFS
– in hdfs-site.xml: dfs.webhdfs.enabled; dfs.web.authentication.kerberos.principal; ...kerberos.keytab
hdfs getconf -confKey dfs.webhdfs.enabled
– example
service hadoop-httpfs start
curl -i "http://quickstart.cloudera:14000/webhdfs/v1/user/cloudera?user.name=cloudera&op=GETFILESTATUS" 
curl -i -X PUT "http://quickstart.cloudera:14000/webhdfs/v1/user/test?user.name=cloudera&op=MKDIRS&permission=755" 
curl -i "http://quickstart.cloudera:14000/webhdfs/v1/user/test?user.name=cloudera&op=GETCONTENTSUMMARY" 
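All three curl calls share one REST shape: `http://<host>:<port>/webhdfs/v1<path>?user.name=<user>&op=<OP>[&param=value...]`. A small Python helper that builds such URLs is sketched below; `webhdfs_url` is a hypothetical name, and it only constructs the URL (sending it needs a running HttpFS, as shown in the comment).

```python
from urllib.parse import urlencode, quote

def webhdfs_url(host, port, path, op, user, **params):
    """Build a WebHDFS/HttpFS REST URL like the ones passed to curl above."""
    query = {"user.name": user, "op": op}
    query.update(params)               # extra operation parameters, e.g. permission
    return "http://{0}:{1}/webhdfs/v1{2}?{3}".format(
        host, port, quote(path), urlencode(query))

url = webhdfs_url("quickstart.cloudera", 14000, "/user/test", "MKDIRS",
                  "cloudera", permission="755")
print(url)
# To actually send it (MKDIRS is a PUT):
#   import urllib.request
#   urllib.request.urlopen(urllib.request.Request(url, method="PUT"))
```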

MapReduce

word count example
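– mapper takes a line and emits a (word, 1) pair for each word
– MapReduce sorts the mapper output by key and feeds it to the reducer
– reducer reads the sorted (word, 1) pairs: while the input word equals the current word, count += 1; when the word changes, it outputs (word, count) for the previous word and starts a new count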

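#!/usr/bin/env python
# wordcount_mapper.py: emit "word<TAB>1" for every word read from stdin
import sys

for line in sys.stdin:
    line = line.strip()
    keys = line.split()
    for key in keys:
        value = 1
        print('{0}\t{1}'.format(key, value))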

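#!/usr/bin/env python
# wordcount_reducer.py: input is sorted by key, so equal words arrive together
import sys

last_key      = None
running_total = 0

for input_line in sys.stdin:
    input_line = input_line.strip()
    this_key, value = input_line.split("\t", 1)
    value = int(value)

    if last_key == this_key:
        running_total += value
    else:
        # key changed: output the finished word, start counting the new one
        if last_key is not None:
            print("{0}\t{1}".format(last_key, running_total))
        last_key = this_key
        running_total = value

# output the last word (if there was any input at all)
if last_key is not None:
    print("{0}\t{1}".format(last_key, running_total))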

test and run
> chmod +x wordcount_mapper.py
> chmod +x wordcount_reducer.py
> echo "A long time ago in a galaxy far far away" > /home/cloudera/testfile1
> echo "Another episode of Star Wars" > /home/cloudera/testfile2
> cat testfile* | ./wordcount_mapper.py | sort | ./wordcount_reducer.py
hdfs dfs -mkdir -p /user/cloudera/input
hdfs dfs -put /home/cloudera/testfile1 /user/cloudera/input
hdfs dfs -put /home/cloudera/testfile2 /user/cloudera/input
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
   -input /user/cloudera/input \
   -output /user/cloudera/output_new \
   -mapper /home/cloudera/wordcount_mapper.py \
   -reducer /home/cloudera/wordcount_reducer.py \
   -numReduceTasks 1
hdfs dfs -cat /user/cloudera/output_new/part-00000
> hdfs dfs -getmerge /user/cloudera/output_new_0/* ./wordcount_num0_output.txt
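Before going to the cluster, the map, sort, reduce flow can also be checked inside a single Python process. This sketch reimplements the mapper and reducer logic as functions (`map_lines` and `reduce_sorted` are hypothetical names) and uses `sorted()` in place of the shuffle/sort phase:

```python
def map_lines(lines):
    # mapper: emit (word, 1) for every whitespace-separated token
    for line in lines:
        for word in line.strip().split():
            yield word, 1

def reduce_sorted(pairs):
    # reducer: input is sorted by key, so equal words are adjacent
    last_key, running_total = None, 0
    for key, value in pairs:
        if key == last_key:
            running_total += value
        else:
            if last_key is not None:
                yield last_key, running_total
            last_key, running_total = key, value
    if last_key is not None:
        yield last_key, running_total

lines = ["A long time ago in a galaxy far far away",
         "Another episode of Star Wars"]
counts = dict(reduce_sorted(sorted(map_lines(lines))))
print(counts)
```

Case is preserved, so `A` and `a` are counted as different words, just as in the streaming job.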

assignment: MR joining data 
(The show-to-channel relationship is Many-to-Many)
– find all ABC shows;
– group by show name;
– sum counts for each show
# A: show,channel
# B: show,count
# mapper
import sys

for line in sys.stdin:
    line = line.strip()
    show, channel = line.split(",")
    count = 'NULL'
    if channel.isdigit():
        count = int(channel)
        channel = 'NULL'
    # keep all count records, but only the ABC channel records
    if count != 'NULL' or channel == 'ABC':
        print('%s\t%s\t%s' % (show, channel, count))

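# reducer
import sys
currentShow, channels, counts, total = None, [], [], 0

def dumpAccum():
    # print the show only if it airs on ABC, with the sum of its counts
    global total
    if 'ABC' in channels:
        vnum = sum(counts)
        total += vnum
        print("{0} {1}".format(currentShow, vnum))

for line in sys.stdin:
    line = line.strip()
    show, channel, count = line.split('\t')
    if show != currentShow and currentShow:
        dumpAccum()              # show changed: flush the previous one
        channels, counts = [], []
    currentShow = show
    if channel != 'NULL':
        channels.append(channel)
    if count != 'NULL':
        counts.append(int(count))
if currentShow:
    dumpAccum()                  # flush the last show
# print total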
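The join can be checked the same way in one process: run the mapper logic over both inputs, sort (standing in for the shuffle) and reduce. The show names and counts below are hypothetical, and `join_mapper`/`join_reducer` are illustrative function versions of the two scripts.

```python
def join_mapper(lines):
    # emit show<TAB>channel<TAB>count; keep count records and ABC channel records
    for line in lines:
        show, channel = line.strip().split(",")
        count = "NULL"
        if channel.isdigit():
            count, channel = channel, "NULL"
        if count != "NULL" or channel == "ABC":
            yield "%s\t%s\t%s" % (show, channel, count)

def join_reducer(sorted_lines):
    # group consecutive records by show; keep shows seen on ABC, summing their counts
    results = []
    current, channels, counts = None, [], []
    for line in sorted_lines:
        show, channel, count = line.split("\t")
        if show != current:
            if current is not None and "ABC" in channels:
                results.append((current, sum(counts)))
            current, channels, counts = show, [], []
        if channel != "NULL":
            channels.append(channel)
        if count != "NULL":
            counts.append(int(count))
    if current is not None and "ABC" in channels:
        results.append((current, sum(counts)))
    return results

# Hypothetical inputs: file A is show,channel; file B is show,count
file_a = ["Hourly_Sports,ABC", "Hot_Show,DEF", "Almost_News,ABC", "Almost_News,BAT"]
file_b = ["Hourly_Sports,21", "Hourly_Sports,30", "Hot_Show,100", "Almost_News,5"]

shows = join_reducer(sorted(join_mapper(file_a + file_b)))
print(shows, "total:", sum(n for _, n in shows))
```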

Spark

– on top of HDFS, YARN but can work standalone on any storage
– fast (cache, no intermediate HDFS writes)
– interactive (Scala, Python, R shells)
– iterative
– any DAG workflow

PySpark setup
sudo easy_install ipython==1.2.1

– transformations (lazy)
– actions
– caching
– shared variables (one-way transfer)

RDD: resilient distributed dataset (from storage, from RDD transformations)
– divided in partitions (atomic chunks of data), immutable;
– track history of each partition, re-run if necessary;
a_RDD = sc.parallelize(range(10), 3)   # 3 partitions
b_RDD = a_RDD.map(lambda x: x+1)       # transformation (lazy)
b_RDD.glom().collect()                 # action: one list per partition
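The ideas above (immutable partitions, lazy transformations, lineage that can rebuild a partition) can be modelled in a few lines of plain Python. This is a hypothetical toy, not PySpark: `ToyRDD`, `parallelize` and `glom_collect` are made-up names, and the even slicing rule is a simplification.

```python
class ToyRDD:
    """Hypothetical stand-in for an RDD: source partitions plus a lineage of functions."""

    def __init__(self, partitions, lineage=()):
        self.partitions = partitions      # immutable source chunks
        self.lineage = tuple(lineage)     # recorded transformations, not yet applied

    def map(self, f):
        # transformation: lazy, only returns a new ToyRDD with a longer lineage
        return ToyRDD(self.partitions, self.lineage + (f,))

    def compute(self, i):
        # (re)build partition i from its source by replaying the lineage
        values = list(self.partitions[i])
        for f in self.lineage:
            values = [f(v) for v in values]
        return values

    def glom_collect(self):
        # action: materialise every partition, like rdd.glom().collect()
        return [self.compute(i) for i in range(len(self.partitions))]

def parallelize(data, num_slices):
    data = list(data)
    n = len(data)
    return ToyRDD([tuple(data[i * n // num_slices:(i + 1) * n // num_slices])
                   for i in range(num_slices)])

calls = []
def plus_one(x):
    calls.append(x)               # record when the function actually runs
    return x + 1

a_rdd = parallelize(range(10), 3)
b_rdd = a_rdd.map(plus_one)       # transformation: nothing computed yet
print(len(calls))                 # 0
print(b_rdd.glom_collect())       # [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
```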

join example
fileA = sc.textFile("input/join1_FileA.txt")
fileB = sc.textFile("input/join1_FileB.txt")
def split_fileA(line):
    word, cnt = line.split(',')
    count = int(cnt)
    return (word, count)
fileA_data = fileA.map(split_fileA)
# inspect with fileA_data.collect() or fileA_data.take(2)
def split_fileB(line):
    date, val = line.split(' ')
    word, count_string = val.split(',')
    return (word, date + " " + count_string)
fileB_data = fileB.map(split_fileB)
fileB_joined_fileA = fileB_data.join(fileA_data)
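`join` on pair RDDs is an inner join by key: every (k, v) on the left is combined with every (k, w) on the right, giving (k, (v, w)), and keys present on only one side are dropped. A plain-Python sketch of those semantics, reusing the two split functions on hypothetical lines in the FileA/FileB formats (`pair_join` is an illustrative helper, not a Spark function; Spark does not guarantee output order):

```python
from collections import defaultdict

def split_fileA(line):
    word, cnt = line.split(',')
    return (word, int(cnt))

def split_fileB(line):
    date, val = line.split(' ')
    word, count_string = val.split(',')
    return (word, date + " " + count_string)

def pair_join(left, right):
    # inner join by key: (k, v) x (k, w) -> (k, (v, w)); unmatched keys are dropped
    right_by_key = defaultdict(list)
    for k, w in right:
        right_by_key[k].append(w)
    return [(k, (v, w)) for k, v in left for w in right_by_key.get(k, [])]

# Hypothetical sample lines in the two input formats
file_a = ["able,991", "about,11"]
file_b = ["Jan-01 able,5", "Feb-02 about,3", "Mar-03 able,7", "Apr-04 zebra,1"]

joined = pair_join([split_fileB(l) for l in file_b], [split_fileA(l) for l in file_a])
for row in joined:
    print(row)
```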

broadcast variables
– large read-only variable, used on all nodes
– transfer just once per Executor
– torrent-like transfer
– good for config, lookup table, join (if table fits in memory)
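conf = sc.broadcast({'a': 'b', 'c': 'd'})                          # shipped once per executor
sc.parallelize(['a', 'c']).map(lambda k: conf.value[k]).collect()  # tasks read it via .value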
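The "join (if table fits in memory)" case is a map-side (broadcast hash) join: the small table reaches every executor as a dict and each record of the large dataset is looked up locally, with no shuffle. A plain-Python sketch of the per-partition work, with hypothetical data; `map_side_join` is an illustrative helper, and `categories` plays the role of a broadcast variable's `.value`.

```python
# Small lookup table (category_id -> category_name); in Spark it would be
# wrapped with sc.broadcast(...) and read inside tasks via .value
categories = {2: "Soccer", 3: "Golf"}

# One partition of a large (product_id, product_category_id) dataset
products_part = [(1, 2), (2, 3), (3, 99)]

def map_side_join(records, table):
    # inner join done locally; records without a match in the table are dropped
    for product_id, category_id in records:
        name = table.get(category_id)
        if name is not None:
            yield product_id, name

print(list(map_side_join(products_part, categories)))
```

In PySpark the same function could be applied per partition with `rdd.mapPartitions(lambda it: map_side_join(it, bc.value))`, where `rdd` and `bc` are hypothetical names for the large RDD and the broadcast variable.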

accumulators
– write-only on nodes
– collect data across the cluster
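acc = sc.accumulator(0)
def test(x): acc.add(x)                   # tasks can only add to it
sc.parallelize(range(10)).foreach(test)   # foreach is an action, so the tasks run
acc.value                                 # read on the driver: 45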

original post http://vasnake.blogspot.com/2016/02/code-for-hadoop-platform-and.html

