yvanliUpdagre

专注提升


  • Home

  • Tags

  • Archives

leetcode-146LRUCache

Posted on 2020-01-05

LRU Cahce 思路演进

LRU第一版本思想

头尾位独立处理实现复杂容易出错。
采用map实现o(1)获取
使用双向链表控制LRU,一直有个用例不过
golang 实现如下:测试用例 12 / 18 个通过测试用例.
经过debug后发现为题判断如果是头指针和移除节点一致时处理除了问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
type LRUCache struct {
dataMap map[int]*ListDataNode
head *ListDataNode
tail *ListDataNode
capacity int
use int
}
type ListDataNode struct{
data int
key int
pre *ListDataNode
next *ListDataNode
}


func Constructor(capacity int) LRUCache {
Cache :=LRUCache{
dataMap :make(map[int]*ListDataNode,capacity),
head :nil,
tail :nil,
capacity: capacity,
use:0,
}
return Cache
}
//取出操作的节点
func (this *LRUCache) TakeOutNode( node *ListDataNode){
if node.pre!=nil{
node.pre.next = node.next
}
if node.next !=nil{
node.next.pre = node.pre
}
//node 是尾节点,尾节点要前移动
if node == this.tail{
this.tail = node.pre
}
/*
//node是头节点,头节点滞空
if node == this.head{
this.head = nil
}
*/
上面是错误的写法这里更正下,移除节点是头,并不意味着链表中没有头节点啦,所以这里不应应该直接指空
if node == this.head{
this.head = node.next
}
//养成好习惯不用的指针处理掉
node.pre = nil
node.next = nil
}
func (this *LRUCache) Get(key int) int {
if node ,ok := this.dataMap[key];ok{
this.TakeOutNode(node)
this.AddHead(node)
return node.data
}
return -1
}

//把刚操作的放在最前面
func (this *LRUCache) Put(key int, value int) {
//检查是否存在,存在的直接更新值同时把节点移到head
if node ,ok := this.dataMap[key];ok{
node.data = value
node.key = key
this.TakeOutNode(node)
this.AddHead(node)
return
}
if this.use >= this.capacity{
//队列满了先去掉队列尾巴把再插入
this.DeleteTailNode()
}
node := &ListDataNode{
data:value,
key:key,
}
this.dataMap[key] = node
this.AddHead(node)
this.use++
}
//把节点插入头里面
func (this *LRUCache) AddHead(node *ListDataNode){
if this.head == nil{
this.head = node
this.tail = node
}else{
//把该节点放入队列头
tmp :=this.head
this.head = node
node.next = tmp
tmp.pre = node
}
}

//删除尾部节点
func (this *LRUCache)DeleteTailNode(){
//获取尾节点
if this.tail == nil{
return
}
key := this.tail.key
if _,ok :=this.dataMap[key];ok{
this.dataMap[key] = nil
delete(this.dataMap,key)
//从尾部节点拿掉
tmp := this.tail
this.tail = tmp.pre
if tmp.pre !=nil{
tmp.pre.next = nil
}
this.use--
}
}

LRU第二版本思想

独立申请头尾哨兵节点,数据节点不在可能是头或尾部节点,插入和移除不用考虑过多情况实现简洁,不过在空间很小的情况下会多占两个节点可能空间会多O(2),但是带来的简洁性更高

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
type LRUCache struct {
dataMap map[int]*ListDataNode
head *ListDataNode
tail *ListDataNode
capacity int
}
type ListDataNode struct{
data int
key int
pre *ListDataNode
next *ListDataNode
}


func Constructor(capacity int) LRUCache {
head ,tail := new(ListDataNode),new(ListDataNode)
// 初始化cache 头尾相连作为哨兵,从头插入,从尾部移除
head.next = tail
tail.pre = head
head.next = tail
tail.pre = head
Cache :=LRUCache{
dataMap :make(map[int]*ListDataNode,capacity),
head :head,
tail :tail,
capacity: capacity,
}
return Cache
}

// 移除节点
func (this *LRUCache) remove( node *ListDataNode){
node.pre.next = node.next
node.next.pre = node.pre
node.next = nil
node.pre = nil
}

// 插入头后面进入
func (this *LRUCache) insertHead (node *ListDataNode){
head := this.head
node.next = head.next
node.pre = head
head.next.pre = node
head.next = node
}

func (this *LRUCache) Get(key int) int {
if node ,ok := this.dataMap[key];ok{
this.remove(node)
this.insertHead(node)
return node.data
}
return -1
}

//把刚操作的放在最前面
func (this *LRUCache) Put(key int, value int) {
if node ,ok := this.dataMap[key];ok{
node.data = value
this.remove(node)
this.insertHead(node)
return
}
if len(this.dataMap) >= this.capacity{
//队列满了先去掉队列尾巴把再插入
node := this.tail.pre
this.remove(node)
delete(this.dataMap,node.key)
}
node := &ListDataNode{
data:value,
key:key,
}
this.dataMap[key] = node
this.insertHead(node)
}

workPool实现

Posted on 2020-01-03

随手写了个workpool放上来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package taskPool

import (
"golib/errors"
"sync/atomic"
)

type Task struct {
Handler func(v ...interface{})
Params[]interface{}
}

type Pool struct {
capacity uint64
runningWorkersNum uint64
state int64
taskC chan *Task
closeC chan bool
}

var ErrInvalidPoolkCap = errors.New("invalid pool cap")
const (
RUNNING = 1
STOPED = 0
)



func NewPool (capacity uint64)(*Pool,error){
if capacity <= 0 {
return nil,ErrInvalidPoolkCap
}
return &Pool{
capacity:capacity,
state:RUNNING,
taskC:make(chan *Task,capacity),
closeC:make(chan bool),
},nil
}

func (p *Pool) addRunningkNum(){
atomic.AddUint64(&p.runningWorkersNum,1)
}
func (p *Pool) reduceRunningNum(){
atomic.AddUint64(&p.runningWorkersNum,1)
}
func (p *Pool) GetRunningNum()uint64{
return atomic.LoadUint64(&p.runningWorkersNum)
}

func (p *Pool) GetCap() uint64{
return atomic.LoadUint64(&p.capacity)
}
func (p *Pool) run(){
p.addRunningkNum()
go func() {
defer func() {
p.reduceRunningNum()
}()
for {
select {
case task := <- p.taskC:
task.Handler(task.Params)
case <-p.closeC:
return
}
}
}()
}

func (p *Pool) Put (task *Task)error{
if p.state == STOPED{
return ErrInvalidPoolkCap
}
//容量不满再开一个携程处理
if p.runningWorkersNum < p.GetCap(){
p.run()
}
//放入任务
p.taskC <- task
return nil
}

func (p *Pool) Clsoe(){
p.state = STOPED //没有并发操作
//等待任务消费完毕
for len(p.taskC) > 0{
}
p.closeC <- true
close(p.taskC)
}

leetcode-297BSTserilize

Posted on 2020-01-02

leetcode-297 BST序列化与反序列化

题目要求如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
序列化是将一个数据结构或者对象转换为连续的比特位的操作,进而可以将转换后的数据存储在一个文件或者内存中,同时也可以通过网络传输到另一个计算机环境,采取相反方式重构得到原数据。

请设计一个算法来实现二叉树的序列化与反序列化。这里不限定你的序列 / 反序列化算法执行逻辑,你只需要保证一个二叉树可以被序列化为一个字符串并且将这个字符串反序列化为原始的树结构。

示例: 

你可以将以下二叉树:

1
/ \
2 3
/ \
4 5

序列化为 "[1,2,3,null,null,4,5]"
提示: 这与 LeetCode 目前使用的方式一致,详情请参阅 LeetCode 序列化二叉树的格式。你并非必须采取这种方式,你也可以采用其他的方法解决这个问题。
说明: 不要使用类的成员 / 全局 / 静态变量来存储状态,你的序列化和反序列化算法应该是无状态的。

思路分析:

  1. 首先第一步我们要能遍历二叉树才能够将二叉树变成字符串存储(二叉树的遍历这里我们采用先序遍历)
  2. 树节点中有的值不知以为要能区分出那些数字字符是一个节点的值,这里我们定义了分隔符“|”
  3. 识别数的左右空节点,这里我们定义了占位符“#”
  4. 序列化开始,注意如果根为空直接插入占位符和分隔符
  5. 反序列化时候要注意采用和序列化一样的顺序,同时提取出数据后要做好指针的移动(拿到连续数据后,指针后移1位跨个分隔符,遇到占位符后移两位跨过占位符和分隔符)
  6. 递归的处理左子数和右子数
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    package main

    import (
    "strconv"
    "fmt"
    )

    // Your Codec object will be instantiated and called as such:
    // Codec codec;
    // codec.deserialize(codec.serialize(root));

    const (
    PlaceHolder = "#"
    Delimiter = "|"
    )
    type TreeNode struct{
    val int
    left *TreeNode
    right *TreeNode
    }
    func serialize(root *TreeNode) string{
    restr := ""
    //序列化
    if root == nil {
    restr += PlaceHolder
    return restr
    }
    return serilaizeDe(root)
    }

    func serilaizeDe(root *TreeNode) string{
    str := ""
    if root == nil {
    str = str + PlaceHolder + Delimiter
    return str
    }
    str = strconv.FormatInt(int64(root.val),10) + Delimiter
    return str + serilaizeDe(root.left)+ serilaizeDe(root.right)

    }
    func deserialize(data string) *TreeNode{
    loc := 0
    if string(data[loc]) == PlaceHolder {
    return nil
    }
    return deserilaizeDe(data,&loc)
    }
    func deserilaizeDe ( data string,loc *int) *TreeNode{
    lenStr := *loc
    //数组结束或遇到占位返回 nil 同时指针后移两位
    if string(data[*loc]) == PlaceHolder || *loc >= len(data){
    *loc+=2
    return nil
    }
    //正常数据指针后移
    for string(data[*loc]) != Delimiter {
    *loc ++
    }
    val,_ := strconv.ParseInt(string(data[lenStr:*loc]),10,64)
    //取数后跨个分隔符
    *loc ++
    node := &TreeNode{
    val:int(val),
    left:deserilaizeDe(data,loc),
    right:deserilaizeDe(data,loc),
    }
    return node
    }
    func main(){
    root := TreeNode{
    val:322,
    left:nil,
    right:nil,
    }
    leftNode_leftNode := TreeNode{
    val:9,
    left:nil,
    right:nil,

    }
    leftNode := TreeNode{
    val:2,
    left:&leftNode_leftNode,
    right:nil,
    }

    rightNode:= TreeNode{
    val:5,
    left:nil,
    right:nil,
    }
    root.left = &leftNode
    root.right = &rightNode
    str := serialize(&root)
    fmt.Println(str)
    root1 := deserialize(str)
    fmt.Println(serialize(root1))
    }

mysql聚簇索引

Posted on 2020-01-01

聚簇索引

聚簇索引一定要单拿出来进行分析,因为太重要啦!我们平时使用最多inndb就是聚簇索引。

划重点!!:聚簇索引并不是一种单独的索引类型,而是一种数据存储方式,起细节依赖于其实现方式。InnDB的聚簇索引实际上是在同一个结构中保存了B-Tree索引和数据行

当表有聚簇索引时,它的数据行实际上存放在索引的叶子页中。术语”聚簇”标识数据行和相邻的键值紧凑的存储在一起。因为无法同事吧数据行存放在两个不同的地方,所以一个表只能有一个聚簇索引。
这篇主要赶住InnDB,但是这里讨论的所有原理 对于支持聚簇索引的存储引擎都是使用的。

InnoDB通过主键聚集数据,索引被索引的列就是主键列。如果没有定义主键,InnoDb会选择一个唯一的非空索引代替。如果没有这样的索引,InnoDB会隐式的定义一个主键来作为聚簇索引,InnoDB 只聚集在同一个页面中的记录,包换相邻键值的页面可能会相距甚远。
想了解inndb的存储结构可以参考下面的博客:

『浅入浅出』MySQL 和 InnoDB

聚簇索引的优缺点

任何事物都是有两面性,算法和数据结构就是在空间和时间,效率和复杂性等方面的平衡,聚簇索引也一样

优点
  • 把相关数据放在一起方便一次性查询
  • 访问数据更快,聚簇索引把索引和数据保存在同一个B-Tree中
  • 还用覆盖索引扫描的查询可以直接使用页节点中的主键值
缺点
  • IO密集型对访问顺序的优化可以减少页的加载,如果数据全部在内存中访问顺序没有那么重要,聚簇索引依旧没什么优势啦。
  • 插入数度严重历来插入顺序
  • 更新聚簇索引的代价很高,会强制Innodb就昂每个呗更新的行移动到新的位置
  • 插入新行或主键被更新,可能照成页分裂
  • 如果行稀疏或页分裂可能或照成全表扫描变慢
  • 二级索引会更大因为要保存主键列,同时会进行二次查找

mysql创建高性能索引

Posted on 2019-12-30

索引:存储引擎用于快速找到记录的一种数据结构.索引可以轻易将查询提高好几个数量级!最求极致!!!

索引基础

select uid from user where uid = 5;

  • 上面的查询sql如果在user表上建立uid的索引,mysql会先在索引上按值进行查找,然后返回所有包涵该值的数据行。
  • 如果索引有多个列,那么列的顺序很重要(最左前缀列匹配)。创建一个包涵两个列的索引和创建两个只包涵一个的索引是打不相同的。

索引的类型

索引有很多类型,mysql中索引是在存储引擎层实现的,所以不同的存储引擎索引的工作方式不尽相同。即使同种类型的索引在不同的存储引擎下实现也可能不同。

相信大家一定想过,既然Inndb是聚簇索引,那有索引再存储数据行是不是就数据重复了呢?能提出这个问题说明我们对索引有了进一步的思考。下面这个博文很好的解释了

关于InnoDB表数据和索引数据的存储

B-Tree索引

大多数存储引擎都支持B+-Tree索引(B+树 叶子包含直接点指向下一个叶子节点的指针(由于数据都在叶子节点上,方便加载下一部分数据))

再强调下:
索引之所以能加快访问速度,是因为索引不在需要存储引擎进行权标扫描来寻找数据,取而代之的是从索引的根节点开始,利用索引的结构优化查询效率,最终要么找到对应的值,要么记录不存在

B+Tree的结构很适合范围查找所以基于磁盘阵列的数据库索引,可以一次加载一或一个扇区的数据对应树的一个节点。

借用高性能mysql书中的一张图看下InnDB的索引存储结构,下图中建立了一个联合索引key(last_name,first_name.dob),对于表中的每一行数据索引中包涵了上面三个值,如下图优先使用lastname排序,在lastname相等的情况下用first_name排序,这种索引结构显然只能在匹配到索引的第一列的时候才能使用到到第二列索引,直接通过first_name查找是没办法使用索引的。

划重点:索引key的顺序很重要建立的时候要妥善考虑

B-Tree索引试用范围

B-Tree索适用于全键值、键值范围或键前缀查找。其中键前缀只适用于根据最左前缀的查找、

  • 全值匹配:指的是和索引所有列进行匹配
  • 匹配最左前缀:只匹配索引的第一列
  • 匹配列前缀:匹配莫一列开头的部分,例如所有Y开头的last_name
  • 精确匹配某一列并范围匹配另一列:例如匹配last_name:Li first_name:开头是X的人。
  • 只访问索引的查询:访问只需要访问索引,而无须访问数据行。

因为索引树中的节点是有序的,所以出了按值查找外,索引还可以用于查询中的order by。同理只是用上面的查询类型

哈希索引(memroy 引擎)

哈希索引(hash index)基于哈希表实现。只有精确匹配索引所有列的查询才有效。对于每一行数据,存储引擎都回对所有的索引列一个hash code,hash code是一个较小的值。hash索引将所有的hash码存储在索引汇总同时在哈希表中保存指像每个数据行的指针。

空间数据索引

地理数据存储,mysql支持的不完善。可以使用PostgreSQL的PostGIS

全文索引

全文索引比较特殊,查找的是文本中的关键词,而不是直接比较索引中的值。全文搜索和其它积累索引的匹配方式不完全一样。有许多需要做一的细节,如停用词、词干和复数、布尔搜索等。全文索引更类似于搜索引擎做的事而不是简单的where条件匹配。
在相同的列上同时创建全文索引和基于值的B-Tree索引不会有冲突,全文索引适用于MATCH AGAINST操作,而不是普通的where条件操作。

索引的优点

索引不近可以快速的定位到表指定位置。索引还有一些其它的附加作用。
例如B-tree 由于索引是按照顺序存储的相同的索引值会放在一起索引也可以用于order by和group by操作。由于innddb索引里面存放啦实际的列值所以可以使用索引就完成全部查询。索引有以下几个有点:

  • 大大减少服务器扫描的数据量
  • 帮组服务器避免排序和临时表
  • 将随机I/O年变为顺序I/o

划重点:索引可以带来高效的查找,但是同时也要维护特定的数据结构(根据索引的不同类型)这就回给插入和删除带来一定负担(例如b-tree的分裂算法等。中型到大型的表,索引就会非常有效,过少和过大建立和使用索引的付出和可能高于回报)

高性能的索引策略

要使用高性能索引首先要正确的创建。

独立的列

select user_id from user where user_id +1 = 5;

上面的查询语句是无法使用索引的要简化where使索引列单独的放在比较符号的一侧。

前缀索引和索引的选择性

引入一个概念索引的选择性,不重复的索引值/记录总数。索引的选择性越高则查询效率越高。唯一索引的选择性是1,这是最好的索引选着性,性能也是最好的。
在选择前最索引的时候,尽量让前缀接近当前列作为key的选择性。
创建前最索引,如下创建姓氏的前4个字节作为前缀的前缀索引
Alter table user add key(lastname(4))

划重点:前缀索引更小更快可以(接近单列索引的选择性)但是前缀索引无法处理order by和group by也无法做覆盖扫描

全表扫描,全索引扫描和覆盖扫描的定义与区别见下面的blog

mysql 全表扫描、全索引扫描、索引覆盖(覆盖索引)

多列索引

常见的错误思维就是为每个列创建独立的索引或按照错误的顺序创建多列索引(个人觉得选择选择性越高的应该放到多列索引的越前面)

1
2
3
4
5
6
7
8
create table t (
c1 INT,
c2 INT,
c3 INT,
KEY (c1),
KEY (c2),
KEY (c3),
);

划重点:在一个查询中使用两个索引列,在低版本中可能会照成全表扫描,mysql5.0后查询会使用两个单列索引进行索引扫描并将结果合并

虽然上面的索引合并策略可以算作一重优化,但是也侧面说明了我们索引建立的十分糟糕

  • 当出现你服务对多个索引做相交时(AND),通常以为要包涵所有相关列的多列索引,而不是多个单独的单列索引
  • 当服务需要对多个索引做两盒操作时候,需要耗费大量的cpu和内存,在算法的缓存,排序和合并上,尤其是索引的选择选择性不高,合并后的量级就会很大,需要扫描返回的大量数据。
  • 优化器不会极端到‘查询成本’,只关心随记页面的读取。有些情况倒不如使用权标烧苗或将查询改为union的方式。
    通过expian如果看到索引合并,要重新评估下表的结构设计,或则通过参数金庸掉某些索引。

选择合适的索引顺序

上文面我提到选择选着性最高的防在索引最前列只是经验法则,还是要根据业务情况来判断,如何避免随记io和排序。
当不考虑排序和分组,业务大量查询时候,选择性最高的列放在前面通常是比较好的。同时在设计业务的时候也要注意一些特殊的逻辑,避免一个id有过多的行数,照成数据分布不均,影响索引使用的性能。

grpc通信分析-客户端

Posted on 2019-12-27

其实在了解了server端的实现后,client基本上就可以想到大致流程,下面我们还是看下源码,熟悉下具体的实现

grpc客户端的实现

  1. 再没有其它框架封装的情况下,调用createGRPCClientList(建立一个grpcClient)与server不一样他不需要绑定太多信息,最主要的就是address,client要知道去哪里通信。
  2. 拿到clientConn只需要调用pb.go里面的newsClient把client放入pb.go定义好的client里面,这样就可以通过该函数返回的client,调用实现的具体方法。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    type NewsPoolClient interface {
    GetNewsContent(ctx context.Context, in *NewsPoolRequest, opts ...grpc.CallOption) (*NewsPoolReply, error)
    UpdateNewsContent(ctx context.Context, in *NewsPoolRequest, opts ...grpc.CallOption) (*UpdateReply, error)
    DelNewsContent(ctx context.Context, in *NewsPoolRequest, opts ...grpc.CallOption) (*DelReply, error)
    // 获取视频类文章列表
    GetVideoContent(ctx context.Context, in *NewsPoolRequest, opts ...grpc.CallOption) (*NewsPoolReply, error)
    }

    type newsPoolClient struct {
    cc *grpc.ClientConn
    }

    func NewNewsPoolClient(cc *grpc.ClientConn) NewsPoolClient {
    return &newsPoolClient{cc}
    }

    //这里是client的具体实现的方法之一,可以看到主要的逻辑是grpc.Ivoke,同时method是按照server注册的同样接口传入method(网络通信一定都是基于相同的约定和协议实现通信)
    func (c *newsPoolClient) GetNewsContent(ctx context.Context, in *NewsPoolRequest, opts ...grpc.CallOption) (*NewsPoolReply, error) {
    out := new(NewsPoolReply)
    err := grpc.Invoke(ctx, "/content.NewsPool/GetNewsContent", in, out, c.cc, opts...)
    if err != nil {
    return nil, err
    }
    return out, nil
    }
  3. 通过invoker函数发请求给服务端,同时把获得的数据写入out中返回,我们看下Invoke的具体实现
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    //grpc.Invoke 里面封装了clent.cc的invoke就是我们最开始createGRPCClientList生成的clentconnect
    func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
    // allow interceptor to see all applicable call options, which means those
    // configured as defaults from dial option as well as per-call options
    opts = combine(cc.dopts.callOptions, opts)

    if cc.dopts.unaryInt != nil {
    return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
    }
    return invoke(ctx, method, args, reply, cc, opts...)
    }

小节

通过前后两个博客,我们大致了解grpc c/s大致是如何通信的,后续有时间或有精力,会继续分析一些底层网络的实现。包括invoke的具体细节,消息发送和接受使用的具体函数和方法。

grpc通信分析-服务端

Posted on 2019-12-25

今天要修改下框架方面的东西,把接口请求返回的默认上报,过滤掉外部拼凑的url,想得是把router的map传进去,未命中的就不做上报,看到grpc的时候发现其并没有像http一样维护一个grpc的方法map,对grpc的原理之前有一些了解,今天希望能更进一步详细,了解下grpc注册和发现相关的逻辑。

grpc的服务端注册

  1. 首先生成srv grpcserver(google的官方sever)
  2. 通过pb.go生成的Register**Server,绑定实现了pb.go定义的接口的服务类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    //绑定实现类到 生成的server里,同时注册serviceDesc到
    func RegisterNewsPoolServer(s *grpc.Server, srv NewsPoolServer) {
    s.RegisterService(&_NewsPool_serviceDesc, srv)
    }
    //serviceDesc 如下也是服务端收到请求能够打到具体方法的核心配置,相当于httpserver里面的路由信息
    var _NewsPool_serviceDesc = grpc.ServiceDesc{
    ServiceName: "content.NewsPool",
    HandlerType: (*NewsPoolServer)(nil),
    Methods: []grpc.MethodDesc{
    {
    MethodName: "GetNewsContent",
    Handler: _NewsPool_GetNewsContent_Handler,
    },
    {
    MethodName: "UpdateNewsContent",
    Handler: _NewsPool_UpdateNewsContent_Handler,
    },
    {
    MethodName: "DelNewsContent",
    Handler: _NewsPool_DelNewsContent_Handler,
    },
    {
    MethodName: "GetVideoContent",
    Handler: _NewsPool_GetVideoContent_Handler,
    },
    },
    Streams: []grpc.StreamDesc{},
    Metadata: "newspool.proto",
    }
    //其中一个handler定义如下
    func _NewsPool_GetNewsContent_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    in := new(NewsPoolRequest)
    if err := dec(in); err != nil {
    return nil, err
    }
    //interceptor 直接断言绑定的srv结构体调用其函数
    if interceptor == nil {
    return srv.(NewsPoolServer).GetNewsContent(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
    Server: srv,
    FullMethod: "/content.NewsPool/GetNewsContent",
    }
    //interceptor 把具体srv处理函数封装成handler传入interceptor中,可以在interceptor中做请求的预处理和其它操作,我们框架里面的熔断限流和一些统一操作也是在这里进行处理的
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
    return srv.(NewsPoolServer).GetNewsContent(ctx, req.(*NewsPoolRequest))
    }
    return interceptor(ctx, in, info, handler)
    }
  3. srv的实现绑定和配置绑定ok后,启动srv.server()会把服务启动接受请求启动协程进行处理。我们看下具实现:我们来简单分析一下 下面是部分代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
     func (s *Server) Serve(lis net.Listener) error {
    s.mu.Lock()
    s.printf("serving")
    s.serve = true
    //server的lisMap如果是空就直接关闭lis
    if s.lis == nil {
    // Serve called after Stop or GracefulStop.
    s.mu.Unlock()
    lis.Close()
    return ErrServerStopped
    }

    s.serveWG.Add(1)
    defer func() {
    s.serveWG.Done()
    select {
    // Stop or GracefulStop called; block until done and return nil.
    case <-s.quit:
    <-s.done
    default:
    }
    }()

    ls := &listenSocket{Listener: lis}
    s.lis[ls] = true

    if channelz.IsOn() {
    ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
    }
    s.mu.Unlock()

    defer func() {
    s.mu.Lock()
    if s.lis != nil && s.lis[ls] {
    ls.Close()
    delete(s.lis, ls)
    }
    s.mu.Unlock()
    }()

    var tempDelay time.Duration // how long to sleep on accept failure

    for {
    //接受连接
    rawConn, err := lis.Accept()
    tempDelay = 0
    // Start a new goroutine to deal with rawConn so we don't stall this Accept
    // loop goroutine.
    //
    // Make sure we account for the goroutine so GracefulStop doesn't nil out
    // s.conns before this conn can be added.
    s.serveWG.Add(1)
    go func() {
    //开启协程处理连接流
    s.handleRawConn(rawConn)
    s.serveWG.Done()
    }()
    }
    }
  4. 最后一步请求路由到相应的hanlder,例如 上面的:Handler: _NewsPool_GetVideoContent_Handler (依然只截取部分代码)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
        func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
    //获取要调用的方法
    sm := stream.Method()
    if sm != "" && sm[0] == '/' {
    sm = sm[1:]
    }
    pos := strings.LastIndex(sm, "/")
    //省略了些错误处理
    ...
    service := sm[:pos]
    method := sm[pos+1:]

    //获取到回应的md,在processUnaryRPC中调用md.method 就是注册的_NewsPool_GetVideoContent_Handler
    if srv, ok := s.m[service]; ok {
    if md, ok := srv.md[method]; ok {
    s.processUnaryRPC(t, stream, srv, md, trInfo)
    return
    }
    if sd, ok := srv.sd[method]; ok {
    s.processStreamingRPC(t, stream, srv, sd, trInfo)
    return
    }
    }
    // Unknown service, or known server unknown method.
    if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
    s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
    return
    }
    if trInfo != nil {
    trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
    trInfo.tr.SetError()
    }
    errDesc := fmt.Sprintf("unknown service %v", service)
    if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
    if trInfo != nil {
    trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
    trInfo.tr.SetError()
    }
    grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
    }
    if trInfo != nil {
    trInfo.tr.Finish()
    }
    }

    //进入到这个函数就可以看到,handler就是我们实现的getNewsContent函数,其实如果定义了interceptor会在interceptor中执行hander,做一些通一的操作。
    func _NewsPool_GetNewsContent_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    in := new(NewsPoolRequest)
    if err := dec(in); err != nil {
    return nil, err
    }
    if interceptor == nil {
    return srv.(NewsPoolServer).GetNewsContent(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
    Server: srv,
    FullMethod: "/content.NewsPool/GetNewsContent",
    }
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
    return srv.(NewsPoolServer).GetNewsContent(ctx, req.(*NewsPoolRequest))
    }
    return interceptor(ctx, in, info, handler)
    }

grpc的服务注册,启动,监听和路由到对应的实现就完成了,下一篇看下grpclient的建立,怎样把一个grpc请求转化为具体的调用,这样就把grpc的实现完整的了解啦

leetcode-5最长回文子串

Posted on 2019-12-22

最长回文子串

1
2
3
4
5
6
7
8
9
10
11
给定一个字符串 s,找到 s 中最长的回文子串。你可以假设 s 的最大长度为 1000。

示例 1:

输入: "babad"
输出: "bab"
注意: "aba" 也是一个有效答案。
示例 2:

输入: "cbbd"
输出: "bb"

leetcode官方分析

首先排除暴力列举法,用中心扩展法:
事实上,只需使用恒定的空间,我们就可以在 O(n^2) 的时间内解决这个问题。

我们观察到回文中心的两侧互为镜像。因此,回文可以从它的中心展开,并且只有 2n - 12n−1 个这样的中心。

你可能会问,为什么会是 2n - 12n−1 个,而不是 nn 个中心?原因在于所含字母数为偶数的回文的中心可以处于两字母之间(例如 \textrm{“abba”}“abba” 的中心在两个 \textrm{‘b’}‘b’ 之间)。

自己的分析和实现误区

一开始直观的思想就是遍历 i->n,以i为中心,分别判断其作为奇回文串和偶回文串的情况,取最大值,记录最大值和起始i,实际写代码的时候由于有两种情况,写的代码十分繁杂,最后返回的时候还要根据长度取是否是奇偶,来判断返回的s,结果就是返回字符串起始计算很懵逼,最后不通过。

走出思维误区

做一个函数,传入起始位置,起始位置相等,往两边扩张(注意边界条件0和len),最后返回right - left -1 (因为符合条件多减了2)
如果返回的长度大于之前记录的更新,起始长度。
golang代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func max(a,b int) int{
if a>b {
return a
}
return b
}

func longestPalindrome(s string) string {
if s == ""{
return ""
}
start ,end := 0,0
sLen := len(s)
for i:=0;i<sLen;i++{
len1 := expanCenter(s,i,i)
len2 := expanCenter(s,i,i+1)
len := max(len1,len2)
if len > end - start{
start = i - (len - 1)/2
end = i + len/2;
}
}
return s[start:end + 1]
}

func expanCenter(s string,left,right int) int {
L,R:=left,right
for ;L >= 0 && R<len(s) && s[L] == s[R];{
L--;
R++;
}
return R - L -1
}

mysql存储引擎

Posted on 2019-12-20

mysql的存储引擎

在文件系统中,mysql将每个数据库(也可以称之为schema)保存为数据目录下的一个子目录。创建表时mysql会在数据库字幕侠创建一个和表同名字的.frm文件保存表的定义。因为mysql使用文件系统的目录和文件来超纯数据库和表的定义,不同的存储引擎保存数据和索引的方式是不同的,单表的则是在mysql服务层统一处理的。

InnoDB存储引擎

InnoDB是mysql 默认使用的事务型引擎,也是最重要使用最广泛的存储引擎。被设计用来处理大量短期事务,短期事务大部分情况是正常提交的,很好会被回滚。

InooDb概览

InnoDb把在4.1以后可以把每个表的数据结构和索引放在一个单独的文件中(但是我实际操作发现默认是放在同一个文件中)。

划重点

索引数据和表数据分开存储这种理解在InnoDB是错误的,实际上InnoDB的表数据保存在主键索引的B-Tree的叶子节点;

InnDb才用MVCC的方式支持高并发,实现了四个隔离界别.默认级别(REPEATABLE READ),通过间隙所策略防止幻读的出现,间隙所使得Innodb不仅仅锁定查询设计的行,还得会对缩影中的间隙进行锁定,防止幻影行的插入。
InnoDB数据表是基于聚簇索引建立的。聚簇索引对逐渐的查询有很高的性能,不过二级索引必须包涵主键列,如果主键列比较大索引也会比较大。
作为事务型存储引擎,InnoDB通过一些机制和工具支持热备份(获得一致性视图不需要通知对所有表的写入)

myISAM存储引擎

5.1以前的mysql默认是MyIsam引擎。支持大量的特性,包括全文索引,压缩,空就按函数,单不支持事务和行级锁,崩溃后无法安全恢复。

MyIsam概述

MyIsam也是存储在两个文件中:数据文件和索引文件,以.MYD和.MYI为扩展名字。

  1. 加锁和并发:myIsam只能加表锁,是支持读写锁,值得强调的是在其读的时候是可以向表中插入新的记录(并发插入)
  2. 修复:支持手动修复
  3. 索引特性: myIsam表,及时是BLOB和TEXT等长字段也可以基于前500哥字符创建索引。MyIsam支持圈粉索引
  4. 延迟更新索引键,清理缓冲区该表的时候才会吧索引块写入到磁盘,必然会照成搞崩溃后的重建,可以再全局设置和单表设置设置延迟更新索引键的特性。
  5. MyIsam 压缩表:,对不在加入的新数据的吧IAO进行压缩。
  6. 性能L在某些场景性能很好(读大量高于写的时候)

mysqlTranstction

Posted on 2019-12-18

mysql事务

可以说无论去哪里面试mysql的事务和锁一定会被问到的,要回答并答的优雅

事务

事务的概念:易主原子性的sql查询,或者说一个独立的工作单元。如果数据引擎能成功的对数据库应用该组查询的全部语句,那么就执行改组查询,如果其中一条语句因为其他原因无法执行,那么所有的语句都不会执行。事务内的语句要不全部执行,要么全部执行失败。(原子性我理解就是保证一系列操作像最基本的原子一样不能被拆分不能分开成功)

必须注意的是:事务是在一系列能力和基础功能下实现的,如果系统没有经过ACID的严格测试,空谈事务是没有意义的。

原子性(atomicity)

一个事务必须呗是谓一个不可分割的最小工作单元,整个事务中的所有操作要么全部提交成功,要么全部失败回滚,对于一个事务,不可能只执行其中的一部分这就是事务的原子性。
还是上面加黑的那句话,事务的定义也是基于原子性的,没有实现原子性就不会达成事务的定义

一致性(consistency)

数据库总是从一个一致性的状态转换到另一个一致性状态,即使执行到一半一桶崩溃事务中的修改也不会保存到数据中。

隔离性(isolation)

正常来说一个事务所做的提交在最终提交以前,对其它的事务应该通常是不可见的数据库的事务隔离级别会影响可见性

持久性 (durability)

事务做了提交,所有修改就会永久的保存到数据库中。即使系统崩溃,锁保存的数据也不会丢失。持久性是个模糊的概念,因为实际上持久性有很多级别。有些持久性策略能够提供安全的保障,有些则不能。
例如:可以把系统提交数据倒数据库的数据想成存入到银行的钱,银行有很多安全措施,比放在家里安全,但是如果银行破产了或钱被抢劫啦怎么办。可能就要考虑做各种容灾,数据的冗余备份,没有百分之百的持久性我们只能应该用各种方法尽量保证数据的持久性和可恢复性。

事务小节

综上所述,数据库为了我们数据的安全做了这么多的努力和工作,一个实现了ACID的数据库,肯定需要更大的内存和磁盘空间,更强的cpu能力。
我们唯一要做好的就是根据自己的业务是否需要事务处理来选择合适的存储引擎,以获得更高的性能。及时引擎不支持事务也可以通过lock Tables 为应用提供一定程度的保护,如果这点程度都做不到,真的不好意思称自己为工程师。

事务的隔离级别

上面在说隔离性的时候提到了一个事务在提交之前,其它事务应该是通常不可见的,注意是通常不过sql标准为了应对我门的一些业务场景做了四种隔离界别

事务的隔离界别

隔离级别比想象跟复杂,SQL标准汇中定义了四种隔离级别,每一种级别都详细规定了一个事务中所做的修改在其它事务内和事务间是可见的,那些是不可兼得的,较低级别的隔离通常可以执行更高的并发,系统的开销也更低。

READ UNCOMMITED (未递交读)

未提交读中如果事务做了修改即使没有提交,其它事务也是可以看见的。事务读到未提交的数据被称为脏读。照成问题的同时并未提高性能,所以一般在应用中很少使用。

READ CIMMITED (提交读)

一个事务开始到未提交之前,可以看到其它事务在此期间做的修改。也可以称为不可重复读(因为在事务中,两次读取的数据可能被其它事务修改)

REPEATABLE READ(可重复度)

可重复读解决了脏读的问题,保证了在同一事务中多次读取同样的记录结果是一致的。但是还是会产生幻读
幻读,并不是说两次读取获取的结果集不同,幻读侧重的方面是某一次的 select 操作得到的结果所表征的数据状态无法支撑后续的业务操作。更为具体一些:select 某记录是否存在,不存在,准备插入此记录,但执行 insert 时发现此记录已存在,无法插入,此时就发生了幻读。
(InnoDbhe 和XtraDb)通过多版本并发控制(mvcc)解决了幻读问题。

SERIALIZABLE(可串行化)

SERIALIZABLE是最高的隔离界别,通过强制事务串行执行,避免了幻读,但是会照成大量的锁竞争,非常需要保证数据一致性切没有可以接受并发的情况下,考虑使用该级别。

死锁

死锁是指两个或者多个事务在同一资源上相互占用,并请求锁定对方占用的资源,从而导致恶性循环的现象。当多个事务试图以不同的顺序锁定资源时候,就可能产生死锁。多个事务同时锁定同一个资源时,有也会产生死锁。
解决死锁问题,最简单的方法可能就是等到一个超时时间后放弃锁请求,显而易见这回影响性能,InnoDb可以检测到死锁依赖,并将持有最少行级排它锁的事务进行回滚。
锁的行为和顺序是会存储引擎相关的。以同样的顺序执行语句,有的引擎会死锁,有的不会。所以死锁产生有双重原因:有些是因为真正的数据冲突(很难避免),有些是完全由于存储引擎的实现方式导致的。
解除死锁只有部分或完全回滚一个事务才能打破死锁(有没有点珍珑棋局的感觉或者堵车必须有人倒车哈哈~)

事务日志

修改内存拷贝,再把修改行行为顺序记录到磁盘的事务日志中,而不是实时把数据写入磁盘,这样数据在后台可以慢慢刷回磁盘。

mysql中的事务

mysql支持了两总事务型存储引擎:InnoDb和NDB Cluster。还有一些第三方存储引擎支持事务,例如XtaDb和PBXT。

自动提交(AUTOCOMMIT)

自动提交打开如果不是显示的开始一个事务,则每个查询都被当作一个事务执行提交操作。同时也要也可以显示修改当前会话的隔离级别。

事务中混用存储引擎

不要在一个事务中混用存储引擎,否则不支持事务的引擎在事务回滚时候无法撤销,这时候数据很难恢复

隐式和显式锁定

InnoDb才用的是两阶段锁定协议,在事务的执行过程中,随时都可以锁定执行,锁只有在执行commit或roolback的时候才会释放,且同一个时刻释放,所有锁都是隐式锁定,InnoDb会根据隔离级别在需要的时候自动加锁。
对不支持事务的引擎可以显示的执行lock。但是建议有事务需求的使用innodb引擎,行级锁毕竟可以有更好的定恩那个。尽量不使用显示锁。

多版本并发控制

mvcc可以认为是行级锁的变种,它在很多情况下避免了加锁操作,所以开销更低。虽然实现机制不同,但大都实现了非阻塞的读操作,写操作也只锁定了必要的行。
mvcc 是通过保存数据在墨哥时间点的快照实现的,也就是不管需要执行多长时间,每个事物看到的数据都是一致的。根据事物开始的时间不同,每个事物对同一张表,同一时刻看到的数据可能是不一样的。
MVCC只在repeatedbale Read和read comitted 两个隔离级别下工作,其它的隔离级别都和mvcc不兼容。

<i class="fa fa-angle-left"></i>123<i class="fa fa-angle-right"></i>

yvanli

22 posts
19 tags
© 2020 yvanli
Powered by Hexo
|
Theme — NexT.Muse v5.1.4