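tonglin0325's personal homepage

Flink Study Notes: Flink MySQL CDC

Flink CDC provides a set of connectors for capturing change data (change data capture, CDC) from other data sources. Among them, the Flink MySQL CDC connector is built on Debezium.

Official documentation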

https://ververica.github.io/flink-cdc-connectors/release-2.3/content/about.html

Official GitHub repository

https://github.com/ververica/flink-cdc-connectors

For the version compatibility between Flink and Flink CDC, see:

https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/overview/

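For usage examples with various data sources, see:

Real-time data lakehouse based on AWS S3, EMR Flink, Presto, and Hudi: migrating CDH with EMR

A full survey of Flink CDC sources and sinks, with hands-on practice

How it works

Flink MySQL CDC official documentation: https://github.com/apache/flink-cdc/blob/master/docs/content/docs/connectors/flink-sources/mysql-cdc.md

For MySQL binlog, see:

Read more >>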

SpringBoot Study Notes: mock

You can use mocks (MockMvc) to test Spring Boot web endpoints.

1. Dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

2. Write the test case

import org.apache.tomcat.util.codec.binary.Base64;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.HttpHeaders;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import org.springframework.test.web.servlet.result.MockMvcResultHandlers;
import org.springframework.test.web.servlet.result.MockMvcResultMatchers;

import javax.annotation.Resource;

import java.nio.charset.StandardCharsets;

@RunWith(SpringRunner.class)
@SpringBootTest
@AutoConfigureMockMvc
public class HelloControllerTest {

    @Resource
    private MockMvc mockMvc;

    @Test
    public void helloTest() throws Exception {
        // Expected JSON response body
        String expect = "{\"code\":200,\"msg\":\"hello: 1\",\"data\": null}";

        // Build the HTTP Basic Authorization header: "Basic " + base64("admin:admin")
        String auth = "admin:admin";
        byte[] originAuth = auth.getBytes(StandardCharsets.US_ASCII);
        byte[] encodedAuth = Base64.encodeBase64(originAuth);
        String authHeader = "Basic " + new String(encodedAuth, StandardCharsets.US_ASCII);
        HttpHeaders headers = new HttpHeaders();
        headers.set("Authorization", authHeader);

        // Send GET /hello/1, compare the response body as JSON, then print the exchange
        mockMvc.perform(MockMvcRequestBuilders.get("/hello/1")
                        .headers(headers))
                .andExpect(MockMvcResultMatchers.content().json(expect))
                .andDo(MockMvcResultHandlers.print());
    }

}

Testing a GET request

MockHttpServletRequest:
HTTP Method = GET
Request URI = /hello/1
Parameters = {}
Headers = [Authorization:"Basic YWRtaW46YWRtaW4="]
Body = null
Session Attrs = {SPRING_SECURITY_CONTEXT=SecurityContextImpl [Authentication=UsernamePasswordAuthenticationToken [Principal=org.springframework.security.core.userdetails.User [Username=admin, Password=[PROTECTED], Enabled=true, AccountNonExpired=true, credentialsNonExpired=true, AccountNonLocked=true, Granted Authorities=[]], Credentials=[PROTECTED], Authenticated=true, Details=WebAuthenticationDetails [RemoteIpAddress=127.0.0.1, SessionId=1], Granted Authorities=[ROLE_USER]]]}

Handler:
Type = com.example.demo.controller.HelloController
Method = com.example.demo.controller.HelloController#hello(Integer)

Async:
Async started = false
Async result = null

Resolved Exception:
Type = null

ModelAndView:
View name = null
View = null
Model = null

FlashMap:
Attributes = null

MockHttpServletResponse:
Status = 200
Error message = null
Headers = [Content-Type:"application/json", X-Content-Type-Options:"nosniff", X-XSS-Protection:"1; mode=block", Cache-Control:"no-cache, no-store, max-age=0, must-revalidate", Pragma:"no-cache", Expires:"0", X-Frame-Options:"DENY"]
Content type = application/json
Body = {"code":200,"msg":"hello: 1","data":null}
Forwarded URL = null
Redirected URL = null
Cookies = []

Process finished with exit code 0
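The Authorization header in the output above is just "Basic " followed by the Base64 encoding of "username:password". As a small sketch using only the JDK (java.util.Base64 instead of the Tomcat codec used in the test; the class name BasicAuthHeader is made up for this illustration), the value can be built like this:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper class, not part of the original project.
class BasicAuthHeader {

    // Builds the value of an HTTP Basic Authorization header:
    // "Basic " + base64(username + ":" + password)
    static String basicAuth(String username, String password) {
        String credentials = username + ":" + password;
        byte[] raw = credentials.getBytes(StandardCharsets.UTF_8);
        return "Basic " + Base64.getEncoder().encodeToString(raw);
    }

    public static void main(String[] args) {
        // Prints the same header value that appears in the MockMvc output
        System.out.println(basicAuth("admin", "admin"));
    }
}
```

For admin:admin this yields Basic YWRtaW46YWRtaW4=, matching the Headers line of the MockHttpServletRequest printed by the test.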

Testing a POST request

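// This method belongs in the same test class as helloTest and additionally needs:
// import org.springframework.http.MediaType;
// import org.springframework.transaction.annotation.Transactional;
@Test
@Transactional // test-managed transaction, rolled back by default after the test
public void createUserTest() throws Exception {
    // Expected JSON response body
    String expect = "{\"code\":200,\"msg\":\"success\",\"data\": null}";

    // Build the HTTP Basic Authorization header
    String auth = "admin:admin";
    byte[] originAuth = auth.getBytes(StandardCharsets.US_ASCII);
    byte[] encodedAuth = Base64.encodeBase64(originAuth);
    String authHeader = "Basic " + new String(encodedAuth, StandardCharsets.US_ASCII);
    HttpHeaders headers = new HttpHeaders();
    headers.set("Authorization", authHeader);

    // Send POST /user with a JSON body and compare the response body as JSON
    mockMvc.perform(
                    MockMvcRequestBuilders.post("/user")
                            .headers(headers)
                            .contentType(MediaType.APPLICATION_JSON)
                            .content("{\n" +
                                    " \"username\": \"admin\",\n" +
                                    " \"password\": \"admin\"\n" +
                                    "}"))
            .andExpect(MockMvcResultMatchers.content().json(expect))
            .andDo(MockMvcResultHandlers.print());
}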

In the POST request test, we added

Read more >>

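SpringBoot Study Notes: AOP

AOP (aspect-oriented programming) is a programming approach in which code is woven dynamically, at runtime, into specified methods of specified classes at specified positions. The code fragment that gets woven in is called an aspect, and the set of classes and methods it is woven into is called a pointcut.

AOP lets you separate functionality that is scattered throughout an application into reusable components.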
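To make the idea concrete, here is a minimal sketch that uses a plain JDK dynamic proxy rather than Spring's own AOP annotations. All names in it (GreetingService, SimpleGreetingService, LoggingAspectDemo) are invented for the illustration: the invocation handler plays the role of the aspect, and the check on the method name plays the role of the pointcut.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical business interface, used only for this illustration.
interface GreetingService {
    String greet(String name);
}

// Plain implementation that knows nothing about logging.
class SimpleGreetingService implements GreetingService {
    @Override
    public String greet(String name) {
        return "hello: " + name;
    }
}

class LoggingAspectDemo {

    // Wraps the target in a JDK dynamic proxy that records a message
    // before and after every call to greet(...).
    static GreetingService withLogging(GreetingService target, List<String> log) {
        InvocationHandler handler = (proxy, method, args) -> {
            // "Pointcut": only methods named greet are intercepted
            if (!method.getName().equals("greet")) {
                return method.invoke(target, args);
            }
            // "Aspect": the cross-cutting code around the real call
            log.add("before " + method.getName());
            Object result = method.invoke(target, args);
            log.add("after " + method.getName());
            return result;
        };
        return (GreetingService) Proxy.newProxyInstance(
                GreetingService.class.getClassLoader(),
                new Class<?>[] {GreetingService.class},
                handler);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        GreetingService service = withLogging(new SimpleGreetingService(), log);
        System.out.println(service.greet("1"));
        System.out.println(log);
    }
}
```

The business class stays free of logging code, and the same handler could wrap any other interface, which is the "reusable component" part of the idea.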

Read more >>

SpringBoot Study Notes: a unified service interface

The unified service interface builds on a unified mapper; see: Mybatis Study Notes: generic mapper

The AbstractService interface

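package com.example.demo.core.service;

import java.util.List;

public interface AbstractService<T> {

    /**
     * Fetch all records.
     *
     * @return all records
     */
    List<T> listObjects();

    /**
     * Look up a record by primary key.
     *
     * @param key the primary key
     * @return the matching record
     */
    T selectByKey(Object key);

    /**
     * Look up records by example condition.
     *
     * @param example the example condition
     * @return the matching records
     */
    List<T> selectByExample(Object example);

    /**
     * Persist an entity.
     *
     * @param entity the entity to insert
     * @return the number of rows affected
     */
    int save(T entity);

    /**
     * Delete by primary key.
     *
     * @param key the primary key
     * @return the number of rows affected
     */
    int deleteByKey(Object key);

    /**
     * Delete by example condition.
     *
     * @param example the example condition
     * @return the number of rows affected
     */
    int deleteByExample(Object example);

    /**
     * Update an entity.
     *
     * @param entity the entity to update
     * @return the number of rows affected
     */
    int update(T entity);

}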

The abstract class AbstractServiceImpl

package com.example.demo.core.service.impl;

import com.example.demo.core.mapper.MyMapper;
import com.example.demo.core.service.AbstractService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;

// Class-level default: read-only, joins an existing transaction if there is one.
// Write methods override this with their own @Transactional.
@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
public abstract class AbstractServiceImpl<T> implements AbstractService<T> {

    // Generic mapper for entity type T
    @Autowired
    protected MyMapper<T> mapper;

    @Override
    public List<T> listObjects() {
        return mapper.selectAll();
    }

    @Override
    public T selectByKey(Object key) {
        return mapper.selectByPrimaryKey(key);
    }

    @Override
    public List<T> selectByExample(Object example) {
        return mapper.selectByExample(example);
    }

    @Override
    @Transactional
    public int save(T entity) {
        return mapper.insert(entity);
    }

    @Override
    @Transactional
    public int deleteByKey(Object key) {
        return mapper.deleteByPrimaryKey(key);
    }

    @Override
    @Transactional
    public int deleteByExample(Object example) {
        return mapper.deleteByExample(example);
    }

    @Override
    @Transactional
    public int update(T entity) {
        // Only non-null fields of the entity are written
        return mapper.updateByPrimaryKeySelective(entity);
    }

}

References:

https://github.com/febsteam/FEBS-Security/blob/master/febs-common/src/main/java/cc/mrbird/common/service/impl/BaseService.java

and

https://github.com/Zoctan/WYUOnlineJudge/blob/master/api/src/main/java/com/zoctan/api/core/service/AbstractService.java

From then on, a concrete service can extend AbstractServiceImpl and skip writing the common service methods. For example:

The UserService interface

package com.example.demo.service;

import com.example.demo.core.service.AbstractService;
import com.example.demo.model.User;

public interface UserService extends AbstractService<User> {
}

The UserServiceImpl implementation class

package com.example.demo.service.impl;

import com.example.demo.core.service.impl.AbstractServiceImpl;
import com.example.demo.model.User;
import com.example.demo.service.UserService;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Transactional(rollbackFor = Exception.class)
public class UserServiceImpl extends AbstractServiceImpl<User> implements UserService {
}
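The same pattern can be tried without Spring or MyBatis. The sketch below is only an illustration under that assumption: SimpleMapper, BaseService, DemoUser, InMemoryUserMapper and DemoUserService are hypothetical stand-ins for MyMapper, AbstractServiceImpl, User and UserServiceImpl, and the "database" is a map held in memory.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the generic mapper.
interface SimpleMapper<T> {
    List<T> selectAll();
    T selectByPrimaryKey(Object key);
    int insert(T entity);
}

// Generic CRUD logic, written once for every entity type.
abstract class BaseService<T> {
    protected final SimpleMapper<T> mapper;

    protected BaseService(SimpleMapper<T> mapper) {
        this.mapper = mapper;
    }

    public List<T> listObjects() {
        return mapper.selectAll();
    }

    public T selectByKey(Object key) {
        return mapper.selectByPrimaryKey(key);
    }

    public int save(T entity) {
        return mapper.insert(entity);
    }
}

// Hypothetical entity.
class DemoUser {
    final int id;
    final String username;

    DemoUser(int id, String username) {
        this.id = id;
        this.username = username;
    }
}

// In-memory mapper that keeps rows in insertion order.
class InMemoryUserMapper implements SimpleMapper<DemoUser> {
    private final Map<Object, DemoUser> rows = new LinkedHashMap<>();

    @Override
    public List<DemoUser> selectAll() {
        return new ArrayList<>(rows.values());
    }

    @Override
    public DemoUser selectByPrimaryKey(Object key) {
        return rows.get(key);
    }

    @Override
    public int insert(DemoUser user) {
        rows.put(user.id, user);
        return 1; // one row affected
    }
}

// The concrete service adds no CRUD code of its own.
class DemoUserService extends BaseService<DemoUser> {
    DemoUserService(SimpleMapper<DemoUser> mapper) {
        super(mapper);
    }

    public static void main(String[] args) {
        DemoUserService service = new DemoUserService(new InMemoryUserMapper());
        service.save(new DemoUser(1, "admin"));
        System.out.println(service.selectByKey(1).username);
    }
}
```

As with UserServiceImpl, the concrete service only declares which entity type it works with; every CRUD method comes from the base class.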

Testing it

package com.example.demo.service.impl;

import com.example.demo.model.User;
import com.example.demo.service.UserService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.util.List;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class UserServiceImplTest {

    // Inject by interface so the test does not depend on how the bean is proxied
    @Autowired
    private UserService userService;

    @Test
    public void listObjectsTest() {
        // listObjects() is inherited from AbstractServiceImpl
        List<User> list = userService.listObjects();
        for (User user : list) {
            System.out.println(user.getId());
        }
    }

}

Read more >>