Debugging in Distributed Systems

Debug 官方博客:[Debugging by Pretty Printing](https://blog.josejg.com/debugging-pretty/)

Go Side

在文件util.go中进行修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package raft

import (
"fmt"
"log"
"os"
"strconv"
"time"
)

// getVerbosity reads the VERBOSE environment variable and returns it as an
// integer log level. An unset or empty variable yields 0; a value that cannot
// be parsed as an integer terminates the program through log.Fatalf.
func getVerbosity() int {
	raw := os.Getenv("VERBOSE")
	if raw == "" {
		return 0
	}
	level, err := strconv.Atoi(raw)
	if err != nil {
		log.Fatalf("Invalid verbosity %v", raw)
	}
	return level
}

// logTopic is the short category tag that Debug prints right after the
// timestamp of every log line.
type logTopic string

// Known log topics. Each value is a four-character tag; the same tags are the
// keys of the TOPICS colour table in the dslog script, which filters and
// colours log lines by them.
const (
	dClient  logTopic = "CLNT"
	dCommit  logTopic = "CMIT"
	dDrop    logTopic = "DROP"
	dError   logTopic = "ERRO"
	dInfo    logTopic = "INFO"
	dLeader  logTopic = "LEAD"
	dLog     logTopic = "LOG1"
	dLog2    logTopic = "LOG2"
	dPersist logTopic = "PERS"
	dSnap    logTopic = "SNAP"
	dTerm    logTopic = "TERM"
	dTest    logTopic = "TEST"
	dTimer   logTopic = "TIMR"
	dTrace   logTopic = "TRCE"
	dVote    logTopic = "VOTE"
	dWarn    logTopic = "WARN"
)

// Package-level logging state, filled in once by init.
var (
	// debugStart is the reference instant from which Debug measures elapsed time.
	debugStart time.Time
	// debugVerbosity is the level read from the environment by getVerbosity.
	debugVerbosity int
)

// init captures the verbosity level and the start time, and removes the
// date and time fields from the standard logger's output flags (Debug writes
// its own elapsed-time prefix instead).
func init() {
	debugVerbosity = getVerbosity()
	debugStart = time.Now()

	withoutClock := log.Flags() &^ (log.Ldate | log.Ltime)
	log.SetFlags(withoutClock)
}

// Debug writes one formatted log line through the standard logger when
// debugVerbosity is at least 1; otherwise it does nothing.
//
// The line is prefixed with the time elapsed since debugStart, expressed in
// units of 100 microseconds and zero-padded to six digits, followed by the
// topic tag. format and a are then handled as in log.Printf.
func Debug(topic logTopic, format string, a ...interface{}) {
	if debugVerbosity < 1 {
		return
	}
	// Microseconds / 100 gives tenths of a millisecond.
	stamp := time.Since(debugStart).Microseconds() / 100
	prefix := fmt.Sprintf("%06d %v ", stamp, string(topic))
	log.Printf(prefix+format, a...)
}

Debug方式

将原有的 log.Printf 调用改为 Debug();注意:日志内容必须以 S%d(当前 server 编号)开头,后续的 dslog 脚本正是依靠这个编号进行分栏打印

示例:

1
2
3
Debug(dLog, "S%d T:%d -> S%d Sending PLI: %d PLT: %d LC: %d ",
rf.me, rf.currentTerm, server, Args.PrevlogIndex, Args.PrevlogTerm,
Args.LeaderCommit)

运行方式

1
VERBOSE=1 go test -run 2B

Python Script

将python脚本转为终端命令

原本运行脚本需要输入 python3 脚本名.py(脚本文件也可以不带后缀),这里将其转为像 ls 那样可以直接在终端调用的命令

1.需要的环境,python3解释器

2.打开终端,输入which python3 复制解释器地址

3.在你需要执行的python文件最上方加上 #! python解释器地址

1
#!/usr/bin/python3

4.修改当前需要执行的文件的权限 chmod +x python文件名

5.复制当前文件到已加入 PATH 的目录下(例如下面的 ~/bin;需确保该目录在 PATH 中)

1
sudo cp dtest ~/bin

6.终端直接输入你的python文件名就可以看到运行结果了

效果如下

command

dslog:Prettifying the Logs

经过 Go side 处理后,我们的 Log 仍旧比较杂乱、颜色单一,因此使用 python 脚本进行优化。首先需要在系统环境中安装 python 的 rich 美化包(脚本同时依赖 typer)

例如:

1
VERBOSE=1 go test -run 2B > output.log

然后打印出output.log

如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
...
008258 LOG2 S2 Saved Log (-1, 0) [{<nil> 0}]
008256 VOTE S3 Granting Vote to S1 at T1
008258 VOTE S1 <- S0 Got vote
008258 VOTE S4 Granting Vote to S1 at T1
008259 PERS S4 Saved State T:1 VF:1
008258 PERS S3 Saved State T:1 VF:1
008259 LOG2 S3 Saved Log (-1, 0) [{<nil> 0}]
008259 VOTE S1 <- S2 Got vote
008260 LEAD S1 Achieved Majority for T1 (3), converting to Leader
008260 TIMR S1 Broadcast, reseting HBT
008260 LOG1 S1 -> S0 Sending PLI: 0 PLT: 0 N: 0 LC: 0 - []
008260 LOG1 S1 -> S2 Sending PLI: 0 PLT: 0 N: 0 LC: 0 - []
008259 LOG2 S4 Saved Log (-1, 0) [{<nil> 0}]
008261 LOG1 S1 -> S3 Sending PLI: 0 PLT: 0 N: 0 LC: 0 - []
008262 LOG1 S1 -> S4 Sending PLI: 0 PLT: 0 N: 0 LC: 0 - []
008262 TIMR S1 Leader, checking heartbeats
008263 TIMR S0 Resetting ELT, received AppEnt T1
008263 TIMR S3 Resetting ELT, received AppEnt T1
008264 TIMR S2 Resetting ELT, received AppEnt T1
008264 LOG2 S2 LOG: (-1, 0) [{<nil> 0}]
...

然后将脚本代码写入dslog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#!/usr/bin/python3
import sys
import shutil
from typing import Optional, List, Tuple, Dict

import typer
from rich import print
from rich.columns import Columns
from rich.console import Console
from rich.traceback import install

# fmt: off
# Colour table: maps every recognised log topic tag to the hex colour used
# when rendering its messages. The keys also define the set of valid topic
# names accepted by the --ignore / --just options.
TOPICS = {
    "TIMR": "#9a9a99",
    "VOTE": "#67a0b2",
    "LEAD": "#d0b343",
    "TERM": "#70c43f",
    "LOG1": "#4878bc",
    "LOG2": "#398280",
    "CMIT": "#98719f",
    "PERS": "#d08341",
    "SNAP": "#FD971F",
    "DROP": "#ff615c",
    "CLNT": "#00813c",
    "TEST": "#fe2c79",
    "INFO": "#ffffff",
    "WARN": "#d08341",
    "ERRO": "#fe2626",
    "TRCE": "#fe2626",
}
# fmt: on


def list_topics(value: Optional[str]):
    """Option callback: turn a comma-separated topic string into a list.

    Returns ``None`` unchanged when the option was not given. Otherwise the
    string is split on commas and every piece must be a key of ``TOPICS``;
    the first piece that is not raises ``typer.BadParameter``.
    """
    if value is None:
        return value

    names = value.split(",")
    unknown = next((name for name in names if name not in TOPICS), None)
    if unknown is not None:
        raise typer.BadParameter(f"topic {unknown} not recognized")
    return names


def _peer_index(msg: str) -> int:
    """Return the peer number from a message that starts with ``S<digits>``.

    The first character is skipped and every following digit is consumed, so
    ``"S12 ..."`` yields 12. Raises ``ValueError`` when no digit follows the
    first character.
    """
    end = 1
    while end < len(msg) and msg[end].isdigit():
        end += 1
    return int(msg[1:end])


# Command-line entry point: read log lines of the form "<time> <TOPIC> <msg>"
# from `file` (stdin when omitted), keep only the selected topics, optionally
# colour them, and print them either one per line or spread over `n_columns`
# columns indexed by the peer number found at the start of the message.
#
# Options:
#   --no-color      disable colouring
#   --columns/-c    number of columns for per-peer layout
#   --ignore/-i     comma-separated topics to hide
#   --just/-j       comma-separated topics to show exclusively
#
# This description is kept as comments rather than a docstring because typer
# would display a docstring as the command's --help text.
def main(
    file: typer.FileText = typer.Argument(None, help="File to read, stdin otherwise"),
    colorize: bool = typer.Option(True, "--no-color"),
    n_columns: Optional[int] = typer.Option(None, "--columns", "-c"),
    ignore: Optional[str] = typer.Option(None, "--ignore", "-i", callback=list_topics),
    just: Optional[str] = typer.Option(None, "--just", "-j", callback=list_topics),
):
    topics = list(TOPICS)

    # We can take input from stdin (pipes) or from a file
    input_ = file if file else sys.stdin
    # Print just some topics or exclude some topics (good for avoiding verbose ones)
    if just:
        topics = just
    if ignore:
        ignored = set(ignore)  # built once instead of once per element
        topics = [lvl for lvl in topics if lvl not in ignored]

    topics = set(topics)
    console = Console()
    width = console.size.width

    panic = False
    for line in input_:
        try:
            timestamp, topic, *words = line.strip().split(" ")

            # A second field that is not a known topic means this is not one
            # of our log lines (test output, panic trace, ...). Hand it to the
            # fallback below instead of silently dropping it.
            if topic not in TOPICS:
                raise ValueError(f"unrecognized topic {topic!r}")

            # Known topic that the user filtered out
            if topic not in topics:
                continue

            msg = " ".join(words)

            # Debug calls from the test suite aren't associated with any
            # particular peer. Otherwise the message starts with "S<id>".
            if topic != "TEST":
                i = _peer_index(msg)

            # Colorize output by using rich markup when needed
            if colorize:
                color = TOPICS[topic]
                msg = f"[{color}]{msg}[/{color}]"

            # Single column printing. Always the case for debug stmts in tests
            if n_columns is None or topic == "TEST":
                print(timestamp, msg)
            # Multi column printing; the timestamp is dropped to maximize
            # horizontal space. Layout is delegated to rich's Columns object.
            else:
                cols = ["" for _ in range(n_columns)]
                cols[i] = msg
                col_width = int(width / n_columns)
                cols = Columns(cols, width=col_width - 1, equal=True, expand=True)
                print(cols)
        except Exception:
            # Not a bare `except:` so that Ctrl-C / SystemExit still propagate.
            # Output from tests or panics does not follow the log format,
            # so we print it as is.
            if line.startswith("panic"):
                panic = True
            # Output from tests is usually important, so precede it with a
            # horizontal line of dashes to make it more obvious. Once a panic
            # has been seen the separators are suppressed.
            if not panic:
                print("-" * console.width)
            # markup=False: raw text may contain square brackets that must not
            # be interpreted as rich markup tags.
            console.print(line, end="", markup=False)


if __name__ == "__main__":
    typer.run(main)

当我们将脚本安装好后运行

1
2
3
dslog --help #查看脚本使用参数

dslog -c 3 -i TIMR output.log

效果如下

dtest:Capturing Rare Failures

dtest脚本的作用是多次运行测试,方便检查出稀有的错误

按照与 dslog 相同的方式将 dtest 安装完毕

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
#!/usr/bin/env python3

import itertools
import math
import signal
import subprocess
import tempfile
import shutil
import time
import os
import sys
import datetime
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Dict, DefaultDict, Tuple

import typer
import rich
from rich import print
from rich.table import Table
from rich.progress import (
Progress,
TimeElapsedColumn,
TimeRemainingColumn,
TextColumn,
BarColumn,
SpinnerColumn,
)
from rich.live import Live
from rich.panel import Panel
from rich.traceback import install

# Install rich's traceback handler (imported above from rich.traceback) with
# show_locals enabled, so uncaught exceptions in this script are rendered by
# rich together with the local variables of each frame.
install(show_locals=True)


@dataclass
class StatsMeter:
    """
    Auxiliary class to keep track of online stats: count, mean and variance.

    Uses Welford's algorithm to update the mean and the sum of squared
    deviations incrementally, one datum at a time.
    https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#On-line_algorithm
    """

    n: int = 0  # number of data points added so far
    mean: float = 0.0  # running mean of the data
    S: float = 0.0  # running sum of squared deviations from the mean

    def add(self, datum):
        """Fold one numeric observation into the running statistics."""
        self.n += 1
        delta = datum - self.mean
        # Mk = Mk-1 + (xk - Mk-1)/k
        self.mean += delta / self.n
        # Sk = Sk-1 + (xk - Mk-1)*(xk - Mk)
        self.S += delta * (datum - self.mean)

    @property
    def variance(self):
        """Population variance (S / n); 0.0 while no data has been added.

        The empty case is guarded because meters are created on demand
        (e.g. by a defaultdict) and may be read before anything is added.
        """
        if self.n == 0:
            return 0.0
        return self.S / self.n

    @property
    def std(self):
        """Population standard deviation: the square root of ``variance``."""
        return math.sqrt(self.variance)


def print_results(results: Dict[str, Dict[str, StatsMeter]], timing=False):
    """Print a summary table with one row per test that completed at least once.

    Each row shows the test name (green when it never failed, red otherwise),
    the failed and completed counts, and "mean ± std" of the recorded times:
    a single "Time" column normally, or real/user/system columns when
    ``timing`` is true.
    """
    if timing:
        time_headers = ["Real Time", "User Time", "System Time"]
        time_keys = ["real_time", "user_time", "system_time"]
    else:
        time_headers = ["Time"]
        time_keys = ["time"]

    table = Table(show_header=True, header_style="bold")
    table.add_column("Test")
    for header in ["Failed", "Total"] + time_headers:
        table.add_column(header, justify="right")

    for test, stats in results.items():
        # Tests that never finished a run are left out of the table
        if stats["completed"].n == 0:
            continue

        color = "red" if stats["failed"].n != 0 else "green"
        cells = [
            f"[{color}]{test}[/{color}]",
            str(stats["failed"].n),
            str(stats["completed"].n),
        ]
        for key in time_keys:
            cells.append(f"{stats[key].mean:.2f} ± {stats[key].std:.2f}")
        table.add_row(*cells)

    print(table)


def run_test(test: str, race: bool, timing: bool):
test_cmd = ["go", "test", f"-run={test}"]
if race:
test_cmd.append("-race")
if timing:
test_cmd = ["time"] + cmd
f, path = tempfile.mkstemp()
start = time.time()
proc = subprocess.run(test_cmd, stdout=f, stderr=f)
runtime = time.time() - start
os.close(f)
return test, path, proc.returncode, runtime


def last_line(file: str) -> str:
    """Return the last line of ``file``, decoded with the default codec.

    The file is scanned backwards for the newline that precedes the last
    line; the file's final byte is skipped so a trailing newline does not
    count as that separator (and is kept in the returned string). Files that
    are empty, a single byte long, or contain only one line are handled by
    reading from the start of the file instead of raising ``OSError``.
    """
    with open(file, "rb") as f:
        f.seek(0, os.SEEK_END)
        # Start at the second-to-last byte and walk towards the beginning
        pos = f.tell() - 2
        while pos >= 0:
            f.seek(pos)
            if f.read(1) == b"\n":
                # The read left the cursor on the first byte of the last line
                break
            pos -= 1
        else:
            # No earlier newline: the whole file is the last line
            f.seek(0)
        return f.readline().decode()


# Command-line entry point: run every test in `tests` `iterations` times on a
# pool of `workers` threads, showing live progress bars, keeping the logs of
# failed runs (all runs with --archive) under `output`, and printing a summary
# table at the end. With --loop the whole procedure repeats forever, with the
# iteration count multiplied by `growth` after each round.
#
# This description is kept as comments rather than a docstring because typer
# would display a docstring as the command's --help text.
# fmt: off
def run_tests(
    tests: List[str],
    sequential: bool = typer.Option(False, '--sequential', '-s', help='Run all test of each group in order'),
    workers: int = typer.Option(1, '--workers', '-p', help='Number of parallel tasks'),
    iterations: int = typer.Option(10, '--iter', '-n', help='Number of iterations to run'),
    output: Optional[Path] = typer.Option(None, '--output', '-o', help='Output path to use'),
    verbose: int = typer.Option(0, '--verbose', '-v', help='Verbosity level', count=True),
    archive: bool = typer.Option(False, '--archive', '-a', help='Save all logs intead of only failed ones'),
    race: bool = typer.Option(False, '--race/--no-race', '-r/-R', help='Run with race checker'),
    loop: bool = typer.Option(False, '--loop', '-l', help='Run continuously'),
    growth: int = typer.Option(10, '--growth', '-g', help='Growth ratio of iterations when using --loop'),
    timing: bool = typer.Option(False, '--timing', '-t', help='Report timing, only works on macOS'),
    # fmt: on
):
    # Default output directory is named after the current timestamp
    if output is None:
        output = Path(datetime.datetime.now().strftime("%Y%m%d_%H%M%S"))

    if race:
        print("[yellow]Running with the race detector\n[/yellow]")

    # The verbosity count is handed to the test processes via the environment
    if verbose > 0:
        print(f"[yellow] Verbosity level set to {verbose}[/yellow]")
        os.environ['VERBOSE'] = str(verbose)

    while True:
        total = iterations * len(tests)
        completed = 0

        results = {test: defaultdict(StatsMeter) for test in tests}

        # Order of execution: all iterations of one test before the next
        # (sequential), or one pass over every test repeated `iterations` times.
        if sequential:
            batches = (itertools.repeat(test, iterations) for test in tests)
        else:
            batches = itertools.repeat(tests, iterations)
        schedule = iter(itertools.chain.from_iterable(batches))

        # Overall progress bar
        total_progress = Progress(
            "[progress.description]{task.description}",
            BarColumn(),
            TimeRemainingColumn(),
            "[progress.percentage]{task.percentage:>3.0f}%",
            TimeElapsedColumn(),
        )
        total_task = total_progress.add_task("[yellow]Tests[/yellow]", total=total)

        # One progress bar per test
        task_progress = Progress(
            "[progress.description]{task.description}",
            SpinnerColumn(),
            BarColumn(),
            "{task.completed}/{task.total}",
        )
        tasks = {test: task_progress.add_task(test, total=iterations) for test in tests}

        progress_table = Table.grid()
        progress_table.add_row(total_progress)
        progress_table.add_row(Panel.fit(task_progress))

        with Live(progress_table, transient=True) as live:

            # On Ctrl-C: stop the live display, print what we have, and exit
            def handler(_, frame):
                live.stop()
                print('\n')
                print_results(results)
                sys.exit(1)

            signal.signal(signal.SIGINT, handler)

            with ThreadPoolExecutor(max_workers=workers) as executor:

                pending = []
                while completed < total:
                    # Top up the pool so that up to `workers` runs are in flight
                    free_slots = workers - len(pending)
                    if free_slots > 0:
                        for test in itertools.islice(schedule, free_slots):
                            pending.append(executor.submit(run_test, test, race, timing))

                    done, not_done = wait(pending, return_when=FIRST_COMPLETED)

                    for future in done:
                        test, path, rc, runtime = future.result()
                        stats = results[test]
                        task_id = tasks[test]
                        failed = rc != 0

                        stats['completed'].add(1)
                        stats['time'].add(runtime)
                        task_progress.update(task_id, advance=1)
                        dest = (output / f"{test}_{completed}.log").as_posix()
                        if failed:
                            print(f"Failed test {test} - {dest}")
                            task_progress.update(task_id, description=f"[red]{test}[/red]")
                            stats['failed'].add(1)
                        elif stats['completed'].n == iterations and stats['failed'].n == 0:
                            # Every iteration of this test finished without a failure
                            task_progress.update(task_id, description=f"[green]{test}[/green]")

                        # Keep the log of failed runs, or of every run when archiving
                        if failed or archive:
                            output.mkdir(exist_ok=True, parents=True)
                            shutil.copy(path, dest)

                        if timing:
                            # The timing figures are parsed from the last line of the log
                            fields = last_line(path).replace(' ' * 8, '').split(' ')
                            real, _, user, _, system, _ = fields
                            stats['real_time'].add(float(real))
                            stats['user_time'].add(float(user))
                            stats['system_time'].add(float(system))

                        os.remove(path)

                        completed += 1
                        total_progress.update(total_task, advance=1)

                    pending = list(not_done)

        print_results(results, timing)

        if not loop:
            break
        iterations *= growth
        print(f"[yellow]Increasing iterations to {iterations}[/yellow]")


if __name__ == "__main__":
    typer.run(run_tests)

运行示例

1
2
3
4
dtest --help #查看运行参数

dtest -n 100(运行一百遍) -p 5(五个并发的运行测试加快运行速率) -s(顺序执行)
-v(将Debug打印到log) 2B(测试点名称)