When we type hive in a terminal, the script at $HIVE_HOME/bin/hive runs. That script in turn sources each .sh script in the ext directory under bin. The ext directory contains many scripts, used to launch the various services Hive depends on.
Liangs-MacBook-Pro:ext pwrliang$ ll|grep .sh
-rwxr-xr-x  1 pwrliang  staff  1679 Jan 26 16:13 beeline.sh
-rwxr-xr-x  1 pwrliang  staff  1028 Jan 26 16:13 cli.sh
-rwxr-xr-x  1 pwrliang  staff  3199 Jan 26 16:13 debug.sh
-rwxr-xr-x  1 pwrliang  staff  1456 Jan 26 16:13 help.sh
-rwxr-xr-x  1 pwrliang  staff  1187 Jan 26 16:13 hiveburninclient.sh
-rwxr-xr-x  1 pwrliang  staff  1214 Jan 26 16:13 hiveserver.sh
-rwxr-xr-x  1 pwrliang  staff  1118 Jan 26 16:13 hiveserver2.sh
-rwxr-xr-x  1 pwrliang  staff  1625 Jan 26 16:13 hwi.sh
-rwxr-xr-x  1 pwrliang  staff  1424 Jan 26 16:13 jar.sh
-rwxr-xr-x  1 pwrliang  staff  1220 Jan 26 16:13 lineage.sh
-rwxr-xr-x  1 pwrliang  staff  1271 Jan 26 16:13 metastore.sh
-rwxr-xr-x  1 pwrliang  staff  1101 Jan 26 16:13 metatool.sh
-rwxr-xr-x  1 pwrliang  staff  1073 Jan 26 16:13 orcfiledump.sh
-rwxr-xr-x  1 pwrliang  staff  1059 Jan 26 16:13 rcfilecat.sh
-rwxr-xr-x  1 pwrliang  staff  1080 Jan 26 16:13 schemaTool.sh
-rwxr-xr-x  1 pwrliang  staff  1266 Jan 26 16:13 version.sh
Among these scripts, cli.sh is the entry point we care about: it invokes the hadoop jar command to launch "org.apache.hadoop.hive.cli.CliDriver".
Line 620 of CliDriver is the main method; it instantiates a CliDriver and calls its run method.
public static void main(String[] args) throws Exception {
int ret = new CliDriver().run(args);
System.exit(ret);
}
CliDriver's constructor obtains a SessionState instance. SessionState encapsulates the data common to a session. It also supports thread-static session objects, so the current session, and with it the configuration, can be reached from anywhere in the code.
public CliDriver() {
SessionState ss = SessionState.get();
conf = (ss != null) ? ss.getConf() : new Configuration();
Log LOG = LogFactory.getLog("CliDriver");
if (LOG.isDebugEnabled()) {
LOG.debug("CliDriver inited with classpath " + System.getProperty("java.class.path"));
}
console = new LogHelper(LOG);
}
Each thread is associated with a SessionStates object, which holds a SessionState and a HiveConf instance.
// SessionState is not available in runtime and Hive.get().getConf() is not safe to call
private static class SessionStates {
private SessionState state;
private HiveConf conf;
private void attach(SessionState state) {
this.state = state;
attach(state.getConf());
}
private void attach(HiveConf conf) {
this.conf = conf;
ClassLoader classLoader = conf.getClassLoader();
if (classLoader != null) {
Thread.currentThread().setContextClassLoader(classLoader);
}
}
}
Right after SessionStates is instantiated, both its state and conf fields are null. When CliDriver calls SessionState.get() to fetch the SessionStates bound to the current thread, it gets back a null state, and therefore falls back to creating a plain Configuration instance.
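The thread-association mechanism described above can be sketched with a plain ThreadLocal. All class and method names below are invented for illustration; Hive's real implementation lives in SessionState and its SessionStates inner holder class.

```java
// Sketch of the thread-static session pattern (names invented for
// illustration; in Hive this is SessionState + its SessionStates holder).
public class SessionDemo {
    // Stand-in for SessionState.
    public static class Session {
        public String lastCommand;
    }

    // One slot per thread, like the ThreadLocal<SessionStates> in Hive.
    private static final ThreadLocal<Session> TSS = new ThreadLocal<>();

    // Mirrors SessionState.setCurrentSessionState / SessionStates.attach.
    public static void attach(Session s) {
        TSS.set(s);
    }

    // Mirrors SessionState.get(): returns null if no session was attached
    // on the calling thread.
    public static Session get() {
        return TSS.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Session s = new Session();
        attach(s);
        System.out.println(get() == s);  // prints "true"
        // A different thread has its own (empty) slot.
        Thread t = new Thread(() -> System.out.println(get() == null)); // prints "true"
        t.start();
        t.join();
    }
}
```

This is why CliDriver's constructor can see a null state: nothing has called attach on the current thread yet at that point.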
Once the CliDriver instance is ready, its run method executes. run instantiates an OptionsProcessor, initializes Log4j, and creates a CliSessionState. CliSessionState extends SessionState, adding fields such as database, execString, and fileName.
public class CliSessionState extends SessionState {
/**
* -database option if any that the session has been invoked with.
*/
public String database;
/**
* -e option if any that the session has been invoked with.
*/
public String execString;
/**
* -f option if any that the session has been invoked with.
*/
public String fileName;
/**
* properties set from -hiveconf via cmdline.
*/
public Properties cmdProperties = new Properties();
/**
* -i option if any that the session has been invoked with.
*/
public List<String> initFiles = new ArrayList<String>();
public CliSessionState(HiveConf conf) {
super(conf);
}
....
}
Creating a CliSessionState requires a HiveConf instance. HiveConf extends Hadoop's Configuration class, and its constructor takes SessionState.class in order to resolve the hiveJar path ("apache-hive-1.2.1-bin/lib/hive-exec-1.2.1.jar").
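The general technique of deriving a jar path from a Class object can be sketched with the standard library alone. This helper is hypothetical, not Hive's code (Hive resolves the jar through its Hadoop configuration machinery), but it shows the underlying idea: ask where the class's own .class file was loaded from.

```java
// Hypothetical helper, not Hive's code: report where a class was loaded
// from. Works for classes inside jars and for exploded class directories.
import java.net.URL;

public class JarLocator {
    public static String locate(Class<?> clazz) {
        // Resolving the .class resource relative to the class itself yields
        // e.g. "jar:file:/.../hive-exec-1.2.1.jar!/org/..." for a jar, or a
        // file:/jrt: URL for directory/JDK classes.
        URL url = clazz.getResource(clazz.getSimpleName() + ".class");
        return url == null ? null : url.toString();
    }
}
```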
Once the CliSessionState is instantiated, it is passed to SessionState.start to start the session. Startup first attaches the CliSessionState to the state field of the thread's SessionStates, then calls getMSC to obtain a SessionHiveMetaStoreClient instance. The IMetaStoreClient interface exposes metadata operations such as retrieving database names, table names, and partition information.
public static SessionState start(SessionState startSs) {
setCurrentSessionState(startSs); // assign this CliSessionState to the state field of the thread's SessionStates, implemented by SessionStates.attach
....
// Get the following out of the way when you start the session these take a
// while and should be done when we start up.
try {
// Hive object instance should be created with a copy of the conf object. If the conf is
// shared with SessionState, other parts of the code might update the config, but
// Hive.get(HiveConf) would not recognize the case when it needs refreshing
Hive.get(new HiveConf(startSs.conf)).getMSC();
UserGroupInformation sessionUGI = Utils.getUGI();
FileSystem.get(startSs.conf);
// Create scratch dirs for this session
startSs.createSessionDirs(sessionUGI.getShortUserName());
....
return startSs;
}
After the session starts, CliDriver's executeDriver method is invoked. It initializes yet another CliDriver instance, then reads the SQL the user typed and hands it to CliDriver's processLine method.
processLine splits the user's input on semicolons (";"); each resulting fragment is called oneCmd. A fragment may be one piece of a statement whose ';' was escaped with '\', in which case fragments are concatenated. The complete statement is accumulated in the command variable and handed to processCmd.
for (String oneCmd : line.split(";")) {
if (StringUtils.endsWith(oneCmd, "\\")) {
command += StringUtils.chop(oneCmd) + ";";
continue;
} else {
command += oneCmd;
}
if (StringUtils.isBlank(command)) {
continue;
}
ret = processCmd(command);
//wipe cli query state
SessionState ss = SessionState.get();
ss.setCommandType(null);
command = "";
lastRet = ret;
boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS);
if (ret != 0 && !ignoreErrors) {
CommandProcessorFactory.clean((HiveConf) conf);
return ret;
}
}
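The splitting rule in the loop above can be restated as a self-contained method. The class and method names here are made up for illustration; Hive's real loop additionally tracks return codes and session state as shown above.

```java
// Self-contained sketch of processLine's splitting rule: statements are
// separated by ';', and a trailing '\' escapes the ';' so the next
// fragment is glued onto the current statement.
import java.util.ArrayList;
import java.util.List;

public class LineSplitter {
    public static List<String> split(String line) {
        List<String> commands = new ArrayList<>();
        String command = "";
        for (String oneCmd : line.split(";")) {
            if (oneCmd.endsWith("\\")) {
                // Drop the backslash and restore the ';' it was escaping.
                command += oneCmd.substring(0, oneCmd.length() - 1) + ";";
                continue;
            }
            command += oneCmd;
            if (!command.trim().isEmpty()) {
                commands.add(command);
            }
            command = "";
        }
        return commands;
    }
}
```

For example, "select 1; select 2" yields two statements, while "select 'a\;b'" yields the single statement "select 'a;b'".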
In processCmd, the CliSessionState instance is fetched and the command about to be executed is stored in the session's lastCommand field, preserving the most recently executed statement. A command is not necessarily SQL: it can also be exit or quit, or a shell command starting with '!'. Here we focus only on SQL.
public int processCmd(String cmd) {
CliSessionState ss = (CliSessionState) SessionState.get();
ss.setLastCommand(cmd);
// Flush the print stream, so it doesn't include output from the last command
ss.err.flush();
String cmd_trimmed = cmd.trim();
String[] tokens = tokenizeCmd(cmd_trimmed);
if (cmd_trimmed.toLowerCase().equals("quit") || cmd_trimmed.toLowerCase().equals("exit")) {
....
} else if (tokens[0].equalsIgnoreCase("source")) {
....
} else if (cmd_trimmed.startsWith("!")) {
....
} else { // local mode
try {
CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf); // this is where the Driver instance gets created
ret = processLocalCmd(cmd, proc, ss);
} catch (SQLException e) {
console.printError("Failed processing command " + tokens[0] + " " + e.getLocalizedMessage(),
org.apache.hadoop.util.StringUtils.stringifyException(e));
ret = 1;
}
}
return ret;
}
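The dispatch order in processCmd can be condensed into a toy router. This is not Hive's code (the names and return values are invented), but it mirrors the branch structure above: exit/quit first, then source, then '!' shell commands, and everything else is treated as SQL.

```java
// Toy router (names invented) mirroring processCmd's dispatch order.
public class CmdRouter {
    public static String route(String cmd) {
        String trimmed = cmd.trim();
        String lower = trimmed.toLowerCase();
        if (lower.equals("quit") || lower.equals("exit")) {
            return "exit";      // CliDriver shuts the session down here
        }
        String[] tokens = trimmed.split("\\s+");
        if (tokens.length > 0 && tokens[0].equalsIgnoreCase("source")) {
            return "source";    // run statements from a script file
        }
        if (trimmed.startsWith("!")) {
            return "shell";     // delegate to the operating system shell
        }
        return "sql";           // handed to CommandProcessor / Driver
    }
}
```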
The SQL then goes to processLocalCmd, which takes the SQL to execute (cmd), a CommandProcessor (proc), and the CliSessionState (ss). If proc is an instance of Driver, the SQL is handed to the run method of org.apache.hadoop.hive.ql.Driver for execution.
int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss) {
int tryCount = 0;
boolean needRetry;
int ret = 0;
do {
try {
needRetry = false;
if (proc != null) {
if (proc instanceof Driver) {
// this branch handles SQL statements
Driver qp = (Driver) proc;
PrintStream out = ss.out;
long start = System.currentTimeMillis();
if (ss.getIsVerbose()) {
out.println(cmd);
}
qp.setTryCount(tryCount);
ret = qp.run(cmd).getResponseCode();
if (ret != 0) {
qp.close();
return ret;
}
....
} else {
// this branch handles built-in commands such as SET
....
}
}
} catch (CommandNeedRetryException e) {
console.printInfo("Retry query with a different approach...");
tryCount++;
needRetry = true;
}
} while (needRetry);
return ret;
}
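The do/while structure above is a retry loop driven by an exception. Stripped of the Hive specifics, it looks like the sketch below; the names are invented, and in Hive the retry signal is CommandNeedRetryException thrown from Driver.run.

```java
// Stripped-down version of the processLocalCmd retry loop: re-run the
// operation while it signals "retry", passing the attempt count back in.
public class RetryLoop {
    public static class RetryException extends Exception {}

    public interface Op {
        int run(int tryCount) throws RetryException;
    }

    public static int runWithRetry(Op op) {
        int tryCount = 0;
        boolean needRetry;
        int ret = 0;
        do {
            try {
                needRetry = false;
                ret = op.run(tryCount);
            } catch (RetryException e) {
                // Same shape as CliDriver: bump the count and go around again.
                tryCount++;
                needRetry = true;
            }
        } while (needRetry);
        return ret;
    }
}
```

Passing tryCount into the operation corresponds to qp.setTryCount(tryCount) above, which lets the Driver choose a different execution approach on a retry.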
The figure below shows the call stack from CliDriver.main down to Driver.run.

This article traced the path from typing the hive command to receiving the SQL statement to be executed. Executing the SQL itself is the job of Driver's run method; the next chapter starts with an analysis of the Driver class.