基本使用

1、将trans数据导入hdfs

1
2
hadoop fs -mkdir trans-201702
hadoop fs -put trans-201702.txt trans-201702

2、shell中使用hive

1
2
create database trans;
show databases;

库名不能用-符号

1
create table trans.trans201612  (line STRING);

表名不能用数字开头

加载数据进表

1
2
load data inpath 'trans-201702/trans-201702.txt' overwrite
into table trans.trans201612;

统计一下总共有多少行数据

1
select count(line) from trans.trans201612


然后用wc -l统计一下原始文件为472755103,发现少了一行,
wc -l是按\n作为行结束符统计行数,所以最后一行如果没有\n的话会统计丢失,一次少了一行

1
create table word_count as select word,count(1) as count from (select explode(split(line,’\s’)) as word from trans.trans201612) w group by word group by word;

导入Trans数据

要导入数据,首先需要在hive中建立一个表,用于存储数据的一些元信息,便于查询。
数据格式如下:

1
[Trans],600000.SH,20170203,101358480,977,272352,,,S,16.7300,400,540050,538028,

1、创建数据表trans.trans201702

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
create  table trans.test201702(
type STRING,
code STRING,
trans_date STRING,
trans_time STRING,
seq STRING,
trans_number STRING,
func STRING,
trans_order STRING,
bs STRING,
price float,
volume STRING,
ask STRING,
bid STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';

2、修改字段名:

1
alter table trans.trans201702 change column tyep type STRING;

3、导入数据到trans.test201702

1
load data inpath 'trans-201702/trans-201702.txt' overwrite into table trans.test201702;

4、删除表:

1
drop database.table_name

查询操作

Hive的sql语句与sql类似,只要熟悉sql语句,基本上就能使用Hive就能查询出想要的数据。
1、查询股票代码

1
select wcode from trans.trans201702 group by wcode;


2、将查询的数据导出

1
insert overwrite local file '/tmp/code_list.txt' select code  from trans.trans201702 group by code;

生成的文件数 和redurcer的数目的一样的。

上面的文件需要进行合并才能看,不方便,可以通过直接导出查询结果到本地

1
bin/hive -e 'select code  from trans.trans201702 group by code;' > ~/data/code_list

3、Python 异步实现方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/bin/python
from pyhive import hive
from TCLIService.ttypes import TOperationState
cursor = hive.connect('localhost').cursor()
cursor.execute('select code from trans.trans201702 group by code', async=True)
status = cursor.poll().operationState
while status in (TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE):
status = cursor.poll().operationState
# If needed, an asynchronous query can be cancelled at any time with:
# cursor.cancel()
# 获取数据
try:
rows = cursor.fetchall()
except:
print 'null'
else:
for row in rows:
print row

hive streaming

Hive streaming提供了类似unix管道的功能,将查询的数据,直接传给其它的脚本处理,并且支持多种语言如python,java
编辑streaming.py

1
2
3
4
5
6
7
8
9
#!/usr/bin/env  python  
#-*- coding:utf-8 -*-
# hive streaming test
import sys

lines = []
for line in sys.stdin:
lines.append(line)
print lines

在hive中执行,加入自己定义的脚本

1
add file /home/wodehp/code/streaming.py;

然后执行:

1
select transform(bs) using 'python streaming.py' as bs from trans.trans201702  where trans_date='20170203' limit 10;

输出

Python使用hive

Hive基于thirft框架,提供一套外部调用的接口,支持java,python等程序调用。首先启动HiveServer2服务

1
nohup hiveserver2 &

可用的python包
pyhs2
PyHive
impyla

环境准备

需要安装的包为thrift_sasl,sasl,pythive,impyle
安装sasl

将下面的包全装上

PyHive使用

pyhive是dropbox开源的python连接hive的库,并且支持异步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#!/bin/python
from pyhive import hive
from TCLIService.ttypes import TOperationState
]
# 连接
cursor = hive.connect(‘localhost’).cursor()
# 执行查询
cursor.execute(‘SELECT * FROM trans.trans201612 limit 10’, async=True)
status = cursor.poll().operationState
while status in (TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE):
# 查询状态
logs = cursor.fetch_logs()
for message in logs:
print message
# If needed, an asynchronous query can be cancelled at any time with:
# cursor.cancel()
#print cursor.fetchall()
for row in cursor.fetchall():
print row

pyhs2使用

出现此问题时由于用户密码设置不对

HiveServer2默认权限没有开起
authMechanism设置为NOSASL

之后又爆出以下问题,暂时问题不明

pyhs2已经于2016年1月份停止更新,bug较多

impyla

cloudera的impyla,bug多,没有试成功
`

hive版本切换

由于安装的hive版本偏高,在使用的时候存在python连接的许多问题,现切换成hive1.2版,安装和hive2.1一样,配置也一样不需要更改。

Hive2虽然也是一个稳定的版本(2016年12发布),但是其刚出来不久,许多的配套的库并没有及时的更新用来支持,例如pyhs2在2016年1月停止更新,impyla也停止更新接近1年等,出现的问题很难找到答案。但是在版本的选择上还是需要尝试,知道存在的问题。后续将采用PyHive来作为Python连接Hive的包。

参考文章

Hive导入10G数据的测试