tonglin0325的个人主页

使用python操作Amazon S3

1.判断s3 object是否存在

1
2
3
4
5
6
7
8
9
10
11
import boto3

s3 = boto3.resource('s3')
bucket = s3.Bucket('my-bucket')
key = 'dootdoot.jpg'
objs = list(bucket.objects.filter(Prefix=key))
if any([w.key == path_s3 for w in objs]):
print("Exists!")
else:
print("Doesn't exist")

2.读取s3 object文件内容

1
2
3
4
5
6
import boto3

s3 = boto3.resource('s3')
object_content = s3.Object('my_bucket_name', object_dir)
file_content = object_content.get()['Body'].read().decode('utf-8')

3.列出s3 object目录

1
2
3
4
5
6
7
8
import boto3

s3 = boto3.resource('s3')
my_bucket = s3.Bucket('my_bucket_name')
dirs = sorted([_.key for _ in my_bucket.objects.filter(Prefix="aaa/bbb/ccc/")], reverse=True)
for dir in dirs:
print(dir)

4.查看emr集群信息,参考:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.describe_cluster

1
2
3
4
5
6
import boto3

emr_client = boto3.client('emr',region_name='us-west-2')
response = emr_client.describe_cluster(ClusterId="j-xxxxxx")
print(response['Cluster']['MasterPublicDnsName'])

CDH5.16集成ldap

集成ldap之前请参考安装好openldap:Ubuntu16.04安装openldap和phpldapadmin

1.hadoop集成ldap

HDFS 的文件权限与 Linux/Unix 系统类似,也是采用UGO模型,分成用户、组和其他权限。其权限you两种实现方式:1.基于Linux/Unix系统的用户和用户组;2.基于使用LDAP协议的数据库

参考网易数帆的文章:HDFS权限管理实践

使用基于Linux/Unix系统的用户和用户组,即 hadoop.security.group.mapping 的值为 org.apache.hadoop.security.ShellBasedUnixGroupsMapping

使用基于使用LDAP协议的数据库,即 hadoop.security.group.mapping 的值为

全文 >>

ubuntu下JDK的安装

1.deb安装方法

安装Oracle的jdk8

1
2
3
wget https://s3.cn-north-1.amazonaws.com.cn/tfssa/packages/oracle-java/oracle-java8-jdk_8u181_amd64.deb
sudo dpkg -i ./oracle-java8-jdk_8u181_amd64.deb

如何依赖不满足

1
2
sudo apt-get install -f

如果依赖的包找不到,可以尝试

1
2
sudo mv /etc/apt/sources.list.d/backports.list /etc/apt/sources.list.d/backports.list.bak

选择使用rpm或者apt-get来安装java-8-oracle,这时的配置将会在 /etc/profile.d/jdk.sh 中

1
2
cat /etc/profile.d/jdk.sh

全文 >>

Flink学习笔记——读写hdfs

Flink自带Exactly Once语义,对于支持事务的存储,可以做到数据的不重不丢。

当使用Flink来写hdfs的时候,因为hdfs文件只能在末尾进行append,如果要做到数据不重不丢,hdfs在2.7.0及其以上的版本中提供了truncate功能,可以根据valid-length长度对hdfs文件中的无效数据进行截断操作,从而保证数据不重复。

参考:Flink exactly-once 实战笔记

关于hdfs的truncate功能可以参考:HDFS Truncate文件截断

Kafka学习笔记——Kafka Connect

kafka connect是kafka提供的一个用于在kafka和其他数据系统之间传输数据的工具

1
2
https://kafka.apache.org/documentation/#connect

1.Kafka Connect组件

1
2
https://docs.confluent.io/platform/current/connect/concepts.html

  1. Connectors – the high level abstraction that coordinates data streaming by managing tasks

Connectors定义了数据是如何拷贝进kafka以及如何复制出kafka的

包含2种形式的connector,source connector和sink connector

SpringBoot学习笔记——spring-boot-configuration-processor作用

spring-boot-configuration-processor的作用是生成配置的元数据信息,即META-INF目录下的spring-configuration-metadata.json文件,从而告诉spring这个jar包中有哪些自定义的配置

1.其中spring-configuration-metadata.json文件是在编译的时候自动生成的

2.还可以在resources目录下手动添加META-INF/additional-spring-configuration-metadata.json文件,这个文件是手动添加的,用于对spring-configuration-metadata.json进行补充,编译后会合并到spring-configuration-metadata.json中

参考:spring 自动加载配置

全文 >>

confluent学习笔记——schema registry

schema registry是confluent公司开发的一个集中式管理和验证kafka消息schema的组件。官方网站如下

1
2
https://docs.confluent.io/platform/current/schema-registry/index.html

其支持3种格式的schema:JSON,AVRO和protobuf

docker镜像地址如下

1
2
https://hub.docker.com/r/confluentinc/cp-schema-registry

其提供了schema registry的后端服务,提供API用于管理,验证和存储schema,保证了kafka的生产者和消费者可以使用schema来保证数据的一致性和兼容性。

有开源的schema registry前端UI,比如

schema-registry-ui,只支持AVRO的schema

1
2
https://hub.docker.com/r/landoop/schema-registry-ui/

kafka-ui,支持JSON,AVRO和protobuf(pb嵌套schema需要等0.8版本发布)格式的schema

1
2
https://hub.docker.com/r/provectuslabs/kafka-ui

全文 >>