# otel_sqs.py (2.6 KB)
import json
import logging
import os
import random
import time

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
    OTLPSpanExporter,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
    ConsoleSpanExporter,
)
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
  16. api_host = os.environ.get('API_HOST', 'guu84124.live.dynatrace.com')
  17. api_token = os.environ.get('API_TOKEN', 'asdfasdfasdfasdfaasdf')
  18. span_exporter = OTLPSpanExporter(
  19. endpoint="https://".string(api_host)."/api/v2/otlp/v1/traces", #TODO replace <URL> with the URL as determined in section 2 above
  20. headers={
  21. "Authorization": "Api-Token ".string(api_token) #TODO replace <TOKEN> with the authentication token created in section 2 above
  22. },
  23. )
  24. resource = Resource.create({
  25. #customizable resource attributes
  26. "service.name": "otel_python_sqs_consumer",
  27. "service.version": "1.0.0"
  28. })
  29. trace.set_tracer_provider(TracerProvider(resource=resource))
  30. tracer = trace.get_tracer_provider().get_tracer(__name__)
  31. trace.get_tracer_provider().add_span_processor(
  32. BatchSpanProcessor(span_exporter)
  33. )
  34. import boto3
  35. session = boto3.session.Session()
  36. sqs = session.client('sqs', region_name="us-east-1", aws_access_key_id="", aws_secret_access_key="")
  37. def main():
  38. queue_url = 'https://sqs.us-east-1.amazonaws.com/<ACCOUNT>/otel_queue'
  39. response = sqs.receive_message(
  40. QueueUrl=queue_url,
  41. AttributeNames=[
  42. 'SentTimestamp'
  43. ],
  44. MessageAttributeNames=[
  45. 'All'
  46. ],
  47. VisibilityTimeout=0,
  48. WaitTimeSeconds=0
  49. )
  50. try:
  51. for message in response['Messages']:
  52. body = json.loads(message['Body'])
  53. print(body['detail'])
  54. ctx = TraceContextTextMapPropagator().extract(carrier=body['detail'])
  55. with tracer.start_as_current_span("Read From SQS Queue", kind=trace.SpanKind(4), context=ctx) as span:
  56. span.set_attribute("aws.service", "sqs/otel_queue")
  57. print(message)
  58. receipt_handle = message['ReceiptHandle']
  59. # Delete received message from queue
  60. sqs.delete_message(
  61. QueueUrl=queue_url,
  62. ReceiptHandle=receipt_handle
  63. )
  64. span.set_status(status=trace.StatusCode(1))
  65. except Exception as e:
  66. pass
  67. if __name__ == '__main__':
  68. while True:
  69. main()