工作需要,自定义实现 Hadoop 的一个 InputFormat,使用 v1 接口(org.apache.hadoop.mapred)。此 InputFormat 的功能为读取 MySQL 数据库的数据,并将这些数据分成几块,作为多个 InputSplit 分发给各个 map 任务。
package com.demo7;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;

import com.google.common.base.Joiner;
/**
 * InputFormat (mapred v1 API) that reads stock k-line rows from MySQL and
 * partitions them into splits of (trade day, chunk of stock codes).  Each
 * split produces exactly one record: key = trade day, value = ":"-joined
 * stock codes.
 */
public class MysqlInputformat implements InputFormat<Text, Text> {

    private static Logger logger = Logger.getLogger(MysqlInputformat.class);

    // Inclusive trade-day range (yyyyMMdd) to read, taken from the job config.
    private String beginTradeDay = Config.getConfig().getProperty("dealday.begin.mysqlinputformat");
    private String endTradeDay = Config.getConfig().getProperty("dealday.end.mysqlinputformat");
    // Maximum number of stock codes handled by one map task (one split).
    // parseInt instead of valueOf: we need the primitive, no boxing required.
    private int oneTaskStocks = Integer.parseInt(Config.getConfig().getProperty("stocknumber_permap.mysqlinputformat"));

    /**
     * Builds the reader for one split.  The split carries one trade day plus
     * its ":"-joined stock-code list; both are handed straight to the reader.
     */
    @Override
    public RecordReader<Text, Text> getRecordReader(InputSplit split,
            JobConf conf, Reporter reporter) throws IOException {
        MysqlInputSplit mysqlSplit = (MysqlInputSplit) split;
        logger.info("---------------------ln------------------------");
        logger.info(mysqlSplit.tradeDay);
        logger.info(mysqlSplit.stockcodes);
        return new MysqlRecordReader(mysqlSplit.tradeDay, mysqlSplit.stockcodes);
    }
@Override
public InputSplit[] getSplits(JobConf arg0, int arg1) throws IOException {
ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
logger.info(String.format("begin generate map task, from %s to %s", beginTradeDay, endTradeDay));
HashMap<String,ArrayList<String>> dayStocks = new HashMap<String,ArrayList<String>>(); //key为交易日,value为股票列表
Connection conn = null;
try {
conn = DriverManager.getConnection(MysqlProxy.getProxoolUrl(), MysqlProxy.getProxoolConf());
// 创建一个Statement对象
Statement stmt = conn.createStatement(); // 创建Statement对象
String sql = String.format(
"select date,stock_code from gushitong.s_kline_day_complexbeforeright_ln " +
" where date>='%s' and date<='%s'",
beginTradeDay, endTradeDay);
ResultSet rs = stmt.executeQuery(sql);// 创建数据对象
String date = null;
String stockcode = null;
while (rs.next()) {
date = rs.getString("date");
stockcode = rs.getString("stock_code");
if(dayStocks.containsKey(date) == false){
dayStocks.put(date, new ArrayList<String>(3300));
}
dayStocks.get(date).add(stockcode);
}
rs.close();
stmt.close();
conn.close();
} catch (Exception e) {
logger.error(e);
}
Joiner joiner = Joiner.on(":").useForNull("");
SimpleDateFormat sdf_1 = new SimpleDateFormat("yyyyMMdd");
SimpleDateFormat sdf_2 = new SimpleDateFormat("yyyy-MM-dd");
for(Map.Entry<String, ArrayList<String>> dayStockEntry : dayStocks.entrySet()){
String tradeDay = dayStockEntry.getKey();
for(int i=0; i<dayStockEntry.getValue().size();){
int endindex;
if(i+oneTaskStocks<=dayStockEntry.getValue().size()){
endindex = i+oneTaskStocks;
}else{
endindex = dayStockEntry.getValue().size();
}
String stocks = joiner.join(dayStockEntry.getValue().subList(i, endindex));
i = endindex;
try {
MysqlInputSplit split = new MysqlInputSplit();
split.tradeDay = sdf_2.format(sdf_1.parse(tradeDay));
split.stockcodes = stocks;
splits.add(split);
} catch (ParseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
InputSplit[] rtn = splits.toArray(new InputSplit[splits.size()]);
return rtn;
}
public static class MysqlInputSplit implements InputSplit{
public String getTradeDay() {
return tradeDay;
}
public void setTradeDay(String tradeDay) {
this.tradeDay = tradeDay;
}
public String getStockcodes() {
return stockcodes;
}
public void setStockcodes(String stockcodes) {
this.stockcodes = stockcodes;
}
private String tradeDay = null;
private String stockcodes = null;
@Override
public void readFields(DataInput in) throws IOException {
this.tradeDay = Text.readString(in);
this.stockcodes = Text.readString(in);
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, tradeDay);
Text.writeString(out, stockcodes);
}
@Override
public long getLength() throws IOException {
// TODO Auto-generated method stub
return 0;
}
@Override
public String[] getLocations() throws IOException {
String[] arr = {"aa"}; //必须有,因为不管有没有用,框架都要用。
return arr;
}
}
public static class MysqlRecordReader implements RecordReader<Text, Text>{
public String tradeDay = null;
public String stockcodes = null;
private boolean isDeal = false;
private long begintimeLong = new Date().getTime();
public MysqlRecordReader(String tradeDay, String stockcodes){
this.tradeDay = tradeDay;
this.stockcodes = stockcodes;
}
@Override
public void close() throws IOException {
// TODO Auto-generated method stub
}
@Override
public Text createKey() {
return new Text();
}
@Override
public Text createValue() {
return new Text();
}
@Override
public long getPos() throws IOException {
// TODO Auto-generated method stub
return 0;
}
/**
* 预计15小时为100%
*/
@Override
public float getProgress() throws IOException {
logger.info(String.format("get process is %f", ((float)(new Date().getTime() - this.begintimeLong))/(float)(15*3600*1000)));
return Math.min(0.9f, ((float)(new Date().getTime() - this.begintimeLong))/(float)(15*3600*1000));
}
@Override
public synchronized boolean next(Text key, Text value) throws IOException {
if(this.isDeal == true){
return false;
}else{
this.isDeal = true;
}
key.set(this.tradeDay);
value.set(this.stockcodes);
return true;
}
}
}
分享到:
相关推荐
Hadoop 用mapreduce实现Wordcount实例,绝对能用
upon the widely used and highly successful Hadoop MapReduce v1. The recipes that will help you analyze large and complex datasets with next generation Hadoop MapReduce will provide you with the skills...
用MapReduce实现TF-IDF,Hadoop版本是2.7.7,参考某教程亲自手写的,可以运行,有问题可以留言
基于Hadoop Mapreduce 实现酒店评价文本情感分析(python源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析(python源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析(python...
自定义MapReduce的InputFormat,实现提取指定开始与结束限定符的内容。
本书对Hadoop Mapreduce进行详细讲解,切合实际应用,能够更深入地学习MapReduce,确实是一本不错的书。
基于Hadoop Mapreduce 实现酒店评价文本情感分析(python开发源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析(python开发源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析...
Hadoop MapReduce Cookbook 高清完整版PDF下载 Hadoop MapReduce Cookbook
在hadoop平台上,用mapreduce编程实现大数据的词频统计
Hadoop mapreduce 实现KMeans,可用
Java操作Hadoop Mapreduce基本实践源码.
本章介绍了 Hadoop MapReduce,同时发现它有以下缺点: 1、程序设计模式不容易使用,而且 Hadoop 的 Map Reduce API 太过低级,很难提高开发者的效率。 2、有运行效率问题,MapReduce 需要将中间产生的数据保存到...
Hadoop MapReduce v2 Cookbook (第二版), Packt Publishing
Hadoop mapreduce 实现MatrixMultiply矩阵相乘
基于Apriori算法的频繁项集Hadoop mapreduce
这本书都是实例,很接地气,多加练习和阅读,可稳步上升
Hadoop mapreduce 实现MR_DesicionTreeBuilder 决策树
Hadoop mapreduce 实现NaiveBayes朴素贝叶斯
Hadoop mapreduce 实现InvertedIndexer倒排索引,能用。
Hadoop MapReduce v2 Cookbook, 2nd Edition-Packt Publishing(2015) 高清完整版PDF下载