并发面试题(1)

用chan实现,不做什么讲解了

题目

实现一个Map,要求:
(1) 支持高并发;
(2) 只要求支持插入和查询两种操作,时间复杂度均为 O(1);
(3) 查询时,key 存在则立即返回 value;否则阻塞等待:若在超时时间内该 key 被写入,则返回其 value,超时后仍不存在则返回错误;
(4) 运行不存在死锁、panic;

实现

// ConcurrentMap is a mutex-guarded map whose Get can block, up to a timeout,
// until another goroutine stores the requested key with Put.
//
// Create instances with NewConcurrentMap; the zero value has nil maps and is
// not usable.
type ConcurrentMap struct {
	sync.Mutex

	// mp holds the stored key/value pairs.
	mp map[any]any
	// keyChan holds, for every key that currently has blocked readers, the
	// channel that Put closes to wake them.
	keyChan map[any]chan struct{}
	// waiters counts the readers currently blocked on each keyChan entry so
	// that the last reader to time out can remove the entry again.
	waiters map[any]int
}

// NewConcurrentMap returns an empty, ready-to-use ConcurrentMap.
func NewConcurrentMap() *ConcurrentMap {
	return &ConcurrentMap{
		mp:      make(map[any]any),
		keyChan: make(map[any]chan struct{}),
		waiters: make(map[any]int),
	}
}

// Put stores v under k and wakes every Get call currently blocked on k.
func (m *ConcurrentMap) Put(k, v any) {
	m.Lock()
	defer m.Unlock()

	m.mp[k] = v

	// Signal the goroutines waiting for this key. Closing (rather than
	// sending) wakes all of them at once.
	if ch, ok := m.keyChan[k]; ok {
		close(ch)
		// Drop the closed channel so that a later Put for the same key does
		// not close it a second time.
		delete(m.keyChan, k)
		delete(m.waiters, k)
	}
}

// Get returns the value stored under k.
//
// If k is present the value is returned immediately. Otherwise Get blocks
// until a Put for k happens or timeout elapses. When the deadline passes and
// k is still absent, it returns a nil value and context.DeadlineExceeded.
func (m *ConcurrentMap) Get(k any, timeout time.Duration) (any, error) {
	v, ch, found := m.loadOrRegister(k)
	if found {
		return v, nil
	}

	// The lock is not held here; otherwise Put could never reach the channel.
	tCtx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	select {
	case <-ch:
	case <-tCtx.Done():
	}

	m.Lock()
	defer m.Unlock()

	// Re-check the map whichever case fired: if Put landed at the same moment
	// as the deadline, select may have picked the timeout branch even though
	// the value is already stored.
	if v, ok := m.mp[k]; ok {
		return v, nil
	}

	// The key is still absent, so no Put has run for it and this reader is
	// still counted in waiters. Unregister, and remove the channel once the
	// last reader has given up so abandoned keys do not accumulate.
	m.waiters[k]--
	if m.waiters[k] <= 0 {
		delete(m.waiters, k)
		delete(m.keyChan, k)
	}
	return nil, tCtx.Err()
}

// loadOrRegister returns the value for k when it is present. Otherwise it
// registers the caller as a waiter for k and returns the channel that Put
// will close. The mutex is released through defer, so it is not left held if
// the map access panics.
func (m *ConcurrentMap) loadOrRegister(k any) (any, chan struct{}, bool) {
	m.Lock()
	defer m.Unlock()

	if v, ok := m.mp[k]; ok {
		return v, nil, true
	}

	ch, ok := m.keyChan[k]
	if !ok {
		ch = make(chan struct{})
		m.keyChan[k] = ch
	}
	m.waiters[k]++
	return nil, ch, false
}

测试

package main

import (
"sync"
"sync/atomic"
"testing"
"time"
)

/*
测试目标:
1. 高并发下不会 panic / 死锁
2. key 不存在时 Get 会阻塞,Put 后全部唤醒
3. 超时场景正确返回 error
4. 已存在 key 时 Get 立即返回
*/

// TestConcurrentMap_GetAfterPut starts a group of goroutines that call Get on
// a key that has not been stored yet, stores the key a moment later, and
// expects every Get to return the stored value without an error.
func TestConcurrentMap_GetAfterPut(t *testing.T) {
	const (
		waiterCount = 50
		key         = "foo"
		want        = "bar"
	)

	cm := NewConcurrentMap()

	var pending sync.WaitGroup

	// waitForKey is the body of one reader goroutine.
	waitForKey := func(id int) {
		defer pending.Done()

		got, err := cm.Get(key, 2*time.Second)
		switch {
		case err != nil:
			t.Errorf("goroutine %d: unexpected error: %v", id, err)
		case got != want:
			t.Errorf("goroutine %d: want %v, got %v", id, want, got)
		}
	}

	for id := 0; id < waiterCount; id++ {
		pending.Add(1)
		go waitForKey(id)
	}

	// Give the readers time to start and block inside Get before storing.
	time.Sleep(100 * time.Millisecond)
	cm.Put(key, want)

	pending.Wait()
}

// TestConcurrentMap_Timeout calls Get for a key that is never stored and
// expects an error, returned no earlier than roughly the requested timeout.
func TestConcurrentMap_Timeout(t *testing.T) {
	const (
		timeout    = 200 * time.Millisecond
		minElapsed = 180 * time.Millisecond
	)

	cm := NewConcurrentMap()

	began := time.Now()
	got, err := cm.Get("not-exist", timeout)
	if err == nil {
		t.Fatalf("expected timeout error, got value=%v", got)
	}

	// Returning well before the timeout means Get did not actually wait.
	if took := time.Since(began); took < minElapsed {
		t.Fatalf("timeout too fast: %v", took)
	}
}

// TestConcurrentMap_ImmediateGet stores a key first and then expects Get to
// return its value without an error and without waiting.
func TestConcurrentMap_ImmediateGet(t *testing.T) {
	const maxLatency = 10 * time.Millisecond

	cm := NewConcurrentMap()
	cm.Put("a", 123)

	began := time.Now()
	got, err := cm.Get("a", time.Second)
	switch {
	case err != nil:
		t.Fatalf("unexpected error: %v", err)
	case got != 123:
		t.Fatalf("want 123, got %v", got)
	}

	// The key already exists, so Get must not sit in its wait path.
	if took := time.Since(began); took > maxLatency {
		t.Fatalf("Get should be immediate, but took too long")
	}
}

// TestConcurrentMap_MixedConcurrency runs many readers and writers against a
// small shared key set at the same time and expects at least one Get to
// succeed once every goroutine has finished.
func TestConcurrentMap_MixedConcurrency(t *testing.T) {
	const (
		keys       = 10
		readers    = 100
		writers    = 10
		iterations = 100
	)

	cm := NewConcurrentMap()

	var (
		hits    atomic.Int64
		running sync.WaitGroup
	)

	// writeLoop cycles through the key set, storing the iteration number and
	// pausing briefly between writes.
	writeLoop := func() {
		defer running.Done()
		for n := 0; n < iterations; n++ {
			cm.Put(n%keys, n)
			time.Sleep(time.Millisecond)
		}
	}

	// readLoop cycles through the same key set and counts every Get that
	// returns without an error.
	readLoop := func() {
		defer running.Done()
		for n := 0; n < iterations; n++ {
			if _, err := cm.Get(n%keys, 500*time.Millisecond); err == nil {
				hits.Add(1)
			}
		}
	}

	running.Add(writers + readers)
	for w := 0; w < writers; w++ {
		go writeLoop()
	}
	for r := 0; r < readers; r++ {
		go readLoop()
	}

	running.Wait()

	if hits.Load() == 0 {
		t.Fatal("no successful Get operations")
	}
}