链接: http://pan.baidu.com/s/1jGjhbEm 密码: xkg8
python TCP 练习
服务器端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
#!/usr/bin/env python
# coding=utf-8
"""Threaded TCP greeting server (practice script).

Listens on 127.0.0.1:9999, sends ``welcome`` to every new client and answers
each message with ``Hello ,<message>`` until the client sends ``exit`` or
disconnects.  Each connection is handled in its own thread.
"""
from multiprocessing import Process
import socket,time
import threading

HOST = '127.0.0.1'
PORT = 9999


def tcplink(sock, addr):
    """Serve one client until it sends ``exit`` or closes the connection.

    Args:
        sock: connected socket returned by ``accept()``.  It is always
            closed before this function returns, even on error.
        addr: ``(host, port)`` tuple of the peer; used only for logging.
    """
    print('Accept new connection from %s:%s ' % addr)
    try:
        sock.sendall(b'welcome')
        while True:
            data = sock.recv(1024)
            if not data:
                # Empty read means the peer closed its side.
                break
            # Decode once so the reply is built from text; formatting the raw
            # byte string and then calling .encode() fails on non-ASCII input.
            text = data.decode('utf-8')
            if text == 'exit':
                break
            # Simulated work; done only when a reply is actually going out.
            time.sleep(1)
            sock.sendall(('Hello ,%s' % text).encode('utf-8'))
    finally:
        sock.close()
        print('connection from %s:%s closed ' % addr)


def main():
    """Accept connections forever, handing each one to a new thread."""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Let the server be restarted right away instead of failing with
    # "Address already in use" while old connections are in TIME_WAIT.
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        s.bind((HOST, PORT))
        s.listen(5)
        print('Waiting for connection')
        while True:
            sock, addr = s.accept()
            t = threading.Thread(target=tcplink, args=(sock, addr))
            t.start()
    finally:
        s.close()


if __name__ == '__main__':
    main()
客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
#!/usr/bin/env python
# coding=utf-8
"""TCP practice client for the greeting server on 127.0.0.1:9999."""
import socket


def main():
    """Connect, print the greeting, send three names, then say ``exit``.

    The socket is closed in ``finally`` so a refused connection or a
    failed send/recv does not leak the descriptor.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect(('127.0.0.1', 9999))
        # First message from the server is its welcome banner.
        print(s.recv(1024).decode('utf-8'))
        for data in [b'Michael', b'Tracy', b'Sarah']:
            s.send(data)
            print(s.recv(1024).decode('utf-8'))
        s.send(b'exit')
    finally:
        s.close()


if __name__ == '__main__':
    main()
更多请参见http://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/00143……
python 检测mysql运行情况并根据情况发送邮件通知163
脚本作用:配合crontab,对mysql的情况进行监控,定时发送邮件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
#!/usr/bin/env python
# coding=utf-8
"""Check that MySQL answers a query and e-mail the outcome.

Python 2 script meant to be run from cron: it counts the rows of
``mysql.user`` and mails either an "ok" or a "please check" message
through the 163.com SMTP server.
"""
from email import encoders
from email.header import Header
from email.mime.text import MIMEText
from email.utils import parseaddr, formataddr
import smtplib
import MySQLdb


def _format_addr(s):
    """Return ``s`` (``"Name <addr>"``) with the display name header-encoded."""
    name, addr = parseaddr(s)
    return formataddr((
        Header(name, 'utf-8').encode(),
        addr.encode('utf-8') if isinstance(addr, unicode) else addr))


from_addr = '****@163.com'
password = '******'
to_addr = '******@qq.com'
smtp_server = 'smtp.163.com'


def _build_message(body, subject):
    """Build a UTF-8 text mail with From, To and Subject all filled in."""
    msg = MIMEText(body, 'plain', 'utf-8')
    msg['From'] = _format_addr(u'服务器监控 <%s>' % from_addr)
    msg['To'] = _format_addr(u'管理员 <%s>' % to_addr)
    msg['Subject'] = Header(subject, 'utf-8').encode()
    return msg


def _count_mysql_users():
    """Return ``select count(*) from user`` in the ``mysql`` database.

    Raises:
        MySQLdb.Error: if connecting or querying fails.
    """
    conn = MySQLdb.connect(host='localhost', user='root', passwd='*****', port=3306)
    try:
        cur = conn.cursor()
        try:
            conn.select_db('mysql')
            # execute() only reports how many rows the statement produced;
            # the count itself has to be fetched from the result set.
            cur.execute('select count(*) from user')
            return cur.fetchone()[0]
        finally:
            cur.close()
    finally:
        conn.close()


def main():
    """Probe MySQL, then send exactly one status mail."""
    try:
        count = _count_mysql_users()
        print(count)
        msg = _build_message('everything is ok.', u'Mysql 正常')
    except MySQLdb.Error as e:
        print("Mysql Error %d: %s" % (e.args[0], e.args[1]))
        msg = _build_message('please check', u'Mysql 出现异常,请及时排查')
    print('msg is : %s' % msg.as_string())

    # Connect only once there is something to send; ``server`` is bound
    # before the try so the finally clause can never hit an unbound name.
    server = smtplib.SMTP(smtp_server, 25)
    try:
        server.set_debuglevel(1)
        server.login(from_addr, password)
        server.sendmail(from_addr, [to_addr], msg.as_string())
    finally:
        server.quit()


if __name__ == '__main__':
    main()
linux 清理semaphores ruby脚本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
#!/usr/bin/env oo-ruby
#
# Remove SysV semaphore arrays whose creator no longer owns any running
# process.  Creators are taken from `ipcs -s -c`; live users from `ps`.
#
require 'etc'

# Return the login names of every numeric user shown by `ps` that matches
# the four-digit pattern below.
def get_ps_users()
  # `uniq`, not `uniq!`: the bang form returns nil when nothing was removed,
  # which made the following `collect` raise NoMethodError.
  uids = %x[ps -o user -e].split("\n").grep(/^[1-6]{4}/).uniq
  uids.collect do |uid|
    Etc.getpwuid(uid.to_i).name
  end
end

# Find semaphores belonging to users without processes and delete them.
def main()
  # Drop the first output line; fall back to [] because slicing an empty
  # array with [1..-1] yields nil.
  results = %x[ /usr/bin/ipcs -s -c].split("\n")[1..-1] || []

  # Block form gives every user its own array instead of one shared default.
  user_semaphores = Hash.new { |hash, key| hash[key] = [] }
  results.each do |result|
    values = result.split
    next unless values[2] =~ /[a-z0-9]{24}/
    # Only numeric ids are ever interpolated into the ipcrm command line.
    next unless values[0] =~ /\A\d+\z/
    user_semaphores[values[2]] << values[0]
  end

  users_with_procs = get_ps_users()
  users_without_procs = user_semaphores.keys - users_with_procs

  all_semaphores = users_without_procs.collect { |user| user_semaphores[user] }.flatten

  # ipcrm with no arguments only prints a usage error, so skip it.
  unless all_semaphores.empty?
    cmd_str = all_semaphores.collect { |semid| "-s #{semid}" }.join(' ')
    puts %x[/usr/bin/ipcrm #{cmd_str}]
  end

  puts "Cleaned #{all_semaphores.size} semaphores."
end

if __FILE__ == $0
  main()
end
YouTube Challenge – I Told My Kids I Ate All Their Halloween Candy 2015
python 监控mysql是否正常
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
#!/usr/bin/env python
# coding=utf-8
# first install : yum install MySQL-python
"""Print the number of rows in ``mysql.user`` as a MySQL liveness check."""
import MySQLdb


def check_mysql():
    """Query MySQL once and print the user count, or the error on failure.

    The cursor and connection are closed even when the query raises.
    """
    try:
        conn = MySQLdb.connect(host='localhost', user='root', passwd='yourpasswd', port=3306)
        try:
            cur = conn.cursor()
            try:
                conn.select_db('mysql')
                # execute() returns the number of result rows (always 1
                # here); the actual count is the first fetched column.
                cur.execute('select count(*) from user')
                count = cur.fetchone()[0]
                print(count)
            finally:
                cur.close()
        finally:
            conn.close()
    except MySQLdb.Error as e:
        print("Mysql Error %d: %s" % (e.args[0], e.args[1]))


if __name__ == '__main__':
    check_mysql()
python 多线程 信息抓取 例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
#!/usr/bin/env python
# coding=utf-8
"""Multithreaded fetch example: five workers pull URLs from a queue."""
import Queue
import threading
import urllib2
import time

# The original list lacked a comma after the second entry, which silently
# fused two URLs into "http://baidu.comhttp://youku.com".
hosts = ["http://baidu.com", "http://baidu.com", "http://youku.com",
         "http://ibm.com", "http://ibm.com", "http://ibm.com",
         "http://ibm.com", "http://ibm.com", "http://ibm.com",
         "http://ibm.com", "http://ibm.com", "http://ibm.com",
         "http://apple.com"]

queue = Queue.Queue()


class ThreadUrl(threading.Thread):
    """Worker thread that fetches every host it takes from ``queue``."""

    def __init__(self, queue, name):
        threading.Thread.__init__(self)
        self.name = name
        self.queue = queue

    def run(self):
        """Loop forever: take a host, read its first 1024 bytes, mark done."""
        while True:
            print('This the %s thread' % self.name)
            host = self.queue.get()
            try:
                url = urllib2.urlopen(host)
                try:
                    url.read(1024)
                finally:
                    url.close()
            except Exception:
                # A failed fetch is reported by printing the host only.
                print(host)
            finally:
                # Always acknowledge the item, otherwise queue.join() in
                # main() would block forever.
                self.queue.task_done()


def main():
    """Start five daemon workers, enqueue all hosts and wait for them."""
    for i in range(5):
        t = ThreadUrl(queue, i)
        # Daemon threads let the process exit although run() never returns.
        t.daemon = True
        t.start()

    for host in hosts:
        queue.put(host)

    # Blocks until task_done() has been called once per queued host.
    queue.join()


if __name__ == '__main__':
    start = time.time()
    main()
    print("Elapsed Time: %s" % (time.time() - start))
python 多进程 例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
#!/usr/bin/env python
# coding=utf-8
"""Process-pool example: ping a handful of web sites in parallel."""
from multiprocessing import Pool
import os,time,random
import subprocess


def long_time_task(name):
    """Ping the site at index ``name`` once and report how long it took.

    Args:
        name: integer task number, also used as the index into the site list.
    """
    webiste = ['www.baidu.com', 'www.youku.com', 'www.tudou.com',
               'www.iqiyi.com', 'www.google.com', 'www.503error.com']
    print('---------Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(2)
    print(name)
    # Ping the same host that is printed; the original printed
    # webiste[name] but pinged webiste[name-1].
    host = webiste[name]
    print(host)
    subprocess.call(['ping', '-c', '1', host])
    end = time.time()
    print('*********Task %s run %0.2f second' % (name, end - start))


if __name__ == '__main__':
    print('Parent process %s' % os.getpid())
    p = Pool(4)
    pending = [p.apply_async(long_time_task, args=(i,)) for i in range(5)]
    print('Waiting all the process')
    p.close()
    p.join()
    for result in pending:
        # get() re-raises any exception from the worker, which
        # apply_async would otherwise swallow silently.
        result.get()
    print('All done')
[zhizhang@zhizhang-desktop-nay test]$ vi myProcessPoll.py [zhizhang@zhizhang-desktop-nay test]$ cat myProcessPoll.py [crayon-647db898ef4f……
python 带存储的地址簿
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
#!/usr/bin/env python
# coding=utf-8
"""Tiny persistent address book: a name -> telephone dict pickled to a file."""
try:
    import cPickle as p
except ImportError:
    # No cPickle module available; the plain pickle module has the same API.
    import pickle as p
import sys
import os

try:
    _read_line = raw_input
except NameError:
    # raw_input does not exist on this interpreter; input() reads a line.
    _read_line = input


class Person(object):
    """One address-book entry plus the dict it should be stored in."""

    def __init__(self, name, tel, userlist):
        self.name = name
        self.tel = tel
        self.userlist = userlist

    def AddToList(self):
        """Insert this person into ``userlist`` and return the dict.

        If the name already exists the user is asked for confirmation and
        the number is overwritten only when the answer is exactly ``Y``.
        """
        if self.name in self.userlist:
            print('Already have this one will change')
            anser = _read_line('Please input Y/N:')
            if anser == 'Y':
                self.userlist[self.name] = self.tel
        else:
            self.userlist[self.name] = self.tel
        return self.userlist


class UserList(object):
    """Loader for the pickled address book stored at ``listname``."""

    def __init__(self, listname):
        self.listname = listname
        self.storedlist = {}

    def getStored(self):
        """Return the stored dict, or an empty dict if nothing is stored yet.

        A missing file and an empty file both count as "nothing stored":
        the original raised AttributeError on the first run and left an
        empty file behind that made the next run fail with EOFError.
        """
        self.storedlist = {}
        if os.path.isfile(self.listname):
            # Pickle data is binary, so the file is opened in 'rb' mode.
            with open(self.listname, 'rb') as f:
                try:
                    # NOTE(security): unpickling can execute arbitrary code;
                    # only load address-book files you created yourself.
                    self.storedlist = p.load(f)
                except EOFError:
                    pass
        return self.storedlist


if __name__ == '__main__':
    storedname = 'first'
    L = UserList(storedname)
    Oldlist = L.getStored()
    name = _read_line('Please input the name:')
    tel = _read_line('Please input the tel:')
    p1 = Person(name, tel, Oldlist)
    newlist = p1.AddToList()
    print(newlist)
    # ``with`` guarantees the flush/close that the bare ``f.close``
    # (missing parentheses) never performed.
    with open(storedname, 'wb') as f:
        p.dump(newlist, f)
第一版