There are many ways to connect to Hive and Impala from Python: pyhive, impyla, pyspark, ibis, and so on. In this post we go through each of these packages, showing how to connect to Hive or Impala and how to authenticate with Kerberos.
If your cluster does not have Kerberos enabled, the code in this part is not needed; likewise, if you have already authenticated in the system environment with the kinit command, you can skip it.
krbcontext.context_shell
# -*- coding: utf-8 -*-

__all__ = ['krbcontext', 'KRB5KinitError']

import os, sys
# import pwd
import subprocess
from contextlib import contextmanager


class KRB5KinitError(Exception):
    pass

# def get_login():
#     ''' Get current effective user name '''
#     return pwd.getpwuid(os.getuid()).pw_name


def init_ccache_as_regular_user(principal=None, ccache_file=None):
    '''Initialize credential cache as a regular user

    Return the filename of newly initialized credential cache
    '''
    if not sys.stdin.isatty():
        raise IOError('This is not running on console. So, you need to run kinit '
                      'with your principal manually before anything goes.')

    cmd = 'kinit %(ccache_file)s %(principal)s'
    args = {}
    args['principal'] = principal
    args['ccache_file'] = '-c %s' % ccache_file

    kinit_proc = subprocess.Popen((cmd % args).split(), stderr=subprocess.PIPE)
    stdout_data, stderr_data = kinit_proc.communicate()

    if kinit_proc.returncode > 0:
        raise KRB5KinitError(stderr_data)

    return ccache_file


def init_ccache_with_keytab(principal, keytab_file, ccache_file):
    '''Initialize credential cache using keytab file

    Return the filename of newly initialized credential cache
    '''
    cmd = 'kinit -kt %(keytab_file)s -c %(ccache_file)s %(principal)s'
    args = {}
    args['principal'] = principal
    args['ccache_file'] = ccache_file
    args['keytab_file'] = keytab_file

    kinit_proc = subprocess.Popen((cmd % args).split(), stderr=subprocess.PIPE)
    stdout_data, stderr_data = kinit_proc.communicate()

    if kinit_proc.returncode > 0:
        raise KRB5KinitError(stderr_data)

    return ccache_file


@contextmanager
def krbcontext(using_keytab=False, **kwargs):
    '''A context manager for Kerberos-related actions

    using_keytab: specify to use Keytab file in Kerberos context if True,
                  or be as a regular user.
    kwargs: contains the necessary arguments used in kerberos context.
            It can contain principal, keytab_file, ccache_file.

    When you want to use Keytab file, keytab_file must be included.
    '''
    env_name = 'KRB5CCNAME'
    old_ccache = os.getenv(env_name)
    if using_keytab:
        ccache_file = init_ccache_with_keytab(**kwargs)
    else:
        ccache_file = init_ccache_as_regular_user(kwargs.get("principal"),
                                                  kwargs.get("ccache_file"))
    os.environ[env_name] = ccache_file
    yield
Connect to Hive with pyhive
""" decorator==4.4.2 future==0.18.2 gssapi==1.6.5 krbcontext==0.10 PyHive==0.5.0 impyla==0.14.1 sasl==0.2.1 six==1.11.0 thrift_sasl==0.3.0 # 如果使用ibis或impyla需要thrift_sasl==0.2.1 thrift==0.13.0 thriftpy==0.3.5 """ from pyhive import sqlalchemy_hive,hive from krbcontext.context_shell import krbcontext
There are two options. One is to keep the current user authenticated with Kerberos in the system environment by running kinit first; in that case none of the scripts need any Kerberos code at all. The other is to enter a Kerberos-authenticated session inside the Python script with the code below:
config = {
    "kerberos_principal": "hive@CSDNTEST.COM.LOCAL",
    "keytab_file": '/home/tools/wyk/keytab/hive.keytab',
    "kerberos_ccache_file": '/home/tools/wyk/keytab/hive_ccache_uid',
    "AUTH_MECHANISM": "GSSAPI"
}

with krbcontext(using_keytab=True,
                principal=config['kerberos_principal'],
                keytab_file=config['keytab_file'],
                ccache_file=config['kerberos_ccache_file']):
    # Everything executed inside this block runs under Kerberos authentication.
    # Inside it you can query HDFS, Hive, HBase, etc. -- any component the
    # Kerberos principal has permissions for.
    pass  # replace with the Hive/HDFS code you want to run under Kerberos
The code below must run inside the Kerberos block above to pass Kerberos authentication. If the cluster does not have Kerberos enabled, or the current system session has already been authenticated with kinit, the block above is not needed:
con = hive.connect(host='uatnd02.csdntest.com.local', port=10000,
                   auth='KERBEROS', kerberos_service_name="hive")
# host is the node running HiveServer2; port defaults to 10000, the HS2 port
cursor = con.cursor()
cursor.execute('select * from dl_nccp.account limit 5')  # no trailing semicolon!
# cursor.execute('desc dl_nccp.account')                 # no trailing semicolon!
datas = cursor.fetchall()
print(datas)
cursor.close()
con.close()
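As a small optional extension (not part of the original code), the DB-API connection returned by pyhive can also be handed straight to pandas, which saves the manual fetchall and column handling. A minimal sketch using the same connection parameters as above; note that newer pandas versions may print a warning for non-SQLAlchemy connections:

import pandas as pd
from pyhive import hive

con = hive.connect(host='uatnd02.csdntest.com.local', port=10000,
                   auth='KERBEROS', kerberos_service_name="hive")
# pandas.read_sql accepts a DB-API connection and returns a DataFrame
df = pd.read_sql('select * from dl_nccp.account limit 5', con)
print(df.head())
con.close()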
""" decorator==4.4.2 future==0.18.2 gssapi==1.6.5 krbcontext==0.10 PyHive==0.5.0 impyla==0.14.1 sasl==0.2.1 six==1.11.0 thrift_sasl==0.2.1 # 只有这个与pyhive的不一样 thrift==0.13.0 thriftpy==0.3.5 """ from impala.dbapi import connect from krbcontext.context_shell import krbcontext
Kerberos authentication is the same as for pyhive above, so it is omitted here.
# with thrift_sasl==0.2.1 impyla uses the same dependency versions as ibis, so the two can be used together
conn = connect(host='uatnd02.csdntest.com.local', port=10000,
               auth_mechanism='GSSAPI', kerberos_service_name='hive')
cur = conn.cursor()
cur.execute('SHOW databases')   # no trailing semicolon
cur.table_exists(table_name='account', database_name='dl_nccp')  # returns True or False
cur.ping()     # returns True or False
cur.status()   # returns True or False
cur.get_table_schema(table_name='account', database_name='dl_nccp')  # returns the table schema, similar to desc
print(cur.fetchall())
cur.close()
conn.close()
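impyla also ships a small helper, impala.util.as_pandas, which turns a finished cursor into a pandas DataFrame. A minimal sketch, assuming the same connection parameters as above and that pandas is installed:

from impala.dbapi import connect
from impala.util import as_pandas   # helper bundled with impyla

conn = connect(host='uatnd02.csdntest.com.local', port=10000,
               auth_mechanism='GSSAPI', kerberos_service_name='hive')
cur = conn.cursor()
cur.execute('select * from dl_nccp.account limit 5')   # still no trailing semicolon
df = as_pandas(cur)   # fetches all remaining rows into a pandas DataFrame
print(df.head())
cur.close()
conn.close()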
ibis is a very powerful third-party package that can talk to many databases and file systems, and it returns results as pandas DataFrames, which is very friendly for data developers. I have used it for more than two years and highly recommend it.
Official documentation: https://docs.ibis-project.org/getting-started.html
""" ibis-framework==0.14.0 #pip install ibis-framework[impala]==0.14.0 decorator==4.4.2 future==0.18.2 gssapi==1.6.5 krbcontext==0.10 PyHive==0.5.0 impyla==0.14.1 sasl==0.2.1 six==1.11.0 thrift_sasl==0.2.1 # 只有这个与pyhive的不一样 thrift==0.13.0 thriftpy==0.3.5 """
Kerberos authentication is the same as for pyhive above, so it is omitted here.
The official documentation and the source code expose many more functions than shown here; only the commonly used ones are listed. If you plan to use ibis, it is worth reading the source or the docs.
import ibis
import pandas as pd
import ibis.expr.datatypes as dt

conf = {
    "impala_host": "uathd01.csdntest.com.local",
    "impala_port": 21050,
    "kerberos_service_name": "impala",
    "auth_mechanism": "GSSAPI",
    "webhdfs_host1": "uatnd01.csdntest.com.local",
    "webhdfs_host2": "uatnd02.csdntest.com.local",
    "webhdfs_port": 9870   # for Hadoop versions before 3.x use 50070 instead
}

# get an HDFS connection
try:
    hdfs_client = ibis.hdfs_connect(host=conf["webhdfs_host2"],
                                    port=conf["webhdfs_port"],
                                    auth_mechanism=conf["auth_mechanism"],
                                    use_https=False,
                                    verify=False)
    hdfs_client.ls('/')
except:
    hdfs_client = ibis.hdfs_connect(host=conf["webhdfs_host1"],
                                    port=conf["webhdfs_port"],
                                    auth_mechanism=conf["auth_mechanism"],
                                    use_https=False,
                                    verify=False)

# get an Impala connection
impala_client = ibis.impala.connect(host=conf["impala_host"],
                                    port=conf["impala_port"],
                                    hdfs_client=hdfs_client,
                                    auth_mechanism=conf["auth_mechanism"],
                                    timeout=300)

# read a table directly: select * from dh_sales.r_order limit 10, returns a pandas DataFrame
res = impala_client.table('r_order', database='dh_sales').execute(limit=10)
print(type(res))
print(res.dtypes)
print(res)

# query Impala with SQL
res = impala_client.sql("""select area,is_oversea,dw_ins_ts,sum(gid_cnt)
from dh_t3_report_crm.r_duration_area_metr
where is_oversea=0 group by 1,2,3 limit 5""")
df_res = res.execute()
print(df_res)

# list all tables in a database
impala_client.list_tables(database='dl_common')

# create a table from a pandas DataFrame, column types are mapped automatically
impala_client.create_table(table_name='ibis_create0602', obj=sys_dict, database='default', force=True)

...
# pd_res_acc omitted
# pd_res_cre omitted
...

# insert data from a pandas DataFrame
impala_client.insert(table_name='tmp_account', obj=pd_res_acc, database='default', overwrite=True)
impala_client.insert(table_name='ibis_create', obj=pd_res_cre, database='default', overwrite=True)
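Besides raw SQL, ibis's main selling point is its deferred expression API: filters and aggregations are composed in Python and only compiled to SQL when .execute() is called. Below is a small sketch of the GROUP BY query above rewritten as expressions; it reuses the impala_client and column names from the code above, and exact method availability can vary slightly between ibis versions:

# table() gives a lazy expression; nothing runs until .execute()
t = impala_client.table('r_duration_area_metr', database='dh_t3_report_crm')

metric = t.gid_cnt.sum().name('gid_cnt_sum')
expr = (t.filter(t.is_oversea == 0)
         .group_by(['area', 'is_oversea', 'dw_ins_ts'])
         .aggregate([metric])
         .limit(5))

df = expr.execute()   # compiled to Impala SQL, returned as a pandas DataFrame
print(df)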
Running the insert or create calls above may raise the error below. It is a bug at the source level; modifying the source as described in the link below fixes it:
UnicodeEncodeError: 'latin-1' codec can't encode characters in position 160-161: ordinal not in range(256)
Reference: https://github.com/ibis-project/ibis/issues/2120
vim /home/tools/python3/Python-3.6.8/lib/python3.6/site-packages/hdfs/client.py
Connect to Hive with pyspark

First configure the Spark-related environment variables:

vim /etc/profile
export SPARK_HOME=/opt/cloudera/parcels/CDH/lib/spark
export SPARK_CONF_DIR=$SPARK_HOME/conf
export PYTHONPATH=/home/tools/anaconda3/envs/csdn/bin
export PYSPARK_PYTHON=/home/tools/anaconda3/envs/csdn/bin/python3.6
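If editing /etc/profile is not convenient, the same variables can also be set from inside the Python script before the SparkContext is created. A minimal sketch assuming the same paths as above (findspark, shown commented out in the script below, can then pick up SPARK_HOME):

import os

# same values as the /etc/profile exports above
os.environ['SPARK_HOME'] = '/opt/cloudera/parcels/CDH/lib/spark'
os.environ['SPARK_CONF_DIR'] = os.path.join(os.environ['SPARK_HOME'], 'conf')
os.environ['PYSPARK_PYTHON'] = '/home/tools/anaconda3/envs/csdn/bin/python3.6'

# optional: let findspark add pyspark to sys.path based on SPARK_HOME
# import findspark
# findspark.init()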
""" pyspark==2.4.5 """
Kerberos authentication is the same as for pyhive above, so it is omitted here.
Connect to Hive with pyspark, run a query, and convert the Spark DataFrame to a pandas DataFrame:
from __future__ import division
# import findspark as fs
# fs.init()
import pandas as pd
from pyspark.sql import HiveContext, SparkSession, SQLContext
from pyspark import SparkContext, SparkConf
import pyspark.sql.functions as F
import datetime as dt
from datetime import datetime
import random
import numpy as np
from log3 import log_to_file, log
from pyspark.sql.types import *

conf = SparkConf().setMaster("yarn").setAppName("MF_return_calc")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
hiveCtx = HiveContext(sc)
spark = SparkSession.builder.master("yarn").appName("MF_return_calc") \
    .config("spark.debug.maxToStringFields", "100").getOrCreate()

# run SQL
test_sql = """select * from dl_nccp.account limit 5"""
res = hiveCtx.sql(test_sql)
type(res)                 # returns a Spark DataFrame
res.head(3)
res_pd = res.toPandas()   # convert the Spark DataFrame to a pandas DataFrame
res_pd
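Going the other way is just as easy: a pandas DataFrame can be turned back into a Spark DataFrame and written out to Hive. A minimal sketch reusing the spark session and res_pd from above; the target table name default.tmp_account_spark is just an example, and it assumes the session has Hive support (on CDH this is usually the default, otherwise add .enableHiveSupport() to the builder):

# convert the pandas result back to a Spark DataFrame
sdf = spark.createDataFrame(res_pd)

# write it out as a Hive table (table name here is hypothetical)
sdf.write.mode('overwrite').saveAsTable('default.tmp_account_spark')

# or append into an existing table with a matching schema:
# sdf.write.mode('append').insertInto('default.tmp_account_spark')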
That wraps up the four ways to access Hive and Impala from Python. I hope it helps, thanks!