Importing Data from MongoDB into HBase with the Producer-Consumer Pattern

Abstract

There are currently tens of millions of records stored in MongoDB, on a single machine in a single database and a single collection. The data needs to be processed with Spark, so the plan is to move it into HBase first. The transfer is implemented with the producer-consumer pattern, using multiple threads.

Overview

The data is Q&A content collected by a crawler and stored in MongoDB: roughly 60-70 million documents, over 150 GB, in a single database and a single collection.

Since reads and writes are bound to run at different speeds, the producer-consumer pattern is used to balance them across threads. The pipeline has three stages: reading data IDs, reading document content, and writing.

Reading IDs

Because of the data volume, everything cannot be read in one pass, so a pagination-style approach is needed. MongoDB offers two ways to paginate. One is skip, which simply jumps over a given number of documents; its performance is poor when a large number of documents has to be skipped.

The alternative, used here, is to paginate by tracking a starting row: read N rows, then use the last row of that batch as the starting point for the next N rows. This is much more efficient.
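For illustration, here is a minimal sketch of the two approaches with the MongoDB Java driver (method and variable names are placeholders; the real implementation is ReadIdsTask later in this post):

import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

public class PaginationSketch {
    // Skip-based paging: MongoDB still walks past `offset` documents,
    // so the cost grows with the offset.
    static FindIterable<Document> bySkip(MongoCollection<Document> c, int offset, int n) {
        return c.find().skip(offset).limit(n);
    }

    // Range-based paging: remember the last _id of the previous batch and
    // seek past it via the _id index; the cost per batch stays constant.
    static FindIterable<Document> byRange(MongoCollection<Document> c, String lastId, int n) {
        return c.find(new Document("_id", new Document("$gt", lastId)))
                .sort(new Document("_id", 1))
                .limit(n);
    }
}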

All the IDs read are placed into a queue for downstream threads to consume.

Reading Content

A separate thread takes IDs off the queue and reads the corresponding document contents from MongoDB. The original thinking was that reading and parsing documents might take a while, so a couple of extra threads could be devoted to this stage. In practice this stage turned out to be very fast as well: roughly 10,000 documents in about 200 ms.

The documents are wrapped into objects and placed into a second queue for the writer threads to consume.
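The writer threads consume QAModel objects. Since the model classes are elided from the listings later in the post, here is a minimal sketch with fields inferred from how ReadDocumentsTask and WriteTask use them; the exact field set is an assumption, and the real classes have more fields:

package com.oolong.model;

import java.util.ArrayList;
import java.util.List;

// QAModel.java (sketch only)
public class QAModel {
    private String id;   // MongoDB _id, reused as the HBase row key
    private String url;  // written to column family "http", qualifier "url"
    private List<Answer> answers = new ArrayList<>();

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getUrl() { return url; }
    public void setUrl(String url) { this.url = url; }
    public List<Answer> getAnswers() { return answers; }
}

// Answer.java, its own file in the same package (sketch only)
public class Answer {
    private String answer;

    public String getAnswer() { return answer; }
    public void setAnswer(String answer) { this.answer = answer; }
}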

Writing

Writing is the time-consuming part, and it was the original motivation for adopting the producer-consumer pattern. Testing confirmed it: a single thread each is enough for reading IDs and reading content, while writing is slow enough that it needs several threads.

Improving HBase write performance still deserves dedicated study; I think there is a lot of room for improvement here.
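One option not benchmarked here is the HBase client's BufferedMutator, which buffers Puts client-side and flushes them when the buffer fills, reducing per-batch RPC overhead. A minimal sketch, with an arbitrary buffer size:

import com.oolong.utils.Constant;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;

import java.io.IOException;
import java.util.List;

public class BufferedWriteSketch {
    // Puts are buffered client-side and flushed when the buffer fills
    // (and on close()).
    static void write(Connection connection, List<Put> puts) throws IOException {
        BufferedMutatorParams params =
                new BufferedMutatorParams(TableName.valueOf(Constant.HB_TABLE))
                        .writeBufferSize(8 * 1024 * 1024); // 8 MB, arbitrary example
        try (BufferedMutator mutator = connection.getBufferedMutator(params)) {
            mutator.mutate(puts);
        }
    }
}

In the pipeline above, each writer thread would keep a single BufferedMutator open across batches rather than creating one per batch, since close() forces a flush.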

Performance Analysis

The read and write performance is analyzed from the log records. On the read side the numbers cover reading full documents; reading IDs alone is so fast that it can be ignored.

Reads and writes were tested on two different machine setups:
1. MongoDB on an i5-8500 with 16 GB RAM; HBase on three 1-core 4 GB KVM instances.
2. MongoDB on an i7-8700K with 64 GB RAM; HBase on five 1-core 16 GB KVM instances.

First Setup

The test used 2.5 million (250W) records.

[Figure: read time vs. records read, first setup (axes: data/W, time/Min)]

Around 600,000 (60W) records, the read speed drops sharply:

[Figure: read time vs. records read around the 600K slowdown, first setup]

My guess is that this is related to MongoDB's storage mechanism. MongoDB caches part of the data in memory and keeps whatever does not fit on disk. The fast early phase is presumably served from the in-memory cache; the sharp slowdown afterwards is reads coming from disk.

From what I found about MongoDB's caching strategy, eviction is driven by how frequently data is accessed and cannot be tuned by hand, so the way to improve this part of the performance is to add physical memory. The test results from the second setup back this guess up: with far more RAM, the read speed stays high for the entire run.

[Figure: write time vs. records written, first setup]

Write speed, on the other hand, is comparatively slow, and it is the main bottleneck. Three writer threads were used; with more than that, some writer threads simply sat suspended and throughput did not improve. HBase's write path deserves a closer look. Could it be that each RegionServer handles only one writer at a time? If so, write performance is not very impressive.
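One guess worth verifying: if the row keys (the MongoDB _id values) arrive in roughly ascending order, as ObjectId strings do, every Put lands in the same region, so a single RegionServer serves all writes no matter how many threads are used. A common mitigation is to salt the row key; a hypothetical sketch (the bucket count is arbitrary, and readers would have to strip the prefix):

import org.apache.hadoop.hbase.util.Bytes;

public class SaltedKeySketch {
    private static final int BUCKETS = 8; // arbitrary; roughly one per region

    // Prefix the key with a stable bucket so consecutive IDs spread across
    // regions instead of hammering a single one.
    static byte[] of(String id) {
        int bucket = (id.hashCode() & Integer.MAX_VALUE) % BUCKETS;
        return Bytes.toBytes(bucket + "|" + id);
    }
}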

Second Setup

The test used 1.5 million (150W) records.

[Figure: read time vs. records read, second setup]

The read speed stays steady the whole way through, and it is quite fast.

[Figure: write time vs. records written, second setup]

Writes are also somewhat faster here. Is that because there are two more RegionServers?

Analysis Script

The line charts are plotted with Python's Seaborn library, so a Python environment is required.

Script

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

df = pd.read_csv("250W_read_70W.csv")

# Keep one sample every 50,000 records; copy() avoids pandas'
# SettingWithCopyWarning when the slice is modified below.
ds = df[df['data/W'] % 50000 == 0].copy()

ds['data/W'] = ds['data/W'] / 10000      # records -> units of 10,000 (W)
ds['time/Min'] = ds['time/Min'] / 60000  # milliseconds -> minutes

plt.rcParams['font.sans-serif'] = ['Microsoft YaHei']
f, ax = plt.subplots(figsize=(14, 8))
ax.set_title('Read-70W Speed')

sns.set_style("darkgrid")
sns.pointplot(x="data/W", y="time/Min", data=ds)
plt.show()

Project Setup

The project is a Maven-based Java console application.

Dependencies

The project targets JDK 1.8; the main dependencies are the MongoDB driver, the HBase client, and logging libraries. The Maven dependencies are:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongo-java-driver</artifactId>
    <version>3.9.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.3</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-core</artifactId>
    <version>1.2.3</version>
</dependency>

Resources Directory

core-site.xml & hbase-site.xml

To access HBase, these two files from the target cluster must be on the project's classpath. Copy them from the Hadoop and HBase configuration directories into the project's resources directory.
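As a quick sanity check, HBaseConfiguration.create() already picks up hbase-site.xml from the classpath, so the copied files can be verified with a minimal sketch like this (the class name is a placeholder):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ConfigCheck {
    public static void main(String[] args) {
        // create() loads hbase-default.xml and hbase-site.xml from the classpath.
        Configuration config = HBaseConfiguration.create();
        System.out.println(config.get("hbase.zookeeper.quorum"));
    }
}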

log4j.properties

Logging configuration (used by the log4j-based Hadoop/HBase client libraries):

# Root logger at INFO, with console (STO) and file (F) appenders.
log4j.rootLogger=INFO, STO, F

# STO is a ConsoleAppender.
log4j.appender.STO=org.apache.log4j.ConsoleAppender

# STO uses PatternLayout.
log4j.appender.STO.layout=org.apache.log4j.PatternLayout
log4j.appender.STO.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%t] %-5p %c %x - %m%n

log4j.appender.F=org.apache.log4j.DailyRollingFileAppender
log4j.appender.F.File=./log.log
log4j.appender.F.layout=org.apache.log4j.PatternLayout
log4j.appender.F.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%t] %-5p %c %x - %m%n

log4j.logger.com.oolong=DEBUG
#log4j.logger.org.mongodb=ERROR

logback.xml

Logging configuration (the application itself logs through SLF4J and Logback):

<?xml version="1.0" encoding="UTF-8"?>
<!-- debug="true": print logback's internal status (by default it is only
     printed when logback itself runs into an error). With the attribute set,
     status is printed when both conditions hold: (1) the config file is found,
     (2) it is well-formed XML. It can also be done programmatically, e.g.:
     LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
     StatusPrinter.print(lc); -->
<!-- scan="true": watch this config file and reload it when it changes -->
<!-- scanPeriod="30 seconds": scan interval (units: milliseconds, seconds,
     minutes or hours; default unit milliseconds, default period 1 minute).
     Only effective when scan="true". -->
<configuration debug="false" scan="true" scanPeriod="30 seconds" packagingData="true">
    <!-- Logger context name; immutable once set, defaults to "default" -->
    <contextName>MedicalQATransfer</contextName>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <!-- encoders are by default assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder -->
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- Name of the currently active log file -->
        <file>./medicalqatransfer.log</file>
        <!-- Rolling is driven by the pattern inside %d{...}; here, one file per day -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- Archive names ending in .zip or .gz are compressed automatically -->
            <FileNamePattern>./medicalqatransfer%d{yyyyMMdd}.log.zip</FileNamePattern>
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}[%-5level][%thread]%logger{36} - %msg%n</pattern>
            <!-- <pattern>%d{HH:mm:ss.SSS}[%-5level][%thread] - %msg%n</pattern> -->
        </encoder>
    </appender>

    <logger name="com.oolong" level="DEBUG">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="FILE" />
    </logger>

    <!--<logger name="org.mongodb" level="ERROR">-->
    <!--<appender-ref ref="STDOUT" />-->
    <!--<appender-ref ref="FILE" />-->
    <!--</logger>-->

    <!-- At most one root logger may be configured -->
    <root level="debug">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="FILE" />
    </root>
</configuration>

Entry Point

App

package com.oolong;

import com.oolong.model.QAModel;
import com.oolong.read.ReadDocumentsTask;
import com.oolong.read.ReadIdsTask;

import com.oolong.write.WriteTask;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class App {
    public static void main(String[] args) throws IOException {
        // The next four lines work around the following error on Windows
        // (root cause unknown):
        // ERROR org.apache.hadoop.util.Shell - Failed to locate the winutils binary in the hadoop binary path
        String path = new File(".").getCanonicalPath();
        System.getProperties().put("hadoop.home.dir", path);
        new File("./bin").mkdirs();
        new File("./bin/winutils.exe").createNewFile();

        Logger logger = LoggerFactory.getLogger(App.class);

        LinkedBlockingQueue<String> idPool = new LinkedBlockingQueue<>();
        AtomicInteger count = new AtomicInteger(0);
        AtomicInteger savedCount = new AtomicInteger(0);
        AtomicLong start = new AtomicLong(System.currentTimeMillis());

        logger.info("Create Thread: Load Ids from MongoDB.");
        Thread idReadTask = new Thread(new ReadIdsTask(idPool, 1000000));
        idReadTask.start();
        logger.info("Starting Thread: Load Ids from MongoDB...");

        logger.info("Create Thread: Load QAData from MongoDB.");

        LinkedBlockingQueue<QAModel> questionPool = new LinkedBlockingQueue<>();
        ExecutorService docReadTask = Executors.newCachedThreadPool();

        // One reader thread turned out to be enough (see Performance Analysis).
        for (int i = 0; i < 1; i++) {
            docReadTask.execute(new ReadDocumentsTask(idPool, questionPool
                    , 100, count, start));
        }

        logger.info("Starting Thread: Load QAData from MongoDB...");

        ExecutorService docSaveTask = Executors.newCachedThreadPool();

        // Writing is the bottleneck, so it gets three threads.
        for (int i = 0; i < 3; i++) {
            docSaveTask.execute(new WriteTask(questionPool
                    , 1000, savedCount, start));
        }

        logger.info("Starting Thread: Save QAData to HBase...");
    }
}

Core Logic

ReadIdsTask: Reading IDs

package com.oolong.read;

import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.oolong.utils.MongoUtil;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Reads IDs from MongoDB and feeds them into the queue.
 * Works single-threaded; running multiple instances is not supported.
 */
public class ReadIdsTask implements Runnable {

    private LinkedBlockingQueue<String> idPool;
    private int capacity;
    private int batchNum;
    private String start;
    private Logger logger;

    public ReadIdsTask(LinkedBlockingQueue<String> idPool, int capacity) {
        this.idPool = idPool;
        this.capacity = capacity;
        this.start = "";
        this.batchNum = 10000;
        this.logger = LoggerFactory.getLogger(ReadIdsTask.class);
    }

    @Override
    public void run() {
        MongoCollection<Document> collection = MongoUtil.getCollection();

        while (true) {
            try {
                // Back off while the queue is close to capacity.
                if (capacity - idPool.size() < batchNum) {
                    Thread.sleep(1000);
                    continue;
                }

                List<String> ids = this.readBatchIds(collection, start, batchNum);

                if (ids.size() > 0) {
                    // Remember the last ID as the start of the next batch.
                    this.start = ids.get(ids.size() - 1);
                    idPool.addAll(ids);
                    logger.info("IDs waiting to be processed: " + idPool.size());
                } else {
                    logger.info("All IDs loaded; finishing this task.");
                    break;
                }
            } catch (InterruptedException e) {
                logger.debug(e.getMessage());
            }
        }
    }

    private List<String> readBatchIds(MongoCollection<Document> collection, String start, int num) {
        FindIterable<Document> findIterable = null;
        Document fieldFilter = new Document("_id", 1);
        // Sort by _id so that range-based pagination is well-defined.
        Document sortById = new Document("_id", 1);

        if (!start.isEmpty()) {
            Document condition = new Document("_id", new Document("$gt", start));
            findIterable = collection.find(condition).projection(fieldFilter)
                    .sort(sortById).limit(num);
        } else {
            findIterable = collection.find().projection(fieldFilter)
                    .sort(sortById).limit(num);
        }

        MongoCursor<Document> mongoCursor = findIterable.iterator();
        List<String> res = new ArrayList<>();

        while (mongoCursor.hasNext()) {
            res.add(mongoCursor.next().getString("_id"));
        }
        return res;
    }
}

ReadDocumentsTask: Reading Content

package com.oolong.read;

import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.oolong.model.Answer;
import com.oolong.model.QAModel;
import com.oolong.utils.MongoUtil;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class ReadDocumentsTask implements Runnable {
    private LinkedBlockingQueue<String> idPool;
    private LinkedBlockingQueue<QAModel> questionPool;
    private int batchNum;
    private AtomicInteger count;
    private AtomicLong start;
    private Logger logger;

    public ReadDocumentsTask(LinkedBlockingQueue<String> idPool
            , LinkedBlockingQueue<QAModel> questionPool, int batchNum
            , AtomicInteger count, AtomicLong start) {
        this.idPool = idPool;
        this.questionPool = questionPool;
        this.batchNum = batchNum;
        this.count = count;
        this.start = start;
    }

    @Override
    public void run() {
        // Name the logger after the worker thread (doing this in the
        // constructor would pick up the constructing thread instead).
        this.logger = LoggerFactory.getLogger(ReadDocumentsTask.class.getName()
                + "-" + Thread.currentThread().getName());

        MongoCollection<Document> collection = MongoUtil.getCollection();
        List<String> ids = null;

        while (true) {
            try {
                if (idPool.size() <= 0) {
                    Thread.sleep(500);
                    continue;
                }

                // Drain up to batchNum IDs from the queue.
                ids = new ArrayList<>();
                int num = Math.min(idPool.size(), batchNum);
                for (int i = 0; i < num; i++) {
                    ids.add(idPool.poll());
                }

                List<QAModel> qas = this.readDocument(collection, ids);
                questionPool.addAll(qas);
                count.addAndGet(qas.size());

                logger.info("Documents read: " + count + ", elapsed: "
                        + (System.currentTimeMillis() - start.get()) + "ms");

            } catch (InterruptedException e) {
                logger.debug(e.getMessage());
            }
        }
    }

    private List<QAModel> readDocument(MongoCollection<Document> collection, List<String> ids) {

        // Fetch the whole batch in one query.
        Document condition = new Document("_id", new Document("$in", ids));
        FindIterable<Document> findIterable = collection.find(condition);
        MongoCursor<Document> mongoCursor = findIterable.iterator();

        List<QAModel> res = new ArrayList<>();
        QAModel model = null;
        Answer answer = null;
        Document document = null;

        while (mongoCursor.hasNext()) {
            model = new QAModel();
            document = mongoCursor.next();

            model.setId(document.getString("_id"));

            // omitted

            List<Document> ans = (ArrayList<Document>) document.get("answers");

            for (Document a : ans) {
                answer = new Answer();
                answer.setAnswer(a.getString("answer"));

                // omitted

                model.getAnswers().add(answer);
            }

            res.add(model);
        }

        return res;
    }
}

WriteTask: Writing to HBase

package com.oolong.write;

import com.oolong.model.Answer;
import com.oolong.model.QAModel;

import com.oolong.utils.HBaseUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class WriteTask implements Runnable {
    private LinkedBlockingQueue<QAModel> questionPool;
    private int batchNum;
    private AtomicInteger savedCount;
    private AtomicLong start;
    private Logger logger;

    public WriteTask(LinkedBlockingQueue<QAModel> questionPool, int batchNum
            , AtomicInteger savedCount, AtomicLong start) {
        this.questionPool = questionPool;
        this.batchNum = batchNum;
        this.savedCount = savedCount;
        this.start = start;
    }

    @Override
    public void run() {
        // Name the logger after the worker thread (doing this in the
        // constructor would pick up the constructing thread instead).
        this.logger = LoggerFactory.getLogger(WriteTask.class.getName()
                + "-" + Thread.currentThread().getName());

        // Retry until a Table handle is obtained; getTable() returns null on failure.
        Table table = null;
        while (table == null) {
            table = HBaseUtil.getTable();
            if (table == null) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    logger.debug(e.getMessage());
                }
            }
        }

        while (true) {
            try {
                if (questionPool.size() <= 0) {
                    Thread.sleep(1000);
                    continue;
                }

                // Drain up to batchNum models and write them as one batch.
                List<QAModel> questions = new ArrayList<>();
                int num = Math.min(questionPool.size(), batchNum);
                for (int i = 0; i < num; i++) {
                    questions.add(questionPool.poll());
                }
                writeToHBase(table, questions);
                savedCount.addAndGet(questions.size());
                logger.info("Documents saved: " + savedCount + ", elapsed: "
                        + (System.currentTimeMillis() - start.get()) + "ms");

            } catch (InterruptedException e) {
                logger.debug(e.getMessage());
            }
        }
    }

    private void writeToHBase(Table table, List<QAModel> questions) {
        try {
            List<Put> actions = new ArrayList<>();

            for (QAModel model : questions) {
                // The MongoDB _id doubles as the HBase row key.
                Put put = new Put(Bytes.toBytes(model.getId()));

                put.addColumn(Bytes.toBytes("http"), Bytes.toBytes("url"),
                        Bytes.toBytes(model.getUrl()));

                // omitted

                actions.add(put);
            }

            table.put(actions);

        } catch (IOException e) {
            logger.debug(e.getMessage());
        }
    }
}

Utility Classes

Constant: Constants

package com.oolong.utils;

public class Constant {
    public static String MONGO_IP = "192.168.0.125";
    public static int MONGO_PORT = 27017;
    public static String MONGO_DB = "medicalqa_8";
    public static String MONGO_COLLECTION = "questions_2";

    public static String HB_TABLE = "medicalqa";
}

MongoUtil: MongoDB Connection Helper

package com.oolong.utils;

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoUtil {

    private static Logger logger;

    static {
        logger = LoggerFactory.getLogger(MongoUtil.class);
    }

    // Note: each call opens a new MongoClient that is never closed; tolerable
    // here because only a few long-lived worker threads call this once each.
    public static MongoCollection<Document> getCollection() {
        MongoClient mongo = new MongoClient(Constant.MONGO_IP, Constant.MONGO_PORT);
        MongoDatabase database = mongo.getDatabase(Constant.MONGO_DB);

        logger.info("Connect to MongoDB successfully");

        MongoCollection<Document> collection = database.getCollection(Constant.MONGO_COLLECTION);
        return collection;
    }
}

HBaseUtil: HBase Connection Helper

package com.oolong.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URISyntaxException;

public class HBaseUtil {
    private static Logger logger;

    static {
        logger = LoggerFactory.getLogger(HBaseUtil.class);
    }

    // Returns null if the connection cannot be established; callers retry.
    public static Table getTable() {
        Table table = null;
        try {
            Configuration config = HBaseConfiguration.create();
            // Explicitly re-add the classpath configs (create() already loads
            // hbase-site.xml from the classpath, so this is belt-and-braces).
            config.addResource(new Path(ClassLoader.getSystemResource("hbase-site.xml").toURI()));
            config.addResource(new Path(ClassLoader.getSystemResource("core-site.xml").toURI()));
            Connection connection = ConnectionFactory.createConnection(config);
            table = connection.getTable(TableName.valueOf(Constant.HB_TABLE));

            logger.info("Connect to HBase successfully");
        } catch (URISyntaxException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return table;
    }
}