tonglin0325的个人主页

Kafka中bootstrap-server、broker-list和zookeeper的区别

参考 Kafka bootstrap-servers vs zookeeper in kafka-console-consumer  中说建议使用新版(新版本指的是kafka 0.8.0之后的版本)的 –bootstrap-server

 

Kafka专业术语,参考 Apache kafka 工作原理介绍

Broker:Kafka 集群包含一个或多个服务器,这种服务器被称为 broker。

Topic:每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 Topic。(物理上不同 Topic 的消息分开存储,逻辑上一个 Topic 的消息虽然保存于一个或多个 broker 上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)。

Partition:Partition 是物理上的概念,每个 Topic 包含一个或多个 Partition。

Producer:负责发布消息到 Kafka broker。

Consumer:消息消费者,向 Kafka broker 读取消息的客户端。

Consumer Group:每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name,若不指定 group name 则属于默认的 group)。

 

《Kafka权威指南》中是这样描述的

 

对于消费者,kafka中有两个设置的地方:对于老的消费者,由**–zookeeper参数设置;对于新的消费者,由–bootstrap-server参数**设置

如果使用了–zookeeper参数,那么consumer的信息将会存放在zk之中

查看的方法是使用./zookeeper-client,然后 ls /consumers/[group_id]/offsets/[topic]/[broker_id-part_id],这个是查看某个group_id的某个topic的offset

全文 >>

flume学习笔记——安装和使用

Flume是一个分布式、可靠、和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据;
同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

Flume是一个专门设计用来从大量的源,推送数据到Hadoop生态系统中各种各样存储系统中去的,例如HDFS和HBase。

Guide: http://flume.apache.org/FlumeUserGuide.html

 

体系架构

Flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。

 Flume以Flume Agent最小的独立运行单位。一个Agent就是一个JVM。单agent由Source、Sink和Channel三大组件构成。一个Flume Agent可以连接一个或者多个其他的Flume Agent;一个Flume Agent也可以从一个或者多个Flume Agent接收数据。

注意:在Flume管道中如果有意想不到的错误、超时并进行了重试,Flume会产生重复的数据最终被被写入,后续需要处理这些冗余的数据。

具体可以参考文章:Flume教程(一) Flume入门教程

 

组件

Source:source是从一些其他产生数据的应用中接收数据的活跃组件。Source可以监听一个或者多个网络端口,用于接收数据或者可以从本地文件系统读取数据。每个Source必须至少连接一个Channel。基于一些标准,一个Source可以写入几个Channel,复制事件到所有或者某些Channel。

Source可以通过处理器 - 拦截器 - 选择器路由写入多个Channel。

Channel:channel的行为像队列,Source写入到channel,Sink从Channel中读取。多个Source可以安全地写入到相同的Channel,并且多个Sink可以从相同的Channel进行读取。

可是一个Sink只能从一个Channel读取。如果多个Sink从相同的Channel读取,它可以保证只有一个Sink将会从Channel读取一个指定特定的事件。

Flume自带两类Channel:Memory Channel和File Channel。Memory Channel的数据会在JVM或者机器重启后丢失;File Channel不会。

**Sink: **sink连续轮询各自的Channel来读取和删除事件。

拦截器:每次Source将数据写入Channel,它是通过委派该任务到其Channel处理器来完成,然后Channel处理器将这些事件传到一个或者多个Source配置的拦截器中。

拦截器是一段代码,基于某些标准,如正则表达式,拦截器可以用来删除事件,为事件添加新报头或者移除现有的报头等。每个Source可以配置成使用多个拦截器,按照配置中定义的顺序被调用,将拦截器的结果传递给链的下一个单元。一旦拦截器处理完事件,拦截器链返回的事件列表传递到Channel列表,即通过Channel选择器为每个事件选择的Channel。

全文 >>

Elasticsearch学习笔记——安装、数据导入和查询

到elasticsearch网站下载最新版本的elasticsearch 6.2.1

1
2
https://www.elastic.co/downloads/elasticsearch

其他版本

1
2
https://www.elastic.co/cn/downloads/past-releases/elasticsearch-6-4-2

嫌弃官方下载速度慢的可以去华为的镜像站去

1
2
https://mirrors.huaweicloud.com/elasticsearch/6.4.2/

中文文档请参考

1
2
https://www.elastic.co/guide/cn/elasticsearch/guide/current/index.html

英文文档及其Java API使用方法请参考,官方文档比任何博客都可信

1
2
https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html

Python API使用方法

1
2
http://elasticsearch-py.readthedocs.io/en/master/

下载tar包,然后解压到/usr/local目录下,修改一下用户和组之后可以使用非root用户启动,启动命令

1
2
./bin/elasticsearch

然后访问http://127.0.0.1:9200/

全文 >>

GraphX学习笔记——Programming Guide

学习的资料是官网的Programming Guide

1
2
https://spark.apache.org/docs/latest/graphx-programming-guide.html

 首先是GraphX的简介

GraphX是Spark中专门负责图和图并行计算的组件。

GraphX通过引入了图形概念来继承了Spark RDD:一个连接节点和边的有向图

为了支持图计算,GraphX引入了一些算子: subgraphjoinVertices, and aggregateMessages

和 Pregel API,此外还有一些algorithmsbuilders 来简化图分析任务。

 

关于构建 节点Vertex边Edge

1.如果需要将节点定义成一个类

全文 >>

GraphX学习笔记——可视化

首先自己造了一份简单的社交关系的图

第一份是人物数据,id和姓名,person.txt

1
2
3
4
5
6
7
8
9
1 孙俪
2 邓超
3 佟大为
4 冯绍峰
5 黄晓明
6 angelababy
7 李冰冰
8 范冰冰

 第二份是社交关系数据,两个人的id和社交关系,social.txt

1
2
3
4
5
6
7
8
9
10
1 丈夫 2
2 妻子 1
1 搭档 3
3 同学 4
3 好友 5
5 好友 3
5 妻子 6
5 好友 7
7 好友 8

 使用SparkX和GraphStream来处理数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package graphx

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.graphstream.graph.implementations.{AbstractEdge, SingleGraph, SingleNode}

/**
* Created by common on 18-1-22.
*/
object GraphxLearning {

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setAppName("GraphX").setMaster("local")
val sc = new SparkContext(conf)

val path1 = "input/graphx/person.txt"
val path2 = "input/graphx/social.txt"


// 顶点RDD[顶点的id,顶点的属性值]
val users: RDD[(VertexId, (String, String))] = sc.textFile(path1).map { line =>
val vertexId = line.split(" ")(0).toLong
val vertexName = line.split(" ")(1)
(vertexId, (vertexName, vertexName))
}

// 边RDD[起始点id,终点id,边的属性(边的标注,边的权重等)]
val relationships: RDD[Edge[String]] = sc.textFile(path2).map { line =>
val arr = line.split(" ")
val edge = Edge(arr(0).toLong, arr(2).toLong, arr(1))
edge
}

// 默认(缺失)用户
//Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")

//使用RDDs建立一个Graph(有许多建立Graph的数据来源和方法,后面会详细介绍)
val srcGraph = Graph(users, relationships, defaultUser)

val graph: SingleGraph = new SingleGraph("graphDemo")

// load the graphx vertices into GraphStream
for ((id, name) <- srcGraph.vertices.collect()) {
val node = graph.addNode(id.toString).asInstanceOf[SingleNode]
node.addAttribute("ui.label", name._1)
}

// load the graphx edges into GraphStream edges
for (Edge(x, y, relation) <- srcGraph.edges.collect()) {
val edge = graph.addEdge(x.toString ++ y.toString, x.toString, y.toString, true).asInstanceOf[AbstractEdge]
edge.addAttribute("ui.label", relation)
}

graph.setAttribute("ui.quality")
graph.setAttribute("ui.antialias")

graph.display()


}

}

可视化的结果,该图数据节点数很少,本来想尝试一份百万节点的数据,结果遇到了爆内存的问题

后来发现爆内存是肯定的,而且显示的点太多也不太利于debug,解决方法是使用subgraph()方法来对图进行裁剪以减小节点和边的数量

全文 >>

Gephi学习笔记

使用gephi对图数据进行可视化操作,下面网址是gephi的说明文档

1
2
https://seinecle.github.io/gephi-tutorials/generated-pdf/semantic-web-importer-en.pdf

使用的gephi版本号:0.9.1

系统:Ubuntu 16.04

内存:8G

 

1.启动Gephi,在semantic web import中输入,该web接口是dbpedia的RDF格式数据,然后点击run

1
2
http://dbpedia.org/sparql

 

接下来在 界面中可以看到RDF数据的可视化界面,注意这里只有100条RDF数据,包含了101个节点和100条边,这是由于查询的sparql语句中的limit 100

全文 >>

Centos7.0下MySQL的安装

1.下载mysql的repo源

1
2
wget http://repo.mysql.com/mysql-community-release-el7-5.noarch.rpm

2.安装mysql-community-release-el7-5.noarch.rpm包

1
2
sudo rpm -ivh mysql-community-release-el7-5.noarch.rpm

安装这个包后,会获得两个mysql的yum repo源:/etc/yum.repos.d/mysql-community.repo,/etc/yum.repos.d/mysql-community-source.repo。

3.安装mysql

1
2
sudo yum install mysql-server

根据提示安装就可以了,不过安装完成后没有密码,需要重置密码

4.重置mysql密码

1
2
mysql -u root

登录时有可能报这样的错:ERROR 2002 (HY000): Can‘t connect to local MySQL server through socket ‘/var/lib/mysql/mysql.sock‘ (2),原因是/var/lib/mysql的访问权限问题。下面的命令把/var/lib/mysql的拥有者改为当前用户:

1
2
sudo chown -R root:root /var/lib/mysql

重启mysql服务

1
2
service mysqld restart

全文 >>

Ubuntu16.04安装apache-airflow

1.安装apache-airflow 1.8.0

服务器使用的是centos系统,需要安装好pip和setuptools,同时注意更新安装的版本

接下来参考安装好Airflow

1
2
3
Airflow 1.8 工作流平台搭建 http://blog.csdn.net/kk185800961/article/details/78431484
airflow最简安装方法 centos 6.5 http://blog.csdn.net/Excaliburace/article/details/53818530

以mysql作为数据库,airflow默认使用sqlite作为数据库

1.建表

1
2
3
4
5
6
# 创建相关数据库及账号  
mysql> create database airflow default charset utf8 collate utf8_general_ci;
mysql> create user airflow@'localhost' identified by 'airflow';
mysql> grant all on airflow.* to airflow@'localhost';
mysql> flush privileges;

2.安装airflow,需要环境隔离的时候请使用virtualenv ./env创建隔离环境

1
2
sudo pip install apache-airflow==1.8.0

3.使用pip来安装,安装的路径在python路径下site-packages文件夹,在使用上述命令一遍就能看到

1
2
~/anaconda2/lib/python2.7/site-packages/airflow

全文 >>

在Ubuntu上使用shadow$ocks

安装

1
2
pip install shadow$ocks

创建文件

1
2
touch /etc/shadow$ocks.json

 

1
2
3
4
5
6
7
8
9
10
11
12
{
"server":"服務器IP或域名",
"server_port":端口號,
"local_address": "127.0.0.1",
"local_port":1080,
"password":"密碼",
"timeout":300,
"method":"aes-256-cfb",
"fast_open": false
}


 安装proxychains

1
2
sudo apt-get install proxychains

 编辑/etc/proxychains.conf,最后一行改为socks5 127.0.0.1 1080

 然后在root下运行

全文 >>