博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
以多进程读取oss符合条件的数据为例,综合使用多进程间的通信、获取多进程的数据...
阅读量:5139 次
发布时间:2019-06-13

本文共 3526 字,大约阅读时间需要 11 分钟。

"""Read yesterday's records from OSS with a producer/consumer process pool.

One producer process lists the object keys to read and pushes them onto a
JoinableQueue; several consumer processes download each object, extract the
JSON records whose ``created_at`` lies inside the requested window, and store
one DataFrame per object key in a Manager dict shared with the main process.
"""
import datetime
import json
import logging
import re
import sys
import time
from itertools import islice
from multiprocessing import Process, JoinableQueue, cpu_count, Manager

import oss2
import pandas as pd
from pandas.tseries.offsets import Day

# Pattern used to pull candidate JSON objects out of a downloaded text blob.
_RECORD_PATTERN = re.compile(r'{.*"adadji",.*}')

# Columns kept from every record.  'created_at' must be kept because the
# main section sorts the merged frame by it.
# NOTE(review): 'dd' and 'cc' look like placeholder column names -- confirm.
_COLUMNS = ['dd', 'cc', 'created_at']


def mkbuck(bk):
    """Return an ``oss2.Bucket`` handle for the bucket named *bk*.

    NOTE(review): ``username``, ``password`` and ``address`` are not defined
    anywhere in the visible code; they must exist as module-level names
    before this function is called.
    """
    auth = oss2.Auth(username, password)
    return oss2.Bucket(auth, address, bk)


def _hours_under(bucket, prefix):
    """Return the int value of the 5th '/'-separated key component.

    Keys ending in '/' are skipped.
    NOTE(review): component index 4 is presumably the hour directory of a
    ``<root>/YYYY/MM/DD/HH/...`` layout -- confirm against the real bucket.
    """
    return [
        int(obj.key.split('/')[4])
        for obj in oss2.ObjectIterator(bucket, prefix=prefix)
        if not obj.key.endswith('/')
    ]


def getbflastpt(bucket, bfyespattern):
    """Return *bfyespattern* extended with the largest hour found under it.

    The hour is zero-padded to two digits, e.g. ``'<prefix>/07'``.

    Raises:
        ValueError: if no non-directory key exists under *bfyespattern*.
    """
    hours = _hours_under(bucket, bfyespattern)
    if not hours:
        raise ValueError('no objects found under prefix %r' % (bfyespattern,))
    return '%s/%02d' % (bfyespattern, max(hours))


def getnowfirstpt(bucket, nowpattern):
    """Return *nowpattern* extended with the smallest hour found under it.

    The hour is zero-padded to two digits, e.g. ``'<prefix>/00'``.

    Raises:
        ValueError: if no non-directory key exists under *nowpattern*.
    """
    hours = _hours_under(bucket, nowpattern)
    if not hours:
        raise ValueError('no objects found under prefix %r' % (nowpattern,))
    return '%s/%02d' % (nowpattern, min(hours))


def getfullnum(bk, bfyespattern, nowpattern, yespattern):
    """List every key to read and return ``(keys, number_of_keys)``.

    The keys are listed, in this order, under: the last hour of
    *bfyespattern*, all of *yespattern*, and the first hour of *nowpattern*.
    """
    bucket = mkbuck(bk)
    prefixes = (
        getbflastpt(bucket, bfyespattern),
        yespattern,
        getnowfirstpt(bucket, nowpattern),
    )
    lists = [
        obj.key
        for prefix in prefixes
        for obj in oss2.ObjectIterator(bucket, prefix=prefix)
    ]
    return lists, len(lists)


# --- inter-process communication: producer / consumer model ---

def getfull(bk, bfyespattern, nowpattern, yespattern, q):
    """Producer: put every key to read on *q*, then wait until all are done.

    ``q.join()`` returns only after a consumer has called ``task_done()``
    once for each key that was put on the queue.
    """
    lists, _ = getfullnum(bk, bfyespattern, nowpattern, yespattern)
    for key in lists:
        q.put(key)
    q.join()


def _select_records(text, starttime, endtime):
    """Return the parsed records in *text* with created_at in the window.

    Both bounds are inclusive and compared with the record's ``created_at``
    value using ``<=``.
    """
    selected = []
    for raw in _RECORD_PATTERN.findall(text):
        record = json.loads(raw)
        if starttime <= record['created_at'] <= endtime:
            selected.append(record)
    return selected


def consumer(bk, q, d, starttime=None, endtime=None):
    """Consumer: loop forever turning keys from *q* into DataFrames in *d*.

    Args:
        bk: bucket name passed to ``mkbuck``.
        q: JoinableQueue of object keys.
        d: shared mapping; ``d[key]`` is set to the DataFrame built from the
            records selected from that object (empty for keys ending in '/').
        starttime, endtime: inclusive ``created_at`` bounds.  When omitted,
            the module-level names ``starttime`` / ``endtime`` are used, as
            the original implementation did (KeyError if they do not exist).
            Passing them explicitly is required when child processes do not
            inherit the parent's globals (``spawn`` start method).

    The function never returns; it is meant to run in a daemon process.
    """
    if starttime is None:
        starttime = globals()['starttime']
    if endtime is None:
        endtime = globals()['endtime']

    bucket = mkbuck(bk)
    while True:
        ress = q.get()
        try:
            js = []
            if not ress.endswith('/'):
                remote_data = bucket.get_object(ress).read().decode('utf-8')
                js = _select_records(remote_data, starttime, endtime)
            # d is the Manager-shared dict the main process reads results from.
            d[ress] = pd.DataFrame(js, columns=_COLUMNS)
        except Exception:
            # Log and keep serving: a dead worker would leave the producer
            # blocked in q.join() forever.
            logging.exception('failed to process %s', ress)
        finally:
            # Always acknowledge the item so q.join() can return.
            q.task_done()


if __name__ == '__main__':
    s1 = time.time()
    now_time = datetime.datetime.now()  # current local time
    yesterday = now_time - 1 * Day()
    bfyes_time = (now_time - 2 * Day()).strftime('%Y/%m/%d')
    yes_time = yesterday.strftime('%Y/%m/%d')
    endtime = yesterday.strftime('%Y-%m-%d 23:59:59')
    starttime = yesterday.strftime('%Y-%m-%d 00:00:00')
    nowdate = now_time.strftime('%Y/%m/%d')

    # NOTE(review): the key prefix root is 'xxx' while the bucket name passed
    # to the worker processes below is 'xx'; both look like redacted
    # placeholders -- confirm they are meant to differ.
    bk = 'xxx'
    bfyespattern = '%s/%s' % (bk, bfyes_time)
    yespattern = '%s/%s' % (bk, yes_time)
    nowpattern = '%s/%s' % (bk, nowdate)

    q = JoinableQueue(cpu_count())
    m = Manager()
    d = m.dict()  # shared dict where each worker stores its processed data
    p1 = Process(target=getfull,
                 args=('xx', bfyespattern, nowpattern, yespattern, q))

    # Spawn the consumer processes; keep at least one so a single-core host
    # does not leave the producer blocked on a queue nobody reads.
    cc = [
        Process(target=consumer, args=('xx', q, d, starttime, endtime))
        for _ in range(max(1, cpu_count() - 1))
    ]

    p_l = [p1]
    for c in cc:
        c.daemon = True  # consumers loop forever; let them die with main
        p_l.append(c)

    for p in p_l:
        p.start()
    p1.join()

    frames = d.values()
    if frames:
        df1 = pd.concat(frames, ignore_index=True)
    else:
        # Nothing was produced (e.g. the producer failed before queuing keys).
        df1 = pd.DataFrame(columns=_COLUMNS)
    df1.sort_values('created_at', inplace=True)
    print(time.time() - s1)
    print('=' * 20)
    print(df1)

  说明:需求是获取昨日的数据。由于 OSS 实时数据的存储可能存在提前或延迟的情况,因此同时读取前天最后一小时、昨日全天以及当天第一个小时的数据,再按 created_at 过滤出昨日的记录。读者可根据自身情况进行修改。

转载于:https://www.cnblogs.com/mahailuo/p/9293825.html

你可能感兴趣的文章
纯粹CSS效果~
查看>>
Django的模板层
查看>>
NOI 2013
查看>>
finally块执行时间
查看>>
linux下shell编程示例-获取进程id
查看>>
windown安装bloomFilter
查看>>
SpringSecurity入门例子及遇到的问题解决
查看>>
P1600 天天爱跑步
查看>>
全文检索(SOLR)前端应用浅析
查看>>
DSL应用集成和Rhino 3
查看>>
python学习第33天
查看>>
阿里云CentOS服务器挂载数据盘
查看>>
训练记录PART5 2018.6.15~8.15
查看>>
The Free Lunch Is Over: A Fundamental Turn Toward Concurrency in Software
查看>>
form 组件
查看>>
Android模拟Home键退回桌面
查看>>
二阶段之五
查看>>
解决scrollview不滚动
查看>>
如何对拍
查看>>
为什么我markdown里的数学公式全崩了???
查看>>