mapreduce code error:(behind is the source code, there is no brother to help debugging, this error, online related information did not find, the solution will automatically pay rewards)
Take date as the key to calculate the average value of comments avg_comment, average price, number of books
package cn.bigdata.classTop500Big;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.junit.Test;
import java.io.IOException;
public class JDBCDriver {
@Test
public void test() throws IOException, InterruptedException, ClassNotFoundException {
// 1.获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2.设置jar路径
job.setJarByClass(JDBCDriver.class);
// 3.数据输入类型为数据库输入
job.setInputFormatClass(DBInputFormat.class); //read
// 4.设置数据库配置并且连接
String driverClass = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://localhost:3306/book?zeroDateTimeBehavior=convertToNull&useUnicode=true&characterEncoding=utf-8";
String userName = "root";
String passWord = "123456";
DBConfiguration.configureDB(job.getConfiguration(), driverClass, url,
userName, passWord);
// 5.设置数据输入内容-sql查询数据作为输入数据 classtop500
DBInputFormat.setInput(job, MyDBWritable.class,
"select id,book_id,type,name,image,comment,score,author,press,date,price from classtop500",
"select count(*) from classtop500");
// 6.设置输出的表 classTop500CleanPrice
DBOutputFormat.setOutput(job,"classTop500BigScreenClean","date","book_number","avg_comment","avg_price");
// DBOutputFormat.setOutput(job,"fruitsCleanPrice","fruits_name","fruits_min_price",
// "fruits_max_price","fruits_avg_price");
// 7.关联mapper和reducer
job.setMapperClass(JDBCMapper.class);
job.setReducerClass(JDBCReduce.class);
// 8.设置map输出的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 9.设置最终输出的kv类型
job.setOutputKeyClass(MyDBWritable.class);
job.setOutputValueClass(NullWritable.class);
// 10.提交job
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
JDBCMapper class:
package cn.bigdata.classTop500Big;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.Date;
public class JDBCMapper extends Mapper<LongWritable, MyDBWritable, Text, Text> {
private Text text = new Text();
private Text outWritable = new Text();
// private MyDBWritable price = new MyDBWritable();
@Override
protected void map(LongWritable key, MyDBWritable value, Context context) throws IOException, InterruptedException {
String date = value.getDate(); // 读取每一行的年龄作为K
String comment = value.getAvg_comment();
float price = value.getPrice();
String price1 = comment + "\t" + price;
text.set(date);
outWritable.set(price1);
context.write(text, outWritable);
}
}
JDBCReduce class:
package cn.bigdata.classTop500Big;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Date;
public class JDBCReduce extends Reducer<Text, Text, MyDBWritable, NullWritable> {
// Double max = 0.0;
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
MyDBWritable myDBName = new MyDBWritable();
Double avg_price = new Double(0.0);
Double avg_comment = new Double(0.0);
int bookNum=0;
// text = values.iterator().next();
// MyDBWritable myDBM_price = new MyDBWritable();
myDBName.setDate(key.toString());
Double comment1= new Double(0.0);
Double price1= new Double(0.0);
int num=1;
for (Text value : values){
String[] str = value.toString().split("\t");
Double comment2 = Double.parseDouble(str[0]);
Double price2 = Double.parseDouble(str[1]);
if(comment2 == 0.0 || price2 == 0.0){
continue;
}
price1=price1+price2;
comment1=comment1+comment2;
avg_price=price1/num;
avg_comment=comment1/num;
bookNum=num;
num++;
}
myDBName.setBookNumber(String.valueOf(bookNum));
String avg_comment1 =String.format("%.2f",avg_comment);
myDBName.setAvg_comment(avg_comment1);
String avg_price1 =String.format("%.2f",avg_price);
myDBName.setAvg_price(avg_price1);
context.write(myDBName, NullWritable.get());
}
}
MyDBWritable class:
package cn.bigdata.classTop500Big;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Date;
// 1.实现 DBWritable, Writable
public class MyDBWritable implements DBWritable, Writable {
// 数据库的写入字段
private int id;
private int book_id;
private String type;
private String name;
private String image;
private long comment;
private float score;
private String author;
private String press;
private String date;
private float price;
private String bookNumber;
private String avg_price;
private String avg_comment;
// 写出字段
// 2.反序列化所需要的空参构造
public MyDBWritable() {
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getBook_id() {
return book_id;
}
public void setBook_id(int book_id) {
this.book_id = book_id;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getImage() {
return image;
}
public void setImage(String image) {
this.image = image;
}
public long getComment() {
return comment;
}
public void setComment(long comment) {
this.comment = comment;
}
public float getScore() {
return score;
}
public void setScore(int score) {
this.score = score;
}
public String getAuthor() {
return author;
}
public void setAuthor(String author) {
this.author = author;
}
public String getPress() {
return press;
}
public void setPress(String press) {
this.press = press;
}
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public float getPrice() {
return price;
}
public void setPrice(float price) {
this.price = price;
}
public String getAvg_price() {
return avg_price;
}
public void setAvg_price(String avg_price) {
this.avg_price = avg_price;
}
public void setScore(float score) {
this.score = score;
}
public String getBookNumber() {
return bookNumber;
}
public void setBookNumber(String bookNumber) {
this.bookNumber = bookNumber;
}
public String getAvg_comment() {
return avg_comment;
}
public void setAvg_comment(String avg_comment) {
this.avg_comment = avg_comment;
}
// 3.序列化
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(id);
dataOutput.writeInt(book_id);
dataOutput.writeUTF(type);
dataOutput.writeUTF(name);
dataOutput.writeUTF(image);
dataOutput.writeLong(comment);
dataOutput.writeFloat(score);
dataOutput.writeUTF(author);
dataOutput.writeUTF(press);
dataOutput.writeUTF(date);
dataOutput.writeDouble(price);
dataOutput.writeUTF(bookNumber);
dataOutput.writeUTF(avg_price);
dataOutput.writeUTF(avg_comment);
}
// 4.反序列化
@Override
public void readFields(DataInput dataInput) throws IOException {
this.id=dataInput.readInt();
this.book_id = dataInput.readInt();
this.type=dataInput.readUTF();
this.name = dataInput.readUTF();
this.image = dataInput.readUTF();
this.score = dataInput.readFloat();
this.author=dataInput.readUTF();
this.press=dataInput.readUTF();
this.date=dataInput.readUTF();
this.price=dataInput.readFloat();
// this.min_price=dataInput.readUTF();
// this.avg_price=dataInput.readUTF();
// this.max_price=dataInput.readUTF();
}
// 5.从DB读取
@Override
public void readFields(ResultSet resultSet) throws SQLException {
id=resultSet.getInt(1);
book_id = resultSet.getInt(2); // 1,2,3对应列的坐标,从1开始
type=resultSet.getString(3);
name = resultSet.getString(4);
image = resultSet.getString(5);
comment = resultSet.getLong(6);
score = resultSet.getFloat(7);
author = resultSet.getString(8);
press = resultSet.getString(9);
date = resultSet.getString(10);
price = resultSet.getFloat(11);
// min_price=resultSet.getString(11);
// avg_price=resultSet.getString(12);
// max_price=resultSet.getString(13);
}
// 6.写入数据库
@Override
public void write(PreparedStatement preparedStatement) throws SQLException {
preparedStatement.setString(1, date);
preparedStatement.setString(2, bookNumber);
preparedStatement.setString(3, avg_comment);
preparedStatement.setString(4, avg_price);
}
}
mysql table structure :
0 Answer
No answer yet
这家伙很懒,什么都没留下...